File Coverage

blib/lib/MCE/Flow.pm
Criterion Covered Total %
statement 198 235 84.2
branch 106 178 59.5
condition 43 74 58.1
subroutine 14 16 87.5
pod 5 5 100.0
total 366 508 72.0


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Parallel flow model for building creative applications.
4             ##
5             ###############################################################################
6              
7             package MCE::Flow;
8              
9 37     37   2167782 use strict;
  37         334  
  37         1160  
10 37     37   203 use warnings;
  37         123  
  37         1334  
11              
12 37     37   185 no warnings qw( threads recursion uninitialized );
  37         98  
  37         2220  
13              
14             our $VERSION = '1.888';
15              
16             ## no critic (BuiltinFunctions::ProhibitStringyEval)
17             ## no critic (Subroutines::ProhibitSubroutinePrototypes)
18             ## no critic (TestingAndDebugging::ProhibitNoStrict)
19              
20 37     37   226 use Scalar::Util qw( looks_like_number );
  37         151  
  37         1943  
21 37     37   20477 use MCE;
  37         97  
  37         236  
22              
23             our @CARP_NOT = qw( MCE );
24              
25             my $_tid = $INC{'threads.pm'} ? threads->tid() : 0;
26              
27             sub CLONE {
28 0 0   0   0 $_tid = threads->tid() if $INC{'threads.pm'};
29             }
30              
31             ###############################################################################
32             ## ----------------------------------------------------------------------------
33             ## Import routine.
34             ##
35             ###############################################################################
36              
37             my ($_MCE, $_def, $_params, $_tag) = ({}, {}, {}, 'MCE::Flow');
38             my ($_prev_c, $_prev_n, $_prev_t, $_prev_w) = ({}, {}, {}, {});
39             my ($_user_tasks) = ({});
40              
41             sub import {
42 37     37   537 my ($_class, $_pkg) = (shift, caller);
43              
44 37         181 my $_p = $_def->{$_pkg} = {
45             MAX_WORKERS => 'auto',
46             CHUNK_SIZE => 'auto',
47             };
48              
49             ## Import functions.
50 37 50       185 if ($_pkg !~ /^MCE::/) {
51 37     37   281 no strict 'refs'; no warnings 'redefine';
  37     37   86  
  37         1485  
  37         226  
  37         73  
  37         117813  
52 37         83 *{ $_pkg.'::mce_flow_f' } = \&run_file;
  37         261  
53 37         91 *{ $_pkg.'::mce_flow_s' } = \&run_seq;
  37         157  
54 37         72 *{ $_pkg.'::mce_flow' } = \&run;
  37         123  
55             }
56              
57             ## Process module arguments.
58 37         173 while ( my $_argument = shift ) {
59 0         0 my $_arg = lc $_argument;
60              
61 0 0       0 $_p->{MAX_WORKERS} = shift, next if ( $_arg eq 'max_workers' );
62 0 0       0 $_p->{CHUNK_SIZE} = shift, next if ( $_arg eq 'chunk_size' );
63 0 0       0 $_p->{TMP_DIR} = shift, next if ( $_arg eq 'tmp_dir' );
64 0 0       0 $_p->{FREEZE} = shift, next if ( $_arg eq 'freeze' );
65 0 0       0 $_p->{THAW} = shift, next if ( $_arg eq 'thaw' );
66 0 0       0 $_p->{INIT_RELAY} = shift, next if ( $_arg eq 'init_relay' );
67 0 0       0 $_p->{USE_THREADS} = shift, next if ( $_arg eq 'use_threads' );
68              
69             ## Sereal 3.015+, if available, is used automatically by MCE 1.8+.
70 0 0       0 if ( $_arg eq 'sereal' ) {
71 0 0       0 if ( shift eq '0' ) {
72 0         0 require Storable;
73 0         0 $_p->{FREEZE} = \&Storable::freeze;
74 0         0 $_p->{THAW} = \&Storable::thaw;
75             }
76 0         0 next;
77             }
78              
79 0         0 _croak("Error: ($_argument) invalid module option");
80             }
81              
82 37         166 $_p->{MAX_WORKERS} = MCE::_parse_max_workers($_p->{MAX_WORKERS});
83              
84 37         156 MCE::_validate_number($_p->{MAX_WORKERS}, 'MAX_WORKERS', $_tag);
85             MCE::_validate_number($_p->{CHUNK_SIZE}, 'CHUNK_SIZE', $_tag)
86 37 50       114 unless ($_p->{CHUNK_SIZE} eq 'auto');
87              
88 37         708 return;
89             }
90              
91             ###############################################################################
92             ## ----------------------------------------------------------------------------
93             ## Init and finish routines.
94             ##
95             ###############################################################################
96              
97             sub init (@) {
98              
99 29 100 66 29 1 13976 shift if (defined $_[0] && $_[0] eq 'MCE::Flow');
100 29         236 my $_pkg = "$$.$_tid.".caller();
101              
102 29 50       318 $_params->{$_pkg} = (ref $_[0] eq 'HASH') ? shift : { @_ };
103              
104 29         94 @_ = ();
105              
106 29         83 return;
107             }
108              
109             sub finish (@) {
110              
111 101 50 33 101 1 11744 shift if (defined $_[0] && $_[0] eq 'MCE::Flow');
112 101 100       1166 my $_pkg = (defined $_[0]) ? shift : "$$.$_tid.".caller();
113              
114 101 100 66     1591 if ( $_pkg eq 'MCE' ) {
    100          
115 37         20279 for my $_k ( keys %{ $_MCE } ) { MCE::Flow->finish($_k, 1); }
  37         793  
  29         901  
116             }
117             elsif ( $_MCE->{$_pkg} && $_MCE->{$_pkg}{_init_pid} eq "$$.$_tid" ) {
118 35 50       682 $_MCE->{$_pkg}->shutdown(@_) if $_MCE->{$_pkg}{_spawned};
119              
120 35         381 delete $_user_tasks->{$_pkg};
121 35         554 delete $_prev_c->{$_pkg};
122 35         353 delete $_prev_n->{$_pkg};
123 35         171 delete $_prev_t->{$_pkg};
124 35         164 delete $_prev_w->{$_pkg};
125 35         619 delete $_MCE->{$_pkg};
126             }
127              
128 101         426 @_ = ();
129              
130 101         457 return;
131             }
132              
133             ###############################################################################
134             ## ----------------------------------------------------------------------------
135             ## Parallel flow with MCE -- file.
136             ##
137             ###############################################################################
138              
139             sub run_file (@) {
140              
141 4 50 33 4 1 3556 shift if (defined $_[0] && $_[0] eq 'MCE::Flow');
142              
143 4 50       16 my ($_file, $_pos); my $_start_pos = (ref $_[0] eq 'HASH') ? 2 : 1;
  4         20  
144 4         24 my $_pid = "$$.$_tid.".caller();
145              
146 4 50       22 if (defined (my $_p = $_params->{$_pid})) {
147 4 50       16 delete $_p->{input_data} if (exists $_p->{input_data});
148 4 50       16 delete $_p->{sequence} if (exists $_p->{sequence});
149             }
150             else {
151 0         0 $_params->{$_pid} = {};
152             }
153              
154 4         48 for my $_i ($_start_pos .. @_ - 1) {
155 8         30 my $_r = ref $_[$_i];
156 8 100 66     132 if ($_r eq '' || $_r eq 'SCALAR' || $_r =~ /^(?:GLOB|FileHandle|IO::)/) {
      100        
157 4         10 $_file = $_[$_i]; $_pos = $_i;
  4         8  
158 4         10 last;
159             }
160             }
161              
162 4 100 66     120 if (defined $_file && ref $_file eq '' && $_file ne '') {
    50 66        
      33        
163 2 50       112 _croak("$_tag: ($_file) does not exist") unless (-e $_file);
164 2 50       48 _croak("$_tag: ($_file) is not readable") unless (-r $_file);
165 2 50       66 _croak("$_tag: ($_file) is not a plain file") unless (-f $_file);
166 2         16 $_params->{$_pid}{_file} = $_file;
167             }
168             elsif (ref $_file eq 'SCALAR' || ref($_file) =~ /^(?:GLOB|FileHandle|IO::)/) {
169 2         8 $_params->{$_pid}{_file} = $_file;
170             }
171             else {
172 0         0 _croak("$_tag: (file) is not specified or valid");
173             }
174              
175 4 50       14 if (defined $_pos) {
176 4         16 pop @_ for ($_pos .. @_ - 1);
177             }
178              
179 4         16 return run(@_);
180             }
181              
182             ###############################################################################
183             ## ----------------------------------------------------------------------------
184             ## Parallel flow with MCE -- sequence.
185             ##
186             ###############################################################################
187              
188             sub run_seq (@) {
189              
190 2 50 33 2 1 1910 shift if (defined $_[0] && $_[0] eq 'MCE::Flow');
191              
192 2 50       8 my ($_begin, $_end, $_pos); my $_start_pos = (ref $_[0] eq 'HASH') ? 2 : 1;
  2         12  
193 2         18 my $_pid = "$$.$_tid.".caller();
194              
195 2 50       10 if (defined (my $_p = $_params->{$_pid})) {
196 2 50       10 delete $_p->{sequence} if (exists $_p->{sequence});
197 2 50       10 delete $_p->{input_data} if (exists $_p->{input_data});
198 2 50       10 delete $_p->{_file} if (exists $_p->{_file});
199             }
200             else {
201 0         0 $_params->{$_pid} = {};
202             }
203              
204 2         16 for my $_i ($_start_pos .. @_ - 1) {
205 4         16 my $_r = ref $_[$_i];
206              
207 4 50 66     92 if ($_r eq '' || $_r =~ /^Math::/ || $_r eq 'HASH' || $_r eq 'ARRAY') {
      66        
      33        
208 2         6 $_pos = $_i;
209              
210 2 50 33     10 if ($_r eq '' || $_r =~ /^Math::/) {
    0          
    0          
211 2         10 $_begin = $_[$_pos], $_end = $_[$_pos + 1];
212             $_params->{$_pid}{sequence} = [
213 2         10 $_[$_pos], $_[$_pos + 1], $_[$_pos + 2], $_[$_pos + 3]
214             ];
215             }
216             elsif ($_r eq 'HASH') {
217 0         0 $_begin = $_[$_pos]->{begin}, $_end = $_[$_pos]->{end};
218 0         0 $_params->{$_pid}{sequence} = $_[$_pos];
219             }
220             elsif ($_r eq 'ARRAY') {
221 0         0 $_begin = $_[$_pos]->[0], $_end = $_[$_pos]->[1];
222 0         0 $_params->{$_pid}{sequence} = $_[$_pos];
223             }
224              
225 2         10 last;
226             }
227             }
228              
229             _croak("$_tag: (sequence) is not specified or valid")
230 2 50       10 unless (exists $_params->{$_pid}{sequence});
231 2 50       10 _croak("$_tag: (begin) is not specified for sequence")
232             unless (defined $_begin);
233 2 50       10 _croak("$_tag: (end) is not specified for sequence")
234             unless (defined $_end);
235              
236 2         6 $_params->{$_pid}{sequence_run} = undef;
237              
238 2 50       10 if (defined $_pos) {
239 2         14 pop @_ for ($_pos .. @_ - 1);
240             }
241              
242 2         12 return run(@_);
243             }
244              
245             ###############################################################################
246             ## ----------------------------------------------------------------------------
247             ## Parallel flow with MCE.
248             ##
249             ###############################################################################
250              
251             sub run (@) {
252              
253 74 50 33 74 1 49508 shift if (defined $_[0] && $_[0] eq 'MCE::Flow');
254              
255 74 100       423 my $_pkg = caller() eq 'MCE::Flow' ? caller(1) : caller();
256 74         424 my $_pid = "$$.$_tid.$_pkg";
257              
258 74 100       358 if (ref $_[0] eq 'HASH') {
259 48 100       427 $_params->{$_pid} = {} unless defined $_params->{$_pid};
260 48         212 for my $_p (keys %{ $_[0] }) {
  48         266  
261 72         398 $_params->{$_pid}{$_p} = $_[0]->{$_p};
262             }
263              
264 48         207 shift;
265             }
266              
267             ## -------------------------------------------------------------------------
268              
269 74         247 my (@_code, @_name, @_thrs, @_wrks); my $_init_mce = 0; my $_pos = 0;
  74         836  
  74         163  
270              
271 74         816 while (ref $_[0] eq 'CODE') {
272 94         445 push @_code, $_[0];
273              
274 94 50       496 if (defined (my $_p = $_params->{$_pid})) {
275             push @_name, (ref $_p->{task_name} eq 'ARRAY')
276 94 100       499 ? $_p->{task_name}->[$_pos] : undef;
277             push @_thrs, (ref $_p->{use_threads} eq 'ARRAY')
278 94 50       354 ? $_p->{use_threads}->[$_pos] : undef;
279             push @_wrks, (ref $_p->{max_workers} eq 'ARRAY')
280 94 100       357 ? $_p->{max_workers}->[$_pos] : undef;
281             }
282              
283             $_init_mce = 1 if (
284             !defined $_prev_c->{$_pid}[$_pos] ||
285 94 100 66     649 $_prev_c->{$_pid}[$_pos] != $_code[$_pos]
286             );
287              
288 94 100       464 $_init_mce = 1 if ($_prev_n->{$_pid}[$_pos] ne $_name[$_pos]);
289 94 50       457 $_init_mce = 1 if ($_prev_t->{$_pid}[$_pos] ne $_thrs[$_pos]);
290 94 100       752 $_init_mce = 1 if ($_prev_w->{$_pid}[$_pos] ne $_wrks[$_pos]);
291              
292 94         375 $_prev_c->{$_pid}[$_pos] = $_code[$_pos];
293 94         362 $_prev_n->{$_pid}[$_pos] = $_name[$_pos];
294 94         284 $_prev_t->{$_pid}[$_pos] = $_thrs[$_pos];
295 94         244 $_prev_w->{$_pid}[$_pos] = $_wrks[$_pos];
296              
297 94         171 shift; $_pos++;
  94         399  
298             }
299              
300 74 50       349 if (defined $_prev_c->{$_pid}[$_pos]) {
301 0         0 pop @{ $_prev_c->{$_pid} } for ($_pos .. $#{ $_prev_c->{$_pid } });
  0         0  
  0         0  
302 0         0 pop @{ $_prev_n->{$_pid} } for ($_pos .. $#{ $_prev_n->{$_pid } });
  0         0  
  0         0  
303 0         0 pop @{ $_prev_t->{$_pid} } for ($_pos .. $#{ $_prev_t->{$_pid } });
  0         0  
  0         0  
304 0         0 pop @{ $_prev_w->{$_pid} } for ($_pos .. $#{ $_prev_w->{$_pid } });
  0         0  
  0         0  
305              
306 0         0 $_init_mce = 1;
307             }
308              
309 74 50       298 return unless (scalar @_code);
310              
311             ## -------------------------------------------------------------------------
312              
313 74         225 my $_input_data; my $_max_workers = $_def->{$_pkg}{MAX_WORKERS};
  74         334  
314 74         257 my $_r = ref $_[0];
315              
316 74 100 66     453 if (@_ == 1 && $_r =~ /^(?:ARRAY|HASH|SCALAR|GLOB|FileHandle|IO::)/) {
317 4         16 $_input_data = shift;
318             }
319              
320 74 50       281 if (defined (my $_p = $_params->{$_pid})) {
321             $_max_workers = MCE::_parse_max_workers($_p->{max_workers})
322 74 100 66     1239 if (exists $_p->{max_workers} && ref $_p->{max_workers} ne 'ARRAY');
323              
324 74 100 100     554 delete $_p->{sequence} if (defined $_input_data || scalar @_);
325 74 50       370 delete $_p->{user_func} if (exists $_p->{user_func});
326 74 50       286 delete $_p->{user_tasks} if (exists $_p->{user_tasks});
327             }
328              
329 74 100 66     579 if (@_code > 1 && $_max_workers > 1) {
330 20         214 $_max_workers = int($_max_workers / @_code + 0.5) + 1;
331             }
332              
333             my $_chunk_size = MCE::_parse_chunk_size(
334 74         629 $_def->{$_pkg}{CHUNK_SIZE}, $_max_workers, $_params->{$_pid},
335             $_input_data, scalar @_
336             );
337              
338 74 50       568 if (defined (my $_p = $_params->{$_pid})) {
339 74 100       416 if (exists $_p->{_file}) {
340 4         20 $_input_data = delete $_p->{_file};
341             } else {
342 70 50       287 $_input_data = $_p->{input_data} if exists $_p->{input_data};
343             }
344             }
345              
346             ## -------------------------------------------------------------------------
347              
348 74         554 MCE::_save_state($_MCE->{$_pid});
349              
350 74 100       315 if ($_init_mce) {
351 64 50       248 $_MCE->{$_pid}->shutdown() if (defined $_MCE->{$_pid});
352              
353             ## must clear arrays for nested session to work with Perl < v5.14
354 64         648 _gen_user_tasks($_pid, [@_code], [@_name], [@_thrs], [@_wrks]);
355              
356 64         459 @_code = @_name = @_thrs = @_wrks = ();
357              
358             my %_opts = (
359             max_workers => $_max_workers, task_name => $_tag,
360 64         398 user_tasks => $_user_tasks->{$_pid},
361             );
362              
363 64 50       295 if (defined (my $_p = $_params->{$_pid})) {
364 64         127 local $_;
365              
366 64         208 for (keys %{ $_p }) {
  64         373  
367 114 100 100     798 next if ($_ eq 'max_workers' && ref $_p->{max_workers} eq 'ARRAY');
368 102 100 66     542 next if ($_ eq 'task_name' && ref $_p->{task_name} eq 'ARRAY');
369 96 50 33     368 next if ($_ eq 'use_threads' && ref $_p->{use_threads} eq 'ARRAY');
370              
371 96 50       276 next if ($_ eq 'chunk_size');
372 96 50       198 next if ($_ eq 'input_data');
373 96 50       243 next if ($_ eq 'sequence_run');
374              
375             _croak("$_tag: ($_) is not a valid constructor argument")
376 96 50       444 unless (exists $MCE::_valid_fields_new{$_});
377              
378 96         416 $_opts{$_} = $_p->{$_};
379             }
380             }
381              
382 64         355 for my $_k (qw/ tmp_dir freeze thaw init_relay use_threads /) {
383             $_opts{$_k} = $_def->{$_pkg}{uc($_k)}
384 320 50 33     1160 if (exists $_def->{$_pkg}{uc($_k)} && !exists $_opts{$_k});
385             }
386              
387 64         860 $_MCE->{$_pid} = MCE->new(pkg => $_pkg, %_opts);
388             }
389             else {
390             ## Workers may persist after running. Thus, updating the MCE instance.
391             ## These options do not require respawning.
392 10 50       36 if (defined (my $_p = $_params->{$_pid})) {
393 10         32 for my $_k (qw(
394             RS interval stderr_file stdout_file user_error user_output
395             job_delay submit_delay on_post_exit on_post_run user_args
396             flush_file flush_stderr flush_stdout gather max_retries
397             )) {
398 160 100       430 $_MCE->{$_pid}{$_k} = $_p->{$_k} if (exists $_p->{$_k});
399             }
400             }
401             }
402              
403             ## -------------------------------------------------------------------------
404              
405 74 100       192 my @_a; my $_wa = wantarray; $_MCE->{$_pid}{gather} = \@_a if (defined $_wa);
  74         250  
  74         393  
406              
407 74 100       631 if (defined $_input_data) {
    100          
408 8         18 @_ = ();
409 8         86 $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, $_input_data);
410 7         60 delete $_MCE->{$_pid}{input_data};
411             }
412             elsif (scalar @_) {
413 6         72 $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, \@_);
414 2         18 delete $_MCE->{$_pid}{input_data};
415             }
416             else {
417 60 100 66     720 if (defined $_params->{$_pid} && exists $_params->{$_pid}{sequence}) {
418             $_MCE->{$_pid}->run({
419             chunk_size => $_chunk_size,
420             sequence => $_params->{$_pid}{sequence}
421 2         30 }, 0);
422 2 50       14 if (exists $_params->{$_pid}{sequence_run}) {
423 2         6 delete $_params->{$_pid}{sequence_run};
424 2         4 delete $_params->{$_pid}{sequence};
425             }
426 2         4 delete $_MCE->{$_pid}{sequence};
427             }
428             else {
429 58         601 $_MCE->{$_pid}->run({ chunk_size => $_chunk_size }, 0);
430             }
431             }
432              
433 45         1091 MCE::_restore_state();
434              
435 45 100       219 delete $_MCE->{$_pid}{gather} if (defined $_wa);
436              
437 45 100       1714 return ((defined $_wa) ? @_a : ());
438             }
439              
440             ###############################################################################
441             ## ----------------------------------------------------------------------------
442             ## Private methods.
443             ##
444             ###############################################################################
445              
446             sub _croak {
447              
448 0     0   0 goto &MCE::_croak;
449             }
450              
451             sub _gen_user_tasks {
452              
453 64     64   311 my ($_pid, $_code_ref, $_name_ref, $_thrs_ref, $_wrks_ref) = @_;
454              
455 64         136 @{ $_user_tasks->{$_pid} } = ();
  64         216  
456              
457 64         269 for (my $_i = 0; $_i < @{ $_code_ref }; $_i++) {
  140         518  
458 76         200 push @{ $_user_tasks->{$_pid} }, {
  76         565  
459             task_name => $_name_ref->[$_i],
460             use_threads => $_thrs_ref->[$_i],
461             max_workers => $_wrks_ref->[$_i],
462             user_func => $_code_ref->[$_i]
463             }
464             }
465              
466 64         160 return;
467             }
468              
469             1;
470              
471             __END__