File Coverage

blib/lib/Sys/ForkQueue.pm
Criterion Covered Total %
statement 11 13 84.6
branch n/a
condition n/a
subroutine 5 5 100.0
pod n/a
total 16 18 88.8


line stmt bran cond sub pod time code
1             package Sys::ForkQueue;
2             {
3             $Sys::ForkQueue::VERSION = '0.14';
4             }
5             BEGIN {
6 1     1   29526 $Sys::ForkQueue::AUTHORITY = 'cpan:TEX';
7             }
8             # ABSTRACT: Run any number of jobs in a controlled manner in parallel.
9              
10 1     1   29 use 5.010_000;
  1         3  
  1         38  
11 1     1   1129 use mro 'c3';
  1         796  
  1         7  
12 1     1   45 use feature ':5.10';
  1         2  
  1         114  
13              
14 1     1   509 use Moose;
  0            
  0            
15             use namespace::autoclean;
16              
17             # use IO::Handle;
18             # use autodie;
19             # use MooseX::Params::Validate;
20              
21             # for fork()
22             use Errno qw(EAGAIN);
23             use POSIX qw(WNOHANG SIGTERM);
24             use Sys::CPU;
25              
26             # DGR: we'll it's ugly but that's the way fork() works in perl ...
27             ## no critic (ProhibitPackageVars)
28             # for fork control
29             our $zombies = 0;
30             our %Kid_Status;
31             our %childs_running = ();
32             ## use critic
33             ## no critic (RequireLocalizedPunctuationVars)
34             $SIG{CHLD} = sub { $zombies++ };
35             $SIG{INT} = \&_sigterm;
36             $SIG{TERM} = \&_sigterm;
37             ## use critic
38              
39             has 'chdir' => (
40             'is' => 'rw',
41             'isa' => 'Str',
42             'default' => 0,
43             );
44              
45             has 'umask' => (
46             'is' => 'rw',
47             'isa' => 'Str',
48             'default' => 0,
49             );
50              
51             has 'jobs' => (
52             'is' => 'ro',
53             'isa' => 'ArrayRef[Str]',
54             'required' => 1,
55             );
56              
57             has '_job_status' => (
58             'is' => 'ro',
59             'isa' => 'HashRef[Int]',
60             'default' => sub { {} },
61             );
62              
63             has 'code' => (
64             'is' => 'ro',
65             'isa' => 'CodeRef',
66             'required' => 1,
67             );
68              
69             has 'args' => (
70             'is' => 'rw',
71             'isa' => 'HashRef',
72             'default' => sub { {} },
73             );
74              
75             has 'concurrency' => (
76             'is' => 'rw',
77             'isa' => 'Int',
78             'lazy' => 1,
79             'builder' => '_num_cores',
80             );
81              
82             has 'redirect_output' => (
83             'is' => 'rw',
84             'isa' => 'Str',
85             );
86              
87             has 'chdir' => (
88             'is' => 'rw',
89             'isa' => 'Str',
90             'default' => 0,
91             );
92              
93             has 'setsid' => (
94             'is' => 'rw',
95             'isa' => 'Bool',
96             'default' => 0,
97             );
98              
99             has 'delayedfork' => (
100             'is' => 'rw',
101             'isa' => 'Bool',
102             'default' => 1,
103             );
104              
105             with qw(Log::Tree::RequiredLogger);
106              
107             sub _num_cores {
108             my $self = shift;
109              
110             return Sys::CPU::cpu_count() || 1;
111             }
112              
113             sub run {
114             my $self = shift;
115              
116             # Loop control
117             my $concurrency = $self->concurrency(); # 0 means inifite num. of forks
118             my $forks_running = 0;
119             my $childs_returned = 0;
120             my $ok = 1;
121              
122             JOB: foreach my $job ( @{ $self->jobs() } ) {
123              
124             while ( $concurrency && $forks_running >= $concurrency ) {
125              
126             # wait until there is a free slot to run
127             ## no critic (ProhibitSleepViaSelect)
128             select undef, undef, undef, 0.2;
129             ## use critic
130             if ($zombies) {
131             my $reaped = $self->_reaper();
132             $childs_returned += $reaped if $reaped;
133             $forks_running = $forks_running - $reaped if $reaped;
134             }
135             }
136             if ( !$concurrency || $forks_running < $concurrency ) {
137             $self->logger->log( message => "Creating fork for Job: $job", level => 'debug', );
138              
139             # fork() - see Programming Perl p. 737
140             FORK:
141             {
142             if ( my $pid = fork ) {
143              
144             # This is the parent process, child pid is in $pid
145             $forks_running++;
146             $childs_running{$pid} = 1;
147             ## no critic (ProhibitSleepViaSelect)
148             select undef, undef, undef, 0.1 if $self->delayedfork();
149             ## use critic
150             }
151             elsif ( defined $pid ) {
152              
153             # prevent the possibility to acquire a controlling terminal
154             $SIG{'HUP'} = 'IGNORE';
155              
156             # bring Logger in a suitable state
157             # this will at least clear the internal logging buffer, other tasks may be performed as well depending on
158             # the implementation of the Logger
159             $self->logger()->forked();
160              
161             if ( $self->setsid() ) {
162             $self->logger()->log( message => 'Calling setsid', level => 'debug', );
163             POSIX::setsid() # create own process group
164             }
165             if ( $self->chdir() && -d $self->chdir() ) {
166             $self->logger()->log( message => 'Changing work dir to ' . $self->chdir(), level => 'debug', );
167             chdir( $self->chdir() );
168             }
169             elsif ( $self->chdir() ) {
170             $self->logger()->log( message => 'Changing work dir to /.', level => 'debug', );
171             chdir(q{/});
172             }
173              
174             # clear the file creation mask
175             umask $self->umask();
176             ## no critic (RequireCheckedClose)
177             close(STDIN);
178             if ( $self->redirect_output() ) {
179             $self->logger()->log( message => 'Redirecting output to ' . $self->redirect_output(), level => 'debug', );
180             close(STDOUT);
181             close(STDERR);
182             }
183             ## use critic
184             ## no critic (RequireCheckedOpen)
185             open( STDIN, '<', '/dev/null' );
186             if ( $self->redirect_output() ) {
187             open( STDOUT, '>>', $self->redirect_output() . q{.} . $job );
188             open( STDERR, '>>', $self->redirect_output() . q{.} . $job );
189             }
190             ## use critic
191              
192             # $pid is null, if defined
193             # This is the child process
194             # get the pid of the parent via getppid
195             my $pid = $$;
196             my $ppid = getppid();
197             $self->logger()->prefix('[CHILD '.$job.q{ }.$pid.q{/}.$ppid.']');
198              
199             $self->logger->log( message => 'Fork for Job '.$job.' running ...', level => 'debug', );
200              
201             my $t0 = time(); # starttime
202             my $status = &{ $self->code() }( $job, $self->args() );
203             my $d0 = time() - $t0; # duration
204             if ($status) {
205             $self->logger->log( message => 'Fork finished with SUCCESS after running for ' . $d0 . 's.', level => 'debug', );
206             exit 0;
207             }
208             else {
209             $self->logger->log( message => 'Fork finished with FAILURE after running ' . $d0 . 's.', level => 'warning', );
210             exit 1;
211             }
212              
213             # end of fork(). The child _must_ exit here!
214             }
215             elsif ( $! == EAGAIN ) {
216              
217             # EAGAIN, probably temporary fork error
218             sleep 5;
219             redo FORK;
220             }
221             else {
222              
223             # Strange fork error
224             warn 'Can not exec fork: '.$!."\n";
225             }
226             } # FORK
227             } # if-forks-running-lt-concurrency
228             else {
229             $self->logger->log( message => 'Too many childs to spawn a new one (Running: '.$forks_running.' / Max: '.$concurrency.')', level => 'debug', );
230             sleep 1;
231             redo JOB;
232             }
233             } # end of foreach jobs
234             $self->logger()->log( message => 'Dispatched all childs. Waiting for them to finish ...', level => 'debug', );
235             my $child;
236             while ( ( $child = waitpid( -1, 0 ) ) > 0 ) {
237             $self->_job_status()->{$child} = $? >> 8;
238             delete( $childs_running{$child} );
239             $childs_returned++;
240             if ( $self->_job_status()->{$child} != 0 ) {
241             $ok = 0;
242             }
243             }
244             $self->logger()->log( message => '[PARENT] Collected all child stati.', level => 'debug', );
245             $self->logger()->prefix(q{});
246             if ($ok) {
247             $self->logger()->log( message => 'All childs returned w/o error', level => 'debug', );
248             return 1;
249             }
250             else {
251             $self->logger()->log( message => 'Some childs returned an error', level => 'error', );
252             return;
253             }
254             }
255              
256             ############################################
257             # Usage : none, called by $SIG{CHLD}
258             # Purpose : Collect zombies
259             # Returns : Number of zombies collected
260             # Parameters : none
261             # Throws : no exceptions
262             # Comments : none
263             # See Also : Programming Perl, p. 432
264             sub _reaper {
265             my $self = shift;
266              
267             $zombies = 0;
268             my $childs_finished = 0;
269             my $child;
270             while ( ( $child = waitpid( -1, WNOHANG ) ) > 0 ) {
271             $self->_job_status()->{$child} = $? >> 8;
272             delete( $childs_running{$child} );
273             $childs_finished++;
274             }
275             return $childs_finished;
276             }
277              
278             sub _sigterm {
279             #print "Received SIGTERM. Aborting running forks ...\n";
280              
281             # kill childs - kill(TERM, -$$):
282             my $cnt = kill( SIGTERM, q{-} . $$ );
283             say 'Signaled '.$cnt.' processes in current processgroup';
284             foreach my $child_pid ( keys %childs_running ) {
285             next unless $child_pid;
286             kill( SIGTERM, $child_pid );
287             say 'Signaled '.$child_pid;
288             }
289              
290             # die
291             exit;
292             }
293              
294             no Moose;
295             __PACKAGE__->meta->make_immutable;
296              
297             1;
298              
299             __END__
300              
301             =pod
302              
303             =encoding utf-8
304              
305             =head1 NAME
306              
307             Sys::ForkQueue - Run any number of jobs in a controlled manner in parallel.
308              
309             =head1 SYNOPSIS
310              
311             use Sys::ForkQueue;
312             my @jobs = qw(1 2 3 4 5 6 7 8 9 10);
313             my $Queue = Sys::ForkQueue::->new({
314             'jobs' => \@jobs,
315             'code' => \&worker,
316             'logger' => Log::Tree::->new(),
317             });
318             $Queue->run();
319              
320             sub worker { ... }
321              
322             =head1 DESCRIPTION
323              
324             This class implements a job controller that can run any number of
325             jobs with configurable parllelism.
326              
327             =head1 ATTRIBUTES
328              
329             =head2 chdir
330              
331             Change to this directory after fork.
332              
333             If the given directory does not exist, change to /.
334              
335             =head2 umask
336              
337             Set this umask after fork.
338              
339             =head2 jobs
340              
341             Must contain a list of job names. Each will be passed
342             to the CODEREF in $self->code() when it's runnable.
343              
344             =head2 code
345              
346             The CODEREF. This will called for every job in the list.
347             Ths first argument will be the job name. The second one
348             will be $self->args() which is an hashref.
349              
350             =head2 args
351              
352             This will be passed to every invocation of $self->code().
353              
354             =head2 concurrency
355              
356             Run this many jobs in parallel.
357              
358             =head2 redirect_output
359              
360             Redirect all output to this file.
361              
362             =head2 chdir
363              
364             Change to this directory after fork()ing.
365              
366             =head2 setsid
367              
368             Call setsid after fork().
369              
370             =head2 delayedfork
371              
372             Sleep for a brief time after fork. Set this to false
373             if you plan to run many short lived jobs.
374              
375             =head1 NAME
376              
377             Sys::ForkQueue - Run any number of jobs in a controlled manner in parallel.
378              
379             =head1 SUBROUTINES/METHODS
380              
381             =head2 run
382              
383             Run all enqueud jobs.
384              
385             =head2 EAGAIN
386              
387             Imported from Errno.
388              
389             =head2 SIGTERM
390              
391             Imported from POSIX.
392              
393             =head2 WNOHANG
394              
395             Imported from POSIX.
396              
397             1; # End of Sys::ForkQueue
398              
399             =head1 AUTHOR
400              
401             Dominik Schulz <tex@cpan.org>
402              
403             =head1 COPYRIGHT AND LICENSE
404              
405             This software is copyright (c) 2012 by Dominik Schulz.
406              
407             This is free software; you can redistribute it and/or modify it under
408             the same terms as the Perl 5 programming language system itself.
409              
410             =cut