File Coverage

blib/lib/POE/Component/MessageQueue/Queue.pm
Criterion Covered Total %
statement 78 85 91.7
branch 20 26 76.9
condition 9 12 75.0
subroutine 18 20 90.0
pod 0 9 0.0
total 125 152 82.2


line stmt bran cond sub pod time code
1             #
2             # Copyright 2007-2010 David Snopek <dsnopek@gmail.com>
3             #
4             # This program is free software: you can redistribute it and/or modify
5             # it under the terms of the GNU General Public License as published by
6             # the Free Software Foundation, either version 2 of the License, or
7             # (at your option) any later version.
8             #
9             # This program is distributed in the hope that it will be useful,
10             # but WITHOUT ANY WARRANTY; without even the implied warranty of
11             # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12             # GNU General Public License for more details.
13             #
14             # You should have received a copy of the GNU General Public License
15             # along with this program. If not, see <http://www.gnu.org/licenses/>.
16             #
17              
18             package POE::Component::MessageQueue::Queue;
19              
20 11     11   1964 use POE;
  11         543  
  11         86  
21 11     11   7177 use POE::Session;
  11         24  
  11         40  
22 11     11   733 use Moose;
  11         22  
  11         80  
23              
24             with qw(POE::Component::MessageQueue::Destination);
25              
26 33     33 0 157 sub flag { has $_[0] => (is => 'rw', default => 0) }
27             flag 'pumping';
28             flag 'pump_pending';
29             flag 'shutting_down';
30              
31             sub stash {
32 22     22 0 89 my $n = $_[0];
33             has $n => (
34             is => 'rw',
35             isa => 'HashRef',
36 8     8   294 default => sub { {} },
37 22         256 traits => ['Hash'],
38             handles => {
39             "set_$n" => 'set',
40             "del_$n" => 'delete',
41             "${n}_keys" => 'keys',
42             }
43             );
44             }
45              
46             stash 'waiting';
47             stash 'serviced';
48              
49             sub next_subscriber
50             {
51 988     988 0 1874 my $self = $_[0];
52 988 100       36245 if (my @keys = $self->waiting_keys)
53             {
54 405         908 my $id = pop(@keys);
55 405         15278 my $s = $self->del_waiting($id);
56 405 50       13194 if($s->client)
57             {
58 405         15247 $self->set_serviced($id, $s);
59 405         3714 return $s;
60             }
61             else
62             {
63 0         0 $self->delete_subscription($id);
64 0         0 return $self->next_subscriber;
65             }
66             }
67             else
68             {
69 583         17850 my $serviced = $self->serviced;
70 583         19067 $self->serviced($self->waiting);
71 583         17141 $self->waiting($serviced);
72 583   66     2702 return (keys %$serviced) && $self->next_subscriber;
73             }
74             }
75              
76             sub next_ready
77             {
78 470     470 0 1028 my $self = $_[0];
79 470         979 my ($s, %seen);
80              
81 470   100     1477 while ($s = $self->next_subscriber and !exists $seen{$s})
82             {
83 271 100       1253 return $s if $s->ready;
84 134         632 $seen{$s} = 1;
85             }
86 333         1439 return;
87             }
88              
89             after set_subscription => __PACKAGE__->can('set_waiting');
90             after delete_subscription => sub {
91             my ($self, @args) = @_;
92             $self->del_waiting(@args);
93             $self->del_serviced(@args);
94             };
95              
96             __PACKAGE__->meta->make_immutable();
97              
98             sub BUILD
99             {
100 4     4 0 14 my ($self, $args) = @_;
101 4         44 POE::Session->create(
102             object_states => [ $self => [qw(_start _shutdown _pump_state _pump_timer)]],
103             );
104             }
105              
106             sub _start
107             {
108 4     4   999 my ($self, $kernel) = @_[OBJECT, KERNEL];
109 4         203 $kernel->alias_set($self->name);
110              
111 4 50       348 $kernel->delay(_pump_timer => $self->parent->pump_frequency)
112             if ($self->parent->pump_frequency);
113             }
114              
115             sub shutdown {
116 4     4 0 15 my $self = $_[0];
117 4         128 $self->shutting_down(1);
118 4         117 $poe_kernel->post($self->name, '_shutdown')
119             }
120              
121             sub _shutdown
122             {
123 4     4   3710 my ($self, $kernel) = @_[OBJECT, KERNEL];
124 4         199 $kernel->alias_remove($self->name);
125 4         267 $kernel->alarm_remove_all();
126             }
127              
128             # This is the pumping philosophy: When we receive a pump request, we will
129             # give everyone a chance to claim a message. If any pumps are asked for while
130             # this is happening, we will remember and do another pump when this one is
131             # finished (just one).
132              
133             # This means we're serializing claim and retrieve requests. More work needs
134             # to be done to determine whether this is good or necessary.
135              
136 0     0 0 0 sub is_persistent { return 1 }
137              
138             sub _pump_state
139             {
140 321     321   44660 my $self = $_[OBJECT];
141 321 100       11808 return if $self->shutting_down;
142              
143 320 100       1199 if (my $s = $self->next_ready)
144             {
145 105         439 $s->ready(0);
146              
147             $self->storage->claim_and_retrieve($self->name, $s->client->id, sub {
148 105 100   105   402 if (my $msg = $_[0])
149             {
150 98         696 $self->dispatch_message($msg, $s);
151 98         5512 $poe_kernel->post($self->name, '_pump_state');
152             }
153             else
154             {
155 7         37 $s->ready(1);
156 7         31 $self->_done_pumping();
157             }
158 105         712 });
159             }
160             else
161             {
162 215         794 $self->_done_pumping();
163             }
164             }
165              
166             sub _done_pumping
167             {
168 222     222   490 my $self = $_[0];
169 222         7037 $self->pumping(0);
170 222 50       6753 $self->pump() if $self->pump_pending;
171             }
172              
173             sub _pump_timer
174             {
175 0     0   0 my ($self, $kernel) = @_[OBJECT, KERNEL];
176 0 0       0 return if $self->shutting_down;
177              
178             # pump (the 1 means that we won't set pump_pending if
179             # we are already in a pumping state).
180 0         0 $self->pump(1);
181              
182 0         0 $kernel->delay(_pump_timer => $self->parent->pump_frequency);
183             }
184              
185             sub pump
186             {
187 230     230 0 684 my ($self, $skip_pending) = @_;
188              
189 230 100 66     8164 if($self->pumping and not $skip_pending)
190             {
191 5         167 $self->pump_pending(1);
192             }
193             else
194             {
195 225         7436 $self->log(debug => ' -- PUMP QUEUE: '.$self->name.' -- ');
196 225         1315 $self->notify('pump');
197 225         11243 $self->pump_pending(0);
198 225         6904 $self->pumping(1);
199 225         7093 $poe_kernel->call($self->name, '_pump_state');
200             }
201             }
202              
203             sub send
204             {
205 150     150 0 368 my ($self, $message) = @_;
206 150 50       6402 return if $self->shutting_down;
207              
208             # If we already have a ready subscriber, we'll dispatch before we
209             # store to give the subscriber a headstart on processing.
210 150 100 66     5141 if (not $message->has_delay and my $s = $self->next_ready)
211             {
212 32         995 my $mid = $message->id;
213 32         1053 my $cid = $s->client->id;
214 32 100       1029 if ($s->client_ack)
215             {
216 2         74 $message->claim($cid);
217 2         15 $self->log(info => "QUEUE: Message $mid claimed by $cid during send");
218 2         13 $self->storage->store($message);
219 2         13 $self->notify(store => $message);
220             }
221             else
222             {
223 30         160 $self->log(info => "QUEUE: Message $mid not stored, sent to $cid");
224             }
225 32         184 $self->dispatch_message($message, $s);
226             }
227             else
228             {
229             $self->storage->store($message, sub {
230 118     118   68447 $self->notify(store => $message);
231 118         2352 $self->pump();
232 118         563 });
233             }
234             }
235              
236             1;
237