File Coverage

blib/lib/MCE/Loop.pm
Criterion Covered Total %
statement 135 166 81.3
branch 68 130 52.3
condition 22 50 44.0
subroutine 13 15 86.6
pod 5 5 100.0
total 243 366 66.3


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## MCE model for building parallel loops.
4             ##
5             ###############################################################################
6              
7             package MCE::Loop;
8              
9 5     5   283471 use strict;
  5         54  
  5         148  
10 5     5   26 use warnings;
  5         19  
  5         202  
11              
12 5     5   29 no warnings qw( threads recursion uninitialized );
  5         10  
  5         308  
13              
14             our $VERSION = '1.888';
15              
16             ## no critic (BuiltinFunctions::ProhibitStringyEval)
17             ## no critic (Subroutines::ProhibitSubroutinePrototypes)
18             ## no critic (TestingAndDebugging::ProhibitNoStrict)
19              
20 5     5   34 use Scalar::Util qw( looks_like_number );
  5         10  
  5         315  
21 5     5   2671 use MCE;
  5         14  
  5         25  
22              
23             our @CARP_NOT = qw( MCE );
24              
25             my $_tid = $INC{'threads.pm'} ? threads->tid() : 0;
26              
27             sub CLONE {
28 0 0   0   0 $_tid = threads->tid() if $INC{'threads.pm'};
29             }
30              
31             ###############################################################################
32             ## ----------------------------------------------------------------------------
33             ## Import routine.
34             ##
35             ###############################################################################
36              
37             my ($_MCE, $_def, $_params, $_prev_c, $_tag) = ({}, {}, {}, {}, 'MCE::Loop');
38              
39             sub import {
40 5     5   73 my ($_class, $_pkg) = (shift, caller);
41              
42 5         20 my $_p = $_def->{$_pkg} = {
43             MAX_WORKERS => 'auto',
44             CHUNK_SIZE => 'auto',
45             };
46              
47             ## Import functions.
48 5 50       20 if ($_pkg !~ /^MCE::/) {
49 5     5   41 no strict 'refs'; no warnings 'redefine';
  5     5   9  
  5         276  
  5         34  
  5         6  
  5         11258  
50 5         17 *{ $_pkg.'::mce_loop_f' } = \&run_file;
  5         37  
51 5         12 *{ $_pkg.'::mce_loop_s' } = \&run_seq;
  5         22  
52 5         10 *{ $_pkg.'::mce_loop' } = \&run;
  5         20  
53             }
54              
55             ## Process module arguments.
56 5         21 while ( my $_argument = shift ) {
57 0         0 my $_arg = lc $_argument;
58              
59 0 0       0 $_p->{MAX_WORKERS} = shift, next if ( $_arg eq 'max_workers' );
60 0 0       0 $_p->{CHUNK_SIZE} = shift, next if ( $_arg eq 'chunk_size' );
61 0 0       0 $_p->{TMP_DIR} = shift, next if ( $_arg eq 'tmp_dir' );
62 0 0       0 $_p->{FREEZE} = shift, next if ( $_arg eq 'freeze' );
63 0 0       0 $_p->{THAW} = shift, next if ( $_arg eq 'thaw' );
64 0 0       0 $_p->{INIT_RELAY} = shift, next if ( $_arg eq 'init_relay' );
65 0 0       0 $_p->{USE_THREADS} = shift, next if ( $_arg eq 'use_threads' );
66              
67             ## Sereal 3.015+, if available, is used automatically by MCE 1.8+.
68 0 0       0 if ( $_arg eq 'sereal' ) {
69 0 0       0 if ( shift eq '0' ) {
70 0         0 require Storable;
71 0         0 $_p->{FREEZE} = \&Storable::freeze;
72 0         0 $_p->{THAW} = \&Storable::thaw;
73             }
74 0         0 next;
75             }
76              
77 0         0 _croak("Error: ($_argument) invalid module option");
78             }
79              
80 5         25 $_p->{MAX_WORKERS} = MCE::_parse_max_workers($_p->{MAX_WORKERS});
81              
82 5         20 MCE::_validate_number($_p->{MAX_WORKERS}, 'MAX_WORKERS', $_tag);
83             MCE::_validate_number($_p->{CHUNK_SIZE}, 'CHUNK_SIZE', $_tag)
84 5 50       15 unless ($_p->{CHUNK_SIZE} eq 'auto');
85              
86 5         84 return;
87             }
88              
89             ###############################################################################
90             ## ----------------------------------------------------------------------------
91             ## Init and finish routines.
92             ##
93             ###############################################################################
94              
95             sub init (@) {
96              
97 6 50 33 6 1 810 shift if (defined $_[0] && $_[0] eq 'MCE::Loop');
98 6         46 my $_pkg = "$$.$_tid.".caller();
99              
100 6 50       102 $_params->{$_pkg} = (ref $_[0] eq 'HASH') ? shift : { @_ };
101              
102 6         20 @_ = ();
103              
104 6         20 return;
105             }
106              
107             sub finish (@) {
108              
109 11 50 33 11 1 4939 shift if (defined $_[0] && $_[0] eq 'MCE::Loop');
110 11 100       112 my $_pkg = (defined $_[0]) ? shift : "$$.$_tid.".caller();
111              
112 11 100 66     252 if ( $_pkg eq 'MCE' ) {
    100          
113 5         11 for my $_k ( keys %{ $_MCE } ) { MCE::Loop->finish($_k, 1); }
  5         74  
  3         156  
114             }
115             elsif ( $_MCE->{$_pkg} && $_MCE->{$_pkg}{_init_pid} eq "$$.$_tid" ) {
116 3 50       88 $_MCE->{$_pkg}->shutdown(@_) if $_MCE->{$_pkg}{_spawned};
117              
118 3         51 delete $_prev_c->{$_pkg};
119 3         47 delete $_MCE->{$_pkg};
120             }
121              
122 11         43 @_ = ();
123              
124 11         46 return;
125             }
126              
127             ###############################################################################
128             ## ----------------------------------------------------------------------------
129             ## Parallel loop with MCE -- file.
130             ##
131             ###############################################################################
132              
133             sub run_file (&@) {
134              
135 4 50 33 4 1 3086 shift if (defined $_[0] && $_[0] eq 'MCE::Loop');
136              
137 4         14 my $_code = shift; my $_file = shift;
  4         6  
138 4         26 my $_pid = "$$.$_tid.".caller();
139              
140 4 50       14 if (defined (my $_p = $_params->{$_pid})) {
141 4 50       12 delete $_p->{input_data} if (exists $_p->{input_data});
142 4 50       12 delete $_p->{sequence} if (exists $_p->{sequence});
143             }
144             else {
145 0         0 $_params->{$_pid} = {};
146             }
147              
148 4 100 66     134 if (defined $_file && ref $_file eq '' && $_file ne '') {
    50 66        
      33        
149 2 50       66 _croak("$_tag: ($_file) does not exist") unless (-e $_file);
150 2 50       30 _croak("$_tag: ($_file) is not readable") unless (-r $_file);
151 2 50       30 _croak("$_tag: ($_file) is not a plain file") unless (-f $_file);
152 2         30 $_params->{$_pid}{_file} = $_file;
153             }
154             elsif (ref $_file eq 'SCALAR' || ref($_file) =~ /^(?:GLOB|FileHandle|IO::)/) {
155 2         12 $_params->{$_pid}{_file} = $_file;
156             }
157             else {
158 0         0 _croak("$_tag: (file) is not specified or valid");
159             }
160              
161 4         12 @_ = ();
162              
163 4         12 return run($_code);
164             }
165              
166             ###############################################################################
167             ## ----------------------------------------------------------------------------
168             ## Parallel loop with MCE -- sequence.
169             ##
170             ###############################################################################
171              
172             sub run_seq (&@) {
173              
174 2 50 33 2 1 1666 shift if (defined $_[0] && $_[0] eq 'MCE::Loop');
175              
176 2         8 my $_code = shift;
177 2         12 my $_pid = "$$.$_tid.".caller();
178              
179 2 50       10 if (defined (my $_p = $_params->{$_pid})) {
180 2 50       6 delete $_p->{input_data} if (exists $_p->{input_data});
181 2 50       6 delete $_p->{_file} if (exists $_p->{_file});
182             }
183             else {
184 0         0 $_params->{$_pid} = {};
185             }
186              
187 2         4 my ($_begin, $_end);
188              
189 2 50 33     36 if (ref $_[0] eq 'HASH') {
    50          
    50          
190 0         0 $_begin = $_[0]->{begin}, $_end = $_[0]->{end};
191 0         0 $_params->{$_pid}{sequence} = $_[0];
192             }
193             elsif (ref $_[0] eq 'ARRAY') {
194 0 0 0     0 if (@{ $_[0] } > 3 && $_[0]->[3] =~ /\d$/) {
  0         0  
195 0         0 $_begin = $_[0]->[0], $_end = $_[0]->[-1];
196 0         0 $_params->{$_pid}{sequence} = [ $_[0]->[0], $_[0]->[-1] ];
197             }
198             else {
199 0         0 $_begin = $_[0]->[0], $_end = $_[0]->[1];
200 0         0 $_params->{$_pid}{sequence} = $_[0];
201             }
202             }
203             elsif (ref $_[0] eq '' || ref($_[0]) =~ /^Math::/) {
204 2 50 33     10 if (@_ > 3 && $_[3] =~ /\d$/) {
205 0         0 $_begin = $_[0], $_end = $_[-1];
206 0         0 $_params->{$_pid}{sequence} = [ $_[0], $_[-1] ];
207             }
208             else {
209 2         4 $_begin = $_[0], $_end = $_[1];
210 2         8 $_params->{$_pid}{sequence} = [ @_ ];
211             }
212             }
213             else {
214 0         0 _croak("$_tag: (sequence) is not specified or valid");
215             }
216              
217 2 50       6 _croak("$_tag: (begin) is not specified for sequence")
218             unless (defined $_begin);
219 2 50       8 _croak("$_tag: (end) is not specified for sequence")
220             unless (defined $_end);
221              
222 2         4 $_params->{$_pid}{sequence_run} = undef;
223              
224 2         6 @_ = ();
225              
226 2         6 return run($_code);
227             }
228              
229             ###############################################################################
230             ## ----------------------------------------------------------------------------
231             ## Parallel loop with MCE.
232             ##
233             ###############################################################################
234              
235             sub run (&@) {
236              
237 14 50 33 14 1 5448 shift if (defined $_[0] && $_[0] eq 'MCE::Loop');
238              
239 14         46 my $_code = shift;
240 14 100       82 my $_pkg = caller() eq 'MCE::Loop' ? caller(1) : caller();
241 14         68 my $_pid = "$$.$_tid.$_pkg";
242              
243 14         26 my $_input_data; my $_max_workers = $_def->{$_pkg}{MAX_WORKERS};
  14         42  
244 14         30 my $_r = ref $_[0];
245              
246 14 100 66     108 if (@_ == 1 && $_r =~ /^(?:ARRAY|HASH|SCALAR|CODE|GLOB|FileHandle|IO::)/) {
247 4         12 $_input_data = shift;
248             }
249              
250 14 50       40 if (defined (my $_p = $_params->{$_pid})) {
251             $_max_workers = MCE::_parse_max_workers($_p->{max_workers})
252 14 50       128 if (exists $_p->{max_workers});
253              
254 14 100 100     116 delete $_p->{sequence} if (defined $_input_data || scalar @_);
255 14 50       38 delete $_p->{user_func} if (exists $_p->{user_func});
256 14 50       42 delete $_p->{user_tasks} if (exists $_p->{user_tasks});
257             }
258              
259             my $_chunk_size = MCE::_parse_chunk_size(
260 14         84 $_def->{$_pkg}{CHUNK_SIZE}, $_max_workers, $_params->{$_pid},
261             $_input_data, scalar @_
262             );
263              
264 14 50       52 if (defined (my $_p = $_params->{$_pid})) {
265 14 100       52 if (exists $_p->{_file}) {
266 4         10 $_input_data = delete $_p->{_file};
267             } else {
268 10 50       34 $_input_data = $_p->{input_data} if exists $_p->{input_data};
269             }
270             }
271              
272             ## -------------------------------------------------------------------------
273              
274 14         88 MCE::_save_state($_MCE->{$_pid});
275              
276 14 100 66     138 if (!defined $_prev_c->{$_pid} || $_prev_c->{$_pid} != $_code) {
277 6 50       24 $_MCE->{$_pid}->shutdown() if (defined $_MCE->{$_pid});
278 6         14 $_prev_c->{$_pid} = $_code;
279              
280 6         32 my %_opts = (
281             max_workers => $_max_workers, task_name => $_tag,
282             user_func => $_code,
283             );
284              
285 6 50       28 if (defined (my $_p = $_params->{$_pid})) {
286 6         10 for my $_k (keys %{ $_p }) {
  6         30  
287 10 50       24 next if ($_k eq 'sequence_run');
288 10 50       22 next if ($_k eq 'input_data');
289 10 50       26 next if ($_k eq 'chunk_size');
290              
291             _croak("$_tag: ($_k) is not a valid constructor argument")
292 10 50       38 unless (exists $MCE::_valid_fields_new{$_k});
293              
294 10         22 $_opts{$_k} = $_p->{$_k};
295             }
296             }
297              
298 6         30 for my $_k (qw/ tmp_dir freeze thaw init_relay use_threads /) {
299             $_opts{$_k} = $_def->{$_pkg}{uc($_k)}
300 30 50 33     118 if (exists $_def->{$_pkg}{uc($_k)} && !exists $_opts{$_k});
301             }
302              
303 6         100 $_MCE->{$_pid} = MCE->new(pkg => $_pkg, %_opts);
304             }
305              
306             ## -------------------------------------------------------------------------
307              
308 14 100       28 my @_a; my $_wa = wantarray; $_MCE->{$_pid}{gather} = \@_a if (defined $_wa);
  14         30  
  14         30  
309              
310 14 100       66 if (defined $_input_data) {
    100          
311 8         20 @_ = ();
312 8         56 $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, $_input_data);
313 7         45 delete $_MCE->{$_pid}{input_data};
314             }
315             elsif (scalar @_) {
316 4         36 $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, \@_);
317 2         26 delete $_MCE->{$_pid}{input_data};
318             }
319             else {
320 2 50 33     38 if (defined $_params->{$_pid} && exists $_params->{$_pid}{sequence}) {
321             $_MCE->{$_pid}->run({
322             chunk_size => $_chunk_size,
323             sequence => $_params->{$_pid}{sequence}
324 2         18 }, 0);
325 2 50       12 if (exists $_params->{$_pid}{sequence_run}) {
326 2         6 delete $_params->{$_pid}{sequence_run};
327 2         4 delete $_params->{$_pid}{sequence};
328             }
329 2         6 delete $_MCE->{$_pid}{sequence};
330             }
331             }
332              
333 11         103 MCE::_restore_state();
334              
335 11 100       34 delete $_MCE->{$_pid}{gather} if (defined $_wa);
336              
337 11 100       139 return ((defined $_wa) ? @_a : ());
338             }
339              
340             ###############################################################################
341             ## ----------------------------------------------------------------------------
342             ## Private methods.
343             ##
344             ###############################################################################
345              
346             sub _croak {
347              
348 0     0     goto &MCE::_croak;
349             }
350              
351             1;
352              
353             __END__