File Coverage

blib/lib/WorkerManager.pm
Criterion Covered Total %
statement 25 137 18.2
branch 0 26 0.0
condition 0 5 0.0
subroutine 9 28 32.1
pod 0 9 0.0
total 34 205 16.5


line stmt bran cond sub pod time code
1             package WorkerManager;
2 1     1   696 use strict;
  1         2  
  1         29  
3 1     1   5 use warnings;
  1         1  
  1         24  
4              
5 1     1   4 use Carp;
  1         2  
  1         100  
6 1     1   616 use Parallel::ForkManager;
  1         105945  
  1         49  
7 1     1   540 use Module::Load ();
  1         1314  
  1         23  
8 1     1   1261 use Time::Piece;
  1         11570  
  1         5  
9 1     1   77 use IO::Handle;
  1         3  
  1         45  
10 1     1   472 use IO::File;
  1         998  
  1         311  
11             our $LOGGER;
12              
13             our $VERSION = '0.1002';
14              
15             sub new {
16 0     0 0   my $class = shift;
17 0           my %args = @_;
18 0 0         if (@_ % 2) {
19 0           Carp::croak("Odd number of elements: " . __PACKAGE__ . "::new");
20             }
21              
22 0           my $self = bless {
23             max_processes => 4,
24             works_per_child => 100,
25             @_,
26             pids => {},
27             };
28              
29 0           $self->init;
30 0           $self;
31             }
32              
33             BEGIN {
34             $LOGGER = sub {
35 0           my ($class, $msg) = @_;
36 0           print localtime->datetime, " $class $msg\n";
37 1     1   1372 };
38             }
39              
40             my $fh;
41             sub open_logs {
42 0     0 0   my $self = shift;
43              
44 0 0         if ($self->{log_file}) {
45 0           $fh = IO::File->new(">>" . $self->{log_file});
46 0           $fh->autoflush(1);
47             $LOGGER = sub {
48 0     0     my ($class, $msg) = @_;
49 0           $msg =~ s/\s+$//;
50 0   0       $fh ||= IO::File->new(">>" . $self->{log_file});
51 0           $fh->print(localtime->datetime. " $class $msg\n");
52 0           };
53             }
54            
55 0 0         if ($self->{error_log_file}) {
56             # close STDOUT;
57             # close STDERR;
58            
59             open(STDOUT, ">>" . $self->{error_log_file})
60 0 0         or die "Failed to re-open STDOUT to ". $self->{error_log_file};
61 0 0         open STDERR, ">>&STDOUT" or die "Can't dup STDOUT: $!";
62             }
63              
64 0           STDOUT->autoflush(1);
65 0           STDERR->autoflush(1);
66             }
67              
68             sub init {
69 0     0 0   my $self = shift;
70              
71 0           for my $key (keys %{$self->{env}}) {
  0            
72 0 0         $ENV{$key} = $self->{env}{$key} if !defined $ENV{$key};
73             }
74              
75 0           my $worker_client_class = "WorkerManager::" . $self->{type};
76 0           Module::Load::load($worker_client_class);
77 0 0         $self->{client} = $worker_client_class->new($self->{worker}, $self->{worker_options}) or die;
78              
79             $self->{pm} = Parallel::ForkManager->new($self->{max_processes})
80 0 0         or die("Unable to create ForkManager object: $!\n");
81              
82             $self->{pm}->run_on_finish(
83 0     0     sub { my ($pid, $exit_code, $ident) = @_;
84 0           $LOGGER->('WorkerManager', "$ident exited with PID $pid and exit code: $exit_code");
85 0           delete $self->{pids}->{$pid};
86             }
87 0           );
88              
89             $self->{pm}->run_on_start(
90 0     0     sub { my ($pid,$ident)=@_;
91 0           $LOGGER->('WorkerManager', "$ident started with PID $pid");
92 0           $self->{pids}->{$pid} = $ident;
93             #print join(',', map {"$_($self->{pids}->{$_})"} keys %{$self->{pids}});
94             #print "\n";
95             }
96 0           );
97              
98 0           $self->{count} = 0;
99 0           $self->{terminating} = undef;
100              
101 0           $self->open_logs;
102 0           $self->set_signal_handlers;
103             }
104              
105             sub set_signal_handlers {
106 0     0 0   my $self = shift;
107              
108 0           setpgrp;
109             my $terminate_handle = sub {
110 0     0     my $sig = shift;
111 0           warn "=== killed by $sig. ($$)";
112              
113 0           $self->{terminating} = 1;
114 0 0         $self->{client}->terminate if $self->{client};
115 0 0         if ($self->{pm}->is_parent) {
116 0           $self->terminate_all_children;
117             }
118 0           };
119              
120              
121 0           $SIG{QUIT} = $terminate_handle;
122 0           $SIG{TERM} = $terminate_handle;
123              
124             my $interrupt_handle = sub {
125 0     0     my $sig = shift;
126             # warn "=== killed by $sig. ($$)";
127              
128 0           $self->{terminating} = 1;
129 0 0         $self->{client}->terminate if $self->{client};
130 0 0         if ($self->{pm}->is_parent) {
131 0           $self->killall_children;
132             }
133 0           exit(1);
134 0           };
135              
136 0           $SIG{INT} = $interrupt_handle;
137              
138             my $reopen_log_handle = sub {
139 0     0     my $sig = shift;
140 0           $self->open_logs;
141 0           };
142 0           $SIG{HUP} = $reopen_log_handle;
143             }
144              
145             sub set_signal_handlers_for_child {
146 0     0 0   my $self = shift;
147              
148 0           setpgrp;
149             my $terminate_handle = sub {
150 0     0     my $sig = shift;
151 0           $self->{terminating} = 1;
152 0           $self->{client}->terminate;
153 0           };
154              
155 0           $SIG{QUIT} = $terminate_handle;
156 0           $SIG{TERM} = $terminate_handle;
157              
158             my $interrupt_handle = sub {
159 0     0     my $sig = shift;
160 0           warn "killed by $sig. ($$)";
161 0           $self->{terminating} = 1;
162 0           $self->{client}->terminate;
163 0           exit 0;
164 0           };
165 0           $SIG{INT} = $interrupt_handle;
166              
167             my $reopen_log_handle = sub {
168 0     0     my $sig = shift;
169 0           $self->open_logs;
170 0           };
171 0           $SIG{HUP} = $reopen_log_handle;
172             }
173              
174             sub terminate_all_children {
175 0     0 0   my $self = shift;
176 0           warn "terminating. children: " . join(",", keys %{$self->{pids}});
  0            
177 0           kill "TERM", $_ for keys %{$self->{pids}};
  0            
178             }
179              
180             sub killall_children {
181 0     0 0   my $self = shift;
182 0           warn "killing. children: " . join(",", keys %{$self->{pids}});
  0            
183 0           kill "INT", $_ for keys %{$self->{pids}};
  0            
184             }
185              
186             sub reopen_children {
187 0     0 0   my $self = shift;
188 0           $self->terminate_all_children;
189             }
190              
191             sub main {
192 0     0 0   my $self = shift;
193 0           while (!$self->{terminating}) {
194 0 0         my $pid = $self->{pm}->start(++$self->{count}) and next;
195 0           $self->set_signal_handlers_for_child;
196 0           $0 .= " [child process $self->{count}]";
197 0           $self->{client}->work($self->{works_per_child});
198 0           $self->{pm}->finish;
199             }
200 0           $self->terminate_all_children;
201             local $SIG{ALRM} = sub {
202 0     0     $LOGGER->('WorkerManager', "Timeout to terminate children");
203 0           $self->killall_children;
204 0           exit 1;
205 0           };
206 0   0       alarm($self->{wait_terminating} || 10);
207 0           $self->{pm}->wait_all_children;
208 0           alarm 0;
209              
210             }
211              
212             1;
213              
214             __END__