File Coverage

blib/lib/MCE/Grep.pm
Criterion Covered Total %
statement 175 229 76.4
branch 80 164 48.7
condition 24 55 43.6
subroutine 15 18 83.3
pod 5 5 100.0
total 299 471 63.4


line stmt bran cond sub pod time code
1             ###############################################################################
2             ## ----------------------------------------------------------------------------
3             ## Parallel grep model similar to the native grep function.
4             ##
5             ###############################################################################
6              
7             package MCE::Grep;
8              
9 4     4   254894 use strict;
  4         32  
  4         101  
10 4     4   17 use warnings;
  4         7  
  4         100  
11              
12 4     4   16 no warnings qw( threads recursion uninitialized );
  4         4  
  4         218  
13              
14             our $VERSION = '1.887';
15              
16             ## no critic (BuiltinFunctions::ProhibitStringyEval)
17             ## no critic (Subroutines::ProhibitSubroutinePrototypes)
18             ## no critic (TestingAndDebugging::ProhibitNoStrict)
19              
20 4     4   20 use Scalar::Util qw( looks_like_number weaken );
  4         7  
  4         177  
21 4     4   1627 use MCE;
  4         8  
  4         23  
22              
23             our @CARP_NOT = qw( MCE );
24              
25             my $_tid = $INC{'threads.pm'} ? threads->tid() : 0;
26              
27             sub CLONE {
28 0 0   0   0 $_tid = threads->tid() if $INC{'threads.pm'};
29             }
30              
31             ###############################################################################
32             ## ----------------------------------------------------------------------------
33             ## Import routine.
34             ##
35             ###############################################################################
36              
37             my ($_MCE, $_def, $_params, $_prev_c, $_tag) = ({}, {}, {}, {}, 'MCE::Grep');
38              
39             sub import {
40 4     4   55 my ($_class, $_pkg) = (shift, caller);
41              
42 4         16 my $_p = $_def->{$_pkg} = {
43             MAX_WORKERS => 'auto',
44             CHUNK_SIZE => 'auto',
45             };
46              
47             ## Import functions.
48 4 50       12 if ($_pkg !~ /^MCE::/) {
49 4     4   48 no strict 'refs'; no warnings 'redefine';
  4     4   11  
  4         181  
  4         21  
  4         14  
  4         9232  
50 4         11 *{ $_pkg.'::mce_grep_f' } = \&run_file;
  4         26  
51 4         8 *{ $_pkg.'::mce_grep_s' } = \&run_seq;
  4         9  
52 4         8 *{ $_pkg.'::mce_grep' } = \&run;
  4         12  
53             }
54              
55             ## Process module arguments.
56 4         34 while ( my $_argument = shift ) {
57 0         0 my $_arg = lc $_argument;
58              
59 0 0       0 $_p->{MAX_WORKERS} = shift, next if ( $_arg eq 'max_workers' );
60 0 0       0 $_p->{CHUNK_SIZE} = shift, next if ( $_arg eq 'chunk_size' );
61 0 0       0 $_p->{TMP_DIR} = shift, next if ( $_arg eq 'tmp_dir' );
62 0 0       0 $_p->{FREEZE} = shift, next if ( $_arg eq 'freeze' );
63 0 0       0 $_p->{THAW} = shift, next if ( $_arg eq 'thaw' );
64 0 0       0 $_p->{INIT_RELAY} = shift, next if ( $_arg eq 'init_relay' );
65 0 0       0 $_p->{USE_THREADS} = shift, next if ( $_arg eq 'use_threads' );
66              
67             ## Sereal 3.015+, if available, is used automatically by MCE 1.8+.
68 0 0       0 if ( $_arg eq 'sereal' ) {
69 0 0       0 if ( shift eq '0' ) {
70 0         0 require Storable;
71 0         0 $_p->{FREEZE} = \&Storable::freeze;
72 0         0 $_p->{THAW} = \&Storable::thaw;
73             }
74 0         0 next;
75             }
76              
77 0         0 _croak("Error: ($_argument) invalid module option");
78             }
79              
80 4         22 $_p->{MAX_WORKERS} = MCE::_parse_max_workers($_p->{MAX_WORKERS});
81              
82 4         14 MCE::_validate_number($_p->{MAX_WORKERS}, 'MAX_WORKERS', $_tag);
83             MCE::_validate_number($_p->{CHUNK_SIZE}, 'CHUNK_SIZE', $_tag)
84 4 50       10 unless ($_p->{CHUNK_SIZE} eq 'auto');
85              
86 4         54 return;
87             }
88              
89             ###############################################################################
90             ## ----------------------------------------------------------------------------
91             ## Gather callback for storing by chunk_id => chunk_ref into a hash.
92             ##
93             ###############################################################################
94              
95             my ($_total_chunks, %_tmp);
96              
97             sub _gather {
98              
99 37     37   131 my ($_chunk_id, $_data_ref) = @_;
100              
101 37         96 $_tmp{$_chunk_id} = $_data_ref;
102 37         46 $_total_chunks++;
103              
104 37         75 return;
105             }
106              
107             ###############################################################################
108             ## ----------------------------------------------------------------------------
109             ## Init and finish routines.
110             ##
111             ###############################################################################
112              
113             sub init (@) {
114              
115 3 50 33 3 1 360 shift if (defined $_[0] && $_[0] eq 'MCE::Grep');
116 3         21 my $_pkg = "$$.$_tid.".caller();
117              
118 3 50       21 $_params->{$_pkg} = (ref $_[0] eq 'HASH') ? shift : { @_ };
119              
120             _croak("$_tag: (HASH) not allowed as input by this MCE model")
121 3 50       15 if ( ref $_params->{$_pkg}{input_data} eq 'HASH' );
122              
123 3         6 @_ = ();
124              
125 3         6 return;
126             }
127              
128             sub finish (@) {
129              
130 7 50 33 7 1 1366 shift if (defined $_[0] && $_[0] eq 'MCE::Grep');
131 7 100       36 my $_pkg = (defined $_[0]) ? shift : "$$.$_tid.".caller();
132              
133 7 100 66     68 if ( $_pkg eq 'MCE' ) {
    100          
134 4         9 for my $_k ( keys %{ $_MCE } ) { MCE::Grep->finish($_k, 1); }
  4         27  
  2         28  
135             }
136             elsif ( $_MCE->{$_pkg} && $_MCE->{$_pkg}{_init_pid} eq "$$.$_tid" ) {
137 1 50       20 $_MCE->{$_pkg}->shutdown(@_) if $_MCE->{$_pkg}{_spawned};
138 1         3 $_total_chunks = undef, undef %_tmp;
139              
140 1         3 delete $_prev_c->{$_pkg};
141 1         12 delete $_MCE->{$_pkg};
142             }
143              
144 7         17 @_ = ();
145              
146 7         19 return;
147             }
148              
149             ###############################################################################
150             ## ----------------------------------------------------------------------------
151             ## Parallel grep with MCE -- file.
152             ##
153             ###############################################################################
154              
155             sub run_file (&@) {
156              
157 2 50 33 2 1 2180 shift if (defined $_[0] && $_[0] eq 'MCE::Grep');
158              
159 2         5 my $_code = shift; my $_file = shift;
  2         5  
160 2         8 my $_pid = "$$.$_tid.".caller();
161              
162 2 50       8 if (defined (my $_p = $_params->{$_pid})) {
163 2 50       6 delete $_p->{input_data} if (exists $_p->{input_data});
164 2 50       5 delete $_p->{sequence} if (exists $_p->{sequence});
165             }
166             else {
167 0         0 $_params->{$_pid} = {};
168             }
169              
170 2 100 66     52 if (defined $_file && ref $_file eq '' && $_file ne '') {
    50 66        
      33        
171 1 50       26 _croak("$_tag: ($_file) does not exist") unless (-e $_file);
172 1 50       14 _croak("$_tag: ($_file) is not readable") unless (-r $_file);
173 1 50       11 _croak("$_tag: ($_file) is not a plain file") unless (-f $_file);
174 1         10 $_params->{$_pid}{_file} = $_file;
175             }
176             elsif (ref $_file eq 'SCALAR' || ref($_file) =~ /^(?:GLOB|FileHandle|IO::)/) {
177 1         5 $_params->{$_pid}{_file} = $_file;
178             }
179             else {
180 0         0 _croak("$_tag: (file) is not specified or valid");
181             }
182              
183 2         6 @_ = ();
184              
185 2         6 return run($_code);
186             }
187              
188             ###############################################################################
189             ## ----------------------------------------------------------------------------
190             ## Parallel grep with MCE -- sequence.
191             ##
192             ###############################################################################
193              
194             sub run_seq (&@) {
195              
196 1 50 33 1 1 1458 shift if (defined $_[0] && $_[0] eq 'MCE::Grep');
197              
198 1         3 my $_code = shift;
199 1         5 my $_pid = "$$.$_tid.".caller();
200              
201 1 50       5 if (defined (my $_p = $_params->{$_pid})) {
202 1 50       4 delete $_p->{input_data} if (exists $_p->{input_data});
203 1 50       4 delete $_p->{_file} if (exists $_p->{_file});
204             }
205             else {
206 0         0 $_params->{$_pid} = {};
207             }
208              
209 1         2 my ($_begin, $_end);
210              
211 1 50 33     9 if (ref $_[0] eq 'HASH') {
    50          
    50          
212 0         0 $_begin = $_[0]->{begin}, $_end = $_[0]->{end};
213 0         0 $_params->{$_pid}{sequence} = $_[0];
214             }
215             elsif (ref $_[0] eq 'ARRAY') {
216 0 0 0     0 if (@{ $_[0] } > 3 && $_[0]->[3] =~ /\d$/) {
  0         0  
217 0         0 $_begin = $_[0]->[0], $_end = $_[0]->[-1];
218 0         0 $_params->{$_pid}{sequence} = [ $_[0]->[0], $_[0]->[-1] ];
219             }
220             else {
221 0         0 $_begin = $_[0]->[0], $_end = $_[0]->[1];
222 0         0 $_params->{$_pid}{sequence} = $_[0];
223             }
224             }
225             elsif (ref $_[0] eq '' || ref($_[0]) =~ /^Math::/) {
226 1 50 33     5 if (@_ > 3 && $_[3] =~ /\d$/) {
227 0         0 $_begin = $_[0], $_end = $_[-1];
228 0         0 $_params->{$_pid}{sequence} = [ $_[0], $_[-1] ];
229             }
230             else {
231 1         2 $_begin = $_[0], $_end = $_[1];
232 1         3 $_params->{$_pid}{sequence} = [ @_ ];
233             }
234             }
235             else {
236 0         0 _croak("$_tag: (sequence) is not specified or valid");
237             }
238              
239 1 50       4 _croak("$_tag: (begin) is not specified for sequence")
240             unless (defined $_begin);
241 1 50       2 _croak("$_tag: (end) is not specified for sequence")
242             unless (defined $_end);
243              
244 1         3 $_params->{$_pid}{sequence_run} = undef;
245              
246 1         2 @_ = ();
247              
248 1         3 return run($_code);
249             }
250              
251             ###############################################################################
252             ## ----------------------------------------------------------------------------
253             ## Parallel grep with MCE.
254             ##
255             ###############################################################################
256              
257             sub run (&@) {
258              
259 7 50 33 7 1 2274 shift if (defined $_[0] && $_[0] eq 'MCE::Grep');
260              
261 7         16 my $_code = shift; $_total_chunks = 0; undef %_tmp;
  7         13  
  7         15  
262 7 100       35 my $_pkg = caller() eq 'MCE::Grep' ? caller(1) : caller();
263 7         39 my $_pid = "$$.$_tid.$_pkg";
264              
265 7         13 my $_input_data; my $_max_workers = $_def->{$_pkg}{MAX_WORKERS};
  7         22  
266 7         14 my $_r = ref $_[0];
267              
268 7 100 66     49 if (@_ == 1 && $_r =~ /^(?:ARRAY|HASH|SCALAR|CODE|GLOB|FileHandle|IO::)/) {
269 1 50       6 _croak("$_tag: (HASH) not allowed as input by this MCE model")
270             if $_r eq 'HASH';
271 1         2 $_input_data = shift;
272             }
273              
274 7 50       22 if (defined (my $_p = $_params->{$_pid})) {
275             $_max_workers = MCE::_parse_max_workers($_p->{max_workers})
276 7 50       45 if (exists $_p->{max_workers});
277              
278 7 100 100     44 delete $_p->{sequence} if (defined $_input_data || scalar @_);
279 7 50       20 delete $_p->{user_func} if (exists $_p->{user_func});
280 7 50       17 delete $_p->{user_tasks} if (exists $_p->{user_tasks});
281 7 50       17 delete $_p->{use_slurpio} if (exists $_p->{use_slurpio});
282 7 50       17 delete $_p->{bounds_only} if (exists $_p->{bounds_only});
283 7 50       13 delete $_p->{gather} if (exists $_p->{gather});
284             }
285              
286 7         13 my $_chunk_size = do {
287 7   50     16 my $_p = $_params->{$_pid} || {};
288             (defined $_p->{init_relay} || defined $_def->{$_pkg}{INIT_RELAY}) ? 1 :
289             MCE::_parse_chunk_size(
290 7 50 33     76 $_def->{$_pkg}{CHUNK_SIZE}, $_max_workers, $_params->{$_pid},
291             $_input_data, scalar @_
292             );
293             };
294              
295 7 50       22 if (defined (my $_p = $_params->{$_pid})) {
296 7 100       20 if (exists $_p->{_file}) {
297 2         5 $_input_data = delete $_p->{_file};
298             } else {
299 5 50       13 $_input_data = $_p->{input_data} if exists $_p->{input_data};
300             }
301             }
302              
303             ## -------------------------------------------------------------------------
304              
305 7         31 MCE::_save_state($_MCE->{$_pid});
306              
307 7 100 66     46 if (!defined $_prev_c->{$_pid} || $_prev_c->{$_pid} != $_code) {
308 3 50       9 $_MCE->{$_pid}->shutdown() if (defined $_MCE->{$_pid});
309 3         9 $_prev_c->{$_pid} = $_code;
310              
311             my %_opts = (
312             max_workers => $_max_workers, task_name => $_tag,
313             user_func => sub {
314              
315 37     37   58 my ($_mce, $_chunk_ref, $_chunk_id) = @_;
316 37         61 my $_wantarray = $_mce->{user_args}[0];
317              
318 37 50       54 if ($_wantarray) {
319 37         41 my @_a;
320              
321 37 100       69 if (ref $_chunk_ref eq 'SCALAR') {
322 1 50       4 local $/ = $_mce->{RS} if defined $_mce->{RS};
323 1         95 open my $_MEM_FH, '<', $_chunk_ref;
324 1         1103 binmode $_MEM_FH, ':raw';
325 1 100       7 while (<$_MEM_FH>) { push (@_a, $_) if &{ $_code }; }
  9         41  
  9         16  
326 1         7 close $_MEM_FH;
327 1         7 weaken $_MEM_FH;
328             }
329             else {
330 36 100       50 if (ref $_chunk_ref) {
331 27         32 push @_a, grep { &{ $_code } } @{ $_chunk_ref };
  27         29  
  27         71  
  27         455  
332             } else {
333 9         13 push @_a, grep { &{ $_code } } $_chunk_ref;
  9         8  
  9         20  
334             }
335             }
336              
337 37         359 MCE->gather($_chunk_id, \@_a);
338             }
339             else {
340 0         0 my $_cnt = 0;
341              
342 0 0       0 if (ref $_chunk_ref eq 'SCALAR') {
343 0 0       0 local $/ = $_mce->{RS} if defined $_mce->{RS};
344 0         0 open my $_MEM_FH, '<', $_chunk_ref;
345 0         0 binmode $_MEM_FH, ':raw';
346 0 0       0 while (<$_MEM_FH>) { $_cnt++ if &{ $_code }; }
  0         0  
  0         0  
347 0         0 close $_MEM_FH;
348 0         0 weaken $_MEM_FH;
349             }
350             else {
351 0 0       0 if (ref $_chunk_ref) {
352 0         0 $_cnt += grep { &{ $_code } } @{ $_chunk_ref };
  0         0  
  0         0  
  0         0  
353             } else {
354 0         0 $_cnt += grep { &{ $_code } } $_chunk_ref;
  0         0  
  0         0  
355             }
356             }
357              
358 0 0       0 MCE->gather($_cnt) if defined $_wantarray;
359             }
360             },
361 3         30 );
362              
363 3 50       12 if (defined (my $_p = $_params->{$_pid})) {
364 3         3 for my $_k (keys %{ $_p }) {
  3         12  
365 3 50       9 next if ($_k eq 'sequence_run');
366 3 50       6 next if ($_k eq 'input_data');
367 3 50       9 next if ($_k eq 'chunk_size');
368              
369             _croak("$_tag: ($_k) is not a valid constructor argument")
370 3 50       9 unless (exists $MCE::_valid_fields_new{$_k});
371              
372 3         6 $_opts{$_k} = $_p->{$_k};
373             }
374             }
375              
376 3         6 for my $_k (qw/ tmp_dir freeze thaw init_relay use_threads /) {
377             $_opts{$_k} = $_def->{$_pkg}{uc($_k)}
378 15 50 33     39 if (exists $_def->{$_pkg}{uc($_k)} && !exists $_opts{$_k});
379             }
380              
381 3         21 $_MCE->{$_pid} = MCE->new(pkg => $_pkg, %_opts);
382             }
383              
384             ## -------------------------------------------------------------------------
385              
386 7         72 my $_cnt = 0; my $_wantarray = wantarray;
  7         22  
387              
388 7 100       32 $_MCE->{$_pid}{use_slurpio} = ($_chunk_size > &MCE::MAX_RECS_SIZE) ? 1 : 0;
389 7         20 $_MCE->{$_pid}{user_args} = [ $_wantarray ];
390              
391             $_MCE->{$_pid}{gather} = $_wantarray
392 7 50   0   24 ? \&_gather : sub { $_cnt += $_[0]; return; };
  0         0  
  0         0  
393              
394 7 100       17 if (defined $_input_data) {
    100          
395 3         5 @_ = ();
396 3         16 $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, $_input_data);
397 3         15 delete $_MCE->{$_pid}{input_data};
398             }
399             elsif (scalar @_) {
400 3         18 $_MCE->{$_pid}->process({ chunk_size => $_chunk_size }, \@_);
401 1         20 delete $_MCE->{$_pid}{input_data};
402             }
403             else {
404 1 50 33     15 if (defined $_params->{$_pid} && exists $_params->{$_pid}{sequence}) {
405             $_MCE->{$_pid}->run({
406             chunk_size => $_chunk_size,
407             sequence => $_params->{$_pid}{sequence}
408 1         10 }, 0);
409 1 50       8 if (exists $_params->{$_pid}{sequence_run}) {
410 1         2 delete $_params->{$_pid}{sequence_run};
411 1         3 delete $_params->{$_pid}{sequence};
412             }
413 1         3 delete $_MCE->{$_pid}{sequence};
414             }
415             }
416              
417 5         33 MCE::_restore_state();
418              
419 5 50       12 if ($_wantarray) {
    0          
420 5         88 return map { @{ $_ } } delete @_tmp{ 1 .. $_total_chunks };
  37         36  
  37         129  
421             }
422             elsif (defined $_wantarray) {
423 0         0 return $_cnt;
424             }
425              
426 0         0 return;
427             }
428              
429             ###############################################################################
430             ## ----------------------------------------------------------------------------
431             ## Private methods.
432             ##
433             ###############################################################################
434              
435             sub _croak {
436              
437 0     0   0 goto &MCE::_croak;
438             }
439              
440             1;
441              
442             __END__
443              
444             ###############################################################################
445             ## ----------------------------------------------------------------------------
446             ## Module usage.
447             ##
448             ###############################################################################
449              
450             =head1 NAME
451              
452             MCE::Grep - Parallel grep model similar to the native grep function
453              
454             =head1 VERSION
455              
456             This document describes MCE::Grep version 1.887
457              
458             =head1 SYNOPSIS
459              
460             ## Exports mce_grep, mce_grep_f, and mce_grep_s
461             use MCE::Grep;
462              
463             ## Array or array_ref
464             my @a = mce_grep { $_ % 5 == 0 } 1..10000;
465             my @b = mce_grep { $_ % 5 == 0 } \@list;
466              
467             ## Important; pass an array_ref for deeply input data
468             my @c = mce_grep { $_->[1] % 2 == 0 } [ [ 0, 1 ], [ 0, 2 ], ... ];
469             my @d = mce_grep { $_->[1] % 2 == 0 } \@deeply_list;
470              
471             ## File path, glob ref, IO::All::{ File, Pipe, STDIO } obj, or scalar ref
472             ## Workers read directly and not involve the manager process
473             my @e = mce_grep_f { /pattern/ } "/path/to/file"; # efficient
474              
475             ## Involves the manager process, therefore slower
476             my @f = mce_grep_f { /pattern/ } $file_handle;
477             my @g = mce_grep_f { /pattern/ } $io;
478             my @h = mce_grep_f { /pattern/ } \$scalar;
479              
480             ## Sequence of numbers (begin, end [, step, format])
481             my @i = mce_grep_s { %_ * 3 == 0 } 1, 10000, 5;
482             my @j = mce_grep_s { %_ * 3 == 0 } [ 1, 10000, 5 ];
483              
484             my @k = mce_grep_s { %_ * 3 == 0 } {
485             begin => 1, end => 10000, step => 5, format => undef
486             };
487              
488             =head1 DESCRIPTION
489              
490             This module provides a parallel grep implementation via Many-Core Engine.
491             MCE incurs a small overhead due to passing of data. A fast code block will
492             run faster natively. However, the overhead will likely diminish as the
493             complexity increases for the code.
494              
495             my @m1 = grep { $_ % 5 == 0 } 1..1000000; ## 0.065 secs
496             my @m2 = mce_grep { $_ % 5 == 0 } 1..1000000; ## 0.194 secs
497              
498             Chunking, enabled by default, greatly reduces the overhead behind the scene.
499             The time for mce_grep below also includes the time for data exchanges between
500             the manager and worker processes. More parallelization will be seen when the
501             code incurs additional CPU time.
502              
503             my @m1 = grep { /[2357][1468][9]/ } 1..1000000; ## 0.353 secs
504             my @m2 = mce_grep { /[2357][1468][9]/ } 1..1000000; ## 0.218 secs
505              
506             Even faster is mce_grep_s; useful when input data is a range of numbers.
507             Workers generate sequences mathematically among themselves without any
508             interaction from the manager process. Two arguments are required for
509             mce_grep_s (begin, end). Step defaults to 1 if begin is smaller than end,
510             otherwise -1.
511              
512             my @m3 = mce_grep_s { /[2357][1468][9]/ } 1, 1000000; ## 0.165 secs
513              
514             Although this document is about MCE::Grep, the L<MCE::Stream> module can write
515             results immediately without waiting for all chunks to complete. This is made
516             possible by passing the reference to an array (in this case @m4 and @m5).
517              
518             use MCE::Stream default_mode => 'grep';
519              
520             my @m4; mce_stream \@m4, sub { /[2357][1468][9]/ }, 1..1000000;
521              
522             ## Completed in 0.203 secs. This is amazing considering the
523             ## overhead for passing data between the manager and workers.
524              
525             my @m5; mce_stream_s \@m5, sub { /[2357][1468][9]/ }, 1, 1000000;
526              
527             ## Completed in 0.120 secs. Like with mce_grep_s, specifying a
528             ## sequence specification turns out to be faster due to lesser
529             ## overhead for the manager process.
530              
531             A common scenario is grepping for pattern(s) inside a massive log file.
532             Notice how parallelism increases as complexity increases for the pattern.
533             Testing was done against a 300 MB file containing 250k lines.
534              
535             use MCE::Grep;
536              
537             my @m; open my $LOG, "<", "/path/to/log/file" or die "$!\n";
538              
539             @m = grep { /pattern/ } <$LOG>; ## 0.756 secs
540             @m = grep { /foobar|[2357][1468][9]/ } <$LOG>; ## 24.681 secs
541              
542             ## Parallelism with mce_grep. This involves the manager process
543             ## due to processing a file handle.
544              
545             @m = mce_grep { /pattern/ } <$LOG>; ## 0.997 secs
546             @m = mce_grep { /foobar|[2357][1468][9]/ } <$LOG>; ## 7.439 secs
547              
548             ## Even faster with mce_grep_f. Workers access the file directly
549             ## with zero interaction from the manager process.
550              
551             my $LOG = "/path/to/file";
552             @m = mce_grep_f { /pattern/ } $LOG; ## 0.112 secs
553             @m = mce_grep_f { /foobar|[2357][1468][9]/ } $LOG; ## 6.840 secs
554              
555             =head1 PARSING HUGE FILES
556              
557             The MCE::Grep module lacks an optimization for quickly determining if a match
558             is found from not knowing the pattern inside the code block. Use the following
559             snippet as a template to achieve better performance. Also, take a look at
560             examples/egrep.pl, included with the distribution.
561              
562             use MCE::Loop;
563              
564             MCE::Loop->init(
565             max_workers => 8, use_slurpio => 1
566             );
567              
568             my $pattern = 'karl';
569             my $hugefile = 'very_huge.file';
570              
571             my @result = mce_loop_f {
572             my ($mce, $slurp_ref, $chunk_id) = @_;
573              
574             ## Quickly determine if a match is found.
575             ## Process slurped chunk only if true.
576              
577             if ($$slurp_ref =~ /$pattern/m) {
578             my @matches;
579              
580             ## The following is fast on Unix. Performance degrades
581             ## drastically on Windows beyond 4 workers.
582              
583             open my $MEM_FH, '<', $slurp_ref;
584             binmode $MEM_FH, ':raw';
585             while (<$MEM_FH>) { push @matches, $_ if (/$pattern/); }
586             close $MEM_FH;
587              
588             ## Therefore, use the following construct on Windows.
589              
590             while ( $$slurp_ref =~ /([^\n]+\n)/mg ) {
591             my $line = $1; # save $1 to not lose the value
592             push @matches, $line if ($line =~ /$pattern/);
593             }
594              
595             ## Gather matched lines.
596              
597             MCE->gather(@matches);
598             }
599              
600             } $hugefile;
601              
602             print join('', @result);
603              
604             =head1 OVERRIDING DEFAULTS
605              
606             The following list options which may be overridden when loading the module.
607              
608             use Sereal qw( encode_sereal decode_sereal );
609             use CBOR::XS qw( encode_cbor decode_cbor );
610             use JSON::XS qw( encode_json decode_json );
611              
612             use MCE::Grep
613             max_workers => 4, # Default 'auto'
614             chunk_size => 100, # Default 'auto'
615             tmp_dir => "/path/to/app/tmp", # $MCE::Signal::tmp_dir
616             freeze => \&encode_sereal, # \&Storable::freeze
617             thaw => \&decode_sereal, # \&Storable::thaw
618             init_relay => 0, # Default undef; MCE 1.882+
619             use_threads => 0, # Default undef; MCE 1.882+
620             ;
621              
622             From MCE 1.8 onwards, Sereal 3.015+ is loaded automatically if available.
623             Specify C<< Sereal => 0 >> to use Storable instead.
624              
625             use MCE::Grep Sereal => 0;
626              
627             =head1 CUSTOMIZING MCE
628              
629             =over 3
630              
631             =item MCE::Grep->init ( options )
632              
633             =item MCE::Grep::init { options }
634              
635             =back
636              
637             The init function accepts a hash of MCE options. The gather option, if
638             specified, is ignored due to being used internally by the module.
639              
640             use MCE::Grep;
641              
642             MCE::Grep->init(
643             chunk_size => 1, max_workers => 4,
644              
645             user_begin => sub {
646             print "## ", MCE->wid, " started\n";
647             },
648              
649             user_end => sub {
650             print "## ", MCE->wid, " completed\n";
651             }
652             );
653              
654             my @a = mce_grep { $_ % 5 == 0 } 1..100;
655              
656             print "\n", "@a", "\n";
657              
658             -- Output
659              
660             ## 2 started
661             ## 3 started
662             ## 1 started
663             ## 4 started
664             ## 3 completed
665             ## 4 completed
666             ## 1 completed
667             ## 2 completed
668              
669             5 10 15 20 25 30 35 40 45 50 55 60 65 70 75 80 85 90 95 100
670              
671             =head1 API DOCUMENTATION
672              
673             =over 3
674              
675             =item MCE::Grep->run ( sub { code }, list )
676              
677             =item mce_grep { code } list
678              
679             =back
680              
681             Input data may be defined using a list or an array reference. Unlike MCE::Loop,
682             Flow, and Step, specifying a hash reference as input data isn't allowed.
683              
684             ## Array or array_ref
685             my @a = mce_grep { /[2357]/ } 1..1000;
686             my @b = mce_grep { /[2357]/ } \@list;
687              
688             ## Important; pass an array_ref for deeply input data
689             my @c = mce_grep { $_->[1] =~ /[2357]/ } [ [ 0, 1 ], [ 0, 2 ], ... ];
690             my @d = mce_grep { $_->[1] =~ /[2357]/ } \@deeply_list;
691              
692             ## Not supported
693             my @z = mce_grep { ... } \%hash;
694              
695             =over 3
696              
697             =item MCE::Grep->run_file ( sub { code }, file )
698              
699             =item mce_grep_f { code } file
700              
701             =back
702              
703             The fastest of these is the /path/to/file. Workers communicate the next offset
704             position among themselves with zero interaction by the manager process.
705              
706             C<IO::All> { File, Pipe, STDIO } is supported since MCE 1.845.
707              
708             my @c = mce_grep_f { /pattern/ } "/path/to/file"; # faster
709             my @d = mce_grep_f { /pattern/ } $file_handle;
710             my @e = mce_grep_f { /pattern/ } $io; # IO::All
711             my @f = mce_grep_f { /pattern/ } \$scalar;
712              
713             =over 3
714              
715             =item MCE::Grep->run_seq ( sub { code }, $beg, $end [, $step, $fmt ] )
716              
717             =item mce_grep_s { code } $beg, $end [, $step, $fmt ]
718              
719             =back
720              
721             Sequence may be defined as a list, an array reference, or a hash reference.
722             The functions require both begin and end values to run. Step and format are
723             optional. The format is passed to sprintf (% may be omitted below).
724              
725             my ($beg, $end, $step, $fmt) = (10, 20, 0.1, "%4.1f");
726              
727             my @f = mce_grep_s { /[1234]\.[5678]/ } $beg, $end, $step, $fmt;
728             my @g = mce_grep_s { /[1234]\.[5678]/ } [ $beg, $end, $step, $fmt ];
729              
730             my @h = mce_grep_s { /[1234]\.[5678]/ } {
731             begin => $beg, end => $end,
732             step => $step, format => $fmt
733             };
734              
735             =over 3
736              
737             =item MCE::Grep->run ( sub { code }, iterator )
738              
739             =item mce_grep { code } iterator
740              
741             =back
742              
743             An iterator reference may be specified for input_data. Iterators are described
744             under section "SYNTAX for INPUT_DATA" at L<MCE::Core>.
745              
746             my @a = mce_grep { $_ % 3 == 0 } make_iterator(10, 30, 2);
747              
748             =head1 MANUAL SHUTDOWN
749              
750             =over 3
751              
752             =item MCE::Grep->finish
753              
754             =item MCE::Grep::finish
755              
756             =back
757              
758             Workers remain persistent as much as possible after running. Shutdown occurs
759             automatically when the script terminates. Call finish when workers are no
760             longer needed.
761              
762             use MCE::Grep;
763              
764             MCE::Grep->init(
765             chunk_size => 20, max_workers => 'auto'
766             );
767              
768             my @a = mce_grep { ... } 1..100;
769              
770             MCE::Grep->finish;
771              
772             =head1 INDEX
773              
774             L<MCE|MCE>, L<MCE::Core>
775              
776             =head1 AUTHOR
777              
778             Mario E. Roy, S<E<lt>marioeroy AT gmail DOT comE<gt>>
779              
780             =cut
781