File Coverage

lib/MooseX/Workers/Engine.pm
Criterion Covered Total %
statement 103 143 72.0
branch 37 62 59.6
condition 11 24 45.8
subroutine 22 28 78.5
pod 4 8 50.0
total 177 265 66.7


line stmt bran cond sub pod time code
1             package MooseX::Workers::Engine;
2             our $AUTHORITY = 'cpan:PERIGRIN';
3             $MooseX::Workers::Engine::VERSION = '0.24';
4 17     17   108 use Moose;
  17         28  
  17         146  
5 17     17   99567 use POE qw(Wheel::Run);
  17         572637  
  17         108  
6 17     17   1617441 use MooseX::Workers::Job ();
  17         35  
  17         203  
7 17     17   66 use Package::Stash ();
  17         323  
  17         235  
8 17     17   68 use Try::Tiny;
  17         18  
  17         30346  
9              
10             has visitor => (
11             is => 'ro',
12             does => 'MooseX::Workers',
13             );
14              
15             has max_workers => (
16             isa => 'Int',
17             is => 'rw',
18             default => sub { 5 },
19             );
20              
21             # Processes currently running
22             has process_list => (
23             traits => [ 'Hash' ],
24             isa => 'HashRef',
25             default => sub { {} },
26             handles => {
27             set_process => 'set',
28             get_process => 'get',
29             remove_process => 'delete',
30             process_list => 'kv',
31             }
32             );
33              
34             # Processes waiting to run
35             has process_queue => (
36             traits => [ 'Array' ],
37             isa => 'ArrayRef',
38             default => sub { [] },
39             handles => {
40             enqueue_process => 'push',
41             dequeue_process => 'shift',
42             process_queue => 'elements',
43             }
44             );
45              
46             has workers => (
47             traits => [ 'Hash' ],
48             isa => 'HashRef',
49             is => 'rw',
50             lazy => 1,
51             required => 1,
52             default => sub { {} },
53             handles => {
54             set_worker => 'set',
55             get_worker => 'get',
56             remove_worker => 'delete',
57             has_workers => 'count',
58             num_workers => 'count',
59             get_worker_ids => 'keys',
60             },
61             );
62              
63             has jobs => (
64             traits => [ 'Hash' ],
65             isa => 'HashRef',
66             is => 'rw',
67             lazy => 1,
68             required => 1,
69             default => sub { {} },
70             handles => {
71             set_job => 'set',
72             get_job => 'get',
73             remove_job => 'delete',
74             has_jobs => 'count',
75             num_jobs => 'count',
76             },
77             );
78              
79             has session => (
80             isa => 'POE::Session',
81             is => 'ro',
82             required => 1,
83             lazy => 1,
84             default => sub {
85             POE::Session->create(
86             object_states => [
87             $_[0] => [
88             qw(
89             _start
90             _stop
91             _worker_stdout
92             _worker_stderr
93             _worker_error
94             _worker_done
95             _worker_started
96             _sig_child
97             add_worker
98             _kill_worker
99             )
100             ],
101             ],
102             );
103             },
104             clearer => 'remove_manager',
105             predicate => 'has_manager',
106             );
107              
108             sub yield {
109 78     78 1 550 my $self = shift;
110 78         2128 $poe_kernel->post( $self->session => @_ );
111             }
112              
113             sub call {
114 120     120 1 195 my $self = shift;
115 120         2507 return $poe_kernel->call( $self->session => @_ );
116             }
117              
118             sub put_worker {
119 0     0 0 0 my ( $self, $wheel_id ) = splice @_, 0, 2;
120 0         0 $self->get_worker($wheel_id)->put(@_);
121             }
122              
123             sub kill_worker {
124 0     0 0 0 my ( $self, $wheel_id ) = splice @_, 0, 2;
125 0         0 $self->get_worker($wheel_id)->kill(@_);
126 0         0 $self->remove_worker($wheel_id);
127             }
128              
129             sub stdout_filter {
130 86     86 0 1374 my $self = $_[OBJECT];
131 86         1973 $self->visitor->stdout_filter;
132             }
133              
134             sub stderr_filter {
135 75     75 0 925 my $self = $_[OBJECT];
136 75         1555 $self->visitor->stderr_filter;
137             }
138              
139             #
140             # EVENTS
141             #
142              
143             sub add_worker {
144 125     125 1 6191 my ( $self, $job, $args, $kernel, $heap ) = @_[ OBJECT, ARG0, ARG1, KERNEL, HEAP ];
145              
146             # if we've reached the worker threashold, set off a warning
147 125 100       4014 if ( $self->num_workers >= $self->max_workers ) {
148 52 50       146 if ( $args->{enqueue} ) {
149 52         1497 $self->enqueue_process([$job, $args]);
150 52         137 return;
151             } else {
152 0         0 $self->visitor->max_workers_reached($job);
153 0         0 return;
154             }
155             }
156              
157 73         102 my $command;
158              
159 73 100 66     350 if (not (blessed $job && $job->isa('MooseX::Workers::Job'))) {
160 67         2042 $job = MooseX::Workers::Job->new(command => $job);
161             }
162              
163 73 50       348 $self->_fixup_job_for_win32($job) if $^O eq 'MSWin32';
164              
165 73         1774 $command = $job->command;
166 73         1560 $args = $job->args;
167              
168 73         93 my @optional_io_filters;
169 73 100       253 push @optional_io_filters, 'StdoutFilter', $self->stdout_filter if $self->stdout_filter;
170 73 100       952 push @optional_io_filters, 'StderrFilter', $self->stderr_filter if $self->stderr_filter;
171            
172 73 50 66     904 $args = [$args] if defined $args && (not ref $args eq 'ARRAY');
173              
174 73 100       611 my $wheel = POE::Wheel::Run->new(
175             Program => $command,
176             ($args ? (ProgramArgs => $args) : ()),
177             @optional_io_filters,
178             StdoutEvent => '_worker_stdout',
179             StderrEvent => '_worker_stderr',
180             ErrorEvent => '_worker_error',
181             CloseEvent => '_worker_done',
182             );
183 73         252568 $kernel->sig_child($wheel->PID, "_sig_child");
184              
185 73         9882 $self->set_worker( $wheel->ID => $wheel );
186 73         257 $self->set_process( $wheel->PID => $wheel->ID );
187              
188 73         216 $job->ID($wheel->ID);
189 73         222 $job->PID($wheel->PID);
190 73         214 $self->set_job( $wheel->ID => $job );
191 73 100       2121 if ($job->timeout) {
192 2         69 $heap->{wheel_to_timer}{$wheel->ID} =
193             $kernel->delay_set('_kill_worker', $job->timeout, $wheel->ID);
194             }
195              
196 73 100       2063 $job->name($job->PID) unless defined $job->name;
197              
198 73         235 $self->yield( '_worker_started' => $wheel->ID => $job );
199 73         7304 return ( $wheel->ID => $wheel->PID );
200             }
201              
202             sub _fixup_job_for_win32 {
203 0     0   0 my ($self, $job) = @_;
204              
205 0 0       0 return unless $^O eq 'MSWin32';
206              
207 0         0 my $cmd = $job->command;
208              
209 0 0       0 if ($job->is_coderef) {
210             # do the binmoding for the user, and set up an INT handler because we kill on timeouts with INT for win32
211             $job->command(sub {
212 0     0   0 binmode STDOUT;
213 0         0 binmode STDERR;
214 0         0 binmode STDIN;
215 0         0 local $SIG{INT} = sub { exit 0 };
  0         0  
216 0         0 $cmd->(@_);
217 0         0 });
218             }
219             else {
220             # this makes builtins like 'echo' work with Win32::Job which ::Wheel::Run uses
221 0         0 $job->command('c:\windows\system32\cmd.exe');
222 0 0       0 $job->args(['/c', $cmd, @{ $job->args || [] }]);
  0         0  
223              
224             # now translate CRLF -> LF for Filter::Line
225 0         0 my $visitor_class = ref $self->visitor;
226 0         0 my $visitor_stash = Package::Stash->new($visitor_class);
227            
228 0 0       0 if (not $visitor_stash->has_symbol('$__MX_WORKERS_STDIO_FIXED_UP')) {
229 0         0 foreach my $stream (qw/stdout stderr/) {
230 0         0 my $visitor = $self->visitor;
231 0         0 my $method = "worker_${stream}";
232 0     0   0 my $filter = try { $visitor->${\"${stream}_filter"} };
  0         0  
  0         0  
233              
234 0 0 0     0 if (((not defined $filter)
      0        
235             || (blessed $filter && $filter->isa('POE::Filter::Line'))
236             ) && $visitor->can($method)) {
237              
238 0         0 my $was_immutable = not $visitor_class->meta->is_mutable;
239              
240 0 0       0 $visitor_class->meta->make_mutable if $was_immutable;
241              
242             $visitor_class->meta->add_around_method_modifier($method, sub {
243 0     0   0 my ($orig, $self, $input) = splice @_, 0, 3;
244              
245 0         0 $input =~ s/\015\z//;
246              
247 0         0 $self->$orig($input, @_);
248 0         0 });
249              
250 0 0       0 $visitor_class->meta->make_immutable if $was_immutable;
251             }
252             }
253              
254 0         0 $visitor_stash->add_symbol('$__MX_WORKERS_STDIO_FIXED_UP', 1);
255             }
256             }
257             }
258              
259             sub _kill_worker {
260 1     1   999184 my ( $self, $wheel_id ) = @_[ OBJECT, ARG0 ];
261 1         76 my $job = $self->get_job($wheel_id);
262 1 50       25 $self->visitor->worker_timeout( $job )
263             if $self->visitor->can('worker_timeout');
264             # we send win32 coderefs an INT, see _fixup_job_for_win32
265 1 50 33     426 $self->get_worker($wheel_id)->kill($^O eq 'MSWin32' && $job->is_coderef ? 'INT' : ());
266             }
267              
268             sub _start {
269 15     15   4400 my ($self) = $_[OBJECT];
270 15 100       421 $self->visitor->worker_manager_start()
271             if $self->visitor->can('worker_manager_start');
272              
273             # Set an alias to ensure our manager session is not cleaned up.
274 15         7488 $_[KERNEL]->alias_set("manager");
275              
276             # Register the generic signal handler for any signals our visitor
277             # class wishes to receive.
278 15         946 my @visitor_methods = map { $_->name } $self->visitor->meta->get_all_methods;
  554         21862  
279 15         73 for my $sig_handler (grep { /^sig_/ } @visitor_methods){
  554         615  
280 2         9 (my $sig) = ($sig_handler =~ /^sig_(.*)/);
281 2 100 66     24 next if uc($sig) eq 'CHLD' or uc($sig) eq 'CHILD';
282              
283 1         6 $poe_kernel->state( $sig_handler, $self, '_sig_handler' );
284 1         25 $poe_kernel->sig( $sig => $sig_handler );
285             }
286             }
287              
288             sub _stop {
289 15     15   6776 my ($self) = $_[OBJECT];
290 15 100       449 $self->visitor->worker_manager_stop()
291             if $self->visitor->can('worker_manager_stop');
292 15         14701 $self->remove_manager;
293             }
294              
295             sub _sig_child {
296 73     73   18419 my ($self) = $_[OBJECT];
297 73 100       2111 $self->visitor->sig_child( $self->get_process($_[ARG1]), $_[ARG2] )
298             if $self->visitor->can('sig_child');
299 73         2926 $self->remove_process( $_[ARG1] );
300 73         291 $_[KERNEL]->sig_handled();
301             }
302              
303             # A generic sig handler (for everything except SIGCHLD)
304             sub _sig_handler {
305 1     1   207 my ($self, $state) = @_[OBJECT,STATE];
306 1         72 $self->visitor->$state( @_[ARG0..ARG9] );
307 1         262 $_[KERNEL]->sig_handled();
308             }
309              
310             sub _worker_stdout {
311 72     72   41508 my ($self, $input, $wheel_id) = @_[ OBJECT, ARG0, ARG1 ];
312 72         2572 my $job = $self->get_job($wheel_id);
313 72 50       1577 $self->visitor->worker_stdout( $input, $job )
314             if $self->visitor->can('worker_stdout');
315             }
316              
317             sub _worker_stderr {
318 70     70   41037 my ($self, $input, $wheel_id) = @_[ OBJECT, ARG0, ARG1 ];
319 70         142 $wheel_id =~ tr[ -~][]cd;
320 70         2508 my $job = $self->get_job($wheel_id);
321 70 50       1502 $self->visitor->worker_stderr( $input, $job )
322             if $self->visitor->can('worker_stderr');
323             }
324              
325             sub _worker_error {
326 146     146   36392 my ($self) = $_[OBJECT];
327 146 50 33     1005 return if $_[ARG0] eq "read" && $_[ARG1] == 0;
328              
329             # $operation, $errnum, $errstr, $wheel_id
330 0 0       0 $self->visitor->worker_error( @_[ ARG0 .. ARG3 ] )
331             if $self->visitor->can('worker_error');
332             }
333              
334             sub _worker_done {
335 73     73   2410 my ($self, $wheel_id, $kernel, $heap) = @_[ OBJECT, ARG0, KERNEL, HEAP ];
336 73         2772 my $job = $self->get_job($wheel_id);
337 73 100       256 $kernel->alarm_remove(delete $heap->{wheel_to_timer}{$wheel_id}) if $heap->{wheel_to_timer}{$wheel_id};
338              
339 73 100       2005 $self->visitor->worker_done( $job )
340             if $self->visitor->can('worker_done');
341              
342 73         29679 $self->delete_worker( $wheel_id );
343              
344 73 100       23300 if (my $code = $self->visitor->can('worker_finished')) {
345 72         1547 $self->visitor->$code($job);
346             }
347              
348             # If we have free workers and processes in queue, then dequeue one of them.
349 73   100     32604 while ( $self->num_workers < $self->max_workers &&
350             (my $jobref = $self->dequeue_process)
351             ) {
352 52         86 my ($cmd, $args) = @$jobref;
353             # This has to be call(), not yield() so num_workers increments before
354             # next loop above.
355 52         226 $self->call(add_worker => $cmd, $args);
356             }
357             }
358              
359             sub delete_worker {
360 73     73 1 133 my ( $self, $wheelID ) = @_;
361 73         2935 my $wheel = $self->get_worker($wheelID);
362 73         258 $self->remove_worker( $wheel->ID );
363             }
364              
365             sub _worker_started {
366 73     73   47463 my ( $self, $wheel_id, $command ) = @_[ OBJECT, ARG0, ARG1 ];
367 73         2762 my $job = $self->get_job($wheel_id);
368 73 50       1600 $self->visitor->worker_started( $job, $command )
369             if $self->visitor->can('worker_started');
370             }
371              
372              
373 17     17   109 no Moose;
  17         20  
  17         143  
