File Coverage

blib/lib/POE/Component/MessageQueue/Storage/Memory.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             #
2             # Copyright 2007-2010 David Snopek <dsnopek@gmail.com>
3             #
4             # This program is free software: you can redistribute it and/or modify
5             # it under the terms of the GNU General Public License as published by
6             # the Free Software Foundation, either version 2 of the License, or
7             # (at your option) any later version.
8             #
9             # This program is distributed in the hope that it will be useful,
10             # but WITHOUT ANY WARRANTY; without even the implied warranty of
11             # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12             # GNU General Public License for more details.
13             #
14             # You should have received a copy of the GNU General Public License
15             # along with this program. If not, see <http://www.gnu.org/licenses/>.
16             #
17              
18             package POE::Component::MessageQueue::Storage::Memory;
19 1     1   2105 use Moose;
  0            
  0            
20             with qw(POE::Component::MessageQueue::Storage);
21              
22             # destination => @messages
23             has 'messages' => (is => 'ro', default => sub { {} });
24              
25             sub store
26             {
27             my ($self, $msg, $callback) = @_;
28              
29             my $id = $msg->id;
30             my $destination = $msg->destination;
31              
32             # push onto our array
33             my $aref = ($self->messages->{$destination} ||= []);
34             push(@$aref, $msg);
35             $self->log(info => "Added $id");
36              
37             goto $callback if $callback;
38             }
39              
40             sub _msg_foreach
41             {
42             my ($self, $action) = @_;
43             foreach my $messages_in_dest (values %{$self->messages})
44             {
45             foreach my $message (@$messages_in_dest)
46             {
47             $action->($message);
48             }
49             }
50             }
51              
52             sub _msg_foreach_ids
53             {
54             my ($self, $ids, $action) = @_;
55             my %id_hash = map { ($_, 1) } (@$ids);
56             $self->_msg_foreach(sub {
57             my $msg = $_[0];
58             $action->($msg) if (exists $id_hash{$msg->id});
59             });
60             }
61              
62             sub get
63             {
64             my ($self, $ids, $callback) = @_;
65             my @messages;
66             $self->_msg_foreach_ids($ids, sub {push(@messages, $_[0])});
67             @_ = (\@messages);
68             goto $callback;
69             }
70              
71             sub get_all
72             {
73             my ($self, $callback) = @_;
74             my @messages;
75             $self->_msg_foreach(sub {push(@messages, $_[0])});
76             @_ = (\@messages);
77             goto $callback;
78             }
79              
80             sub claim_and_retrieve
81             {
82             my ($self, $destination, $client_id, $callback) = @_;
83             my $oldest;
84             my $aref = $self->messages->{$destination} || [];
85             my $current_time = time();
86             foreach my $msg (@$aref)
87             {
88             unless ($msg->claimed || ($msg->has_delay and $current_time < $msg->deliver_at) ||
89             ($oldest && $oldest->timestamp < $msg->timestamp))
90             {
91             $oldest = $msg;
92             }
93             }
94             $self->_claim_it_yo($oldest, $client_id) if $oldest;
95             @_ = ($oldest);
96             goto $callback;
97             }
98              
99             sub get_oldest
100             {
101             my ($self, $callback) = @_;
102             my $oldest;
103             $self->_msg_foreach(sub {
104             my $msg = shift;
105             $oldest = $msg unless ($oldest && ($oldest->timestamp < $msg->timestamp));
106             });
107             @_ = ($oldest);
108             goto $callback;
109             }
110              
111             sub remove
112             {
113             my ($self, $message_ids, $callback) = @_;
114             # Stuff IDs into a hash so we can quickly check if a message is on the list
115             my %id_hash = map { ($_, 1) } (@$message_ids);
116              
117             foreach my $messages (values %{$self->messages})
118             {
119             my $max = scalar @{$messages};
120              
121             for ( my $i = 0; $i < $max; $i++ )
122             {
123             my $message = $messages->[$i];
124             # Check if this message is in the "remove" list
125             next unless exists $id_hash{$message->id};
126             splice @$messages, $i, 1;
127             $i--; $max--;
128             }
129             }
130              
131             goto $callback if $callback;
132             }
133              
134             sub empty
135             {
136             my ($self, $callback) = @_;
137             %{$self->messages} = ();
138             goto $callback if $callback;
139             }
140              
141             sub _claim_it_yo
142             {
143             my ($self, $msg, $client_id) = @_;;
144             $msg->claim($client_id);
145             $self->log('info', sprintf('Message %s claimed by client %s',
146             $msg->id, $client_id));
147             }
148              
149             sub claim
150             {
151             my ($self, $ids, $client_id, $callback) = @_;
152              
153             $self->_msg_foreach_ids($ids, sub {
154             $self->_claim_it_yo($_[0], $client_id);
155             });
156              
157             goto $callback if $callback;
158             }
159              
160             sub disown_destination
161             {
162             my ($self, $destination, $client_id, $callback) = @_;
163             my $aref = $self->messages->{$destination} || [];
164             $_->disown foreach grep {$_->claimed && $_->claimant eq $client_id} @$aref;
165              
166             goto $callback if $callback;
167             }
168              
169             sub disown_all
170             {
171             my ($self, $client_id, $callback) = @_;
172             $self->_msg_foreach(sub {
173             my $m = $_[0];
174             $m->disown() if $m->claimed && $m->claimant eq $client_id;
175             });
176             goto $callback if $callback;
177             }
178              
179             sub storage_shutdown
180             {
181             my ($self, $callback) = @_;
182             goto $callback if $callback;
183             }
184              
185             1;
186              
187             __END__
188              
189             =pod
190              
191             =head1 NAME
192              
193             POE::Component::MessageQueue::Storage::Memory -- In-memory storage engine.
194              
195             =head1 SYNOPSIS
196              
197             use POE;
198             use POE::Component::MessageQueue;
199             use POE::Component::MessageQueue::Storage::Memory;
200             use strict;
201              
202             POE::Component::MessageQueue->new({
203             storage => POE::Component::MessageQueue::Storage::Memory->new()
204             });
205              
206             POE::Kernel->run();
207             exit;
208              
209             =head1 DESCRIPTION
210              
211             A storage engine that keeps all the messages in memory. Provides no persistence
212             whatsoever.
213              
214             For an alternative in-memory storage engine optimized for a large number of
215             messages, please see L<POE::Component::MessageQueue::Storage::BigMemory>.
216              
217             I wouldn't suggest using this as your main storage engine because if messages
218             aren't removed by consumers, it will continue to consume more memory until it
219             explodes. Check out L<POE::Component::MessageQueue::Storage::Complex>, which
220             can use this module internally to keep messages in memory for a period of
221             time before moving them into persistent storage.
222              
223             =head1 CONSTRUCTOR PARAMETERS
224              
225             None to speak of!
226              
227             =head1 SUPPORTED STOMP HEADERS
228              
229             =over 4
230              
231             =item B<persistent>
232              
233             I<Ignored>. Nothing is persistent in this store.
234              
235             =item B<expire-after>
236              
237             I<Ignored>. All messages are kept until handled.
238              
239             =item B<deliver-after>
240              
241             I<Fully Supported>.
242              
243             =back
244              
245             =head1 SEE ALSO
246              
247             L<POE::Component::MessageQueue::Storage::BigMemory> -- Alternative memory-based storage engine.
248              
249             L<POE::Component::MessageQueue>,
250             L<POE::Component::MessageQueue::Storage>
251              
252             I<Other storage engines:>
253              
254             L<POE::Component::MessageQueue::Storage::BigMemory>,
255             L<POE::Component::MessageQueue::Storage::FileSystem>,
256             L<POE::Component::MessageQueue::Storage::DBI>,
257             L<POE::Component::MessageQueue::Storage::Generic>,
258             L<POE::Component::MessageQueue::Storage::Generic::DBI>,
259             L<POE::Component::MessageQueue::Storage::Throttled>,
260             L<POE::Component::MessageQueue::Storage::Complex>,
261             L<POE::Component::MessageQueue::Storage::Default>
262              
263             =cut