File Coverage

blib/lib/POE/Component/SmokeBox/JobQueue.pm
Criterion Covered Total %
statement 153 204 75.0
branch 54 90 60.0
condition 13 26 50.0
subroutine 27 33 81.8
pod 11 11 100.0
total 258 364 70.8


line stmt bran cond sub pod time code
1             package POE::Component::SmokeBox::JobQueue;
2             $POE::Component::SmokeBox::JobQueue::VERSION = '0.58';
3             #ABSTRACT: An array based queue for SmokeBox
4              
5 19     19   13996 use strict;
  19         43  
  19         575  
6 19     19   154 use warnings;
  19         43  
  19         646  
7 19     19   664 use POE qw(Component::SmokeBox::Backend Component::SmokeBox::Job Component::SmokeBox::Smoker Component::SmokeBox::Result);
  19         31895  
  19         155  
8 19     19   1160 use Params::Check qw(check);
  19         42  
  19         48962  
9              
10             # Stolen from POE::Wheel. This is static data, shared by all
11             my $current_id = 0;
12             my %active_identifiers;
13              
14             sub spawn {
15 24     24 1 6360 my $package = shift;
16 24         91 my %params = @_;
17 24         128 $params{lc $_} = delete $params{$_} for keys %params;
18 24 100       115 $params{'delay'} = 0 unless exists $params{'delay'};
19 24         56 my $options = delete $params{'options'};
20 24         69 my $self = bless \%params, $package;
21 24 100       413 $self->{session_id} = POE::Session->create(
22             object_states => [
23             $self => {
24             'shutdown' => '_shutdown',
25             submit => '_submit',
26             cancel => '_cancel',
27             },
28             $self => [qw(_start _process_queue _backend_done _process_queue_delayed)],
29             ],
30             heap => $self,
31             ( ref($options) eq 'HASH' ? ( options => $options ) : () ),
32             )->ID();
33 24         4691 return $self;
34             }
35              
36             sub session_id {
37 74     74 1 450 return $_[0]->{session_id};
38             }
39              
40             sub shutdown {
41 24     24 1 4591 my $self = shift;
42 24         200 $poe_kernel->call( $self->session_id() => 'shutdown' => @_ );
43             }
44              
45             sub _start {
46 24     24   7918 my ($kernel,$self) = @_[KERNEL,OBJECT];
47 24         100 $self->{session_id} = $_[SESSION]->ID();
48 24 50       246 if ( $self->{alias} ) {
49 0         0 $kernel->alias_set( $self->{alias} );
50             }
51             else {
52 24         120 $kernel->refcount_increment( $self->{session_id} => __PACKAGE__ );
53             }
54 24         989 $self->{_queue} = [ ];
55 24         108 return;
56             }
57              
58             sub _shutdown {
59 24     24   2330 my ($kernel,$self) = @_[KERNEL,OBJECT];
60 24         170 $self->{_shutdown} = 1;
61 24 50       163 if ( $self->{alias} ) {
62 0         0 $kernel->alias_remove($_) for $kernel->alias_list();
63             }
64             else {
65 24         159 $kernel->refcount_decrement( $self->{session_id} => __PACKAGE__ );
66             }
67             # shutdown currently running backend
68 24 50       1422 $self->{_current}->{backend}->shutdown() if $self->{_current}->{backend};
69             # remove queued jobs.
70             # $kernel->refcount_decrement( $_->{session}, __PACKAGE__ ) for @{ $self->{_queue} };
71 24         78 $kernel->refcount_decrement( $_, __PACKAGE__ ) for keys %{ $self->{_refcounts} };
  24         131  
72 24         148 delete $self->{_queue};
73              
74             # remove delay for jobs if we set one
75 24 100       152 $kernel->alarm_remove( delete $self->{_delay} ) if exists $self->{_delay};
76              
77 24         392 return;
78             }
79              
80             sub _process_queue_delayed {
81 8     8   7989171 my ($kernel,$self) = @_[KERNEL,OBJECT];
82 8 50       122 delete $self->{_delay} if exists $self->{_delay};
83 8         169 $kernel->yield( '_process_queue', 'DELAYDONE' );
84 8         1744 return;
85             }
86              
87             sub _process_queue {
88 193     193   236152 my ($kernel,$self,$delaydone) = @_[KERNEL,OBJECT,ARG0];
89 193 100       873 return if $self->{_shutdown};
90 190 50       797 return if exists $self->{_delay};
91 190 100 100     877 return if exists $self->{paused} and $self->{paused} == 2;
92 188         480 my ($job, $smoker );
93 188 100       713 if ( $self->{_current} ) {
94 117 100       510 return if $self->{_current}->{backend};
95 90         256 $job = $self->{_current};
96              
97             # do we have a delay between smokers?
98 90 100 100     2978 if ( $job->{job}->delay > 0 and ! defined $delaydone and scalar @{ $job->{smokers} } > 0 ) {
  9   100     2422  
99             # fire off an alarm for the next iteration
100             #warn "Setting delay(" . $job->{job}->delay . ") for smoker" if $ENV{PERL5_SMOKEBOX_DEBUG};
101 6         59 $self->{_delay} = $kernel->delay_set( '_process_queue_delayed' => $job->{job}->delay );
102 6         1293 return;
103             }
104              
105 84         22014 $smoker = shift @{ $job->{smokers} };
  84         511  
106 84 100       433 unless ( $smoker ) {
107             # Reached the end send an event back to the original requestor
108 50         857 delete $self->{_current};
109 50         416 delete $job->{smokers};
110 50         300 my $session = delete $job->{session};
111 50         431 $kernel->post( $session, delete $job->{event}, $job );
112 50         7142 $self->{_refcounts}->{ $session }--;
113 50 100       425 if ( $self->{_refcounts}->{ $session } <= 0 ) {
114 23         251 $kernel->refcount_decrement( $session, __PACKAGE__ );
115 23         1013 delete $self->{_refcounts}->{ $session };
116             }
117              
118             # did we enable delay between jobs?
119             # don't check the queue, we force a delay all the time so if we add a job, we're already delaying for it...
120 50 100       353 if ( $self->{delay} > 0 ) {
121             # fire off an alarm for the next iteration
122             #warn "Setting delay($self->{delay}) for job" if $ENV{PERL5_SMOKEBOX_DEBUG};
123 4         60 $self->{_delay} = $kernel->delay_set( '_process_queue_delayed' => $self->{delay} );
124             } else {
125 46         299 $kernel->yield( '_process_queue' );
126             }
127 50         4968 return;
128             }
129             }
130             else {
131 71         735 $job = $self->_shift();
132 71 100       315 return unless $job;
133 50         129 $smoker = shift @{ $job->{smokers} };
  50         202  
134 50         597 $job->{result} = POE::Component::SmokeBox::Result->new();
135 50         236 $self->{_current} = $job;
136             }
137 84         1076 $job->{backend} = POE::Component::SmokeBox::Backend->spawn( event => '_backend_done', $job->{job}->dump_data(), $smoker->dump_data(), );
138 84         713 return;
139             }
140              
141             sub _backend_done {
142 84     84   42878 my ($kernel,$self,$result) = @_[KERNEL,OBJECT,ARG0];
143 84         833 delete $self->{_current}->{backend};
144 84         1182 $self->{_current}->{result}->add_result( $result );
145 84         649 $kernel->yield( '_process_queue' );
146 84         8853 return;
147             }
148              
149             sub submit {
150 51     51 1 179 my $self = shift;
151 51         249 return $poe_kernel->call( $self->{session_id}, 'submit', @_ );
152             }
153              
154             sub cancel {
155 0     0 1 0 my $self = shift;
156 0         0 return $poe_kernel->call( $self->{session_id}, 'cancel', @_ );
157             }
158              
159             sub _submit {
160 51     51   3746 my ($kernel,$self,$sender) = @_[KERNEL,OBJECT,SENDER];
161 51 50       200 return if $self->{_shutdown};
162 51         87 my $args;
163 51 100       212 if ( ref( $_[ARG0] ) eq 'HASH' ) {
164 44         72 $args = { %{ $_[ARG0] } };
  44         199  
165             }
166             else {
167 7         31 $args = { @_[ARG0..$#_] };
168             }
169              
170             my $tmpl = {
171             event => { required => 1, defined => 1, },
172 44 50   44   3500 session => { defined => 1, allow => [ sub { return 1 if $poe_kernel->alias_resolve( $_[0] ); }, ], },
173             type => { defined => 1, allow => [qw(push unshift)], default => 'push', },
174             job => { required => 1, defined => 1,
175 50 50 33 50   3705 allow => [ sub { return 1 if ref $_[0] and $_[0]->isa('POE::Component::SmokeBox::Job'); }, ], },
176             smokers => { required => 1, defined => 1, allow => [
177             sub {
178             return 1 if ref $_[0] eq 'ARRAY'
179 50         221 and scalar @{ $_[0] }
180 50 50 50 50   3497 and ( grep { $_->isa('POE::Component::SmokeBox::Smoker') } @{ $_[0] } ) == @{ $_[0] };
  84   33     339  
  50         138  
  50         222  
181 51         732 }, ],
182             },
183              
184             };
185              
186 51 100       199 my $checked = check( $tmpl, $args, 1 ) or return;
187 50 100       1325 $checked->{session} = $kernel->alias_resolve( $checked->{session} )->ID() if $checked->{session};
188 50 100       1147 $checked->{session} = $sender->ID() unless $checked->{session};
189 50         169 my $type = delete $checked->{type};
190 50         154 my $id = $self->_add_job( $checked, $type );
191 50 50       146 return unless $id;
192 50 100       178 unless ( defined $self->{_refcounts}->{ $checked->{session} } ) {
193 23         86 $kernel->refcount_increment( $checked->{session}, __PACKAGE__ );
194             }
195 50         745 $self->{_refcounts}->{ $checked->{session} }++;
196 50         100 $checked->{submitted} = time();
197             #$checked->{job}->id( $id );
198 50         544 return $id;
199             }
200              
201             sub _cancel {
202 0     0   0 my ($kernel,$self,$sender) = @_[KERNEL,OBJECT,SENDER];
203 0 0       0 return if $self->{_shutdown};
204 0         0 my $args;
205 0 0       0 if ( ref( $_[ARG0] ) eq 'HASH' ) {
206 0         0 $args = { %{ $_[ARG0] } };
  0         0  
207             }
208             else {
209 0         0 $args = { @_[ARG0..$#_] };
210             }
211              
212 0         0 my $tmpl = {
213             job => { required => 1, defined => 1, },
214             };
215              
216 0 0       0 my $checked = check( $tmpl, $args, 1 ) or return;
217 0         0 return $self->_remove_job( $checked->{job} );
218             }
219              
220             sub _push {
221 50     50   152 my ($self,$job) = @_;
222 50 50       166 return unless ref $job eq 'HASH';
223 50         118 my $id = _allocate_identifier();
224 50         148 $job->{id} = $id;
225 50         82 CORE::push @{ $self->{_queue} }, $job;
  50         120  
226 50         151 $self->{_jobs}->{ $id } = $job;
227 50         130 $poe_kernel->post( $self->session_id(), '_process_queue' );
228 50         5677 return $id;
229             }
230              
231             sub _unshift {
232 0     0   0 my ($self,$job) = @_;
233 0 0       0 return unless ref $job eq 'HASH';
234 0         0 my $id = _allocate_identifier();
235 0         0 $job->{id} = $id;
236 0         0 CORE::unshift @{ $self->{_queue} }, $job;
  0         0  
237 0         0 $self->{_jobs}->{ $id } = $job;
238 0         0 $poe_kernel->post( $self->session_id(), '_process_queue' );
239 0         0 return $id;
240             }
241              
242             sub _shift {
243 71     71   235 my $self = CORE::shift;
244 71 100       347 return if $self->{paused};
245 68         173 my $job = CORE::shift @{ $self->{_queue} };
  68         275  
246 68 100       268 return unless $job;
247 50         234 delete $self->{_jobs}->{ $job->{id} };
248 50         427 _free_identifier( $job->{id} );
249 50         181 delete $job->{id};
250 50         147 return $job;
251             }
252              
253             sub _pop {
254 0     0   0 my $self = CORE::shift;
255 0 0       0 return if $self->{paused};
256 0         0 my $job = CORE::pop @{ $self->{_queue} };
  0         0  
257 0 0       0 return unless $job;
258 0         0 delete $self->{_jobs}->{ $job->{id} };
259 0         0 _free_identifier( $job->{id} );
260 0         0 delete $job->{id};
261 0         0 return $job;
262             }
263              
264             sub _add_job {
265 50     50   119 my ($self,$job,$type) = @_;
266 50 50       168 $type = lc $type if $type;
267 50 50 33     205 if ( $type and grep { /^\Q$type\E$/ } qw(push unshift) ) {
  100         771  
268 50         115 $type = '_' . $type;
269 50         177 return $self->$type( $job );
270             }
271 0         0 return $self->_push( $job );
272             }
273              
274             sub _remove_job {
275 0     0   0 my ($self,$type) = @_;
276 0 0       0 return if $self->{paused};
277 0 0       0 $type = lc $type if $type;
278 0 0 0     0 if ( $type and grep { /^\Q$type\E$/ } qw(pop shift) ) {
  0 0 0     0  
279 0         0 $type = '_' . $type;
280 0         0 return $self->$type();
281             }
282             elsif ( $type and defined $self->{_jobs}->{ $type } ) {
283 0         0 my $job = delete $self->{_jobs}->{ $type };
284 0         0 my $i = 0;
285 0         0 for ( @{ $self->{_queue} } ) {
  0         0  
286 0 0       0 splice(@{ $self->{_queue} }, $i, 1) if $_->{id} eq $type;
  0         0  
287 0         0 ++$i;
288             }
289 0         0 delete $job->{id};
290             }
291 0         0 return $self->_shift();
292             }
293              
294             sub pending_jobs {
295 11     11 1 5009266 my $self = CORE::shift;
296 11         26 return @{ $self->{_queue} };
  11         81  
297             }
298              
299             sub pause_queue {
300 3     3 1 9 my $self = CORE::shift;
301 3         19 $self->{paused} = 1;
302             }
303              
304             sub pause_queue_now {
305 2     2 1 7 my $self = CORE::shift;
306 2         16 $self->{paused} = 2;
307             }
308              
309             sub resume_queue {
310 5     5 1 24 my $self = CORE::shift;
311 5         20 delete $self->{paused};
312 5         54 $poe_kernel->post( $self->{session_id}, '_process_queue' );
313             }
314              
315             sub queue_paused {
316 20 100   20 1 7971227 if ( exists $_[0]->{paused} ) {
317 9         81 return 1;
318             } else {
319 11         109 return 0;
320             }
321             }
322              
323             sub current_job {
324 0     0 1 0 my $self = CORE::shift;
325 0         0 return $self->{_current};
326             }
327              
328             sub _allocate_identifier {
329 50     50   83 while (1) {
330 50 50       192 last unless exists $active_identifiers{ ++$current_id };
331             }
332 50         183 return $active_identifiers{$current_id} = $current_id;
333             }
334              
335             sub _free_identifier {
336 50     50   158 my $id = CORE::shift;
337 50         244 delete $active_identifiers{$id};
338             }
339              
340             1;
341              
342             __END__