File Coverage

blib/lib/MCE/Step.pm
Criterion Covered Total %
statement 256 348 73.5
branch 116 238 48.7
condition 45 97 46.3
subroutine 21 26 80.7
pod 5 9 55.5
total 443 718 61.7


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Parallel step model for building creative steps.
4             ##
5             ###############################################################################
6              
7             package MCE::Step;
8              
9 12     12   999109 use strict;
  12         139  
  12         302  
10 12     12   48 use warnings;
  12         18  
  12         300  
11              
12 12     12   49 no warnings qw( threads recursion uninitialized );
  12         12  
  12         624  
13              
14             our $VERSION = '1.887';
15              
16             ## no critic (BuiltinFunctions::ProhibitStringyEval)
17             ## no critic (Subroutines::ProhibitSubroutinePrototypes)
18             ## no critic (TestingAndDebugging::ProhibitNoStrict)
19              
20 12     12   71 use Scalar::Util qw( looks_like_number );
  12         23  
  12         576  
21              
22 12     12   5487 use MCE;
  12         30  
  12         72  
23 12     12   6750 use MCE::Queue;
  12         41  
  12         61  
24              
25             our @CARP_NOT = qw( MCE );
26              
27             my $_tid = $INC{'threads.pm'} ? threads->tid() : 0;
28              
29             sub CLONE {
30 0 0   0   0 $_tid = threads->tid() if $INC{'threads.pm'};
31             }
32              
33             ###############################################################################
34             ## ----------------------------------------------------------------------------
35             ## Import routine.
36             ##
37             ###############################################################################
38              
39             my ($_MCE, $_def, $_params, $_tag) = ({}, {}, {}, 'MCE::Step');
40             my ($_prev_c, $_prev_n, $_prev_t, $_prev_w) = ({}, {}, {}, {});
41             my ($_user_tasks, $_queue, $_last_task_id, $_lkup) = ({}, {}, {}, {});
42              
43             sub import {
44 12     12   157 my ($_class, $_pkg) = (shift, caller);
45              
46 12         44 my $_p = $_def->{$_pkg} = {
47             MAX_WORKERS => 'auto',
48             CHUNK_SIZE => 'auto',
49             };
50              
51             ## Import functions.
52 12 50       37 if ($_pkg !~ /^MCE::/) {
53 12     12   77 no strict 'refs'; no warnings 'redefine';
  12     12   18  
  12         392  
  12         50  
  12         17  
  12         5005  
54 12         25 *{ $_pkg.'::mce_step_f' } = \&run_file;
  12         72  
55 12         20 *{ $_pkg.'::mce_step_s' } = \&run_seq;
  12         138  
56 12         30 *{ $_pkg.'::mce_step' } = \&run;
  12         37  
57             }
58              
59             ## Process module arguments.
60 12         101 while ( my $_argument = shift ) {
61 0         0 my $_arg = lc $_argument;
62              
63 0 0       0 $_p->{MAX_WORKERS} = shift, next if ( $_arg eq 'max_workers' );
64 0 0       0 $_p->{CHUNK_SIZE} = shift, next if ( $_arg eq 'chunk_size' );
65 0 0       0 $_p->{TMP_DIR} = shift, next if ( $_arg eq 'tmp_dir' );
66 0 0       0 $_p->{FREEZE} = shift, next if ( $_arg eq 'freeze' );
67 0 0       0 $_p->{THAW} = shift, next if ( $_arg eq 'thaw' );
68 0 0       0 $_p->{INIT_RELAY} = shift, next if ( $_arg eq 'init_relay' );
69 0 0       0 $_p->{USE_THREADS} = shift, next if ( $_arg eq 'use_threads' );
70              
71 0 0       0 shift, next if ( $_arg eq 'fast' ); # ignored
72              
73             ## Sereal 3.015+, if available, is used automatically by MCE 1.8+.
74 0 0       0 if ( $_arg eq 'sereal' ) {
75 0 0       0 if ( shift eq '0' ) {
76 0         0 require Storable;
77 0         0 $_p->{FREEZE} = \&Storable::freeze;
78 0         0 $_p->{THAW} = \&Storable::thaw;
79             }
80 0         0 next;
81             }
82              
83 0         0 _croak("Error: ($_argument) invalid module option");
84             }
85              
86 12         70 $_p->{MAX_WORKERS} = MCE::_parse_max_workers($_p->{MAX_WORKERS});
87              
88 12         48 MCE::_validate_number($_p->{MAX_WORKERS}, 'MAX_WORKERS', $_tag);
89             MCE::_validate_number($_p->{CHUNK_SIZE}, 'CHUNK_SIZE', $_tag)
90 12 50       38 unless ($_p->{CHUNK_SIZE} eq 'auto');
91              
92 12         178 return;
93             }
94              
95             ###############################################################################
96             ## ----------------------------------------------------------------------------
97             ## The task end callback for when a task completes.
98             ##
99             ###############################################################################
100              
101             sub _task_end {
102              
103 22     22   58 my ($_mce, $_task_id, $_task_name) = @_;
104 22         89 my $_pid = $_mce->{_init_pid}.'.'.$_mce->{_caller};
105              
106 22 100       67 if (defined $_mce->{user_tasks}->[$_task_id + 1]) {
107 10         30 my $n_workers = $_mce->{user_tasks}->[$_task_id + 1]->{max_workers};
108 10         68 $_queue->{$_pid}[$_task_id]->enqueue((undef) x $n_workers);
109             }
110              
111             $_params->{task_end}->($_mce, $_task_id, $_task_name)
112 22 50 33     68 if (exists $_params->{task_end} && ref $_params->{task_end} eq 'CODE');
113              
114 22         52 return;
115             }
116              
117             ###############################################################################
118             ## ----------------------------------------------------------------------------
119             ## Methods for MCE; step, enq, enqp, await.
120             ##
121             ###############################################################################
122              
123             {
124 12     12   102 no warnings 'redefine';
  12         29  
  12         39953  
125              
126             sub MCE::step {
127              
128 21 50   21 0 429 my $x = shift; my $self = ref($x) ? $x : $MCE::MCE;
  21         50  
129 21         77 my $_pid = $self->{_init_pid}.'.'.$self->{_caller};
130              
131             _croak('MCE::step: method is not allowed by the manager process')
132 21 50       47 unless ($self->{_wid});
133              
134 21         27 my $_task_id = $self->{_task_id};
135              
136 21 50       51 if ($_task_id < $_last_task_id->{$_pid}) {
137 21         156 $_queue->{$_pid}[$_task_id]->enqueue($self->freeze([ @_ ]));
138             }
139             else {
140 0         0 _croak('MCE::step: method is not allowed by the last task');
141             }
142              
143 21         84 return;
144             }
145              
146             ############################################################################
147              
148             sub MCE::enq {
149              
150 0 0   0 0 0 my $x = shift; my $self = ref($x) ? $x : $MCE::MCE;
  0         0  
151 0         0 my $_pid = $self->{_init_pid}.'.'.$self->{_caller};
152 0         0 my $_name = shift;
153              
154             _croak('MCE::enq: method is not allowed by the manager process')
155 0 0       0 unless ($self->{_wid});
156             _croak('MCE::enq: (task_name) is not specified or valid')
157 0 0 0     0 if (!defined $_name || !exists $_lkup->{$_pid}{$_name});
158             _croak('MCE::enq: stepping to same task or backwards is not allowed')
159 0 0       0 if ($_lkup->{$_pid}{$_name} <= $self->{_task_id});
160              
161 0         0 my $_task_id = $_lkup->{$_pid}{$_name} - 1;
162              
163 0 0       0 if ($_task_id < $_last_task_id->{$_pid}) {
164 0 0       0 if (scalar @_ > 1) {
165 0         0 my @_items = map { $self->freeze([ $_ ]) } @_;
  0         0  
166 0         0 $_queue->{$_pid}[$_task_id]->enqueue(@_items);
167             }
168             else {
169 0         0 $_queue->{$_pid}[$_task_id]->enqueue($self->freeze([ @_ ]));
170             }
171             }
172             else {
173 0         0 _croak('MCE::enq: method is not allowed by the last task');
174             }
175              
176 0         0 return;
177             }
178              
179             ############################################################################
180              
181             sub MCE::enqp {
182              
183 0 0   0 0 0 my $x = shift; my $self = ref($x) ? $x : $MCE::MCE;
  0         0  
184 0         0 my $_pid = $self->{_init_pid}.'.'.$self->{_caller};
185 0         0 my ($_name, $_p) = (shift, shift);
186              
187             _croak('MCE::enqp: method is not allowed by the manager process')
188 0 0       0 unless ($self->{_wid});
189             _croak('MCE::enqp: (task_name) is not specified or valid')
190 0 0 0     0 if (!defined $_name || !exists $_lkup->{$_pid}{$_name});
191             _croak('MCE::enqp: stepping to same task or backwards is not allowed')
192 0 0       0 if ($_lkup->{$_pid}{$_name} <= $self->{_task_id});
193 0 0 0     0 _croak('MCE::enqp: (priority) is not an integer')
194             if (!looks_like_number($_p) || int($_p) != $_p);
195              
196 0         0 my $_task_id = $_lkup->{$_pid}{$_name} - 1;
197              
198 0 0       0 if ($_task_id < $_last_task_id->{$_pid}) {
199 0 0       0 if (scalar @_ > 1) {
200 0         0 my @_items = map { $self->freeze([ $_ ]) } @_;
  0         0  
201 0         0 $_queue->{$_pid}[$_task_id]->enqueuep($_p, @_items);
202             }
203             else {
204 0         0 $_queue->{$_pid}[$_task_id]->enqueuep($_p, $self->freeze([ @_ ]));
205             }
206             }
207             else {
208 0         0 _croak('MCE::enqp: method is not allowed by the last task');
209             }
210              
211 0         0 return;
212             }
213              
214             ############################################################################
215              
216             sub MCE::await {
217              
218 0 0   0 0 0 my $x = shift; my $self = ref($x) ? $x : $MCE::MCE;
  0         0  
219 0         0 my $_pid = $self->{_init_pid}.'.'.$self->{_caller};
220 0         0 my $_name = shift;
221              
222             _croak('MCE::await: method is not allowed by the manager process')
223 0 0       0 unless ($self->{_wid});
224             _croak('MCE::await: (task_name) is not specified or valid')
225 0 0 0     0 if (!defined $_name || !exists $_lkup->{$_pid}{$_name});
226             _croak('MCE::await: awaiting from same task or backwards is not allowed')
227 0 0       0 if ($_lkup->{$_pid}{$_name} <= $self->{_task_id});
228              
229 0   0     0 my $_task_id = $_lkup->{$_pid}{$_name} - 1; my $_t = shift || 0;
  0         0  
230              
231 0 0 0     0 _croak('MCE::await: (threshold) is not an integer')
232             if (!looks_like_number($_t) || int($_t) != $_t);
233              
234 0 0       0 if ($_task_id < $_last_task_id->{$_pid}) {
235 0         0 $_queue->{$_pid}[$_task_id]->await($_t);
236             } else {
237 0         0 _croak('MCE::await: method is not allowed by the last task');
238             }
239              
240 0         0 return;
241             }
242              
243             }
244              
245             ###############################################################################
246             ## ----------------------------------------------------------------------------
247             ## Init and finish routines.
248             ##
249             ###############################################################################
250              
251             sub init (@) {
252              
253 13 50 33 13 1 1287 shift if (defined $_[0] && $_[0] eq 'MCE::Step');
254 13         92 my $_pkg = "$$.$_tid.".caller();
255              
256 13 50       124 $_params->{$_pkg} = (ref $_[0] eq 'HASH') ? shift : { @_ };
257              
258 13         34 @_ = ();
259              
260 13         32 return;
261             }
262              
263             sub finish (@) {
264              
265 25 50 33 25 1 6941 shift if (defined $_[0] && $_[0] eq 'MCE::Step');
266 25 100       174 my $_pkg = (defined $_[0]) ? shift : "$$.$_tid.".caller();
267              
268 25 100 66     344 if ( $_pkg eq 'MCE' ) {
    100          
269 12         25 for my $_k ( keys %{ $_MCE } ) { MCE::Step->finish($_k, 1); }
  12         194  
  9         164  
270             }
271             elsif ( $_MCE->{$_pkg} && $_MCE->{$_pkg}{_init_pid} eq "$$.$_tid" ) {
272 4 50       69 $_MCE->{$_pkg}->shutdown(@_) if $_MCE->{$_pkg}{_spawned};
273              
274 4         29 delete $_lkup->{$_pkg};
275 4         31 delete $_last_task_id->{$_pkg};
276              
277 4         21 delete $_user_tasks->{$_pkg};
278 4         29 delete $_prev_c->{$_pkg};
279 4         16 delete $_prev_n->{$_pkg};
280 4         17 delete $_prev_t->{$_pkg};
281 4         14 delete $_prev_w->{$_pkg};
282 4         40 delete $_MCE->{$_pkg};
283              
284 4 50       21 if (defined $_queue->{$_pkg}) {
285 4         11 local $_;
286 4         7 $_->DESTROY() for (@{ $_queue->{$_pkg} });
  4         31  
287 4         27 delete $_queue->{$_pkg};
288             }
289             }
290              
291 25         74 @_ = ();
292              
293 25         62 return;
294             }
295              
296             ###############################################################################
297             ## ----------------------------------------------------------------------------
298             ## Parallel step with MCE -- file.
299             ##
300             ###############################################################################
301              
302             sub run_file (@) {
303              
304 4 50 33 4 1 4248 shift if (defined $_[0] && $_[0] eq 'MCE::Step');
305              
306 4 50       10 my ($_file, $_pos); my $_start_pos = (ref $_[0] eq 'HASH') ? 2 : 1;
  4         14  
307 4         20 my $_pid = "$$.$_tid.".caller();
308              
309 4 50       14 if (defined (my $_p = $_params->{$_pid})) {
310 4 50       14 delete $_p->{input_data} if (exists $_p->{input_data});
311 4 50       12 delete $_p->{sequence} if (exists $_p->{sequence});
312             }
313             else {
314 0         0 $_params->{$_pid} = {};
315             }
316              
317 4         24 for my $_i ($_start_pos .. @_ - 1) {
318 8         24 my $_r = ref $_[$_i];
319 8 100 66     66 if ($_r eq '' || $_r eq 'SCALAR' || $_r =~ /^(?:GLOB|FileHandle|IO::)/) {
      100        
320 4         8 $_file = $_[$_i]; $_pos = $_i;
  4         6  
321 4         8 last;
322             }
323             }
324              
325 4 100 66     68 if (defined $_file && ref $_file eq '' && $_file ne '') {
    50 66        
      33        
326 2 50       76 _croak("$_tag: ($_file) does not exist") unless (-e $_file);
327 2 50       28 _croak("$_tag: ($_file) is not readable") unless (-r $_file);
328 2 50       18 _croak("$_tag: ($_file) is not a plain file") unless (-f $_file);
329 2         30 $_params->{$_pid}{_file} = $_file;
330             }
331             elsif (ref $_file eq 'SCALAR' || ref($_file) =~ /^(?:GLOB|FileHandle|IO::)/) {
332 2         6 $_params->{$_pid}{_file} = $_file;
333             }
334             else {
335 0         0 _croak("$_tag: (file) is not specified or valid");
336             }
337              
338 4 50       22 if (defined $_pos) {
339 4         18 pop @_ for ($_pos .. @_ - 1);
340             }
341              
342 4         14 return run(@_);
343             }
344              
345             ###############################################################################
346             ## ----------------------------------------------------------------------------
347             ## Parallel step with MCE -- sequence.
348             ##
349             ###############################################################################
350              
351             sub run_seq (@) {
352              
353 2 50 33 2 1 1588 shift if (defined $_[0] && $_[0] eq 'MCE::Step');
354              
355 2 50       6 my ($_begin, $_end, $_pos); my $_start_pos = (ref $_[0] eq 'HASH') ? 2 : 1;
  2         8  
356 2         22 my $_pid = "$$.$_tid.".caller();
357              
358 2 50       10 if (defined (my $_p = $_params->{$_pid})) {
359 2 50       6 delete $_p->{sequence} if (exists $_p->{sequence});
360 2 50       6 delete $_p->{input_data} if (exists $_p->{input_data});
361 2 50       8 delete $_p->{_file} if (exists $_p->{_file});
362             }
363             else {
364 0         0 $_params->{$_pid} = {};
365             }
366              
367 2         8 for my $_i ($_start_pos .. @_ - 1) {
368 4         10 my $_r = ref $_[$_i];
369              
370 4 50 66     54 if ($_r eq '' || $_r =~ /^Math::/ || $_r eq 'HASH' || $_r eq 'ARRAY') {
      66        
      33        
371 2         4 $_pos = $_i;
372              
373 2 50 33     6 if ($_r eq '' || $_r =~ /^Math::/) {
    0          
    0          
374 2         6 $_begin = $_[$_pos], $_end = $_[$_pos + 1];
375             $_params->{$_pid}{sequence} = [
376 2         10 $_[$_pos], $_[$_pos + 1], $_[$_pos + 2], $_[$_pos + 3]
377             ];
378             }
379             elsif ($_r eq 'HASH') {
380 0         0 $_begin = $_[$_pos]->{begin}, $_end = $_[$_pos]->{end};
381 0         0 $_params->{$_pid}{sequence} = $_[$_pos];
382             }
383             elsif ($_r eq 'ARRAY') {
384 0         0 $_begin = $_[$_pos]->[0], $_end = $_[$_pos]->[1];
385 0         0 $_params->{$_pid}{sequence} = $_[$_pos];
386             }
387              
388 2         6 last;
389             }
390             }
391              
392             _croak("$_tag: (sequence) is not specified or valid")
393 2 50       6 unless (exists $_params->{$_pid}{sequence});
394 2 50       6 _croak("$_tag: (begin) is not specified for sequence")
395             unless (defined $_begin);
396 2 50       6 _croak("$_tag: (end) is not specified for sequence")
397             unless (defined $_end);
398              
399 2         4 $_params->{$_pid}{sequence_run} = undef;
400              
401 2 50       6 if (defined $_pos) {
402 2         10 pop @_ for ($_pos .. @_ - 1);
403             }
404              
405 2         8 return run(@_);
406             }
407              
408             ###############################################################################
409             ## ----------------------------------------------------------------------------
410             ## Parallel step with MCE.
411             ##
412             ###############################################################################
413              
414             sub run (@) {
415              
416 21 50 33 21 1 4697 shift if (defined $_[0] && $_[0] eq 'MCE::Step');
417              
418 21 100       82 my $_pkg = caller() eq 'MCE::Step' ? caller(1) : caller();
419 21         85 my $_pid = "$$.$_tid.$_pkg";
420              
421 21 100       72 if (ref $_[0] eq 'HASH') {
422 14 50       58 $_params->{$_pid} = {} unless defined $_params->{$_pid};
423 14         22 for my $_p (keys %{ $_[0] }) {
  14         70  
424 14         40 $_params->{$_pid}{$_p} = $_[0]->{$_p};
425             }
426              
427 14         32 shift;
428             }
429              
430             ## -------------------------------------------------------------------------
431              
432 21         40 my (@_code, @_name, @_thrs, @_wrks); my $_init_mce = 0; my $_pos = 0;
  21         38  
  21         31  
433              
434 21         34 %{ $_lkup->{$_pid} } = ();
  21         130  
435              
436 21         59 while (ref $_[0] eq 'CODE') {
437 35         111 push @_code, $_[0];
438              
439 35 50       91 if (defined (my $_p = $_params->{$_pid})) {
440             push @_name, (ref $_p->{task_name} eq 'ARRAY')
441 35 100       109 ? $_p->{task_name}->[$_pos] : undef;
442             push @_thrs, (ref $_p->{use_threads} eq 'ARRAY')
443 35 50       81 ? $_p->{use_threads}->[$_pos] : undef;
444             push @_wrks, (ref $_p->{max_workers} eq 'ARRAY')
445 35 100       87 ? $_p->{max_workers}->[$_pos] : undef;
446             }
447              
448 35 100       96 $_lkup->{$_pid}{ $_name[ $_pos ] } = $_pos if (defined $_name[ $_pos ]);
449              
450             $_init_mce = 1 if (
451             !defined $_prev_c->{$_pid}[$_pos] ||
452 35 100 66     159 $_prev_c->{$_pid}[$_pos] != $_code[$_pos]
453             );
454              
455 35 100       128 $_init_mce = 1 if ($_prev_n->{$_pid}[$_pos] ne $_name[$_pos]);
456 35 50       89 $_init_mce = 1 if ($_prev_t->{$_pid}[$_pos] ne $_thrs[$_pos]);
457 35 100       95 $_init_mce = 1 if ($_prev_w->{$_pid}[$_pos] ne $_wrks[$_pos]);
458              
459 35         61 $_prev_c->{$_pid}[$_pos] = $_code[$_pos];
460 35         51 $_prev_n->{$_pid}[$_pos] = $_name[$_pos];
461 35         60 $_prev_t->{$_pid}[$_pos] = $_thrs[$_pos];
462 35         57 $_prev_w->{$_pid}[$_pos] = $_wrks[$_pos];
463              
464 35         50 shift; $_pos++;
  35         70  
465             }
466              
467 21 50       70 if (defined $_prev_c->{$_pid}[$_pos]) {
468 0         0 pop @{ $_prev_c->{$_pid} } for ($_pos .. $#{ $_prev_c->{$_pid } });
  0         0  
  0         0  
469 0         0 pop @{ $_prev_n->{$_pid} } for ($_pos .. $#{ $_prev_n->{$_pid } });
  0         0  
  0         0  
470 0         0 pop @{ $_prev_t->{$_pid} } for ($_pos .. $#{ $_prev_t->{$_pid } });
  0         0  
  0         0  
471 0         0 pop @{ $_prev_w->{$_pid} } for ($_pos .. $#{ $_prev_w->{$_pid } });
  0         0  
  0         0  
472              
473 0         0 $_init_mce = 1;
474             }
475              
476 21 50       50 return unless (scalar @_code);
477              
478             ## -------------------------------------------------------------------------
479              
480 21         27 my $_input_data; my $_max_workers = $_def->{$_pkg}{MAX_WORKERS};
  21         48  
481 21         74 my $_r = ref $_[0];
482              
483 21 100 66     130 if (@_ == 1 && $_r =~ /^(?:ARRAY|HASH|SCALAR|GLOB|FileHandle|IO::)/) {
484 4         8 $_input_data = shift;
485             }
486              
487 21 50       56 if (defined (my $_p = $_params->{$_pid})) {
488             $_max_workers = MCE::_parse_max_workers($_p->{max_workers})
489 21 100 66     154 if (exists $_p->{max_workers} && ref $_p->{max_workers} ne 'ARRAY');
490              
491 21 100 100     104 delete $_p->{sequence} if (defined $_input_data || scalar @_);
492 21 50       58 delete $_p->{user_func} if (exists $_p->{user_func});
493 21 50       57 delete $_p->{user_tasks} if (exists $_p->{user_tasks});
494             }
495              
496 21 100 66     96 if (@_code > 1 && $_max_workers > 1) {
497 14         54 $_max_workers = int($_max_workers / @_code + 0.5) + 1;
498             }
499              
500             my $_chunk_size = MCE::_parse_chunk_size(
501 21         132 $_def->{$_pkg}{CHUNK_SIZE}, $_max_workers, $_params->{$_pid},
502             $_input_data, scalar @_
503             );
504              
505 21 50       95 if (defined (my $_p = $_params->{$_pid})) {
506 21 100       100 if (exists $_p->{_file}) {
507 4         10 $_input_data = delete $_p->{_file};
508             } else {
509 17 50       60 $_input_data = $_p->{input_data} if exists $_p->{input_data};
510             }
511             }
512              
513             ## -------------------------------------------------------------------------
514              
515 21         119 MCE::_save_state($_MCE->{$_pid});
516              
517 21 100 66     131 if ($_init_mce || !exists $_queue->{$_pid}) {
518 13 50       39 $_MCE->{$_pid}->shutdown() if (defined $_MCE->{$_pid});
519 13 50       50 $_queue->{$_pid} = [] if (!defined $_queue->{$_pid});
520              
521 13         21 my $_Q = $_queue->{$_pid};
522 13         24 pop(@{ $_Q })->DESTROY for (@_code .. @{ $_Q });
  13         39  
  0         0  
523              
524 6         42 push @{ $_Q }, MCE::Queue->new(await => 1)
525 13         18 for (@{ $_Q } .. @_code - 2);
  13         40  
526              
527 13         37 $_last_task_id->{$_pid} = @_code - 1;
528              
529             ## must clear arrays for nested session to work with Perl < v5.14
530 13         72 _gen_user_tasks($_pid,$_Q, [@_code],[@_name],[@_thrs],[@_wrks], $_chunk_size);
531              
532 13         60 @_code = @_name = @_thrs = @_wrks = ();
533              
534             my %_opts = (
535             max_workers => $_max_workers, task_name => $_tag,
536 13         79 user_tasks => $_user_tasks->{$_pid}, task_end => \&_task_end,
537             );
538              
539 13 50       45 if (defined (my $_p = $_params->{$_pid})) {
540 13         26 local $_;
541              
542 13         21 for (keys %{ $_p }) {
  13         47  
543 25 100 100     108 next if ($_ eq 'max_workers' && ref $_p->{max_workers} eq 'ARRAY');
544 19 100 66     71 next if ($_ eq 'task_name' && ref $_p->{task_name} eq 'ARRAY');
545 13 50 33     45 next if ($_ eq 'use_threads' && ref $_p->{use_threads} eq 'ARRAY');
546              
547 13 50       26 next if ($_ eq 'chunk_size');
548 13 50       34 next if ($_ eq 'input_data');
549 13 50       26 next if ($_ eq 'sequence_run');
550 13 50       101 next if ($_ eq 'task_end');
551              
552             _croak("$_tag: ($_) is not a valid constructor argument")
553 13 50       50 unless (exists $MCE::_valid_fields_new{$_});
554              
555 13         38 $_opts{$_} = $_p->{$_};
556             }
557             }
558              
559 13         76 for my $_k (qw/ tmp_dir freeze thaw init_relay use_threads /) {
560             $_opts{$_k} = $_def->{$_pkg}{uc($_k)}
561 65 50 33     167 if (exists $_def->{$_pkg}{uc($_k)} && !exists $_opts{$_k});
562             }
563              
564 13         143 $_MCE->{$_pid} = MCE->new(pkg => $_pkg, %_opts);
565             }
566             else {
567             ## Workers may persist after running. Thus, updating the MCE instance.
568             ## These options do not require respawning.
569 8 50       26 if (defined (my $_p = $_params->{$_pid})) {
570 8         24 for my $_k (qw(
571             RS interval stderr_file stdout_file user_error user_output
572             job_delay submit_delay on_post_exit on_post_run user_args
573             flush_file flush_stderr flush_stdout gather max_retries
574             )) {
575 128 100       244 $_MCE->{$_pid}{$_k} = $_p->{$_k} if (exists $_p->{$_k});
576             }
577             }
578             }
579              
580             ## -------------------------------------------------------------------------
581              
582 21 100       38 my @_a; my $_wa = wantarray; $_MCE->{$_pid}{gather} = \@_a if (defined $_wa);
  21         38  
  21         91  
583              
584 21 100       88 if (defined $_input_data) {
    100          
585 8         14 @_ = ();
586 8         50 $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, $_input_data);
587 7         44 delete $_MCE->{$_pid}{input_data};
588             }
589             elsif (scalar @_) {
590 6         132 $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, \@_);
591 2         30 delete $_MCE->{$_pid}{input_data};
592             }
593             else {
594 7 100 66     63 if (defined $_params->{$_pid} && exists $_params->{$_pid}{sequence}) {
595             $_MCE->{$_pid}->run({
596             chunk_size => $_chunk_size,
597             sequence => $_params->{$_pid}{sequence}
598 2         20 }, 0);
599 2 50       18 if (exists $_params->{$_pid}{sequence_run}) {
600 2         4 delete $_params->{$_pid}{sequence_run};
601 2         4 delete $_params->{$_pid}{sequence};
602             }
603 2         8 delete $_MCE->{$_pid}{sequence};
604             }
605             else {
606 5         30 $_MCE->{$_pid}->run({ chunk_size => $_chunk_size }, 0);
607             }
608             }
609              
610 12         103 MCE::_restore_state();
611              
612             # destroy queue(s) if MCE::run requested workers to shutdown
613 12 50       50 if (!$_MCE->{$_pid}{_spawned}) {
614 0         0 $_->DESTROY() for @{ $_queue->{$_pid} };
  0         0  
615 0         0 delete $_queue->{$_pid};
616             }
617              
618 12 100       39 delete $_MCE->{$_pid}{gather} if (defined $_wa);
619              
620 12 100       226 return ((defined $_wa) ? @_a : ());
621             }
622              
623             ###############################################################################
624             ## ----------------------------------------------------------------------------
625             ## Private methods.
626             ##
627             ###############################################################################
628              
629             sub _croak {
630              
631 0     0   0 goto &MCE::_croak;
632             }
633              
634             sub _gen_user_func {
635              
636 6     6   18 my ($_qref, $_cref, $_chunk_size, $_pos) = @_;
637              
638 6         12 my $_q_in = $_qref->[$_pos - 1];
639 6         12 my $_code = $_cref->[$_pos];
640              
641             return sub {
642 10     10   34 my ($_mce) = @_;
643              
644 10         106 $_mce->{_next_jmp} = sub { goto _MCE_STEP__NEXT; };
  0         0  
645 10         41 $_mce->{_last_jmp} = sub { goto _MCE_STEP__LAST; };
  0         0  
646              
647             _MCE_STEP__NEXT:
648              
649 10         66 while (defined (local $_ = $_q_in->dequeue())) {
650 21         132 my $_args = $_mce->thaw($_); $_ = $_args->[0];
  21         54  
651 21         36 $_code->($_mce, @{ $_args });
  21         120  
652             }
653              
654             _MCE_STEP__LAST:
655              
656 10         29 return;
657 6         108 };
658             }
659              
660             sub _gen_user_tasks {
661              
662 13     13   45 my ($_pid, $_qref, $_cref, $_nref, $_tref, $_wref, $_chunk_size) = @_;
663              
664 13         19 @{ $_user_tasks->{$_pid} } = ();
  13         26  
665              
666 13         105 push @{ $_user_tasks->{$_pid} }, {
667             task_name => $_nref->[0],
668             use_threads => $_tref->[0],
669             max_workers => $_wref->[0],
670 30     30   528 user_func => sub { $_cref->[0]->(@_); return; }
  30         71  
671 13         26 };
672              
673 13         37 for my $_pos (1 .. @{ $_cref } - 1) {
  13         48  
674 6         12 push @{ $_user_tasks->{$_pid} }, {
  6         24  
675             task_name => $_nref->[$_pos],
676             use_threads => $_tref->[$_pos],
677             max_workers => $_wref->[$_pos],
678             user_func => _gen_user_func(
679             $_qref, $_cref, $_chunk_size, $_pos
680             )
681             };
682             }
683              
684 13         36 return;
685             }
686              
687             1;
688              
689             __END__
690              
691             ###############################################################################
692             ## ----------------------------------------------------------------------------
693             ## Module usage.
694             ##
695             ###############################################################################
696              
697             =head1 NAME
698              
699             MCE::Step - Parallel step model for building creative steps
700              
701             =head1 VERSION
702              
703             This document describes MCE::Step version 1.887
704              
705             =head1 DESCRIPTION
706              
707             MCE::Step is similar to L<MCE::Flow> for writing custom apps. The main
708             difference comes from the transparent use of queues between sub-tasks.
709             MCE 1.7 adds mce_enq, mce_enqp, and mce_await methods described under
710             QUEUE-LIKE FEATURES below.
711              
712             It is trivial to parallelize with mce_stream shown below.
713              
714             ## Native map function
715             my @a = map { $_ * 4 } map { $_ * 3 } map { $_ * 2 } 1..10000;
716              
717             ## Same as with MCE::Stream (processing from right to left)
718             @a = mce_stream
719             sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
720              
721             ## Pass an array reference to have writes occur simultaneously
722             mce_stream \@a,
723             sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
724              
725             However, let's have MCE::Step compute the same in parallel. Unlike the example
726             in L<MCE::Flow>, the use of MCE::Queue is totally transparent. This calls for
727             preserving output order provided by MCE::Candy.
728              
729             use MCE::Step;
730             use MCE::Candy;
731              
732             Next are the 3 sub-tasks. Compare these 3 sub-tasks with the same as described
733             in L<MCE::Flow>. The call to MCE->step simplifies the passing of data to
734             subsequent sub-task.
735              
736             sub task_a {
737             my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
738             push @ans, map { $_ * 2 } @{ $chunk_ref };
739             MCE->step(\@ans, $chunk_id);
740             }
741              
742             sub task_b {
743             my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
744             push @ans, map { $_ * 3 } @{ $chunk_ref };
745             MCE->step(\@ans, $chunk_id);
746             }
747              
748             sub task_c {
749             my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
750             push @ans, map { $_ * 4 } @{ $chunk_ref };
751             MCE->gather($chunk_id, \@ans);
752             }
753              
754             In summary, MCE::Step builds out a MCE instance behind the scene and starts
755             running. The task_name (shown), max_workers, and use_threads options can take
756             an anonymous array for specifying the values uniquely per each sub-task.
757              
758             The task_name option is required to use ->enq, ->enqp, and ->await.
759              
760             my @a;
761              
762             mce_step {
763             task_name => [ 'a', 'b', 'c' ],
764             gather => MCE::Candy::out_iter_array(\@a)
765              
766             }, \&task_a, \&task_b, \&task_c, 1..10000;
767              
768             print "@a\n";
769              
770             =head1 STEP DEMO
771              
772             In the demonstration below, one may call ->gather or ->step any number of times
773             although ->step is not allowed in the last sub-block. Data is gathered to @arr
774             which may likely be out-of-order. Gathering data is optional. All sub-blocks
775             receive $mce as the first argument.
776              
777             First, defining 3 sub-tasks.
778              
779             use MCE::Step;
780              
781             sub task_a {
782             my ($mce, $chunk_ref, $chunk_id) = @_;
783              
784             if ($_ % 2 == 0) {
785             MCE->gather($_);
786             # MCE->gather($_ * 4); ## Ok to gather multiple times
787             }
788             else {
789             MCE->print("a step: $_, $_ * $_\n");
790             MCE->step($_, $_ * $_);
791             # MCE->step($_, $_ * 4 ); ## Ok to step multiple times
792             }
793             }
794              
795             sub task_b {
796             my ($mce, $arg1, $arg2) = @_;
797              
798             MCE->print("b args: $arg1, $arg2\n");
799              
800             if ($_ % 3 == 0) { ## $_ is the same as $arg1
801             MCE->gather($_);
802             }
803             else {
804             MCE->print("b step: $_ * $_\n");
805             MCE->step($_ * $_);
806             }
807             }
808              
809             sub task_c {
810             my ($mce, $arg1) = @_;
811              
812             MCE->print("c: $_\n");
813             MCE->gather($_);
814             }
815              
816             Next, pass MCE options, using chunk_size 1, and run all 3 tasks in parallel.
817             Notice how max_workers and use_threads can take an anonymous array, similarly
818             to task_name.
819              
820             my @arr = mce_step {
821             task_name => [ 'a', 'b', 'c' ],
822             max_workers => [ 2, 2, 2 ],
823             use_threads => [ 0, 0, 0 ],
824             chunk_size => 1
825              
826             }, \&task_a, \&task_b, \&task_c, 1..10;
827              
828             Finally, sort the array and display its contents.
829              
830             @arr = sort { $a <=> $b } @arr;
831              
832             print "\n@arr\n\n";
833              
834             -- Output
835              
836             a step: 1, 1 * 1
837             a step: 3, 3 * 3
838             a step: 5, 5 * 5
839             a step: 7, 7 * 7
840             a step: 9, 9 * 9
841             b args: 1, 1
842             b step: 1 * 1
843             b args: 3, 9
844             b args: 7, 49
845             b step: 7 * 7
846             b args: 5, 25
847             b step: 5 * 5
848             b args: 9, 81
849             c: 1
850             c: 49
851             c: 25
852              
853             1 2 3 4 6 8 9 10 25 49
854              
855             =head1 SYNOPSIS when CHUNK_SIZE EQUALS 1
856              
857             Although L<MCE::Loop> may be preferred for running using a single code block,
858             the text below also applies to this module, particularly for the first block.
859              
860             All models in MCE default to 'auto' for chunk_size. The arguments for the block
861             are the same as writing a user_func block using the Core API.
862              
863             Beginning with MCE 1.5, the next input item is placed into the input scalar
864             variable $_ when chunk_size equals 1. Otherwise, $_ points to $chunk_ref
865             containing many items. Basically, line 2 below may be omitted from your code
866             when using $_. One can call MCE->chunk_id to obtain the current chunk id.
867              
868             line 1: user_func => sub {
869             line 2: my ($mce, $chunk_ref, $chunk_id) = @_;
870             line 3:
871             line 4: $_ points to $chunk_ref->[0]
872             line 5: in MCE 1.5 when chunk_size == 1
873             line 6:
874             line 7: $_ points to $chunk_ref
875             line 8: in MCE 1.5 when chunk_size > 1
876             line 9: }
877              
878             Follow this synopsis when chunk_size equals one. Looping is not required from
879             inside the first block. Hence, the block is called once per each item.
880              
881             ## Exports mce_step, mce_step_f, and mce_step_s
882             use MCE::Step;
883              
884             MCE::Step->init(
885             chunk_size => 1
886             );
887              
888             ## Array or array_ref
889             mce_step sub { do_work($_) }, 1..10000;
890             mce_step sub { do_work($_) }, \@list;
891              
892             ## Important; pass an array_ref for deeply input data
893             mce_step sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
894             mce_step sub { do_work($_) }, \@deeply_list;
895              
896             ## File path, glob ref, IO::All::{ File, Pipe, STDIO } obj, or scalar ref
897             ## Workers read directly and not involve the manager process
898             mce_step_f sub { chomp; do_work($_) }, "/path/to/file"; # efficient
899              
900             ## Involves the manager process, therefore slower
901             mce_step_f sub { chomp; do_work($_) }, $file_handle;
902             mce_step_f sub { chomp; do_work($_) }, $io;
903             mce_step_f sub { chomp; do_work($_) }, \$scalar;
904              
905             ## Sequence of numbers (begin, end [, step, format])
906             mce_step_s sub { do_work($_) }, 1, 10000, 5;
907             mce_step_s sub { do_work($_) }, [ 1, 10000, 5 ];
908              
909             mce_step_s sub { do_work($_) }, {
910             begin => 1, end => 10000, step => 5, format => undef
911             };
912              
913             =head1 SYNOPSIS when CHUNK_SIZE is GREATER THAN 1
914              
915             Follow this synopsis when chunk_size equals 'auto' or greater than 1.
916             This means having to loop through the chunk from inside the first block.
917              
918             use MCE::Step;
919              
920             MCE::Step->init( ## Chunk_size defaults to 'auto' when
921             chunk_size => 'auto' ## not specified. Therefore, the init
922             ); ## function may be omitted.
923              
924             ## Syntax is shown for mce_step for demonstration purposes.
925             ## Looping inside the block is the same for mce_step_f and
926             ## mce_step_s.
927              
928             ## Array or array_ref
929             mce_step sub { do_work($_) for (@{ $_ }) }, 1..10000;
930             mce_step sub { do_work($_) for (@{ $_ }) }, \@list;
931              
932             ## Important; pass an array_ref for deeply input data
933             mce_step sub { do_work($_) for (@{ $_ }) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
934             mce_step sub { do_work($_) for (@{ $_ }) }, \@deeply_list;
935              
936             ## Resembles code using the core MCE API
937             mce_step sub {
938             my ($mce, $chunk_ref, $chunk_id) = @_;
939              
940             for (@{ $chunk_ref }) {
941             do_work($_);
942             }
943              
944             }, 1..10000;
945              
946             Chunking reduces the number of IPC calls behind the scene. Think in terms of
947             chunks whenever processing a large amount of data. For relatively small data,
948             choosing 1 for chunk_size is fine.
949              
950             =head1 OVERRIDING DEFAULTS
951              
952             The following list options which may be overridden when loading the module.
953             The fast option is obsolete in 1.867 onwards; ignored if specified.
954              
955             use Sereal qw( encode_sereal decode_sereal );
956             use CBOR::XS qw( encode_cbor decode_cbor );
957             use JSON::XS qw( encode_json decode_json );
958              
959             use MCE::Step
960             max_workers => 8, # Default 'auto'
961             chunk_size => 500, # Default 'auto'
962             tmp_dir => "/path/to/app/tmp", # $MCE::Signal::tmp_dir
963             freeze => \&encode_sereal, # \&Storable::freeze
964             thaw => \&decode_sereal, # \&Storable::thaw
965             init_relay => 0, # Default undef; MCE 1.882+
966             use_threads => 0, # Default undef; MCE 1.882+
967             ;
968              
969             From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if available.
970             Specify C<< Sereal => 0 >> to use Storable instead.
971              
972             use MCE::Step Sereal => 0;
973              
974             =head1 CUSTOMIZING MCE
975              
976             =over 3
977              
978             =item MCE::Step->init ( options )
979              
980             =item MCE::Step::init { options }
981              
982             =back
983              
984             The init function accepts a hash of MCE options. Unlike with MCE::Stream,
985             both gather and bounds_only options may be specified when calling init
986             (not shown below).
987              
988             use MCE::Step;
989              
990             MCE::Step->init(
991             chunk_size => 1, max_workers => 4,
992              
993             user_begin => sub {
994             print "## ", MCE->wid, " started\n";
995             },
996              
997             user_end => sub {
998             print "## ", MCE->wid, " completed\n";
999             }
1000             );
1001              
1002             my %a = mce_step sub { MCE->gather($_, $_ * $_) }, 1..100;
1003              
1004             print "\n", "@a{1..100}", "\n";
1005              
1006             -- Output
1007              
1008             ## 3 started
1009             ## 1 started
1010             ## 4 started
1011             ## 2 started
1012             ## 3 completed
1013             ## 4 completed
1014             ## 1 completed
1015             ## 2 completed
1016              
1017             1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361
1018             400 441 484 529 576 625 676 729 784 841 900 961 1024 1089 1156
1019             1225 1296 1369 1444 1521 1600 1681 1764 1849 1936 2025 2116 2209
1020             2304 2401 2500 2601 2704 2809 2916 3025 3136 3249 3364 3481 3600
1021             3721 3844 3969 4096 4225 4356 4489 4624 4761 4900 5041 5184 5329
1022             5476 5625 5776 5929 6084 6241 6400 6561 6724 6889 7056 7225 7396
1023             7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801
1024             10000
1025              
1026             Like with MCE::Step->init above, MCE options may be specified using an
1027             anonymous hash for the first argument. Notice how task_name, max_workers,
1028             and use_threads can take an anonymous array for setting uniquely per
1029             each code block.
1030              
1031             Unlike MCE::Stream which processes from right-to-left, MCE::Step begins
1032             with the first code block, thus processing from left-to-right.
1033              
1034             The following takes 9 seconds to complete. The 9 seconds is from having
1035             only 2 workers assigned for the last sub-task and waiting 1 or 2 seconds
1036             initially before calling MCE->step.
1037              
1038             Removing both calls to MCE->step will cause the script to complete in just
1039             1 second. The reason is due to the 2nd and subsequent sub-tasks awaiting
1040             data from an internal queue. Workers terminate upon receiving an undef.
1041              
1042             use threads;
1043             use MCE::Step;
1044              
1045             my @a = mce_step {
1046             task_name => [ 'a', 'b', 'c' ],
1047             max_workers => [ 3, 4, 2, ],
1048             use_threads => [ 1, 0, 0, ],
1049              
1050             user_end => sub {
1051             my ($mce, $task_id, $task_name) = @_;
1052             MCE->print("$task_id - $task_name completed\n");
1053             },
1054              
1055             task_end => sub {
1056             my ($mce, $task_id, $task_name) = @_;
1057             MCE->print("$task_id - $task_name ended\n");
1058             }
1059             },
1060             sub { sleep 1; MCE->step(""); }, ## 3 workers, named a
1061             sub { sleep 2; MCE->step(""); }, ## 4 workers, named b
1062             sub { sleep 3; }; ## 2 workers, named c
1063              
1064             -- Output
1065              
1066             0 - a completed
1067             0 - a completed
1068             0 - a completed
1069             0 - a ended
1070             1 - b completed
1071             1 - b completed
1072             1 - b completed
1073             1 - b completed
1074             1 - b ended
1075             2 - c completed
1076             2 - c completed
1077             2 - c ended
1078              
1079             =head1 API DOCUMENTATION
1080              
1081             Although input data is optional for MCE::Step, the following assumes chunk_size
1082             equals 1 in order to demonstrate all the possibilities for providing input data.
1083              
1084             =over 3
1085              
1086             =item MCE::Step->run ( sub { code }, list )
1087              
1088             =item mce_step sub { code }, list
1089              
1090             =back
1091              
1092             Input data may be defined using a list, an array ref, or a hash ref.
1093              
1094             Unlike MCE::Loop, Map, and Grep which take a block as C<{ ... }>, Step takes a
1095             C<sub { ... }> or a code reference. The other difference is that the comma is
1096             needed after the block.
1097              
1098             # $_ contains the item when chunk_size => 1
1099              
1100             mce_step sub { do_work($_) }, 1..1000;
1101             mce_step sub { do_work($_) }, \@list;
1102              
1103             # Important; pass an array_ref for deeply input data
1104              
1105             mce_step sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
1106             mce_step sub { do_work($_) }, \@deeply_list;
1107              
1108             # Chunking; any chunk_size => 1 or greater
1109              
1110             my %res = mce_step sub {
1111             my ($mce, $chunk_ref, $chunk_id) = @_;
1112             my %ret;
1113             for my $item (@{ $chunk_ref }) {
1114             $ret{$item} = $item * 2;
1115             }
1116             MCE->gather(%ret);
1117             },
1118             \@list;
1119              
1120             # Input hash; current API available since 1.828
1121              
1122             my %res = mce_step sub {
1123             my ($mce, $chunk_ref, $chunk_id) = @_;
1124             my %ret;
1125             for my $key (keys %{ $chunk_ref }) {
1126             $ret{$key} = $chunk_ref->{$key} * 2;
1127             }
1128             MCE->gather(%ret);
1129             },
1130             \%hash;
1131              
1132             # Unlike MCE::Loop, MCE::Step doesn't need input to run
1133              
1134             mce_step { max_workers => 4 }, sub {
1135             MCE->say( MCE->wid );
1136             };
1137              
1138             # ... and can run multiple tasks
1139              
1140             mce_step {
1141             max_workers => [ 1, 3 ],
1142             task_name => [ 'p', 'c' ]
1143             },
1144             sub {
1145             # 1 producer
1146             MCE->say( "producer: ", MCE->wid );
1147             },
1148             sub {
1149             # 3 consumers
1150             MCE->say( "consumer: ", MCE->wid );
1151             };
1152              
1153             # Here, options are specified via init
1154              
1155             MCE::Step->init(
1156             max_workers => [ 1, 3 ],
1157             task_name => [ 'p', 'c' ]
1158             );
1159              
1160             mce_step \&producer, \&consumers;
1161              
1162             =over 3
1163              
1164             =item MCE::Step->run_file ( sub { code }, file )
1165              
1166             =item mce_step_f sub { code }, file
1167              
1168             =back
1169              
1170             The fastest of these is the /path/to/file. Workers communicate the next offset
1171             position among themselves with zero interaction by the manager process.
1172              
1173             C<IO::All> { File, Pipe, STDIO } is supported since MCE 1.845.
1174              
1175             # $_ contains the line when chunk_size => 1
1176              
1177             mce_step_f sub { $_ }, "/path/to/file"; # faster
1178             mce_step_f sub { $_ }, $file_handle;
1179             mce_step_f sub { $_ }, $io; # IO::All
1180             mce_step_f sub { $_ }, \$scalar;
1181              
1182             # chunking, any chunk_size => 1 or greater
1183              
1184             my %res = mce_step_f sub {
1185             my ($mce, $chunk_ref, $chunk_id) = @_;
1186             my $buf = '';
1187             for my $line (@{ $chunk_ref }) {
1188             $buf .= $line;
1189             }
1190             MCE->gather($chunk_id, $buf);
1191             },
1192             "/path/to/file";
1193              
1194             =over 3
1195              
1196             =item MCE::Step->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )
1197              
1198             =item mce_step_s sub { code }, $beg, $end [, $step, $fmt ]
1199              
1200             =back
1201              
1202             Sequence may be defined as a list, an array reference, or a hash reference.
1203             The functions require both begin and end values to run. Step and format are
1204             optional. The format is passed to sprintf (% may be omitted below).
1205              
1206             my ($beg, $end, $step, $fmt) = (10, 20, 0.1, "%4.1f");
1207              
1208             # $_ contains the sequence number when chunk_size => 1
1209              
1210             mce_step_s sub { $_ }, $beg, $end, $step, $fmt;
1211             mce_step_s sub { $_ }, [ $beg, $end, $step, $fmt ];
1212              
1213             mce_step_s sub { $_ }, {
1214             begin => $beg, end => $end,
1215             step => $step, format => $fmt
1216             };
1217              
1218             # chunking, any chunk_size => 1 or greater
1219              
1220             my %res = mce_step_s sub {
1221             my ($mce, $chunk_ref, $chunk_id) = @_;
1222             my $buf = '';
1223             for my $seq (@{ $chunk_ref }) {
1224             $buf .= "$seq\n";
1225             }
1226             MCE->gather($chunk_id, $buf);
1227             },
1228             [ $beg, $end ];
1229              
1230             The sequence engine can compute 'begin' and 'end' items only, for the chunk,
1231             and not the items in between (hence boundaries only). This option applies
1232             to sequence only and has no effect when chunk_size equals 1.
1233              
1234             The time to run is 0.006s below. This becomes 0.827s without the bounds_only
1235             option due to computing all items in between, thus creating a very large
1236             array. Basically, specify bounds_only => 1 when boundaries is all you need
1237             for looping inside the block; e.g. Monte Carlo simulations.
1238              
1239             Time was measured using 1 worker to emphasize the difference.
1240              
1241             use MCE::Step;
1242              
1243             MCE::Step->init(
1244             max_workers => 1, chunk_size => 1_250_000,
1245             bounds_only => 1
1246             );
1247              
1248             # Typically, the input scalar $_ contains the sequence number
1249             # when chunk_size => 1, unless the bounds_only option is set
1250             # which is the case here. Thus, $_ points to $chunk_ref.
1251              
1252             mce_step_s sub {
1253             my ($mce, $chunk_ref, $chunk_id) = @_;
1254              
1255             # $chunk_ref contains 2 items, not 1_250_000
1256             # my ( $begin, $end ) = ( $_->[0], $_->[1] );
1257              
1258             my $begin = $chunk_ref->[0];
1259             my $end = $chunk_ref->[1];
1260              
1261             # for my $seq ( $begin .. $end ) {
1262             # ...
1263             # }
1264              
1265             MCE->printf("%7d .. %8d\n", $begin, $end);
1266             },
1267             [ 1, 10_000_000 ];
1268              
1269             -- Output
1270              
1271             1 .. 1250000
1272             1250001 .. 2500000
1273             2500001 .. 3750000
1274             3750001 .. 5000000
1275             5000001 .. 6250000
1276             6250001 .. 7500000
1277             7500001 .. 8750000
1278             8750001 .. 10000000
1279              
1280             =over 3
1281              
1282             =item MCE::Step->run ( { input_data => iterator }, sub { code } )
1283              
1284             =item mce_step { input_data => iterator }, sub { code }
1285              
1286             =back
1287              
1288             An iterator reference may be specified for input_data. The only other way
1289             is to specify input_data via MCE::Step->init. This prevents MCE::Step from
1290             configuring the iterator reference as another user task which will not work.
1291              
1292             Iterators are described under section "SYNTAX for INPUT_DATA" at L<MCE::Core>.
1293              
1294             MCE::Step->init(
1295             input_data => iterator
1296             );
1297              
1298             mce_step sub { $_ };
1299              
1300             =head1 QUEUE-LIKE FEATURES
1301              
1302             =over 3
1303              
1304             =item MCE->step ( item )
1305              
1306             =item MCE->step ( arg1, arg2, argN )
1307              
1308             =back
1309              
1310             The ->step method is the simplest form for passing elements into the next
1311             sub-task.
1312              
1313             use MCE::Step;
1314              
1315             sub provider {
1316             MCE->step( $_, rand ) for 10 .. 19;
1317             }
1318              
1319             sub consumer {
1320             my ( $mce, @args ) = @_;
1321             MCE->printf( "%d: %d, %03.06f\n", MCE->wid, $args[0], $args[1] );
1322             }
1323              
1324             MCE::Step->init(
1325             task_name => [ 'p', 'c' ],
1326             max_workers => [ 1 , 4 ]
1327             );
1328              
1329             mce_step \&provider, \&consumer;
1330              
1331             -- Output
1332              
1333             2: 10, 0.583551
1334             4: 11, 0.175319
1335             3: 12, 0.843662
1336             4: 15, 0.748302
1337             2: 14, 0.591752
1338             3: 16, 0.357858
1339             5: 13, 0.953528
1340             4: 17, 0.698907
1341             2: 18, 0.985448
1342             3: 19, 0.146548
1343              
1344             =over 3
1345              
1346             =item MCE->enq ( task_name, item )
1347              
1348             =item MCE->enq ( task_name, [ arg1, arg2, argN ] )
1349              
1350             =item MCE->enq ( task_name, [ arg1, arg2 ], [ arg1, arg2 ] )
1351              
1352             =item MCE->enqp ( task_name, priority, item )
1353              
1354             =item MCE->enqp ( task_name, priority, [ arg1, arg2, argN ] )
1355              
1356             =item MCE->enqp ( task_name, priority, [ arg1, arg2 ], [ arg1, arg2 ] )
1357              
1358             =back
1359              
1360             The MCE 1.7 release enables finer control. Unlike ->step, which take multiple
1361             arguments, the ->enq and ->enqp methods push items at the end of the array
1362             internally. Passing multiple arguments is possible by enclosing the arguments
1363             inside an anonymous array.
1364              
1365             The direction of flow is forward only. Thus, stepping to itself or backwards
1366             will cause an error.
1367              
1368             use MCE::Step;
1369              
1370             sub provider {
1371             if ( MCE->wid % 2 == 0 ) {
1372             MCE->enq( 'c', [ $_, rand ] ) for 10 .. 19;
1373             } else {
1374             MCE->enq( 'd', [ $_, rand ] ) for 20 .. 29;
1375             }
1376             }
1377              
1378             sub consumer_c {
1379             my ( $mce, $args ) = @_;
1380             MCE->printf( "C%d: %d, %03.06f\n", MCE->wid, $args->[0], $args->[1] );
1381             }
1382              
1383             sub consumer_d {
1384             my ( $mce, $args ) = @_;
1385             MCE->printf( "D%d: %d, %03.06f\n", MCE->wid, $args->[0], $args->[1] );
1386             }
1387              
1388             MCE::Step->init(
1389             task_name => [ 'p', 'c', 'd' ],
1390             max_workers => [ 2 , 3 , 3 ]
1391             );
1392              
1393             mce_step \&provider, \&consumer_c, \&consumer_d;
1394              
1395             -- Output
1396              
1397             C4: 10, 0.527531
1398             D6: 20, 0.420108
1399             C5: 11, 0.839770
1400             D8: 21, 0.386414
1401             C3: 12, 0.834645
1402             C4: 13, 0.191014
1403             D6: 23, 0.924027
1404             C5: 14, 0.899357
1405             D8: 24, 0.706186
1406             C4: 15, 0.083823
1407             D7: 22, 0.479708
1408             D6: 25, 0.073882
1409             C3: 16, 0.207446
1410             D8: 26, 0.560755
1411             C5: 17, 0.198157
1412             D7: 27, 0.324909
1413             C4: 18, 0.147505
1414             C5: 19, 0.318371
1415             D6: 28, 0.220465
1416             D8: 29, 0.630111
1417              
1418             =over 3
1419              
1420             =item MCE->await ( task_name, pending_threshold )
1421              
1422             =back
1423              
1424             Providers may sometime run faster than consumers. Thus, increasing memory
1425             consumption. MCE 1.7 adds the ->await method for pausing momentarily until
1426             the receiving sub-task reaches the minimum threshold for the number of
1427             items pending in its queue.
1428              
1429             use MCE::Step;
1430             use Time::HiRes 'sleep';
1431              
1432             sub provider {
1433             for ( 10 .. 29 ) {
1434             # wait until 10 or less items pending
1435             MCE->await( 'c', 10 );
1436             # forward item to a later sub-task ( 'c' comes after 'p' )
1437             MCE->enq( 'c', [ $_, rand ] );
1438             }
1439             }
1440              
1441             sub consumer {
1442             my ($mce, $args) = @_;
1443             MCE->printf( "%d: %d, %03.06f\n", MCE->wid, $args->[0], $args->[1] );
1444             sleep 0.05;
1445             }
1446              
1447             MCE::Step->init(
1448             task_name => [ 'p', 'c' ],
1449             max_workers => [ 1 , 4 ]
1450             );
1451              
1452             mce_step \&provider, \&consumer;
1453              
1454             -- Output
1455              
1456             3: 10, 0.527307
1457             2: 11, 0.036193
1458             5: 12, 0.987168
1459             4: 13, 0.998140
1460             5: 14, 0.219526
1461             4: 15, 0.061609
1462             2: 16, 0.557664
1463             3: 17, 0.658684
1464             4: 18, 0.240932
1465             3: 19, 0.241042
1466             5: 20, 0.884830
1467             2: 21, 0.902223
1468             4: 22, 0.699223
1469             3: 23, 0.208270
1470             5: 24, 0.438919
1471             2: 25, 0.268854
1472             4: 26, 0.596425
1473             5: 27, 0.979818
1474             2: 28, 0.918173
1475             3: 29, 0.358266
1476              
1477             =head1 GATHERING DATA
1478              
1479             Unlike MCE::Map where gather and output order are done for you automatically,
1480             the gather method is used to have results sent back to the manager process.
1481              
1482             use MCE::Step chunk_size => 1;
1483              
1484             ## Output order is not guaranteed.
1485             my @a = mce_step sub { MCE->gather($_ * 2) }, 1..100;
1486             print "@a\n\n";
1487              
1488             ## Outputs to a hash instead (key, value).
1489             my %h1 = mce_step sub { MCE->gather($_, $_ * 2) }, 1..100;
1490             print "@h1{1..100}\n\n";
1491              
1492             ## This does the same thing due to chunk_id starting at one.
1493             my %h2 = mce_step sub { MCE->gather(MCE->chunk_id, $_ * 2) }, 1..100;
1494             print "@h2{1..100}\n\n";
1495              
1496             The gather method may be called multiple times within the block unlike return
1497             which would leave the block. Therefore, think of gather as yielding results
1498             immediately to the manager process without actually leaving the block.
1499              
1500             use MCE::Step chunk_size => 1, max_workers => 3;
1501              
1502             my @hosts = qw(
1503             hosta hostb hostc hostd hoste
1504             );
1505              
1506             my %h3 = mce_step sub {
1507             my ($output, $error, $status); my $host = $_;
1508              
1509             ## Do something with $host;
1510             $output = "Worker ". MCE->wid .": Hello from $host";
1511              
1512             if (MCE->chunk_id % 3 == 0) {
1513             ## Simulating an error condition
1514             local $? = 1; $status = $?;
1515             $error = "Error from $host"
1516             }
1517             else {
1518             $status = 0;
1519             }
1520              
1521             ## Ensure unique keys (key, value) when gathering to
1522             ## a hash.
1523             MCE->gather("$host.out", $output);
1524             MCE->gather("$host.err", $error) if (defined $error);
1525             MCE->gather("$host.sta", $status);
1526              
1527             }, @hosts;
1528              
1529             foreach my $host (@hosts) {
1530             print $h3{"$host.out"}, "\n";
1531             print $h3{"$host.err"}, "\n" if (exists $h3{"$host.err"});
1532             print "Exit status: ", $h3{"$host.sta"}, "\n\n";
1533             }
1534              
1535             -- Output
1536              
1537             Worker 3: Hello from hosta
1538             Exit status: 0
1539              
1540             Worker 2: Hello from hostb
1541             Exit status: 0
1542              
1543             Worker 1: Hello from hostc
1544             Error from hostc
1545             Exit status: 1
1546              
1547             Worker 3: Hello from hostd
1548             Exit status: 0
1549              
1550             Worker 2: Hello from hoste
1551             Exit status: 0
1552              
1553             The following uses an anonymous array containing 3 elements when gathering
1554             data. Serialization is automatic behind the scene.
1555              
1556             my %h3 = mce_step sub {
1557             ...
1558              
1559             MCE->gather($host, [$output, $error, $status]);
1560              
1561             }, @hosts;
1562              
1563             foreach my $host (@hosts) {
1564             print $h3{$host}->[0], "\n";
1565             print $h3{$host}->[1], "\n" if (defined $h3{$host}->[1]);
1566             print "Exit status: ", $h3{$host}->[2], "\n\n";
1567             }
1568              
1569             Although MCE::Map comes to mind, one may want additional control when
1570             gathering data such as retaining output order.
1571              
1572             use MCE::Step;
1573              
1574             sub preserve_order {
1575             my %tmp; my $order_id = 1; my $gather_ref = $_[0];
1576              
1577             return sub {
1578             $tmp{ (shift) } = \@_;
1579              
1580             while (1) {
1581             last unless exists $tmp{$order_id};
1582             push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
1583             }
1584              
1585             return;
1586             };
1587             }
1588              
1589             ## Workers persist for the most part after running. Though, not always
1590             ## the case and depends on Perl. Pass a reference to a subroutine if
1591             ## workers must persist; e.g. mce_step { ... }, \&foo, 1..100000.
1592              
1593             MCE::Step->init(
1594             chunk_size => 'auto', max_workers => 'auto'
1595             );
1596              
1597             for (1..2) {
1598             my @m2;
1599              
1600             mce_step {
1601             gather => preserve_order(\@m2)
1602             },
1603             sub {
1604             my @a; my ($mce, $chunk_ref, $chunk_id) = @_;
1605              
1606             ## Compute the entire chunk data at once.
1607             push @a, map { $_ * 2 } @{ $chunk_ref };
1608              
1609             ## Afterwards, invoke the gather feature, which
1610             ## will direct the data to the callback function.
1611             MCE->gather(MCE->chunk_id, @a);
1612              
1613             }, 1..100000;
1614              
1615             print scalar @m2, "\n";
1616             }
1617              
1618             MCE::Step->finish;
1619              
1620             All 6 models support 'auto' for chunk_size unlike the Core API. Think of the
1621             models as the basis for providing JIT for MCE. They create the instance, tune
1622             max_workers, and tune chunk_size automatically regardless of the hardware.
1623              
1624             The following does the same thing using the Core API. Workers persist after
1625             running.
1626              
1627             use MCE;
1628              
1629             sub preserve_order {
1630             ...
1631             }
1632              
1633             my $mce = MCE->new(
1634             max_workers => 'auto', chunk_size => 8000,
1635              
1636             user_func => sub {
1637             my @a; my ($mce, $chunk_ref, $chunk_id) = @_;
1638              
1639             ## Compute the entire chunk data at once.
1640             push @a, map { $_ * 2 } @{ $chunk_ref };
1641              
1642             ## Afterwards, invoke the gather feature, which
1643             ## will direct the data to the callback function.
1644             MCE->gather(MCE->chunk_id, @a);
1645             }
1646             );
1647              
1648             for (1..2) {
1649             my @m2;
1650              
1651             $mce->process({ gather => preserve_order(\@m2) }, [1..100000]);
1652              
1653             print scalar @m2, "\n";
1654             }
1655              
1656             $mce->shutdown;
1657              
1658             =head1 MANUAL SHUTDOWN
1659              
1660             =over 3
1661              
1662             =item MCE::Step->finish
1663              
1664             =item MCE::Step::finish
1665              
1666             =back
1667              
1668             Workers remain persistent as much as possible after running. Shutdown occurs
1669             automatically when the script terminates. Call finish when workers are no
1670             longer needed.
1671              
1672             use MCE::Step;
1673              
1674             MCE::Step->init(
1675             chunk_size => 20, max_workers => 'auto'
1676             );
1677              
1678             mce_step sub { ... }, 1..100;
1679              
1680             MCE::Step->finish;
1681              
1682             =head1 INDEX
1683              
1684             L<MCE|MCE>, L<MCE::Core>
1685              
1686             =head1 AUTHOR
1687              
1688             Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>
1689              
1690             =cut
1691