File Coverage

blib/lib/Parallel/Tiny.pm
Criterion Covered Total %
statement 73 74 98.6
branch 12 18 66.6
condition 7 14 50.0
subroutine 15 16 93.7
pod 2 2 100.0
total 109 124 87.9


line stmt bran cond sub pod time code
1             package Parallel::Tiny;
2 11     11   823999 use strict;
  11         22  
  11         451  
3 11     11   44 use warnings;
  11         11  
  11         374  
4 11     11   4510 use POSIX qw(WNOHANG);
  11         38544  
  11         44  
5 11     11   14498 use Sys::Prctl qw(prctl);
  11         9845  
  11         649  
6              
7             our $VERSION = 1.00;
8              
9             # defaults for prctl()
10 11     11   55 use constant PR_SET_PDEATHSIG => 1;
  11         0  
  11         638  
11 11     11   33 use constant SIGHUP => 1;
  11         11  
  11         363  
12              
13             # defaults for the new() method
14 11     11   44 use constant DEFAULT_ERROR_TIMEOUT => 10;
  11         11  
  11         352  
15 11     11   33 use constant DEFAULT_REAP_TIMEOUT => .1;
  11         11  
  11         352  
16 11     11   33 use constant DEFAULT_SUBNAME => 'run';
  11         11  
  11         363  
17 11     11   33 use constant DEFAULT_WORKERS => 1;
  11         11  
  11         341  
18 11     11   33 use constant DEFAULT_WORKER_TOTAL => 1;
  11         11  
  11         4565  
19              
20             =head1 NAME
21              
22             Parallel::Tiny
23              
24             =head1 DESCRIPTION
25              
26             Provides a simple, no frills fork manager.
27              
28             =head1 SYNOPSIS
29              
30             Given an object that provides a C<run()> method, you can create a C<Parallel::Tiny> fork manager object that will execute that method several times.
31              
32             my $obj = My::Handler->new();
33             my $forker = Parallel::Tiny->new(
34             handler => $obj,
35             workers => 4,
36             worker_total => 32,
37             );
38             $forker->run();
39              
40             In the above example we will execute the C<run()> method for a C<My::Handler> object 4 workers at a time, until 32 total workers have completed/died.
41              
42             =head1 METHODS
43              
44             =over
45              
46             =item new()
47              
48             Returns a new C<Parallel::Tiny> fork manager.
49              
50             Takes the following arguments as a hash or hashref:
51              
52             {
53             handler => $handler, # provide an object with a run() method, this will be your worker (required)
54             reap_timeout => $reap_timeout, # how long to wait in between reaping children (default ".1")
55             subname => $subname, # a method name to execute for the $handler (default "run")
56             workers => $workers, # the number of workers that can run simultaneously (default 1)
57             worker_total => $worker_total, # the total number of times to run before stopping (default 1)
58             }
59              
60             For instance, you could run 100 workers, 4 workers at a time:
61              
62             my $forker = Parallel::Tiny->new(
63             handler => $obj,
64             workers => 4,
65             worker_total => 100,
66             );
67              
68             C<infinite> can be provided for the C<$worker_total> but you will need to manage stopping the fork manager elsewhere.
69              
70             If the parent is sent C<SIGTERM> it will wait to reap all currently executing children before finishing.
71              
72             If the parent is killed, children will receive C<SIGHUP>, which you will need to deal with in your C<$handler>.
73              
74             =cut
75              
76             sub new {
77 24     24 1 822 my $class = shift;
78 24 50       189 my $args = ref( $_[0] ) ? $_[0] : {@_};
79              
80             # set some defaults
81 24   50     228 $args->{reap_timeout} ||= DEFAULT_REAP_TIMEOUT;
82 24   50     132 $args->{subname} ||= DEFAULT_SUBNAME;
83 24   50     56 $args->{workers} ||= DEFAULT_WORKERS;
84 24   50     67 $args->{worker_total} ||= DEFAULT_WORKER_TOTAL;
85              
86             # special configuration
87 24 50       109 undef $args->{worker_total} if $args->{worker_total} eq 'infinite';
88              
89             # check args
90 24 50       147 die 'no handler provided' unless $args->{handler};
91             die "handler doesn't implement $args->{subname}()"
92 24 50       220 unless $args->{handler}->can( $args->{subname} );
93              
94             return bless(
95             {
96             _continue => 1,
97             _handler => $args->{handler},
98             _jobs => {},
99             _reap_timeout => $args->{reap_timeout},
100             _subname => $args->{subname},
101             _workers => $args->{workers},
102             _worker_total => $args->{worker_total},
103             },
104 24         563 $class
105             );
106             }
107              
108             =item run()
109              
110             Start running a number of parallel workers equal to C<$workers>, until a number of workers equal to C<$worker_total> have been completed.
111              
112             =cut
113              
114             sub run {
115 21     21 1 4443 my $self = shift;
116              
117 21     0   580 local $SIG{TERM} = sub { $self->{_continue} = 0 };
  0         0  
118              
119             # setup the fork manager
120 21         65 my $handler = $self->{_handler};
121 21         42 my $subname = $self->{_subname};
122              
123 21         67 while ( $self->_waitqueue() ) {
124              
125             # parent work
126 65         40903 my $pid = fork();
127 65 100       1403 if ($pid) {
128             $self->{_worker_total}--
129 55 50 33     966 if defined $self->{_worker_total} and $self->{_worker_total} > 0;
130 55         1850 $self->{_jobs}{$pid} = 1;
131 55         711 next;
132             }
133              
134             # child work
135 10         761 prctl( PR_SET_PDEATHSIG, SIGHUP );
136 10         3170 $SIG{$_} = 'DEFAULT' for keys %SIG;
137 10         396 $0 = $0 . ' - worker';
138 10         228 $handler->$subname();
139              
140             # child cleanup
141 10         406 exit 0;
142             }
143              
144             # wait for children
145 11         1627486 while ( wait() != -1 ) { }
146              
147 11         728 return 1;
148             }
149              
150             ##-----------------------------------------------------------------------------
151             ## Private Methods
152             ##-----------------------------------------------------------------------------
153              
154             # waits for another job slot to become available; short circuits if we've received SIGTERM or reached the worker total threshold
155             sub _waitqueue {
156 76     76   155 my $self = shift;
157              
158             # short circuit if we've already completed all the workers we've been configured to run
159 76 100 66     577 return 0 if defined $self->{_worker_total} and $self->{_worker_total} <= 0;
160              
161             # wait to reap at least one child
162 65         72 while ( keys %{ $self->{_jobs} } >= $self->{_workers} ) {
  26608         58821  
163 26543 50       36582 return 0 unless $self->{_continue};
164 26543         31942 $self->_reapchildren();
165 26543         1796235 sleep $self->{_reap_timeout};
166             }
167              
168 65         168 return 1;
169             }
170              
171             # cleans up children that are no longer running
172             sub _reapchildren {
173 26543     26543   18774 my $self = shift;
174 26543         14995 foreach my $pid ( keys %{ $self->{_jobs} } ) {
  26543         42399  
175 53086         82334 my $waitpid = waitpid( $pid, WNOHANG );
176 53086 100       79005 delete $self->{_jobs}{$pid} if $waitpid > 0;
177             }
178             }
179              
180             =back
181              
182             =cut
183              
184             1;
185              
186             # ABSTRACT: Provides a simple, no frills fork manager.
187