File Coverage

blib/lib/Parallel/Fork/BossWorkerAsync.pm
Criterion Covered Total %
statement 290 322 90.0
branch 90 138 65.2
condition 30 46 65.2
subroutine 27 27 100.0
pod 5 18 27.7
total 442 551 80.2


line stmt bran cond sub pod time code
1             package Parallel::Fork::BossWorkerAsync;
2 23     23   737130 use strict;
  23         64  
  23         1558  
3 23     23   115 use warnings;
  23         41  
  23         638  
4 23     23   138 use Carp;
  23         66  
  23         2515  
5 23     23   1450398 use Data::Dumper qw( Dumper );
  23         323608  
  23         52230  
6 23     23   54920 use Socket qw( AF_UNIX SOCK_STREAM PF_UNSPEC );
  23         146288  
  23         7483  
7 23     23   275 use Fcntl qw( F_GETFL F_SETFL O_NONBLOCK );
  23         46  
  23         1279  
8 23     23   23905 use POSIX qw( EINTR EWOULDBLOCK );
  23         241207  
  23         164  
9 23     23   266342 use IO::Select ();
  23         46352  
  23         177442  
10              
11             our @ISA = qw();
12             our $VERSION = '0.08';
13              
14             # TO DO (wish list):
15             # Restart crashed child workers.
16              
17             # -----------------------------------------------------------------
18             sub new {
19 23     23 1 88064 my ($class, %attrs)=@_;
20 23   50     1238 my $self = {
      50        
      100        
      100        
      100        
      50        
      50        
21             work_handler => $attrs{work_handler}, # required
22             init_handler => $attrs{init_handler} || undef, # optional
23             result_handler => $attrs{result_handler} || undef, # optional
24             worker_count => $attrs{worker_count} || 3, # optional, how many child workers
25             global_timeout => $attrs{global_timeout} || 0, # optional, in seconds, 0 is unlimited
26             msg_delimiter => $attrs{msg_delimiter} || "\0\0\0", # optional, may not appear in data
27             read_size => $attrs{read_size} || 1024*1024, # optional, defaults to 1 MB
28             verbose => $attrs{verbose} || 0, # optional, *undocumented*, 0=silence, 1=debug
29             shutting_down => 0,
30             force_down => 0,
31             pending => 0,
32             result_stream => '',
33             result_queue => [],
34             job_queue => [],
35             };
36 23   33     199 bless($self, ref($class) || $class);
37              
38 23 50       208 croak("Parameter 'work_handler' is required") if ! defined($self->{work_handler});
39              
40             # Start the "boss" process, which will start the workers
41 23         130 $self->start_boss();
42              
43 5         487 return $self;
44             }
45              
46             # -----------------------------------------------------------------
47             sub serialize {
48 66     66 0 371 my ($self, $ref)=@_;
49 66         504 local $Data::Dumper::Deepcopy = 1;
50 66         339 local $Data::Dumper::Indent = 0;
51 66         253 local $Data::Dumper::Purity = 1;
52 66         618 return Dumper($ref) . $self->{msg_delimiter};
53             }
54              
55             # -----------------------------------------------------------------
56             sub deserialize {
57 66     66 0 146 my ($self, $data)=@_;
58 66         746 $data = substr($data, 0, - length($self->{msg_delimiter}));
59 66         104 my $VAR1;
60 66         27443 my $ref = eval($data);
61 66 50       392 if ($@) {
62 0         0 confess("failed to deserialize: $@");
63             }
64 66         265 return $ref;
65             }
66              
67             # -----------------------------------------------------------------
68             # Pass one or more hashrefs for the jobs.
69             # Main app sends jobs to Boss.
70             sub add_work {
71 33     33 1 451 my ($self, @jobs)=@_;
72 33         115 $self->blocking($self->{boss_socket}, 1);
73 33         117 while (@jobs) {
74 33         136 $self->log("add_work: adding job to queue\n");
75 33         46 my $job = shift(@jobs);
76 33         535 my $n = syswrite( $self->{boss_socket}, $self->serialize($job) );
77 33 50       16802 croak("add_work: app write to boss: syswrite: $!") if ! defined($n);
78 33         60 $self->{pending} ++;
79 33         143 $self->log("add_work: job added to queue, $self->{pending} pending\n");
80             }
81             }
82              
83             # -----------------------------------------------------------------
84             # Syntactic nicety
85             sub get_result_nb {
86 1     1 0 8 my ($self)=@_;
87 1         18 return $self->get_result(blocking => 0);
88             }
89              
90             # -----------------------------------------------------------------
91             # Main app gets a complete, single result from Boss.
92             # If defined, result_handler fires here.
93             # Return is result of work_handler, or result_handler (if defined),
94             # or {} (empty hash ref).
95             # Undef is returned if socket marked nonblocking and read would have
96             # blocked.
97             sub get_result {
98 34     34 1 292 my ($self, %args)=@_;
99 34 100       680 $args{blocking} = 1 if ! defined($args{blocking});
100 34 50       132 carp("get_result() when no results pending") if ! $self->pending();
101              
102 34         56 my $rq_count = scalar(@{ $self->{result_queue} });
  34         267  
103 34         360 $self->log("get_result: $self->{pending} jobs in process, $rq_count results ready\n");
104              
105 34 100       54 if ( ! @{ $self->{result_queue} }) {
  34         167  
106 20         82 $self->blocking($self->{boss_socket}, $args{blocking});
107 20         230 $self->read($self->{boss_socket}, $self->{result_queue}, \$self->{result_stream}, 'app');
108            
109             # Handle nonblocking case
110 20 50 66     249 if ( ! $args{blocking} && ! @{ $self->{result_queue} }) {
  1         4  
111 1         26 return undef;
112             }
113             }
114              
115 33         99 $self->log("get_result: got result\n");
116              
117 33         65 $self->{pending} --;
118 33 50 66     306 if ($self->{pending} == 0 && $self->{shutting_down}) {
119 0         0 $self->log("get_result: no jobs pending; closing boss\n");
120 0         0 close($self->{boss_socket});
121             }
122 33         45 my $ref = $self->deserialize( shift( @{ $self->{result_queue} } ) );
  33         233  
123 33 50       5228 my $retval = $self->{result_handler} ? $self->{result_handler}->($ref) : $ref;
124 33 50       311 $retval = {} if ! defined($retval);
125 33         600 return $retval;
126             }
127              
128             # -----------------------------------------------------------------
129             # Main app calls to see if there are submitted jobs for which no
130             # response has been collected. It doesn't mean the responses are
131             # ready yet.
132             sub pending {
133 75     75 1 333 my ($self)=@_;
134 75         500 return $self->{pending};
135             }
136              
137             # -----------------------------------------------------------------
138             # App tells boss to shut down by half-close.
139             # Boss then finishes work in progress, and eventually tells
140             # workers to exit.
141             # Boss sends all results back to app before exiting itself.
142             # Note: Boss won't be able to close cleanly if app ignores
143             # final reads...
144             # args: force => 0,1 defaults to 0
145             sub shut_down {
146 5     5 1 15 my ($self, %args)=@_;
147 5   50     122 $args{force} ||= 0;
148 5         9 $self->{shutting_down} = 1;
149              
150 5         24 $self->log("shut_down: MARK\n");
151              
152 5 50       30 if ($args{force}) {
    50          
153             # kill boss pid
154 0         0 kill(9, $self->{boss_pid});
155             } elsif ($self->pending()) {
156 0         0 shutdown($self->{boss_socket}, 1);
157             } else {
158 5         7773 close($self->{boss_socket});
159             }
160              
161 5         8753740 while (wait() != -1) {}; # waits/reaps Boss process
162             }
163              
164             # -----------------------------------------------------------------
165             # Make socket blocking/nonblocking
166             sub blocking {
167 96     96 0 459 my ($self, $socket, $makeblocking)=@_;
168 96 50       1615 my $flags = fcntl($socket, F_GETFL, 0)
169             or croak("fcntl failed: $!");
170 96         1135 my $blocking = ($flags & O_NONBLOCK) == 0;
171 96 100 100     3733 if ($blocking && ! $makeblocking) {
    100 66        
172 44         335 $flags |= O_NONBLOCK;
173             } elsif (! $blocking && $makeblocking) {
174 1         3 $flags &= ~O_NONBLOCK;
175             } else {
176             # do nothing
177 51         221 return $blocking;
178             }
179            
180 45 50       1038 fcntl($socket, F_SETFL, $flags)
181             or croak("fcntl failed: $!");
182 45         1632 return $blocking;
183             }
184              
185             # -----------------------------------------------------------------
186             sub start_boss {
187 23     23 0 51 my ($self)=@_;
188 23         335 $self->log("start_boss: start\n");
189 23         51 eval {
190 23         58 my ($b1, $b2);
191 23 50       1757 socketpair($b1, $b2, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
192             or die("socketpair: $!");
193              
194 23         49900 my $pid = fork();
195 23 50       2542 defined $pid || confess("fork failed: $!");
196              
197 23 100       978 if ($pid) {
198             # Application (parent)
199 5         126 $self->{boss_pid} = $pid;
200              
201             # App won't write to, or read from itself.
202 5         184 close($b2);
203 5         136 $self->{boss_socket} = $b1;
204              
205 5         292 $self->log("start_boss: Application: Boss started\n");
206              
207             } else {
208             # Manager aka Boss (child)
209             # Boss won't write to, or read from itself.
210 18         1300 close($b1);
211            
212 18         630 $self->{app_socket} = $b2;
213            
214             # Make nonblocking
215 18         2322 $self->blocking( $self->{app_socket}, 0 );
216 18         5290 open(STDIN, '/dev/null');
217            
218 18         334 $self->start_workers();
219 5         78 $self->boss_loop();
220 5         5296053 while (wait() != -1) {}; # waits/reaps workers only
221              
222 5         66 $self->log("start_boss: Boss: exiting\n");
223 5         4289 exit;
224             }
225             };
226 5 50       151 if ($@) {
227 0         0 croak($@);
228             }
229             }
230              
231             # -----------------------------------------------------------------
232             sub start_workers {
233 18     18 0 130 my ($self)=@_;
234 18         754 $self->log("start_workers: starting $self->{worker_count} workers\n");
235 18         230 eval {
236 18         592 for (1 .. $self->{worker_count}) {
237 38         1455 my ($w1, $w2);
238 38 50       4100 socketpair($w1, $w2, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
239             or die("socketpair: $!");
240            
241 38         69979 my $pid = fork();
242 38 50       2281 defined $pid || confess("fork failed: $!");
243              
244 38 100       1373 if ($pid) {
245             # Boss (parent)
246 25         1752 close($w2);
247 25         2155 $self->{workers}->{ $w1 } = { pid => $pid, socket => $w1 };
248              
249             # Make nonblocking
250 25         1158 $self->blocking( $w1, 0 );
251            
252             } else {
253             # Worker (child)
254 13         1570 close($self->{app_socket});
255 13         901 delete($self->{workers});
256 13         232 close($w1);
257 13         242 $self->{socket} = $w2;
258 13         2194 open(STDIN, '/dev/null');
259            
260 13         672 $self->worker_loop();
261 13         8950 exit;
262             }
263             }
264              
265 5         391 $self->log("start_workers: start workers complete\n");
266             };
267 5 50       114 if ($@) {
268 0         0 croak($@);
269             }
270             }
271              
272             # -----------------------------------------------------------------
273             # Boss process; have an open socket to the app, and one to each worker.
274             # Loop select(), checking for read and write on app socket, and read
275             # on working children, and write on idle children.
276             # Keep track of idle vs. working children.
277             # When receive a shutdown order from the app, keep looping until the
278             # job queue is empty, and all results have been retrieved (all
279             # children will now be idle.) Then close the worker sockets.
280             # They'll be reading, and will notice this and exit.
281             # Don't deserialize any data. Just look for the delimiters to know
282             # we're processing whole records.
283             #
284              
285             sub boss_loop {
286 5     5 0 43 my ($self)=@_;
287              
288 5         172 $self->log("boss_loop: start\n");
289 5         11 eval {
290             # handy
291 5         35 my $workers = $self->{workers};
292            
293             # All workers start out idle
294 5         159 for my $s (keys(%$workers)) {
295 13         130 $workers->{ $s }->{idle} = 1;
296             }
297            
298 5         47 while ( 1 ) {
299             # When to exit loop?
300             # shutting_down = 1
301             # job_queue empty
302             # all workers idle, and no partial jobs
303             # result_queue empty
304 64 50 66     817 if ($self->{shutting_down} &&
  5   66     216  
305 5         49 ! @{ $self->{job_queue} } &&
306             ! @{ $self->{result_queue} } ) {
307 5         16 my $busy=0;
308 5         10 my $partials = 0;
309 5         27 for my $s (keys(%$workers)) {
310 13 50       87 if ( ! $workers->{ $s }->{idle}) {
    50          
311 0         0 $busy ++;
312 0         0 last;
313             } elsif (exists($workers->{ $s }->{partial_job})) {
314 0         0 $partials ++;
315 0         0 last;
316             }
317             }
318 5 50 33     106 if ( ! $busy && ! $partials) {
319             # Close all workers
320 5         207 for my $s (keys(%$workers)) {
321 13         13578 close($workers->{ $s }->{socket});
322             }
323 5         126 close($self->{app_socket});
324 5         31 last;
325             }
326             }
327            
328             # Set up selectors:
329             # Always check app for read, unless shutting down. App write only if
330             # there's something in @result_queue.
331 59         199 my (@rpids, @wpids);
332 59         705 my $rs = IO::Select->new();
333 59 50       1168 if ( ! $self->{shutting_down}) {
334 59         257 $rs->add($self->{app_socket});
335 59         3312 push(@rpids, "app");
336             }
337 59         343 my $ws = IO::Select->new();
338 59 100       524 if ( @{ $self->{result_queue} } ) {
  59         301  
339 21         116 $ws->add($self->{app_socket});
340 21         649 push(@wpids, "app");
341             }
342            
343             # Check workers for read only if not idle
344             # Otherwise, IF job_queue isn't empty,
345             # check nonidle workers for write.
346 59         209 for my $s (keys(%$workers)) {
347 163 100       631 if ( $workers->{ $s }->{idle}) {
348 83 100 66     109 if ( @{ $self->{job_queue} } || exists($workers->{ $s }->{partial_job})) {
  83         598  
349 37         126 $ws->add($workers->{ $s }->{socket});
350 37         1054 push(@wpids, $workers->{ $s }->{pid});
351             }
352             } else {
353 80         294 $rs->add($workers->{ $s }->{socket});
354 80         2622 push(@rpids, $workers->{ $s }->{pid});
355             }
356             }
357            
358             # Blocking
359 59         342 my @rdy = IO::Select->select($rs, $ws, undef);
360 59 50       8088872 if ( ! @rdy) {
361 0 0       0 if ($! == EINTR) {
362             # signal interrupt, continue waiting
363 0         0 next;
364             }
365 0         0 croak("select failed: $!");
366             }
367 59         175 my ($r, $w) = @rdy[0,1];
368            
369             # Now we have zero or more reabable sockets, and
370             # zero or more writable sockets, but there's at
371             # least one socket among the two groups.
372             # Read first, as things read can be further handled
373             # by writables immediately afterwards.
374            
375 59         144 for my $rh (@$r) {
376 43         71 my ($source, $queue, $rstream);
377 43 100       193 if ($rh != $self->{app_socket}) {
378 33         139 $source = $workers->{$rh}->{pid};
379 33         96 $queue = $self->{result_queue};
380 33         167 $rstream = \$workers->{$rh}->{result_stream};
381             } else {
382 10         36 $source = 'app';
383 10         62 $queue = $self->{job_queue};
384 10         73 $rstream = \$self->{job_stream};
385             }
386              
387 43         192 $self->log("boss_loop: reading socket\n");
388 43         213 $self->read($rh, $queue, $rstream, 'boss');
389 43         17259 $self->log("boss_loop: read socket complete\n");
390             }
391              
392 59         545 for my $wh (@$w) {
393 58 100       223 my $source = exists($workers->{ $wh }) ? $workers->{ $wh }->{pid} : "app";
394 58         204 $self->log("boss_loop: writing socket\n");
395 58         226 $self->write($wh);
396 58         141 $self->log("boss_loop: write socket complete\n");
397             }
398             }
399             };
400 5 50       54 if ($@) {
401 0         0 croak($@);
402             }
403             }
404              
405             # -----------------------------------------------------------------
406             sub write {
407 58     58 0 86 my ($self, $socket)=@_;
408 58 100       4880 if ($socket == $self->{app_socket}) {
409 21         78 $self->write_app($socket);
410             } else {
411 37         104 $self->write_worker($socket);
412             }
413             }
414              
415             # -----------------------------------------------------------------
416             sub write_app {
417 21     21 0 43 my ($self, $socket)=@_;
418            
419             # App socket: write all bytes until would block, or complete.
420             # This means process result_queue in order, doing as many elems
421             # as possible. Don't remove from the queue until complete. In
422             # other words, the first item on the queue may be a partial from
423             # the previous write attempt.
424 21         43 my $queue = $self->{result_queue};
425 21         61 while (@$queue) {
426 33         92 $self->log("write_app: processing queue entry\n");
427 33         97 while ( $queue->[0] ) {
428 33         15250 my $n = syswrite($socket, $queue->[0]);
429 33 50       142 if ( ! defined($n)) {
    50          
430             # Block or real socket error
431 0 0       0 if ($! == EWOULDBLOCK) {
432             # That's it for this socket, try another, or select again.
433 0         0 return;
434             } else {
435 0         0 croak("boss write to app: syswrite: $!");
436             }
437             }
438            
439             elsif ($n == 0) {
440             # Application error: socket has been closed prematurely by other party.
441             # Boss is supposed to close app socket before app. App tells Boss to
442             # stop, but it only happens after all existing work is completed, and
443             # data is sent back to app.
444 0         0 croak("boss write to app: peer closed prematurely");
445            
446             } else {
447             # wrote some bytes, remove them from the queue elem
448 33         286 substr($queue->[0], 0, $n) = '';
449             }
450             }
451             # queue elem is empty, remove it, go try next one
452 33         338 $self->log("write_app: process queue entry complete\n");
453 33         274 shift(@$queue);
454             }
455 21         65 $self->log("write_app: all queue entries have been written\n");
456             # queue is empty, all written!
457             }
458            
459             # -----------------------------------------------------------------
460             sub write_worker {
461 37     37 0 58 my ($self, $socket)=@_;
462            
463             # A worker: check to see if we have a remaining partial
464             # job we already started to send. If so, continue with this.
465             # Otherwise, take a *single* job off the job_queue, and send that.
466             # When we've gotten either complete, or would block, write remaining
467             # portion to per-worker job-in-progress, or make it '' if complete.
468             # With worker, we only send ONE job, never more.
469             # Once job send is complete, mark worker not-idle.
470            
471 37 50       149 if ( ! exists($self->{workers}->{ $socket }->{partial_job})) {
472 37         124 $self->log("write_worker: processing new job\n");
473 37 100       49 if (@{ $self->{job_queue} }) {
  37         102  
474 33         39 $self->{workers}->{ $socket }->{partial_job} = shift(@{ $self->{job_queue} });
  33         176  
475             } else {
476             # Nothing left on queue. Remember, we select on *all* idle workers,
477             # even if there's only one job on the queue.
478 4         14 return;
479             }
480             } else {
481 0         0 $self->log("write_worker: processing job remnant\n");
482             }
483 33         108 my $rjob = \$self->{workers}->{ $socket }->{partial_job};
484            
485 33         104 while ( length($$rjob) ) {
486 33         71 $self->log("write_worker: writing...\n");
487 33         2212 my $n = syswrite($socket, $$rjob);
488 33 50       142 if ( ! defined($n)) {
    50          
489             # Block or real socket error
490 0 0       0 if ($! == EWOULDBLOCK) {
491             # That's it for this socket, try another, or select again.
492 0         0 return;
493             } else {
494 0         0 croak("boss write to worker: syswrite: $!");
495             }
496             }
497            
498             elsif ($n == 0) {
499             # Application error: socket has been closed prematurely by other party.
500             # Boss is supposed to close worker socket before worker - that's how
501             # worker knows to exit.
502 0         0 croak("boss write to worker: peer closed prematurely (pid " . $self->{workers}->{ $socket }->{pid} . ")");
503            
504             } else {
505             # wrote some bytes, remove them from the job
506 33         83 substr($$rjob, 0, $n) = '';
507 33         139 $self->log("write_worker: wrote $n bytes\n");
508             }
509             }
510             # job all written!
511 33         65 $self->log("write_worker: job complete\n");
512 33         108 delete($self->{workers}->{ $socket }->{partial_job});
513 33         195 $self->{workers}->{ $socket }->{idle} = 0;
514             }
515              
516             # -----------------------------------------------------------------
517             # Boss exits loop on error, wouldblock, or shutdown msg (socket close).
518             # Worker exits loop on error, recd full record, or boss socket close.
519             # App exits loop on error, recd full record, wouldblock (nb only), early boss close (error).
520             # Stream (as external ref) isn't needed for worker, as it's blocking, and only reads a single
521             # record, no more.
522             # So $rstream can be undef, and if so, we init locally.
523             sub read {
524 109     109 0 378 my ($self, $socket, $queue, $rstream, $iam)=@_;
525 109         209 my $stream;
526 109 100       1537 $rstream = \$stream if ! defined($rstream);
527 109 100       602 $$rstream = '' if ! defined($$rstream);
528              
529             # croak messaging details...
530 109         185 my $source;
531 109 100       452 if ($iam eq 'boss') {
532 43 100       343 if ($socket == $self->{app_socket}) {
533 10         51 $source = 'app';
534             } else {
535 33         235 $source = "worker [$self->{workers}->{$socket}->{pid}]";
536             }
537             } else { # app or worker, same source
538 66         297 $source = "boss";
539             }
540              
541 109         207 while ( 1 ) {
542 148         7484 $self->log("read: $iam is reading...\n");
543              
544 148         14767315 my $n = sysread($socket, $$rstream, $self->{read_size}, length($$rstream));
545 148 100       9742 if ( ! defined($n)) {
    100          
546 39 50       945 if ($! == EINTR) {
    50          
547             # signal interrupt, continue reading
548 0         0 next;
549             } elsif ($! == EWOULDBLOCK) {
550 39         167 last; # No bytes recd, no need to chunk.
551             } else {
552 0         0 croak("$iam read from $source: sysread: $!");
553             }
554            
555             } elsif ($n == 0) {
556             # Application error: socket has been closed prematurely by other party.
557             # Boss is supposed to close worker socket before worker - that's how
558             # worker knows to exit.
559             # Boss is supposed to close app socket before app. App tells Boss to
560             # stop, but it only happens after all existing work is completed, and
561             # data is sent back to app.
562 18 100       216 if ($iam eq 'boss') {
    50          
563 5 50       30 if ($socket == $self->{app_socket}) {
    0          
564 5         15 $self->{shutting_down} = 1;
565             } elsif (exists($self->{workers}->{$socket})) {
566 0         0 croak("$iam read from $source: peer closed prematurely (pid " . $self->{workers}->{ $socket }->{pid} . ")");
567             }
568             } elsif ($iam eq 'worker') {
569 13         497 close($socket);
570             } else { # i am app
571 0         0 croak("$iam read from $source: peer closed prematurely (pid " . $self->{boss_pid} . ")");
572             }
573              
574             # if we didn't croak...
575 18         231 last;
576            
577             } else {
578             # We actually read some bytes. See if we can chunk
579             # out any record(s).
580 91         1921 $self->log("read: $iam read $n bytes\n");
581            
582             # Split on delimiter
583 91         249835 my @records = split(/(?<=$self->{msg_delimiter})/, $$rstream);
584              
585             # All but last elem are full records
586 91         2150 my $rcount=$#records;
587 91         1596 push(@$queue, @records[0..$#records-1]);
588              
589             # Deal with last elem, which may or may not be full record
590 91 50       7031 if ($records[ $#records ] =~ /$self->{msg_delimiter}$/) {
591             # We have a full record
592 91         1426 $rcount++;
593 91         990 $self->log("read: $iam pushing full record onto queue\n");
594 91         342 push(@$queue, $records[ $#records ]);
595 91         207 $$rstream = '';
596 91 100       814 if (exists($self->{workers}->{ $socket })) {
597 33         141 $self->{workers}->{ $socket }->{idle} = 1;
598             }
599             } else {
600 0         0 $$rstream = $records[$#records];
601             }
602              
603             # Boss grabs all it can get, only exiting loop on wouldblock.
604             # App (even nb method), and workers all exit when one full
605             # record is received.
606 91 100 66     1890 last if $rcount && $iam ne 'boss';
607             }
608             }
609             }
610              
611             # -----------------------------------------------------------------
612             # Worker process; single blocking socket open to boss.
613             # Blocking select loop:
614             # Only do read OR write, not both. We never want more than a single
615             # job at a time. So, if no job currently, read, waiting for one.
616             # Get a job, perform it, and try to write results.
617             # Send delimiter, which tells boss it has all the results, and we're ready
618             # for another job.
619             #
620             sub worker_loop {
621 13     13 0 110 my ($self)=@_;
622 13         391 eval {
623 13 50       249 if ($self->{init_handler}) {
624 0         0 $self->log("worker_loop: calling init_handler()\n");
625 0         0 $self->{init_handler}->();
626             }
627              
628             # String buffers to store serialized data: in and out.
629 13         93 my $result_stream;
630 13         53 while ( 1 ) {
631 79 100       21062 if (defined($result_stream)) {
632             # We have a result: write it to boss
633 33         151 $self->log("worker_loop: writing result...\n");
634            
635 33         18215 my $n = syswrite( $self->{socket}, $result_stream);
636 33 50       121 croak("worker [$$] write to boss: syswrite: $!") if ! defined($n);
637 33 50       231 $self->log("worker_loop: wrote $n bytes\n") if defined($n);
638 33         72 $result_stream = undef;
639             # will return to top of loop
640            
641             } else {
642             # Get job from boss
643            
644 46         126 my @queue;
645 46         588 $self->log("worker_loop: reading job from queue...\n");
646 46         377 $self->read($self->{socket}, \@queue, undef, 'worker');
647 46 100       369 return if ! @queue;
648 33         209 $self->log("worker_loop: read job complete, we have a job\n");
649              
650 33         287 my $job = $self->deserialize($queue[0]);
651 33         61 my $result;
652 33         90 eval {
653             local $SIG{ALRM} = sub {
654 1     1   3001336 die("BossWorkerAsync: timed out");
655 33         1218 };
656              
657             # Set alarm
658 33         2300 alarm($self->{global_timeout});
659              
660             # Invoke handler and get result
661 33         153 $self->log("worker_loop: calling work_handler for this job\n");
662 33         363 $result = $self->{work_handler}->($job);
663              
664             # Disable alarm
665 32         5001392 alarm(0);
666             };
667              
668 33 100       130 if ($@) {
669 1         18 $result = {ERROR => $@};
670 1         9 $self->log("worker_loop: ERROR: $@\n");
671             }
672            
673 33         177 $result_stream = $self->serialize($result);
674             }
675             }
676             };
677 13 50       118 if ($@) {
678 0         0 croak($@);
679             }
680             }
681              
682              
683             # -----------------------------------------------------------------
684             # IN: log message
685             # If verbose is enabled, print the message.
686             sub log {
687 1133     1133 0 2742 my ($self, $msg) = @_;
688 1133 50       5964 print STDERR $msg if $self->{verbose};
689             }
690              
691              
692             1;
693             __END__