File Coverage

blib/lib/POE/Component/MessageQueue/Storage/Memory.pm
Criterion Covered Total %
statement 63 63 100.0
branch 17 22 77.2
condition 19 24 79.1
subroutine 16 16 100.0
pod 0 8 0.0
total 115 133 86.4


line stmt bran cond sub pod time code
1             #
2             # Copyright 2007-2010 David Snopek <dsnopek@gmail.com>
3             #
4             # This program is free software: you can redistribute it and/or modify
5             # it under the terms of the GNU General Public License as published by
6             # the Free Software Foundation, either version 2 of the License, or
7             # (at your option) any later version.
8             #
9             # This program is distributed in the hope that it will be useful,
10             # but WITHOUT ANY WARRANTY; without even the implied warranty of
11             # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12             # GNU General Public License for more details.
13             #
14             # You should have received a copy of the GNU General Public License
15             # along with this program. If not, see <http://www.gnu.org/licenses/>.
16             #
17              
18             package POE::Component::MessageQueue::Storage::Memory;
19 11     11   11114 use Moose;
  11         31  
  11         85  
20             with qw(POE::Component::MessageQueue::Storage);
21              
22             # destination => array ref of messages for that destination
23             has 'messages' => (is => 'ro', default => sub { {} });
24              
25             sub store
26             {
27 201     201 0 101218 my ($self, $msg, $callback) = @_;
28              
29 201         6491 my $id = $msg->id;
30 201         5638 my $destination = $msg->destination;
31              
32             # push onto our array
33 201   100     5916 my $aref = ($self->messages->{$destination} ||= []);
34 201         445 push(@$aref, $msg);
35 201         838 $self->log(info => "Added $id");
36              
37 201 50       856 goto $callback if $callback;
38             }
39              
40             sub _msg_foreach
41             {
42 8     8   21 my ($self, $action) = @_;
43 8         15 foreach my $messages_in_dest (values %{$self->messages})
  8         287  
44             {
45 28         66 foreach my $message (@$messages_in_dest)
46             {
47 1397         2576 $action->($message);
48             }
49             }
50             }
51              
52             sub _msg_foreach_ids
53             {
54 3     3   14 my ($self, $ids, $action) = @_;
55 3         8 my %id_hash = map { ($_, 1) } (@$ids);
  3         23  
56             $self->_msg_foreach(sub {
57 600     600   967 my $msg = $_[0];
58 600 100       15218 $action->($msg) if (exists $id_hash{$msg->id});
59 3         39 });
60             }
61              
62             sub get
63             {
64             my ($self, $ids, $callback) = @_;
65             my @messages;
66             $self->_msg_foreach_ids($ids, sub {push(@messages, $_[0])});
67             @_ = (\@messages);
68             goto $callback;
69             }
70              
71             sub get_all
72             {
73 3     3 0 47 my ($self, $callback) = @_;
74 3         7 my @messages;
75 3     397   28 $self->_msg_foreach(sub {push(@messages, $_[0])});
  397         684  
76 3         15 @_ = (\@messages);
77 3         11 goto $callback;
78             }
79              
80             sub claim_and_retrieve
81             {
82 410     410 0 1006480 my ($self, $destination, $client_id, $callback) = @_;
83 410         708 my $oldest;
84 410   50     12693 my $aref = $self->messages->{$destination} || [];
85 410         754 my $current_time = time();
86 410         888 foreach my $msg (@$aref)
87             {
88 20402 100 100     597181 unless ($msg->claimed || ($msg->has_delay and $current_time < $msg->deliver_at) ||
      66        
      100        
      66        
89             ($oldest && $oldest->timestamp < $msg->timestamp))
90             {
91 1457         3058 $oldest = $msg;
92             }
93             }
94 410 100       1600 $self->_claim_it_yo($oldest, $client_id) if $oldest;
95 410         1098 @_ = ($oldest);
96 410         1391 goto $callback;
97             }
98              
99             sub get_oldest
100             {
101 1     1 0 64 my ($self, $callback) = @_;
102 1         3 my $oldest;
103             $self->_msg_foreach(sub {
104 200     200   323 my $msg = shift;
105 200 100 100     5330 $oldest = $msg unless ($oldest && ($oldest->timestamp < $msg->timestamp));
106 1         14 });
107 1         8 @_ = ($oldest);
108 1         8 goto $callback;
109             }
110              
111             sub remove
112             {
113             my ($self, $message_ids, $callback) = @_;
114             # Stuff IDs into a hash so we can quickly check if a message is on the list
115             my %id_hash = map { ($_, 1) } (@$message_ids);
116              
117             foreach my $messages (values %{$self->messages})
118             {
119             my $max = scalar @{$messages};
120              
121             for ( my $i = 0; $i < $max; $i++ )
122             {
123             my $message = $messages->[$i];
124             # Check if this message is in the "remove" list
125             next unless exists $id_hash{$message->id};
126             splice @$messages, $i, 1;
127             $i--; $max--;
128             }
129             }
130              
131             goto $callback if $callback;
132             }
133              
134             sub empty
135             {
136 1     1 0 4296 my ($self, $callback) = @_;
137 1         3 %{$self->messages} = ();
  1         40  
138 1 50       7 goto $callback if $callback;
139             }
140              
141             sub _claim_it_yo
142             {
143 402     402   852 my ($self, $msg, $client_id) = @_;;
144 402         12117 $msg->claim($client_id);
145 402         11569 $self->log('info', sprintf('Message %s claimed by client %s',
146             $msg->id, $client_id));
147             }
148              
149             sub claim
150             {
151             my ($self, $ids, $client_id, $callback) = @_;
152              
153             $self->_msg_foreach_ids($ids, sub {
154             $self->_claim_it_yo($_[0], $client_id);
155             });
156              
157             goto $callback if $callback;
158             }
159              
160             sub disown_destination
161             {
162 400     400 0 55482 my ($self, $destination, $client_id, $callback) = @_;
163 400   50     12358 my $aref = $self->messages->{$destination} || [];
164 400 100       888 $_->disown foreach grep {$_->claimed && $_->claimant eq $client_id} @$aref;
  20000         582945  
165              
166 400 50       1724 goto $callback if $callback;
167             }
168              
169             sub disown_all
170             {
171 1     1 0 3 my ($self, $client_id, $callback) = @_;
172             $self->_msg_foreach(sub {
173 200     200   317 my $m = $_[0];
174 200 100 66     5759 $m->disown() if $m->claimed && $m->claimant eq $client_id;
175 1         16 });
176 1 50       10 goto $callback if $callback;
177             }
178              
179             sub storage_shutdown
180             {
181 1     1 0 4626 my ($self, $callback) = @_;
182 1 50       6 goto $callback if $callback;
183             }
184              
185             1;
186              
187             __END__
188              
189             =pod
190              
191             =head1 NAME
192              
193             POE::Component::MessageQueue::Storage::Memory -- In-memory storage engine.
194              
195             =head1 SYNOPSIS
196              
197             use POE;
198             use POE::Component::MessageQueue;
199             use POE::Component::MessageQueue::Storage::Memory;
200             use strict;
201              
202             POE::Component::MessageQueue->new({
203             storage => POE::Component::MessageQueue::Storage::Memory->new()
204             });
205              
206             POE::Kernel->run();
207             exit;
208              
209             =head1 DESCRIPTION
210              
211             A storage engine that keeps all the messages in memory. Provides no persistence
212             whatsoever.
213              
214             For an alternative in-memory storage engine optimized for a large number of
215             messages, please see L<POE::Component::MessageQueue::Storage::BigMemory>.
216              
217             I wouldn't suggest using this as your main storage engine because if messages
218             aren't removed by consumers, it will continue to consume more memory until it
218             explodes. Check out L<POE::Component::MessageQueue::Storage::Complex> which
220             can use this module internally to keep messages in memory for a period of
221             time before moving them into persistent storage.
222              
223             =head1 CONSTRUCTOR PARAMETERS
224              
225             None to speak of!
226              
227             =head1 SUPPORTED STOMP HEADERS
228              
229             =over 4
230              
231             =item B<persistent>
232              
233             I<Ignored>. Nothing is persistent in this store.
234              
235             =item B<expire-after>
236              
237             I<Ignored>. All messages are kept until handled.
238              
239             =item B<deliver-after>
240              
241             I<Fully Supported>.
242              
243             =back
244              
245             =head1 SEE ALSO
246              
247             L<POE::Component::MessageQueue::Storage::BigMemory> -- Alternative memory-based storage engine.
248              
249             L<POE::Component::MessageQueue>,
250             L<POE::Component::MessageQueue::Storage>
251              
252             I<Other storage engines:>
253              
254             L<POE::Component::MessageQueue::Storage::BigMemory>,
255             L<POE::Component::MessageQueue::Storage::FileSystem>,
256             L<POE::Component::MessageQueue::Storage::DBI>,
257             L<POE::Component::MessageQueue::Storage::Generic>,
258             L<POE::Component::MessageQueue::Storage::Generic::DBI>,
259             L<POE::Component::MessageQueue::Storage::Throttled>,
260             L<POE::Component::MessageQueue::Storage::Complex>,
261             L<POE::Component::MessageQueue::Storage::Default>
262              
263             =cut