File Coverage

blib/lib/Parallel/DataPipe.pm
Criterion Covered Total %
statement 236 280 84.2
branch 64 114 56.1
condition 20 33 60.6
subroutine 44 46 95.6
pod 2 20 10.0
total 366 493 74.2


line stmt bran cond sub pod time code
1             package Parallel::DataPipe;
2            
3             our $VERSION='0.12';
4 152     152   6749524 use 5.8.0;
  152         548  
5 152     136   9612 use strict;
  136         2252  
  136         3428  
6 136     133   16568 use warnings;
  133         2285  
  133         3222  
7 133     129   59825 use IO::Select;
  129         157993  
  129         7547  
8 129     125   15573 use List::Util qw(first min);
  125         1306  
  125         14196  
9 125 50   109   9884 use constant PIPE_MAX_CHUNK_SIZE => $^O =~ m{linux|cygwin}? 16*1024 : 1024;
  109         779  
  109         10842  
10 109     92   4921 use constant _EOF_ => (-(1<<31));
  92         276  
  92         244904  
11            
12             sub run {
13 350     350 1 3832138 my $param = {};
14 350         1497 my ($input,$map,$output) = @_;
15 350 100       4049 if (ref($input) eq 'HASH') {
16 258         1076 $param = $input;
17             } else {
18 92         460 $param = {input=>$input, process=>$map, output=>$output };
19             }
20 350         5431 pipeline($param);
21             }
22            
23             sub pipeline {
24 383     383 1 79895 my $class=shift;
25 383 100       4836 if (ref($class) eq 'HASH') {
26 350         2286 unshift @_, $class;
27 350         1787 $class = __PACKAGE__;
28             }
29 383         1285 my @pipes;
30             # init pipes
31             my $default_input;
32 383         1795 for my $param (@_) {
33 400 100       1946 unless (exists $param->{input}) {
34 17 50       663 $param->{input} = $default_input or die "You have to specify input for the first pipe";
35             }
36 400         6533 my $pipe = $class->new($param);
37 309 100       2713 if (ref($pipe->{output}) eq 'ARRAY') {
38 175         937 $default_input = $pipe->{output};
39             }
40 309         4534 push @pipes, $pipe;
41             }
42 292         2772 run_pipes(0,@pipes);
43 292         1157 my $result = $pipes[$#pipes]->{output};
44             # @pipes=() kills parent
45             # as well as its implicit destroying
46             # destroy pipes one by one if you want to survive!!!
47 292         1867 undef $_ for @pipes;
48 292 100       2602 return unless defined(wantarray);
49 158 50       405 return unless $result;
50 158 50       124534 return wantarray?@$result:$result;
51             }
52            
53             sub run_pipes {
54 792     792 0 2208 my ($prev_busy,$me,@next) = @_;
55 792   33     3193 my $me_busy = $me->load_data || $me->busy_processors;
56 792         2786 while ($me_busy) {
57 502204         1234990 $me->receive_and_merge_data;
58 502204   100     1103755 $me_busy = $me->load_data || $me->busy_processors;
59 502204   66     1120505 my $next_busy = @next && run_pipes($prev_busy || $me_busy, @next);
60 502204   66     879937 $me_busy ||= $next_busy;
61             # get data from pipe if we have free_processors
62 502204 100 66     1137887 return $me_busy if $prev_busy && $me->free_processors;
63             }
64 293         908 return 0;
65             }
66            
67             # input_iterator is either array or subroutine reference which gets data from queue or other way and returns it
68             # if there is no data it returns undef
69             sub input_iterator {
70 506123     506123 0 668584 my $self = shift;
71 506123         896971 $self->{input_iterator}->(@_);
72             }
73            
74             sub output_iterator {
75 502204     502204 0 707388 my $self = shift;
76 502204         1008305 $self->{output_iterator}->(@_);
77             }
78            
79             # this is to set/create input iterator
80             sub set_input_iterator {
81 400     400 0 1413 my ($self,$param) = @_;
82 400         1140 my $old_behaviour = $param->{input_iterator};
83 400         2779 my ($input_iterator) = extract_param($param, qw(input_iterator input queue data));
84 400 100       5575 unless (ref($input_iterator) eq 'CODE') {
85 319 50       1465 die "array or code reference expected for input_iterator" unless ref($input_iterator) eq 'ARRAY';
86 319         1426 my $queue = $input_iterator;
87 319         1037 $self->{input} = $queue;
88 319 50       1106 if ($old_behaviour) {
89 0         0 my $l = @$queue;
90 0         0 my $i = 0;
91 0 0   0   0 $input_iterator = sub {$i<$l?$queue->[$i++]:undef};
  0         0  
92             } else {
93             # this behaviour is introduced with 0.06
94 319 50   505572   3529 $input_iterator = sub {$queue?shift(@$queue):undef};
  505572         1175535  
95             }
96             }
97 400         2168 $self->{input_iterator} = $input_iterator;
98             }
99            
100             sub set_output_iterator {
101 309     309 0 2800 my ($self,$param) = @_;
102 309         9031 my ($output_iterator) = extract_param($param, qw(merge_data output_iterator output output_queue output_data merge reduce));
103 309 100       6828 unless (ref($output_iterator) eq 'CODE') {
104 175   50     4504 my $queue = $output_iterator || [];
105 175         1759 $self->{output} = $queue;
106 175     441908   3313 $output_iterator = sub {push @$queue,$_};
  441908         1196585  
107             }
108 309         5109 $self->{output_iterator} = $output_iterator;
109             }
110            
111             # loads all free processor with data from input
112             # return the number of loaded processors
113             sub load_data {
114 502996     502996 0 680637 my $self = shift;
115 502996         863111 my @free_processors = $self->free_processors;
116 502996         777383 my $result = 0;
117 502996         888683 for my $processor (@free_processors) {
118 506123         858734 my $data = $self->input_iterator;
119             # return number of processors loaded
120 506123 100       995529 return $result unless defined($data);
121 502204         672816 $result++;
122 502204         825316 $self->load_data_processor($data,$processor);
123             }
124 499077         1355560 return $result;
125             }
126            
127             # this should work with Windows NT or if user explicitly set that
128             my $number_of_cpu_cores = $ENV{NUMBER_OF_PROCESSORS};
129             sub number_of_cpu_cores {
130             #$number_of_cpu_cores = $_[0] if @_; # setter
131 278 100   278 0 1291 return $number_of_cpu_cores if $number_of_cpu_cores;
132 92         184 eval {
133             # try unix (linux,cygwin,etc.)
134 92         431480 $number_of_cpu_cores = scalar grep m{^processor\t:\s\d+\s*$},`cat /proc/cpuinfo 2>/dev/null`;
135             # try bsd
136 92 50       6532 ($number_of_cpu_cores) = map m{hw.ncpu:\s+(\d+)},`sysctl -a` unless $number_of_cpu_cores;
137             };
138             # otherwise it sets number_of_cpu_cores to 1
139 92   50     2668 return $number_of_cpu_cores || 1;
140             }
141            
142             sub freeze {
143 62058     62058 0 85009 my $self = shift;
144 62058         206325 $self->{freeze}->(@_);
145             }
146            
147             sub thaw {
148 62074     62074 0 94564 my $self = shift;
149 62074         350943 $self->{thaw}->(@_);
150             }
151            
152             # this inits freeze and thaw with Storable subroutines and try to replace them with Sereal counterparts
153             sub init_serializer {
154 400     400 0 2740 my ($self,$param) = @_;
155 400   66     6646 my ($freeze,$thaw) = grep $_ && ref($_) eq 'CODE',map delete $param->{$_},qw(freeze thaw);
156 400 100 66     2972 if ($freeze && $thaw) {
157 76         304 $self->{freeze} = $freeze;
158 76         228 $self->{thaw} = $thaw;
159             } else {
160             # try cereal
161 92 50   92   47472 eval q{
  92         96784  
  92         6716  
  324         95969  
162             use Sereal qw(encode_sereal decode_sereal);
163             $self->{freeze} = \&encode_sereal;
164             $self->{thaw} = \&decode_sereal;
165             1;
166             }
167             or
168             eval q{
169             use Storable;
170             $self->{freeze} = \&Storable::nfreeze;
171             $self->{thaw} = \&Storable::thaw;
172             1;
173             };
174            
175             }
176             }
177            
178            
179             # this subroutine reads data from pipe and converts it to perl reference
180             # or scalar - if size is negative
181             # it always expects size of frozen scalar so it know how many it should read
182             # to feed thaw
183             sub _get_data {
184 514317     514317   881868 my ($self,$fh) = @_;
185 514317         654308 my ($data_size,$data);
186 514317         1551128 $fh->sysread($data_size,4);
187 514317         267373003 $data_size = unpack("l",$data_size);
188 514317 100       1238159 return undef if $data_size == _EOF_; # this if for process_data terminating
189 514226 50       890377 if ($data_size == 0) {
190 0         0 $data = '';
191             } else {
192 514226         731654 my $length = abs($data_size);
193 514226         648071 my $offset = 0;
194             # allocate all the buffer for $data beforehand
195 514226         1906325 $data = sprintf("%${length}s","");
196 514226         1066921 while ($offset < $length) {
197 523954         1246769 my $chunk_size = min(PIPE_MAX_CHUNK_SIZE,$length-$offset);
198 523954         1451639 $fh->sysread(my $buf,$chunk_size);
199             # use lvalue form of substr to copy data in preallocated buffer
200 523954         5922373 substr($data,$offset,$chunk_size) = $buf;
201 523954         1314109 $offset += $chunk_size;
202             }
203 514226 100       996714 $data = $self->thaw($data) if $data_size<0;
204             }
205 514226         2446097 return $data;
206             }
207            
208             # this subroutine serialize data reference. otherwise
209             # it puts negative size of scalar and scalar itself to pipe.
210             sub _put_data {
211 515186     515186   894493 my ($self,$fh,$data) = @_;
212 515186 100       826106 unless (defined($data)) {
213 960         3240 $fh->syswrite(pack("l", _EOF_));
214 960         35400 return;
215             }
216 514226         781692 my $length = length($data);
217 514226 100       896272 if (ref($data)) {
218 62058         112783 $data = $self->freeze($data);
219 62058         1561495 $length = -length($data);
220             }
221 514226         2180255 $fh->syswrite(pack("l", $length));
222 514226         19080183 $length = abs($length);
223 514226         811989 my $offset = 0;
224 514226         1182357 while ($offset < $length) {
225 523442         1384693 my $chunk_size = min(PIPE_MAX_CHUNK_SIZE,$length-$offset);
226 523442         2196898 $fh->syswrite(substr($data,$offset,$chunk_size));
227 523442         9448204 $offset += $chunk_size;
228             }
229             }
230            
231             sub _fork_data_processor {
232 4277     4277   11290 my ($data_processor_callback, $init_data_processor) = @_;
233             # create processor as fork
234 4277         2835346 my $pid = fork();
235 4277 50       79264 unless (defined $pid) {
236             #print "say goodbye - can't fork!\n"; <>;
237 0         0 die "can't fork!";
238             }
239 4277 100       13916 if ($pid == 0) {
240             local $SIG{TERM} = sub {
241 0     0   0 exit;
242 91         20901 }; # exit silently from data processors
243             # data processor is eternal loop which wait for raw data on pipe from main
244             # data processor is killed when it's not needed anymore by _kill_data_processors
245 91 50 33     4448 $init_data_processor->() if ref($init_data_processor) && ref($init_data_processor) eq 'CODE';
246 91         2660 $data_processor_callback->() while (1);
247 0         0 exit;
248             }
249 4186         944067 return $pid;
250             }
251            
252             sub _create_data_processor {
253 4277     4277   18191 my ($self,$process_data_callback, $init_data_processor) = @_;
254            
255             # parent <=> child pipes
256 4277         103990 my ($parent_read, $parent_write) = pipely();
257 4277         10962 my ($child_read, $child_write) = pipely();
258            
259             my $data_processor = sub {
260 12113     12113   37983 local $_ = $self->_get_data($child_read);
261 12113 100       36645 unless (defined($_)) {
262 91         74208 exit 0;
263             }
264 12022         35042 $_ = $process_data_callback->($_);
265 12022         75563 $self->_put_data($parent_write,$_);
266 4277         49474 };
267            
268             # return data processor record
269             return {
270 4277         35125 pid => _fork_data_processor($data_processor,$init_data_processor), # needed to kill processor when there is no more data to process
271             child_write => $child_write, # pipe to write raw data from main to data processor
272             parent_read => $parent_read, # pipe to write raw data from main to data processor
273             };
274             }
275            
276             sub extract_param {
277 1909     1909 0 30541 my ($param, @alias) = @_;
278 1909     4076   36702 return first {defined($_)} map delete($param->{$_}), @alias;
  4076         15030  
279             }
280            
281             sub create_data_processors {
282 400     400 0 1516 my ($self,$param) = @_;
283 400         1186 my $process_data_callback = extract_param($param,qw(process_data process processor map));
284 400         2053 my $init_data_processor = extract_param($param,qw(init_data_processor));
285 400         1356 my $number_of_data_processors = extract_param($param,qw(number_of_data_processors number_of_processors));
286 400 100       6232 $number_of_data_processors = $self->number_of_cpu_cores unless $number_of_data_processors;
287 400 50       5432 die "process_data parameter should be code ref" unless ref($process_data_callback) eq 'CODE';
288 400 50       2461 die "\$number_of_data_processors:undefined" unless defined($number_of_data_processors);
289 400         4708 return [map $self->_create_data_processor($process_data_callback, $init_data_processor, $_), 0..$number_of_data_processors-1];
290             }
291            
292             sub load_data_processor {
293 502204     502204 0 792151 my ($self,$data,$processor) = @_;
294 502204         793459 $processor->{item_number} = $self->{item_number}++;
295 502204 50       825128 die "no support of data processing for undef items!" unless defined($data);
296 502204         667621 $processor->{busy} = 1;
297 502204         912377 $self->_put_data($processor->{child_write},$data);
298             }
299            
300             sub busy_processors {
301 3343     3343 0 5615 my $self = shift;
302 3343         5067 return grep $_->{busy}, @{$self->{processors}};
  3343         15395  
303             }
304            
305             sub free_processors {
306 503495     503495 0 622210 my $self = shift;
307 503495         742991 return grep !$_->{busy}, @{$self->{processors}};
  503495         2608749  
308             }
309            
310             sub receive_and_merge_data {
311 502204     502204 0 733650 my $self = shift;
312 502204         668142 my ($processors,$ready) = @{$self}{qw(processors ready)};
  502204         1048519  
313 502204 100       927196 $self->{ready} = $ready = [] unless $ready;
314 502204 100 66     1496386 @$ready = IO::Select->new(map $_->{busy} && $_->{parent_read},@$processors)->can_read() unless @$ready;
315 502204         14248340 my $fh = shift(@$ready);
316 502204     4255345   2520303 my $processor = first {$_->{parent_read} == $fh} @$processors;
  4255345         6369044  
317 502204         1537551 local $_ = $self->_get_data($fh);
318 502204         953585 $processor->{busy} = undef; # make processor free
319 502204         1135793 $self->output_iterator($_,$processor->{item_number});
320             }
321            
322             sub _kill_data_processors {
323 60     60   660 my ($self) = @_;
324 60         480 my $processors = $self->{processors};
325 60         1080 my @pid_to_kill = map $_->{pid}, @$processors;
326 60         360 my %pid_to_wait = map {$_=>undef} @pid_to_kill;
  960         4260  
327             # put undef to input of data_processor - they know it's time to exit
328 60         1260 $self->_put_data($_->{child_write}) for @$processors;
329 60         300 while (@pid_to_kill) {
330 960         27816900 my $pid = wait;
331 960         18300 delete $pid_to_wait{$pid};
332 960         30900 @pid_to_kill = keys %pid_to_wait;
333             }
334             }
335            
336             sub new {
337 400     400 0 1766 my ($class, $param) = @_;
338 400         6894 my $self = {mypid=>$$};
339 400         2590 bless $self,$class;
340 400         3461 $self->set_input_iterator($param);
341             # item_number for merge implementation
342 400         2455 $self->{item_number} = 0;
343             # check if user want to use alternative serialisation routines
344 400         5415 $self->init_serializer($param);
345             # @$processors is array with data processor info
346 400         3832 $self->{processors} = $self->create_data_processors($param);
347             # data_merge is sub which merge all processed data inside parent thread
348             # it is called each time after process_data returns some new portion of data
349 309         11220 $self->set_output_iterator($param);
350 309         4963 my $not_supported = join ", ", keys %$param;
351 309 50       1054 die "Parameters are redundant or not supported:". $not_supported if $not_supported;
352 309         6859 return $self;
353             }
354            
355             sub DESTROY {
356 76     76   879 my $self = shift;
357 76 100       21629 return unless $self->{mypid} == $$;
358 60         1680 $self->_kill_data_processors;
359             #semctl($self->{sem_id},0,IPC_RMID,0);
360             }
361            
362             =begin comment
363            
364             Why I copied IO::Pipely::pipely instead of use IO::Pipely qw(pipely)?
365             1. Do not depend on installation of additional module
366             2. I don't know (yet) how to win race condition:
367             A) In Makefile.PL I would to check if fork & pipe works on the platform before creating Makefile.
368             But I am not sure if it's ok that at that moment I can use pipely to create pipes.
369             so
370             B) to use pipely I have to create makefile
371             For now I decided just copy code for pipely into this module.
372             Then if I know how to win that race condition I will get rid of this code and
373             will use IO::Pipely qw(pipely) instead and
374             will add dependency on it.
375            
376             =end comment
377            
378             =cut
379            
380             # IO::Pipely is copyright 2000-2012 by Rocco Caputo.
381 92     92   47656 use Symbol qw(gensym);
  92         70840  
  92         6348  
