File Coverage

blib/lib/MultiProcFactory.pm
Criterion Covered Total %
statement 200 221 90.5
branch 50 70 71.4
condition 9 18 50.0
subroutine 34 39 87.1
pod 22 23 95.6
total 315 371 84.9


line stmt bran cond sub pod time code
1             package MultiProcFactory;
2             # @(#) $Name: $ $Id: MultiProcFactory.pm,v 1.6 2004/09/21 23:06:49 aaron Exp $
3             ## Aaron Dancygier
4              
5             ## Base class forking object for distributed processing
6             ## among N children. Parent aggregates when children are done.
7              
8 52     52   15998 use 5.005;
  52         157  
  52         1893  
9 26     26   105 use strict;
  26         52  
  26         624  
10 26     26   130 use Carp;
  26         101  
  26         2338  
11 26     26   22684 use IO::File;
  26         589800  
  26         3603  
12 26     26   31261 use IPC::Shareable;
  26         632482  
  26         1501  
13              
14 26     26   261 use vars qw($VERSION);
  26         51  
  26         80609  
15             $VERSION = '0.04';
16              
17             sub catch_int {
18 0     0 0 0 IPC::Shareable->clean_up;
19             }
20              
21             sub factory {
22 52     52 1 364573 my $baseclass = shift;
23 52         1876 my (%params) = @_;
24            
25 52         172 my $obj;
26              
27 52 50       269 croak("must have work_by parameter defined cannot create class\n")
28             unless (defined($params{work_by}));
29              
30 52         193 my $class = $params{work_by};
31              
32 52 50       943 unless ($class =~ /^\w+(?:\:\:\w+)*$/) {
33 0         0 croak("must supply work_by parameter with class name\n");
34             }
35              
36 26     26   15949 eval "use $class";
  26         53  
  26         445  
  52         8068  
37              
38 52 50       263 if ($@) {
39 0         0 croak "Error in factory method\n@_, $@";
40             }
41            
42 52         875 return "$class"->new(%params);
43             }
44              
45             sub new {
46 52     52 1 393 my ($class, %args) = @_;
47              
48 52         403 my $tmp_log_name = $0;
49              
50 52         365 $tmp_log_name =~ s/\.(?:[^\.]+)$//;
51              
52 52   66     416 $args{log_file} ||= "$tmp_log_name.log";
53 52 100       320 $args{log_children} = 0 unless(defined($args{log_children}));
54 52 100       346 $args{log_parent} = 1 unless(defined($args{log_parent}));
55 52   50     612 $args{log_child_append} ||= 0;
56 52   50     450 $args{log_parent_append} ||= 0;
57              
58             ## set parent signal handlers
59 52         2720 $SIG{QUIT} = $SIG{ABRT} = $SIG{TERM} = $SIG{INT} = \&catch_int;
60              
61             ## set up default signal handlers
62 52         356 _set_parent_signals(\%args);
63              
64             ## set up default shared data structures
65 52 100       235 unless ($args{IPC_OFF}) {
66 51         819 my $scalar_handle = tie my $shm_scalar, 'IPC::Shareable', undef, {destroy => 1};
67 51         33438 my $hash_handle = tie my %shm_hash, 'IPC::Shareable', undef, {destroy => 1};
68              
69 51         27911 $args{share_scalar}{handle} = $scalar_handle;
70 51         145 $args{share_scalar}{var} = \$shm_scalar;
71 51         170 $args{share_hash}{handle} = $hash_handle;
72 51         195 $args{share_hash}{var} = \%shm_hash;
73             }
74              
75 52 50       760 croak("required parameter do_child code reference is missing or not a code reference\n")
76             unless (ref($args{do_child}) eq 'CODE');
77              
78 52 50       250 croak("required parameter do_parent_final code reference is missing or not a code reference\n")
79             unless (ref($args{do_parent_final}) eq 'CODE');
80              
81 52 50       192 if (ref($args{do_parent_init})) {
82 0 0       0 unless (ref($args{do_parent_init}) eq 'CODE') {
83 0         0 croak("optional parameter do_parent_init must be a code reference\n")
84             }
85             } else {
86 52     51   388 $args{do_parent_init} = sub { return 1; };
  51         102  
87             }
88              
89 52   50     508 $args{max_proc} ||= 20;
90              
91 52         148 my $self = bless \%args, $class;
92              
93 52         501 $self->_set_do_parent_init();
94 52         346 $self->_set_do_parent_final();
95 52         302 $self->_set_do_child();
96 52         294 $self->init();
97              
98 52         447 return $self;
99             }
100              
101             sub run {
102 51     51 1 35029 my $self = shift;
103              
104 51         119 my $pfh;
105              
106 51         391 $self->{log_file} = $self->set_parent_logname();
107              
108 51 50       195 my $mode = ($self->{log_parent_append})
109             ? '>>'
110             : '>'
111             ;
112              
113 51 100       263 if ($self->{log_parent}) {
114 34   33     466 $self->{logp} = IO::File->new("$mode$self->{log_file}") ||
115             croak("unable to open parent filehandle $!\n");
116 34         7651 $pfh = $self->{logp};
117 34         322 $pfh->autoflush(1);
118             }
119              
120 51         1989 $self->do_parent_init();
121              
122 51 100       195 if ($self->{log_parent}) {
123 34         5227 $self->log_parent(localtime() . " [$$] parent begin\n");
124             }
125              
126 51         135 my ($pid, $prockey);
127              
128 51         170 my $max_proc = $self->{max_proc};
129              
130 51         77 my $child_count = 0;
131              
132 51         338 my @partition_keys = $self->get_prockeys();
133              
134 51         153 my $proc_count = 0;
135              
136 51         195 START: while(@partition_keys) {
137 51   66     406 while ($child_count < $max_proc and @partition_keys) {
138 324         4497 my $key = shift @partition_keys;
139 324         600108 $pid = fork();
140 324         33867 $child_count ++;
141 324         582 $proc_count ++;
142            
143 324 100       16613 if ($pid) {
    50          
144             ## parent go get another one
145 300         16214 next;
146             } elsif(defined($pid)) {
147 24         2684 local $^W = undef;
148 24         6486 $SIG{QUIT} = $SIG{ABRT} = $SIG{TERM} = $SIG{INT} = undef;
149 24         7144 $self->_set_child_signals();
150 24         231 $prockey = $key;
151 24         457 last;
152             }
153             }
154            
155 51 100       3476 if ($pid) {
156 27         146 my $kid;
157            
158 27         155 do {
159 27         139563052 $kid = waitpid(-1, 0);
160            
161 27 100       1289 if ($self->{log_parent}) {
162 18         4650 $self->log_parent(localtime() . " [$$] child_count: $child_count, parent repeaping: $kid\n");
163             }
164              
165 27         270 $child_count --;
166              
167 27         7430 next START;
168             } until ($kid == -1);
169            
170 0         0 next;
171             }
172              
173 24 100       14415 if ($self->{log_children}) {
174 8         21 my $tmp_log_name = $self->{log_file};
175 8         58083 $tmp_log_name =~ s/\.log$//;
176 8         143 $self->{current_child_logname} = "$tmp_log_name\_$proc_count\.log";
177 8         437 $self->{current_child_logname} = $self->set_child_logname();
178              
179 8 50       156 my $mode = ($self->{log_child_append})
180             ? '>>'
181             : '>'
182             ;
183              
184 8   33     726 $self->{logc} = IO::File->new("$mode$self->{current_child_logname}") ||
185             croak("unable to open child filehandle n child [$$]$!\n");
186 8         1349134 $self->{logc}->autoflush(1);
187             }
188              
189 24         1896 $self->{prockey} = $prockey;
190 24         9007 $self->do_child_init();
191 24         178 $self->work();
192              
193 24 100       1867 if ($self->{log_children}) {
194 8         88 $self->{logc}->close();
195             }
196              
197 24         64717 exit(0);
198             }
199              
200 27         216 my $kid;
201              
202 27         149 do {
203 216         23451544 $kid = waitpid(-1, 0);
204              
205 216 100       2615 if ($self->{log_parent}) {
206 144         12080 $self->log_parent(localtime() . " [$$] child_count: $child_count, parent repeaping: $kid\n");
207             }
208            
209 216         1783 $child_count --;
210              
211             } until ($kid == -1);
212              
213 27         975 $self->do_parent_final();
214            
215 27 100       266 if ($self->{log_parent}) {
216 18         1239 $self->log_parent(localtime() . " [$$] parent done\n");
217 18         917 $pfh->close();
218             }
219              
220 27         1719 return 1;
221             }
222              
223             sub init {
224 0     0 1 0 my $self = shift;
225              
226 0         0 croak("init() must be implemented in $self->{'work_by'}\n");
227             }
228              
229             sub do_child_init {
230 0     0 1 0 my $self = shift;
231              
232 0         0 croak("do_child_init() must be implemented in $self->{'work_by'}\n");
233             }
234              
235             sub work {
236 0     0 1 0 my $self = shift;
237              
238 0         0 croak("work() must be implemented in $self->{'work_by'}\n");
239             }
240              
241             sub log_parent {
242 430     430 1 75485 my ($self, $text) = @_;
243              
244 430 100       3316 if ($self->{log_parent}) {
245 358         814 my $pfh = $self->{logp};
246              
247 358         17403 print $pfh $text;
248             }
249             }
250              
251             sub log_child {
252 16     16 1 5677 my ($self, $text) = @_;
253              
254 16 100       111 if ($self->{log_children}) {
255 8         20 my $cfh = $self->{logc};
256              
257 8         58888 print $cfh $text;
258             }
259             }
260              
261             sub get_prockey {
262 40     40 1 15154 my $self = shift;
263              
264 40         535 return $self->{prockey};
265             }
266              
267             sub get_prockeys {
268 106     106 1 41356 my $self = shift;
269              
270 106         159 return (keys %{$self->{partition_hash}});
  106         1358  
271             }
272              
273             sub scalar_lock {
274 24     24 1 119 my $self = shift;
275              
276 24 50       3241 (! $self->{IPC_OFF})
277             ? return $self->{share_scalar}{handle}->shlock()
278             : return undef
279             ;
280             }
281              
282             sub scalar_unlock {
283 24     24 1 58 my $self = shift;
284              
285 24 50       1146 (! $self->{IPC_OFF})
286             ? return $self->{share_scalar}{handle}->shunlock()
287             : return undef
288             ;
289             }
290              
291             sub hash_lock {
292 24     24 1 173 my $self = shift;
293              
294 24 50       235 (! $self->{IPC_OFF})
295             ? return $self->{share_hash}{handle}->shlock()
296             : return undef
297             ;
298             }
299              
300             sub hash_unlock {
301 24     24 1 51 my $self = shift;
302              
303 24 50       216 (! $self->{IPC_OFF})
304             ? return $self->{share_hash}{handle}->shunlock()
305             : return undef
306             ;
307             }
308              
309             sub set_hash_element {
310 24     24 1 11036 my $self = shift;
311              
312 24         334 my ($key, $value) = @_;
313            
314 24 50       182 if (! $self->{IPC_OFF}) {
315 24         389 $self->hash_lock();
316 24         37409 $self->{share_hash}{var}{$key} = $value;
317 24         1385 $self->hash_unlock();
318             }
319             }
320              
321             sub get_hash_element {
322 456     456 1 34924 my ($self, $key) = @_;
323              
324 456 100       4183 (! $self->{IPC_OFF})
325             ? return $self->{share_hash}{var}{$key}
326             : return undef
327             ;
328             }
329              
330             sub set_scalar {
331 0     0 1 0 my $self = shift;
332              
333 0         0 my $value = shift;
334              
335 0 0       0 if (! $self->{IPC_OFF}) {
336 0         0 $self->scalar_lock();
337 0         0 ${$self->{share_scalar}{var}} = $value;
  0         0  
338 0         0 $self->scalar_unlock();
339             }
340             }
341              
342             sub inc_scalar {
343 16     16 1 1077 my $self = shift;
344              
345 16 50       646 if (! $self->{IPC_OFF}) {
346 16         596 $self->scalar_lock();
347 16         64203 ${$self->{share_scalar}{var}} ++;
  16         1048  
348 16         5269 $self->scalar_unlock();
349             }
350             }
351              
352             sub dec_scalar {
353 8     8 1 1244 my $self = shift;
354            
355 8 50       86 if (! $self->{IPC_OFF}) {
356 8         352 $self->scalar_lock();
357 8         21382 ${$self->{share_scalar}{var}} --;
  8         1025  
358 8         7369 $self->scalar_unlock();
359             }
360             }
361              
362             sub get_scalar {
363 52     52 1 605 my $self = shift;
364              
365 51         345 (! $self->{IPC_OFF})
366 52 100       334 ? return ${$self->{share_scalar}{var}}
367             : return undef
368             ;
369             }
370              
371             sub set_parent_logname {
372 51     51 1 102 my ($self) = @_;
373              
374 51 100       348 $self->{log_file} .= '.log'
375             unless ($self->{log_file} =~ /\.log$/);
376 51         169 return $self->{log_file};
377             }
378              
379             sub set_child_logname {
380 8     8 1 83 my ($self) = @_;
381              
382 8         57 return $self->{current_child_logname};
383             }
384              
385             sub _set_do_child {
386 52     52   69 my ($self) = @_;
387              
388             {
389 52         78 local $^W = undef;
  52         155  
390 26     26   215 no strict;
  26         52  
  26         2082  
391 52         158 *{ ref($self) . '::' . 'do_child' } = $self->{do_child};
  52         387  
392             };
393             }
394              
395             sub _set_do_parent_final {
396 52     52   79 my ($self) = @_;
397            
398             {
399 52         103 local $^W = undef;
  52         147  
400 26     26   105 no strict;
  26         52  
  26         1922  
401 52         321 *{ ref($self) . '::' . 'do_parent_final' } = $self->{do_parent_final};
  52         305  
402             };
403             }
404              
405             sub _set_do_parent_init {
406 52     52   122 my ($self) = @_;
407            
408             {
409 52         95 local $^W = undef;
  52         430  
410 26     26   105 no strict;
  26         51  
  26         3805  
411 52         225 *{ ref($self) . '::' . 'do_parent_init' } = $self->{do_parent_init};
  52         484  
412             };
413             }
414              
415             sub _set_parent_signals {
416 52     52   173 my ($self) = @_;
417            
418 52         87 foreach my $signame (keys %{$self->{parent_sig}}) {
  52         509  
419 0         0 $SIG{$signame} = $self->{parent_sig}{$signame};
420             }
421             }
422            
423             sub _set_child_signals {
424 24     24   399 my ($self) = @_;
425            
426 24         122 foreach my $signame (keys %{$self->{child_sig}}) {
  24         1784  
427 0           $SIG{$signame} = $self->{child_sig}{$signame};
428             }
429             }
430              
431             1;
432              
433             __END__