374             1;
375             __END__
376              
377             =head1 NAME
378              
379             MooseX::Workers::Engine - Provide the workhorse to MooseX::Workers
380              
381             =head1 SYNOPSIS
382              
383             package MooseX::Workers;
384              
385             has Engine => (
386             isa => 'MooseX::Workers::Engine',
387             is => 'ro',
388             lazy => 1,
389             required => 1,
390             default => sub { MooseX::Workers::Engine->new( visitor => $_[0] ) },
391             handles => [
392             qw(
393             max_workers
394             has_workers
395             num_workers
396             put_worker
397             kill_worker
398             )
399             ],
400             );
401              
402             =head1 DESCRIPTION
403              
404             MooseX::Workers::Engine provides the main functionality
405             to MooseX::Workers. It wraps a POE::Session and as many POE::Wheel::Run
406             objects as it needs.
407              
408             =head1 ATTRIBUTES
409              
410             =over
411              
412             =item visitor
413              
414             Hold a reference to our main object so we can use the callbacks on it.
415              
416             =item max_workers
417              
418             An Integer specifying the maximum number of workers we have.
419              
420             =item workers
421              
422             An ArrayRef of POE::Wheel::Run objects that are our workers.
423              
424             =item session
425              
426             Contains the POE::Session that controls the workers.
427              
428             =back
429              
430             =head1 METHODS
431              
432             =over
433              
434             =item yield
435              
436             Helper method to post events to our internal manager session.
437              
438             =item call
439              
440             Helper method to call events to our internal manager session.
441             This is synchronous and will block incoming data from the children
442             if it takes too long to return.
443              
444             =item set_worker($key)
445              
446             Set the worker at $key
447              
448             =item get_worker($key)
449              
450             Retrieve the worker at $key
451              
452             =item delete_worker($key)
453              
454             Remove the worker atx $key
455              
456             =item has_workers
457              
458             Check to see if we have *any* workers currently. This is delegated to the MooseX::Workers::Engine object.
459              
460             =item num_workers
461              
462             Return the current number of workers. This is delegated to the MooseX::Workers::Engine object.
463              
464             =item has_manager
465              
466             Check to see if we have a manager session.
467              
468             =item remove_manager
469              
470             Remove the manager session.
471              
472             =item meta
473              
474             The Metaclass for MooseX::Workers::Engine see Moose's documentation.
475              
476             =back
477              
478             =head1 EVENTS
479              
480             =over
481              
482             =item add_worker ($command)
483              
484             Create a POE::Wheel::Run object to handle $command. If $command holds a scalar, it will be executed as exec($scalar).
485             Shell metacharacters will be expanded in this form. If $command holds an array reference,
486             it will executed as exec(@$array). This form of exec() doesn't expand shell metacharacters.
487             If $command holds a code reference, it will be called in the forked child process, and then
488             the child will exit.
489              
490             See POE::Wheel::Run for more details.
491              
492             =back
493              
494             =head1 INTERFACE
495              
496             MooseX::Worker::Engine fires the following callbacks to its visitor object:
497              
498             =over
499              
500             =item worker_manager_start
501              
502             Called when the managing session is started.
503              
504             =item worker_manager_stop
505              
506             Called when the managing session stops.
507              
508             =item max_workers_reached
509              
510             Called when we reach the maximum number of workers.
511              
512             =item worker_stdout
513              
514             Called when a child prints to STDOUT.
515              
516             =item worker_stderr
517              
518             Called when a child prints to STDERR.
519              
520             =item worker_error
521              
522             Called when there is an error condition detected with the child.
523              
524             =item worker_done
525              
526             Called when a worker completes $command.
527              
528             =item worker_started
529              
530             Called when a worker starts $command.
531              
532             =item sig_child($PID, $ret)
533              
534             Called when the managing session receives a SIG CHLD event.
535              
536             =item sig_*
537              
538             Called when the underlying POE Kernel receives a signal; this is not limited to
539             OS signals (ie. what you'd usually handle in Perl's %SIG) so will also accept
540             arbitrary POE signals (sent via POE::Kernel->signal), but does exclude
541             SIGCHLD/SIGCHILD, which is instead handled by sig_child above.
542              
543             These interface methods are automatically inserted when MooseX::Worker::Engine
544             detects that the visitor object contains any methods beginning with sig_.
545             Signals are case-sensitive, so if you wish to handle a TERM signal, you must
546             define a sig_TERM() method. Note also that this action is performed upon
547             MooseX::Worker::Engine startup, so any run-time modification of the visitor
548             object is not likely to be detected.
549              
550             =back
551              
552             =cut
553              
554             1;
555              
556