File Coverage

blib/lib/POE/Component/SmokeBox/JobQueue.pm
Criterion Covered Total %
statement 153 204 75.0
branch 54 90 60.0
condition 13 26 50.0
subroutine 27 33 81.8
pod 11 11 100.0
total 258 364 70.8


line stmt bran cond sub pod time code
1             package POE::Component::SmokeBox::JobQueue;
2             $POE::Component::SmokeBox::JobQueue::VERSION = '0.56';
3             #ABSTRACT: An array based queue for SmokeBox
4              
5 19     19   12735 use strict;
  19         46  
  19         568  
6 19     19   157 use warnings;
  19         45  
  19         602  
7 19     19   541 use POE qw(Component::SmokeBox::Backend Component::SmokeBox::Job Component::SmokeBox::Smoker Component::SmokeBox::Result);
  19         30282  
  19         139  
8 19     19   1097 use Params::Check qw(check);
  19         48  
  19         46595  
9              
10             # Stolen from POE::Wheel. This is static data, shared by all
11             my $current_id = 0;
12             my %active_identifiers;
13              
14             sub spawn {
15 24     24 1 4459 my $package = shift;
16 24         87 my %params = @_;
17 24         121 $params{lc $_} = delete $params{$_} for keys %params;
18 24 100       107 $params{'delay'} = 0 unless exists $params{'delay'};
19 24         63 my $options = delete $params{'options'};
20 24         62 my $self = bless \%params, $package;
21 24 100       316 $self->{session_id} = POE::Session->create(
22             object_states => [
23             $self => {
24             'shutdown' => '_shutdown',
25             submit => '_submit',
26             cancel => '_cancel',
27             },
28             $self => [qw(_start _process_queue _backend_done _process_queue_delayed)],
29             ],
30             heap => $self,
31             ( ref($options) eq 'HASH' ? ( options => $options ) : () ),
32             )->ID();
33 24         4185 return $self;
34             }
35              
36             sub session_id {
37 74     74 1 450 return $_[0]->{session_id};
38             }
39              
40             sub shutdown {
41 24     24 1 4156 my $self = shift;
42 24         135 $poe_kernel->call( $self->session_id() => 'shutdown' => @_ );
43             }
44              
45             sub _start {
46 24     24   7143 my ($kernel,$self) = @_[KERNEL,OBJECT];
47 24         100 $self->{session_id} = $_[SESSION]->ID();
48 24 50       181 if ( $self->{alias} ) {
49 0         0 $kernel->alias_set( $self->{alias} );
50             }
51             else {
52 24         100 $kernel->refcount_increment( $self->{session_id} => __PACKAGE__ );
53             }
54 24         921 $self->{_queue} = [ ];
55 24         84 return;
56             }
57              
58             sub _shutdown {
59 24     24   1916 my ($kernel,$self) = @_[KERNEL,OBJECT];
60 24         149 $self->{_shutdown} = 1;
61 24 50       147 if ( $self->{alias} ) {
62 0         0 $kernel->alias_remove($_) for $kernel->alias_list();
63             }
64             else {
65 24         124 $kernel->refcount_decrement( $self->{session_id} => __PACKAGE__ );
66             }
67             # shutdown currently running backend
68 24 50       1288 $self->{_current}->{backend}->shutdown() if $self->{_current}->{backend};
69             # remove queued jobs.
70             # $kernel->refcount_decrement( $_->{session}, __PACKAGE__ ) for @{ $self->{_queue} };
71 24         57 $kernel->refcount_decrement( $_, __PACKAGE__ ) for keys %{ $self->{_refcounts} };
  24         129  
72 24         146 delete $self->{_queue};
73              
74             # remove delay for jobs if we set one
75 24 100       168 $kernel->alarm_remove( delete $self->{_delay} ) if exists $self->{_delay};
76              
77 24         359 return;
78             }
79              
80             sub _process_queue_delayed {
81 8     8   7983659 my ($kernel,$self) = @_[KERNEL,OBJECT];
82 8 50       80 delete $self->{_delay} if exists $self->{_delay};
83 8         82 $kernel->yield( '_process_queue', 'DELAYDONE' );
84 8         1024 return;
85             }
86              
87             sub _process_queue {
88 193     193   247310 my ($kernel,$self,$delaydone) = @_[KERNEL,OBJECT,ARG0];
89 193 100       887 return if $self->{_shutdown};
90 190 50       672 return if exists $self->{_delay};
91 190 100 100     802 return if exists $self->{paused} and $self->{paused} == 2;
92 188         434 my ($job, $smoker );
93 188 100       590 if ( $self->{_current} ) {
94 117 100       486 return if $self->{_current}->{backend};
95 90         262 $job = $self->{_current};
96              
97             # do we have a delay between smokers?
98 90 100 100     2340 if ( $job->{job}->delay > 0 and ! defined $delaydone and scalar @{ $job->{smokers} } > 0 ) {
  9   100     2071  
99             # fire off an alarm for the next iteration
100             #warn "Setting delay(" . $job->{job}->delay . ") for smoker" if $ENV{PERL5_SMOKEBOX_DEBUG};
101 6         57 $self->{_delay} = $kernel->delay_set( '_process_queue_delayed' => $job->{job}->delay );
102 6         1170 return;
103             }
104              
105 84         17819 $smoker = shift @{ $job->{smokers} };
  84         462  
106 84 100       362 unless ( $smoker ) {
107             # Reached the end send an event back to the original requestor
108 50         588 delete $self->{_current};
109 50         290 delete $job->{smokers};
110 50         267 my $session = delete $job->{session};
111 50         376 $kernel->post( $session, delete $job->{event}, $job );
112 50         6460 $self->{_refcounts}->{ $session }--;
113 50 100       329 if ( $self->{_refcounts}->{ $session } <= 0 ) {
114 23         209 $kernel->refcount_decrement( $session, __PACKAGE__ );
115 23         912 delete $self->{_refcounts}->{ $session };
116             }
117              
118             # did we enable delay between jobs?
119             # don't check the queue, we force a delay all the time so if we add a job, we're already delaying for it...
120 50 100       281 if ( $self->{delay} > 0 ) {
121             # fire off an alarm for the next iteration
122             #warn "Setting delay($self->{delay}) for job" if $ENV{PERL5_SMOKEBOX_DEBUG};
123 4         36 $self->{_delay} = $kernel->delay_set( '_process_queue_delayed' => $self->{delay} );
124             } else {
125 46         275 $kernel->yield( '_process_queue' );
126             }
127 50         4753 return;
128             }
129             }
130             else {
131 71         470 $job = $self->_shift();
132 71 100       279 return unless $job;
133 50         116 $smoker = shift @{ $job->{smokers} };
  50         177  
134 50         527 $job->{result} = POE::Component::SmokeBox::Result->new();
135 50         137 $self->{_current} = $job;
136             }
137 84         852 $job->{backend} = POE::Component::SmokeBox::Backend->spawn( event => '_backend_done', $job->{job}->dump_data(), $smoker->dump_data(), );
138 84         645 return;
139             }
140              
141             sub _backend_done {
142 84     84   36686 my ($kernel,$self,$result) = @_[KERNEL,OBJECT,ARG0];
143 84         667 delete $self->{_current}->{backend};
144 84         998 $self->{_current}->{result}->add_result( $result );
145 84         558 $kernel->yield( '_process_queue' );
146 84         7948 return;
147             }
148              
149             sub submit {
150 51     51 1 142 my $self = shift;
151 51         232 return $poe_kernel->call( $self->{session_id}, 'submit', @_ );
152             }
153              
154             sub cancel {
155 0     0 1 0 my $self = shift;
156 0         0 return $poe_kernel->call( $self->{session_id}, 'cancel', @_ );
157             }
158              
159             sub _submit {
160 51     51   3669 my ($kernel,$self,$sender) = @_[KERNEL,OBJECT,SENDER];
161 51 50       196 return if $self->{_shutdown};
162 51         90 my $args;
163 51 100       194 if ( ref( $_[ARG0] ) eq 'HASH' ) {
164 44         68 $args = { %{ $_[ARG0] } };
  44         216  
165             }
166             else {
167 7         31 $args = { @_[ARG0..$#_] };
168             }
169              
170             my $tmpl = {
171             event => { required => 1, defined => 1, },
172 44 50   44   2768 session => { defined => 1, allow => [ sub { return 1 if $poe_kernel->alias_resolve( $_[0] ); }, ], },
173             type => { defined => 1, allow => [qw(push unshift)], default => 'push', },
174             job => { required => 1, defined => 1,
175 50 50 33 50   3628 allow => [ sub { return 1 if ref $_[0] and $_[0]->isa('POE::Component::SmokeBox::Job'); }, ], },
176             smokers => { required => 1, defined => 1, allow => [
177             sub {
178             return 1 if ref $_[0] eq 'ARRAY'
179 50         241 and scalar @{ $_[0] }
180 50 50 50 50   4530 and ( grep { $_->isa('POE::Component::SmokeBox::Smoker') } @{ $_[0] } ) == @{ $_[0] };
  84   33     307  
  50         137  
  50         229  
181 51         729 }, ],
182             },
183              
184             };
185              
186 51 100       189 my $checked = check( $tmpl, $args, 1 ) or return;
187 50 100       1269 $checked->{session} = $kernel->alias_resolve( $checked->{session} )->ID() if $checked->{session};
188 50 100       1107 $checked->{session} = $sender->ID() unless $checked->{session};
189 50         136 my $type = delete $checked->{type};
190 50         158 my $id = $self->_add_job( $checked, $type );
191 50 50       135 return unless $id;
192 50 100       184 unless ( defined $self->{_refcounts}->{ $checked->{session} } ) {
193 23         89 $kernel->refcount_increment( $checked->{session}, __PACKAGE__ );
194             }
195 50         767 $self->{_refcounts}->{ $checked->{session} }++;
196 50         106 $checked->{submitted} = time();
197             #$checked->{job}->id( $id );
198 50         551 return $id;
199             }
200              
201             sub _cancel {
202 0     0   0 my ($kernel,$self,$sender) = @_[KERNEL,OBJECT,SENDER];
203 0 0       0 return if $self->{_shutdown};
204 0         0 my $args;
205 0 0       0 if ( ref( $_[ARG0] ) eq 'HASH' ) {
206 0         0 $args = { %{ $_[ARG0] } };
  0         0  
207             }
208             else {
209 0         0 $args = { @_[ARG0..$#_] };
210             }
211              
212 0         0 my $tmpl = {
213             job => { required => 1, defined => 1, },
214             };
215              
216 0 0       0 my $checked = check( $tmpl, $args, 1 ) or return;
217 0         0 return $self->_remove_job( $checked->{job} );
218             }
219              
220             sub _push {
221 50     50   128 my ($self,$job) = @_;
222 50 50       166 return unless ref $job eq 'HASH';
223 50         118 my $id = _allocate_identifier();
224 50         147 $job->{id} = $id;
225 50         103 CORE::push @{ $self->{_queue} }, $job;
  50         133  
226 50         122 $self->{_jobs}->{ $id } = $job;
227 50         132 $poe_kernel->post( $self->session_id(), '_process_queue' );
228 50         5663 return $id;
229             }
230              
231             sub _unshift {
232 0     0   0 my ($self,$job) = @_;
233 0 0       0 return unless ref $job eq 'HASH';
234 0         0 my $id = _allocate_identifier();
235 0         0 $job->{id} = $id;
236 0         0 CORE::unshift @{ $self->{_queue} }, $job;
  0         0  
237 0         0 $self->{_jobs}->{ $id } = $job;
238 0         0 $poe_kernel->post( $self->session_id(), '_process_queue' );
239 0         0 return $id;
240             }
241              
242             sub _shift {
243 71     71   213 my $self = CORE::shift;
244 71 100       275 return if $self->{paused};
245 68         161 my $job = CORE::shift @{ $self->{_queue} };
  68         229  
246 68 100       302 return unless $job;
247 50         217 delete $self->{_jobs}->{ $job->{id} };
248 50         452 _free_identifier( $job->{id} );
249 50         143 delete $job->{id};
250 50         130 return $job;
251             }
252              
253             sub _pop {
254 0     0   0 my $self = CORE::shift;
255 0 0       0 return if $self->{paused};
256 0         0 my $job = CORE::pop @{ $self->{_queue} };
  0         0  
257 0 0       0 return unless $job;
258 0         0 delete $self->{_jobs}->{ $job->{id} };
259 0         0 _free_identifier( $job->{id} );
260 0         0 delete $job->{id};
261 0         0 return $job;
262             }
263              
264             sub _add_job {
265 50     50   121 my ($self,$job,$type) = @_;
266 50 50       164 $type = lc $type if $type;
267 50 50 33     177 if ( $type and grep { /^\Q$type\E$/ } qw(push unshift) ) {
  100         746  
268 50         132 $type = '_' . $type;
269 50         209 return $self->$type( $job );
270             }
271 0         0 return $self->_push( $job );
272             }
273              
274             sub _remove_job {
275 0     0   0 my ($self,$type) = @_;
276 0 0       0 return if $self->{paused};
277 0 0       0 $type = lc $type if $type;
278 0 0 0     0 if ( $type and grep { /^\Q$type\E$/ } qw(pop shift) ) {
  0 0 0     0  
279 0         0 $type = '_' . $type;
280 0         0 return $self->$type();
281             }
282             elsif ( $type and defined $self->{_jobs}->{ $type } ) {
283 0         0 my $job = delete $self->{_jobs}->{ $type };
284 0         0 my $i = 0;
285 0         0 for ( @{ $self->{_queue} } ) {
  0         0  
286 0 0       0 splice(@{ $self->{_queue} }, $i, 1) if $_->{id} eq $type;
  0         0  
287 0         0 ++$i;
288             }
289 0         0 delete $job->{id};
290             }
291 0         0 return $self->_shift();
292             }
293              
294             sub pending_jobs {
295 11     11 1 5009666 my $self = CORE::shift;
296 11         23 return @{ $self->{_queue} };
  11         72  
297             }
298              
299             sub pause_queue {
300 3     3 1 11 my $self = CORE::shift;
301 3         20 $self->{paused} = 1;
302             }
303              
304             sub pause_queue_now {
305 2     2 1 8 my $self = CORE::shift;
306 2         13 $self->{paused} = 2;
307             }
308              
309             sub resume_queue {
310 5     5 1 17 my $self = CORE::shift;
311 5         20 delete $self->{paused};
312 5         37 $poe_kernel->post( $self->{session_id}, '_process_queue' );
313             }
314              
315             sub queue_paused {
316 20 100   20 1 7974344 if ( exists $_[0]->{paused} ) {
317 9         74 return 1;
318             } else {
319 11         95 return 0;
320             }
321             }
322              
323             sub current_job {
324 0     0 1 0 my $self = CORE::shift;
325 0         0 return $self->{_current};
326             }
327              
328             sub _allocate_identifier {
329 50     50   79 while (1) {
330 50 50       161 last unless exists $active_identifiers{ ++$current_id };
331             }
332 50         201 return $active_identifiers{$current_id} = $current_id;
333             }
334              
335             sub _free_identifier {
336 50     50   143 my $id = CORE::shift;
337 50         221 delete $active_identifiers{$id};
338             }
339              
340             1;
341              
342             __END__