File Coverage

blib/lib/MCE/Map.pm
Criterion Covered Total %
statement 178 233 76.3
branch 78 160 48.7
condition 24 55 43.6
subroutine 16 19 84.2
pod 5 5 100.0
total 301 472 63.7


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Parallel map model similar to the native map function.
4             ##
5             ###############################################################################
6              
7             package MCE::Map;
8              
9 4     4   216983 use strict;
  4         32  
  4         117  
10 4     4   20 use warnings;
  4         8  
  4         154  
11              
12 4     4   29 no warnings qw( threads recursion uninitialized );
  4         5  
  4         255  
13              
14             our $VERSION = '1.889';
15              
16             ## no critic (BuiltinFunctions::ProhibitStringyEval)
17             ## no critic (Subroutines::ProhibitSubroutinePrototypes)
18             ## no critic (TestingAndDebugging::ProhibitNoStrict)
19              
20 4     4   25 use Scalar::Util qw( looks_like_number weaken );
  4         10  
  4         192  
21 4     4   1947 use MCE;
  4         11  
  4         26  
22              
23             our @CARP_NOT = qw( MCE );
24              
25             my $_tid = $INC{'threads.pm'} ? threads->tid() : 0;
26              
27             sub CLONE {
28 0 0   0   0 $_tid = threads->tid() if $INC{'threads.pm'};
29             }
30              
31             ###############################################################################
32             ## ----------------------------------------------------------------------------
33             ## Import routine.
34             ##
35             ###############################################################################
36              
37             my ($_MCE, $_def, $_params, $_prev_c, $_tag) = ({}, {}, {}, {}, 'MCE::Map');
38              
39             sub import {
40 4     4   58 my ($_class, $_pkg) = (shift, caller);
41              
42 4         16 my $_p = $_def->{$_pkg} = {
43             MAX_WORKERS => 'auto',
44             CHUNK_SIZE => 'auto',
45             };
46              
47             ## Import functions.
48 4 50       18 if ($_pkg !~ /^MCE::/) {
49 4     4   32 no strict 'refs'; no warnings 'redefine';
  4     4   8  
  4         205  
  4         28  
  4         7  
  4         11587  
50 4         9 *{ $_pkg.'::mce_map_f' } = \&run_file;
  4         34  
51 4         11 *{ $_pkg.'::mce_map_s' } = \&run_seq;
  4         12  
52 4         7 *{ $_pkg.'::mce_map' } = \&run;
  4         16  
53             }
54              
55             ## Process module arguments.
56 4         18 while ( my $_argument = shift ) {
57 0         0 my $_arg = lc $_argument;
58              
59 0 0       0 $_p->{MAX_WORKERS} = shift, next if ( $_arg eq 'max_workers' );
60 0 0       0 $_p->{CHUNK_SIZE} = shift, next if ( $_arg eq 'chunk_size' );
61 0 0       0 $_p->{TMP_DIR} = shift, next if ( $_arg eq 'tmp_dir' );
62 0 0       0 $_p->{FREEZE} = shift, next if ( $_arg eq 'freeze' );
63 0 0       0 $_p->{THAW} = shift, next if ( $_arg eq 'thaw' );
64 0 0       0 $_p->{INIT_RELAY} = shift, next if ( $_arg eq 'init_relay' );
65 0 0       0 $_p->{USE_THREADS} = shift, next if ( $_arg eq 'use_threads' );
66              
67             ## Sereal 3.015+, if available, is used automatically by MCE 1.8+.
68 0 0       0 if ( $_arg eq 'sereal' ) {
69 0 0       0 if ( shift eq '0' ) {
70 0         0 require Storable;
71 0         0 $_p->{FREEZE} = \&Storable::freeze;
72 0         0 $_p->{THAW} = \&Storable::thaw;
73             }
74 0         0 next;
75             }
76              
77 0         0 _croak("Error: ($_argument) invalid module option");
78             }
79              
80 4         19 $_p->{MAX_WORKERS} = MCE::_parse_max_workers($_p->{MAX_WORKERS});
81              
82 4         18 MCE::_validate_number($_p->{MAX_WORKERS}, 'MAX_WORKERS', $_tag);
83             MCE::_validate_number($_p->{CHUNK_SIZE}, 'CHUNK_SIZE', $_tag)
84 4 50       12 unless ($_p->{CHUNK_SIZE} eq 'auto');
85              
86 4         65 return;
87             }
88              
89             ###############################################################################
90             ## ----------------------------------------------------------------------------
91             ## Gather callback for storing by chunk_id => chunk_ref into a hash.
92             ##
93             ###############################################################################
94              
95             my ($_total_chunks, %_tmp);
96              
97             sub _gather {
98              
99 37     37   89 my ($_chunk_id, $_data_ref) = @_;
100              
101 37         123 $_tmp{$_chunk_id} = $_data_ref;
102 37         50 $_total_chunks++;
103              
104 37         95 return;
105             }
106              
107             ###############################################################################
108             ## ----------------------------------------------------------------------------
109             ## Init and finish routines.
110             ##
111             ###############################################################################
112              
113             sub init (@) {
114              
115 3 50 33 3 1 465 shift if (defined $_[0] && $_[0] eq 'MCE::Map');
116 3         24 my $_pkg = "$$.$_tid.".caller();
117              
118 3 50       30 $_params->{$_pkg} = (ref $_[0] eq 'HASH') ? shift : { @_ };
119              
120             _croak("$_tag: (HASH) not allowed as input by this MCE model")
121 3 50       12 if ( ref $_params->{$_pkg}{input_data} eq 'HASH' );
122              
123 3         9 @_ = ();
124              
125 3         6 return;
126             }
127              
128             sub finish (@) {
129              
130 7 50 33 7 1 834 shift if (defined $_[0] && $_[0] eq 'MCE::Map');
131 7 100       49 my $_pkg = (defined $_[0]) ? shift : "$$.$_tid.".caller();
132              
133 7 100 66     89 if ( $_pkg eq 'MCE' ) {
    100          
134 4         14 for my $_k ( keys %{ $_MCE } ) { MCE::Map->finish($_k, 1); }
  4         46  
  2         46  
135             }
136             elsif ( $_MCE->{$_pkg} && $_MCE->{$_pkg}{_init_pid} eq "$$.$_tid" ) {
137 1 50       26 $_MCE->{$_pkg}->shutdown(@_) if $_MCE->{$_pkg}{_spawned};
138 1         4 $_total_chunks = undef, undef %_tmp;
139              
140 1         4 delete $_prev_c->{$_pkg};
141 1         12 delete $_MCE->{$_pkg};
142             }
143              
144 7         20 @_ = ();
145              
146 7         23 return;
147             }
148              
149             ###############################################################################
150             ## ----------------------------------------------------------------------------
151             ## Parallel map with MCE -- file.
152             ##
153             ###############################################################################
154              
155             sub run_file (&@) {
156              
157 2 50 33 2 1 1488 shift if (defined $_[0] && $_[0] eq 'MCE::Map');
158              
159 2         6 my $_code = shift; my $_file = shift;
  2         3  
160 2         9 my $_pid = "$$.$_tid.".caller();
161              
162 2 50       8 if (defined (my $_p = $_params->{$_pid})) {
163 2 50       6 delete $_p->{input_data} if (exists $_p->{input_data});
164 2 50       5 delete $_p->{sequence} if (exists $_p->{sequence});
165             }
166             else {
167 0         0 $_params->{$_pid} = {};
168             }
169              
170 2 100 66     40 if (defined $_file && ref $_file eq '' && $_file ne '') {
    50 66        
      33        
171 1 50       29 _croak("$_tag: ($_file) does not exist") unless (-e $_file);
172 1 50       15 _croak("$_tag: ($_file) is not readable") unless (-r $_file);
173 1 50       12 _croak("$_tag: ($_file) is not a plain file") unless (-f $_file);
174 1         11 $_params->{$_pid}{_file} = $_file;
175             }
176             elsif (ref $_file eq 'SCALAR' || ref($_file) =~ /^(?:GLOB|FileHandle|IO::)/) {
177 1         4 $_params->{$_pid}{_file} = $_file;
178             }
179             else {
180 0         0 _croak("$_tag: (file) is not specified or valid");
181             }
182              
183 2         6 @_ = ();
184              
185 2         7 return run($_code);
186             }
187              
188             ###############################################################################
189             ## ----------------------------------------------------------------------------
190             ## Parallel map with MCE -- sequence.
191             ##
192             ###############################################################################
193              
194             sub run_seq (&@) {
195              
196 1 50 33 1 1 817 shift if (defined $_[0] && $_[0] eq 'MCE::Map');
197              
198 1         4 my $_code = shift;
199 1         7 my $_pid = "$$.$_tid.".caller();
200              
201 1 50       5 if (defined (my $_p = $_params->{$_pid})) {
202 1 50       4 delete $_p->{input_data} if (exists $_p->{input_data});
203 1 50       3 delete $_p->{_file} if (exists $_p->{_file});
204             }
205             else {
206 0         0 $_params->{$_pid} = {};
207             }
208              
209 1         2 my ($_begin, $_end);
210              
211 1 50 33     8 if (ref $_[0] eq 'HASH') {
    50          
    50          
212 0         0 $_begin = $_[0]->{begin}, $_end = $_[0]->{end};
213 0         0 $_params->{$_pid}{sequence} = $_[0];
214             }
215             elsif (ref $_[0] eq 'ARRAY') {
216 0 0 0     0 if (@{ $_[0] } > 3 && $_[0]->[3] =~ /\d$/) {
  0         0  
217 0         0 $_begin = $_[0]->[0], $_end = $_[0]->[-1];
218 0         0 $_params->{$_pid}{sequence} = [ $_[0]->[0], $_[0]->[-1] ];
219             }
220             else {
221 0         0 $_begin = $_[0]->[0], $_end = $_[0]->[1];
222 0         0 $_params->{$_pid}{sequence} = $_[0];
223             }
224             }
225             elsif (ref $_[0] eq '' || ref($_[0]) =~ /^Math::/) {
226 1 50 33     5 if (@_ > 3 && $_[3] =~ /\d$/) {
227 0         0 $_begin = $_[0], $_end = $_[-1];
228 0         0 $_params->{$_pid}{sequence} = [ $_[0], $_[-1] ];
229             }
230             else {
231 1         2 $_begin = $_[0], $_end = $_[1];
232 1         3 $_params->{$_pid}{sequence} = [ @_ ];
233             }
234             }
235             else {
236 0         0 _croak("$_tag: (sequence) is not specified or valid");
237             }
238              
239 1 50       3 _croak("$_tag: (begin) is not specified for sequence")
240             unless (defined $_begin);
241 1 50       3 _croak("$_tag: (end) is not specified for sequence")
242             unless (defined $_end);
243              
244 1         2 $_params->{$_pid}{sequence_run} = undef;
245              
246 1         2 @_ = ();
247              
248 1         4 return run($_code);
249             }
250              
251             ###############################################################################
252             ## ----------------------------------------------------------------------------
253             ## Parallel map with MCE.
254             ##
255             ###############################################################################
256              
257             sub run (&@) {
258              
259 7 50 33 7 1 2606 shift if (defined $_[0] && $_[0] eq 'MCE::Map');
260              
261 7         13 my $_code = shift; $_total_chunks = 0; undef %_tmp;
  7         13  
  7         27  
262 7 100       30 my $_pkg = caller() eq 'MCE::Map' ? caller(1) : caller();
263 7         34 my $_pid = "$$.$_tid.$_pkg";
264              
265 7         21 my $_input_data; my $_max_workers = $_def->{$_pkg}{MAX_WORKERS};
  7         18  
266 7         9 my $_r = ref $_[0];
267              
268 7 100 66     44 if (@_ == 1 && $_r =~ /^(?:ARRAY|HASH|SCALAR|CODE|GLOB|FileHandle|IO::)/) {
269 1 50       4 _croak("$_tag: (HASH) not allowed as input by this MCE model")
270             if $_r eq 'HASH';
271 1         3 $_input_data = shift;
272             }
273              
274 7 50       20 if (defined (my $_p = $_params->{$_pid})) {
275             $_max_workers = MCE::_parse_max_workers($_p->{max_workers})
276 7 50       63 if (exists $_p->{max_workers});
277              
278 7 100 100     48 delete $_p->{sequence} if (defined $_input_data || scalar @_);
279 7 50       23 delete $_p->{user_func} if (exists $_p->{user_func});
280 7 50       27 delete $_p->{user_tasks} if (exists $_p->{user_tasks});
281 7 50       22 delete $_p->{use_slurpio} if (exists $_p->{use_slurpio});
282 7 50       13 delete $_p->{bounds_only} if (exists $_p->{bounds_only});
283 7 50       24 delete $_p->{gather} if (exists $_p->{gather});
284             }
285              
286 7         11 my $_chunk_size = do {
287 7   50     30 my $_p = $_params->{$_pid} || {};
288             (defined $_p->{init_relay} || defined $_def->{$_pkg}{INIT_RELAY}) ? 1 :
289             MCE::_parse_chunk_size(
290 7 50 33     76 $_def->{$_pkg}{CHUNK_SIZE}, $_max_workers, $_params->{$_pid},
291             $_input_data, scalar @_
292             );
293             };
294              
295 7 50       20 if (defined (my $_p = $_params->{$_pid})) {
296 7 100       21 if (exists $_p->{_file}) {
297 2         6 $_input_data = delete $_p->{_file};
298             } else {
299 5 50       13 $_input_data = $_p->{input_data} if exists $_p->{input_data};
300             }
301             }
302              
303             ## -------------------------------------------------------------------------
304              
305 7         32 MCE::_save_state($_MCE->{$_pid});
306              
307 7 100 66     49 if (!defined $_prev_c->{$_pid} || $_prev_c->{$_pid} != $_code) {
308 3 50       12 $_MCE->{$_pid}->shutdown() if (defined $_MCE->{$_pid});
309 3         6 $_prev_c->{$_pid} = $_code;
310              
311             my %_opts = (
312             max_workers => $_max_workers, task_name => $_tag,
313             user_func => sub {
314              
315 37     37   90 my ($_mce, $_chunk_ref, $_chunk_id) = @_;
316 37         77 my $_wantarray = $_mce->{user_args}[0];
317              
318 37 50       78 if ($_wantarray) {
319 37         59 my @_a;
320              
321 37 100       88 if (ref $_chunk_ref eq 'SCALAR') {
322 1 50       3 local $/ = $_mce->{RS} if defined $_mce->{RS};
323 1     1   97 open my $_MEM_FH, '<', $_chunk_ref;
  1         9  
  1         2  
  1         30  
324 1         1249 binmode $_MEM_FH, ':raw';
325 1         8 while (<$_MEM_FH>) { push @_a, &{ $_code }; }
  9         41  
  9         15  
326 1         6 close $_MEM_FH;
327 1         6 weaken $_MEM_FH;
328             }
329             else {
330 36 100       80 if (ref $_chunk_ref) {
331 27         66 push @_a, map { &{ $_code } } @{ $_chunk_ref };
  27         45  
  27         106  
  27         63  
332             } else {
333 9         18 push @_a, map { &{ $_code } } $_chunk_ref;
  9         12  
  9         24  
334             }
335             }
336              
337 37         523 MCE->gather($_chunk_id, \@_a);
338             }
339             else {
340 0         0 my $_cnt = 0;
341              
342 0 0       0 if (ref $_chunk_ref eq 'SCALAR') {
343 0 0       0 local $/ = $_mce->{RS} if defined $_mce->{RS};
344 0         0 open my $_MEM_FH, '<', $_chunk_ref;
345 0         0 binmode $_MEM_FH, ':raw';
346 0         0 while (<$_MEM_FH>) { $_cnt++; &{ $_code }; }
  0         0  
  0         0  
  0         0  
347 0         0 close $_MEM_FH;
348 0         0 weaken $_MEM_FH;
349             }
350             else {
351 0 0       0 if (ref $_chunk_ref) {
352 0         0 $_cnt += map { &{ $_code } } @{ $_chunk_ref };
  0         0  
  0         0  
  0         0  
353             } else {
354 0         0 $_cnt += map { &{ $_code } } $_chunk_ref;
  0         0  
  0         0  
355             }
356             }
357              
358 0 0       0 MCE->gather($_cnt) if defined $_wantarray;
359             }
360             },
361 3         36 );
362              
363 3 50       12 if (defined (my $_p = $_params->{$_pid})) {
364 3         6 for my $_k (keys %{ $_p }) {
  3         18  
365 3 50       9 next if ($_k eq 'sequence_run');
366 3 50       9 next if ($_k eq 'input_data');
367 3 50       9 next if ($_k eq 'chunk_size');
368              
369             _croak("$_tag: ($_k) is not a valid constructor argument")
370 3 50       9 unless (exists $MCE::_valid_fields_new{$_k});
371              
372 3         9 $_opts{$_k} = $_p->{$_k};
373             }
374             }
375              
376 3         9 for my $_k (qw/ tmp_dir freeze thaw init_relay use_threads /) {
377             $_opts{$_k} = $_def->{$_pkg}{uc($_k)}
378 15 50 33     45 if (exists $_def->{$_pkg}{uc($_k)} && !exists $_opts{$_k});
379             }
380              
381 3         24 $_MCE->{$_pid} = MCE->new(pkg => $_pkg, %_opts);
382             }
383              
384             ## -------------------------------------------------------------------------
385              
386 7         17 my $_cnt = 0; my $_wantarray = wantarray;
  7         12  
387              
388 7 100       56 $_MCE->{$_pid}{use_slurpio} = ($_chunk_size > &MCE::MAX_RECS_SIZE) ? 1 : 0;
389 7         27 $_MCE->{$_pid}{user_args} = [ $_wantarray ];
390              
391             $_MCE->{$_pid}{gather} = $_wantarray
392 7 50   0   25 ? \&_gather : sub { $_cnt += $_[0]; return; };
  0         0  
  0         0  
393              
394 7 100       26 if (defined $_input_data) {
    100          
395 3         6 @_ = ();
396 3         16 $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, $_input_data);
397 3         13 delete $_MCE->{$_pid}{input_data};
398             }
399             elsif (scalar @_) {
400 3         24 $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, \@_);
401 1         21 delete $_MCE->{$_pid}{input_data};
402             }
403             else {
404 1 50 33     16 if (defined $_params->{$_pid} && exists $_params->{$_pid}{sequence}) {
405             $_MCE->{$_pid}->run({
406             chunk_size => $_chunk_size,
407             sequence => $_params->{$_pid}{sequence}
408 1         10 }, 0);
409 1 50       14 if (exists $_params->{$_pid}{sequence_run}) {
410 1         2 delete $_params->{$_pid}{sequence_run};
411 1         3 delete $_params->{$_pid}{sequence};
412             }
413 1         3 delete $_MCE->{$_pid}{sequence};
414             }
415             }
416              
417 5         37 MCE::_restore_state();
418              
419 5 50       10 if ($_wantarray) {
    0          
420 5         32 return map { @{ $_ } } delete @_tmp{ 1 .. $_total_chunks };
  37         42  
  37         164  
421             }
422             elsif (defined $_wantarray) {
423 0         0 return $_cnt;
424             }
425              
426 0         0 return;
427             }
428              
429             ###############################################################################
430             ## ----------------------------------------------------------------------------
431             ## Private methods.
432             ##
433             ###############################################################################
434              
435             sub _croak {
436              
437 0     0   0 goto &MCE::_croak;
438             }
439              
440             1;
441              
442             __END__