File Coverage

blib/lib/POE/Component/MessageQueue/Queue.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             #
2             # Copyright 2007-2010 David Snopek <dsnopek@gmail.com>
3             #
4             # This program is free software: you can redistribute it and/or modify
5             # it under the terms of the GNU General Public License as published by
6             # the Free Software Foundation, either version 2 of the License, or
7             # (at your option) any later version.
8             #
9             # This program is distributed in the hope that it will be useful,
10             # but WITHOUT ANY WARRANTY; without even the implied warranty of
11             # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12             # GNU General Public License for more details.
13             #
14             # You should have received a copy of the GNU General Public License
15             # along with this program. If not, see <http://www.gnu.org/licenses/>.
16             #
17              
18             package POE::Component::MessageQueue::Queue;
19              
20 1     1   3122 use POE;
  0            
  0            
21             use POE::Session;
22             use Moose;
23              
24             with qw(POE::Component::MessageQueue::Destination);
25              
# Declare a simple read/write attribute named by the first argument, with a
# default value of 0.  Used below for the queue's state flags.
sub flag
{
    my ($name) = @_;
    return has($name => (is => 'rw', default => 0));
}

flag('pumping');
flag('pump_pending');
flag('shutting_down');
30              
# Declare a read/write HashRef attribute named $name (fresh empty hash by
# default) using the native 'Hash' trait, and generate three delegated
# methods for it:
#   set_<name>   -> 'set'
#   del_<name>   -> 'delete'
#   <name>_keys  -> 'keys'
sub stash
{
    my ($name) = @_;

    my %delegations = (
        "set_$name"    => 'set',
        "del_$name"    => 'delete',
        "${name}_keys" => 'keys',
    );

    return has($name => (
        is      => 'rw',
        isa     => 'HashRef',
        default => sub { {} },
        traits  => ['Hash'],
        handles => \%delegations,
    ));
}

# The two stashes that next_subscriber rotates subscribers between.
stash('waiting');
stash('serviced');
48              
# Return the next subscriber in rotation.
#
# A subscriber is taken out of the 'waiting' stash; if it still has a client
# it is moved to the 'serviced' stash and returned.  A subscriber without a
# client is removed via delete_subscription and the search continues.  When
# 'waiting' is empty the two stashes are swapped; if the swapped-in stash is
# empty as well there is nobody to return and 0 is returned.
#
# This is written as a loop rather than by recursion so that a long run of
# clientless subscribers cannot trigger Perl's "Deep recursion" warning or
# grow the call stack.
sub next_subscriber
{
    my $self = $_[0];

    while (1)
    {
        if (my @ids = $self->waiting_keys)
        {
            my $id         = pop(@ids);
            my $subscriber = $self->del_waiting($id);

            if ($subscriber->client)
            {
                $self->set_serviced($id, $subscriber);
                return $subscriber;
            }

            # No client behind this subscription any more: drop it and
            # look at the next candidate.
            $self->delete_subscription($id);
            next;
        }

        # Everyone waiting has been serviced: swap the stashes so the
        # previously serviced subscribers get their turn again.
        my $serviced = $self->serviced;
        $self->serviced($self->waiting);
        $self->waiting($serviced);

        # Nothing in either stash: report "no subscriber" with the same
        # false value (0) the key count used to provide.
        return 0 unless keys %$serviced;
    }
}
75              
# Return the next subscriber (in rotation order) whose 'ready' flag is true.
# Stops and returns nothing as soon as next_subscriber runs dry or hands
# back a subscriber that was already examined during this call.
sub next_ready
{
    my ($self) = @_;
    my %examined;

    while (my $candidate = $self->next_subscriber)
    {
        # Seeing the same subscriber twice means we have gone full circle.
        last if exists $examined{$candidate};

        return $candidate if $candidate->ready;
        $examined{$candidate} = 1;
    }

    return;
}
88              
# Keep the rotation stashes in step with the subscription table:
# after set_subscription runs, set_waiting is called with the same
# arguments; after delete_subscription runs, the same arguments are used to
# remove the entry from both stashes.
after set_subscription => __PACKAGE__->can('set_waiting');

after delete_subscription => sub {
    my $self = shift;
    my @ids  = @_;

    $self->del_waiting(@ids);
    $self->del_serviced(@ids);
};

