File Coverage

blib/lib/MCE/Stream.pm
Criterion Covered Total %
statement 280 361 77.5
branch 125 240 52.0
condition 52 100 52.0
subroutine 19 22 86.3
pod 5 5 100.0
total 481 728 66.0


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Parallel stream model for chaining multiple maps and greps.
4             ##
5             ###############################################################################
6              
7             package MCE::Stream;
8              
9 10     10   755295 use strict;
  10         92  
  10         257  
10 10     10   40 use warnings;
  10         11  
  10         255  
11              
12 10     10   40 no warnings qw( threads recursion uninitialized );
  10         11  
  10         516  
13              
14             our $VERSION = '1.887';
15              
16             ## no critic (BuiltinFunctions::ProhibitStringyEval)
17             ## no critic (Subroutines::ProhibitSubroutinePrototypes)
18             ## no critic (TestingAndDebugging::ProhibitNoStrict)
19              
20 10     10   49 use Scalar::Util qw( looks_like_number );
  10         19  
  10         499  
21              
22 10     10   4622 use MCE;
  10         31  
  10         62  
23 10     10   4747 use MCE::Queue;
  10         20  
  10         43  
24              
25             our @CARP_NOT = qw( MCE );
26              
27             my $_tid = $INC{'threads.pm'} ? threads->tid() : 0;
28              
29             sub CLONE {
30 0 0   0   0 $_tid = threads->tid() if $INC{'threads.pm'};
31             }
32              
33             ###############################################################################
34             ## ----------------------------------------------------------------------------
35             ## Import routine.
36             ##
37             ###############################################################################
38              
39             my ($_MCE, $_def, $_params, $_tag) = ({}, {}, {}, 'MCE::Stream');
40             my ($_prev_c, $_prev_m, $_prev_n, $_prev_w) = ({}, {}, {}, {});
41             my ($_user_tasks, $_queue) = ({}, {});
42              
43             sub import {
44 10     10   132 my ($_class, $_pkg) = (shift, caller);
45              
46 10         41 my $_p = $_def->{$_pkg} = {
47             MAX_WORKERS => 'auto',
48             CHUNK_SIZE => 'auto',
49             DEFAULT_MODE => 'map',
50             };
51              
52             ## Import functions.
53 10 50       33 if ($_pkg !~ /^MCE::/) {
54 10     10   60 no strict 'refs'; no warnings 'redefine';
  10     10   20  
  10         336  
  10         40  
  10         20  
  10         36309  
55 10         21 *{ $_pkg.'::mce_stream_f' } = \&run_file;
  10         60  
56 10         21 *{ $_pkg.'::mce_stream_s' } = \&run_seq;
  10         30  
57 10         11 *{ $_pkg.'::mce_stream' } = \&run;
  10         31  
58             }
59              
60             ## Process module arguments.
61 10         59 while ( my $_argument = shift ) {
62 0         0 my $_arg = lc $_argument;
63              
64 0 0       0 $_p->{MAX_WORKERS} = shift, next if ( $_arg eq 'max_workers' );
65 0 0       0 $_p->{CHUNK_SIZE} = shift, next if ( $_arg eq 'chunk_size' );
66 0 0       0 $_p->{TMP_DIR} = shift, next if ( $_arg eq 'tmp_dir' );
67 0 0       0 $_p->{FREEZE} = shift, next if ( $_arg eq 'freeze' );
68 0 0       0 $_p->{THAW} = shift, next if ( $_arg eq 'thaw' );
69 0 0       0 $_p->{INIT_RELAY} = shift, next if ( $_arg eq 'init_relay' );
70 0 0       0 $_p->{USE_THREADS} = shift, next if ( $_arg eq 'use_threads' );
71 0 0       0 $_p->{DEFAULT_MODE} = shift, next if ( $_arg eq 'default_mode' );
72              
73 0 0       0 shift, next if ( $_arg eq 'fast' ); # ignored
74              
75             ## Sereal 3.015+, if available, is used automatically by MCE 1.8+.
76 0 0       0 if ( $_arg eq 'sereal' ) {
77 0 0       0 if ( shift eq '0' ) {
78 0         0 require Storable;
79 0         0 $_p->{FREEZE} = \&Storable::freeze;
80 0         0 $_p->{THAW} = \&Storable::thaw;
81             }
82 0         0 next;
83             }
84              
85 0         0 _croak("Error: ($_argument) invalid module option");
86             }
87              
88             _croak("Error: (DEFAULT_MODE) is not valid")
89 10 50 33     62 if ($_p->{DEFAULT_MODE} ne 'grep' && $_p->{DEFAULT_MODE} ne 'map');
90              
91 10         51 $_p->{MAX_WORKERS} = MCE::_parse_max_workers($_p->{MAX_WORKERS});
92              
93 10         41 MCE::_validate_number($_p->{MAX_WORKERS}, 'MAX_WORKERS', $_tag);
94             MCE::_validate_number($_p->{CHUNK_SIZE}, 'CHUNK_SIZE', $_tag)
95 10 50       31 unless ($_p->{CHUNK_SIZE} eq 'auto');
96              
97 10         138 return;
98             }
99              
100             ###############################################################################
101             ## ----------------------------------------------------------------------------
102             ## Gather callback to ensure chunk order is preserved during gathering.
103             ## Also, the task end callback for when a task completes.
104             ##
105             ###############################################################################
106              
107             my ($_gather_ref, $_order_id, %_tmp);
108              
109             sub _preserve_order {
110              
111 379     379   960 $_tmp{$_[1]} = $_[0];
112              
113 379 100       577 if (defined $_gather_ref) {
114 185         210 while (1) {
115 370 100       655 last unless exists $_tmp{$_order_id};
116 185         180 push @{ $_gather_ref }, @{ delete $_tmp{$_order_id++} };
  185         205  
  185         435  
117             }
118             }
119             else {
120 194         235 $_order_id++;
121             }
122              
123 379         771 return;
124             }
125              
126             sub _task_end {
127              
128 102     102   205 my ($_mce, $_task_id, $_task_name) = @_;
129 102         237 my $_pid = $_mce->{_init_pid}.'.'.$_mce->{_caller};
130              
131 102 100       271 if (defined $_mce->{user_tasks}->[$_task_id + 1]) {
132 51         277 my $n_workers = $_mce->{user_tasks}->[$_task_id + 1]->{max_workers};
133 51         64 my $_id = @{ $_queue->{$_pid} } - $_task_id - 1;
  51         178  
134              
135 51         207 $_queue->{$_pid}[$_id]->enqueue((undef) x $n_workers);
136             }
137              
138             $_params->{task_end}->($_mce, $_task_id, $_task_name)
139 102 50 33     237 if (exists $_params->{task_end} && ref $_params->{task_end} eq 'CODE');
140              
141 102         230 return;
142             }
143              
144             ###############################################################################
145             ## ----------------------------------------------------------------------------
146             ## Init and finish routines.
147             ##
148             ###############################################################################
149              
150             sub init (@) {
151              
152 9 50 33 9 1 1215 shift if (defined $_[0] && $_[0] eq 'MCE::Stream');
153 9         54 my $_pkg = "$$.$_tid.".caller();
154              
155 9 50       63 $_params->{$_pkg} = (ref $_[0] eq 'HASH') ? shift : { @_ };
156              
157             _croak("$_tag: (HASH) not allowed as input by this MCE model")
158 9 50       45 if ( ref $_params->{$_pkg}{input_data} eq 'HASH' );
159              
160 9         18 @_ = ();
161              
162 9         27 return;
163             }
164              
165             sub finish (@) {
166              
167 24 50 33 24 1 8726 shift if (defined $_[0] && $_[0] eq 'MCE::Stream');
168 24 100       216 my $_pkg = (defined $_[0]) ? shift : "$$.$_tid.".caller();
169              
170 24 100 66     290 if ( $_pkg eq 'MCE' ) {
    100          
171 10         20 for my $_k ( keys %{ $_MCE } ) { MCE::Stream->finish($_k, 1); }
  10         259  
  8         184  
172             }
173             elsif ( $_MCE->{$_pkg} && $_MCE->{$_pkg}{_init_pid} eq "$$.$_tid" ) {
174 6 50       94 $_MCE->{$_pkg}->shutdown(@_) if $_MCE->{$_pkg}{_spawned};
175 6         28 $_gather_ref = $_order_id = undef, undef %_tmp;
176              
177 6         24 delete $_user_tasks->{$_pkg};
178 6         25 delete $_prev_c->{$_pkg};
179 6         30 delete $_prev_m->{$_pkg};
180 6         31 delete $_prev_n->{$_pkg};
181 6         22 delete $_prev_w->{$_pkg};
182 6         75 delete $_MCE->{$_pkg};
183              
184 6 50       35 if (defined $_queue->{$_pkg}) {
185 6         12 local $_;
186 6         12 $_->DESTROY() for (@{ $_queue->{$_pkg} });
  6         77  
187 6         65 delete $_queue->{$_pkg};
188             }
189             }
190              
191 24         86 @_ = ();
192              
193 24         64 return;
194             }
195              
196             ###############################################################################
197             ## ----------------------------------------------------------------------------
198             ## Parallel stream with MCE -- file.
199             ##
200             ###############################################################################
201              
202             sub run_file (@) {
203              
204 20 50 33 20 1 16555 shift if (defined $_[0] && $_[0] eq 'MCE::Stream');
205              
206 20 50       40 my ($_file, $_pos); my $_start_pos = (ref $_[0] eq 'HASH') ? 2 : 1;
  20         75  
207 20         90 my $_pid = "$$.$_tid.".caller();
208              
209 20 50       60 if (defined (my $_p = $_params->{$_pid})) {
210 20 50       50 delete $_p->{input_data} if (exists $_p->{input_data});
211 20 50       45 delete $_p->{sequence} if (exists $_p->{sequence});
212             }
213             else {
214 0         0 $_params->{$_pid} = {};
215             }
216              
217 20         65 for my $_i ($_start_pos .. @_ - 1) {
218 50         90 my $_r = ref $_[$_i];
219 50 100 66     355 if ($_r eq '' || $_r eq 'SCALAR' || $_r =~ /^(?:GLOB|FileHandle|IO::)/) {
      100        
220 20         30 $_file = $_[$_i]; $_pos = $_i;
  20         25  
221 20         35 last;
222             }
223             }
224              
225 20 100 66     220 if (defined $_file && ref $_file eq '' && $_file ne '') {
    50 66        
      33        
226 10 50       235 _croak("$_tag: ($_file) does not exist") unless (-e $_file);
227 10 50       125 _croak("$_tag: ($_file) is not readable") unless (-r $_file);
228 10 50       100 _croak("$_tag: ($_file) is not a plain file") unless (-f $_file);
229 10         75 $_params->{$_pid}{_file} = $_file;
230             }
231             elsif (ref $_file eq 'SCALAR' || ref($_file) =~ /^(?:GLOB|FileHandle|IO::)/) {
232 10         35 $_params->{$_pid}{_file} = $_file;
233             }
234             else {
235 0         0 _croak("$_tag: (file) is not specified or valid");
236             }
237              
238 20 50       50 if (defined $_pos) {
239 20         65 pop @_ for ($_pos .. @_ - 1);
240             }
241              
242 20         45 return run(@_);
243             }
244              
245             ###############################################################################
246             ## ----------------------------------------------------------------------------
247             ## Parallel stream with MCE -- sequence.
248             ##
249             ###############################################################################
250              
251             sub run_seq (@) {
252              
253 10 50 33 10 1 7670 shift if (defined $_[0] && $_[0] eq 'MCE::Stream');
254              
255 10 50       70 my ($_begin, $_end, $_pos); my $_start_pos = (ref $_[0] eq 'HASH') ? 2 : 1;
  10         30  
256 10         50 my $_pid = "$$.$_tid.".caller();
257              
258 10 50       35 if (defined (my $_p = $_params->{$_pid})) {
259 10 50       40 delete $_p->{sequence} if (exists $_p->{sequence});
260 10 50       25 delete $_p->{input_data} if (exists $_p->{input_data});
261 10 50       30 delete $_p->{_file} if (exists $_p->{_file});
262             }
263             else {
264 0         0 $_params->{$_pid} = {};
265             }
266              
267 10         75 for my $_i ($_start_pos .. @_ - 1) {
268 25         50 my $_r = ref $_[$_i];
269              
270 25 50 66     195 if ($_r eq '' || $_r =~ /^Math::/ || $_r eq 'HASH' || $_r eq 'ARRAY') {
      66        
      33        
271 10         10 $_pos = $_i;
272              
273 10 50 33     40 if ($_r eq '' || $_r =~ /^Math::/) {
    0          
    0          
274 10         30 $_begin = $_[$_pos], $_end = $_[$_pos + 1];
275             $_params->{$_pid}{sequence} = [
276 10         45 $_[$_pos], $_[$_pos + 1], $_[$_pos + 2], $_[$_pos + 3]
277             ];
278             }
279             elsif ($_r eq 'HASH') {
280 0         0 $_begin = $_[$_pos]->{begin}, $_end = $_[$_pos]->{end};
281 0         0 $_params->{$_pid}{sequence} = $_[$_pos];
282             }
283             elsif ($_r eq 'ARRAY') {
284 0         0 $_begin = $_[$_pos]->[0], $_end = $_[$_pos]->[1];
285 0         0 $_params->{$_pid}{sequence} = $_[$_pos];
286             }
287              
288 10         20 last;
289             }
290             }
291              
292             _croak("$_tag: (sequence) is not specified or valid")
293 10 50       35 unless (exists $_params->{$_pid}{sequence});
294 10 50       20 _croak("$_tag: (begin) is not specified for sequence")
295             unless (defined $_begin);
296 10 50       25 _croak("$_tag: (end) is not specified for sequence")
297             unless (defined $_end);
298              
299 10         30 $_params->{$_pid}{sequence_run} = undef;
300              
301 10 50       20 if (defined $_pos) {
302 10         40 pop @_ for ($_pos .. @_ - 1);
303             }
304              
305 10         25 return run(@_);
306             }
307              
308             ###############################################################################
309             ## ----------------------------------------------------------------------------
310             ## Parallel stream with MCE.
311             ##
312             ###############################################################################
313              
314             sub run (@) {
315              
316 59 50 33 59 1 18749 shift if (defined $_[0] && $_[0] eq 'MCE::Stream');
317              
318 59 100       307 my $_pkg = caller() eq 'MCE::Stream' ? caller(1) : caller();
319 59         192 my $_pid = "$$.$_tid.$_pkg";
320              
321 59 50 66     284 if (ref $_[0] eq 'HASH' && !exists $_[0]->{code}) {
322 0 0       0 $_params->{$_pid} = {} unless defined $_params->{$_pid};
323 0         0 for my $_p (keys %{ $_[0] }) {
  0         0  
324 0         0 $_params->{$_pid}{$_p} = $_[0]->{$_p};
325             }
326              
327 0         0 shift;
328             }
329              
330 59 100       134 my $_aref; $_aref = shift if (ref $_[0] eq 'ARRAY');
  59         167  
331              
332 59         93 $_order_id = 1; undef %_tmp;
  59         113  
333              
334 59 100       118 if (defined $_aref) {
335 25         40 $_gather_ref = $_aref; @{ $_aref } = ();
  25         25  
  25         100  
336             } else {
337 34         63 $_gather_ref = undef;
338             }
339              
340             ## -------------------------------------------------------------------------
341              
342 59         123 my (@_code, @_mode, @_name, @_wrks); my $_init_mce = 0; my $_pos = 0;
  59         78  
  59         79  
343 59         146 my $_default_mode = $_def->{$_pkg}{DEFAULT_MODE};
344              
345 59   100     191 while (ref $_[0] eq 'CODE' || ref $_[0] eq 'HASH') {
346 118 100       205 if (ref $_[0] eq 'CODE') {
347 108         157 push @_code, $_[0];
348 108         157 push @_mode, $_default_mode;
349             }
350             else {
351 10 0 33     35 last if (!exists $_[0]->{code} && !exists $_[0]->{mode});
352              
353 10 50       25 push @_code, exists $_[0]->{code} ? $_[0]->{code} : undef;
354 10 50       30 push @_mode, exists $_[0]->{mode} ? $_[0]->{mode} : $_default_mode;
355              
356 10 50       25 unless (ref $_code[-1] eq 'CODE') {
357 0         0 @_ = (); _croak("$_tag: (code) is not valid");
  0         0  
358             }
359 10 50 66     140 if ($_mode[-1] ne 'grep' && $_mode[-1] ne 'map') {
360 0         0 @_ = (); _croak("$_tag: (mode) is not valid");
  0         0  
361             }
362             }
363              
364 118 50       279 if (defined (my $_p = $_params->{$_pid})) {
365             push @_name, (ref $_p->{task_name} eq 'ARRAY')
366 118 50       279 ? $_p->{task_name}->[$_pos] : undef;
367             push @_wrks, (ref $_p->{max_workers} eq 'ARRAY')
368 118 50       269 ? $_p->{max_workers}->[$_pos] : undef;
369             }
370              
371             $_init_mce = 1 if (
372             !defined $_prev_c->{$_pid}[$_pos] ||
373 118 100 66     474 $_prev_c->{$_pid}[$_pos] != $_code[$_pos]
374             );
375             $_init_mce = 1 if (
376             !defined $_prev_m->{$_pid}[$_pos] ||
377 118 100 66     453 $_prev_m->{$_pid}[$_pos] ne $_mode[$_pos]
378             );
379              
380 118 100       273 $_init_mce = 1 if ($_prev_n->{$_pid}[$_pos] ne $_name[$_pos]);
381 118 100       280 $_init_mce = 1 if ($_prev_w->{$_pid}[$_pos] ne $_wrks[$_pos]);
382              
383 118         181 $_prev_c->{$_pid}[$_pos] = $_code[$_pos];
384 118         161 $_prev_m->{$_pid}[$_pos] = $_mode[$_pos];
385 118         157 $_prev_n->{$_pid}[$_pos] = $_name[$_pos];
386 118         128 $_prev_w->{$_pid}[$_pos] = $_wrks[$_pos];
387              
388 118         162 shift; $_pos++;
  118         344  
389             }
390              
391 59 50       156 if (defined $_prev_c->{$_pid}[$_pos]) {
392 0         0 pop @{ $_prev_c->{$_pid} } for ($_pos .. $#{ $_prev_c->{$_pid } });
  0         0  
  0         0  
393 0         0 pop @{ $_prev_m->{$_pid} } for ($_pos .. $#{ $_prev_m->{$_pid } });
  0         0  
  0         0  
394 0         0 pop @{ $_prev_n->{$_pid} } for ($_pos .. $#{ $_prev_n->{$_pid } });
  0         0  
  0         0  
395 0         0 pop @{ $_prev_w->{$_pid} } for ($_pos .. $#{ $_prev_w->{$_pid } });
  0         0  
  0         0  
396              
397 0         0 $_init_mce = 1;
398             }
399              
400 59 50       132 return unless (scalar @_code);
401              
402             ## -------------------------------------------------------------------------
403              
404 59         98 my $_input_data; my $_max_workers = $_def->{$_pkg}{MAX_WORKERS};
  59         138  
405 59         83 my $_r = ref $_[0];
406              
407 59 100 66     281 if (@_ == 1 && $_r =~ /^(?:ARRAY|HASH|SCALAR|GLOB|FileHandle|IO::)/) {
408 10 50       30 _croak("$_tag: (HASH) not allowed as input by this MCE model")
409             if $_r eq 'HASH';
410 10         25 $_input_data = shift;
411             }
412              
413 59 50       146 if (defined (my $_p = $_params->{$_pid})) {
414             $_max_workers = MCE::_parse_max_workers($_p->{max_workers})
415 59 50 33     294 if (exists $_p->{max_workers} && ref $_p->{max_workers} ne 'ARRAY');
416              
417 59 100 100     270 delete $_p->{sequence} if (defined $_input_data || scalar @_);
418 59 50       117 delete $_p->{user_func} if (exists $_p->{user_func});
419 59 50       93 delete $_p->{user_tasks} if (exists $_p->{user_tasks});
420 59 50       122 delete $_p->{use_slurpio} if (exists $_p->{use_slurpio});
421 59 50       98 delete $_p->{bounds_only} if (exists $_p->{bounds_only});
422 59 50       107 delete $_p->{gather} if (exists $_p->{gather});
423             }
424              
425 59 50 33     266 if (@_code > 1 && $_max_workers > 1) {
426 59         160 $_max_workers = int($_max_workers / @_code + 0.5) + 1;
427             }
428              
429 59         79 my $_chunk_size = do {
430 59   50     131 my $_p = $_params->{$_pid} || {};
431             (defined $_p->{init_relay} || defined $_def->{$_pkg}{INIT_RELAY}) ? 1 :
432             MCE::_parse_chunk_size(
433 59 50 33     495 $_def->{$_pkg}{CHUNK_SIZE}, $_max_workers, $_params->{$_pid},
434             $_input_data, scalar @_
435             );
436             };
437              
438 59 50       161 if (defined (my $_p = $_params->{$_pid})) {
439 59 100       176 if (exists $_p->{_file}) {
440 20         40 $_input_data = delete $_p->{_file};
441             } else {
442 39 50       87 $_input_data = $_p->{input_data} if exists $_p->{input_data};
443             }
444             }
445              
446             ## -------------------------------------------------------------------------
447              
448 59         259 MCE::_save_state($_MCE->{$_pid});
449              
450 59 100 66     226 if ($_init_mce || !exists $_queue->{$_pid}) {
451 14 50       56 $_MCE->{$_pid}->shutdown() if (defined $_MCE->{$_pid});
452 14 50       51 $_queue->{$_pid} = [] if (!defined $_queue->{$_pid});
453              
454 14         28 my $_Q = $_queue->{$_pid};
455 14         23 pop(@{ $_Q })->DESTROY for (@_code .. @{ $_Q });
  14         51  
  0         0  
456              
457 14         137 push @{ $_Q }, MCE::Queue->new()
458 14         28 for (@{ $_Q } .. @_code - 2);
  14         33  
459              
460             ## must clear arrays for nested session to work with Perl < v5.14
461 14         118 _gen_user_tasks($_pid, $_Q, [@_code], [@_mode], [@_name], [@_wrks]);
462              
463 14         70 @_code = @_mode = @_name = @_wrks = ();
464              
465             my %_opts = (
466             max_workers => $_max_workers, task_name => $_tag,
467 14         93 user_tasks => $_user_tasks->{$_pid}, task_end => \&_task_end,
468             use_slurpio => 0,
469             );
470              
471 14 50       42 if (defined (my $_p = $_params->{$_pid})) {
472 14         23 local $_;
473              
474 14         28 for (keys %{ $_p }) {
  14         61  
475 28 50       61 next if ($_ eq 'sequence_run');
476 28 100 66     112 next if ($_ eq 'max_workers' && ref $_p->{max_workers} eq 'ARRAY');
477 14 50 33     99 next if ($_ eq 'task_name' && ref $_p->{task_name} eq 'ARRAY');
478 0 0       0 next if ($_ eq 'input_data');
479 0 0       0 next if ($_ eq 'chunk_size');
480 0 0       0 next if ($_ eq 'task_end');
481              
482             _croak("$_tag: ($_) is not a valid constructor argument")
483 0 0       0 unless (exists $MCE::_valid_fields_new{$_});
484              
485 0         0 $_opts{$_} = $_p->{$_};
486             }
487             }
488              
489 14         42 for my $_k (qw/ tmp_dir freeze thaw init_relay use_threads /) {
490             $_opts{$_k} = $_def->{$_pkg}{uc($_k)}
491 70 50 33     172 if (exists $_def->{$_pkg}{uc($_k)} && !exists $_opts{$_k});
492             }
493              
494 14         292 $_MCE->{$_pid} = MCE->new(pkg => $_pkg, %_opts);
495             }
496             else {
497             ## Workers may persist after running. Thus, updating the MCE instance.
498             ## These options do not require respawning.
499 45 50       130 if (defined (my $_p = $_params->{$_pid})) {
500 45         120 for my $_k (qw(
501             RS interval stderr_file stdout_file user_error user_output
502             job_delay submit_delay on_post_exit on_post_run user_args
503             flush_file flush_stderr flush_stdout max_retries
504             )) {
505 675 50       1020 $_MCE->{$_pid}{$_k} = $_p->{$_k} if (exists $_p->{$_k});
506             }
507             }
508             }
509              
510             ## -------------------------------------------------------------------------
511              
512 59 100       210 if (defined $_input_data) {
    100          
513 30         55 @_ = ();
514 30         145 $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, $_input_data);
515 30         175 delete $_MCE->{$_pid}{input_data};
516             }
517             elsif (scalar @_) {
518 19         147 $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, \@_);
519 11         130 delete $_MCE->{$_pid}{input_data};
520             }
521             else {
522 10 50 33     90 if (defined $_params->{$_pid} && exists $_params->{$_pid}{sequence}) {
523             $_MCE->{$_pid}->run({
524             chunk_size => $_chunk_size,
525             sequence => $_params->{$_pid}{sequence}
526 10         75 }, 0);
527 10 50       45 if (exists $_params->{$_pid}{sequence_run}) {
528 10         20 delete $_params->{$_pid}{sequence_run};
529 10         25 delete $_params->{$_pid}{sequence};
530             }
531 10         15 delete $_MCE->{$_pid}{sequence};
532             }
533             }
534              
535 51         260 MCE::_restore_state();
536              
537             # destroy queue(s) if MCE::run requested workers to shutdown
538 51 50       140 if (!$_MCE->{$_pid}{_spawned}) {
539 0         0 $_->DESTROY() for @{ $_queue->{$_pid} };
  0         0  
540 0         0 delete $_queue->{$_pid};
541             }
542              
543 51 100       255 return map { @{ $_ } } delete @_tmp{ 1 .. $_order_id - 1 }
  194         179  
  194         796  
544             unless (defined $_aref);
545              
546 25         30 $_gather_ref = undef;
547              
548 25         110 return;
549             }
550              
551             ###############################################################################
552             ## ----------------------------------------------------------------------------
553             ## Private methods.
554             ##
555             ###############################################################################
556              
557             sub _croak {
558              
559 0     0   0 goto &MCE::_croak;
560             }
561              
562             sub _gen_user_tasks {
563              
564 14     14   47 my ($_pid, $_queue_ref, $_code_ref, $_mode_ref, $_name_ref, $_wrks_ref) = @_;
565              
566 14         28 @{ $_user_tasks->{$_pid} } = ();
  14         37  
567              
568             ## For the code block farthest to the right.
569              
570 14         42 push @{ $_user_tasks->{$_pid} }, {
571             task_name => $_name_ref->[-1],
572             max_workers => $_wrks_ref->[-1],
573              
574 14         179 gather => (@{ $_code_ref } > 1)
575             ? $_queue_ref->[-1] : \&_preserve_order,
576              
577             user_func => sub {
578 83     83   138 my ($_mce, $_chunk_ref, $_chunk_id) = @_;
579 83         119 my @_a; my $_code = $_code_ref->[-1];
  83         152  
580              
581 83 100       142 if (ref $_chunk_ref) {
582             push @_a, ($_mode_ref->[-1] eq 'map')
583 72         101 ? map { &{ $_code } } @{ $_chunk_ref }
  72         122  
  56         106  
584 65 100       201 : grep { &{ $_code } } @{ $_chunk_ref };
  9         13  
  9         23  
  9         20  
585             }
586             else {
587             push @_a, ($_mode_ref->[-1] eq 'map')
588 18         22 ? map { &{ $_code } } $_chunk_ref
  18         45  
589 18 50       48 : grep { &{ $_code } } $_chunk_ref;
  0         0  
  0         0  
590             }
591              
592 83 50       562 MCE->gather( (@{ $_code_ref } > 1)
  83         478  
593             ? MCE->freeze([ \@_a, $_chunk_id ])
594             : (\@_a, $_chunk_id)
595             );
596             }
597 14 50       28 };
598              
599             ## For in-between code blocks (processed from right to left).
600              
601 14         28 for (my $_i = @{ $_code_ref } - 2; $_i > 0; $_i--) {
  14         56  
602 0         0 my $_pos = $_i;
603              
604 0         0 push @{ $_user_tasks->{$_pid} }, {
605             task_name => $_name_ref->[$_pos],
606             max_workers => $_wrks_ref->[$_pos],
607             gather => $_queue_ref->[$_pos - 1],
608              
609             user_func => sub {
610 0     0   0 my $_q = $_queue_ref->[$_pos];
611              
612 0         0 while (1) {
613 0         0 my $_chunk = $_q->dequeue;
614 0 0       0 last unless (defined $_chunk);
615              
616 0         0 my @_a; my $_code = $_code_ref->[$_pos];
  0         0  
617 0         0 $_chunk = MCE->thaw($_chunk);
618              
619             push @_a, ($_mode_ref->[$_pos] eq 'map')
620 0         0 ? map { &{ $_code } } @{ $_chunk->[0] }
  0         0  
  0         0  
621 0 0       0 : grep { &{ $_code } } @{ $_chunk->[0] };
  0         0  
  0         0  
  0         0  
622              
623 0         0 MCE->gather(MCE->freeze([ \@_a, $_chunk->[1] ]));
624             }
625              
626 0         0 return;
627             }
628 0         0 };
629             }
630              
631             ## For the left-most code block.
632              
633 14 50       23 if (@{ $_code_ref } > 1) {
  14         51  
634              
635 14         220 push @{ $_user_tasks->{$_pid} }, {
636             task_name => $_name_ref->[0],
637             max_workers => $_wrks_ref->[0],
638             gather => \&_preserve_order,
639              
640             user_func => sub {
641 22     22   53 my $_q = $_queue_ref->[0];
642              
643 22         35 while (1) {
644 105         340 my $_chunk = $_q->dequeue;
645 105 100       371 last unless (defined $_chunk);
646              
647 83         170 my @_a; my $_code = $_code_ref->[0];
  83         161  
648 83         385 $_chunk = MCE->thaw($_chunk);
649              
650             push @_a, ($_mode_ref->[0] eq 'map')
651 93         110 ? map { &{ $_code } } @{ $_chunk->[0] }
  93         193  
  83         888  
652 83 50       228 : grep { &{ $_code } } @{ $_chunk->[0] };
  0         0  
  0         0  
  0         0  
653              
654 83         537 MCE->gather(\@_a, $_chunk->[1]);
655             }
656              
657 22         52 return;
658             }
659 14         14 };
660             }
661              
662 14         37 return;
663             }
664              
665             1;
666              
667             __END__
668              
669             ###############################################################################
670             ## ----------------------------------------------------------------------------
671             ## Module usage.
672             ##
673             ###############################################################################
674              
675             =head1 NAME
676              
677             MCE::Stream - Parallel stream model for chaining multiple maps and greps
678              
679             =head1 VERSION
680              
681             This document describes MCE::Stream version 1.887
682              
683             =head1 SYNOPSIS
684              
685             ## Exports mce_stream, mce_stream_f, mce_stream_s
686             use MCE::Stream;
687              
688             my (@m1, @m2, @m3);
689              
690             ## Default mode is map and processed from right-to-left
691             @m1 = mce_stream sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
692             mce_stream \@m2, sub { $_ * 3 }, sub { $_ * 2 }, 1..10000;
693              
694             ## Native Perl
695             @m3 = map { $_ * $_ } grep { $_ % 5 == 0 } 1..10000;
696              
697             ## Streaming grep and map in parallel
698             mce_stream \@m3,
699             { mode => 'map', code => sub { $_ * $_ } },
700             { mode => 'grep', code => sub { $_ % 5 == 0 } }, 1..10000;
701              
702             ## Array or array_ref
703             my @a = mce_stream sub { $_ * $_ }, 1..10000;
704             my @b = mce_stream sub { $_ * $_ }, \@list;
705              
706             ## Important; pass an array_ref for deeply input data
707             my @c = mce_stream sub { $_->[1] *= 2; $_ }, [ [ 0, 1 ], [ 0, 2 ], ... ];
708             my @d = mce_stream sub { $_->[1] *= 2; $_ }, \@deeply_list;
709              
710             ## File path, glob ref, IO::All::{ File, Pipe, STDIO } obj, or scalar ref
711             ## Workers read directly and not involve the manager process
712             my @e = mce_stream_f sub { chomp; $_ }, "/path/to/file"; # efficient
713              
714             ## Involves the manager process, therefore slower
715             my @f = mce_stream_f sub { chomp; $_ }, $file_handle;
716             my @g = mce_stream_f sub { chomp; $_ }, $io;
717             my @h = mce_stream_f sub { chomp; $_ }, \$scalar;
718              
719             ## Sequence of numbers (begin, end [, step, format])
720             my @i = mce_stream_s sub { $_ * $_ }, 1, 10000, 5;
721             my @j = mce_stream_s sub { $_ * $_ }, [ 1, 10000, 5 ];
722              
723             my @k = mce_stream_s sub { $_ * $_ }, {
724             begin => 1, end => 10000, step => 5, format => undef
725             };
726              
727             =head1 DESCRIPTION
728              
729             This module allows one to stream multiple map and/or grep operations in
730             parallel. Code blocks run simultaneously from right-to-left. The results
731             are appended immediately when providing a reference to an array.
732              
733             ## Appends are serialized, even out-of-order ok, but immediately.
734             ## Out-of-order chunks are held temporarily until ordered chunks
735             ## arrive.
736              
737             mce_stream \@a, sub { $_ }, sub { $_ }, sub { $_ }, 1..10000;
738              
739             ## input
740             ## chunk1 input
741             ## chunk3 chunk2 input
742             ## chunk2 chunk2 chunk3 input
743             ## append1 chunk3 chunk1 chunk4 input
744             ## append2 chunk1 chunk5 chunk5 input
745             ## append3 chunk5 chunk4 chunk6 ...
746             ## append4 chunk4 chunk6 ...
747             ## append5 chunk6 ...
748             ## append6 ...
749             ## ...
750             ##
751              
752             MCE incurs a small overhead due to passing of data. A fast code block will
753             run faster natively when chaining multiple map functions. However, the
754             overhead will likely diminish as the complexity increases for the code.
755              
756             ## 0.334 secs -- baseline using the native map function
757             my @m1 = map { $_ * 4 } map { $_ * 3 } map { $_ * 2 } 1..1000000;
758              
759             ## 0.427 secs -- this is quite amazing considering data passing
760             my @m2 = mce_stream
761             sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..1000000;
762              
763             ## 0.355 secs -- appends to @m3 immediately, not after running
764             my @m3; mce_stream \@m3,
765             sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1..1000000;
766              
767             Even faster is mce_stream_s; useful when input data is a range of numbers.
768             Workers generate sequences mathematically among themselves without any
769             interaction from the manager process. Two arguments are required for
770             mce_stream_s (begin, end). Step defaults to 1 if begin is smaller than end,
771             otherwise -1.
772              
773             ## 0.278 secs -- numbers are generated mathematically via sequence
774             my @m4; mce_stream_s \@m4,
775             sub { $_ * 4 }, sub { $_ * 3 }, sub { $_ * 2 }, 1, 1000000;
776              
777             =head1 OVERRIDING DEFAULTS
778              
779             The following list options which may be overridden when loading the module.
780             The fast option is obsolete in 1.867 onwards; ignored if specified.
781              
782             use Sereal qw( encode_sereal decode_sereal );
783             use CBOR::XS qw( encode_cbor decode_cbor );
784             use JSON::XS qw( encode_json decode_json );
785              
786             use MCE::Stream
787             max_workers => 8, # Default 'auto'
788             chunk_size => 500, # Default 'auto'
789             tmp_dir => "/path/to/app/tmp", # $MCE::Signal::tmp_dir
790             freeze => \&encode_sereal, # \&Storable::freeze
791             thaw => \&decode_sereal, # \&Storable::thaw
792             init_relay => 0, # Default undef; MCE 1.882+
793             use_threads => 0, # Default undef; MCE 1.882+
794             default_mode => 'grep', # Default 'map'
795             ;
796              
797             From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if available.
798             Specify C<< Sereal => 0 >> to use Storable instead.
799              
800             use MCE::Stream Sereal => 0;
801              
802             =head1 CUSTOMIZING MCE
803              
804             =over 3
805              
806             =item MCE::Stream->init ( options )
807              
808             =item MCE::Stream::init { options }
809              
810             =back
811              
812             The init function accepts a hash of MCE options. The gather and bounds_only
813             options, if specified, are ignored due to being used internally by the
814             module (not shown below).
815              
816             use MCE::Stream;
817              
818             MCE::Stream->init(
819             chunk_size => 1, max_workers => 4,
820              
821             user_begin => sub {
822             print "## ", MCE->wid, " started\n";
823             },
824              
825             user_end => sub {
826             print "## ", MCE->wid, " completed\n";
827             }
828             );
829              
830             my @a = mce_stream sub { $_ * $_ }, 1..100;
831              
832             print "\n", "@a", "\n";
833              
834             -- Output
835              
836             ## 1 started
837             ## 2 started
838             ## 3 started
839             ## 4 started
840             ## 3 completed
841             ## 1 completed
842             ## 2 completed
843             ## 4 completed
844              
845             1 4 9 16 25 36 49 64 81 100 121 144 169 196 225 256 289 324 361
846             400 441 484 529 576 625 676 729 784 841 900 961 1024 1089 1156
847             1225 1296 1369 1444 1521 1600 1681 1764 1849 1936 2025 2116 2209
848             2304 2401 2500 2601 2704 2809 2916 3025 3136 3249 3364 3481 3600
849             3721 3844 3969 4096 4225 4356 4489 4624 4761 4900 5041 5184 5329
850             5476 5625 5776 5929 6084 6241 6400 6561 6724 6889 7056 7225 7396
851             7569 7744 7921 8100 8281 8464 8649 8836 9025 9216 9409 9604 9801
852             10000
853              
854             Like with MCE::Stream->init above, MCE options may be specified using an
855             anonymous hash for the first argument. Notice how both max_workers and
856             task_name can take an anonymous array for setting values uniquely
857             per each code block.
858              
859             Remember that MCE::Stream processes from right-to-left when setting the
860             individual values.
861              
862             use MCE::Stream;
863              
864             my @a = mce_stream {
865             task_name => [ 'c', 'b', 'a' ],
866             max_workers => [ 2, 4, 3, ],
867              
868             user_end => sub {
869             my ($mce, $task_id, $task_name) = @_;
870             print "$task_id - $task_name completed\n";
871             },
872              
873             task_end => sub {
874             my ($mce, $task_id, $task_name) = @_;
875             MCE->print("$task_id - $task_name ended\n");
876             }
877             },
878             sub { $_ * 4 }, ## 2 workers, named c
879             sub { $_ * 3 }, ## 4 workers, named b
880             sub { $_ * 2 }, 1..10000; ## 3 workers, named a
881              
882             -- Output
883              
884             0 - a completed
885             0 - a completed
886             0 - a completed
887             0 - a ended
888             1 - b completed
889             1 - b completed
890             1 - b completed
891             1 - b completed
892             1 - b ended
893             2 - c completed
894             2 - c completed
895             2 - c ended
896              
897             Note that the anonymous hash, for specifying options, also comes first when
898             passing an array reference.
899              
900             my @a; mce_stream {
901             ...
902             }, \@a, sub { ... }, sub { ... }, 1..10000;
903              
904             =head1 API DOCUMENTATION
905              
906             Scripts using MCE::Stream can be written using the long or short form.
907             The long form becomes relevant when mixing modes. Again, processing
908             occurs from right-to-left.
909              
910             my @m3 = mce_stream
911             { mode => 'map', code => sub { $_ * $_ } },
912             { mode => 'grep', code => sub { $_ % 5 == 0 } }, 1..10000;
913              
914             my @m4; mce_stream \@m4,
915             { mode => 'map', code => sub { $_ * $_ } },
916             { mode => 'grep', code => sub { $_ % 5 == 0 } }, 1..10000;
917              
918             For multiple grep blocks, the short form can be used. Simply specify the
919             default mode for the module. The two valid values for default_mode is 'grep'
920             and 'map'.
921              
922             use MCE::Stream default_mode => 'grep';
923              
924             my @f = mce_stream_f sub { /ending$/ }, sub { /^starting/ }, $file;
925              
926             The following assumes 'map' for default_mode in order to demonstrate all the
927             possibilities for providing input data.
928              
929             =over 3
930              
931             =item MCE::Stream->run ( sub { code }, list )
932              
933             =item mce_stream sub { code }, list
934              
935             =back
936              
937             Input data may be defined using a list or an array reference. Unlike MCE::Loop,
938             Flow, and Step, specifying a hash reference as input data isn't allowed.
939              
940             ## Array or array_ref
941             my @a = mce_stream sub { $_ * 2 }, 1..1000;
942             my @b = mce_stream sub { $_ * 2 }, \@list;
943              
944             ## Important; pass an array_ref for deeply input data
945             my @c = mce_stream sub { $_->[1] *= 2; $_ }, [ [ 0, 1 ], [ 0, 2 ], ... ];
946             my @d = mce_stream sub { $_->[1] *= 2; $_ }, \@deeply_list;
947              
948             ## Not supported
949             my @z = mce_stream sub { ... }, \%hash;
950              
951             =over 3
952              
953             =item MCE::Stream->run_file ( sub { code }, file )
954              
955             =item mce_stream_f sub { code }, file
956              
957             =back
958              
959             The fastest of these is the /path/to/file. Workers communicate the next offset
960             position among themselves with zero interaction by the manager process.
961              
962             C<IO::All> { File, Pipe, STDIO } is supported since MCE 1.845.
963              
964             my @c = mce_stream_f sub { chomp; $_ . "\r\n" }, "/path/to/file"; # faster
965             my @d = mce_stream_f sub { chomp; $_ . "\r\n" }, $file_handle;
966             my @e = mce_stream_f sub { chomp; $_ . "\r\n" }, $io; # IO::All
967             my @f = mce_stream_f sub { chomp; $_ . "\r\n" }, \$scalar;
968              
969             =over 3
970              
971             =item MCE::Stream->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )
972              
973             =item mce_stream_s sub { code }, $beg, $end [, $step, $fmt ]
974              
975             =back
976              
977             Sequence may be defined as a list, an array reference, or a hash reference.
978             The functions require both begin and end values to run. Step and format are
979             optional. The format is passed to sprintf (% may be omitted below).
980              
981             my ($beg, $end, $step, $fmt) = (10, 20, 0.1, "%4.1f");
982              
983             my @f = mce_stream_s sub { $_ }, $beg, $end, $step, $fmt;
984             my @g = mce_stream_s sub { $_ }, [ $beg, $end, $step, $fmt ];
985              
986             my @h = mce_stream_s sub { $_ }, {
987             begin => $beg, end => $end, step => $step, format => $fmt
988             };
989              
990             =over 3
991              
992             =item MCE::Stream->run ( { input_data => iterator }, sub { code } )
993              
994             =item mce_stream { input_data => iterator }, sub { code }
995              
996             =back
997              
998             An iterator reference may be specified for input_data. The only other way
999             is to specify input_data via MCE::Stream->init. This prevents MCE::Stream
1000             from configuring the iterator reference as another user task which will
1001             not work.
1002              
1003             Iterators are described under section "SYNTAX for INPUT_DATA" at L<MCE::Core>.
1004              
1005             MCE::Stream->init(
1006             input_data => iterator
1007             );
1008              
1009             my @a = mce_stream sub { $_ * 3 }, sub { $_ * 2 };
1010              
1011             =head1 MANUAL SHUTDOWN
1012              
1013             =over 3
1014              
1015             =item MCE::Stream->finish
1016              
1017             =item MCE::Stream::finish
1018              
1019             =back
1020              
1021             Workers remain persistent as much as possible after running. Shutdown occurs
1022             automatically when the script terminates. Call finish when workers are no
1023             longer needed.
1024              
1025             use MCE::Stream;
1026              
1027             MCE::Stream->init(
1028             chunk_size => 20, max_workers => 'auto'
1029             );
1030              
1031             my @a = mce_stream { ... } 1..100;
1032              
1033             MCE::Stream->finish;
1034              
1035             =head1 INDEX
1036              
1037             L<MCE|MCE>, L<MCE::Core>
1038              
1039             =head1 AUTHOR
1040              
1041             Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>
1042              
1043             =cut
1044