File Coverage

blib/lib/Parallel/Fork/BossWorkerAsync.pm
Criterion Covered Total %
statement 299 332 90.0
branch 95 148 64.1
condition 34 50 68.0
subroutine 27 27 100.0
pod 5 18 27.7
total 460 575 80.0


line stmt bran cond sub pod time code
1             package Parallel::Fork::BossWorkerAsync;
2 28     28   382667 use strict;
  28         38  
  28         637  
3 28     28   79 use warnings;
  28         38  
  28         559  
4 28     28   99 use Carp;
  28         37  
  28         1712  
5 28     28   14969 use Data::Dumper qw( Dumper );
  28         181013  
  28         1646  
6 28     28   14157 use Socket qw( AF_UNIX SOCK_STREAM PF_UNSPEC );
  28         89365  
  28         4053  
7 28     28   140 use Fcntl qw( F_GETFL F_SETFL O_NONBLOCK );
  28         28  
  28         1121  
8 28     28   12893 use POSIX qw( EINTR EWOULDBLOCK );
  28         118073  
  28         155  
9 28     28   34519 use IO::Select ();
  28         30477  
  28         70710  
10              
11             our @ISA = qw();
12             our $VERSION = '0.09';
13              
14             # TO DO (wish list):
15             # Restart crashed child workers.
16              
17             # -----------------------------------------------------------------
18             sub new {
19 28     28 1 18034 my ($class, %attrs)=@_;
20             my $self = {
21             work_handler => $attrs{work_handler}, # required
22             init_handler => $attrs{init_handler} || undef, # optional
23             exit_handler => $attrs{exit_handler} || undef, # optional
24             result_handler => $attrs{result_handler} || undef, # optional
25             worker_count => $attrs{worker_count} || 3, # optional, how many child workers
26             global_timeout => $attrs{global_timeout} || 0, # optional, in seconds, 0 is unlimited
27             msg_delimiter => $attrs{msg_delimiter} || "\0\0\0", # optional, may not appear in data
28             read_size => $attrs{read_size} || 1024*1024, # optional, defaults to 1 MB
29 28   100     983 verbose => $attrs{verbose} || 0, # optional, *undocumented*, 0=silence, 1=debug
      100        
      50        
      100        
      100        
      100        
      50        
      50        
30             shutting_down => 0,
31             force_down => 0,
32             pending => 0,
33             result_stream => '',
34             result_queue => [],
35             job_queue => [],
36             };
37 28   33     165 bless($self, ref($class) || $class);
38              
39 28 50       145 croak("Parameter 'work_handler' is required") if ! defined($self->{work_handler});
40              
41             # Start the "boss" process, which will start the workers
42 28         92 $self->start_boss();
43              
44 6         263 return $self;
45             }
46              
47             # -----------------------------------------------------------------
48             sub serialize {
49 86     86 0 92 my ($self, $ref)=@_;
50 86         245 local $Data::Dumper::Deepcopy = 1;
51 86         246 local $Data::Dumper::Indent = 0;
52 86         157 local $Data::Dumper::Purity = 1;
53 86         426 return Dumper($ref) . $self->{msg_delimiter};
54             }
55              
56             # -----------------------------------------------------------------
57             sub deserialize {
58 86     86 0 114 my ($self, $data)=@_;
59 86         333 $data = substr($data, 0, - length($self->{msg_delimiter}));
60 86         80 my $VAR1;
61 86         11494 my $ref = eval($data);
62 86 50       297 if ($@) {
63 0         0 confess("failed to deserialize: $@");
64             }
65 86         160 return $ref;
66             }
67              
68             # -----------------------------------------------------------------
69             # Pass one or more hashrefs for the jobs.
70             # Main app sends jobs to Boss.
71             sub add_work {
72 43     43 1 238 my ($self, @jobs)=@_;
73 43         100 $self->blocking($self->{boss_socket}, 1);
74 43         78 while (@jobs) {
75 43         74 $self->log("add_work: adding job to queue\n");
76 43         324 my $job = shift(@jobs);
77 43         77 my $n = syswrite( $self->{boss_socket}, $self->serialize($job) );
78 43 50       4610 croak("add_work: app write to boss: syswrite: $!") if ! defined($n);
79 43         52 $self->{pending} ++;
80 43         112 $self->log("add_work: job added to queue, $self->{pending} pending\n");
81             }
82             }
83              
84             # -----------------------------------------------------------------
85             # Syntactic nicety
86             sub get_result_nb {
87 1     1 0 8 my ($self)=@_;
88 1         5 return $self->get_result(blocking => 0);
89             }
90              
91             # -----------------------------------------------------------------
92             # Main app gets a complete, single result from Boss.
93             # If defined, result_handler fires here.
94             # Return is result of work_handler, or result_handler (if defined),
95             # or {} (empty hash ref).
96             # Undef is returned if socket marked nonblocking and read would have
97             # blocked.
98             sub get_result {
99 44     44 1 127 my ($self, %args)=@_;
100 44 100       121 $args{blocking} = 1 if ! defined($args{blocking});
101 44 50       60 carp("get_result() when no results pending") if ! $self->pending();
102              
103 44         36 my $rq_count = scalar(@{ $self->{result_queue} });
  44         60  
104 44         120 $self->log("get_result: $self->{pending} jobs in process, $rq_count results ready\n");
105              
106 44 100       44 if ( ! @{ $self->{result_queue} }) {
  44         79  
107 31         66 $self->blocking($self->{boss_socket}, $args{blocking});
108 31         113 $self->read($self->{boss_socket}, $self->{result_queue}, \$self->{result_stream}, 'app');
109            
110             # Handle nonblocking case
111 31 50 66     79 if ( ! $args{blocking} && ! @{ $self->{result_queue} }) {
  1         3  
112 1         11 return undef;
113             }
114             }
115              
116 43         66 $self->log("get_result: got result\n");
117              
118 43         44 $self->{pending} --;
119 43 50 66     133 if ($self->{pending} == 0 && $self->{shutting_down}) {
120 0         0 $self->log("get_result: no jobs pending; closing boss\n");
121 0         0 close($self->{boss_socket});
122             }
123 43         35 my $ref = $self->deserialize( shift( @{ $self->{result_queue} } ) );
  43         105  
124 43 50       151 my $retval = $self->{result_handler} ? $self->{result_handler}->($ref) : $ref;
125 43 50       200 $retval = {} if ! defined($retval);
126 43         149 return $retval;
127             }
128              
129             # -----------------------------------------------------------------
130             # Main app calls to see if there are submitted jobs for which no
131             # response has been collected. It doesn't mean the responses are
132             # ready yet.
133             sub pending {
134 97     97 1 115 my ($self)=@_;
135 97         154 return $self->{pending};
136             }
137              
138             # -----------------------------------------------------------------
139             # App tells boss to shut down by half-close.
140             # Boss then finishes work in progress, and eventually tells
141             # workers to exit.
142             # Boss sends all results back to app before exiting itself.
143             # Note: Boss won't be able to close cleanly if app ignores
144             # final reads...
145             # args: force => 0,1 defaults to 0
146             sub shut_down {
147 6     6 1 13 my ($self, %args)=@_;
148 6   50     86 $args{force} ||= 0;
149 6         9 $self->{shutting_down} = 1;
150              
151 6         12 $self->log("shut_down: MARK\n");
152              
153 6 50       21 if ($args{force}) {
    50          
154             # kill boss pid
155 0         0 kill(9, $self->{boss_pid});
156             } elsif ($self->pending()) {
157 0         0 shutdown($self->{boss_socket}, 1);
158             } else {
159 6         3223 close($self->{boss_socket});
160             }
161              
162 6         3551600 while (wait() != -1) {}; # waits/reaps Boss process
163             }
164              
165             # -----------------------------------------------------------------
166             # Make socket blocking/nonblocking
167             sub blocking {
168 127     127 0 310 my ($self, $socket, $makeblocking)=@_;
169              
170             ### --- W I N D O W S --- ###
171 127 50       1186 if ($^O eq 'MSWin32') {
172             # ioctl() requires a pointer to a long, containing the nonblocking value.
173              
174             # The long var
175 0 0       0 my $nonblocking = pack('L', $makeblocking ? 0 : 1);
176              
177             # The pointer to it
178 0         0 my $plong = unpack('I', pack('P', $nonblocking));
179              
180             # The nonblocking request. FIONBIO is 0x8004667e
181 0 0       0 ioctl($socket, 0x8004667e, $plong)
182             or croak("ioctl failed: $!");
183            
184              
185             ### --- LINUX, BSD, etc --- ###
186             } else {
187 127 50       547 my $flags = fcntl($socket, F_GETFL, 0)
188             or croak("fcntl failed: $!");
189 127         428 my $blocking = ($flags & O_NONBLOCK) == 0;
190 127 100 100     1102 if ($blocking && ! $makeblocking) {
    100 66        
191 54         133 $flags |= O_NONBLOCK;
192             } elsif (! $blocking && $makeblocking) {
193 1         1 $flags &= ~O_NONBLOCK;
194             } else {
195             # do nothing
196 72         84 return;
197             }
198              
199 55 50       658 fcntl($socket, F_SETFL, $flags)
200             or croak("fcntl failed: $!");
201             }
202              
203 55         647 return;
204             }
205              
206             # -----------------------------------------------------------------
207             sub start_boss {
208 28     28 0 28 my ($self)=@_;
209 28         74 $self->log("start_boss: start\n");
210 28         33 eval {
211 28         33 my ($b1, $b2);
212 28 50       1144 socketpair($b1, $b2, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
213             or die("socketpair: $!");
214              
215 28         19118 my $pid = fork();
216 28 50       958 defined $pid || confess("fork failed: $!");
217              
218 28 100       500 if ($pid) {
219             # Application (parent)
220 6         97 $self->{boss_pid} = $pid;
221              
222             # App won't write to, or read from itself.
223 6         105 close($b2);
224 6         45 $self->{boss_socket} = $b1;
225              
226 6         145 $self->log("start_boss: Application: Boss started\n");
227              
228             } else {
229             # Manager aka Boss (child)
230             # Boss won't write to, or read from itself.
231 22         696 close($b1);
232            
233 22         464 $self->{app_socket} = $b2;
234            
235             # Make nonblocking
236 22         582 $self->blocking( $self->{app_socket}, 0 );
237 22         2002 open(STDIN, '/dev/null');
238            
239 22         202 $self->start_workers();
240 6         62 $self->boss_loop();
241 6         1518801 while (wait() != -1) {}; # waits/reaps workers only
242              
243 6         54 $self->log("start_boss: Boss: exiting\n");
244 6         1149 exit;
245             }
246             };
247 6 50       68 if ($@) {
248 0         0 croak($@);
249             }
250             }
251              
252             # -----------------------------------------------------------------
253             sub start_workers {
254 22     22 0 48 my ($self)=@_;
255 22         436 $self->log("start_workers: starting $self->{worker_count} workers\n");
256 22         122 eval {
257 22         1086 for (1 .. $self->{worker_count}) {
258 47         39 my ($w1, $w2);
259 47 50       2197 socketpair($w1, $w2, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
260             or die("socketpair: $!");
261            
262 47         21249 my $pid = fork();
263 47 50       841 defined $pid || confess("fork failed: $!");
264              
265 47 100       716 if ($pid) {
266             # Boss (parent)
267 31         594 close($w2);
268 31         1001 $self->{workers}->{ $w1 } = { pid => $pid, socket => $w1 };
269              
270             # Make nonblocking
271 31         613 $self->blocking( $w1, 0 );
272            
273             } else {
274             # Worker (child)
275 16         712 close($self->{app_socket});
276 16         465 delete($self->{workers});
277 16         103 close($w1);
278 16         187 $self->{socket} = $w2;
279 16         754 open(STDIN, '/dev/null');
280            
281 16         480 $self->worker_loop();
282 16         3253 exit;
283             }
284             }
285              
286 6         128 $self->log("start_workers: start workers complete\n");
287             };
288 6 50       140 if ($@) {
289 0         0 croak($@);
290             }
291             }
292              
293             # -----------------------------------------------------------------
294             # Boss process; have an open socket to the app, and one to each worker.
295             # Loop select(), checking for read and write on app socket, and read
296             # on working children, and write on idle children.
297             # Keep track of idle vs. working children.
298             # When receive a shutdown order from the app, keep looping until the
299             # job queue is empty, and all results have been retrieved (all
300             # children will now be idle.) Then close the worker sockets.
301             # They'll be reading, and will notice this and exit.
302             # Don't deserialize any data. Just look for the delimiters to know
303             # we're processing whole records.
304             #
305              
306             sub boss_loop {
307 6     6 0 75 my ($self)=@_;
308              
309 6         34 $self->log("boss_loop: start\n");
310 6         6 eval {
311             # handy
312 6         11 my $workers = $self->{workers};
313            
314             # All workers start out idle
315 6         60 for my $s (keys(%$workers)) {
316 16         37 $workers->{ $s }->{idle} = 1;
317             }
318            
319 6         16 while ( 1 ) {
320             # When to exit loop?
321             # shutting_down = 1
322             # job_queue empty
323             # all workers idle, and no partial jobs
324             # result_queue empty
325 65 50 66     176 if ($self->{shutting_down} &&
      66        
326 6         60 ! @{ $self->{job_queue} } &&
327 6         34 ! @{ $self->{result_queue} } ) {
328 6         11 my $busy=0;
329 6         9 my $partials = 0;
330 6         21 for my $s (keys(%$workers)) {
331 16 50       47 if ( ! $workers->{ $s }->{idle}) {
    50          
332 0         0 $busy ++;
333 0         0 last;
334             } elsif (exists($workers->{ $s }->{partial_job})) {
335 0         0 $partials ++;
336 0         0 last;
337             }
338             }
339 6 50 33     234 if ( ! $busy && ! $partials) {
340             # Close all workers
341 6         59 for my $s (keys(%$workers)) {
342 16         9892 close($workers->{ $s }->{socket});
343             }
344 6         808 close($self->{app_socket});
345 6         23 last;
346             }
347             }
348            
349             # Set up selectors:
350             # Always check app for read, unless shutting down. App write only if
351             # there's something in @result_queue.
352 59         65 my (@rpids, @wpids);
353 59         350 my $rs = IO::Select->new();
354 59 50       526 if ( ! $self->{shutting_down}) {
355 59         152 $rs->add($self->{app_socket});
356 59         2025 push(@rpids, "app");
357             }
358 59         201 my $ws = IO::Select->new();
359 59 100       284 if ( @{ $self->{result_queue} } ) {
  59         128  
360 20         46 $ws->add($self->{app_socket});
361 20         363 push(@wpids, "app");
362             }
363            
364             # Check workers for read only if not idle
365             # Otherwise, IF job_queue isn't empty,
366             # check nonidle workers for write.
367 59         132 for my $s (keys(%$workers)) {
368 163 100       228 if ( $workers->{ $s }->{idle}) {
369 103 100 66     71 if ( @{ $self->{job_queue} } || exists($workers->{ $s }->{partial_job})) {
  103         307  
370 48         87 $ws->add($workers->{ $s }->{socket});
371 48         836 push(@wpids, $workers->{ $s }->{pid});
372             }
373             } else {
374 60         113 $rs->add($workers->{ $s }->{socket});
375 60         1021 push(@rpids, $workers->{ $s }->{pid});
376             }
377             }
378            
379             # Blocking
380 59         197 my @rdy = IO::Select->select($rs, $ws, undef);
381 59 50       8024751 if ( ! @rdy) {
382 0 0       0 if ($! == EINTR) {
383             # signal interrupt, continue waiting
384 0         0 next;
385             }
386 0         0 croak("select failed: $!");
387             }
388 59         115 my ($r, $w) = @rdy[0,1];
389            
390             # Now we have zero or more reabable sockets, and
391             # zero or more writable sockets, but there's at
392             # least one socket among the two groups.
393             # Read first, as things read can be further handled
394             # by writables immediately afterwards.
395            
396 59         90 for my $rh (@$r) {
397 55         44 my ($source, $queue, $rstream);
398 55 100       126 if ($rh != $self->{app_socket}) {
399 43         82 $source = $workers->{$rh}->{pid};
400 43         51 $queue = $self->{result_queue};
401 43         90 $rstream = \$workers->{$rh}->{result_stream};
402             } else {
403 12         19 $source = 'app';
404 12         16 $queue = $self->{job_queue};
405 12         29 $rstream = \$self->{job_stream};
406             }
407              
408 55         109 $self->log("boss_loop: reading socket\n");
409 55         117 $self->read($rh, $queue, $rstream, 'boss');
410 55         78 $self->log("boss_loop: read socket complete\n");
411             }
412              
413 59         338 for my $wh (@$w) {
414 68 100       152 my $source = exists($workers->{ $wh }) ? $workers->{ $wh }->{pid} : "app";
415 68         101 $self->log("boss_loop: writing socket\n");
416 68         118 $self->write($wh);
417 68         88 $self->log("boss_loop: write socket complete\n");
418             }
419             }
420             };
421 6 50       32 if ($@) {
422 0         0 croak($@);
423             }
424             }
425              
426             # -----------------------------------------------------------------
427             sub write {
428 68     68 0 60 my ($self, $socket)=@_;
429 68 100       131 if ($socket == $self->{app_socket}) {
430 20         52 $self->write_app($socket);
431             } else {
432 48         90 $self->write_worker($socket);
433             }
434             }
435              
436             # -----------------------------------------------------------------
437             sub write_app {
438 20     20 0 26 my ($self, $socket)=@_;
439            
440             # App socket: write all bytes until would block, or complete.
441             # This means process result_queue in order, doing as many elems
442             # as possible. Don't remove from the queue until complete. In
443             # other words, the first item on the queue may be a partial from
444             # the previous write attempt.
445 20         30 my $queue = $self->{result_queue};
446 20         43 while (@$queue) {
447 43         47 $self->log("write_app: processing queue entry\n");
448 43         69 while ( $queue->[0] ) {
449 43         461 my $n = syswrite($socket, $queue->[0]);
450 43 50       118 if ( ! defined($n)) {
    50          
451             # Block or real socket error
452 0 0       0 if ($! == EWOULDBLOCK) {
453             # That's it for this socket, try another, or select again.
454 0         0 return;
455             } else {
456 0         0 croak("boss write to app: syswrite: $!");
457             }
458             }
459            
460             elsif ($n == 0) {
461             # Application error: socket has been closed prematurely by other party.
462             # Boss is supposed to close app socket before app. App tells Boss to
463             # stop, but it only happens after all existing work is completed, and
464             # data is sent back to app.
465 0         0 croak("boss write to app: peer closed prematurely");
466            
467             } else {
468             # wrote some bytes, remove them from the queue elem
469 43         125 substr($queue->[0], 0, $n) = '';
470             }
471             }
472             # queue elem is empty, remove it, go try next one
473 43         66 $self->log("write_app: process queue entry complete\n");
474 43         86 shift(@$queue);
475             }
476 20         31 $self->log("write_app: all queue entries have been written\n");
477             # queue is empty, all written!
478             }
479            
480             # -----------------------------------------------------------------
481             sub write_worker {
482 48     48 0 43 my ($self, $socket)=@_;
483            
484             # A worker: check to see if we have a remaining partial
485             # job we already started to send. If so, continue with this.
486             # Otherwise, take a *single* job off the job_queue, and send that.
487             # When we've gotten either complete, or would block, write remaining
488             # portion to per-worker job-in-progress, or make it '' if complete.
489             # With worker, we only send ONE job, never more.
490             # Once job send is complete, mark worker not-idle.
491            
492 48 50       105 if ( ! exists($self->{workers}->{ $socket }->{partial_job})) {
493 48         58 $self->log("write_worker: processing new job\n");
494 48 100       36 if (@{ $self->{job_queue} }) {
  48         109  
495 43         36 $self->{workers}->{ $socket }->{partial_job} = shift(@{ $self->{job_queue} });
  43         122  
496             } else {
497             # Nothing left on queue. Remember, we select on *all* idle workers,
498             # even if there's only one job on the queue.
499 5         11 return;
500             }
501             } else {
502 0         0 $self->log("write_worker: processing job remnant\n");
503             }
504 43         71 my $rjob = \$self->{workers}->{ $socket }->{partial_job};
505            
506 43         90 while ( length($$rjob) ) {
507 43         65 $self->log("write_worker: writing...\n");
508 43         5700 my $n = syswrite($socket, $$rjob);
509 43 50       109 if ( ! defined($n)) {
    50          
510             # Block or real socket error
511 0 0       0 if ($! == EWOULDBLOCK) {
512             # That's it for this socket, try another, or select again.
513 0         0 return;
514             } else {
515 0         0 croak("boss write to worker: syswrite: $!");
516             }
517             }
518            
519             elsif ($n == 0) {
520             # Application error: socket has been closed prematurely by other party.
521             # Boss is supposed to close worker socket before worker - that's how
522             # worker knows to exit.
523 0         0 croak("boss write to worker: peer closed prematurely (pid " . $self->{workers}->{ $socket }->{pid} . ")");
524            
525             } else {
526             # wrote some bytes, remove them from the job
527 43         72 substr($$rjob, 0, $n) = '';
528 43         127 $self->log("write_worker: wrote $n bytes\n");
529             }
530             }
531             # job all written!
532 43         54 $self->log("write_worker: job complete\n");
533 43         81 delete($self->{workers}->{ $socket }->{partial_job});
534 43         160 $self->{workers}->{ $socket }->{idle} = 0;
535             }
536              
537             # -----------------------------------------------------------------
538             # Boss exits loop on error, wouldblock, or shutdown msg (socket close).
539             # Worker exits loop on error, recd full record, or boss socket close.
540             # App exits loop on error, recd full record, wouldblock (nb only), early boss close (error).
541             # Stream (as external ref) isn't needed for worker, as it's blocking, and only reads a single
542             # record, no more.
543             # So $rstream can be undef, and if so, we init locally.
544             sub read {
545 145     145 0 244 my ($self, $socket, $queue, $rstream, $iam)=@_;
546 145         146 my $stream;
547 145 100       294 $rstream = \$stream if ! defined($rstream);
548 145 100       328 $$rstream = '' if ! defined($$rstream);
549              
550             # croak messaging details...
551 145         136 my $source;
552 145 100       327 if ($iam eq 'boss') {
553 55 100       92 if ($socket == $self->{app_socket}) {
554 12         34 $source = 'app';
555             } else {
556 43         112 $source = "worker [$self->{workers}->{$socket}->{pid}]";
557             }
558             } else { # app or worker, same source
559 90         240 $source = "boss";
560             }
561              
562 145         226 while ( 1 ) {
563 195         587 $self->log("read: $iam is reading...\n");
564              
565 195         14304528 my $n = sysread($socket, $$rstream, $self->{read_size}, length($$rstream));
566 195 100       893 if ( ! defined($n)) {
    100          
567 50 50       612 if ($! == EINTR) {
    50          
568             # signal interrupt, continue reading
569 0         0 next;
570             } elsif ($! == EWOULDBLOCK) {
571 50         111 last; # No bytes recd, no need to chunk.
572             } else {
573 0         0 croak("$iam read from $source: sysread: $!");
574             }
575            
576             } elsif ($n == 0) {
577             # Application error: socket has been closed prematurely by other party.
578             # Boss is supposed to close worker socket before worker - that's how
579             # worker knows to exit.
580             # Boss is supposed to close app socket before app. App tells Boss to
581             # stop, but it only happens after all existing work is completed, and
582             # data is sent back to app.
583 22 100       1047 if ($iam eq 'boss') {
    50          
584 6 50       31 if ($socket == $self->{app_socket}) {
    0          
585 6         16 $self->{shutting_down} = 1;
586             } elsif (exists($self->{workers}->{$socket})) {
587 0         0 croak("$iam read from $source: peer closed prematurely (pid " . $self->{workers}->{ $socket }->{pid} . ")");
588             }
589             } elsif ($iam eq 'worker') {
590 16         1210 close($socket);
591             } else { # i am app
592 0         0 croak("$iam read from $source: peer closed prematurely (pid " . $self->{boss_pid} . ")");
593             }
594              
595             # if we didn't croak...
596 22         116 last;
597            
598             } else {
599             # We actually read some bytes. See if we can chunk
600             # out any record(s).
601 123         1450 $self->log("read: $iam read $n bytes\n");
602            
603             # Split on delimiter
604 123         94488 my @records = split(/(?<=$self->{msg_delimiter})/, $$rstream);
605              
606             # All but last elem are full records
607 123         212 my $rcount=$#records;
608 123         700 push(@$queue, @records[0..$#records-1]);
609              
610             # Deal with last elem, which may or may not be full record
611 123 50       1497 if ($records[ $#records ] =~ /$self->{msg_delimiter}$/) {
612             # We have a full record
613 123         143 $rcount++;
614 123         468 $self->log("read: $iam pushing full record onto queue\n");
615 123         203 push(@$queue, $records[ $#records ]);
616 123         141 $$rstream = '';
617 123 100       370 if (exists($self->{workers}->{ $socket })) {
618 43         76 $self->{workers}->{ $socket }->{idle} = 1;
619             }
620             } else {
621 0         0 $$rstream = $records[$#records];
622             }
623              
624             # Boss grabs all it can get, only exiting loop on wouldblock.
625             # App (even nb method), and workers all exit when one full
626             # record is received.
627 123 100 66     1030 last if $rcount && $iam ne 'boss';
628             }
629             }
630             }
631              
632             # -----------------------------------------------------------------
633             # Worker process; single blocking socket open to boss.
634             # Blocking select loop:
635             # Only do read OR write, not both. We never want more than a single
636             # job at a time. So, if no job currently, read, waiting for one.
637             # Get a job, perform it, and try to write results.
638             # Send delimiter, which tells boss it has all the results, and we're ready
639             # for another job.
640             #
641             sub worker_loop {
642 16     16 0 199 my ($self)=@_;
643 16         170 eval {
644 16 100       538 if ($self->{init_handler}) {
645 3         34 $self->log("worker_loop: calling init_handler()\n");
646 3         54 $self->{init_handler}->();
647             }
648              
649             # String buffers to store serialized data: in and out.
650 16         56 my $result_stream;
651 16         82 while ( 1 ) {
652 102 100       6337 if (defined($result_stream)) {
653             # We have a result: write it to boss
654 43         114 $self->log("worker_loop: writing result...\n");
655            
656 43         470 my $n = syswrite( $self->{socket}, $result_stream);
657 43 50       102 croak("worker [$$] write to boss: syswrite: $!") if ! defined($n);
658 43 50       159 $self->log("worker_loop: wrote $n bytes\n") if defined($n);
659 43         55 $result_stream = undef;
660             # will return to top of loop
661            
662             } else {
663             # Get job from boss
664            
665 59         101 my @queue;
666 59         196 $self->log("worker_loop: reading job from queue...\n");
667 59         258 $self->read($self->{socket}, \@queue, undef, 'worker');
668 59 100       186 return if ! @queue;
669 43         149 $self->log("worker_loop: read job complete, we have a job\n");
670              
671 43         205 my $job = $self->deserialize($queue[0]);
672 43         47 my $result;
673 43         48 eval {
674             local $SIG{ALRM} = sub {
675 1     1   3000301 die("BossWorkerAsync: timed out");
676 43         818 };
677              
678             # Set alarm
679 43         513 alarm($self->{global_timeout});
680              
681             # Invoke handler and get result
682 43         103 $self->log("worker_loop: calling work_handler for this job\n");
683 43         132 $result = $self->{work_handler}->($job);
684              
685             # Disable alarm
686 42         5001132 alarm(0);
687             };
688              
689 43 100       99 if ($@) {
690 1         11 $result = {ERROR => $@};
691 1         8 $self->log("worker_loop: ERROR: $@\n");
692             }
693            
694 43         113 $result_stream = $self->serialize($result);
695             }
696             }
697             };
698 16   50     252 my $errm = $@ || '';
699 16         30 eval {
700 16 100       79 if ($self->{exit_handler}) {
701 3         9 $self->log("worker_loop: calling exit_handler()\n");
702 3         8 $self->{exit_handler}->();
703             }
704             };
705 16 50       59 $errm .= "\n$@" if $@;
706 16 50       59 if ($errm) {
707 0         0 croak($errm);
708             }
709             }
710              
711              
712             # -----------------------------------------------------------------
713             # IN: log message
714             # If verbose is enabled, print the message.
715             sub log {
716 1461     1461 0 1953 my ($self, $msg) = @_;
717 1461 50       3663 print STDERR $msg if $self->{verbose};
718             }
719              
720              
721             1;
722             __END__