__PACKAGE__->meta->make_immutable;
97              
# Moose construction hook: create the POE session whose events are handled
# by this object's methods of the same names.
sub BUILD
{
    my $self = shift;

    my @handled_states = qw(_start _shutdown _pump_state _pump_timer);

    return POE::Session->create(
        object_states => [ $self => \@handled_states ],
    );
}
105              
# POE _start handler: register the session under the queue's name and, when
# the parent has a true pump_frequency, schedule the first _pump_timer event
# after that delay.
sub _start
{
    my $self   = $_[OBJECT];
    my $kernel = $_[KERNEL];

    $kernel->alias_set($self->name);

    if ($self->parent->pump_frequency)
    {
        $kernel->delay(_pump_timer => $self->parent->pump_frequency);
    }
}
114              
# Mark the queue as shutting down and post a '_shutdown' event to the
# session registered under the queue's name.  Returns whatever the
# kernel's post() returns.
sub shutdown
{
    my ($self) = @_;

    $self->shutting_down(1);
    return $poe_kernel->post($self->name, '_shutdown');
}
120              
# POE '_shutdown' handler: remove the session's alias (the queue's name)
# and clear all of its alarms.
sub _shutdown
{
    my $self   = $_[OBJECT];
    my $kernel = $_[KERNEL];

    $kernel->alias_remove($self->name);
    return $kernel->alarm_remove_all();
}
127              
128             # This is the pumping philosophy: When we receive a pump request, we will
129             # give everyone a chance to claim a message. If any pumps are asked for while
130             # this is happening, we will remember and do another pump when this one is
131             # finished (just one).
132              
133             # This means we're serializing claim and retrieve requests. More work needs
134             # to be done to determine whether this is good or necessary.
135              
# Always returns 1: a queue reports itself as persistent.
sub is_persistent
{
    return 1;
}
137              
# POE '_pump_state' handler: one step of a pump.
#
# Does nothing while shutting down.  Otherwise picks the next ready
# subscriber, marks it not ready, and asks storage to claim and retrieve a
# message for that subscriber's client.  If a message comes back it is
# dispatched and another '_pump_state' event is posted; if not, the
# subscriber is marked ready again and the pump ends.  With no ready
# subscriber the pump ends immediately.
sub _pump_state
{
    my $self = $_[OBJECT];
    return if $self->shutting_down;

    my $subscriber = $self->next_ready;
    return $self->_done_pumping() unless $subscriber;

    $subscriber->ready(0);

    # Invoked by storage with the claimed message, or a false value when
    # there was nothing to claim.
    my $on_claimed = sub {
        my ($message) = @_;

        if ($message)
        {
            $self->dispatch_message($message, $subscriber);
            return $poe_kernel->post($self->name, '_pump_state');
        }

        $subscriber->ready(1);
        return $self->_done_pumping();
    };

    return $self->storage->claim_and_retrieve(
        $self->name, $subscriber->client->id, $on_claimed
    );
}
165              
# End the current pump: clear the 'pumping' flag and, if a pump was
# requested in the meantime (pump_pending), start another one.
sub _done_pumping
{
    my ($self) = @_;

    $self->pumping(0);

    if ($self->pump_pending)
    {
        $self->pump();
    }
}
172              
# POE '_pump_timer' handler: periodic pump.  Does nothing while shutting
# down; otherwise pumps and re-arms itself using the parent's
# pump_frequency as the delay.
sub _pump_timer
{
    my $self   = $_[OBJECT];
    my $kernel = $_[KERNEL];

    return if $self->shutting_down;

    # Passing a true value means pump() will not merely record a pending
    # pump when one is already in progress.
    $self->pump(1);

    return $kernel->delay(_pump_timer => $self->parent->pump_frequency);
}
184              
# Request a pump of this queue.
#
# Parameters:
#   $skip_pending - when false and a pump is already in progress, the
#                   request is only recorded in pump_pending; when true the
#                   pump is started regardless of the 'pumping' flag.
#
# Starting a pump logs it, sends a 'pump' notification, clears
# pump_pending, sets pumping, and synchronously calls the session's
# '_pump_state' handler.
sub pump
{
    my ($self, $skip_pending) = @_;

    # Already pumping: just remember that another pump was asked for.
    if ($self->pumping && !$skip_pending)
    {
        return $self->pump_pending(1);
    }

    $self->log(debug => ' -- PUMP QUEUE: '.$self->name.' -- ');
    $self->notify('pump');
    $self->pump_pending(0);
    $self->pumping(1);

    return $poe_kernel->call($self->name, '_pump_state');
}
202              
# Accept a message for this queue.  Ignored while shutting down.
#
# If the message has no delay and a subscriber is ready, the message is
# dispatched to that subscriber straight away: for a client_ack subscriber
# it is first claimed for the client and stored; otherwise it is not stored
# at all.  In every other case the message is stored and, once storage
# calls back, a 'store' notification is sent and the queue is pumped.
sub send
{
    my ($self, $message) = @_;
    return if $self->shutting_down;

    # Only look for a ready subscriber when the message is not delayed.
    my $subscriber;
    $subscriber = $self->next_ready unless $message->has_delay;

    unless ($subscriber)
    {
        return $self->storage->store($message, sub {
            $self->notify(store => $message);
            $self->pump();
        });
    }

    # Dispatch before (or instead of) storing, to give the subscriber a
    # headstart on processing.
    my $mid = $message->id;
    my $cid = $subscriber->client->id;

    if ($subscriber->client_ack)
    {
        $message->claim($cid);
        $self->log(info => "QUEUE: Message $mid claimed by $cid during send");
        $self->storage->store($message);
        $self->notify(store => $message);
    }
    else
    {
        $self->log(info => "QUEUE: Message $mid not stored, sent to $cid");
    }

    return $self->dispatch_message($message, $subscriber);
}
235              
236             1;
237