File Coverage

blib/lib/MCE/Flow.pm
Criterion Covered Total %
statement 198 235 84.2
branch 106 178 59.5
condition 43 74 58.1
subroutine 14 16 87.5
pod 5 5 100.0
total 366 508 72.0


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Parallel flow model for building creative applications.
4             ##
5             ###############################################################################
6              
7             package MCE::Flow;
8              
9 37     37   2626900 use strict;
  37         302  
  37         937  
10 37     37   154 use warnings;
  37         51  
  37         924  
11              
12 37     37   156 no warnings qw( threads recursion uninitialized );
  37         108  
  37         1640  
13              
14             our $VERSION = '1.887';
15              
16             ## no critic (BuiltinFunctions::ProhibitStringyEval)
17             ## no critic (Subroutines::ProhibitSubroutinePrototypes)
18             ## no critic (TestingAndDebugging::ProhibitNoStrict)
19              
20 37     37   184 use Scalar::Util qw( looks_like_number );
  37         55  
  37         1895  
21 37     37   16007 use MCE;
  37         83  
  37         199  
22              
23             our @CARP_NOT = qw( MCE );
24              
25             my $_tid = $INC{'threads.pm'} ? threads->tid() : 0;
26              
27             sub CLONE {
28 0 0   0   0 $_tid = threads->tid() if $INC{'threads.pm'};
29             }
30              
31             ###############################################################################
32             ## ----------------------------------------------------------------------------
33             ## Import routine.
34             ##
35             ###############################################################################
36              
37             my ($_MCE, $_def, $_params, $_tag) = ({}, {}, {}, 'MCE::Flow');
38             my ($_prev_c, $_prev_n, $_prev_t, $_prev_w) = ({}, {}, {}, {});
39             my ($_user_tasks) = ({});
40              
41             sub import {
42 37     37   467 my ($_class, $_pkg) = (shift, caller);
43              
44 37         142 my $_p = $_def->{$_pkg} = {
45             MAX_WORKERS => 'auto',
46             CHUNK_SIZE => 'auto',
47             };
48              
49             ## Import functions.
50 37 50       112 if ($_pkg !~ /^MCE::/) {
51 37     37   473 no strict 'refs'; no warnings 'redefine';
  37     37   103  
  37         1389  
  37         175  
  37         54  
  37         91627  
52 37         76 *{ $_pkg.'::mce_flow_f' } = \&run_file;
  37         207  
53 37         83 *{ $_pkg.'::mce_flow_s' } = \&run_seq;
  37         113  
54 37         65 *{ $_pkg.'::mce_flow' } = \&run;
  37         109  
55             }
56              
57             ## Process module arguments.
58 37         147 while ( my $_argument = shift ) {
59 0         0 my $_arg = lc $_argument;
60              
61 0 0       0 $_p->{MAX_WORKERS} = shift, next if ( $_arg eq 'max_workers' );
62 0 0       0 $_p->{CHUNK_SIZE} = shift, next if ( $_arg eq 'chunk_size' );
63 0 0       0 $_p->{TMP_DIR} = shift, next if ( $_arg eq 'tmp_dir' );
64 0 0       0 $_p->{FREEZE} = shift, next if ( $_arg eq 'freeze' );
65 0 0       0 $_p->{THAW} = shift, next if ( $_arg eq 'thaw' );
66 0 0       0 $_p->{INIT_RELAY} = shift, next if ( $_arg eq 'init_relay' );
67 0 0       0 $_p->{USE_THREADS} = shift, next if ( $_arg eq 'use_threads' );
68              
69             ## Sereal 3.015+, if available, is used automatically by MCE 1.8+.
70 0 0       0 if ( $_arg eq 'sereal' ) {
71 0 0       0 if ( shift eq '0' ) {
72 0         0 require Storable;
73 0         0 $_p->{FREEZE} = \&Storable::freeze;
74 0         0 $_p->{THAW} = \&Storable::thaw;
75             }
76 0         0 next;
77             }
78              
79 0         0 _croak("Error: ($_argument) invalid module option");
80             }
81              
82 37         172 $_p->{MAX_WORKERS} = MCE::_parse_max_workers($_p->{MAX_WORKERS});
83              
84 37         150 MCE::_validate_number($_p->{MAX_WORKERS}, 'MAX_WORKERS', $_tag);
85             MCE::_validate_number($_p->{CHUNK_SIZE}, 'CHUNK_SIZE', $_tag)
86 37 50       103 unless ($_p->{CHUNK_SIZE} eq 'auto');
87              
88 37         518 return;
89             }
90              
91             ###############################################################################
92             ## ----------------------------------------------------------------------------
93             ## Init and finish routines.
94             ##
95             ###############################################################################
96              
97             sub init (@) {
98              
99 29 100 66 29 1 13185 shift if (defined $_[0] && $_[0] eq 'MCE::Flow');
100 29         205 my $_pkg = "$$.$_tid.".caller();
101              
102 29 50       297 $_params->{$_pkg} = (ref $_[0] eq 'HASH') ? shift : { @_ };
103              
104 29         79 @_ = ();
105              
106 29         67 return;
107             }
108              
109             sub finish (@) {
110              
111 101 50 33 101 1 11868 shift if (defined $_[0] && $_[0] eq 'MCE::Flow');
112 101 100       929 my $_pkg = (defined $_[0]) ? shift : "$$.$_tid.".caller();
113              
114 101 100 66     1455 if ( $_pkg eq 'MCE' ) {
    100          
115 37         106 for my $_k ( keys %{ $_MCE } ) { MCE::Flow->finish($_k, 1); }
  37         712  
  29         573  
116             }
117             elsif ( $_MCE->{$_pkg} && $_MCE->{$_pkg}{_init_pid} eq "$$.$_tid" ) {
118 35 50       717 $_MCE->{$_pkg}->shutdown(@_) if $_MCE->{$_pkg}{_spawned};
119              
120 35         225 delete $_user_tasks->{$_pkg};
121 35         370 delete $_prev_c->{$_pkg};
122 35         229 delete $_prev_n->{$_pkg};
123 35         121 delete $_prev_t->{$_pkg};
124 35         127 delete $_prev_w->{$_pkg};
125 35         387 delete $_MCE->{$_pkg};
126             }
127              
128 101         323 @_ = ();
129              
130 101         304 return;
131             }
132              
133             ###############################################################################
134             ## ----------------------------------------------------------------------------
135             ## Parallel flow with MCE -- file.
136             ##
137             ###############################################################################
138              
139             sub run_file (@) {
140              
141 4 50 33 4 1 5832 shift if (defined $_[0] && $_[0] eq 'MCE::Flow');
142              
143 4 50       12 my ($_file, $_pos); my $_start_pos = (ref $_[0] eq 'HASH') ? 2 : 1;
  4         14  
144 4         22 my $_pid = "$$.$_tid.".caller();
145              
146 4 50       12 if (defined (my $_p = $_params->{$_pid})) {
147 4 50       10 delete $_p->{input_data} if (exists $_p->{input_data});
148 4 50       16 delete $_p->{sequence} if (exists $_p->{sequence});
149             }
150             else {
151 0         0 $_params->{$_pid} = {};
152             }
153              
154 4         26 for my $_i ($_start_pos .. @_ - 1) {
155 8         20 my $_r = ref $_[$_i];
156 8 100 66     88 if ($_r eq '' || $_r eq 'SCALAR' || $_r =~ /^(?:GLOB|FileHandle|IO::)/) {
      100        
157 4         20 $_file = $_[$_i]; $_pos = $_i;
  4         24  
158 4         10 last;
159             }
160             }
161              
162 4 100 66     64 if (defined $_file && ref $_file eq '' && $_file ne '') {
    50 66        
      33        
163 2 50       50 _croak("$_tag: ($_file) does not exist") unless (-e $_file);
164 2 50       26 _croak("$_tag: ($_file) is not readable") unless (-r $_file);
165 2 50       152 _croak("$_tag: ($_file) is not a plain file") unless (-f $_file);
166 2         26 $_params->{$_pid}{_file} = $_file;
167             }
168             elsif (ref $_file eq 'SCALAR' || ref($_file) =~ /^(?:GLOB|FileHandle|IO::)/) {
169 2         6 $_params->{$_pid}{_file} = $_file;
170             }
171             else {
172 0         0 _croak("$_tag: (file) is not specified or valid");
173             }
174              
175 4 50       14 if (defined $_pos) {
176 4         16 pop @_ for ($_pos .. @_ - 1);
177             }
178              
179 4         12 return run(@_);
180             }
181              
182             ###############################################################################
183             ## ----------------------------------------------------------------------------
184             ## Parallel flow with MCE -- sequence.
185             ##
186             ###############################################################################
187              
188             sub run_seq (@) {
189              
190 2 50 33 2 1 4036 shift if (defined $_[0] && $_[0] eq 'MCE::Flow');
191              
192 2 50       6 my ($_begin, $_end, $_pos); my $_start_pos = (ref $_[0] eq 'HASH') ? 2 : 1;
  2         8  
193 2         12 my $_pid = "$$.$_tid.".caller();
194              
195 2 50       6 if (defined (my $_p = $_params->{$_pid})) {
196 2 50       8 delete $_p->{sequence} if (exists $_p->{sequence});
197 2 50       6 delete $_p->{input_data} if (exists $_p->{input_data});
198 2 50       8 delete $_p->{_file} if (exists $_p->{_file});
199             }
200             else {
201 0         0 $_params->{$_pid} = {};
202             }
203              
204 2         22 for my $_i ($_start_pos .. @_ - 1) {
205 4         96 my $_r = ref $_[$_i];
206              
207 4 50 66     52 if ($_r eq '' || $_r =~ /^Math::/ || $_r eq 'HASH' || $_r eq 'ARRAY') {
      66        
      33        
208 2         4 $_pos = $_i;
209              
210 2 50 33     10 if ($_r eq '' || $_r =~ /^Math::/) {
    0          
    0          
211 2         4 $_begin = $_[$_pos], $_end = $_[$_pos + 1];
212             $_params->{$_pid}{sequence} = [
213 2         10 $_[$_pos], $_[$_pos + 1], $_[$_pos + 2], $_[$_pos + 3]
214             ];
215             }
216             elsif ($_r eq 'HASH') {
217 0         0 $_begin = $_[$_pos]->{begin}, $_end = $_[$_pos]->{end};
218 0         0 $_params->{$_pid}{sequence} = $_[$_pos];
219             }
220             elsif ($_r eq 'ARRAY') {
221 0         0 $_begin = $_[$_pos]->[0], $_end = $_[$_pos]->[1];
222 0         0 $_params->{$_pid}{sequence} = $_[$_pos];
223             }
224              
225 2         6 last;
226             }
227             }
228              
229             _croak("$_tag: (sequence) is not specified or valid")
230 2 50       6 unless (exists $_params->{$_pid}{sequence});
231 2 50       116 _croak("$_tag: (begin) is not specified for sequence")
232             unless (defined $_begin);
233 2 50       8 _croak("$_tag: (end) is not specified for sequence")
234             unless (defined $_end);
235              
236 2         6 $_params->{$_pid}{sequence_run} = undef;
237              
238 2 50       8 if (defined $_pos) {
239 2         10 pop @_ for ($_pos .. @_ - 1);
240             }
241              
242 2         8 return run(@_);
243             }
244              
245             ###############################################################################
246             ## ----------------------------------------------------------------------------
247             ## Parallel flow with MCE.
248             ##
249             ###############################################################################
250              
251             sub run (@) {
252              
253 74 50 33 74 1 37765 shift if (defined $_[0] && $_[0] eq 'MCE::Flow');
254              
255 74 100       377 my $_pkg = caller() eq 'MCE::Flow' ? caller(1) : caller();
256 74         440 my $_pid = "$$.$_tid.$_pkg";
257              
258 74 100       428 if (ref $_[0] eq 'HASH') {
259 48 100       196 $_params->{$_pid} = {} unless defined $_params->{$_pid};
260 48         171 for my $_p (keys %{ $_[0] }) {
  48         272  
261 72         289 $_params->{$_pid}{$_p} = $_[0]->{$_p};
262             }
263              
264 48         117 shift;
265             }
266              
267             ## -------------------------------------------------------------------------
268              
269 74         191 my (@_code, @_name, @_thrs, @_wrks); my $_init_mce = 0; my $_pos = 0;
  74         697  
  74         121  
270              
271 74         723 while (ref $_[0] eq 'CODE') {
272 94         337 push @_code, $_[0];
273              
274 94 50       443 if (defined (my $_p = $_params->{$_pid})) {
275             push @_name, (ref $_p->{task_name} eq 'ARRAY')
276 94 100       402 ? $_p->{task_name}->[$_pos] : undef;
277             push @_thrs, (ref $_p->{use_threads} eq 'ARRAY')
278 94 50       270 ? $_p->{use_threads}->[$_pos] : undef;
279             push @_wrks, (ref $_p->{max_workers} eq 'ARRAY')
280 94 100       307 ? $_p->{max_workers}->[$_pos] : undef;
281             }
282              
283             $_init_mce = 1 if (
284             !defined $_prev_c->{$_pid}[$_pos] ||
285 94 100 66     556 $_prev_c->{$_pid}[$_pos] != $_code[$_pos]
286             );
287              
288 94 100       304 $_init_mce = 1 if ($_prev_n->{$_pid}[$_pos] ne $_name[$_pos]);
289 94 50       291 $_init_mce = 1 if ($_prev_t->{$_pid}[$_pos] ne $_thrs[$_pos]);
290 94 100       272 $_init_mce = 1 if ($_prev_w->{$_pid}[$_pos] ne $_wrks[$_pos]);
291              
292 94         210 $_prev_c->{$_pid}[$_pos] = $_code[$_pos];
293 94         215 $_prev_n->{$_pid}[$_pos] = $_name[$_pos];
294 94         163 $_prev_t->{$_pid}[$_pos] = $_thrs[$_pos];
295 94         165 $_prev_w->{$_pid}[$_pos] = $_wrks[$_pos];
296              
297 94         142 shift; $_pos++;
  94         282  
298             }
299              
300 74 50       258 if (defined $_prev_c->{$_pid}[$_pos]) {
301 0         0 pop @{ $_prev_c->{$_pid} } for ($_pos .. $#{ $_prev_c->{$_pid } });
  0         0  
  0         0  
302 0         0 pop @{ $_prev_n->{$_pid} } for ($_pos .. $#{ $_prev_n->{$_pid } });
  0         0  
  0         0  
303 0         0 pop @{ $_prev_t->{$_pid} } for ($_pos .. $#{ $_prev_t->{$_pid } });
  0         0  
  0         0  
304 0         0 pop @{ $_prev_w->{$_pid} } for ($_pos .. $#{ $_prev_w->{$_pid } });
  0         0  
  0         0  
305              
306 0         0 $_init_mce = 1;
307             }
308              
309 74 50       176 return unless (scalar @_code);
310              
311             ## -------------------------------------------------------------------------
312              
313 74         124 my $_input_data; my $_max_workers = $_def->{$_pkg}{MAX_WORKERS};
  74         179  
314 74         256 my $_r = ref $_[0];
315              
316 74 100 66     358 if (@_ == 1 && $_r =~ /^(?:ARRAY|HASH|SCALAR|GLOB|FileHandle|IO::)/) {
317 4         10 $_input_data = shift;
318             }
319              
320 74 50       247 if (defined (my $_p = $_params->{$_pid})) {
321             $_max_workers = MCE::_parse_max_workers($_p->{max_workers})
322 74 100 66     1038 if (exists $_p->{max_workers} && ref $_p->{max_workers} ne 'ARRAY');
323              
324 74 100 100     440 delete $_p->{sequence} if (defined $_input_data || scalar @_);
325 74 50       250 delete $_p->{user_func} if (exists $_p->{user_func});
326 74 50       194 delete $_p->{user_tasks} if (exists $_p->{user_tasks});
327             }
328              
329 74 100 66     410 if (@_code > 1 && $_max_workers > 1) {
330 20         96 $_max_workers = int($_max_workers / @_code + 0.5) + 1;
331             }
332              
333             my $_chunk_size = MCE::_parse_chunk_size(
334 74         462 $_def->{$_pkg}{CHUNK_SIZE}, $_max_workers, $_params->{$_pid},
335             $_input_data, scalar @_
336             );
337              
338 74 50       269 if (defined (my $_p = $_params->{$_pid})) {
339 74 100       183 if (exists $_p->{_file}) {
340 4         8 $_input_data = delete $_p->{_file};
341             } else {
342 70 50       190 $_input_data = $_p->{input_data} if exists $_p->{input_data};
343             }
344             }
345              
346             ## -------------------------------------------------------------------------
347              
348 74         530 MCE::_save_state($_MCE->{$_pid});
349              
350 74 100       243 if ($_init_mce) {
351 64 50       283 $_MCE->{$_pid}->shutdown() if (defined $_MCE->{$_pid});
352              
353             ## must clear arrays for nested session to work with Perl < v5.14
354 64         414 _gen_user_tasks($_pid, [@_code], [@_name], [@_thrs], [@_wrks]);
355              
356 64         250 @_code = @_name = @_thrs = @_wrks = ();
357              
358             my %_opts = (
359             max_workers => $_max_workers, task_name => $_tag,
360 64         314 user_tasks => $_user_tasks->{$_pid},
361             );
362              
363 64 50       195 if (defined (my $_p = $_params->{$_pid})) {
364 64         165 local $_;
365              
366 64         88 for (keys %{ $_p }) {
  64         305  
367 114 100 100     849 next if ($_ eq 'max_workers' && ref $_p->{max_workers} eq 'ARRAY');
368 102 100 66     340 next if ($_ eq 'task_name' && ref $_p->{task_name} eq 'ARRAY');
369 96 50 33     531 next if ($_ eq 'use_threads' && ref $_p->{use_threads} eq 'ARRAY');
370              
371 96 50       232 next if ($_ eq 'chunk_size');
372 96 50       195 next if ($_ eq 'input_data');
373 96 50       202 next if ($_ eq 'sequence_run');
374              
375             _croak("$_tag: ($_) is not a valid constructor argument")
376 96 50       260 unless (exists $MCE::_valid_fields_new{$_});
377              
378 96         215 $_opts{$_} = $_p->{$_};
379             }
380             }
381              
382 64         227 for my $_k (qw/ tmp_dir freeze thaw init_relay use_threads /) {
383             $_opts{$_k} = $_def->{$_pkg}{uc($_k)}
384 320 50 33     807 if (exists $_def->{$_pkg}{uc($_k)} && !exists $_opts{$_k});
385             }
386              
387 64         547 $_MCE->{$_pid} = MCE->new(pkg => $_pkg, %_opts);
388             }
389             else {
390             ## Workers may persist after running. Thus, updating the MCE instance.
391             ## These options do not require respawning.
392 10 50       30 if (defined (my $_p = $_params->{$_pid})) {
393 10         48 for my $_k (qw(
394             RS interval stderr_file stdout_file user_error user_output
395             job_delay submit_delay on_post_exit on_post_run user_args
396             flush_file flush_stderr flush_stdout gather max_retries
397             )) {
398 160 100       281 $_MCE->{$_pid}{$_k} = $_p->{$_k} if (exists $_p->{$_k});
399             }
400             }
401             }
402              
403             ## -------------------------------------------------------------------------
404              
405 74 100       165 my @_a; my $_wa = wantarray; $_MCE->{$_pid}{gather} = \@_a if (defined $_wa);
  74         128  
  74         283  
406              
407 74 100       284 if (defined $_input_data) {
    100          
408 8         14 @_ = ();
409 8         44 $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, $_input_data);
410 7         49 delete $_MCE->{$_pid}{input_data};
411             }
412             elsif (scalar @_) {
413 6         36 $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, \@_);
414 2         66 delete $_MCE->{$_pid}{input_data};
415             }
416             else {
417 60 100 66     534 if (defined $_params->{$_pid} && exists $_params->{$_pid}{sequence}) {
418             $_MCE->{$_pid}->run({
419             chunk_size => $_chunk_size,
420             sequence => $_params->{$_pid}{sequence}
421 2         18 }, 0);
422 2 50       14 if (exists $_params->{$_pid}{sequence_run}) {
423 2         4 delete $_params->{$_pid}{sequence_run};
424 2         4 delete $_params->{$_pid}{sequence};
425             }
426 2         4 delete $_MCE->{$_pid}{sequence};
427             }
428             else {
429 58         443 $_MCE->{$_pid}->run({ chunk_size => $_chunk_size }, 0);
430             }
431             }
432              
433 45         950 MCE::_restore_state();
434              
435 45 100       210 delete $_MCE->{$_pid}{gather} if (defined $_wa);
436              
437 45 100       1330 return ((defined $_wa) ? @_a : ());
438             }
439              
440             ###############################################################################
441             ## ----------------------------------------------------------------------------
442             ## Private methods.
443             ##
444             ###############################################################################
445              
446             sub _croak {
447              
448 0     0   0 goto &MCE::_croak;
449             }
450              
451             sub _gen_user_tasks {
452              
453 64     64   242 my ($_pid, $_code_ref, $_name_ref, $_thrs_ref, $_wrks_ref) = @_;
454              
455 64         104 @{ $_user_tasks->{$_pid} } = ();
  64         173  
456              
457 64         148 for (my $_i = 0; $_i < @{ $_code_ref }; $_i++) {
  140         373  
458 76         116 push @{ $_user_tasks->{$_pid} }, {
  76         5081  
459             task_name => $_name_ref->[$_i],
460             use_threads => $_thrs_ref->[$_i],
461             max_workers => $_wrks_ref->[$_i],
462             user_func => $_code_ref->[$_i]
463             }
464             }
465              
466 64         130 return;
467             }
468              
469             1;
470              
471             __END__
472              
473             ###############################################################################
474             ## ----------------------------------------------------------------------------
475             ## Module usage.
476             ##
477             ###############################################################################
478              
479             =head1 NAME
480              
481             MCE::Flow - Parallel flow model for building creative applications
482              
483             =head1 VERSION
484              
485             This document describes MCE::Flow version 1.887
486              
487             =head1 DESCRIPTION
488              
489             MCE::Flow is great for writing custom apps to maximize on all available cores.
490             This module was created to help one harness user_tasks within MCE.
491              
492             It is trivial to parallelize with mce_stream shown below.
493              
494             ## Native map function
495             my @a = map { $_ * 4 } map { $_ * 3 } map { $_ * 2 } 1..10000;
496              
497             ## Same as with MCE::Stream (processing from right to left)
498             @a = mce_stream
499             sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
500              
501             ## Pass an array reference to have writes occur simultaneously
502             mce_stream \@a,
503             sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
504              
505             However, let's have MCE::Flow compute the same in parallel. MCE::Queue
506             will be used for data flow among the sub-tasks.
507              
508             use MCE::Flow;
509             use MCE::Queue;
510              
511             This calls for preserving output order.
512              
513             sub preserve_order {
514             my %tmp; my $order_id = 1; my $gather_ref = $_[0];
515             @{ $gather_ref } = (); ## clear the array (optional)
516              
517             return sub {
518             my ($data_ref, $chunk_id) = @_;
519             $tmp{$chunk_id} = $data_ref;
520              
521             while (1) {
522             last unless exists $tmp{$order_id};
523             push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
524             }
525              
526             return;
527             };
528             }
529              
530             Two queues are needed for data flow between the 3 sub-tasks. Notice task_end
531             and how the value from $task_name is used for determining which task has ended.
532              
533             my $b = MCE::Queue->new;
534             my $c = MCE::Queue->new;
535              
536             sub task_end {
537             my ($mce, $task_id, $task_name) = @_;
538              
539             if (defined $mce->{user_tasks}->[$task_id + 1]) {
540             my $n_workers = $mce->{user_tasks}->[$task_id + 1]->{max_workers};
541              
542             if ($task_name eq 'a') {
543             $b->enqueue((undef) x $n_workers);
544             }
545             elsif ($task_name eq 'b') {
546             $c->enqueue((undef) x $n_workers);
547             }
548             }
549              
550             return;
551             }
552              
553             Next are the 3 sub-tasks. The first one reads input and begins the flow.
554             The 2nd task dequeues, performs the calculation, and enqueues into the next.
555             Finally, the last task calls the gather method.
556              
557             Although serialization is done for you automatically, it is done explicitly
558             here to avoid double serialization. This is the fastest approach for passing
559             data between sub-tasks, with the least overhead.
560              
561             sub task_a {
562             my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
563              
564             push @ans, map { $_ * 2 } @{ $chunk_ref };
565             $b->enqueue(MCE->freeze([ \@ans, $chunk_id ]));
566              
567             return;
568             }
569              
570             sub task_b {
571             my ($mce) = @_;
572              
573             while (1) {
574             my @ans; my $chunk = $b->dequeue;
575             last unless defined $chunk;
576              
577             $chunk = MCE->thaw($chunk);
578             push @ans, map { $_ * 3 } @{ $chunk->[0] };
579             $c->enqueue(MCE->freeze([ \@ans, $chunk->[1] ]));
580             }
581              
582             return;
583             }
584              
585             sub task_c {
586             my ($mce) = @_;
587              
588             while (1) {
589             my @ans; my $chunk = $c->dequeue;
590             last unless defined $chunk;
591              
592             $chunk = MCE->thaw($chunk);
593             push @ans, map { $_ * 4 } @{ $chunk->[0] };
594             MCE->gather(\@ans, $chunk->[1]);
595             }
596              
597             return;
598             }
599              
600             In summary, MCE::Flow builds an MCE instance behind the scenes and starts
601             running. The task_name (shown), max_workers, and use_threads options can take
602             an anonymous array for specifying a distinct value for each sub-task.
603              
604             my @a;
605              
606             mce_flow {
607             task_name => [ 'a', 'b', 'c' ], task_end => \&task_end,
608             gather => preserve_order(\@a)
609              
610             }, \&task_a, \&task_b, \&task_c, 1..10000;
611              
612             print "@a\n";
613              
614             If speed is not a concern and you want to get rid of all the MCE->freeze and
615             MCE->thaw statements, simply enqueue and dequeue 2 items at a time.
616             Or better yet, see L<MCE::Step> introduced in MCE 1.506.
617              
618             First, task_end must be updated. The number of undef(s) must match the number
619             of workers times the dequeue count. Otherwise, the script will stall.
620              
621             sub task_end {
622             ...
623             if ($task_name eq 'a') {
624             # $b->enqueue((undef) x $n_workers);
625             $b->enqueue((undef) x ($n_workers * 2));
626             }
627             elsif ($task_name eq 'b') {
628             # $c->enqueue((undef) x $n_workers);
629             $c->enqueue((undef) x ($n_workers * 2));
630             }
631             ...
632             }
633              
634             Next, the 3 sub-tasks enqueuing and dequeuing 2 elements at a time.
635              
636             sub task_a {
637             my @ans; my ($mce, $chunk_ref, $chunk_id) = @_;
638              
639             push @ans, map { $_ * 2 } @{ $chunk_ref };
640             $b->enqueue(\@ans, $chunk_id);
641              
642             return;
643             }
644              
645             sub task_b {
646             my ($mce) = @_;
647              
648             while (1) {
649             my @ans; my ($chunk_ref, $chunk_id) = $b->dequeue(2);
650             last unless defined $chunk_ref;
651              
652             push @ans, map { $_ * 3 } @{ $chunk_ref };
653             $c->enqueue(\@ans, $chunk_id);
654             }
655              
656             return;
657             }
658              
659             sub task_c {
660             my ($mce) = @_;
661              
662             while (1) {
663             my @ans; my ($chunk_ref, $chunk_id) = $c->dequeue(2);
664             last unless defined $chunk_ref;
665              
666             push @ans, map { $_ * 4 } @{ $chunk_ref };
667             MCE->gather(\@ans, $chunk_id);
668             }
669              
670             return;
671             }
672              
673             Finally, run as usual.
674              
675             my @a;
676              
677             mce_flow {
678             task_name => [ 'a', 'b', 'c' ], task_end => \&task_end,
679             gather => preserve_order(\@a)
680              
681             }, \&task_a, \&task_b, \&task_c, 1..10000;
682              
683             print "@a\n";
684              
685             =head1 SYNOPSIS when CHUNK_SIZE EQUALS 1
686              
687             Although L<MCE::Loop> may be preferred for running using a single code block,
688             the text below also applies to this module, particularly for the first block.
689              
690             All models in MCE default to 'auto' for chunk_size. The arguments for the block
691             are the same as writing a user_func block using the Core API.
692              
693             Beginning with MCE 1.5, the next input item is placed into the input scalar
694             variable $_ when chunk_size equals 1. Otherwise, $_ points to $chunk_ref
695             containing many items. Basically, line 2 below may be omitted from your code
696             when using $_. One can call MCE->chunk_id to obtain the current chunk id.
697              
698             line 1: user_func => sub {
699             line 2: my ($mce, $chunk_ref, $chunk_id) = @_;
700             line 3:
701             line 4: $_ points to $chunk_ref->[0]
702             line 5: in MCE 1.5 when chunk_size == 1
703             line 6:
704             line 7: $_ points to $chunk_ref
705             line 8: in MCE 1.5 when chunk_size > 1
706             line 9: }
707              
708             Follow this synopsis when chunk_size equals one. Looping is not required
709             inside the first block, because the block is called once for each item.
710              
711             ## Exports mce_flow, mce_flow_f, and mce_flow_s
712             use MCE::Flow;
713              
714             MCE::Flow->init(
715             chunk_size => 1
716             );
717              
718             ## Array or array_ref
719             mce_flow sub { do_work($_) }, 1..10000;
720             mce_flow sub { do_work($_) }, \@list;
721              
722             ## Important; pass an array_ref for deeply nested input data
723             mce_flow sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
724             mce_flow sub { do_work($_) }, \@deeply_list;
725              
726             ## File path, glob ref, IO::All::{ File, Pipe, STDIO } obj, or scalar ref
727             ## Workers read directly without involving the manager process
728             mce_flow_f sub { chomp; do_work($_) }, "/path/to/file"; # efficient
729              
730             ## Involves the manager process, therefore slower
731             mce_flow_f sub { chomp; do_work($_) }, $file_handle;
732             mce_flow_f sub { chomp; do_work($_) }, $io;
733             mce_flow_f sub { chomp; do_work($_) }, \$scalar;
734              
735             ## Sequence of numbers (begin, end [, step, format])
736             mce_flow_s sub { do_work($_) }, 1, 10000, 5;
737             mce_flow_s sub { do_work($_) }, [ 1, 10000, 5 ];
738              
739             mce_flow_s sub { do_work($_) }, {
740             begin => 1, end => 10000, step => 5, format => undef
741             };
742              
743             =head1 SYNOPSIS when CHUNK_SIZE is GREATER THAN 1
744              
745             Follow this synopsis when chunk_size equals 'auto' or greater than 1.
746             This means having to loop through the chunk from inside the first block.
747              
748             use MCE::Flow;
749              
750             MCE::Flow->init( ## Chunk_size defaults to 'auto' when
751             chunk_size => 'auto' ## not specified. Therefore, the init
752             ); ## function may be omitted.
753              
754             ## Syntax is shown for mce_flow for demonstration purposes.
755             ## Looping inside the block is the same for mce_flow_f and
756             ## mce_flow_s.
757              
758             ## Array or array_ref
759             mce_flow sub { do_work($_) for (@{ $_ }) }, 1..10000;
760             mce_flow sub { do_work($_) for (@{ $_ }) }, \@list;
761              
762             ## Important; pass an array_ref for deeply nested input data
763             mce_flow sub { do_work($_) for (@{ $_ }) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
764             mce_flow sub { do_work($_) for (@{ $_ }) }, \@deeply_list;
765              
766             ## Resembles code using the core MCE API
767             mce_flow sub {
768             my ($mce, $chunk_ref, $chunk_id) = @_;
769              
770             for (@{ $chunk_ref }) {
771             do_work($_);
772             }
773              
774             }, 1..10000;
775              
776             Chunking reduces the number of IPC calls behind the scenes. Think in terms of
777             chunks whenever processing a large amount of data. For relatively small data,
778             choosing 1 for chunk_size is fine.
779              
780             =head1 OVERRIDING DEFAULTS
781              
782             The following lists the options that may be overridden when loading the module.
783              
784             use Sereal qw( encode_sereal decode_sereal );
785             use CBOR::XS qw( encode_cbor decode_cbor );
786             use JSON::XS qw( encode_json decode_json );
787              
788             use MCE::Flow
789             max_workers => 8, # Default 'auto'
790             chunk_size => 500, # Default 'auto'
791             tmp_dir => "/path/to/app/tmp", # $MCE::Signal::tmp_dir
792             freeze => \&encode_sereal, # \&Storable::freeze
793             thaw => \&decode_sereal, # \&Storable::thaw
794             init_relay => 0, # Default undef; MCE 1.882+
795             use_threads => 0, # Default undef; MCE 1.882+
796             ;
797              
798             From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if available.
799             Specify C<< Sereal => 0 >> to use Storable instead.
800              
801             use MCE::Flow Sereal => 0;
802              
803             =head1 CUSTOMIZING MCE
804              
805             =over 3
806              
807             =item MCE::Flow->init ( options )
808              
809             =item MCE::Flow::init { options }
810              
811             =back
812              
813             The init function accepts a hash of MCE options. Unlike with MCE::Stream,
814             both gather and bounds_only options may be specified when calling init
815             (not shown below).
816              
817             use MCE::Flow;
818              
819             MCE::Flow->init(
820             chunk_size => 1, max_workers => 4,
821              
822             user_begin => sub {
823             print "## ", MCE->wid, " started\n";
824             },
825              
826             user_end => sub {
827             print "## ", MCE->wid, " completed\n";
828             }
829             );
830              
831             my %a = mce_flow sub { MCE->gather($_, $_ * $_) }, 1..100;
832              
833             print "\n", "@a{1..100}", "\n";
834              
835             -- Output
836              
837             ## 3 started
838             ## 2 started
839             ## 4 started
840             ## 1 started
841             ## 2 completed
842             ## 4 completed
843             ## 3 completed
844             ## 1 completed
845              
846             1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361
847             400 441 484 529 576 625 676 729 784 841 900 961 1024 1089 1156
848             1225 1296 1369 1444 1521 1600 1681 1764 1849 1936 2025 2116 2209
849             2304 2401 2500 2601 2704 2809 2916 3025 3136 3249 3364 3481 3600
850             3721 3844 3969 4096 4225 4356 4489 4624 4761 4900 5041 5184 5329
851             5476 5625 5776 5929 6084 6241 6400 6561 6724 6889 7056 7225 7396
852             7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801
853             10000
854              
855             Like with MCE::Flow->init above, MCE options may be specified using an
856             anonymous hash for the first argument. Notice how task_name, max_workers,
857             and use_threads can each take an anonymous array to set a distinct value
858             for each code block.
859              
860             Unlike MCE::Stream which processes from right-to-left, MCE::Flow begins
861             with the first code block, thus processing from left-to-right.
862              
863             use threads;
864             use MCE::Flow;
865              
866             my @a = mce_flow {
867             task_name => [ 'a', 'b', 'c' ],
868             max_workers => [ 3, 4, 2, ],
869             use_threads => [ 1, 0, 0, ],
870              
871             user_end => sub {
872             my ($mce, $task_id, $task_name) = @_;
873             MCE->print("$task_id - $task_name completed\n");
874             },
875              
876             task_end => sub {
877             my ($mce, $task_id, $task_name) = @_;
878             MCE->print("$task_id - $task_name ended\n");
879             }
880             },
881             sub { sleep 1; }, ## 3 workers, named a
882             sub { sleep 2; }, ## 4 workers, named b
883             sub { sleep 3; }; ## 2 workers, named c
884              
885             -- Output
886              
887             0 - a completed
888             0 - a completed
889             0 - a completed
890             0 - a ended
891             1 - b completed
892             1 - b completed
893             1 - b completed
894             1 - b completed
895             1 - b ended
896             2 - c completed
897             2 - c completed
898             2 - c ended
899              
900             =head1 API DOCUMENTATION
901              
902             Although input data is optional for MCE::Flow, the following assumes chunk_size
903             equals 1 in order to demonstrate all the possibilities for providing input data.
904              
905             =over 3
906              
907             =item MCE::Flow->run ( sub { code }, list )
908              
909             =item mce_flow sub { code }, list
910              
911             =back
912              
913             Input data may be defined using a list, an array ref, or a hash ref.
914              
915             Unlike MCE::Loop, Map, and Grep which take a block as C<{ ... }>, Flow takes a
916             C<sub { ... }> or a code reference. The other difference is that the comma is
917             needed after the block.
918              
919             # $_ contains the item when chunk_size => 1
920              
921             mce_flow sub { do_work($_) }, 1..1000;
922             mce_flow sub { do_work($_) }, \@list;
923              
924             # Important; pass an array_ref for deeply nested input data
925              
926             mce_flow sub { do_work($_) }, [ [ 0, 1 ], [ 0, 2 ], ... ];
927             mce_flow sub { do_work($_) }, \@deeply_list;
928              
929             # Chunking; any chunk_size => 1 or greater
930              
931             my %res = mce_flow sub {
932             my ($mce, $chunk_ref, $chunk_id) = @_;
933             my %ret;
934             for my $item (@{ $chunk_ref }) {
935             $ret{$item} = $item * 2;
936             }
937             MCE->gather(%ret);
938             },
939             \@list;
940              
941             # Input hash; current API available since 1.828
942              
943             my %res = mce_flow sub {
944             my ($mce, $chunk_ref, $chunk_id) = @_;
945             my %ret;
946             for my $key (keys %{ $chunk_ref }) {
947             $ret{$key} = $chunk_ref->{$key} * 2;
948             }
949             MCE->gather(%ret);
950             },
951             \%hash;
952              
953             # Unlike MCE::Loop, MCE::Flow doesn't need input to run
954              
955             mce_flow { max_workers => 4 }, sub {
956             MCE->say( MCE->wid );
957             };
958              
959             # ... and can run multiple tasks
960              
961             mce_flow {
962             max_workers => [ 1, 3 ],
963             task_name => [ 'p', 'c' ]
964             },
965             sub {
966             # 1 producer
967             MCE->say( "producer: ", MCE->wid );
968             },
969             sub {
970             # 3 consumers
971             MCE->say( "consumer: ", MCE->wid );
972             };
973              
974             # Here, options are specified via init
975              
976             MCE::Flow->init(
977             max_workers => [ 1, 3 ],
978             task_name => [ 'p', 'c' ]
979             );
980              
981             mce_flow \&producer, \&consumers;
982              
983             =over 3
984              
985             =item MCE::Flow->run_file ( sub { code }, file )
986              
987             =item mce_flow_f sub { code }, file
988              
989             =back
990              
991             The fastest of these is the file path form. Workers communicate the next offset
992             position among themselves with zero interaction by the manager process.
993              
994             C<IO::All> { File, Pipe, STDIO } is supported since MCE 1.845.
995              
996             # $_ contains the line when chunk_size => 1
997              
998             mce_flow_f sub { $_ }, "/path/to/file"; # faster
999             mce_flow_f sub { $_ }, $file_handle;
1000             mce_flow_f sub { $_ }, $io; # IO::All
1001             mce_flow_f sub { $_ }, \$scalar;
1002              
1003             # chunking, any chunk_size => 1 or greater
1004              
1005             my %res = mce_flow_f sub {
1006             my ($mce, $chunk_ref, $chunk_id) = @_;
1007             my $buf = '';
1008             for my $line (@{ $chunk_ref }) {
1009             $buf .= $line;
1010             }
1011             MCE->gather($chunk_id, $buf);
1012             },
1013             "/path/to/file";
1014              
1015             =over 3
1016              
1017             =item MCE::Flow->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )
1018              
1019             =item mce_flow_s sub { code }, $beg, $end [, $step, $fmt ]
1020              
1021             =back
1022              
1023             Sequence may be defined as a list, an array reference, or a hash reference.
1024             The functions require both begin and end values to run. Step and format are
1025             optional. The format is passed to sprintf (% may be omitted below).
1026              
1027             my ($beg, $end, $step, $fmt) = (10, 20, 0.1, "%4.1f");
1028              
1029             # $_ contains the sequence number when chunk_size => 1
1030              
1031             mce_flow_s sub { $_ }, $beg, $end, $step, $fmt;
1032             mce_flow_s sub { $_ }, [ $beg, $end, $step, $fmt ];
1033              
1034             mce_flow_s sub { $_ }, {
1035             begin => $beg, end => $end,
1036             step => $step, format => $fmt
1037             };
1038              
1039             # chunking, any chunk_size => 1 or greater
1040              
1041             my %res = mce_flow_s sub {
1042             my ($mce, $chunk_ref, $chunk_id) = @_;
1043             my $buf = '';
1044             for my $seq (@{ $chunk_ref }) {
1045             $buf .= "$seq\n";
1046             }
1047             MCE->gather($chunk_id, $buf);
1048             },
1049             [ $beg, $end ];
1050              
1051             With the bounds_only option, the sequence engine computes only the 'begin'
1052             and 'end' items for the chunk, and not the items in between (hence boundaries
1053             only). This option applies to sequences only and has no effect when chunk_size equals 1.
1054              
1055             The time to run is 0.006s below. This becomes 0.827s without the bounds_only
1056             option due to computing all items in between, thus creating a very large
1057             array. Basically, specify bounds_only => 1 when the boundaries are all you need
1058             for looping inside the block; e.g. Monte Carlo simulations.
1059              
1060             Time was measured using 1 worker to emphasize the difference.
1061              
1062             use MCE::Flow;
1063              
1064             MCE::Flow->init(
1065             max_workers => 1, chunk_size => 1_250_000,
1066             bounds_only => 1
1067             );
1068              
1069             # Typically, the input scalar $_ contains the sequence number
1070             # when chunk_size => 1, unless the bounds_only option is set
1071             # which is the case here. Thus, $_ points to $chunk_ref.
1072              
1073             mce_flow_s sub {
1074             my ($mce, $chunk_ref, $chunk_id) = @_;
1075              
1076             # $chunk_ref contains 2 items, not 1_250_000
1077             # my ( $begin, $end ) = ( $_->[0], $_->[1] );
1078              
1079             my $begin = $chunk_ref->[0];
1080             my $end = $chunk_ref->[1];
1081              
1082             # for my $seq ( $begin .. $end ) {
1083             # ...
1084             # }
1085              
1086             MCE->printf("%7d .. %8d\n", $begin, $end);
1087             },
1088             [ 1, 10_000_000 ];
1089              
1090             -- Output
1091              
1092             1 .. 1250000
1093             1250001 .. 2500000
1094             2500001 .. 3750000
1095             3750001 .. 5000000
1096             5000001 .. 6250000
1097             6250001 .. 7500000
1098             7500001 .. 8750000
1099             8750001 .. 10000000
1100              
1101             =over 3
1102              
1103             =item MCE::Flow->run ( { input_data => iterator }, sub { code } )
1104              
1105             =item mce_flow { input_data => iterator }, sub { code }
1106              
1107             =back
1108              
1109             An iterator reference may be specified for input_data. The only other way
1110             is to specify input_data via MCE::Flow->init. Either form prevents MCE::Flow
1111             from configuring the iterator reference as another user task, which will not work.
1112              
1113             Iterators are described under section "SYNTAX for INPUT_DATA" at L<MCE::Core>.
1114              
1115             MCE::Flow->init(
1116             input_data => iterator
1117             );
1118              
1119             mce_flow sub { $_ };
1120              
1121             =head1 GATHERING DATA
1122              
1123             Unlike MCE::Map where gather and output order are done for you automatically,
1124             the gather method is used to have results sent back to the manager process.
1125              
1126             use MCE::Flow chunk_size => 1;
1127              
1128             ## Output order is not guaranteed.
1129             my @a1 = mce_flow sub { MCE->gather($_ * 2) }, 1..100;
1130             print "@a1\n\n";
1131              
1132             ## Outputs to a hash instead (key, value).
1133             my %h1 = mce_flow sub { MCE->gather($_, $_ * 2) }, 1..100;
1134             print "@h1{1..100}\n\n";
1135              
1136             ## This does the same thing due to chunk_id starting at one.
1137             my %h2 = mce_flow sub { MCE->gather(MCE->chunk_id, $_ * 2) }, 1..100;
1138             print "@h2{1..100}\n\n";
1139              
1140             The gather method may be called multiple times within the block unlike return
1141             which would leave the block. Therefore, think of gather as yielding results
1142             immediately to the manager process without actually leaving the block.
1143              
1144             use MCE::Flow chunk_size => 1, max_workers => 3;
1145              
1146             my @hosts = qw(
1147             hosta hostb hostc hostd hoste
1148             );
1149              
1150             my %h3 = mce_flow sub {
1151             my ($output, $error, $status); my $host = $_;
1152              
1153             ## Do something with $host;
1154             $output = "Worker ". MCE->wid .": Hello from $host";
1155              
1156             if (MCE->chunk_id % 3 == 0) {
1157             ## Simulating an error condition
1158             local $? = 1; $status = $?;
1159             $error = "Error from $host"
1160             }
1161             else {
1162             $status = 0;
1163             }
1164              
1165             ## Ensure unique keys (key, value) when gathering to
1166             ## a hash.
1167             MCE->gather("$host.out", $output);
1168             MCE->gather("$host.err", $error) if (defined $error);
1169             MCE->gather("$host.sta", $status);
1170              
1171             }, @hosts;
1172              
1173             foreach my $host (@hosts) {
1174             print $h3{"$host.out"}, "\n";
1175             print $h3{"$host.err"}, "\n" if (exists $h3{"$host.err"});
1176             print "Exit status: ", $h3{"$host.sta"}, "\n\n";
1177             }
1178              
1179             -- Output
1180              
1181             Worker 3: Hello from hosta
1182             Exit status: 0
1183              
1184             Worker 2: Hello from hostb
1185             Exit status: 0
1186              
1187             Worker 1: Hello from hostc
1188             Error from hostc
1189             Exit status: 1
1190              
1191             Worker 3: Hello from hostd
1192             Exit status: 0
1193              
1194             Worker 2: Hello from hoste
1195             Exit status: 0
1196              
1197             The following uses an anonymous array containing 3 elements when gathering
1198             data. Serialization is automatic behind the scenes.
1199              
1200             my %h3 = mce_flow sub {
1201             ...
1202              
1203             MCE->gather($host, [$output, $error, $status]);
1204              
1205             }, @hosts;
1206              
1207             foreach my $host (@hosts) {
1208             print $h3{$host}->[0], "\n";
1209             print $h3{$host}->[1], "\n" if (defined $h3{$host}->[1]);
1210             print "Exit status: ", $h3{$host}->[2], "\n\n";
1211             }
1212              
1213             Although MCE::Map comes to mind, one may want additional control when
1214             gathering data such as retaining output order.
1215              
1216             use MCE::Flow;
1217              
1218             sub preserve_order {
1219             my %tmp; my $order_id = 1; my $gather_ref = $_[0];
1220              
1221             return sub {
1222             $tmp{ (shift) } = \@_;
1223              
1224             while (1) {
1225             last unless exists $tmp{$order_id};
1226             push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
1227             }
1228              
1229             return;
1230             };
1231             }
1232              
1233             ## Workers persist for the most part after running, though this is not
1234             ## always the case and depends on Perl. Pass a reference to a subroutine
1235             ## if workers must persist; e.g. mce_flow { ... }, \&foo, 1..100000.
1236              
1237             MCE::Flow->init(
1238             chunk_size => 'auto', max_workers => 'auto'
1239             );
1240              
1241             for (1..2) {
1242             my @m2;
1243              
1244             mce_flow {
1245             gather => preserve_order(\@m2)
1246             },
1247             sub {
1248             my @a; my ($mce, $chunk_ref, $chunk_id) = @_;
1249              
1250             ## Compute the entire chunk data at once.
1251             push @a, map { $_ * 2 } @{ $chunk_ref };
1252              
1253             ## Afterwards, invoke the gather feature, which
1254             ## will direct the data to the callback function.
1255             MCE->gather(MCE->chunk_id, @a);
1256              
1257             }, 1..100000;
1258              
1259             print scalar @m2, "\n";
1260             }
1261              
1262             MCE::Flow->finish;
1263              
1264             All 6 models support 'auto' for chunk_size unlike the Core API. Think of the
1265             models as the basis for providing JIT for MCE. They create the instance, tune
1266             max_workers, and tune chunk_size automatically regardless of the hardware.
1267              
1268             The following does the same thing using the Core API. Workers persist after
1269             running.
1270              
1271             use MCE;
1272              
1273             sub preserve_order {
1274             ...
1275             }
1276              
1277             my $mce = MCE->new(
1278             max_workers => 'auto', chunk_size => 8000,
1279              
1280             user_func => sub {
1281             my @a; my ($mce, $chunk_ref, $chunk_id) = @_;
1282              
1283             ## Compute the entire chunk data at once.
1284             push @a, map { $_ * 2 } @{ $chunk_ref };
1285              
1286             ## Afterwards, invoke the gather feature, which
1287             ## will direct the data to the callback function.
1288             MCE->gather(MCE->chunk_id, @a);
1289             }
1290             );
1291              
1292             for (1..2) {
1293             my @m2;
1294              
1295             $mce->process({ gather => preserve_order(\@m2) }, [1..100000]);
1296              
1297             print scalar @m2, "\n";
1298             }
1299              
1300             $mce->shutdown;
1301              
1302             =head1 MANUAL SHUTDOWN
1303              
1304             =over 3
1305              
1306             =item MCE::Flow->finish
1307              
1308             =item MCE::Flow::finish
1309              
1310             =back
1311              
1312             Workers remain persistent as much as possible after running. Shutdown occurs
1313             automatically when the script terminates. Call finish when workers are no
1314             longer needed.
1315              
1316             use MCE::Flow;
1317              
1318             MCE::Flow->init(
1319             chunk_size => 20, max_workers => 'auto'
1320             );
1321              
1322             mce_flow sub { ... }, 1..100;
1323              
1324             MCE::Flow->finish;
1325              
1326             =head1 INDEX
1327              
1328             L<MCE|MCE>, L<MCE::Core>
1329              
1330             =head1 AUTHOR
1331              
1332             Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>
1333              
1334             =cut
1335