File Coverage

blib/lib/POE/Component/MessageQueue/Storage/BigMemory.pm
Criterion Covered Total %
statement 100 102 98.0
branch 24 26 92.3
condition 8 15 53.3
subroutine 18 19 94.7
pod 0 8 0.0
total 150 170 88.2


line stmt bran cond sub pod time code
1             #
2             # Copyright 2007, 2008 Paul Driver <frodwith@gmail.com>
3             #
4             # This program is free software: you can redistribute it and/or modify
5             # it under the terms of the GNU General Public License as published by
6             # the Free Software Foundation, either version 2 of the License, or
7             # (at your option) any later version.
8             #
9             # This program is distributed in the hope that it will be useful,
10             # but WITHOUT ANY WARRANTY; without even the implied warranty of
11             # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12             # GNU General Public License for more details.
13             #
14             # You should have received a copy of the GNU General Public License
15             # along with this program. If not, see <http://www.gnu.org/licenses/>.
16             #
17              
18             package POE::Component::MessageQueue::Storage::BigMemory::MessageElement;
19 12     12   91 use base qw(Heap::Elem);
  12         32  
  12         4285  
20              
21             sub new
22             {
23 1416     1416   3006 my ($class, $message) = @_;
24 1416         4150 my $self = $class->SUPER::new;
25              
26 1416         17369 $self->val($message);
27 1416         6739 bless($self, $class);
28             }
29              
30             sub cmp
31             {
32 8952     8952   331705 my ($self, $other) = @_;
33 8952         22592 return $self->val->timestamp <=> $other->val->timestamp;
34             }
35              
36             1;
37              
38             package POE::Component::MessageQueue::Storage::BigMemory::DelayedMessageElement;
39 12     12   3711 use base qw(Heap::Elem);
  12         27  
  12         1630  
40              
41             sub new
42             {
43 2     2   7 my ($class, $message) = @_;
44 2         15 my $self = $class->SUPER::new;
45              
46 2         32 $self->val($message);
47 2         17 bless($self, $class);
48             }
49              
50             sub cmp
51             {
52 0     0   0 my ($self, $other) = @_;
53 0         0 return $self->val->deliver_at <=> $other->val->deliver_at;
54             }
55              
56             1;
57              
58             package POE::Component::MessageQueue::Storage::BigMemory;
59 12     12   109 use Moose;
  12         34  
  12         78  
60             with qw(POE::Component::MessageQueue::Storage);
61              
62 12     12   78205 use Heap::Fibonacci;
  12         15063  
  12         500  
63              
64 12     12   88 use constant empty_hashref => (is => 'ro', default => sub { {} });
  12         31  
  12         13460  
  36         27216  