382 92         368 use IO::Socket qw(
383             AF_UNIX
384             PF_INET
385             PF_UNSPEC
386             SOCK_STREAM
387             SOL_SOCKET
388             SOMAXCONN
389             SO_ERROR
390             SO_REUSEADDR
391             inet_aton
392             pack_sockaddr_in
393             unpack_sockaddr_in
394 92     92   42136 );
  92         1847360  
395 92     92   22080 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
  92         184  
  92         5980  
396 92     92   552 use Errno qw(EINPROGRESS EWOULDBLOCK);
  92         184  
  92         75808  
397            
398             my (@oneway_pipe_types, @twoway_pipe_types);
399             if ($^O eq "MSWin32" or $^O eq "MacOS") {
400             @oneway_pipe_types = qw(inet socketpair pipe);
401             @twoway_pipe_types = qw(inet socketpair pipe);
402             }
403             elsif ($^O eq "cygwin") {
404             @oneway_pipe_types = qw(pipe inet socketpair);
405             @twoway_pipe_types = qw(inet pipe socketpair);
406             }
407             else {
408             @oneway_pipe_types = qw(pipe socketpair inet);
409             @twoway_pipe_types = qw(socketpair inet pipe);
410             }
411            
412             sub pipely {
413 8554     8554 0 18875 my %arg = @_;
414            
415 8554         16365 my $conduit_type = delete($arg{type});
416 8554   50     98546 my $debug = delete($arg{debug}) || 0;
417            
418             # Generate symbols to be used as filehandles for the pipe's ends.
419             #
420             # Filehandle autovivification isn't used for portability with older
421             # versions of Perl.
422            
423 8554         116112 my ($a_read, $b_write) = (gensym(), gensym());
424            
425             # Try the specified conduit type only. No fallback.
426            
427 8554 50       578908 if (defined $conduit_type) {
428 0 0       0 return ($a_read, $b_write) if _try_oneway_type(
429             $conduit_type, $debug, \$a_read, \$b_write
430             );
431             }
432            
433             # Otherwise try all available conduit types until one works.
434             # Conduit types that fail are discarded for speed.
435            
436 8554         54504 while (my $try_type = $oneway_pipe_types[0]) {
437 8554 50       67253 return ($a_read, $b_write) if _try_oneway_type(
438             $try_type, $debug, \$a_read, \$b_write
439             );
440 0         0 shift @oneway_pipe_types;
441             }
442            
443             # There's no conduit type left. Bummer!
444            
445 0 0       0 $debug and warn "nothing worked";
446 0         0 return;
447             }
448            
449             # Try a pipe by type.
450            
451             sub _try_oneway_type {
452 8554     8554   49303 my ($type, $debug, $a_read, $b_write) = @_;
453            
454             # Try a pipe().
455 8554 50       22406 if ($type eq "pipe") {
456 8554         47940 eval {
457 8554 50       364594 pipe($$a_read, $$b_write) or die "pipe failed: $!";
458             };
459            
460             # Pipe failed.
461 8554 50       25686 if (length $@) {
462 0 0       0 warn "pipe failed: $@" if $debug;
463 0         0 return;
464             }
465            
466 8554 50       15269 $debug and do {
467 0         0 warn "using a pipe";
468 0         0 warn "ar($$a_read) bw($$b_write)\n";
469             };
470            
471             # Turn off buffering. POE::Kernel does this for us, but
472             # someone might want to use the pipe class elsewhere.
473 8554         177233 select((select($$b_write), $| = 1)[0]);
474 8554         49695 return 1;
475             }
476            
477             # Try a UNIX-domain socketpair.
478 0 0         if ($type eq "socketpair") {
479 0           eval {
480 0 0         socketpair($$a_read, $$b_write, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
481             or die "socketpair failed: $!";
482             };
483            
484 0 0         if (length $@) {
485 0 0         warn "socketpair failed: $@" if $debug;
486 0           return;
487             }
488            
489 0 0         $debug and do {
490 0           warn "using a UNIX domain socketpair";
491 0           warn "ar($$a_read) bw($$b_write)\n";
492             };
493            
494             # It's one-way, so shut down the unused directions.
495 0           shutdown($$a_read, 1);
496 0           shutdown($$b_write, 0);
497            
498             # Turn off buffering. POE::Kernel does this for us, but someone
499             # might want to use the pipe class elsewhere.
500 0           select((select($$b_write), $| = 1)[0]);
501 0           return 1;
502             }
503            
504             # Try a pair of plain INET sockets.
505 0 0         if ($type eq "inet") {
506 0           eval {
507 0           ($$a_read, $$b_write) = _make_socket();
508             };
509            
510 0 0         if (length $@) {
511 0 0         warn "make_socket failed: $@" if $debug;
512 0           return;
513             }
514            
515 0 0         $debug and do {
516 0           warn "using a plain INET socket";
517 0           warn "ar($$a_read) bw($$b_write)\n";
518             };
519            
520             # It's one-way, so shut down the unused directions.
521 0           shutdown($$a_read, 1);
522 0           shutdown($$b_write, 0);
523            
524             # Turn off buffering. POE::Kernel does this for us, but someone
525             # might want to use the pipe class elsewhere.
526 0           select((select($$b_write), $| = 1)[0]);
527 0           return 1;
528             }
529            
530             # There's nothing left to try.
531 0 0         $debug and warn "unknown pipely() socket type ``$type''";
532 0           return;
533             }
534            
535            
536             1;
537            
538             =head1 NAME
539            
540             C<Parallel::DataPipe> - parallel data processing conveyor
541            
542             =encoding utf-8
543            
544             =head1 SYNOPSIS
545            
546             use Parallel::DataPipe;
547             Parallel::DataPipe::run {
548             input => [1..100],
549             process => sub { "$_:$$" },
550             number_of_data_processors => 100,
551             output => sub { print "$_\n" },
552             };
553            
554            
555             =head1 DESCRIPTION
556            
557            
558             If you have some long running script processing data item by item
559             (having on input some data and having on output some processed data i.e. aggregation, webcrawling,etc)
560             you can speed it up 4-20 times using parallel datapipe conveyor.
561             Modern computer (even modern smartphones ;) ) have multiple CPU cores: 2,4,8, even 24!
562             And huge amount of memory: memory is cheap now.
563             So they are ready for parallel data processing.
564             With this script there is an easy and flexible way to use that power.
565            
566             So what are the benefits of this module?
567            
568             1) because it uses input_iterator it does not have to know all input data before starting parallel processing
569            
570             2) because it uses merge_data processed data is ready for using in main thread immediately.
571            
572             1) and 2) remove requirements for memory which is needed to store data items before and after parallel work. and allows parallelize work on collecting, processing and using processed data.
573            
574             If you don't want to overload your database with multiple simultaneous queries
575             you make queries only within input_iterator and then process_data and then flush it with merge_data.
576             On the other hand you usually win if make queries in process_data and do a lot of data processors.
577             Possibly even more than physical cores if database queries take a long time and then a small amount to process.
578            
579             It's no surprise that DB servers usually serve N queries simultaneously faster than N queries one by one.
580            
581             Make tests and you will know.
582            
583             To (re)write your script for using all processing power of your server you have to find out:
584            
585             1) the method to obtain source/input data. I call it input iterator. It can be either an array with some identifiers/urls or a reference to a subroutine which returns the next portion of data or undef if there is no more data to process.
586            
587             2) how to process data i.e. method which receives input item and produce output item. I call it process_data subroutine. The good news is that item which is processed and then returned can be any scalar value in perl, including references to array and hashes. It can be everything that Storable can freeze and then thaw.
588            
589             3) how to use processed data. I call it merge_data. In the example above it just prints an item, but you could do buffered inserts to database, send email, etc.
590            
591             Take into account that 1) and 3) is executed in main script thread. While all 2) work is done in parallel forked threads. So for 1) and 3) it's better not to do things that block execution and remains hungry dogs 2) without meat to eat. So (still) this approach will benefit if you know that bottleneck in you script is CPU on processing step. Of course it's not the case for some web crawling tasks unless you do some heavy calculations
592            
593             =head1 SUBROUTINES
594            
595             =head2 run
596            
597             This is subroutine which covers magic of parallelizing data processing.
598             It receives parameters with these keys via hash ref.
599            
600             B<input> - reference to array or subroutine which should return data item to be processed.
601             in case of subroutine it should return undef to signal EOF.
602             In case of array it uses it as queue, i.e. shift(@$array) until there is no data item,
603             This behaviour has been introduced in 0.06.
604             Also you can use these aliases:
605             input_iterator, queue, data
606            
607             Note: in versions before 0.06 it was input_iterator and if it referred to an array it remained untouched,
608             while the new behaviour is to treat this parameter like a queue.
609             0.06 supports the old behaviour only for input_iterator,
610             while in the future it will behave as a queue to make life easier.
611            
612             B<process> - reference to subroutine which processes data items. They are passed via the $_ variable.
613             Then it should return processed data. this subroutine is executed in forked process so don't
614             use any shared resources inside it.
615             Also you can update children state, but it will not affect parent state.
616             Also you can use these aliases:
617             process_data
618            
619             These parameters are optional and have reasonable defaults, so change them only if you know what you are doing
620            
621             B<output> - optional. Either a reference to a subroutine or an array which receives the processed data item.
622             subroutine can use $_ or $_[0] to access data item and $_[1] to access item_number.
623             this subroutine is executed in parent thread, so you can rely on changes that it made.
624             if you don't specify this parameter array with processed data can be received as a subroutine result.
625             You can use these aliases for this parameter:
626             merge_data, merge
627            
628             B<number_of_data_processors> - (optional) number of parallel data processors. If you don't specify it,
629             it tries to find out a number of cpu cores
630             and create the same number of data processor children.
631             It looks for NUMBER_OF_PROCESSORS environment variable, which is set under Windows NT.
632             If this environment variable is not found it looks to /proc/cpuinfo which is available under Unix env.
633             It makes sense to have explicit C<number_of_data_processors>
634             which possibly is greater than the number of cpu cores
635             if you are to use all slave DB servers in your environment
636             and making a query to DB servers takes more time than processing the returned data.
637             Otherwise it's optimal to have C equal to number of cpu cores.
638            
639             B<freeze>, B<thaw> - you can use an alternative serializer.
640             for example if you know that you are working with array of words (0..65535) you can use
641             freeze => sub {pack('S*',@{$_[0]})} and thaw => sub {[unpack('S*',$_[0])]}
642             which will reduce the amount of bytes exchanged between processes.
643             But do it as the last optimization resort only.
644             In fact the automatic choice is quite good and efficient.
645             It uses encode_sereal and decode_sereal if Sereal module is found.
646             Otherwise it uses Storable freeze and thaw.
647            
648             Note: run has also undocumented prototype for calling (\@\$) i.e.
649            
650             my @x2 = Parallel::DataPipe::run([1..100],sub {$_*2});
651            
652             This feature is experimental and can be removed. Use it at your own risk.
653            
654             =head2 pipeline
655            
656             pipeline() is a chain of run() (parallel data pipes) executed in parallel
657             and input for next pipe is implicitly got from previous one.
658            
659             run {input => \@queue, process => \&process, output => \@out}
660            
661             is the same as
662            
663             pipeline {input => \@queue, process => \&process, output => \@out}
664            
665             But with pipeline you can create chain of connected pipes and run all of them in parallel
666             like it's done in unix with processes pipeline.
667            
668             pipeline(
669             { input => \@queue, process => \&process1},
670             { process => \&process2},
671             { process => \&process3, output => sub {print "$_\n";} },
672             );
673            
674             And it works like in unix - input of next pipe is (implicitly) set to output from previous pipe.
675             You have to specify input for the first pipe explicitly (see example of parallel grep 'hello' below ).
676            
677             If you don't specify input for next pipe it is assumed that it is output from previous pipe like in unix.
678             Also this assumption that input of next pipe depends on output of previous is applied for algorithm
679             on prioritizing of execution of pipe processors.
680             As long as the very right (last in list) pipe has input items to process it executes it's data processors.
681             If this pipe has free processor that is not loaded with data then the processors from previous pipe are executed
682             to produce an input data for next pipe.
683             This is recursively applied for all chain of pipes.
684            
685             Here is parallel grep implemented in 40 lines of perl code:
686            
687             use List::MoreUtils qw(part);
688             my @dirs = '.';
689             my @files;
690             pipeline(
691             # this pipe looks (recursively) for all files in specified @dirs
692             {
693             input => \@dirs,
694             process => sub {
695             my ($files,$dirs) = part { -d $_ ? 1 : 0 } glob("$_/*");
696             return [$files,$dirs];
697             },
698             output => sub {
699             my ($files,$dirs) = @$_;
700             push @dirs,@$dirs;# recursion is here
701             push @files,@$files;
702             },
703             },
704             # this pipe grep files for word hello
705             {
706             input => \@files,
707             process => sub {
708             my ($file) = $_;
709             open my $fh, '<', $file or die "open $file: $!";
710             my @lines;
711             while (<$fh>) {
712             # line_number : line
713             push @lines,"$.:$_" if m{hello};
714             }
715             return [$file,\@lines];
716             },
717             output => sub {
718             my ($file,$lines) = @$_;
719             # print filename, line_number , line
720             print "$file:$_" for @$lines;
721             }
722             }
723             );
724            
725             =head1 HOW parallel pipe (run) WORKS
726            
727             1) Main thread (parent) forks C<number_of_data_processors> children for processing data.
728            
729             2) As soon as data comes from C<input> it sends it to the next child using
730             the pipe mechanism.
731            
732             3) Child processes data and returns result back to parent using pipe.
733            
734             4) Parent firstly fills up all the pipes to children with data and then
735             starts to expect processed data on pipes from children.
736            
737             5) If it receives a result from children it sends processed data to the C<output> subroutine,
738             and starts loop 2) again.
739            
740             6) loop 2) continues until input data is ended (end of the C<input> array or the C<input> sub returned undef).
741            
742             7) In the end the parent expects processed data from all busy children and puts processed data to C<output>
743            
744             8) After all the children have sent their processed data they are killed and run returns to the caller.
745            
746             Note:
747             If C<process> returns a reference, it serializes/deserializes data before/after the pipe.
748             That way you have full control whether data will be serialized on IPC.
749            
750             =head1 SEE ALSO
751            
752             L
753            
754             L
755            
756             L
757            
758             L
759            
760             L<IO::Pipely> - pipes that work almost everywhere
761            
762             L<POE> - portable multitasking and networking framework for any event loop
763            
764             L
765            
766             L
767            
768             =head1 DEPENDENCIES
769            
770             Only core modules are used.
771            
772             If found, it uses the Sereal module for serialization instead of Storable, as the former is more efficient.
773            
774             =head1 BUGS
775            
776             For all bugs please send an email to okharch@gmail.com.
777            
778             =head1 SOURCE REPOSITORY
779            
780             See the git source on github
781             L<https://github.com/okharch/Parallel-DataPipe>
782            
783             =head1 COPYRIGHT
784            
785             Copyright (c) 2013 Oleksandr Kharchenko
786            
787             All right reserved. This library is free software; you can redistribute it
788             and/or modify it under the same terms as Perl itself.
789            
790             =head1 AUTHOR
791            
792             Oleksandr Kharchenko
793            
794             =cut