File Coverage

blib/lib/MCE/Candy.pm
Criterion Covered Total %
statement 12 115 10.4
branch 2 54 3.7
condition 0 12 0.0
subroutine 4 12 33.3
pod 5 5 100.0
total 23 198 11.6


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Sugar methods and output iterators.
4             ##
5             ###############################################################################
6              
7             package MCE::Candy;
8              
9 1     1   3446 use strict;
  1         2  
  1         27  
10 1     1   4 use warnings;
  1         2  
  1         24  
11              
12 1     1   4 no warnings qw( threads recursion uninitialized );
  1         1  
  1         985  
13              
14             our $VERSION = '1.887';
15              
16             our @CARP_NOT = qw( MCE );
17              
18             ###############################################################################
19             ## ----------------------------------------------------------------------------
20             ## Import routine.
21             ##
22             ###############################################################################
23              
24             my $_imported;
25              
26             sub import {
27              
28 1 50   1   11 return if ($_imported++);
29              
30 1 50       3 unless ($INC{'MCE.pm'}) {
31 0         0 $\ = undef; require Carp;
  0         0  
32 0         0 Carp::croak(
33             "MCE::Candy requires MCE. Please see the MCE::Candy documentation\n".
34             "for more information.\n\n"
35             );
36             }
37              
38 1         8 return;
39             }
40              
41             ###############################################################################
42             ## ----------------------------------------------------------------------------
43             ## Forchunk, foreach, and forseq sugar methods.
44             ##
45             ###############################################################################
46              
47             sub forchunk {
48              
49 0 0   0 1   my $x = shift; my $self = ref($x) ? $x : $MCE::MCE;
  0            
50 0           my $_input_data = $_[0];
51              
52 0           MCE::_validate_runstate($self, 'MCE::forchunk');
53              
54 0           my ($_user_func, $_params_ref);
55              
56 0 0         if (ref $_[1] eq 'HASH') {
57 0           $_user_func = $_[2]; $_params_ref = $_[1];
  0            
58             } else {
59 0           $_user_func = $_[1]; $_params_ref = {};
  0            
60             }
61              
62 0           @_ = ();
63              
64 0 0         MCE::_croak('MCE::forchunk: (input_data) is not specified')
65             unless (defined $_input_data);
66 0 0         MCE::_croak('MCE::forchunk: (code_block) is not specified')
67             unless (defined $_user_func);
68              
69 0           $_params_ref->{input_data} = $_input_data;
70 0           $_params_ref->{user_func} = $_user_func;
71              
72 0           $self->run(1, $_params_ref);
73              
74 0           return $self;
75             }
76              
77             sub foreach {
78              
79 0 0   0 1   my $x = shift; my $self = ref($x) ? $x : $MCE::MCE;
  0            
80 0           my $_input_data = $_[0];
81              
82 0           MCE::_validate_runstate($self, 'MCE::foreach');
83              
84 0           my ($_user_func, $_params_ref);
85              
86 0 0         if (ref $_[1] eq 'HASH') {
87 0           $_user_func = $_[2]; $_params_ref = $_[1];
  0            
88             } else {
89 0           $_user_func = $_[1]; $_params_ref = {};
  0            
90             }
91              
92 0           @_ = ();
93              
94 0 0         MCE::_croak('MCE::foreach: (HASH) not allowed as input by this method')
95             if (ref $_input_data eq 'HASH');
96 0 0         MCE::_croak('MCE::foreach: (input_data) is not specified')
97             unless (defined $_input_data);
98 0 0         MCE::_croak('MCE::foreach: (code_block) is not specified')
99             unless (defined $_user_func);
100              
101 0           $_params_ref->{chunk_size} = 1;
102 0           $_params_ref->{input_data} = $_input_data;
103 0           $_params_ref->{user_func} = $_user_func;
104              
105 0           $self->run(1, $_params_ref);
106              
107 0           return $self;
108             }
109              
110             sub forseq {
111              
112 0 0   0 1   my $x = shift; my $self = ref($x) ? $x : $MCE::MCE;
  0            
113 0           my $_sequence = $_[0];
114              
115 0           MCE::_validate_runstate($self, 'MCE::forseq');
116              
117 0           my ($_user_func, $_params_ref);
118              
119 0 0         if (ref $_[1] eq 'HASH') {
120 0           $_user_func = $_[2]; $_params_ref = $_[1];
  0            
121             } else {
122 0           $_user_func = $_[1]; $_params_ref = {};
  0            
123             }
124              
125 0           @_ = ();
126              
127 0 0         MCE::_croak('MCE::forseq: (sequence) is not specified')
128             unless (defined $_sequence);
129 0 0         MCE::_croak('MCE::forseq: (code_block) is not specified')
130             unless (defined $_user_func);
131              
132 0           $_params_ref->{sequence} = $_sequence;
133 0           $_params_ref->{user_func} = $_user_func;
134              
135 0           $self->run(1, $_params_ref);
136              
137 0           return $self;
138             }
139              
140             ###############################################################################
141             ## ----------------------------------------------------------------------------
142             ## Output iterators for preserving output order.
143             ##
144             ###############################################################################
145              
146             sub out_iter_array {
147              
148 0     0 1   my $_aref = shift; my %_tmp; my $_order_id = 1;
  0            
  0            
149              
150 0 0         if (ref $_aref eq 'MCE::Shared::Object') {
151 0           my $_pkg = $_aref->blessed;
152 0 0         MCE::_croak('The argument to (out_iter_array) is not valid.')
153             unless $_pkg->can('TIEARRAY');
154             }
155             else {
156 0 0         MCE::_croak('The argument to (out_iter_array) is not an array ref.')
157             unless (ref $_aref eq 'ARRAY');
158             }
159              
160             return sub {
161 0     0     my $_chunk_id = shift;
162              
163 0 0 0       if ($_chunk_id == $_order_id && keys %_tmp == 0) {
164             ## already orderly
165 0           $_order_id++, push @{ $_aref }, @_;
  0            
166             }
167             else {
168             ## hold temporarily otherwise until orderly
169 0           @{ $_tmp{ $_chunk_id } } = @_;
  0            
170              
171 0           while (1) {
172 0 0         last unless exists $_tmp{ $_order_id };
173 0           push @{ $_aref }, @{ delete $_tmp{ $_order_id++ } };
  0            
  0            
174             }
175             }
176 0           };
177             }
178              
179             sub out_iter_fh {
180              
181 0     0 1   my $_fh = $_[0]; my %_tmp; my $_order_id = 1;
  0            
  0            
182 0 0 0       $_fh = \$_[0] if (!ref $_fh && ref \$_[0]);
183              
184 0 0         MCE::_croak('The argument to (out_iter_fh) is not a supported file handle.')
185             unless (ref($_fh) =~ /^(?:GLOB|FileHandle|IO::)/);
186              
187 0 0         if ($_fh->can('print')) {
188             return sub {
189 0     0     my $_chunk_id = shift;
190              
191 0 0 0       if ($_chunk_id == $_order_id && keys %_tmp == 0) {
192             ## already orderly
193 0           $_order_id++, $_fh->print(@_);
194             }
195             else {
196             ## hold temporarily otherwise until orderly
197 0           @{ $_tmp{ $_chunk_id } } = @_;
  0            
198              
199 0           while (1) {
200 0 0         last unless exists $_tmp{ $_order_id };
201 0           $_fh->print(@{ delete $_tmp{ $_order_id++ } });
  0            
202             }
203             }
204 0           };
205             }
206             else {
207             return sub {
208 0     0     my $_chunk_id = shift;
209              
210 0 0 0       if ($_chunk_id == $_order_id && keys %_tmp == 0) {
211             ## already orderly
212 0           $_order_id++, print {$_fh} @_;
  0            
213             }
214             else {
215             ## hold temporarily otherwise until orderly
216 0           @{ $_tmp{ $_chunk_id } } = @_;
  0            
217              
218 0           while (1) {
219 0 0         last unless exists $_tmp{ $_order_id };
220 0           print {$_fh} @{ delete $_tmp{ $_order_id++ } };
  0            
  0            
221             }
222             }
223 0           };
224             }
225             }
226              
227             1;
228              
229             __END__
230              
231             ###############################################################################
232             ## ----------------------------------------------------------------------------
233             ## Module usage.
234             ##
235             ###############################################################################
236              
237             =head1 NAME
238              
239             MCE::Candy - Sugar methods and output iterators
240              
241             =head1 VERSION
242              
243             This document describes MCE::Candy version 1.887
244              
245             =head1 DESCRIPTION
246              
247             This module provides a collection of sugar methods and helpful output iterators
248             for preserving output order.
249              
250             =head1 "FOR" SUGAR METHODS
251              
252             The sugar methods described below were created prior to the 1.5 release which
253             added MCE Models. This module is loaded automatically upon calling a "for"
254             method.
255              
256             =head2 $mce->forchunk ( $input_data [, { options } ], sub { ... } )
257              
258             Forchunk, foreach, and forseq are sugar methods in MCE. Workers are
259             spawned automatically, the code block is executed in parallel, and shutdown
260             is called. Do not call these methods if workers must persist afterwards.
261              
262             Specifying options is optional. Valid options are the same as for the
263             process method.
264              
265             ## Declare a MCE instance.
266              
267             my $mce = MCE->new(
268             max_workers => $max_workers,
269             chunk_size => 20
270             );
271              
272             ## Arguments inside the code block are the same as passed to user_func.
273              
274             $mce->forchunk(\@input_array, sub {
275             my ($mce, $chunk_ref, $chunk_id) = @_;
276             foreach ( @{ $chunk_ref } ) {
277             MCE->print("$chunk_id: $_\n");
278             }
279             });
280              
281             ## Input hash, current API available since 1.828.
282              
283             $mce->forchunk(\%input_hash, sub {
284             my ($mce, $chunk_ref, $chunk_id) = @_;
285             for my $key ( keys %{ $chunk_ref } ) {
286             MCE->print("$chunk_id: [ $key ] ", $chunk_ref->{$key}, "\n");
287             }
288             });
289              
290             ## Passing chunk_size as an option.
291              
292             $mce->forchunk(\@input_array, { chunk_size => 30 }, sub { ... });
293             $mce->forchunk(\%input_hash, { chunk_size => 30 }, sub { ... });
294              
295             =head2 $mce->foreach ( $input_data [, { options } ], sub { ... } )
296              
297             Foreach implies chunk_size => 1 and cannot be overwritten. Thus, looping is
298             not necessary inside the block. Unlike forchunk above, a hash reference as
299             input data isn't allowed.
300              
301             my $mce = MCE->new(
302             max_workers => $max_workers
303             );
304              
305             $mce->foreach(\@input_data, sub {
306             my ($mce, $chunk_ref, $chunk_id) = @_;
307             my $row = $chunk_ref->[0];
308             MCE->print("$chunk_id: $row\n");
309             });
310              
311             =head2 $mce->forseq ( $sequence_spec [, { options } ], sub { ... } )
312              
313             Sequence may be defined using an array or hash reference.
314              
315             my $mce = MCE->new(
316             max_workers => 3
317             );
318              
319             $mce->forseq([ 20, 40 ], sub {
320             my ($mce, $n, $chunk_id) = @_;
321             my $result = `ping 192.168.1.${n}`;
322             ...
323             });
324              
325             $mce->forseq({ begin => 15, end => 10, step => -1 }, sub {
326             my ($mce, $n, $chunk_id) = @_;
327             print $n, " from ", MCE->wid, "\n";
328             });
329              
330             The $n_seq variable points to an array_ref of sequences. Chunk size defaults
331             to 1 when not specified.
332              
333             $mce->forseq([ 20, 80 ], { chunk_size => 10 }, sub {
334             my ($mce, $n_seq, $chunk_id) = @_;
335             for my $n ( @{ $n_seq } ) {
336             my $result = `ping 192.168.1.${n}`;
337             ...
338             }
339             });
340              
341             =head1 OUTPUT ITERATORS WITH INPUT
342              
343             This module includes 2 output iterators which are useful for preserving output
344             order while gathering data. These cover the 2 general use cases. The chunk_id
345             value must be the first argument to gather. Gather must also not be called
346             more than once inside the block.
347              
348             =head2 gather => MCE::Candy::out_iter_array( \@array )
349              
350             The example utilizes the Core API with chunking disabled. Basically, setting
351             chunk_size to 1.
352              
353             use MCE;
354             use MCE::Candy;
355              
356             my @results;
357              
358             my $mce = MCE->new(
359             chunk_size => 1, max_workers => 4,
360             gather => MCE::Candy::out_iter_array(\@results),
361             user_func => sub {
362             my ($mce, $chunk_ref, $chunk_id) = @_;
363             $mce->gather($chunk_id, $chunk_ref->[0] * 2);
364             }
365             );
366              
367             $mce->process([ 100 .. 109 ]);
368              
369             print "@results", "\n";
370              
371             -- Output
372              
373             200 202 204 206 208 210 212 214 216 218
374              
375             Chunking may be desired for thousands or more items. In other words, wanting
376             to reduce the overhead placed on IPC.
377              
378             use MCE;
379             use MCE::Candy;
380              
381             my @results;
382              
383             my $mce = MCE->new(
384             chunk_size => 100, max_workers => 4,
385             gather => MCE::Candy::out_iter_array(\@results),
386             user_func => sub {
387             my ($mce, $chunk_ref, $chunk_id) = @_;
388             my @output;
389             foreach my $item (@{ $chunk_ref }) {
390             push @output, $item * 2;
391             }
392             $mce->gather($chunk_id, @output);
393             }
394             );
395              
396             $mce->process([ 100_000 .. 200_000 - 1 ]);
397              
398             print scalar @results, "\n";
399              
400             -- Output
401              
402             100000
403              
404             =head2 gather => MCE::Candy::out_iter_fh( $fh )
405              
406             Let's change things a bit and use MCE::Flow for the next 2 examples. Chunking
407             is not desired for the first example.
408              
409             use MCE::Flow;
410             use MCE::Candy;
411              
412             open my $fh, '>', '/tmp/foo.txt';
413              
414             mce_flow {
415             chunk_size => 1, max_workers => 4,
416             gather => MCE::Candy::out_iter_fh($fh)
417             },
418             sub {
419             my ($mce, $chunk_ref, $chunk_id) = @_;
420             $mce->gather($chunk_id, $chunk_ref->[0] * 2, "\n");
421              
422             }, (100 .. 109);
423              
424             close $fh;
425              
426             -- Output sent to '/tmp/foo.txt'
427              
428             200
429             202
430             204
431             206
432             208
433             210
434             212
435             214
436             216
437             218
438              
439             =head2 gather => MCE::Candy::out_iter_fh( $io )
440              
441             Same thing, an C<IO::*> object that can C<print> is supported since MCE 1.845.
442              
443             use IO::All;
444             use MCE::Flow;
445             use MCE::Candy;
446              
447             my $io = io('/tmp/foo.txt'); # i.e. $io->can('print')
448              
449             mce_flow {
450             chunk_size => 1, max_workers => 4,
451             gather => MCE::Candy::out_iter_fh($io)
452             },
453             sub {
454             my ($mce, $chunk_ref, $chunk_id) = @_;
455             $mce->gather($chunk_id, $chunk_ref->[0] * 2, "\n");
456              
457             }, (100 .. 109);
458              
459             $io->close;
460              
461             -- Output sent to '/tmp/foo.txt'
462              
463             200
464             202
465             204
466             206
467             208
468             210
469             212
470             214
471             216
472             218
473              
474             Chunking is desired for the next example due to processing many thousands.
475              
476             use MCE::Flow;
477             use MCE::Candy;
478              
479             open my $fh, '>', '/tmp/foo.txt';
480              
481             mce_flow {
482             chunk_size => 100, max_workers => 4,
483             gather => MCE::Candy::out_iter_fh( $fh )
484             },
485             sub {
486             my ($mce, $chunk_ref, $chunk_id) = @_;
487             my @output;
488             foreach my $item (@{ $chunk_ref }) {
489             push @output, ($item * 2) . "\n";
490             }
491             $mce->gather($chunk_id, @output);
492              
493             }, (100_000 .. 200_000 - 1);
494              
495             close $fh;
496              
497             print -s '/tmp/foo.txt', "\n";
498              
499             -- Output
500              
501             700000
502              
503             =head1 OUTPUT ITERATORS WITHOUT INPUT
504              
505             Input data is not a requirement for using the output iterators included in this
506             module. The 'chunk_id' value is set uniquely and the same as 'wid' when not
507             processing input data.
508              
509             =head2 gather => MCE::Candy::out_iter_array( \@array )
510              
511             use MCE::Flow;
512             use MCE::Candy;
513              
514             my @results;
515              
516             mce_flow {
517             max_workers => 'auto', ## Note that 'auto' is never greater than 8
518             gather => MCE::Candy::out_iter_array(\@results)
519             },
520             sub {
521             my ($mce) = @_; ## This line is not necessary
522             ## Calling via module okay; e.g: MCE->method
523             ## Do work
524             ## Sending a complex data structure is allowed
525              
526             ## Output will become orderly by iterator
527             $mce->gather( $mce->chunk_id, {
528             wid => $mce->wid, result => $mce->wid * 2
529             });
530             };
531              
532             foreach my $href (@results) {
533             print $href->{wid} .": ". $href->{result} ."\n";
534             }
535              
536             -- Output
537              
538             1: 2
539             2: 4
540             3: 6
541             4: 8
542             5: 10
543             6: 12
544             7: 14
545             8: 16
546              
547             =head2 gather => MCE::Candy::out_iter_fh( $fh )
548              
549             use MCE::Flow;
550             use MCE::Candy;
551              
552             open my $fh, '>', '/tmp/out.txt';
553              
554             mce_flow {
555             max_workers => 'auto', ## See get_ncpu in <MCE::Util|MCE::Util>
556             gather => MCE::Candy::out_iter_fh($fh)
557             },
558             sub {
559             my $output = "# Worker ID: " . MCE->wid . "\n";
560              
561             ## Append results to $output string
562             $output .= (MCE->wid * 2) . "\n\n";
563              
564             ## Output will become orderly by iterator
565             MCE->gather( MCE->wid, $output );
566             };
567              
568             close $fh;
569              
570             -- Output
571              
572             # Worker ID: 1
573             2
574              
575             # Worker ID: 2
576             4
577              
578             # Worker ID: 3
579             6
580              
581             # Worker ID: 4
582             8
583              
584             # Worker ID: 5
585             10
586              
587             # Worker ID: 6
588             12
589              
590             # Worker ID: 7
591             14
592              
593             # Worker ID: 8
594             16
595              
596             =head1 INDEX
597              
598             L<MCE|MCE>, L<MCE::Core>
599              
600             =head1 AUTHOR
601              
602             Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>
603              
604             =cut
605