65              
66             # claimer_id => heap element
67             has 'claimed' => empty_hashref;
68             # queue_name => heap of messages
69             has 'unclaimed' => empty_hashref;
70             # queue_name => heap of messages
71             has 'delayed' => empty_hashref;
72             # message_id => info hash
73             has 'messages' => empty_hashref;
74              
75             has 'message_heap' => (
76             is => 'rw',
77             default => sub { Heap::Fibonacci->new },
78             );
79              
80             # Where messages are stored:
81             # -- A heap of all unclaimed messages sorted by timestamp
82             # -- Per destination heaps for unclaimed messages
83             # -- Per destination heaps for delayed messages
84             # -- A hash of claimant => messages.
85             #
86             # There is also a hash of ids to info about heap elements and such.
87              
88             sub _make_heap_elem
89             {
90 1416     1416   4439 POE::Component::MessageQueue::Storage::BigMemory::MessageElement->new(@_);
91             }
92              
93             sub _make_delayed_heap_elem
94             {
95 2     2   28 POE::Component::MessageQueue::Storage::BigMemory::DelayedMessageElement->new(@_);
96             }
97              
98             sub store
99             {
100 708     708 0 116069 my ($self, $msg, $callback) = @_;
101              
102 708         1877 my $main = _make_heap_elem($msg);
103 708         25217 $self->message_heap->add($main);
104              
105 708         24730 my $info = $self->messages->{$msg->id} = {
106             message => $msg,
107             main => $main,
108             };
109              
110 708 100 66     23104 if ($msg->has_delay && $msg->deliver_at > time())
111             {
112 2         8 my $elem = _make_delayed_heap_elem($msg);
113             my $heap =
114 2   33     68 ($self->delayed->{$msg->destination} ||= Heap::Fibonacci->new);
115 2         29 $heap->add($elem);
116 2         53 $info->{delayed} = $elem;
117             }
118             else
119             {
120 706         1807 my $elem = _make_heap_elem($msg);
121 706 100       22946 if ($msg->claimed)
122             {
123 2         66 $self->claimed->{$msg->claimant}->{$msg->destination} = $elem;
124             }
125             else
126             {
127             my $heap =
128 704   66     22595 ($self->unclaimed->{$msg->destination} ||= Heap::Fibonacci->new);
129 704         2589 $heap->add($elem);
130 704         3071 $info->{unclaimed} = $elem;
131             }
132             }
133              
134 708         19948 my $id = $msg->id;
135 708         3771 $self->log(info => "Added $id.");
136 708         2077 @_ = ();
137 708 100       3275 goto $callback if $callback;
138             }
139              
140             sub get
141             {
142             my ($self, $ids, $callback) = @_;
143             @_ = ([map $_->{message}, grep $_,
144             map $self->messages->{$_}, @$ids]);
145             goto $callback;
146             }
147              
148             sub get_all
149             {
150 17     17 0 106 my ($self, $callback) = @_;
151 17         62 @_ = ([map $_->{message}, values %{$self->messages}]);
  17         782  
152 17         103 goto $callback;
153             }
154              
155             sub get_oldest
156             {
157 393     393 0 1275 my ($self, $callback) = @_;
158 393         14736 my $top = $self->message_heap->top;
159 393   66     3943 @_ = ($top && $top->val);
160 393         2730 goto $callback;
161             }
162              
163             sub claim_and_retrieve
164             {
165 1648     1648 0 2010620 my ($self, $destination, $client_id, $callback) = @_;
166            
167             # move delayed messages to normal storage
168 1648 100       56723 if (my $delayed = $self->delayed->{$destination})
169             {
170 6         23 my $time = time();
171              
172 6         53 while (my $elem = $delayed->top)
173             {
174 6         103 my $msg = $elem->val;
175 6 100       289 last unless ($msg->deliver_at <= $time);
176              
177 2         75 my $info = $self->messages->{$msg->id};
178              
179             # remove from the delayed heap
180 2         17 $delayed->delete($elem);
181 2         112 delete $info->{delayed};
182              
183             # make a normal heap element
184 2         13 $elem = _make_heap_elem($msg);
185              
186             # add to the unclaimed heap
187             my $unclaimed =
188 2   33     80 ($self->unclaimed->{$msg->destination} ||= Heap::Fibonacci->new);
189 2         35 $unclaimed->add($elem);
190 2         92 $info->{unclaimed} = $elem;
191             }
192             }
193              
194 1648         3411 my $message;
195 1648         53796 my $heap = $self->unclaimed->{$destination};
196 1648 100       4574 if ($heap)
197             {
198 1223         4081 my $top = $heap->top;
199 1223 100       8521 $message = $top->val if $top;
200 1223 100       18954 $self->claim($message->id, $client_id) if $message;
201             }
202 1648         4684 @_ = ($message);
203 1648         5744 goto $callback;
204             }
205              
206             sub remove
207             {
208             my ($self, $ids, $callback) = @_;
209              
210             foreach my $id (@$ids)
211             {
212             my $info = delete $self->messages->{$id};
213             next unless $info && $info->{message};
214             my $msg = $info->{message};
215              
216             $self->message_heap->delete($info->{main});
217             if ($msg->claimed)
218             {
219             delete $self->claimed->{$msg->claimant}->{$msg->destination};
220             }
221             elsif ($info->{delayed})
222             {
223             $self->delayed->{$msg->destination}->delete($info->{delayed});
224             }
225             elsif ($info->{unclaimed})
226             {
227             $self->unclaimed->{$msg->destination}->delete($info->{unclaimed});
228             }
229             }
230              
231             @_ = ();
232             goto $callback if $callback;
233             }
234              
235             sub empty
236             {
237 7     7 0 4468 my ($self, $callback) = @_;
238              
239 7         32 %{$self->$_} = () foreach qw(messages claimed unclaimed delayed);
  28         3542  
240 7         68 $self->message_heap(Heap::Fibonacci->new);
241 7         2723 @_ = ();
242 7 50       62 goto $callback if $callback;
243             }
244              
245             sub claim
246             {
247             my ($self, $ids, $client_id, $callback) = @_;
248              
249             foreach my $id (@$ids)
250             {
251             my $info = $self->messages->{$id} || next;
252             my $message = $info->{message};
253             my $destination = $message->destination;
254            
255             if ($message->claimed)
256             {
257             # According to the docs, we just Do What We're Told.
258             $self->claimed->{$client_id}->{$destination} =
259             delete $self->claimed->{$message->claimant}->{$destination}
260             }
261             elsif ($info->{delayed})
262             {
263             my $elem = $self->claimed->{$client_id}->{$destination} =
264             delete $self->messages->{$message->id}->{delayed};
265             $self->delayed->{$destination}->delete($elem);
266             }
267             elsif ($info->{unclaimed})
268             {
269             my $elem = $self->claimed->{$client_id}->{$destination} =
270             delete $self->messages->{$message->id}->{unclaimed};
271             $self->unclaimed->{$destination}->delete($elem);
272             }
273             $message->claim($client_id);
274             $self->log(info => "Message $id claimed by client $client_id");
275             }
276             @_ = ();
277             goto $callback if $callback;
278             }
279              
280             sub disown_all
281             {
282 16     16 0 59 my ($self, $client_id, $callback) = @_;
283             # We just happen to know that disown_destination is synchronous, so we can
284             # ignore the usual callback dance
285 16         39 foreach my $dest (keys %{$self->claimed->{$client_id}}) {
  16         610  
286 1         6 $self->disown_destination($dest, $client_id)
287             }
288 16         47 @_ = ();
289 16 50       87 goto $callback if $callback;
290             }
291              
292             sub disown_destination
293             {
294 1601     1601 0 60154 my ($self, $destination, $client_id, $callback) = @_;
295 1601         56394 my $elem = delete $self->claimed->{$client_id}->{$destination};
296 1601 100       5286 if ($elem)
297             {
298 496         1508 my $message = $elem->val;
299 496         17297 $message->disown();
300 496         15616 $self->unclaimed->{$destination}->add($elem);
301 496         16932 $self->messages->{$message->id}->{unclaimed} = $elem;
302             }
303 1601         3916 @_ = ();
304 1601 100       7593 goto $callback if $callback;
305             }
306              
307             # We don't persist anything, so just call our complete handler.
308             sub storage_shutdown
309             {
310 9     9 0 4683 my ($self, $callback) = @_;
311 9         27 @_ = ();
312 9 100       125 goto $callback if $callback;
313             }
314              
315             1;
316              
317             __END__
318              
319             =pod
320              
321             =head1 NAME
322              
323             POE::Component::MessageQueue::Storage::BigMemory -- In-memory storage engine
324             optimized for a large number of messages.
325              
326             =head1 SYNOPSIS
327              
328             use POE;
329             use POE::Component::MessageQueue;
330             use POE::Component::MessageQueue::Storage::BigMemory;
331             use strict;
332              
333             POE::Component::MessageQueue->new({
334             storage => POE::Component::MessageQueue::Storage::BigMemory->new()
335             });
336              
337             POE::Kernel->run();
338             exit;
339              
340             =head1 DESCRIPTION
341              
342             An in-memory storage engine that is optimised for a large number of messages.
343             Its an alternative to L<POE::Componenent::MessageQueue::Storage::Memory>, which
344             stores everything in a Perl ARARY, which can slow the MQ to a CRAWL when the
345             number of messsages in this store gets big.
346              
347             store() is a little bit slower per message in this module and it uses
348             more memory per message. Everything else should be considerably more efficient,
349             though, especially when the number of messages starts to climb. Many operations
350             in Storage::Memory are O(n*n). Most operations in this module are O(1)!
351              
352             I wouldn't suggest using this as your main storage engine because if messages
353             aren't removed by consumers, it will continue to consume more memory until it
354             explodes. Check-out L<POE::Component::MessageQueue::Storage::Complex> which
355             can use this module internally to keep messages in memory for a period of
356             time before moving them into persistent storage.
357              
358             =head1 CONSTRUCTOR PARAMETERS
359              
360             None to speak of!
361              
362             =head1 SUPPORTED STOMP HEADERS
363              
364             =over 4
365              
366             =item B<persistent>
367              
368             I<Ignored>. Nothing is persistent in this store.
369              
370             =item B<expire-after>
371              
372             I<Ignored>. All messages are kept until handled.
373              
374             =item B<deliver-after>
375              
376             I<Fully Supported>.
377              
378             =back
379              
380             =head1 SEE ALSO
381              
382             L<POE::Component::MessageQueue>,
383             L<POE::Component::MessageQueue::Storage>
384              
385             I<Other storage engines:>
386              
387             L<POE::Component::MessageQueue::Storage::Memory>,
388             L<POE::Component::MessageQueue::Storage::FileSystem>,
389             L<POE::Component::MessageQueue::Storage::DBI>,
390             L<POE::Component::MessageQueue::Storage::Generic>,
391             L<POE::Component::MessageQueue::Storage::Generic::DBI>,
392             L<POE::Component::MessageQueue::Storage::Throttled>,
393             L<POE::Component::MessageQueue::Storage::Complex>,
394             L<POE::Component::MessageQueue::Storage::Default>
395              
396             =cut