File Coverage

blib/lib/Parallel/Tiny.pm
Criterion Covered Total %
statement 73 74 98.6
branch 12 18 66.6
condition 7 14 50.0
subroutine 15 16 93.7
pod 2 2 100.0
total 109 124 87.9


line stmt bran cond sub pod time code
1             package Parallel::Tiny;
2 11     11   869473 use strict;
  11         11  
  11         473  
3 11     11   33 use warnings;
  11         22  
  11         275  
4 11     11   5456 use POSIX qw(WNOHANG);
  11         40106  
  11         55  
5 11     11   13827 use Sys::Prctl qw(prctl);
  11         8668  
  11         506  
6              
7             ## prctl() defaults
8 11     11   55 use constant PR_SET_PDEATHSIG => 1;
  11         11  
  11         627  
9 11     11   33 use constant SIGHUP => 1;
  11         11  
  11         352  
10              
11             ## new() defaults
12 11     11   22 use constant DEFAULT_ERROR_TIMEOUT => 10;
  11         11  
  11         352  
13 11     11   33 use constant DEFAULT_REAP_TIMEOUT => .1;
  11         11  
  11         341  
14 11     11   33 use constant DEFAULT_SUBNAME => 'run';
  11         0  
  11         341  
15 11     11   22 use constant DEFAULT_WORKERS => 1;
  11         11  
  11         352  
16 11     11   33 use constant DEFAULT_WORKER_TOTAL => 1;
  11         0  
  11         5049  
17              
18             our $VERSION = 0.10;
19              
20             =head1 NAME
21              
22             Parallel::Tiny
23              
24             =head1 DESCRIPTION
25              
26             Provides a very simple, no frills fork manager.
27              
28             =head1 SYNOPSIS
29              
30             my $obj = My::Handler->new();
31              
32             my $forker = Parallel::Tiny->new(
33             handler => $obj,
34             subname => 'start', # My::Handler must implement start()
35             workers => 4,
36             worker_total => 'infinite',
37             );
38              
39             $forker->run();
40              
41             =head1 METHODS
42              
43             =over
44              
45             =item new()
46              
47             Returns a new Parallel::Tiny fork manager.
48              
49             Takes arguments as a hash or hashref with the following keys:
50              
51             handler - an object you provide which has a run() method (unless you define "subname")
52             (required)
53              
54             reap_timeout - the number of seconds to wait between runs of
55             waitpid() to reap children
56             (default .1)
57              
58             subname - the name of the sub you want to invoke on child spawn
59             (default 'run')
60              
61             workers - the number of simultaneous forked processes you
62             want to allow at one time
63             (default 1)
64              
65             worker_total - the total number of processes that you want to run
66             (default 1)
67              
68             You can, for instance, say that you want to run 100 processes,
69             but only 4 at a time like this:
70              
71             my $forker = Parallel::Tiny->new(
72             handler => $obj,
73             workers => 4,
74             worker_total => 100,
75             );
76              
77             If you want you can provide 'infinite' for worker_total.
78             If you do this, you're responsible for stopping the fork manager!
79              
80             Signals:
81             ---
82              
83             If the parent is sent SIGTERM, the parent will wait to reap all children.
84              
85             If the parent is killed before its children finish, children are configured to receive HUP.
86              
87             =cut
88              
89             sub new {
90 24     24 1 757 my $class = shift;
91 24 50       134 my $args = ref($_[0]) ? $_[0] : {@_};
92              
93             # set some defaults
94 24   50     155 $args->{reap_timeout} ||= DEFAULT_REAP_TIMEOUT;
95 24   50     133 $args->{subname} ||= DEFAULT_SUBNAME;
96 24   50     117 $args->{workers} ||= DEFAULT_WORKERS;
97 24   50     67 $args->{worker_total} ||= DEFAULT_WORKER_TOTAL;
98              
99             # special configuration
100 24 50       112 undef $args->{worker_total} if $args->{worker_total} eq 'infinite';
101              
102             # check args
103 24 50       116 die 'no handler provided' unless $args->{handler};
104 24 50       156 die "handler doesn't implement $args->{subname}()" unless $args->{handler}->can($args->{subname});
105              
106             return bless({
107             _continue => 1,
108             _handler => $args->{handler},
109             _jobs => {},
110             _reap_timeout => $args->{reap_timeout},
111             _subname => $args->{subname},
112             _workers => $args->{workers},
113             _worker_total => $args->{worker_total},
114 24         504 }, $class);
115             }
116              
117             =item run()
118              
119             Start spooling jobs according to the configuration.
120              
121             =cut
122              
123             sub run {
124 21     21 1 2402 my $self = shift;
125              
126 21     0   393 local $SIG{TERM} = sub { $self->{_continue} = 0 };
  0         0  
127              
128             # setup the fork manager
129 21         79 my $handler = $self->{_handler};
130 21         32 my $subname = $self->{_subname};
131              
132 21         99 while ($self->_waitqueue()) {
133             # parent work
134 65         44994 my $pid = fork();
135 65 100       1242 if ($pid) {
136 55 50 33     1258 $self->{_worker_total}-- if defined $self->{_worker_total} and $self->{_worker_total} > 0;
137 55         1825 $self->{_jobs}{$pid} = 1;
138 55         939 next;
139             }
140              
141             # child work
142 10         680 prctl(PR_SET_PDEATHSIG, SIGHUP);
143 10         2846 $SIG{$_} = 'DEFAULT' for keys(%SIG);
144 10         270 $0 = $0 . ' - worker';
145 10         266 $handler->$subname();
146              
147             # child cleanup
148 10         442 exit 0;
149             }
150              
151             # wait for children
152 11         1637981 while ( wait() != -1 ) {}
153              
154 11         539 return 1;
155             }
156              
157             ## Private
158             ############
159              
160             ## waits for another job slot to become available
161             ## short circuits if we've received SIGTERM or reached worker total threshold
162             sub _waitqueue {
163 76     76   244 my $self = shift;
164              
165             # check for any stopping conditions
166 76 100 66     748 return 0 if defined $self->{_worker_total} and $self->{_worker_total} <= 0;
167              
168             # wait to reap at least one child
169 65         86 while (keys(%{ $self->{_jobs} }) >= $self->{_workers}) {
  196896         262407  
170 196831 50       222266 return 0 unless $self->{_continue};
171 196831         181497 $self->_reapchildren();
172 196831         174861 sleep $self->{_reap_timeout};
173             }
174              
175 65         165 return 1;
176             }
177              
178             ## checks all currently running jobs and reaps any that have finished, opening
179             ## their slot.
180             sub _reapchildren {
181 196831     196831   117814 my $self = shift;
182 196831         107793 foreach my $pid (keys(%{ $self->{_jobs} })) {
  196831         220495  
183 393662         386327 my $waitpid = waitpid($pid, WNOHANG);
184 393662 100       490600 delete $self->{_jobs}{$pid} if $waitpid > 0;
185             }
186             }
187              
188             =back
189              
190             =cut
191              
192             1;
193              
194             # ABSTRACT: Provides a very simple, no frills fork manager.
195