File Coverage

lib/MooseX/Workers.pm
Criterion Covered Total %
statement 24 27 88.8
branch 2 2 100.0
condition n/a
subroutine 10 12 83.3
pod 6 7 85.7
total 42 48 87.5


line stmt bran cond sub pod time code
1             package MooseX::Workers;
2             our $AUTHORITY = 'cpan:PERIGRIN';
3             $MooseX::Workers::VERSION = '0.24';
4 17     17   809198 use Moose::Role;
  17         423253  
  17         85  
5 17     17   83907 use MooseX::Workers::Job;
  17         48  
  17         936  
6              
7 17     17   9879 use MooseX::Workers::Engine;
  17         51  
  17         5694  
8              
9             has _ctor_params => (
10             is => 'ro',
11             isa => 'HashRef',
12             lazy => 1,
13             default => sub { {} },
14             );
15              
16             around BUILDARGS => sub {
17             my ($orig, $class, %args) = @_;
18              
19             shift;
20             shift;
21            
22             return $class->$orig( _ctor_params => \%args, @_ );
23             };
24              
25             has Engine => (
26             isa => 'MooseX::Workers::Engine',
27             is => 'ro',
28             lazy => 1,
29             required => 1,
30             builder => '_build_Engine',
31             handles => [
32             qw(
33             max_workers
34             has_workers
35             num_workers
36             put_worker
37             kill_worker
38             get_worker
39             )
40             ],
41             );
42             sub _build_Engine {
43 19     19   31 my $self = shift;
44 19         43 my @args;
45 19 100       356 push @args, max_workers => $self->_ctor_params->{max_workers} if exists $self->_ctor_params->{max_workers};
46 19         148 MooseX::Workers::Engine->new( visitor => $self, @args );
47             }
48              
49             sub spawn {
50 7     7 1 2626 my ( $self, $cmd, $args ) = @_;
51 7         137 return $self->Engine->call( add_worker => $cmd => $args );
52             }
53              
54             __PACKAGE__->meta->add_method( 'fork' => __PACKAGE__->can('spawn') );
55              
56             sub run_command {
57 5     5 1 503 my ( $self, $cmd ) = @_;
58 5         104 $self->Engine->yield( add_worker => $cmd );
59             }
60              
61             sub enqueue {
62 61     61 1 2552 my ( $self, $cmd ) = @_;
63 61         1251 $self->Engine->call( add_worker => $cmd, { enqueue => 1 } );
64             }
65              
66             sub check_worker_threshold {
67 0     0 1 0 return $_[0]->num_workers >= $_[0]->max_workers;
68             }
69              
70             sub check_worker_threashold {
71 0     0 0 0 warn 'check_worker_threashold (note the typo) is deprecated '
72             . 'please use check_worker_threshold instead';
73 0         0 shift->check_worker_threshold;
74             }
75              
76 60     60 1 166 sub stdout_filter { undef }
77 71     71 1 205 sub stderr_filter { undef }
78              
79 17     17   139 no Moose::Role;
  17         25  
  17         102  
