File Coverage

blib/lib/MCE/Flow.pm
Criterion Covered Total %
statement 198 235 84.2
branch 106 178 59.5
condition 43 74 58.1
subroutine 14 16 87.5
pod 5 5 100.0
total 366 508 72.0


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Parallel flow model for building creative applications.
4             ##
5             ###############################################################################
6              
7             package MCE::Flow;
8              
9 37     37   1956916 use strict;
  37         352  
  37         1050  
10 37     37   185 use warnings;
  37         71  
  37         1100  
11              
12 37     37   164 no warnings qw( threads recursion uninitialized );
  37         61  
  37         2034  
13              
14             our $VERSION = '1.889';
15              
16             ## no critic (BuiltinFunctions::ProhibitStringyEval)
17             ## no critic (Subroutines::ProhibitSubroutinePrototypes)
18             ## no critic (TestingAndDebugging::ProhibitNoStrict)
19              
20 37     37   225 use Scalar::Util qw( looks_like_number );
  37         49  
  37         1856  
21 37     37   17640 use MCE;
  37         85  
  37         200  
22              
23             our @CARP_NOT = qw( MCE );
24              
25             my $_tid = $INC{'threads.pm'} ? threads->tid() : 0;
26              
27             sub CLONE {
28 0 0   0   0 $_tid = threads->tid() if $INC{'threads.pm'};
29             }
30              
31             ###############################################################################
32             ## ----------------------------------------------------------------------------
33             ## Import routine.
34             ##
35             ###############################################################################
36              
37             my ($_MCE, $_def, $_params, $_tag) = ({}, {}, {}, 'MCE::Flow');
38             my ($_prev_c, $_prev_n, $_prev_t, $_prev_w) = ({}, {}, {}, {});
39             my ($_user_tasks) = ({});
40              
41             sub import {
42 37     37   539 my ($_class, $_pkg) = (shift, caller);
43              
44 37         144 my $_p = $_def->{$_pkg} = {
45             MAX_WORKERS => 'auto',
46             CHUNK_SIZE => 'auto',
47             };
48              
49             ## Import functions.
50 37 50       148 if ($_pkg !~ /^MCE::/) {
51 37     37   309 no strict 'refs'; no warnings 'redefine';
  37     37   85  
  37         2678  
  37         355  
  37         104  
  37         111266  
52 37         86 *{ $_pkg.'::mce_flow_f' } = \&run_file;
  37         241  
53 37         73 *{ $_pkg.'::mce_flow_s' } = \&run_seq;
  37         147  
54 37         68 *{ $_pkg.'::mce_flow' } = \&run;
  37         141  
55             }
56              
57             ## Process module arguments.
58 37         141 while ( my $_argument = shift ) {
59 0         0 my $_arg = lc $_argument;
60              
61 0 0       0 $_p->{MAX_WORKERS} = shift, next if ( $_arg eq 'max_workers' );
62 0 0       0 $_p->{CHUNK_SIZE} = shift, next if ( $_arg eq 'chunk_size' );
63 0 0       0 $_p->{TMP_DIR} = shift, next if ( $_arg eq 'tmp_dir' );
64 0 0       0 $_p->{FREEZE} = shift, next if ( $_arg eq 'freeze' );
65 0 0       0 $_p->{THAW} = shift, next if ( $_arg eq 'thaw' );
66 0 0       0 $_p->{INIT_RELAY} = shift, next if ( $_arg eq 'init_relay' );
67 0 0       0 $_p->{USE_THREADS} = shift, next if ( $_arg eq 'use_threads' );
68              
69             ## Sereal 3.015+, if available, is used automatically by MCE 1.8+.
70 0 0       0 if ( $_arg eq 'sereal' ) {
71 0 0       0 if ( shift eq '0' ) {
72 0         0 require Storable;
73 0         0 $_p->{FREEZE} = \&Storable::freeze;
74 0         0 $_p->{THAW} = \&Storable::thaw;
75             }
76 0         0 next;
77             }
78              
79 0         0 _croak("Error: ($_argument) invalid module option");
80             }
81              
82 37         157 $_p->{MAX_WORKERS} = MCE::_parse_max_workers($_p->{MAX_WORKERS});
83              
84 37         192 MCE::_validate_number($_p->{MAX_WORKERS}, 'MAX_WORKERS', $_tag);
85             MCE::_validate_number($_p->{CHUNK_SIZE}, 'CHUNK_SIZE', $_tag)
86 37 50       115 unless ($_p->{CHUNK_SIZE} eq 'auto');
87              
88 37         657 return;
89             }
90              
91             ###############################################################################
92             ## ----------------------------------------------------------------------------
93             ## Init and finish routines.
94             ##
95             ###############################################################################
96              
97             sub init (@) {
98              
99 29 100 66 29 1 11203 shift if (defined $_[0] && $_[0] eq 'MCE::Flow');
100 29         259 my $_pkg = "$$.$_tid.".caller();
101              
102 29 50       258 $_params->{$_pkg} = (ref $_[0] eq 'HASH') ? shift : { @_ };
103              
104 29         91 @_ = ();
105              
106 29         82 return;
107             }
108              
109             sub finish (@) {
110              
111 101 50 33 101 1 10077 shift if (defined $_[0] && $_[0] eq 'MCE::Flow');
112 101 100       17539 my $_pkg = (defined $_[0]) ? shift : "$$.$_tid.".caller();
113              
114 101 100 66     1510 if ( $_pkg eq 'MCE' ) {
    100          
115 37         97 for my $_k ( keys %{ $_MCE } ) { MCE::Flow->finish($_k, 1); }
  37         637  
  29         736  
116             }
117             elsif ( $_MCE->{$_pkg} && $_MCE->{$_pkg}{_init_pid} eq "$$.$_tid" ) {
118 35 50       831 $_MCE->{$_pkg}->shutdown(@_) if $_MCE->{$_pkg}{_spawned};
119              
120 35         311 delete $_user_tasks->{$_pkg};
121 35         428 delete $_prev_c->{$_pkg};
122 35         399 delete $_prev_n->{$_pkg};
123 35         268 delete $_prev_t->{$_pkg};
124 35         148 delete $_prev_w->{$_pkg};
125 35         510 delete $_MCE->{$_pkg};
126             }
127              
128 101         373 @_ = ();
129              
130 101         414 return;
131             }
132              
133             ###############################################################################
134             ## ----------------------------------------------------------------------------
135             ## Parallel flow with MCE -- file.
136             ##
137             ###############################################################################
138              
139             sub run_file (@) {
140              
141 4 50 33 4 1 2154 shift if (defined $_[0] && $_[0] eq 'MCE::Flow');
142              
143 4 50       18 my ($_file, $_pos); my $_start_pos = (ref $_[0] eq 'HASH') ? 2 : 1;
  4         14  
144 4         20 my $_pid = "$$.$_tid.".caller();
145              
146 4 50       16 if (defined (my $_p = $_params->{$_pid})) {
147 4 50       8 delete $_p->{input_data} if (exists $_p->{input_data});
148 4 50       10 delete $_p->{sequence} if (exists $_p->{sequence});
149             }
150             else {
151 0         0 $_params->{$_pid} = {};
152             }
153              
154 4         46 for my $_i ($_start_pos .. @_ - 1) {
155 8         18 my $_r = ref $_[$_i];
156 8 100 66     62 if ($_r eq '' || $_r eq 'SCALAR' || $_r =~ /^(?:GLOB|FileHandle|IO::)/) {
      100        
157 4         8 $_file = $_[$_i]; $_pos = $_i;
  4         4  
158 4         8 last;
159             }
160             }
161              
162 4 100 66     42 if (defined $_file && ref $_file eq '' && $_file ne '') {
    50 66        
      33        
163 2 50       54 _croak("$_tag: ($_file) does not exist") unless (-e $_file);
164 2 50       28 _croak("$_tag: ($_file) is not readable") unless (-r $_file);
165 2 50       20 _croak("$_tag: ($_file) is not a plain file") unless (-f $_file);
166 2         18 $_params->{$_pid}{_file} = $_file;
167             }
168             elsif (ref $_file eq 'SCALAR' || ref($_file) =~ /^(?:GLOB|FileHandle|IO::)/) {
169 2         6 $_params->{$_pid}{_file} = $_file;
170             }
171             else {
172 0         0 _croak("$_tag: (file) is not specified or valid");
173             }
174              
175 4 50       12 if (defined $_pos) {
176 4         10 pop @_ for ($_pos .. @_ - 1);
177             }
178              
179 4         10 return run(@_);
180             }
181              
182             ###############################################################################
183             ## ----------------------------------------------------------------------------
184             ## Parallel flow with MCE -- sequence.
185             ##
186             ###############################################################################
187              
188             sub run_seq (@) {
189              
190 2 50 33 2 1 1070 shift if (defined $_[0] && $_[0] eq 'MCE::Flow');
191              
192 2 50       6 my ($_begin, $_end, $_pos); my $_start_pos = (ref $_[0] eq 'HASH') ? 2 : 1;
  2         6  
193 2         10 my $_pid = "$$.$_tid.".caller();
194              
195 2 50       8 if (defined (my $_p = $_params->{$_pid})) {
196 2 50       8 delete $_p->{sequence} if (exists $_p->{sequence});
197 2 50       6 delete $_p->{input_data} if (exists $_p->{input_data});
198 2 50       4 delete $_p->{_file} if (exists $_p->{_file});
199             }
200             else {
201 0         0 $_params->{$_pid} = {};
202             }
203              
204 2         10 for my $_i ($_start_pos .. @_ - 1) {
205 4         10 my $_r = ref $_[$_i];
206              
207 4 50 66     52 if ($_r eq '' || $_r =~ /^Math::/ || $_r eq 'HASH' || $_r eq 'ARRAY') {
      66        
      33        
208 2         2 $_pos = $_i;
209              
210 2 50 33     8 if ($_r eq '' || $_r =~ /^Math::/) {
    0          
    0          
211 2         4 $_begin = $_[$_pos], $_end = $_[$_pos + 1];
212             $_params->{$_pid}{sequence} = [
213 2         12 $_[$_pos], $_[$_pos + 1], $_[$_pos + 2], $_[$_pos + 3]
214             ];
215             }
216             elsif ($_r eq 'HASH') {
217 0         0 $_begin = $_[$_pos]->{begin}, $_end = $_[$_pos]->{end};
218 0         0 $_params->{$_pid}{sequence} = $_[$_pos];
219             }
220             elsif ($_r eq 'ARRAY') {
221 0         0 $_begin = $_[$_pos]->[0], $_end = $_[$_pos]->[1];
222 0         0 $_params->{$_pid}{sequence} = $_[$_pos];
223             }
224              
225 2         4 last;
226             }
227             }
228              
229             _croak("$_tag: (sequence) is not specified or valid")
230 2 50       6 unless (exists $_params->{$_pid}{sequence});
231 2 50       8 _croak("$_tag: (begin) is not specified for sequence")
232             unless (defined $_begin);
233 2 50       6 _croak("$_tag: (end) is not specified for sequence")
234             unless (defined $_end);
235              
236 2         4 $_params->{$_pid}{sequence_run} = undef;
237              
238 2 50       6 if (defined $_pos) {
239 2         8 pop @_ for ($_pos .. @_ - 1);
240             }
241              
242 2         6 return run(@_);
243             }
244              
245             ###############################################################################
246             ## ----------------------------------------------------------------------------
247             ## Parallel flow with MCE.
248             ##
249             ###############################################################################
250              
251             sub run (@) {
252              
253 74 50 33 74 1 36327 shift if (defined $_[0] && $_[0] eq 'MCE::Flow');
254              
255 74 100       392 my $_pkg = caller() eq 'MCE::Flow' ? caller(1) : caller();
256 74         368 my $_pid = "$$.$_tid.$_pkg";
257              
258 74 100       532 if (ref $_[0] eq 'HASH') {
259 48 100       198 $_params->{$_pid} = {} unless defined $_params->{$_pid};
260 48         167 for my $_p (keys %{ $_[0] }) {
  48         243  
261 72         337 $_params->{$_pid}{$_p} = $_[0]->{$_p};
262             }
263              
264 48         98 shift;
265             }
266              
267             ## -------------------------------------------------------------------------
268              
269 74         201 my (@_code, @_name, @_thrs, @_wrks); my $_init_mce = 0; my $_pos = 0;
  74         709  
  74         156  
270              
271 74         1101 while (ref $_[0] eq 'CODE') {
272 94         307 push @_code, $_[0];
273              
274 94 50       369 if (defined (my $_p = $_params->{$_pid})) {
275             push @_name, (ref $_p->{task_name} eq 'ARRAY')
276 94 100       325 ? $_p->{task_name}->[$_pos] : undef;
277             push @_thrs, (ref $_p->{use_threads} eq 'ARRAY')
278 94 50       300 ? $_p->{use_threads}->[$_pos] : undef;
279             push @_wrks, (ref $_p->{max_workers} eq 'ARRAY')
280 94 100       426 ? $_p->{max_workers}->[$_pos] : undef;
281             }
282              
283             $_init_mce = 1 if (
284             !defined $_prev_c->{$_pid}[$_pos] ||
285 94 100 66     572 $_prev_c->{$_pid}[$_pos] != $_code[$_pos]
286             );
287              
288 94 100       355 $_init_mce = 1 if ($_prev_n->{$_pid}[$_pos] ne $_name[$_pos]);
289 94 50       311 $_init_mce = 1 if ($_prev_t->{$_pid}[$_pos] ne $_thrs[$_pos]);
290 94 100       293 $_init_mce = 1 if ($_prev_w->{$_pid}[$_pos] ne $_wrks[$_pos]);
291              
292 94         206 $_prev_c->{$_pid}[$_pos] = $_code[$_pos];
293 94         196 $_prev_n->{$_pid}[$_pos] = $_name[$_pos];
294 94         170 $_prev_t->{$_pid}[$_pos] = $_thrs[$_pos];
295 94         204 $_prev_w->{$_pid}[$_pos] = $_wrks[$_pos];
296              
297 94         209 shift; $_pos++;
  94         389  
298             }
299              
300 74 50       297 if (defined $_prev_c->{$_pid}[$_pos]) {
301 0         0 pop @{ $_prev_c->{$_pid} } for ($_pos .. $#{ $_prev_c->{$_pid } });
  0         0  
  0         0  
302 0         0 pop @{ $_prev_n->{$_pid} } for ($_pos .. $#{ $_prev_n->{$_pid } });
  0         0  
  0         0  
303 0         0 pop @{ $_prev_t->{$_pid} } for ($_pos .. $#{ $_prev_t->{$_pid } });
  0         0  
  0         0  
304 0         0 pop @{ $_prev_w->{$_pid} } for ($_pos .. $#{ $_prev_w->{$_pid } });
  0         0  
  0         0  
305              
306 0         0 $_init_mce = 1;
307             }
308              
309 74 50       219 return unless (scalar @_code);
310              
311             ## -------------------------------------------------------------------------
312              
313 74         154 my $_input_data; my $_max_workers = $_def->{$_pkg}{MAX_WORKERS};
  74         237  
314 74         178 my $_r = ref $_[0];
315              
316 74 100 66     373 if (@_ == 1 && $_r =~ /^(?:ARRAY|HASH|SCALAR|GLOB|FileHandle|IO::)/) {
317 4         10 $_input_data = shift;
318             }
319              
320 74 50       235 if (defined (my $_p = $_params->{$_pid})) {
321             $_max_workers = MCE::_parse_max_workers($_p->{max_workers})
322 74 100 66     1101 if (exists $_p->{max_workers} && ref $_p->{max_workers} ne 'ARRAY');
323              
324 74 100 100     433 delete $_p->{sequence} if (defined $_input_data || scalar @_);
325 74 50       220 delete $_p->{user_func} if (exists $_p->{user_func});
326 74 50       234 delete $_p->{user_tasks} if (exists $_p->{user_tasks});
327             }
328              
329 74 100 66     399 if (@_code > 1 && $_max_workers > 1) {
330 20         74 $_max_workers = int($_max_workers / @_code + 0.5) + 1;
331             }
332              
333             my $_chunk_size = MCE::_parse_chunk_size(
334 74         490 $_def->{$_pkg}{CHUNK_SIZE}, $_max_workers, $_params->{$_pid},
335             $_input_data, scalar @_
336             );
337              
338 74 50       450 if (defined (my $_p = $_params->{$_pid})) {
339 74 100       236 if (exists $_p->{_file}) {
340 4         10 $_input_data = delete $_p->{_file};
341             } else {
342 70 50       282 $_input_data = $_p->{input_data} if exists $_p->{input_data};
343             }
344             }
345              
346             ## -------------------------------------------------------------------------
347              
348 74         477 MCE::_save_state($_MCE->{$_pid});
349              
350 74 100       308 if ($_init_mce) {
351 64 50       353 $_MCE->{$_pid}->shutdown() if (defined $_MCE->{$_pid});
352              
353             ## must clear arrays for nested session to work with Perl < v5.14
354 64         585 _gen_user_tasks($_pid, [@_code], [@_name], [@_thrs], [@_wrks]);
355              
356 64         338 @_code = @_name = @_thrs = @_wrks = ();
357              
358             my %_opts = (
359             max_workers => $_max_workers, task_name => $_tag,
360 64         431 user_tasks => $_user_tasks->{$_pid},
361             );
362              
363 64 50       292 if (defined (my $_p = $_params->{$_pid})) {
364 64         112 local $_;
365              
366 64         115 for (keys %{ $_p }) {
  64         310  
367 114 100 100     661 next if ($_ eq 'max_workers' && ref $_p->{max_workers} eq 'ARRAY');
368 102 100 66     426 next if ($_ eq 'task_name' && ref $_p->{task_name} eq 'ARRAY');
369 96 50 33     339 next if ($_ eq 'use_threads' && ref $_p->{use_threads} eq 'ARRAY');
370              
371 96 50       219 next if ($_ eq 'chunk_size');
372 96 50       230 next if ($_ eq 'input_data');
373 96 50       206 next if ($_ eq 'sequence_run');
374              
375             _croak("$_tag: ($_) is not a valid constructor argument")
376 96 50       362 unless (exists $MCE::_valid_fields_new{$_});
377              
378 96         284 $_opts{$_} = $_p->{$_};
379             }
380             }
381              
382 64         257 for my $_k (qw/ tmp_dir freeze thaw init_relay use_threads /) {
383             $_opts{$_k} = $_def->{$_pkg}{uc($_k)}
384 320 50 33     1061 if (exists $_def->{$_pkg}{uc($_k)} && !exists $_opts{$_k});
385             }
386              
387 64         693 $_MCE->{$_pid} = MCE->new(pkg => $_pkg, %_opts);
388             }
389             else {
390             ## Workers may persist after running. Thus, updating the MCE instance.
391             ## These options do not require respawning.
392 10 50       30 if (defined (my $_p = $_params->{$_pid})) {
393 10         24 for my $_k (qw(
394             RS interval stderr_file stdout_file user_error user_output
395             job_delay submit_delay on_post_exit on_post_run user_args
396             flush_file flush_stderr flush_stdout gather max_retries
397             )) {
398 160 100       266 $_MCE->{$_pid}{$_k} = $_p->{$_k} if (exists $_p->{$_k});
399             }
400             }
401             }
402              
403             ## -------------------------------------------------------------------------
404              
405 74 100       172 my @_a; my $_wa = wantarray; $_MCE->{$_pid}{gather} = \@_a if (defined $_wa);
  74         153  
  74         247  
406              
407 74 100       468 if (defined $_input_data) {
    100          
408 8         8 @_ = ();
409 8         42 $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, $_input_data);
410 7         51 delete $_MCE->{$_pid}{input_data};
411             }
412             elsif (scalar @_) {
413 6         42 $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, \@_);
414 2         12 delete $_MCE->{$_pid}{input_data};
415             }
416             else {
417 60 100 66     696 if (defined $_params->{$_pid} && exists $_params->{$_pid}{sequence}) {
418             $_MCE->{$_pid}->run({
419             chunk_size => $_chunk_size,
420             sequence => $_params->{$_pid}{sequence}
421 2         22 }, 0);
422 2 50       12 if (exists $_params->{$_pid}{sequence_run}) {
423 2         6 delete $_params->{$_pid}{sequence_run};
424 2         2 delete $_params->{$_pid}{sequence};
425             }
426 2         6 delete $_MCE->{$_pid}{sequence};
427             }
428             else {
429 58         503 $_MCE->{$_pid}->run({ chunk_size => $_chunk_size }, 0);
430             }
431             }
432              
433 45         954 MCE::_restore_state();
434              
435 45 100       256 delete $_MCE->{$_pid}{gather} if (defined $_wa);
436              
437 45 100       1495 return ((defined $_wa) ? @_a : ());
438             }
439              
440             ###############################################################################
441             ## ----------------------------------------------------------------------------
442             ## Private methods.
443             ##
444             ###############################################################################
445              
446             sub _croak {
447              
448 0     0   0 goto &MCE::_croak;
449             }
450              
451             sub _gen_user_tasks {
452              
453 64     64   261 my ($_pid, $_code_ref, $_name_ref, $_thrs_ref, $_wrks_ref) = @_;
454              
455 64         122 @{ $_user_tasks->{$_pid} } = ();
  64         213  
456              
457 64         192 for (my $_i = 0; $_i < @{ $_code_ref }; $_i++) {
  140         451  
458 76         129 push @{ $_user_tasks->{$_pid} }, {
  76         546  
459             task_name => $_name_ref->[$_i],
460             use_threads => $_thrs_ref->[$_i],
461             max_workers => $_wrks_ref->[$_i],
462             user_func => $_code_ref->[$_i]
463             }
464             }
465              
466 64         192 return;
467             }
468              
469             1;
470              
471             __END__