File Coverage

blib/lib/MCE/Loop.pm
Criterion Covered Total %
statement 135 166 81.3
branch 68 130 52.3
condition 22 50 44.0
subroutine 13 15 86.6
pod 5 5 100.0
total 243 366 66.3


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## MCE model for building parallel loops.
4             ##
5             ###############################################################################
6              
7             package MCE::Loop;
8              
9 5     5   344406 use strict;
  5         50  
  5         124  
10 5     5   24 use warnings;
  5         6  
  5         154  
11              
12 5     5   22 no warnings qw( threads recursion uninitialized );
  5         10  
  5         250  
13              
14             our $VERSION = '1.887';
15              
16             ## no critic (BuiltinFunctions::ProhibitStringyEval)
17             ## no critic (Subroutines::ProhibitSubroutinePrototypes)
18             ## no critic (TestingAndDebugging::ProhibitNoStrict)
19              
20 5     5   29 use Scalar::Util qw( looks_like_number );
  5         5  
  5         222  
21 5     5   2008 use MCE;
  5         9  
  5         32  
22              
23             our @CARP_NOT = qw( MCE );
24              
25             my $_tid = $INC{'threads.pm'} ? threads->tid() : 0;
26              
27             sub CLONE {
28 0 0   0   0 $_tid = threads->tid() if $INC{'threads.pm'};
29             }
30              
31             ###############################################################################
32             ## ----------------------------------------------------------------------------
33             ## Import routine.
34             ##
35             ###############################################################################
36              
37             my ($_MCE, $_def, $_params, $_prev_c, $_tag) = ({}, {}, {}, {}, 'MCE::Loop');
38              
39             sub import {
40 5     5   67 my ($_class, $_pkg) = (shift, caller);
41              
42 5         15 my $_p = $_def->{$_pkg} = {
43             MAX_WORKERS => 'auto',
44             CHUNK_SIZE => 'auto',
45             };
46              
47             ## Import functions.
48 5 50       19 if ($_pkg !~ /^MCE::/) {
49 5     5   67 no strict 'refs'; no warnings 'redefine';
  5     5   9  
  5         139  
  5         16  
  5         9  
  5         8926  
50 5         6 *{ $_pkg.'::mce_loop_f' } = \&run_file;
  5         28  
51 5         10 *{ $_pkg.'::mce_loop_s' } = \&run_seq;
  5         17  
52 5         5 *{ $_pkg.'::mce_loop' } = \&run;
  5         15  
53             }
54              
55             ## Process module arguments.
56 5         20 while ( my $_argument = shift ) {
57 0         0 my $_arg = lc $_argument;
58              
59 0 0       0 $_p->{MAX_WORKERS} = shift, next if ( $_arg eq 'max_workers' );
60 0 0       0 $_p->{CHUNK_SIZE} = shift, next if ( $_arg eq 'chunk_size' );
61 0 0       0 $_p->{TMP_DIR} = shift, next if ( $_arg eq 'tmp_dir' );
62 0 0       0 $_p->{FREEZE} = shift, next if ( $_arg eq 'freeze' );
63 0 0       0 $_p->{THAW} = shift, next if ( $_arg eq 'thaw' );
64 0 0       0 $_p->{INIT_RELAY} = shift, next if ( $_arg eq 'init_relay' );
65 0 0       0 $_p->{USE_THREADS} = shift, next if ( $_arg eq 'use_threads' );
66              
67             ## Sereal 3.015+, if available, is used automatically by MCE 1.8+.
68 0 0       0 if ( $_arg eq 'sereal' ) {
69 0 0       0 if ( shift eq '0' ) {
70 0         0 require Storable;
71 0         0 $_p->{FREEZE} = \&Storable::freeze;
72 0         0 $_p->{THAW} = \&Storable::thaw;
73             }
74 0         0 next;
75             }
76              
77 0         0 _croak("Error: ($_argument) invalid module option");
78             }
79              
80 5         20 $_p->{MAX_WORKERS} = MCE::_parse_max_workers($_p->{MAX_WORKERS});
81              
82 5         19 MCE::_validate_number($_p->{MAX_WORKERS}, 'MAX_WORKERS', $_tag);
83             MCE::_validate_number($_p->{CHUNK_SIZE}, 'CHUNK_SIZE', $_tag)
84 5 50       11 unless ($_p->{CHUNK_SIZE} eq 'auto');
85              
86 5         70 return;
87             }
88              
89             ###############################################################################
90             ## ----------------------------------------------------------------------------
91             ## Init and finish routines.
92             ##
93             ###############################################################################
94              
95             sub init (@) {
96              
97 6 50 33 6 1 554 shift if (defined $_[0] && $_[0] eq 'MCE::Loop');
98 6         34 my $_pkg = "$$.$_tid.".caller();
99              
100 6 50       78 $_params->{$_pkg} = (ref $_[0] eq 'HASH') ? shift : { @_ };
101              
102 6         14 @_ = ();
103              
104 6         18 return;
105             }
106              
107             sub finish (@) {
108              
109 11 50 33 11 1 3470 shift if (defined $_[0] && $_[0] eq 'MCE::Loop');
110 11 100       65 my $_pkg = (defined $_[0]) ? shift : "$$.$_tid.".caller();
111              
112 11 100 66     76 if ( $_pkg eq 'MCE' ) {
    100          
113 5         14 for my $_k ( keys %{ $_MCE } ) { MCE::Loop->finish($_k, 1); }
  5         77  
  3         60  
114             }
115             elsif ( $_MCE->{$_pkg} && $_MCE->{$_pkg}{_init_pid} eq "$$.$_tid" ) {
116 3 50       41 $_MCE->{$_pkg}->shutdown(@_) if $_MCE->{$_pkg}{_spawned};
117              
118 3         21 delete $_prev_c->{$_pkg};
119 3         23 delete $_MCE->{$_pkg};
120             }
121              
122 11         75 @_ = ();
123              
124 11         32 return;
125             }
126              
127             ###############################################################################
128             ## ----------------------------------------------------------------------------
129             ## Parallel loop with MCE -- file.
130             ##
131             ###############################################################################
132              
133             sub run_file (&@) {
134              
135 4 50 33 4 1 2934 shift if (defined $_[0] && $_[0] eq 'MCE::Loop');
136              
137 4         10 my $_code = shift; my $_file = shift;
  4         4  
138 4         16 my $_pid = "$$.$_tid.".caller();
139              
140 4 50       12 if (defined (my $_p = $_params->{$_pid})) {
141 4 50       8 delete $_p->{input_data} if (exists $_p->{input_data});
142 4 50       10 delete $_p->{sequence} if (exists $_p->{sequence});
143             }
144             else {
145 0         0 $_params->{$_pid} = {};
146             }
147              
148 4 100 66     58 if (defined $_file && ref $_file eq '' && $_file ne '') {
    50 66        
      33        
149 2 50       68 _croak("$_tag: ($_file) does not exist") unless (-e $_file);
150 2 50       26 _croak("$_tag: ($_file) is not readable") unless (-r $_file);
151 2 50       22 _croak("$_tag: ($_file) is not a plain file") unless (-f $_file);
152 2         22 $_params->{$_pid}{_file} = $_file;
153             }
154             elsif (ref $_file eq 'SCALAR' || ref($_file) =~ /^(?:GLOB|FileHandle|IO::)/) {
155 2         8 $_params->{$_pid}{_file} = $_file;
156             }
157             else {
158 0         0 _croak("$_tag: (file) is not specified or valid");
159             }
160              
161 4         8 @_ = ();
162              
163 4         20 return run($_code);
164             }
165              
166             ###############################################################################
167             ## ----------------------------------------------------------------------------
168             ## Parallel loop with MCE -- sequence.
169             ##
170             ###############################################################################
171              
172             sub run_seq (&@) {
173              
174 2 50 33 2 1 1394 shift if (defined $_[0] && $_[0] eq 'MCE::Loop');
175              
176 2         6 my $_code = shift;
177 2         8 my $_pid = "$$.$_tid.".caller();
178              
179 2 50       8 if (defined (my $_p = $_params->{$_pid})) {
180 2 50       6 delete $_p->{input_data} if (exists $_p->{input_data});
181 2 50       6 delete $_p->{_file} if (exists $_p->{_file});
182             }
183             else {
184 0         0 $_params->{$_pid} = {};
185             }
186              
187 2         2 my ($_begin, $_end);
188              
189 2 50 33     14 if (ref $_[0] eq 'HASH') {
    50          
    50          
190 0         0 $_begin = $_[0]->{begin}, $_end = $_[0]->{end};
191 0         0 $_params->{$_pid}{sequence} = $_[0];
192             }
193             elsif (ref $_[0] eq 'ARRAY') {
194 0 0 0     0 if (@{ $_[0] } > 3 && $_[0]->[3] =~ /\d$/) {
  0         0  
195 0         0 $_begin = $_[0]->[0], $_end = $_[0]->[-1];
196 0         0 $_params->{$_pid}{sequence} = [ $_[0]->[0], $_[0]->[-1] ];
197             }
198             else {
199 0         0 $_begin = $_[0]->[0], $_end = $_[0]->[1];
200 0         0 $_params->{$_pid}{sequence} = $_[0];
201             }
202             }
203             elsif (ref $_[0] eq '' || ref($_[0]) =~ /^Math::/) {
204 2 50 33     8 if (@_ > 3 && $_[3] =~ /\d$/) {
205 0         0 $_begin = $_[0], $_end = $_[-1];
206 0         0 $_params->{$_pid}{sequence} = [ $_[0], $_[-1] ];
207             }
208             else {
209 2         6 $_begin = $_[0], $_end = $_[1];
210 2         6 $_params->{$_pid}{sequence} = [ @_ ];
211             }
212             }
213             else {
214 0         0 _croak("$_tag: (sequence) is not specified or valid");
215             }
216              
217 2 50       4 _croak("$_tag: (begin) is not specified for sequence")
218             unless (defined $_begin);
219 2 50       6 _croak("$_tag: (end) is not specified for sequence")
220             unless (defined $_end);
221              
222 2         6 $_params->{$_pid}{sequence_run} = undef;
223              
224 2         4 @_ = ();
225              
226 2         6 return run($_code);
227             }
228              
229             ###############################################################################
230             ## ----------------------------------------------------------------------------
231             ## Parallel loop with MCE.
232             ##
233             ###############################################################################
234              
235             sub run (&@) {
236              
237 14 50 33 14 1 4484 shift if (defined $_[0] && $_[0] eq 'MCE::Loop');
238              
239 14         40 my $_code = shift;
240 14 100       46 my $_pkg = caller() eq 'MCE::Loop' ? caller(1) : caller();
241 14         46 my $_pid = "$$.$_tid.$_pkg";
242              
243 14         18 my $_input_data; my $_max_workers = $_def->{$_pkg}{MAX_WORKERS};
  14         34  
244 14         22 my $_r = ref $_[0];
245              
246 14 100 66     108 if (@_ == 1 && $_r =~ /^(?:ARRAY|HASH|SCALAR|CODE|GLOB|FileHandle|IO::)/) {
247 4         10 $_input_data = shift;
248             }
249              
250 14 50       42 if (defined (my $_p = $_params->{$_pid})) {
251             $_max_workers = MCE::_parse_max_workers($_p->{max_workers})
252 14 50       90 if (exists $_p->{max_workers});
253              
254 14 100 100     74 delete $_p->{sequence} if (defined $_input_data || scalar @_);
255 14 50       32 delete $_p->{user_func} if (exists $_p->{user_func});
256 14 50       36 delete $_p->{user_tasks} if (exists $_p->{user_tasks});
257             }
258              
259             my $_chunk_size = MCE::_parse_chunk_size(
260 14         82 $_def->{$_pkg}{CHUNK_SIZE}, $_max_workers, $_params->{$_pid},
261             $_input_data, scalar @_
262             );
263              
264 14 50       42 if (defined (my $_p = $_params->{$_pid})) {
265 14 100       30 if (exists $_p->{_file}) {
266 4         8 $_input_data = delete $_p->{_file};
267             } else {
268 10 50       30 $_input_data = $_p->{input_data} if exists $_p->{input_data};
269             }
270             }
271              
272             ## -------------------------------------------------------------------------
273              
274 14         112 MCE::_save_state($_MCE->{$_pid});
275              
276 14 100 66     76 if (!defined $_prev_c->{$_pid} || $_prev_c->{$_pid} != $_code) {
277 6 50       18 $_MCE->{$_pid}->shutdown() if (defined $_MCE->{$_pid});
278 6         12 $_prev_c->{$_pid} = $_code;
279              
280 6         24 my %_opts = (
281             max_workers => $_max_workers, task_name => $_tag,
282             user_func => $_code,
283             );
284              
285 6 50       18 if (defined (my $_p = $_params->{$_pid})) {
286 6         12 for my $_k (keys %{ $_p }) {
  6         32  
287 10 50       20 next if ($_k eq 'sequence_run');
288 10 50       22 next if ($_k eq 'input_data');
289 10 50       22 next if ($_k eq 'chunk_size');
290              
291             _croak("$_tag: ($_k) is not a valid constructor argument")
292 10 50       28 unless (exists $MCE::_valid_fields_new{$_k});
293              
294 10         22 $_opts{$_k} = $_p->{$_k};
295             }
296             }
297              
298 6         20 for my $_k (qw/ tmp_dir freeze thaw init_relay use_threads /) {
299             $_opts{$_k} = $_def->{$_pkg}{uc($_k)}
300 30 50 33     72 if (exists $_def->{$_pkg}{uc($_k)} && !exists $_opts{$_k});
301             }
302              
303 6         52 $_MCE->{$_pid} = MCE->new(pkg => $_pkg, %_opts);
304             }
305              
306             ## -------------------------------------------------------------------------
307              
308 14 100       30 my @_a; my $_wa = wantarray; $_MCE->{$_pid}{gather} = \@_a if (defined $_wa);
  14         42  
  14         60  
309              
310 14 100       46 if (defined $_input_data) {
    100          
311 8         12 @_ = ();
312 8         42 $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, $_input_data);
313 7         42 delete $_MCE->{$_pid}{input_data};
314             }
315             elsif (scalar @_) {
316 4         32 $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, \@_);
317 2         18 delete $_MCE->{$_pid}{input_data};
318             }
319             else {
320 2 50 33     26 if (defined $_params->{$_pid} && exists $_params->{$_pid}{sequence}) {
321             $_MCE->{$_pid}->run({
322             chunk_size => $_chunk_size,
323             sequence => $_params->{$_pid}{sequence}
324 2         14 }, 0);
325 2 50       10 if (exists $_params->{$_pid}{sequence_run}) {
326 2         4 delete $_params->{$_pid}{sequence_run};
327 2         2 delete $_params->{$_pid}{sequence};
328             }
329 2         4 delete $_MCE->{$_pid}{sequence};
330             }
331             }
332              
333 11         66 MCE::_restore_state();
334              
335 11 100       27 delete $_MCE->{$_pid}{gather} if (defined $_wa);
336              
337 11 100       156 return ((defined $_wa) ? @_a : ());
338             }
339              
340             ###############################################################################
341             ## ----------------------------------------------------------------------------
342             ## Private methods.
343             ##
344             ###############################################################################
345              
346             sub _croak {
347              
348 0     0     goto &MCE::_croak;
349             }
350              
351             1;
352              
353             __END__
354              
355             ###############################################################################
356             ## ----------------------------------------------------------------------------
357             ## Module usage.
358             ##
359             ###############################################################################
360              
361             =head1 NAME
362              
363             MCE::Loop - MCE model for building parallel loops
364              
365             =head1 VERSION
366              
367             This document describes MCE::Loop version 1.887
368              
369             =head1 DESCRIPTION
370              
371             This module provides a parallel loop implementation through Many-Core Engine.
372             MCE::Loop is not MCE::Map but more along the lines of an easy way to spin up a
373             MCE instance and have user_func pointing to your code block. If you want
374             something similar to map, then see L<MCE::Map>.
375              
376             ## Construction when chunking is not desired
377              
378             use MCE::Loop;
379              
380             MCE::Loop->init(
381             max_workers => 5, chunk_size => 1
382             );
383              
384             mce_loop {
385             my ($mce, $chunk_ref, $chunk_id) = @_;
386             MCE->say("$chunk_id: $_");
387             } 40 .. 48;
388              
389             -- Output
390              
391             3: 42
392             1: 40
393             2: 41
394             4: 43
395             5: 44
396             6: 45
397             7: 46
398             8: 47
399             9: 48
400              
401             ## Construction for 'auto' or greater than 1
402              
403             use MCE::Loop;
404              
405             MCE::Loop->init(
406             max_workers => 5, chunk_size => 'auto'
407             );
408              
409             mce_loop {
410             my ($mce, $chunk_ref, $chunk_id) = @_;
411             for (@{ $chunk_ref }) {
412             MCE->say("$chunk_id: $_");
413             }
414             } 40 .. 48;
415              
416             -- Output
417              
418             1: 40
419             2: 42
420             1: 41
421             4: 46
422             2: 43
423             5: 48
424             3: 44
425             4: 47
426             3: 45
427              
428             =head1 SYNOPSIS when CHUNK_SIZE EQUALS 1
429              
430             All models in MCE default to 'auto' for chunk_size. The arguments for the block
431             are the same as writing a user_func block using the Core API.
432              
433             Beginning with MCE 1.5, the next input item is placed into the input scalar
434             variable $_ when chunk_size equals 1. Otherwise, $_ points to $chunk_ref
435             containing many items. Basically, line 2 below may be omitted from your code
436             when using $_. One can call MCE->chunk_id to obtain the current chunk id.
437              
438             line 1: user_func => sub {
439             line 2: my ($mce, $chunk_ref, $chunk_id) = @_;
440             line 3:
441             line 4: $_ points to $chunk_ref->[0]
442             line 5: in MCE 1.5 when chunk_size == 1
443             line 6:
444             line 7: $_ points to $chunk_ref
445             line 8: in MCE 1.5 when chunk_size > 1
446             line 9: }
447              
448             Follow this synopsis when chunk_size equals one. Looping is not required from
449             inside the block. Hence, the block is called once per each item.
450              
451             ## Exports mce_loop, mce_loop_f, and mce_loop_s
452             use MCE::Loop;
453              
454             MCE::Loop->init(
455             chunk_size => 1
456             );
457              
458             ## Array or array_ref
459             mce_loop { do_work($_) } 1..10000;
460             mce_loop { do_work($_) } \@list;
461              
462             ## Important; pass an array_ref for deeply input data
463             mce_loop { do_work($_) } [ [ 0, 1 ], [ 0, 2 ], ... ];
464             mce_loop { do_work($_) } \@deeply_list;
465              
466             ## File path, glob ref, IO::All::{ File, Pipe, STDIO } obj, or scalar ref
467             ## Workers read directly and not involve the manager process
468             mce_loop_f { chomp; do_work($_) } "/path/to/file"; # efficient
469              
470             ## Involves the manager process, therefore slower
471             mce_loop_f { chomp; do_work($_) } $file_handle;
472             mce_loop_f { chomp; do_work($_) } $io;
473             mce_loop_f { chomp; do_work($_) } \$scalar;
474              
475             ## Sequence of numbers (begin, end [, step, format])
476             mce_loop_s { do_work($_) } 1, 10000, 5;
477             mce_loop_s { do_work($_) } [ 1, 10000, 5 ];
478              
479             mce_loop_s { do_work($_) } {
480             begin => 1, end => 10000, step => 5, format => undef
481             };
482              
483             =head1 SYNOPSIS when CHUNK_SIZE is GREATER THAN 1
484              
485             Follow this synopsis when chunk_size equals 'auto' or greater than 1.
486             This means having to loop through the chunk from inside the block.
487              
488             use MCE::Loop;
489              
490             MCE::Loop->init( ## Chunk_size defaults to 'auto' when
491             chunk_size => 'auto' ## not specified. Therefore, the init
492             ); ## function may be omitted.
493              
494             ## Syntax is shown for mce_loop for demonstration purposes.
495             ## Looping inside the block is the same for mce_loop_f and
496             ## mce_loop_s.
497              
498             ## Array or array_ref
499             mce_loop { do_work($_) for (@{ $_ }) } 1..10000;
500             mce_loop { do_work($_) for (@{ $_ }) } \@list;
501              
502             ## Important; pass an array_ref for deeply input data
503             mce_loop { do_work($_) for (@{ $_ }) } [ [ 0, 1 ], [ 0, 2 ], ... ];
504             mce_loop { do_work($_) for (@{ $_ }) } \@deeply_list;
505              
506             ## Resembles code using the core MCE API
507             mce_loop {
508             my ($mce, $chunk_ref, $chunk_id) = @_;
509              
510             for (@{ $chunk_ref }) {
511             do_work($_);
512             }
513              
514             } 1..10000;
515              
516             Chunking reduces the number of IPC calls behind the scene. Think in terms of
517             chunks whenever processing a large amount of data. For relatively small data,
518             choosing 1 for chunk_size is fine.
519              
520             =head1 OVERRIDING DEFAULTS
521              
522             The following list options which may be overridden when loading the module.
523              
524             use Sereal qw( encode_sereal decode_sereal );
525             use CBOR::XS qw( encode_cbor decode_cbor );
526             use JSON::XS qw( encode_json decode_json );
527              
528             use MCE::Loop
529             max_workers => 4, # Default 'auto'
530             chunk_size => 100, # Default 'auto'
531             tmp_dir => "/path/to/app/tmp", # $MCE::Signal::tmp_dir
532             freeze => \&encode_sereal, # \&Storable::freeze
533             thaw => \&decode_sereal, # \&Storable::thaw
534             init_relay => 0, # Default undef; MCE 1.882+
535             use_threads => 0, # Default undef; MCE 1.882+
536             ;
537              
538             From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if available.
539             Specify C<< Sereal => 0 >> to use Storable instead.
540              
541             use MCE::Loop Sereal => 0;
542              
543             =head1 CUSTOMIZING MCE
544              
545             =over 3
546              
547             =item MCE::Loop->init ( options )
548              
549             =item MCE::Loop::init { options }
550              
551             =back
552              
553             The init function accepts a hash of MCE options.
554              
555             use MCE::Loop;
556              
557             MCE::Loop->init(
558             chunk_size => 1, max_workers => 4,
559              
560             user_begin => sub {
561             print "## ", MCE->wid, " started\n";
562             },
563              
564             user_end => sub {
565             print "## ", MCE->wid, " completed\n";
566             }
567             );
568              
569             my %a = mce_loop { MCE->gather($_, $_ * $_) } 1..100;
570              
571             print "\n", "@a{1..100}", "\n";
572              
573             -- Output
574              
575             ## 3 started
576             ## 1 started
577             ## 2 started
578             ## 4 started
579             ## 1 completed
580             ## 2 completed
581             ## 3 completed
582             ## 4 completed
583              
584             1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361
585             400 441 484 529 576 625 676 729 784 841 900 961 1024 1089 1156
586             1225 1296 1369 1444 1521 1600 1681 1764 1849 1936 2025 2116 2209
587             2304 2401 2500 2601 2704 2809 2916 3025 3136 3249 3364 3481 3600
588             3721 3844 3969 4096 4225 4356 4489 4624 4761 4900 5041 5184 5329
589             5476 5625 5776 5929 6084 6241 6400 6561 6724 6889 7056 7225 7396
590             7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801
591             10000
592              
593             =head1 API DOCUMENTATION
594              
595             The following assumes chunk_size equals 1 in order to demonstrate all the
596             possibilities for providing input data.
597              
598             =over 3
599              
600             =item MCE::Loop->run ( sub { code }, list )
601              
602             =item mce_loop { code } list
603              
604             =back
605              
606             Input data may be defined using a list, an array ref, or a hash ref.
607              
608             # $_ contains the item when chunk_size => 1
609              
610             mce_loop { do_work($_) } 1..1000;
611             mce_loop { do_work($_) } \@list;
612              
613             # Important; pass an array_ref for deeply input data
614              
615             mce_loop { do_work($_) } [ [ 0, 1 ], [ 0, 2 ], ... ];
616             mce_loop { do_work($_) } \@deeply_list;
617              
618             # Chunking; any chunk_size => 1 or greater
619              
620             my %res = mce_loop {
621             my ($mce, $chunk_ref, $chunk_id) = @_;
622             my %ret;
623             for my $item (@{ $chunk_ref }) {
624             $ret{$item} = $item * 2;
625             }
626             MCE->gather(%ret);
627             }
628             \@list;
629              
630             # Input hash; current API available since 1.828
631              
632             my %res = mce_loop {
633             my ($mce, $chunk_ref, $chunk_id) = @_;
634             my %ret;
635             for my $key (keys %{ $chunk_ref }) {
636             $ret{$key} = $chunk_ref->{$key} * 2;
637             }
638             MCE->gather(%ret);
639             }
640             \%hash;
641              
642             =over 3
643              
644             =item MCE::Loop->run_file ( sub { code }, file )
645              
646             =item mce_loop_f { code } file
647              
648             =back
649              
650             The fastest of these is the /path/to/file. Workers communicate the next offset
651             position among themselves with zero interaction by the manager process.
652              
653             C<IO::All> { File, Pipe, STDIO } is supported since MCE 1.845.
654              
655             # $_ contains the line when chunk_size => 1
656              
657             mce_loop_f { $_ } "/path/to/file"; # faster
658             mce_loop_f { $_ } $file_handle;
659             mce_loop_f { $_ } $io; # IO::All
660             mce_loop_f { $_ } \$scalar;
661              
662             # chunking, any chunk_size => 1 or greater
663              
664             my %res = mce_loop_f {
665             my ($mce, $chunk_ref, $chunk_id) = @_;
666             my $buf = '';
667             for my $line (@{ $chunk_ref }) {
668             $buf .= $line;
669             }
670             MCE->gather($chunk_id, $buf);
671             }
672             "/path/to/file";
673              
674             =over 3
675              
676             =item MCE::Loop->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )
677              
678             =item mce_loop_s { code } $beg, $end [, $step, $fmt ]
679              
680             =back
681              
682             Sequence may be defined as a list, an array reference, or a hash reference.
683             The functions require both begin and end values to run. Step and format are
684             optional. The format is passed to sprintf (% may be omitted below).
685              
686             my ($beg, $end, $step, $fmt) = (10, 20, 0.1, "%4.1f");
687              
688             # $_ contains the sequence number when chunk_size => 1
689              
690             mce_loop_s { $_ } $beg, $end, $step, $fmt;
691             mce_loop_s { $_ } [ $beg, $end, $step, $fmt ];
692              
693             mce_loop_s { $_ } {
694             begin => $beg, end => $end,
695             step => $step, format => $fmt
696             };
697              
698             # chunking, any chunk_size => 1 or greater
699              
700             my %res = mce_loop_s {
701             my ($mce, $chunk_ref, $chunk_id) = @_;
702             my $buf = '';
703             for my $seq (@{ $chunk_ref }) {
704             $buf .= "$seq\n";
705             }
706             MCE->gather($chunk_id, $buf);
707             }
708             [ $beg, $end ];
709              
710             The sequence engine can compute 'begin' and 'end' items only, for the chunk,
711             and not the items in between (hence boundaries only). This option applies
712             to sequence only and has no effect when chunk_size equals 1.
713              
714             The time to run is 0.006s below. This becomes 0.827s without the bounds_only
715             option due to computing all items in between, thus creating a very large
716             array. Basically, specify bounds_only => 1 when boundaries is all you need
717             for looping inside the block; e.g. Monte Carlo simulations.
718              
719             Time was measured using 1 worker to emphasize the difference.
720              
721             use MCE::Loop;
722              
723             MCE::Loop->init(
724             max_workers => 1, chunk_size => 1_250_000,
725             bounds_only => 1
726             );
727              
728             # Typically, the input scalar $_ contains the sequence number
729             # when chunk_size => 1, unless the bounds_only option is set
730             # which is the case here. Thus, $_ points to $chunk_ref.
731              
732             mce_loop_s {
733             my ($mce, $chunk_ref, $chunk_id) = @_;
734              
735             # $chunk_ref contains 2 items, not 1_250_000
736             # my ( $begin, $end ) = ( $_->[0], $_->[1] );
737              
738             my $begin = $chunk_ref->[0];
739             my $end = $chunk_ref->[1];
740              
741             # for my $seq ( $begin .. $end ) {
742             # ...
743             # }
744              
745             MCE->printf("%7d .. %8d\n", $begin, $end);
746             }
747             [ 1, 10_000_000 ];
748              
749             -- Output
750              
751             1 .. 1250000
752             1250001 .. 2500000
753             2500001 .. 3750000
754             3750001 .. 5000000
755             5000001 .. 6250000
756             6250001 .. 7500000
757             7500001 .. 8750000
758             8750001 .. 10000000
759              
760             =over 3
761              
762             =item MCE::Loop->run ( sub { code }, iterator )
763              
764             =item mce_loop { code } iterator
765              
766             =back
767              
768             An iterator reference may be specified for input_data. Iterators are described
769             under section "SYNTAX for INPUT_DATA" at L<MCE::Core>.
770              
771             mce_loop { $_ } make_iterator(10, 30, 2);
772              
773             =head1 GATHERING DATA
774              
775             Unlike MCE::Map where gather and output order are done for you automatically,
776             the gather method is used to have results sent back to the manager process.
777              
778             use MCE::Loop chunk_size => 1;
779              
780             ## Output order is not guaranteed.
781             my @a1 = mce_loop { MCE->gather($_ * 2) } 1..100;
782             print "@a1\n\n";
783              
784             ## Outputs to a hash instead (key, value).
785             my %h1 = mce_loop { MCE->gather($_, $_ * 2) } 1..100;
786             print "@h1{1..100}\n\n";
787              
788             ## This does the same thing due to chunk_id starting at one.
789             my %h2 = mce_loop { MCE->gather(MCE->chunk_id, $_ * 2) } 1..100;
790             print "@h2{1..100}\n\n";
791              
792             The gather method may be called multiple times within the block unlike return
793             which would leave the block. Therefore, think of gather as yielding results
794             immediately to the manager process without actually leaving the block.
795              
796             use MCE::Loop chunk_size => 1, max_workers => 3;
797              
798             my @hosts = qw(
799             hosta hostb hostc hostd hoste
800             );
801              
802             my %h3 = mce_loop {
803             my ($output, $error, $status); my $host = $_;
804              
805             ## Do something with $host;
806             $output = "Worker ". MCE->wid .": Hello from $host";
807              
808             if (MCE->chunk_id % 3 == 0) {
809             ## Simulating an error condition
810             local $? = 1; $status = $?;
811             $error = "Error from $host"
812             }
813             else {
814             $status = 0;
815             }
816              
817             ## Ensure unique keys (key, value) when gathering to
818             ## a hash.
819             MCE->gather("$host.out", $output);
820             MCE->gather("$host.err", $error) if (defined $error);
821             MCE->gather("$host.sta", $status);
822              
823             } @hosts;
824              
825             foreach my $host (@hosts) {
826             print $h3{"$host.out"}, "\n";
827             print $h3{"$host.err"}, "\n" if (exists $h3{"$host.err"});
828             print "Exit status: ", $h3{"$host.sta"}, "\n\n";
829             }
830              
831             -- Output
832              
833             Worker 2: Hello from hosta
834             Exit status: 0
835              
836             Worker 1: Hello from hostb
837             Exit status: 0
838              
839             Worker 3: Hello from hostc
840             Error from hostc
841             Exit status: 1
842              
843             Worker 2: Hello from hostd
844             Exit status: 0
845              
846             Worker 1: Hello from hoste
847             Exit status: 0
848              
849             The following uses an anonymous array containing 3 elements when gathering
850             data. Serialization is automatic behind the scene.
851              
852             my %h3 = mce_loop {
853             ...
854              
855             MCE->gather($host, [$output, $error, $status]);
856              
857             } @hosts;
858              
859             foreach my $host (@hosts) {
860             print $h3{$host}->[0], "\n";
861             print $h3{$host}->[1], "\n" if (defined $h3{$host}->[1]);
862             print "Exit status: ", $h3{$host}->[2], "\n\n";
863             }
864              
865             Although MCE::Map comes to mind, one may want additional control when
866             gathering data such as retaining output order.
867              
868             use MCE::Loop;
869              
870             sub preserve_order {
871             my %tmp; my $order_id = 1; my $gather_ref = $_[0];
872              
873             return sub {
874             $tmp{ (shift) } = \@_;
875              
876             while (1) {
877             last unless exists $tmp{$order_id};
878             push @{ $gather_ref }, @{ delete $tmp{$order_id++} };
879             }
880              
881             return;
882             };
883             }
884              
885             my @m2;
886              
887             MCE::Loop->init(
888             chunk_size => 'auto', max_workers => 'auto',
889             gather => preserve_order(\@m2)
890             );
891              
892             mce_loop {
893             my @a; my ($mce, $chunk_ref, $chunk_id) = @_;
894              
895             ## Compute the entire chunk data at once.
896             push @a, map { $_ * 2 } @{ $chunk_ref };
897              
898             ## Afterwards, invoke the gather feature, which
899             ## will direct the data to the callback function.
900             MCE->gather(MCE->chunk_id, @a);
901              
902             } 1..100000;
903              
904             MCE::Loop->finish;
905              
906             print scalar @m2, "\n";
907              
908             All 6 models support 'auto' for chunk_size unlike the Core API. Think of the
909             models as the basis for providing JIT for MCE. They create the instance, tune
910             max_workers, and tune chunk_size automatically regardless of the hardware.
911              
912             The following does the same thing using the Core API.
913              
914             use MCE;
915              
916             sub preserve_order {
917             ...
918             }
919              
920             my $mce = MCE->new(
921             max_workers => 'auto', chunk_size => 8000,
922              
923             user_func => sub {
924             my @a; my ($mce, $chunk_ref, $chunk_id) = @_;
925              
926             ## Compute the entire chunk data at once.
927             push @a, map { $_ * 2 } @{ $chunk_ref };
928              
929             ## Afterwards, invoke the gather feature, which
930             ## will direct the data to the callback function.
931             MCE->gather(MCE->chunk_id, @a);
932             }
933             );
934              
935             my @m2;
936              
937             $mce->process({ gather => preserve_order(\@m2) }, [1..100000]);
938             $mce->shutdown;
939              
940             print scalar @m2, "\n";
941              
942             =head1 MANUAL SHUTDOWN
943              
944             =over 3
945              
946             =item MCE::Loop->finish
947              
948             =item MCE::Loop::finish
949              
950             =back
951              
952             Workers remain persistent as much as possible after running. Shutdown occurs
953             automatically when the script terminates. Call finish when workers are no
954             longer needed.
955              
956             use MCE::Loop;
957              
958             MCE::Loop->init(
959             chunk_size => 20, max_workers => 'auto'
960             );
961              
962             mce_loop { ... } 1..100;
963              
964             MCE::Loop->finish;
965              
966             =head1 INDEX
967              
968             L<MCE|MCE>, L<MCE::Core>
969              
970             =head1 AUTHOR
971              
972             Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>
973              
974             =cut
975