File Coverage

blib/lib/Parallel/Boss.pm
Criterion Covered Total %
statement 9 50 18.0
branch 0 16 0.0
condition 0 6 0.0
subroutine 3 10 30.0
pod 2 2 100.0
total 14 84 16.6


line stmt bran cond sub pod time code
1             package Parallel::Boss;
2 1     1   19736 use strict;
  1         2  
  1         46  
3 1     1   8 use warnings;
  1         2  
  1         83  
4             our $VERSION = "0.01";
5             $VERSION = eval $VERSION;
6              
7 1     1   548 use IO::Select;
  1         1593  
  1         588  
8              
9             =head1 NAME
10              
11             Parallel::Boss - manage worker processes
12              
13             =head1 VERSION
14              
15             This document describes Parallel::Boss version 0.01
16              
17             =head1 SYNOPSIS
18              
19             use Parallel::Boss;
20              
21             my $worker = sub {
22             my ( $boss, @args ) = @_;
23             while ( $boss->is_watching ) {
24              
25             # pretend to be working
26             ...;
27             }
28             };
29              
30             Parallel::Boss->run(
31             num_workers => 4,
32             args => \@args,
33             worker => $worker,
34             );
35              
36             =head1 DESCRIPTION
37              
38             Module running specified number of worker processes.
39              
40             =head1 METHODS
41              
42             =cut
43              
44             =head2 run
45              
46             $class->run(%params)
47              
48             start specified number of workers and supervise them. If any of the workers
49             exits, a new one will be started as a replacement. If parent process receives
50             HUP signal, then it sends HUP signal to every worker process and restarts
51             workers if they exit. If parent process receives INT, QUIT, or TERM, it sends
52             TERM to all workers, waits for up to 15 seconds till they all exit, and sends
53             KILL to those workers that are still running, after all workers exited the run
54             method returns.
55              
56             The following parameters are accepted:
57              
58             =over 4
59              
60             =item B
61              
62             number of workers to start
63              
64             =item B
65              
66             reference to array of arguments that should be passed to worker subroutine
67              
68             =item B
69              
70             subroutine that will be executed by every worker. If it returns, the worker
71             process exits. The subroutine passed the Parallel::Boss object as the first
72             argument, and array specified by I as the following arguments.
73              
74             =back
75              
76             =cut
77              
78             sub run {
79 0     0 1   my ( $class, %args ) = @_;
80              
81 0           my $self = bless \%args, $class;
82              
83 0 0         pipe( my $rd, my $wr ) or die "Couldn't create a pipe";
84 0           $self->{_rd} = $rd;
85 0           $self->{_wr} = $wr;
86              
87             local $SIG{QUIT} = local $SIG{INT} = local $SIG{TERM} = sub {
88 0     0     $self->_kill_children("TERM");
89 0           $self->{_finish} = 1;
90 0           $self->{_wr}->close;
91 0           alarm 15;
92 0           };
93 0     0     local $SIG{HUP} = sub { $self->_kill_children("HUP"); };
  0            
94             local $SIG{ALRM} = sub {
95 0 0   0     $self->_kill_children("KILL") if $self->{_finish};
96 0           };
97              
98 0           for ( 1 .. $self->{num_workers} ) {
99 0           $self->_spawn;
100             }
101              
102 0           while (1) {
103 0           my $pid = wait;
104 0 0         delete $self->{_workers}{$pid} or next;
105 0 0 0       last if $self->{_finish} and not keys %{ $self->{_workers} };
  0            
106 0 0         $self->_spawn unless $self->{_finish};
107             }
108             }
109              
110             sub _spawn {
111 0     0     my ($self) = @_;
112 0           my $pid = fork;
113 0 0         if ( not defined $pid ) {
114 0           $self->_kill_children("KILL");
115 0           die "Couldn't fork, exiting: $!";
116             }
117              
118 0 0         if ($pid) {
119 0           $self->{_workers}{$pid} = 1;
120             }
121             else {
122 0           $self->{_wr}->close;
123 0           $SIG{$_} = 'DEFAULT' for qw(QUIT HUP INT TERM ALRM);
124 0           $self->{worker}->( $self, @{ $self->{args} } );
  0            
125 0           exit 0;
126             }
127             }
128              
129             sub _kill_children {
130 0     0     my ($self, $sig) = @_;
131              
132 0           kill $sig => keys %{ $self->{_workers} };
  0            
133             }
134              
135             =head2 is_watching
136              
137             $boss->is_watching
138              
139             this method should be periodically invoked by the worker process. It checks if
140             the parent process is still running, if not the worker should exit.
141              
142             =cut
143              
144             sub is_watching {
145 0     0 1   my ($self) = @_;
146 0   0       $self->{_select} //= IO::Select->new( $self->{_rd} );
147 0 0         return if $self->{_select}->can_read;
148 0           return 1;
149             }
150              
151             1;
152              
153             __END__