File Coverage

blib/lib/POE/Component/SmokeBox/JobQueue.pm
Criterion Covered Total %
statement 153 204 75.0
branch 54 90 60.0
condition 13 26 50.0
subroutine 27 33 81.8
pod 11 11 100.0
total 258 364 70.8


line stmt bran cond sub pod time code
1             package POE::Component::SmokeBox::JobQueue;
2             $POE::Component::SmokeBox::JobQueue::VERSION = '0.54';
3             #ABSTRACT: An array based queue for SmokeBox
4              
5 19     19   12438 use strict;
  19         40  
  19         540  
6 19     19   137 use warnings;
  19         41  
  19         573  
7 19     19   480 use POE qw(Component::SmokeBox::Backend Component::SmokeBox::Job Component::SmokeBox::Smoker Component::SmokeBox::Result);
  19         32000  
  19         126  
8 19     19   1048 use Params::Check qw(check);
  19         45  
  19         44096  
9              
10             # Stolen from POE::Wheel. This is static data, shared by all
11             my $current_id = 0;
12             my %active_identifiers;
13              
14             sub spawn {
15 24     24 1 3460 my $package = shift;
16 24         167 my %params = @_;
17 24         181 $params{lc $_} = delete $params{$_} for keys %params;
18 24 100       114 $params{'delay'} = 0 unless exists $params{'delay'};
19 24         52 my $options = delete $params{'options'};
20 24         55 my $self = bless \%params, $package;
21 24 100       316 $self->{session_id} = POE::Session->create(
22             object_states => [
23             $self => {
24             'shutdown' => '_shutdown',
25             submit => '_submit',
26             cancel => '_cancel',
27             },
28             $self => [qw(_start _process_queue _backend_done _process_queue_delayed)],
29             ],
30             heap => $self,
31             ( ref($options) eq 'HASH' ? ( options => $options ) : () ),
32             )->ID();
33 24         4017 return $self;
34             }
35              
36             sub session_id {
37 74     74 1 453 return $_[0]->{session_id};
38             }
39              
40             sub shutdown {
41 24     24 1 3436 my $self = shift;
42 24         129 $poe_kernel->call( $self->session_id() => 'shutdown' => @_ );
43             }
44              
45             sub _start {
46 24     24   6925 my ($kernel,$self) = @_[KERNEL,OBJECT];
47 24         80 $self->{session_id} = $_[SESSION]->ID();
48 24 50       198 if ( $self->{alias} ) {
49 0         0 $kernel->alias_set( $self->{alias} );
50             }
51             else {
52 24         86 $kernel->refcount_increment( $self->{session_id} => __PACKAGE__ );
53             }
54 24         838 $self->{_queue} = [ ];
55 24         78 return;
56             }
57              
58             sub _shutdown {
59 24     24   1969 my ($kernel,$self) = @_[KERNEL,OBJECT];
60 24         149 $self->{_shutdown} = 1;
61 24 50       181 if ( $self->{alias} ) {
62 0         0 $kernel->alias_remove($_) for $kernel->alias_list();
63             }
64             else {
65 24         171 $kernel->refcount_decrement( $self->{session_id} => __PACKAGE__ );
66             }
67             # shutdown currently running backend
68 24 50       1303 $self->{_current}->{backend}->shutdown() if $self->{_current}->{backend};
69             # remove queued jobs.
70             # $kernel->refcount_decrement( $_->{session}, __PACKAGE__ ) for @{ $self->{_queue} };
71 24         87 $kernel->refcount_decrement( $_, __PACKAGE__ ) for keys %{ $self->{_refcounts} };
  24         125  
72 24         201 delete $self->{_queue};
73              
74             # remove delay for jobs if we set one
75 24 100       143 $kernel->alarm_remove( delete $self->{_delay} ) if exists $self->{_delay};
76              
77 24         377 return;
78             }
79              
80             sub _process_queue_delayed {
81 8     8   7994011 my ($kernel,$self) = @_[KERNEL,OBJECT];
82 8 50       121 delete $self->{_delay} if exists $self->{_delay};
83 8         91 $kernel->yield( '_process_queue', 'DELAYDONE' );
84 8         1196 return;
85             }
86              
87             sub _process_queue {
88 193     193   218902 my ($kernel,$self,$delaydone) = @_[KERNEL,OBJECT,ARG0];
89 193 100       867 return if $self->{_shutdown};
90 190 50       578 return if exists $self->{_delay};
91 190 100 100     731 return if exists $self->{paused} and $self->{paused} == 2;
92 188         435 my ($job, $smoker );
93 188 100       626 if ( $self->{_current} ) {
94 117 100       463 return if $self->{_current}->{backend};
95 90         216 $job = $self->{_current};
96              
97             # do we have a delay between smokers?
98 90 100 100     2847 if ( $job->{job}->delay > 0 and ! defined $delaydone and scalar @{ $job->{smokers} } > 0 ) {
  9   100     2531  
99             # fire off an alarm for the next iteration
100             #warn "Setting delay(" . $job->{job}->delay . ") for smoker" if $ENV{PERL5_SMOKEBOX_DEBUG};
101 6         50 $self->{_delay} = $kernel->delay_set( '_process_queue_delayed' => $job->{job}->delay );
102 6         1243 return;
103             }
104              
105 84         18310 $smoker = shift @{ $job->{smokers} };
  84         509  
106 84 100       386 unless ( $smoker ) {
107             # Reached the end send an event back to the original requestor
108 50         432 delete $self->{_current};
109 50         554 delete $job->{smokers};
110 50         239 my $session = delete $job->{session};
111 50         370 $kernel->post( $session, delete $job->{event}, $job );
112 50         6084 $self->{_refcounts}->{ $session }--;
113 50 100       297 if ( $self->{_refcounts}->{ $session } <= 0 ) {
114 23         232 $kernel->refcount_decrement( $session, __PACKAGE__ );
115 23         929 delete $self->{_refcounts}->{ $session };
116             }
117              
118             # did we enable delay between jobs?
119             # don't check the queue, we force a delay all the time so if we add a job, we're already delaying for it...
120 50 100       368 if ( $self->{delay} > 0 ) {
121             # fire off an alarm for the next iteration
122             #warn "Setting delay($self->{delay}) for job" if $ENV{PERL5_SMOKEBOX_DEBUG};
123 4         43 $self->{_delay} = $kernel->delay_set( '_process_queue_delayed' => $self->{delay} );
124             } else {
125 46         247 $kernel->yield( '_process_queue' );
126             }
127 50         4159 return;
128             }
129             }
130             else {
131 71         1513 $job = $self->_shift();
132 71 100       279 return unless $job;
133 50         95 $smoker = shift @{ $job->{smokers} };
  50         310  
134 50         547 $job->{result} = POE::Component::SmokeBox::Result->new();
135 50         134 $self->{_current} = $job;
136             }
137 84         736 $job->{backend} = POE::Component::SmokeBox::Backend->spawn( event => '_backend_done', $job->{job}->dump_data(), $smoker->dump_data(), );
138 84         617 return;
139             }
140              
141             sub _backend_done {
142 84     84   35022 my ($kernel,$self,$result) = @_[KERNEL,OBJECT,ARG0];
143 84         594 delete $self->{_current}->{backend};
144 84         902 $self->{_current}->{result}->add_result( $result );
145 84         527 $kernel->yield( '_process_queue' );
146 84         8069 return;
147             }
148              
149             sub submit {
150 51     51 1 120 my $self = shift;
151 51         188 return $poe_kernel->call( $self->{session_id}, 'submit', @_ );
152             }
153              
154             sub cancel {
155 0     0 1 0 my $self = shift;
156 0         0 return $poe_kernel->call( $self->{session_id}, 'cancel', @_ );
157             }
158              
159             sub _submit {
160 51     51   3518 my ($kernel,$self,$sender) = @_[KERNEL,OBJECT,SENDER];
161 51 50       158 return if $self->{_shutdown};
162 51         159 my $args;
163 51 100       143 if ( ref( $_[ARG0] ) eq 'HASH' ) {
164 44         63 $args = { %{ $_[ARG0] } };
  44         187  
165             }
166             else {
167 7         40 $args = { @_[ARG0..$#_] };
168             }
169              
170             my $tmpl = {
171             event => { required => 1, defined => 1, },
172 44 50   44   2070 session => { defined => 1, allow => [ sub { return 1 if $poe_kernel->alias_resolve( $_[0] ); }, ], },
173             type => { defined => 1, allow => [qw(push unshift)], default => 'push', },
174             job => { required => 1, defined => 1,
175 50 50 33 50   4389 allow => [ sub { return 1 if ref $_[0] and $_[0]->isa('POE::Component::SmokeBox::Job'); }, ], },
176             smokers => { required => 1, defined => 1, allow => [
177             sub {
178             return 1 if ref $_[0] eq 'ARRAY'
179 50         222 and scalar @{ $_[0] }
180 50 50 50 50   4192 and ( grep { $_->isa('POE::Component::SmokeBox::Smoker') } @{ $_[0] } ) == @{ $_[0] };
  84   33     282  
  50         123  
  50         220  
181 51         671 }, ],
182             },
183              
184             };
185              
186 51 100       202 my $checked = check( $tmpl, $args, 1 ) or return;
187 50 100       1140 $checked->{session} = $kernel->alias_resolve( $checked->{session} )->ID() if $checked->{session};
188 50 100       1078 $checked->{session} = $sender->ID() unless $checked->{session};
189 50         187 my $type = delete $checked->{type};
190 50         142 my $id = $self->_add_job( $checked, $type );
191 50 50       170 return unless $id;
192 50 100       156 unless ( defined $self->{_refcounts}->{ $checked->{session} } ) {
193 23         84 $kernel->refcount_increment( $checked->{session}, __PACKAGE__ );
194             }
195 50         716 $self->{_refcounts}->{ $checked->{session} }++;
196 50         121 $checked->{submitted} = time();
197             #$checked->{job}->id( $id );
198 50         541 return $id;
199             }
200              
201             sub _cancel {
202 0     0   0 my ($kernel,$self,$sender) = @_[KERNEL,OBJECT,SENDER];
203 0 0       0 return if $self->{_shutdown};
204 0         0 my $args;
205 0 0       0 if ( ref( $_[ARG0] ) eq 'HASH' ) {
206 0         0 $args = { %{ $_[ARG0] } };
  0         0  
207             }
208             else {
209 0         0 $args = { @_[ARG0..$#_] };
210             }
211              
212 0         0 my $tmpl = {
213             job => { required => 1, defined => 1, },
214             };
215              
216 0 0       0 my $checked = check( $tmpl, $args, 1 ) or return;
217 0         0 return $self->_remove_job( $checked->{job} );
218             }
219              
220             sub _push {
221 50     50   109 my ($self,$job) = @_;
222 50 50       152 return unless ref $job eq 'HASH';
223 50         119 my $id = _allocate_identifier();
224 50         115 $job->{id} = $id;
225 50         79 CORE::push @{ $self->{_queue} }, $job;
  50         190  
226 50         160 $self->{_jobs}->{ $id } = $job;
227 50         136 $poe_kernel->post( $self->session_id(), '_process_queue' );
228 50         5466 return $id;
229             }
230              
231             sub _unshift {
232 0     0   0 my ($self,$job) = @_;
233 0 0       0 return unless ref $job eq 'HASH';
234 0         0 my $id = _allocate_identifier();
235 0         0 $job->{id} = $id;
236 0         0 CORE::unshift @{ $self->{_queue} }, $job;
  0         0  
237 0         0 $self->{_jobs}->{ $id } = $job;
238 0         0 $poe_kernel->post( $self->session_id(), '_process_queue' );
239 0         0 return $id;
240             }
241              
242             sub _shift {
243 71     71   237 my $self = CORE::shift;
244 71 100       226 return if $self->{paused};
245 68         132 my $job = CORE::shift @{ $self->{_queue} };
  68         215  
246 68 100       259 return unless $job;
247 50         187 delete $self->{_jobs}->{ $job->{id} };
248 50         1213 _free_identifier( $job->{id} );
249 50         141 delete $job->{id};
250 50         124 return $job;
251             }
252              
253             sub _pop {
254 0     0   0 my $self = CORE::shift;
255 0 0       0 return if $self->{paused};
256 0         0 my $job = CORE::pop @{ $self->{_queue} };
  0         0  
257 0 0       0 return unless $job;
258 0         0 delete $self->{_jobs}->{ $job->{id} };
259 0         0 _free_identifier( $job->{id} );
260 0         0 delete $job->{id};
261 0         0 return $job;
262             }
263              
264             sub _add_job {
265 50     50   122 my ($self,$job,$type) = @_;
266 50 50       132 $type = lc $type if $type;
267 50 50 33     143 if ( $type and grep { /^\Q$type\E$/ } qw(push unshift) ) {
  100         768  
268 50         115 $type = '_' . $type;
269 50         162 return $self->$type( $job );
270             }
271 0         0 return $self->_push( $job );
272             }
273              
274             sub _remove_job {
275 0     0   0 my ($self,$type) = @_;
276 0 0       0 return if $self->{paused};
277 0 0       0 $type = lc $type if $type;
278 0 0 0     0 if ( $type and grep { /^\Q$type\E$/ } qw(pop shift) ) {
  0 0 0     0  
279 0         0 $type = '_' . $type;
280 0         0 return $self->$type();
281             }
282             elsif ( $type and defined $self->{_jobs}->{ $type } ) {
283 0         0 my $job = delete $self->{_jobs}->{ $type };
284 0         0 my $i = 0;
285 0         0 for ( @{ $self->{_queue} } ) {
  0         0  
286 0 0       0 splice(@{ $self->{_queue} }, $i, 1) if $_->{id} eq $type;
  0         0  
287 0         0 ++$i;
288             }
289 0         0 delete $job->{id};
290             }
291 0         0 return $self->_shift();
292             }
293              
294             sub pending_jobs {
295 11     11 1 5008763 my $self = CORE::shift;
296 11         23 return @{ $self->{_queue} };
  11         64  
297             }
298              
299             sub pause_queue {
300 3     3 1 10 my $self = CORE::shift;
301 3         17 $self->{paused} = 1;
302             }
303              
304             sub pause_queue_now {
305 2     2 1 9 my $self = CORE::shift;
306 2         16 $self->{paused} = 2;
307             }
308              
309             sub resume_queue {
310 5     5 1 903 my $self = CORE::shift;
311 5         22 delete $self->{paused};
312 5         31 $poe_kernel->post( $self->{session_id}, '_process_queue' );
313             }
314              
315             sub queue_paused {
316 20 100   20 1 7981752 if ( exists $_[0]->{paused} ) {
317 9         72 return 1;
318             } else {
319 11         107 return 0;
320             }
321             }
322              
323             sub current_job {
324 0     0 1 0 my $self = CORE::shift;
325 0         0 return $self->{_current};
326             }
327              
328             sub _allocate_identifier {
329 50     50   86 while (1) {
330 50 50       159 last unless exists $active_identifiers{ ++$current_id };
331             }
332 50         154 return $active_identifiers{$current_id} = $current_id;
333             }
334              
335             sub _free_identifier {
336 50     50   177 my $id = CORE::shift;
337 50         218 delete $active_identifiers{$id};
338             }
339              
340             1;
341              
342             __END__