File Coverage

blib/lib/Parallel/Boss.pm
Criterion Covered Total %
statement 8 48 16.6
branch 0 16 0.0
condition 0 2 0.0
subroutine 3 9 33.3
pod 1 1 100.0
total 12 76 15.7


line stmt bran cond sub pod time code
1             package Parallel::Boss;
2 2     2   157283 use 5.012;
  2         6  
3 2     2   7 use strict;
  2         3  
  2         34  
4 2     2   6 use warnings;
  2         12  
  2         971  
5             our $VERSION = "0.02";
6             my $XS_VERSION=$VERSION;
7             $VERSION = eval $VERSION;
8              
9             require XSLoader;
10             XSLoader::load("Parallel::Boss", $XS_VERSION);
11              
12             =head1 NAME
13              
14             Parallel::Boss - manage worker processes
15              
16             =head1 VERSION
17              
18             This document describes Parallel::Boss version 0.02
19              
20             =head1 SYNOPSIS
21              
22             use Parallel::Boss;
23              
24             my $worker = sub {
25             my ( @args ) = @_;
26             # pretend to be working
27             ...;
28             };
29              
30             Parallel::Boss->run(
31             num_workers => 4,
32             args => \@args,
33             exit_timeout => 15,
34             worker => $worker,
35             );
36              
37             =head1 DESCRIPTION
38              
39             Module running specified number of worker processes.
40              
41             =head1 METHODS
42              
43             =cut
44              
45             =head2 run
46              
47             $class->run(%params)
48              
49             start specified number of workers and supervise them. If any of the workers
50             exits, a new one will be started as a replacement. If parent process receives
51             HUP signal, then it sends HUP signal to every worker process and restarts
52             workers if they exit. If parent process receives INT, QUIT, or TERM, it sends
53             TERM to all workers, waits for up to I<exit_timeout> seconds till they all
54             exit, and sends KILL to those workers that are still running. After all workers
55             have exited, the run method returns. Each worker process runs a watchdog thread that
56             detects if the parent process has died and terminates the worker by sending it
57             first SIGTERM and then calling _exit(2) after I<exit_timeout> seconds if the
58             worker is still running.
59              
60             The following parameters are accepted:
61              
62             =over 4
63              
64             =item B<num_workers>
65              
66             number of workers to start
67              
68             =item B<args>
69              
70             reference to array of arguments that should be passed to worker subroutine
71              
72             =item B<exit_timeout>
73              
74             when the parent process is signalled to exit, it first sends SIGTERM to all workers. If
75             exit_timeout is set and greater than zero then after exit_timeout workers that
76             are still running are sent SIGKILL.
77              
78             =item B<worker>
79              
80             subroutine that will be executed by every worker. If it returns, the worker
81             process exits. The subroutine is passed the Parallel::Boss object as the first
82             argument, and the elements of the array specified by I<args> as the following arguments.
83              
84             =back
85              
86             =cut
87              
88             sub run {
89 0     0 1   my ( $class, %args ) = @_;
90              
91 0           my $self = bless \%args, $class;
92              
93 0 0         pipe( my $rd, my $wr ) or die "Couldn't create a pipe";
94 0           $self->{_rd} = $rd;
95 0           $self->{_wr} = $wr;
96              
97             local $SIG{QUIT} = local $SIG{INT} = local $SIG{TERM} = sub {
98 0     0     $self->{_finish} = 1;
99 0           $self->{_wr}->close;
100 0           $self->_kill_children("TERM");
101 0 0         alarm $self->{exit_timeout} if $self->{exit_timeout};
102 0           };
103 0     0     local $SIG{HUP} = sub { $self->_kill_children("HUP"); };
  0            
104             local $SIG{ALRM} = sub {
105 0 0   0     $self->_kill_children("KILL") if $self->{_finish};
106 0           };
107              
108 0           for ( 1 .. $self->{num_workers} ) {
109 0           $self->_spawn;
110             }
111              
112 0           while (1) {
113 0           my $pid = wait;
114 0 0         delete $self->{_workers}{$pid} or next;
115 0 0         if ($self->{_finish}) {
116 0 0         last unless keys %{ $self->{_workers} };
  0            
117             } else {
118 0           $self->_spawn;
119             }
120             }
121             }
122              
123             sub _spawn {
124 0     0     my ($self) = @_;
125 0           my $pid = fork;
126 0 0         if ( not defined $pid ) {
127 0           $self->_kill_children("KILL");
128 0           die "Couldn't fork, exiting: $!";
129             }
130              
131 0 0         if ($pid) {
132 0           $self->{_workers}{$pid} = 1;
133             }
134             else {
135 0           eval {
136 0           $self->{_wr}->close;
137 0           $SIG{$_} = 'DEFAULT' for qw(QUIT HUP INT TERM ALRM);
138 0   0       _start_watchdog( $self->{_rd}->fileno, ($self->{exit_timeout} // 0 ));
139 0           $self->{worker}->( $self, @{ $self->{args} } );
  0            
140             };
141 0           exit 0;
142             }
143             }
144              
145             sub _kill_children {
146 0     0     my ($self, $sig) = @_;
147              
148 0           kill $sig => keys %{ $self->{_workers} };
  0            
149             }
150              
151             1;
152              
153             __END__