80             1;
81             __END__
82              
83             =head1 NAME
84              
85             MooseX::Workers - Simple sub-process management for asynchronous tasks
86              
87             =head1 SYNOPSIS
88              
89             EXAMPLE #1:
90             package Manager;
91             # This example prints output from the children normally on both STDOUT and STDERR
92              
93             use Moose;
94             with qw(MooseX::Workers);
95              
96             sub run {
97             $_[0]->spawn( sub { sleep 3; print "Hello World\n" } );
98             warn "Running now ... ";
99             POE::Kernel->run();
100             }
101              
102             # Implement our Interface
103             sub worker_stdout { shift; warn join ' ', @_; }
104             sub worker_stderr { shift; warn join ' ', @_; }
105              
106             sub worker_manager_start { warn 'started worker manager' }
107             sub worker_manager_stop { warn 'stopped worker manager' }
108              
109             sub max_workers_reached { warn 'maximum worker count reached' }
110             sub worker_error { shift; warn join ' ', @_; }
111             sub worker_finished { warn 'a worker has finished' }
112             sub worker_started { shift; warn join ' ', @_; }
113             sub sig_child { shift; warn join ' ', @_; }
114             sub sig_TERM { shift; warn 'Handled TERM' }
115              
116             no Moose;
117              
118             Manager->new->run();
119              
120              
121             EXAMPLE #2:
122             package Manager;
123              
124             # This example prints output from the children normally on
125             # STDERR but uses STDOUT to returns a hashref from the child to
126             # the parent
127              
128             use Moose;
129             with qw(MooseX::Workers);
130             use POE qw(Filter::Reference Filter::Line);
131              
132             sub run {
133             $_[0]->spawn(
134             sub {
135             sleep 3;
136              
137             # Return a hashref (arrayref, whatever) to the parent using P::F::Reference
138             print @{POE::Filter::Reference->new->put([ {msg => "Hello World"} ])}; # Note the [] around the return val
139              
140             # Print normally using P::F::Line (shown for
141             # completeness; in practice, just don't bother
142             # defining the _filter method
143             #
144             print STDERR "Hey look, an error message";
145             }
146             );
147              
148             POE::Kernel->run();
149             }
150              
151             # Implement our Interface
152             # These two are both optional; if defined (as here), they
153             # should return a subclass of POE::Filter.
154             sub stdout_filter { POE::Filter::Reference->new }
155             sub stderr_filter { POE::Filter::Line->new }
156              
157             sub worker_stdout {
158             my ( $self, $result ) = @_; # $result will be a hashref: {msg => "Hello World"}
159             print $result->{msg};
160              
161             # Note that you can do more than just print the message --
162             # e.g. this is the way to return data from the children for
163             # accumulation in the parent.
164             }
165             sub worker_stderr {
166             my ( $self, $stderr_msg ) = @_; # $stderr_msg will be a string: "Hey look, an error message";
167             warn $stderr_msg;
168             }
169              
170             # From here down, this is identical to the previous example.
171             sub worker_manager_start { warn 'started worker manager' }
172             sub worker_manager_stop { warn 'stopped worker manager' }
173              
174             sub max_workers_reached { warn 'maximum worker count reached' }
175             sub worker_error { shift; warn join ' ', @_; }
176             sub worker_finished { warn 'a worker has finished' }
177             sub worker_started { shift; warn join ' ', @_; }
178             sub sig_child { shift; warn join ' ', @_; }
179             sub sig_TERM { shift; warn 'Handled TERM' }
180              
181             no Moose;
182              
183             Manager->new->run();
184              
185             =head1 DESCRIPTION
186              
187             MooseX::Workers is a Role that provides easy delegation of long-running tasks
188             into a managed child process. Process management is taken care of via POE and its
189             POE::Wheel::Run module.
190              
191             =head1 METHODS
192              
193             =over
194              
195             =item spawn ($command)
196              
197             =item fork ($command)
198              
199             =item run_command ($command)
200              
201             These three methods are the whole point of this module.
202             They pass $command through to the MooseX::Worker::Engine which will take
203             care of running $command for you.
204              
205             spawn() and fork() both invoke L<POE::Kernel> call(), which is synchronous.
206              
207             run_command() invokes L<POE::Kernel> yield(), which is asynchronous.
208              
209             If max_workers() has been reached, run_command() warns and does nothing. It is up to you to re-submit
210             $command. See enqueue() if you want us to run $command as soon as another worker is free.
211              
212             =item enqueue($command)
213              
214             Just like run_command(), only that if max_workers() has been set and that number of workers
215             has been reached, then we add $command to a FIFO command queue. As soon as any running
216             worker exits, the first $command in queue (if any) will be run.
217              
218             =item check_worker_threshold
219              
220             This will check to see how many workers you have compared to the max_workers limit. It returns true
221             if the $num_workers is >= $max_workers;
222              
223             =item max_workers($count)
224              
225             An accessor for the maximum number of workers. This is delegated to the MooseX::Workers::Engine object.
226              
227             =item has_workers
228              
229             Check to see if we have *any* workers currently. This is delegated to the MooseX::Workers::Engine object.
230              
231             =item num_workers
232              
233             Return the current number of workers. This is delegated to the MooseX::Workers::Engine object.
234              
235             =item meta
236              
237             The Metaclass for MooseX::Workers::Engine see Moose's documentation.
238              
239             =back
240              
241             =head1 INTERFACE
242              
243             MooseX::Worker::Engine supports the following callbacks:
244              
245             =over
246              
247             =item worker_manager_start
248              
249             Called when the managing session is started
250              
251             =item worker_manager_stop
252              
253             Called when the managing session stops
254              
255             =item max_workers_reached
256              
257             Called when we reach the maximum number of workers
258              
259             =item stdout_filter
260              
261             OPTIONAL. If defined, this should return an object that isa
262             POE::Filter. If it doesn't, the results are undefined. Anything that
263             a child proc sends on STDOUT will be passed through the relevant
264             filter.
265              
266             =item stderr_filter
267              
268             OPTIONAL. If defined, this should return an object that isa
269             POE::Filter. If it doesn't, the results are undefined. Anything that
270             a child proc sends on STDERR will be passed through the relevant
271             filter.
272              
273             =item worker_stdout
274              
275             Called when a child prints to STDOUT. If C<stdout_filter> was
276             defined, the output will be filtered appropriately, as described
277             above. This is useful to allow child processes to return data to the
278             parent (generally via POE::Filter::Reference).
279              
280             =item worker_stderr
281              
282             Called when a child prints to STDERR. Filtered through the result of
283             C<stderr_filter> if that method is defined.
284              
285             =item worker_error
286              
287             Called when there is an error condition detected with the child.
288              
289             =item worker_finished
290              
291             Called when a worker completes $command.
292              
293             If the command was a L<MooseX::Workers::Job>, it will get the removed job
294             instance as the first parameter.
295              
296             =item worker_done
297              
298             B<*DEPRECATED*>
299              
300             This is called before the worker is removed, so L</num_workers> and
301             L</has_workers> does not reflect that a worker has just finished. Use
302             L</worker_finished> instead.
303              
304             Gets the L<MooseX::Workers::Job> instance, if the $command was a job, and the
305             L<POE::Wheel::Run> id otherwise.
306              
307             =item worker_started
308              
309             Called when a worker starts $command
310              
311             =item sig_child
312              
313             Called when the mangaging session recieves a SIG CHLD event
314              
315             =item sig_*
316              
317             Called when the underlying POE Kernel receives a signal; this is not limited to
318             OS signals (ie. what you'd usually handle in Perl's %SIG) so will also accept
319             arbitrary POE signals (sent via POE::Kernel->signal), but does exclude
320             SIGCHLD/SIGCHILD, which is instead handled by sig_child above.
321              
322             These interface methods are automatically inserted when MooseX::Worker::Engine
323             detects that your manager class contains any methods beginning with sig_.
324             Signals are case-sensitive, so if you wish to handle a TERM signal, you must
325             define a sig_TERM() method. Note also that this action is performed upon
326             MooseX::Worker::Engine startup, so any run-time modification of your class
327             which 'does' MooseX::Workers is not likely to be detected.
328              
329             See the sig_TERM handler in the SYNOPSIS for an example.
330              
331             =back
332              
333             See L<MooseX::Workers::Engine> for more details.
334             Also see L<MooseX::Workers::Job> if you'd like to give your tasks
335             names, or set timeouts on them.
336              
337             =head1 WIN32 NOTES
338              
339             You don't need to binmode the STDIN/STDOUT/STDERR streams in your coderefs, this
340             is done for you. If you need utf8, it is safe to re-binmode them to
341             C<:encoding(UTF-8)>.
342              
343             Coderef workers that time out are killed with a SIGINT rather than a SIGTERM,
344             because TERM does not behave compatibly (thanks Rocco!) This is done with a:
345              
346             local $SIG{INT} = sub { exit 0 };
347              
348             that wraps the coderef.
349              
350             You cannot catch a TERM sent to the parent process (see L<perlport/kill>, use
351             INT instead.
352              
353             External programs are run with L<Win32::Job> by L<POE::Wheel::Run>. They are
354             prepended with C<cmd /c> so that builtin cmd commands also work. Use a
355             L<MooseX::Workers::Job> with a string program and arrayref args for this. If
356             you are using L<POE::Filter::Line> with an external program (which is the
357             default if you don't set the filter) the CRs from line ends will be removed
358             automatically.
359              
360             =head1 BUGS AND LIMITATIONS
361              
362             Please report any bugs or feature requests to
363             C<bug-moosex-workers@rt.cpan.org>, or through the web interface at
364             L<http://rt.cpan.org>.
365              
366             Version control: L<https://github.com/jhannah/moosex-workers>
367              
368             =head1 AUTHORS
369              
370             Chris Prather C<< <perigrin@cpan.org> >>
371              
372             Tom Lanyon C<< <dec@cpan.org> >>
373              
374             Jay Hannah C<< <jay@jays.net> >>
375              
376             Justin Hunter C<< <justin.d.hunter@gmail.com> >>
377              
378             David K. Storrs C<< <david.storrs@gmail.com> >>
379              
380             Rafael Kitover C<< <rkitover@cpan.org> >>
381              
382             =head1 LICENCE AND COPYRIGHT
383              
384             Copyright (c) 2007-2013, Chris Prather C<< <perigrin@cpan.org> >>. Some rights reserved.
385              
386             This module is free software; you can redistribute it and/or
387             modify it under the same terms as Perl itself. See L<perlartistic>.
388              
389              
390             =head1 DISCLAIMER OF WARRANTY
391              
392             BECAUSE THIS SOFTWARE IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY
393             FOR THE SOFTWARE, TO THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT WHEN
394             OTHERWISE STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR OTHER PARTIES
395             PROVIDE THE SOFTWARE "AS IS" WITHOUT WARRANTY OF ANY KIND, EITHER
396             EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
397             WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE. THE
398             ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE SOFTWARE IS WITH
399             YOU. SHOULD THE SOFTWARE PROVE DEFECTIVE, YOU ASSUME THE COST OF ALL
400             NECESSARY SERVICING, REPAIR, OR CORRECTION.
401              
402             IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING
403             WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR
404             REDISTRIBUTE THE SOFTWARE AS PERMITTED BY THE ABOVE LICENCE, BE
405             LIABLE TO YOU FOR DAMAGES, INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL,
406             OR CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OR INABILITY TO USE
407             THE SOFTWARE (INCLUDING BUT NOT LIMITED TO LOSS OF DATA OR DATA BEING
408             RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD PARTIES OR A
409             FAILURE OF THE SOFTWARE TO OPERATE WITH ANY OTHER SOFTWARE), EVEN IF
410             SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF
411             SUCH DAMAGES.
412              
413             =cut
414              
415             1;
416              
417