File Coverage

blib/lib/MCE/Core/Worker.pm
Criterion Covered Total %
statement 215 324 66.3
branch 101 228 44.3
condition 10 66 15.1
subroutine 14 21 66.6
pod n/a
total 340 639 53.2


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Core methods for the worker process.
4             ##
5             ## This package provides main, loop, and relevant methods used internally by
6             ## the worker process.
7             ##
8             ## There is no public API.
9             ##
10             ###############################################################################
11              
12             package MCE::Core::Worker;
13              
14 85     85   653 use strict;
  85         172  
  85         2636  
15 85     85   472 use warnings;
  85         161  
  85         7869  
16              
17             our $VERSION = '1.889';
18              
19             my $_tid = $INC{'threads.pm'} ? threads->tid() : 0;
20              
21             sub CLONE {
22 0 0   0   0 $_tid = threads->tid() if $INC{'threads.pm'};
23             }
24              
25             ## Items below are folded into MCE.
26              
27             package # hide from rpm
28             MCE;
29              
30 85     85   515 no warnings qw( bareword threads recursion uninitialized );
  85         183  
  85         111842  
31              
32             ###############################################################################
33             ## ----------------------------------------------------------------------------
34             ## Internal do, gather and send related functions for serializing data to
35             ## destination. User functions for handling gather, queue or void.
36             ##
37             ###############################################################################
38              
39             {
40             my (
41             $_dest, $_len, $_tag, $_task_id, $_user_func, $_val, $_wa,
42             $_DAT_LOCK, $_DAT_W_SOCK, $_DAU_W_SOCK, $_chn, $_lock_chn,
43             $_dat_ex, $_dat_un
44             );
45              
46             my $_is_MSWin32 = ($^O eq 'MSWin32') ? 1 : 0;
47              
48             ## Create array structure containing various send functions.
49             my @_dest_function = ();
50              
51             $_dest_function[SENDTO_FILEV2] = sub { ## Content >> File
52              
53             return unless (defined $_val);
54             local $\ = undef if (defined $\);
55              
56             if (length ${ $_[1] }) {
57             my $_buf = $_[0]->{freeze}([ $_val, ${ $_[1] } ]);
58             $_dat_ex->() if $_lock_chn;
59             print({$_DAT_W_SOCK} OUTPUT_F_SND.$LF . $_chn.$LF),
60             print({$_DAU_W_SOCK} length($_buf).$LF, $_buf);
61             $_dat_un->() if $_lock_chn;
62             }
63              
64             return;
65             };
66              
67             $_dest_function[SENDTO_FD] = sub { ## Content >> File descriptor
68              
69             return unless (defined $_val);
70             local $\ = undef if (defined $\);
71              
72             if (length ${ $_[1] }) {
73             my $_buf = $_[0]->{freeze}([ $_val, ${ $_[1] } ]);
74             $_dat_ex->() if $_lock_chn;
75             print({$_DAT_W_SOCK} OUTPUT_D_SND.$LF . $_chn.$LF),
76             print({$_DAU_W_SOCK} length($_buf).$LF, $_buf);
77             $_dat_un->() if $_lock_chn;
78             }
79              
80             return;
81             };
82              
83             $_dest_function[SENDTO_STDOUT] = sub { ## Content >> STDOUT
84              
85             local $\ = undef if (defined $\);
86              
87             if (length ${ $_[1] }) {
88             my $_buf = $_[0]->{freeze}($_[1]);
89             $_dat_ex->() if $_lock_chn;
90             print({$_DAT_W_SOCK} OUTPUT_O_SND.$LF . $_chn.$LF),
91             print({$_DAU_W_SOCK} length($_buf).$LF, $_buf);
92             $_dat_un->() if $_lock_chn;
93             }
94              
95             return;
96             };
97              
98             $_dest_function[SENDTO_STDERR] = sub { ## Content >> STDERR
99              
100             local $\ = undef if (defined $\);
101              
102             if (length ${ $_[1] }) {
103             my $_buf = $_[0]->{freeze}($_[1]);
104             $_dat_ex->() if $_lock_chn;
105             print({$_DAT_W_SOCK} OUTPUT_E_SND.$LF . $_chn.$LF),
106             print({$_DAU_W_SOCK} length($_buf).$LF, $_buf);
107             $_dat_un->() if $_lock_chn;
108             }
109              
110             return;
111             };
112              
113             ## -------------------------------------------------------------------------
114              
115             sub _do_callback {
116              
117 133     133   313 my ($self, $_buf, $_aref);
118              
119 133         406 ($self, $_val, $_aref) = @_;
120              
121 133 100       293 unless (defined wantarray) {
    100          
122 126         197 $_wa = WANTS_UNDEF;
123 0         0 } elsif (wantarray) {
124 2         9 $_wa = WANTS_ARRAY;
125             } else {
126 5         15 $_wa = WANTS_SCALAR;
127             }
128              
129 133 50       457 local $\ = undef if (defined $\);
130              
131             ## Crossover: Send arguments
132              
133 133 100       205 if ( ! @{ $_aref } ) {
  133         386  
134 2 50       10 $_dat_ex->() if $_lock_chn;
135 2         115 print({$_DAT_W_SOCK} OUTPUT_N_CBK.$LF . $_chn.$LF),
136 2         18 print({$_DAU_W_SOCK} $_wa.$LF . $_val.$LF);
  2         65  
137             }
138             else {
139 131         1471 $_buf = $self->{freeze}($_aref);
140 131         293 $_len = length $_buf;
141              
142 131 50       303 $_dat_ex->() if $_lock_chn;
143 131         4416 print({$_DAT_W_SOCK} OUTPUT_A_CBK.$LF . $_chn.$LF),
144 131         185 print({$_DAU_W_SOCK} $_wa.$LF . $_val.$LF . $_len.$LF, $_buf);
  131         1938  
145             }
146              
147             ## Crossover: Receive value
148              
149 133 100       552 if ( $_wa ) {
150 7 50       34 local $/ = $LF if ($/ ne $LF);
151 7         4822 chomp(my $_len = <$_DAU_W_SOCK>);
152              
153 7         76 read $_DAU_W_SOCK, my($_buf), $_len;
154 7 50       33 $_dat_un->() if $_lock_chn;
155              
156             return ( $_wa != WANTS_ARRAY )
157             ? ($self->{thaw}($_buf))->[0]
158 7 100       189 : @{ $self->{thaw}($_buf) };
  2         108  
159             }
160              
161 126 50       922 $_dat_un->() if $_lock_chn;
162             }
163              
164             ## -------------------------------------------------------------------------
165              
166             sub _do_gather {
167              
168 357     357   520 my $_buf; my ($self, $_aref) = @_;
  357         795  
169              
170 357 50       514 return unless (scalar @{ $_aref });
  357         1066  
171              
172 357         839 $_tag = OUTPUT_A_GTR;
173 357         3370 $_buf = $self->{freeze}($_aref);
174 357         802 $_len = length $_buf;
175              
176 357 50       1234 local $\ = undef if (defined $\);
177              
178 357 50       34395 $_dat_ex->() if $_lock_chn;
179 357         8405 print({$_DAT_W_SOCK} $_tag.$LF . $_chn.$LF),
180 357         542 print({$_DAU_W_SOCK} $_task_id.$LF . $_len.$LF, $_buf);
  357         4862  
181 357 50       1918 $_dat_un->() if $_lock_chn;
182              
183 357         1735 return;
184             }
185              
186             ## -------------------------------------------------------------------------
187              
188             sub _do_send {
189              
190 0     0   0 my $_data_ref; my $self = shift;
  0         0  
191              
192 0         0 $_dest = shift; $_val = shift;
  0         0  
193              
194 0 0       0 if (scalar @_ > 1) {
    0          
195 0         0 $_data_ref = \join('', @_);
196             }
197             elsif (my $_ref = ref $_[0]) {
198 0 0       0 if ($_ref eq 'SCALAR') {
    0          
    0          
199 0         0 $_data_ref = $_[0];
200             }
201             elsif ($_ref eq 'ARRAY') {
202 0         0 $_data_ref = \join('', @{ $_[0] });
  0         0  
203             }
204             elsif ($_ref eq 'HASH') {
205 0         0 $_data_ref = \join('', %{ $_[0] });
  0         0  
206             }
207             else {
208 0         0 $_data_ref = \join('', @_);
209             }
210             }
211             else {
212 0         0 $_data_ref = \(''.$_[0]);
213             }
214              
215 0         0 $_dest_function[$_dest]($self, $_data_ref);
216              
217 0         0 return 1;
218             }
219              
220             sub _do_send_glob {
221              
222 0     0   0 my ($self, $_glob, $_fd, $_data_ref) = @_;
223              
224 0 0       0 if ($self->{_wid} > 0) {
225 0 0       0 if ($_fd == 1) {
    0          
226 0         0 _do_send($self, SENDTO_STDOUT, undef, $_data_ref);
227             }
228             elsif ($_fd == 2) {
229 0         0 _do_send($self, SENDTO_STDERR, undef, $_data_ref);
230             }
231             else {
232 0         0 _do_send($self, SENDTO_FD, $_fd, $_data_ref);
233             }
234             }
235             else {
236 85     85   696 use bytes;
  85         212  
  85         595  
237 0         0 my $_fh = _sendto_fhs_get($self, $_fd);
238 0 0       0 local $\ = undef if (defined $\);
239              
240 0         0 print {$_fh} ${ $_data_ref };
  0         0  
  0         0  
241             }
242              
243 0         0 return 1;
244             }
245              
246             ## -------------------------------------------------------------------------
247              
248             sub _do_send_init {
249              
250 65     65   336 my ($self) = @_;
251              
252 65         497 $_chn = $self->{_chn};
253 65         394 $_DAT_LOCK = $self->{_dat_lock};
254 65         418 $_DAT_W_SOCK = $self->{_dat_w_sock}->[0];
255 65         332 $_DAU_W_SOCK = $self->{_dat_w_sock}->[$_chn];
256 65         296 $_lock_chn = $self->{_lock_chn};
257 65         220 $_task_id = $self->{_task_id};
258              
259 65 50       652 if ($_lock_chn) {
260             # inlined for performance
261             $_dat_ex = sub {
262 0 0   0   0 my $_pid = $_tid ? $$ .'.'. $_tid : $$;
263             CORE::lock($_DAT_LOCK->{_t_lock}), MCE::Util::_sock_ready($_DAT_LOCK->{_r_sock})
264 0 0       0 if $_is_MSWin32;
265             MCE::Util::_sysread($_DAT_LOCK->{_r_sock}, my($b), 1), $_DAT_LOCK->{ $_pid } = 1
266 0 0       0 unless $_DAT_LOCK->{ $_pid };
267 0         0 };
268             $_dat_un = sub {
269 0 0   0   0 my $_pid = $_tid ? $$ .'.'. $_tid : $$;
270             syswrite($_DAT_LOCK->{_w_sock}, '0'), $_DAT_LOCK->{ $_pid } = 0
271 0 0       0 if $_DAT_LOCK->{ $_pid };
272 0         0 };
273             }
274              
275             {
276 65         285 local $!;
  65         2322  
277 65 50       3896 (*STDERR)->autoflush(1) if defined( fileno *STDERR );
278 65 50       15783 (*STDOUT)->autoflush(1) if defined( fileno *STDOUT );
279             }
280              
281 65         2642 return;
282             }
283              
284             sub _do_send_clear {
285              
286 65     65   245 my ($self) = @_;
287              
288 65         291 $_dest = $_len = $_task_id = $_user_func = $_val = $_wa = undef;
289 65         229 $_DAT_LOCK = $_DAT_W_SOCK = $_DAU_W_SOCK = $_chn = $_lock_chn = undef;
290 65         216 $_dat_ex = $_dat_un = $_tag = undef;
291              
292 65         158 return;
293             }
294              
295             ## -------------------------------------------------------------------------
296              
297             sub _do_user_func {
298              
299 246     246   718 my ($self, $_chunk, $_chunk_id) = @_;
300 246         399 my $_size = 0;
301              
302 246         482 delete $self->{_relayed};
303 246         485 $self->{_chunk_id} = $_chunk_id;
304              
305 246 50 33     740 if ($self->{progress} && $self->{_task_id} == 0) {
306             # use_slurpio
307 0 0 0     0 if (ref $_chunk eq 'SCALAR') {
    0          
    0          
308 0         0 $_size += length ${ $_chunk };
  0         0  
309             }
310             # sequence and bounds_only
311             elsif ($self->{sequence} && $self->{bounds_only}) {
312 0         0 my $_seq = $self->{sequence};
313 0 0       0 my $_step = (ref $_seq eq 'ARRAY') ? $_seq->[2] : $_seq->{step};
314 0         0 $_size += int(abs($_chunk->[0] - $_chunk->[1]) / abs($_step)) + 1;
315             }
316             # workers clear {input_data} to conserve memory when array ref
317             # otherwise, /path/to/infile or scalar reference
318             elsif ($self->{input_data}) {
319 0         0 map { $_size += length } @{ $_chunk };
  0         0  
  0         0  
320             }
321             # array or sequence
322             else {
323 0 0       0 $_size += (ref $_chunk eq 'ARRAY') ? @{ $_chunk } : 1;
  0         0  
324             }
325             }
326              
327 246 50       522 if ($self->{max_retries}) {
328 0         0 $self->{_retry} = [ $_chunk, $_chunk_id, $self->{max_retries} ];
329             }
330              
331 246 0 33     633 if ($self->{loop_timeout} && $self->{_task_id} == 0 &&
      33        
      0        
      0        
332             defined $self->{init_relay} && !$self->{_is_thread} && !$_is_MSWin32) {
333              
334 0 0       0 local $\ = undef if (defined $\);
335              
336 0 0       0 $_dat_ex->() if $_lock_chn;
337 0         0 print({$_DAT_W_SOCK} OUTPUT_C_NFY.$LF . $_chn.$LF),
338 0         0 print({$_DAU_W_SOCK} "$$:$_chunk_id".$LF);
  0         0  
339 0 0       0 $_dat_un->() if $_lock_chn;
340             }
341              
342 246         1166 $_user_func->($self, $_chunk, $_chunk_id);
343              
344 246 50 33     1256 if ($self->{progress} && $self->{_task_id} == 0) {
345 0 0       0 local $\ = undef if (defined $\);
346              
347 0 0       0 $_dat_ex->() if $_lock_chn;
348 0         0 print({$_DAT_W_SOCK} OUTPUT_P_NFY.$LF . $_chn.$LF),
349 0         0 print({$_DAU_W_SOCK} $_size.$LF);
  0         0  
350 0 0       0 $_dat_un->() if $_lock_chn;
351             }
352              
353 246         857 return;
354             }
355              
356             sub _do_user_func_init {
357              
358 163     163   9463 my ($self) = @_;
359              
360 163         579 $_user_func = $self->{user_func};
361              
362 163         2028 return;
363             }
364             }
365              
366             ###############################################################################
367             ## ----------------------------------------------------------------------------
368             ## Worker process -- Do.
369             ##
370             ###############################################################################
371              
372             sub MCE::Core::Worker::_guard::DESTROY {
373              
374 163     163   279 my ($mce, $id) = @{ $_[0] };
  163         503  
375              
376 163 50 33     654 if (defined $mce && $id eq "$$.$_tid") {
377 0         0 @{ $_[0] } = ();
  0         0  
378 0         0 warn "MCE worker $id exited prematurely.\n";
379 0         0 $mce->exit(2);
380             }
381              
382 163         514 return;
383             };
384              
385             sub _worker_do {
386              
387 163     163   672 my ($self, $_params_ref) = @_;
388              
389 163         445 @_ = ();
390              
391             ## Set options.
392 163         515 $self->{_abort_msg} = $_params_ref->{_abort_msg};
393 163         442 $self->{_run_mode} = $_params_ref->{_run_mode};
394 163         507 $self->{use_slurpio} = $_params_ref->{_use_slurpio};
395 163         597 $self->{parallel_io} = $_params_ref->{_parallel_io};
396 163         1417 $self->{progress} = $_params_ref->{_progress};
397 163         418 $self->{max_retries} = $_params_ref->{_max_retries};
398 163         324 $self->{RS} = $_params_ref->{_RS};
399              
400 163         1278 _do_user_func_init($self);
401              
402             ## Init local vars.
403 163         626 my $_chn = $self->{_chn};
404 163         363 my $_DAT_LOCK = $self->{_dat_lock};
405 163         378 my $_DAT_W_SOCK = $self->{_dat_w_sock}->[0];
406 163         454 my $_DAU_W_SOCK = $self->{_dat_w_sock}->[$_chn];
407 163         426 my $_lock_chn = $self->{_lock_chn};
408 163         388 my $_run_mode = $self->{_run_mode};
409 163         340 my $_task_id = $self->{_task_id};
410 163         715 my $_task_name = $self->{task_name};
411              
412             ## Do not override params if defined in user_tasks during instantiation.
413 163         623 for my $_p (qw(bounds_only chunk_size sequence user_args)) {
414 652 100       2172 if (defined $_params_ref->{"_${_p}"}) {
415             $self->{$_p} = $_params_ref->{"_${_p}"}
416 208 50       1210 unless (defined $self->{_task}->{$_p});
417             }
418             }
419              
420             {
421 163         383 my $_guard = bless([ $self, "$$.$_tid" ], MCE::Core::Worker::_guard::);
  163         2984  
422 163         1983 weaken( $self->{_guard} = $_guard );
423              
424             ## Assign user function.
425 163         579 $self->{_wuf} = \&_do_user_func;
426              
427             ## Call user_begin if defined.
428 163 50       18279 if (defined $self->{user_begin}) {
429 0         0 $self->{_chunk_id} = 0;
430 0         0 $self->{user_begin}($self, $_task_id, $_task_name);
431 0 0 0     0 if ($_task_id == 0 && defined $self->{init_relay} && !$self->{_retry}) {
      0        
432 0         0 $self->sync();
433             }
434             }
435              
436             ## Retry chunk if previous attempt died.
437 163 50       662 if ($self->{_retry}) {
438 0         0 $self->{_chunk_id} = $self->{_retry}->[1];
439 0         0 $self->{user_func}->($self, $self->{_retry}->[0], $self->{_retry}->[1]);
440 0         0 delete $self->{_retry};
441             }
442              
443             ## Call worker function.
444 163 100       1768 if ($_run_mode eq 'sequence') {
    50          
    100          
    100          
    100          
    50          
    100          
    50          
    50          
445             require MCE::Core::Input::Sequence
446 14 100       6339 unless $INC{'MCE/Core/Input/Sequence.pm'};
447 14         107 _worker_sequence_queue($self);
448             }
449             elsif (defined $self->{_task}->{sequence}) {
450             require MCE::Core::Input::Generator
451 0 0       0 unless $INC{'MCE/Core/Input/Generator.pm'};
452 0         0 _worker_sequence_generator($self);
453             }
454             elsif ($_run_mode eq 'array') {
455             require MCE::Core::Input::Request
456 36 50       181 unless $INC{'MCE/Core/Input/Request.pm'};
457 36         344 _worker_request_chunk($self, REQUEST_ARRAY);
458             }
459             elsif ($_run_mode eq 'glob') {
460             require MCE::Core::Input::Request
461 14 50       67 unless $INC{'MCE/Core/Input/Request.pm'};
462 14         67 _worker_request_chunk($self, REQUEST_GLOB);
463             }
464             elsif ($_run_mode eq 'hash') {
465             require MCE::Core::Input::Request
466 3 50       36 unless $INC{'MCE/Core/Input/Request.pm'};
467 3         58 _worker_request_chunk($self, REQUEST_HASH);
468             }
469             elsif ($_run_mode eq 'iterator') {
470             require MCE::Core::Input::Iterator
471 0 0       0 unless $INC{'MCE/Core/Input/Iterator.pm'};
472 0         0 _worker_user_iterator($self);
473             }
474             elsif ($_run_mode eq 'file') {
475             require MCE::Core::Input::Handle
476 14 100       15470 unless $INC{'MCE/Core/Input/Handle.pm'};
477 14         89 _worker_read_handle($self, READ_FILE, $_params_ref->{_input_file});
478             }
479             elsif ($_run_mode eq 'memory') {
480             require MCE::Core::Input::Handle
481 0 0       0 unless $INC{'MCE/Core/Input/Handle.pm'};
482 0         0 _worker_read_handle($self, READ_MEMORY, $self->{input_data});
483             }
484             elsif (defined $self->{user_func}) {
485 82 50       296 if ($self->{max_retries}) {
486 0         0 $self->{_retry} = [ undef, 0, $self->{max_retries} ];
487             }
488 82         225 $self->{_chunk_id} = 0;
489 82         833 $self->{user_func}->($self);
490             }
491              
492 163 100       1760 undef $self->{_next_jmp} if (defined $self->{_next_jmp});
493 163 100       710 undef $self->{_last_jmp} if (defined $self->{_last_jmp});
494 163 50       459 undef $self->{user_data} if (defined $self->{user_data});
495              
496             ## Call user_end if defined.
497 163 50       487 if (defined $self->{user_end}) {
498 0         0 $self->{_chunk_id} = 0;
499 0 0 0     0 $self->sync() if ($_task_id == 0 && defined $self->{init_relay});
500 0         0 $self->{user_end}($self, $_task_id, $_task_name);
501             }
502              
503 163         515 @{ $_guard } = ();
  163         1257  
504 163         578 delete $self->{_guard};
505 163         821 delete $self->{_wuf};
506             }
507              
508             ## Check for nested workers not yet joined.
509 163 50       490 MCE::Child->finish('MCE') if $INC{'MCE/Child.pm'};
510 163 50       480 MCE::Hobo->finish('MCE') if $INC{'MCE/Hobo.pm'};
511              
512             ## Notify the main process a worker has completed.
513 163 50       9666 local $\ = undef if (defined $\);
514              
515 163 50       432 $_DAT_LOCK->lock() if $_lock_chn;
516              
517 163         4076 print({$_DAT_W_SOCK} OUTPUT_W_DNE.$LF . $_chn.$LF),
518 163         284 print({$_DAU_W_SOCK} $_task_id.$LF);
  163         1905  
519              
520 163 50       777 $_DAT_LOCK->unlock() if $_lock_chn;
521              
522 163 50       8214 if ($^O eq 'MSWin32') {
523 0         0 lock $self->{_run_lock};
524             }
525              
526 163         503 return;
527             }
528              
529             ###############################################################################
530             ## ----------------------------------------------------------------------------
531             ## Worker process -- Loop.
532             ##
533             ###############################################################################
534              
535             sub _worker_loop {
536              
537 65     65   358 my ($self) = @_;
538              
539 65         241 @_ = ();
540              
541 65         217 my ($_response, $_len, $_buf, $_params_ref);
542              
543 65         284 my $_COM_LOCK = $self->{_com_lock};
544 65         293 my $_COM_W_SOCK = $self->{_com_w_sock};
545 65         273 my $_job_delay = $self->{job_delay};
546              
547 65 50       1518 if ($^O eq 'MSWin32') { lock $MCE::_WIN_LOCK; }
  0         0  
548              
549 65         234 while (1) {
550             {
551 228         438 local $\ = undef; local $/ = $LF;
  228         20385  
  228         2657  
552 228         2176 $_COM_LOCK->lock();
553              
554             ## Wait for the next job request.
555 228         228973 $_response = <$_COM_W_SOCK>, print {$_COM_W_SOCK} $self->{_wid}.$LF;
  228         11341  
556              
557             ## Return if instructed to exit.
558 228 100       1936 $_COM_LOCK->unlock(), return if ($_response eq "_exit\n");
559              
560             ## Process send request.
561 163 50       3989 if ($_response eq "_data\n") {
    50          
562 0         0 chomp($_len = <$_COM_W_SOCK>), read($_COM_W_SOCK, $_buf, $_len);
563 0         0 print({$_COM_W_SOCK} $LF), $_COM_LOCK->unlock();
  0         0  
564 0         0 $self->{user_data} = $self->{thaw}($_buf), undef $_buf;
565              
566             sleep $_job_delay * $self->{_wid}
567 0 0 0     0 if defined($_job_delay) && $_job_delay > 0.0;
568             }
569              
570             ## Process normal request.
571             elsif ($_response =~ /\d+/) {
572 163         29557 chomp($_len = <$_COM_W_SOCK>), read($_COM_W_SOCK, $_buf, $_len);
573 163         673 print({$_COM_W_SOCK} $LF), $_COM_LOCK->unlock();
  163         7073  
574 163         7199 $_params_ref = $self->{thaw}($_buf), undef $_buf;
575             }
576              
577             ## Leave loop if invalid response.
578             else {
579 0         0 last;
580             }
581             }
582              
583             ## Send request.
584 163 50       1571 _worker_do($self, {}), next if ($_response eq "_data\n");
585              
586             ## Wait here until MCE completes job submission to all workers.
587 163         21789 MCE::Util::_sysread($self->{_bsb_w_sock}, my($_b), 1);
588              
589             ## Normal request.
590             sleep $_job_delay * $self->{_wid}
591 163 50 33     1336 if defined($_job_delay) && $_job_delay > 0.0;
592              
593 163         1554 _worker_do($self, $_params_ref); undef $_params_ref;
  163         813  
594             }
595              
596             ## Notify the main process a worker has ended. The following is executed
597             ## when an invalid reply was received above (not likely to occur).
598              
599 0         0 $_COM_LOCK->unlock();
600              
601 0         0 die "Error: worker $self->{_wid} has ended prematurely";
602             }
603              
604             ###############################################################################
605             ## ----------------------------------------------------------------------------
606             ## Worker process -- Main.
607             ##
608             ###############################################################################
609              
610             sub _worker_main {
611              
612 65     65   746 my ( $self, $_wid, $_task, $_task_id, $_task_wid, $_params,
613             $_plugin_worker_init ) = @_;
614              
615 65         376 @_ = ();
616              
617 65 100       771 if (exists $self->{input_data}) {
618 27         733 my $_ref = ref $self->{input_data};
619 27 50 33     1674 delete $self->{input_data} if ($_ref && $_ref ne 'SCALAR');
620             }
621              
622 65 100       1144 $self->{_task_id} = (defined $_task_id ) ? $_task_id : 0;
623 65 100       772 $self->{_task_wid} = (defined $_task_wid) ? $_task_wid : $_wid;
624 65         527 $self->{_task} = $_task;
625 65         573 $self->{_wid} = $_wid;
626              
627             ## Define exit pid and DIE handler.
628             my $_use_threads = (defined $_task->{use_threads})
629 65 100       1556 ? $_task->{use_threads} : $self->{use_threads};
630              
631 65 50 33     1776 if ($INC{'threads.pm'} && $_use_threads) {
632 0         0 $self->{_exit_pid} = 'TID_' . $_tid;
633             } else {
634 65         2810 $self->{_exit_pid} = 'PID_' . $$;
635             }
636              
637 65         1804 my $_running_inside_eval = $^S;
638              
639 65     0   7154 local $SIG{SEGV} = sub { $self->exit(11) };
  0         0  
640              
641             local $SIG{__DIE__} = sub {
642 0 0 0 0   0 if (!defined $^S || $^S) {
643 0 0 0     0 if ( ($INC{'threads.pm'} && $_tid != 0) ||
      0        
      0        
644             $ENV{'PERL_IPERL_RUNNING'} ||
645             $_running_inside_eval
646             ) {
647             # thread env or running inside IPerl, check stack trace
648 0         0 my $_t = Carp::longmess(); $_t =~ s/\teval [^\n]+\n$//;
  0         0  
649 0 0 0     0 if ( $_t =~ /^(?:[^\n]+\n){1,7}\teval / ||
650             $_t =~ /\n\teval [^\n]+\n\t(?:eval|Try)/ )
651             {
652 0         0 CORE::die(@_);
653             }
654             }
655             else {
656             # normal env, trust $^S
657 0         0 CORE::die(@_);
658             }
659             }
660              
661 0         0 $SIG{__DIE__} = $SIG{__WARN__} = sub {};
662 0 0       0 my $_die_msg = (defined $_[0]) ? $_[0] : '';
663 0         0 $_die_msg =~ s/, <__ANONIO__> line \d+//;
664 0         0 local $\ = undef; print {*STDERR} $_die_msg;
  0         0  
  0         0  
665              
666 0         0 $self->exit(255, $_die_msg, $self->{_chunk_id});
667 65         3303 };
668              
669             ## Use options from user_tasks if defined.
670 65 100       1451 $self->{max_workers} = $_task->{max_workers} if ($_task->{max_workers});
671 65 50       647 $self->{chunk_size} = $_task->{chunk_size} if ($_task->{chunk_size});
672 65 100       1280 $self->{gather} = $_task->{gather} if ($_task->{gather});
673 65 50       511 $self->{sequence} = $_task->{sequence} if ($_task->{sequence});
674 65 50       507 $self->{bounds_only} = $_task->{bounds_only} if ($_task->{bounds_only});
675 65 100       1015 $self->{task_name} = $_task->{task_name} if ($_task->{task_name});
676 65 50       562 $self->{user_args} = $_task->{user_args} if ($_task->{user_args});
677 65 50       467 $self->{user_begin} = $_task->{user_begin} if ($_task->{user_begin});
678 65 100       788 $self->{user_func} = $_task->{user_func} if ($_task->{user_func});
679 65 50       464 $self->{user_end} = $_task->{user_end} if ($_task->{user_end});
680              
681             ## Init runtime vars. Obtain handle to lock files.
682 65         230 my $_chn;
683              
684 65 50 33     810 if (defined $_params && exists $_params->{_chn}) {
685 0         0 $_chn = $self->{_chn} = delete $_params->{_chn}; # worker restarted
686             } else {
687 65         860 $_chn = $self->{_chn} = $_wid % $self->{_data_channels} + 1; # default
688             }
689              
690             ## Choose locks for DATA channels.
691 65         1103 $self->{_com_lock} = $self->{'_mutex_0'};
692 65 50       575 $self->{_dat_lock} = $self->{'_mutex_'.$_chn} if ($self->{_lock_chn});
693              
694             ## Delete attributes no longer required after being spawned.
695 65         342 delete @{ $self }{ qw(
  65         1947  
696             flush_file flush_stderr flush_stdout stderr_file stdout_file
697             on_post_exit on_post_run user_data user_error user_output
698             _pids _state _status _thrs _tids
699             ) };
700              
701             ## Call MCE::Shared's init routine if present; enables parallel IPC.
702             ## For threads, init is called automatically via the CLONE feature.
703 65 50 33     1432 MCE::Shared::init() if (!$_use_threads && $INC{'MCE/Shared.pm'});
704              
705 65         1532 _do_send_init($self);
706              
707             ## Call module's worker_init routine for modules plugged into MCE.
708 65         243 for my $_p (@{ $_plugin_worker_init }) { $_p->($self); }
  65         1174  
  28         920  
709              
710             ## Begin processing if worker was added during processing.
711 65 50       413 _worker_do($self, $_params), undef $_params if defined($_params);
712              
713             ## Enter worker loop.
714 65         1254 _worker_loop($self);
715              
716             ## Clear worker session.
717 65         449 _do_send_clear($self);
718              
719 65         179 $self->{_com_lock} = undef;
720 65         233 $self->{_dat_lock} = undef;
721              
722 65         2736 return;
723             }
724              
725             1;
726              
727             __END__