File Coverage

blib/lib/IPC/Fork/Simple.pm
Criterion    Covered   Total       %
statement        151     575    26.2
branch            13     236     5.5
condition          0      91     0.0
subroutine        42      72    58.3
pod               19      20    95.0
total            225     994    22.6


line stmt bran cond sub pod time code
1             package IPC::Fork::Simple;
2              
3             =head1 NAME
4              
5             IPC::Fork::Simple - Simplified interprocess communication for forking
6             processes.
7              
8             =head1 SYNOPSIS
9              
10             use IPC::Fork::Simple;
11              
12             my $ipc = IPC::Fork::Simple->new();
13             my $pid = fork();
14             if ( $pid ) {
15             $ipc->spawn_data_handler();
16             # Do important stuff here.
17             # ...
18             #
19             waitpid( $pid, 0 );
20             $ipc->collect_data_from_handler();
21             warn "Child sent: " . ${$ipc->from_child( $pid, 'test' )};
22             } else {
23             $ipc->init_child();
24             $ipc->to_master( 'test', 'a' x 300 ) || die $!;
25             }
26              
27             =head1 DESCRIPTION
28              
29             IPC::Fork::Simple is a module designed to simplify interprocess communication
30             used between a parent and its child forks. This version of the module only
31             supports one-way communication, from the child to the parent.
32              
33             =head1 THEORY OF OPERATION
34              
35             The basic idea behind this module is to allow one or more forks to return data to
36             their parent easily. This module divides a forking program into "master",
37             "child", and "other" forks. The master fork creates the first
38             IPC::Fork::Simple module and then calls fork() any number of times. Any
39             children created by the master will then call L to specify
40             their participation in the system. Child forks that do not call L,
41             prior forks that may have created the master, or other unrelated processes
42             in the same process group, will be considered other forks and will not have
43             a role in the system.
44              
45             When a child is ready to send data to the master, it must assign that data
46             a name by which it will be retrieved later by the master. When the master
47             is ready to collect the data from a child, it will request that data by name
48             and CID. Data passed from the child to the master will be automatically
49             serialized/unserialized by L, so almost any data type can be
50             transmitted, of up to 4 gigabytes in size.
51              
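A minimal sketch of the serialization step described above, using only Storable (which the module imports as `freeze` and `thaw`). The payload and variable names are hypothetical, and the 32-bit length field is an assumption drawn from the 'N' packing constants in the source; it would explain the roughly 4 gigabyte ceiling.

```perl
use strict;
use warnings;
use Storable qw( freeze thaw );

# Any reference can be frozen into a byte string and thawed back.
my $payload = { name => 'test', values => [ 1, 2, 3 ] };
my $frozen  = freeze( $payload );

# A 32-bit unsigned length field ('N') can describe at most 2**32 - 1 bytes,
# which is one plausible source of the size limit mentioned in the text.
my $length_field = pack( 'N', length( $frozen ) );

my $restored = thaw( $frozen );
print "name=$restored->{name} count=", scalar( @{ $restored->{values} } ), "\n";
```

Because the frozen form is an opaque byte string, the receiver only needs its length to know how much to read before calling `thaw`.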
52             Once a fork calls L, the master will then be able to track the
53             child fork, returning any data that is sent, and returning whether or not
54             the child has closed its connection with the master.
55              
56             =head1 USAGE
57              
58             There are three methods of use for IPC::Fork::Simple, each relating to the
59             actions taken by the master while the children are running.
60              
61             =head2 Blocking Wait
62              
63             A single call to L with the appropriate BLOCK flag will
64             cause L to block until a child has disconnected. By
65             calling L once for each child, all data from all
66             children can be collected easily. Using this method makes it hard for the
67             master process to do anything other than spawn and monitor children.
68              
69             =head2 Polling
70              
71             A call to L with a false parameter will cause
72             L to only process pending data. If placed inside of a
73             loop, the master process can still gather data while it performs other work.
74             To determine when the children have ended the master can poll
75             L for the number and CIDs of children who have disconnected.
76             This method will allow the master to perform other tasks while the children
77             are running, but it will have to make periodic callbacks to
78             L.
79              
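The non-blocking check behind a polling loop can be sketched with IO::Select, which the module loads. This is an illustration of the general technique on a plain pipe, not the module's own code; the handle names are hypothetical.

```perl
use strict;
use warnings;
use IO::Select;
use IO::Handle;

pipe( my $reader, my $writer ) or die "pipe failed: $!";
$writer->autoflush( 1 );

my $select = IO::Select->new( $reader );

# A zero timeout makes can_read() return immediately instead of blocking.
my @ready_before = $select->can_read( 0 );

# Once something is written, the same call reports the handle as readable.
syswrite( $writer, "x" ) or die "write failed: $!";
my @ready_after = $select->can_read( 0 );

print "ready before write: ", scalar( @ready_before ), "\n";
print "ready after write: ",  scalar( @ready_after ),  "\n";
```

A master using the polling method would make a check like this on each pass through its work loop and only read when a handle is ready.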
80             =head2 Data Handler
81              
82             Calling L will cause the master to fork, and create a
83             process which will automatically listen for and gather data from any children
84             spawned by the master, either before or after the call to L.
85             When the master is ready to collect the data from the children, the data handler
86             will copy all data to the master and exit. To determine when a child has exited
87             L can be polled or the appropriate BLOCK flag can be passed
88             to L. This method completely frees up the master to
89             perform other tasks. This method uses less memory and performs faster than the
90             others for large numbers of forks or for master processes that consume large
91             amounts of memory.
92              
93             =head2 Notes
94              
95             It was previously documented that calling wait(2) (or a similar function) to
96             determine if a child had ended was valid. This will correctly detect when a
97             child has exited, but an immediate call to one of the data or finished
98             child retrieval functions may not return that child's data. The only way
99             to be sure a child's data has been received is to check L
100             or attempt to fetch the data.
101              
102             =head1 CHILD IDENTIFICATION
103              
104             Internally, children are identified by a child id number, or CID. This
105             number is guaranteed to be unique for each child (and is currently
106             implemented as an integer starting with 0).
107              
108             Child processes also have a symbolic name used to identify themselves. This
109             name defaults to the child's PID, but can be changed. Symbolic names can be
110             re-used, and attempting to access data by symbolic name after a symbolic name
111             has been re-used will return the data from one of the children at random. It
112             is recommended that the symbolic name be unique, but it is not required. PIDs
113             are not guaranteed to be unique. See L and L for details.
114              
115             L will return a list of children who have ended, and
116             L will do the same for children who have called
117             L but not yet ended.
118              
119             =head1 EXPORTS
120              
121             By default, nothing is exported by IPC::Fork::Simple. Two tags are available
122             to export specific flags. Helper functions can be exported by their name.
123              
124             =head2 :packet_flags
125              
126             FLAG_PACKET flags are used to describe the reason L has
127             returned, and generally describe the last action by a child.
128              
129             Note: Other flags, and thus other return values, do exist, however they should
130             never be returned to the caller unless due to a bug in IPC::Fork::Simple.
131              
132             =head3 FLAG_PACKET_NOERROR
133              
134             No error has occurred. This flag is only returned when L
135             is called without blocking, but no data or events were pending.
136              
137             =head3 FLAG_PACKET_CHILD_DISCONNECTED
138              
139             A child has ended (successfully or otherwise).
140              
141             =head3 FLAG_PACKET_DATA
142              
143             A child has sent data and it has been successfully received.
144              
145             =head3 FLAG_PACKET_CHILD_HELLO
146              
147             A child has called L.
148              
149             =head2 :block_flags
150              
151             Block flags define different blocking methods for calls to
152             L. See L for details.
153              
154             =head3 BLOCK_NEVER
155              
156             Never blocks. Processes all available data on the socket and then returns.
157              
158             Note: Technically, it is possible for this flag to block. For example, if a
159             child sends partial data, the call will block until the rest of the data is
160             received. These cases should be extremely rare.
161              
162             =head3 BLOCK_UNTIL_CHILD
163              
164             Blocks until a child disconnects.
165              
166             Note: This flag will cause a return in other cases which are only used
167             internally, however it's possible a bug may cause a L to
168             return to the caller under other conditions.
169              
170             =head3 BLOCK_UNTIL_DATA
171              
172             Blocks until a child returns data or disconnects. The notes for
173             BLOCK_UNTIL_CHILD apply here too (as this is simply a superset of
174             BLOCK_UNTIL_CHILD).
175              
176             =cut
177              
178             ##############################################################################
179              
180 1     1   35015 use strict;
  1         3  
  1         46  
181 1     1   2526 use IO::Socket::INET;
  1         32757  
  1         10  
182 1     1   1481 use IO::Select;
  1         1526  
  1         47  
183 1     1   923 use Storable qw/ thaw freeze /;
  1         9872  
  1         95  
184 1     1   12 use Carp;
  1         1  
  1         57  
185 1     1   6 use Socket;
  1         4  
  1         1152  
186              
187 1     1   6 use constant 1.01;
  1         27  
  1         28  
188 1     1   4 use constant DEBUG => 0;
  1         2  
  1         75  
189 1     1   5 use constant CLIENT_AUTHENTICATION_TIME => 30; # seconds
  1         3  
  1         114  
190              
191             if ( DEBUG ) {
192             require Data::Dumper;
193             import Data::Dumper;
194             require Data::Hexdumper;
195             import Data::Hexdumper;
196             }
197              
198 1     1   6 use vars qw( $VERSION );
  1         1  
  1         126  
199             $VERSION = 1.47;
200              
201             if ( DEBUG ) {
202             $SIG{__WARN__} = sub { warn "$$ " . shift; };
203             }
204              
205             require Exporter;
206             our ( @ISA, @EXPORT_OK, %EXPORT_TAGS );
207              
208             # Here are some constants we defined based on the size of various data types.
209             # These data types are not defined in machine-dependent ways, so these should
210             # only need to be changed when the packet formats change.
211              
212             # 1 byte for flags.
213 1     1   4 use constant HEADER_SIZE => 1;
  1         2  
  1         36  
214 1     1   4 use constant HEADER_PACKING => 'c';
  1         1  
  1         36  
215              
216             # Data name length (4 bytes) + data length (4 bytes).
217 1     1   4 use constant HEADER_DATA_ADDITIONAL_SIZE => 8;
  1         1  
  1         50  
218 1     1   12 use constant HEADER_DATA_PACKING => 'NN';
  1         2  
  1         48  
219              
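As a rough illustration of how the header constants above could frame a data packet: one flag byte followed by two 32-bit big-endian lengths, 9 bytes in total. The helper names are hypothetical, and the body order (name first, then data) is an assumption rather than something confirmed by this listing.

```perl
use strict;
use warnings;

use constant HEADER_PACKING      => 'c';     # 1 byte for flags
use constant HEADER_DATA_PACKING => 'NN';    # data name length, data length
use constant FLAG_PACKET_DATA    => 2;

# Hypothetical helper: prepend the 9-byte header to the name and the data.
sub encode_data_packet {
    my ( $name, $data ) = @_;
    return pack( HEADER_PACKING . HEADER_DATA_PACKING,
        FLAG_PACKET_DATA, length( $name ), length( $data ) )
        . $name
        . $data;
}

# Hypothetical helper: read the header, then slice out name and data.
sub decode_data_packet {
    my ( $packet ) = @_;
    my ( $flags, $name_len, $data_len ) =
        unpack( HEADER_PACKING . HEADER_DATA_PACKING, $packet );
    my $name = substr( $packet, 9, $name_len );
    my $data = substr( $packet, 9 + $name_len, $data_len );
    return ( $flags, $name, $data );
}

my $packet = encode_data_packet( 'test', 'a' x 300 );
my ( $flags, $name, $data ) = decode_data_packet( $packet );
print "flags=$flags name=$name bytes=", length( $data ), "\n";
```

The 'N' template is always 4 bytes in network byte order, which is why the comments in the source can state the header sizes without reference to the host machine.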
220             # Number of finished children (4 bytes).
221 1     1   4 use constant HEADER_FINISHED_ADDITIONAL_SIZE => 4;
  1         3  
  1         46  
222 1     1   5 use constant HEADER_FINISHED_PACKING => 'N';
  1         1  
  1         83  
223              
224             # cid length (4 bytes) + whether or not the child is finished (1 byte) +
225             # source's symbolic name length (4 bytes).
226 1     1   5 use constant HEADER_FINISHED_EACH_ADDITIONAL_SIZE => 9;
  1         2  
  1         50  
227 1     1   4 use constant HEADER_FINISHED_EACH_PACKING => 'NcN';
  1         2  
  1         37  
228              
229             # Source's cid, source's symbolic name length (4 bytes) + data name length (4
230             # bytes) + data length (4 bytes).
231 1     1   4 use constant HEADER_HANDLER_DATA_ADDITIONAL_SIZE => 16;
  1         1  
  1         43  
232 1     1   3 use constant HEADER_HANDLER_DATA_PACKING => 'NNNN';
  1         1  
  1         30  
233              
234             # Shared key, symbolic name length (4 bytes).
235 1     1   4 use constant HEADER_CHILD_HELLO_ADDITIONAL_SIZE => 8;
  1         1  
  1         39  
236 1     1   4 use constant HEADER_CHILD_HELLO_PACKING => 'NN';
  1         1  
  1         33  
237              
238             # Constants used to define the type of packet being sent. FLAG_PACKET_* values
239             # occupy the bottom 4 bits of the "flags" byte, while FLAG_DATA_* values have
240             # the top 4 bits. The bottom 4 bits (for FLAG_PACKET_*) are treated as a
241             # 4-bit integer, while the upper 4 bits (for FLAG_DATA_*) are treated as a
242             # bitfield. The max value for FLAG_PACKET_* flags is 2**4 - 1 == 15.
243             # FLAG_RETURN_* values are return values used by _data_from_socket to
244             # indicate other return conditions. They're never transmitted as part of a
245             # packet, but need to share the same data-space as FLAG_PACKET_* values, so
246             # we start counting FLAG_PACKET_* from the highest FLAG_RETURN_ value +1.
247             #
248             # When adding these, pay attention to the regular expression for exporting
249             # these constants.
250              
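The split of the flags byte described in the comment above can be sketched as follows. The constant values are copied from this listing; the variable names are hypothetical, and the low-nibble mask 0x0f is an assumption inferred from MASK_FLAG_DATA being 0xf0.

```perl
use strict;
use warnings;

use constant FLAG_PACKET_DATA  => 2;       # packet type, low 4 bits
use constant FLAG_DATA_ENQUEUE => 16;      # data flag, high 4 bits
use constant MASK_FLAG_DATA    => 0xf0;    # selects the high 4 bits

# Combine a packet type with a data flag and pack them into one byte.
my $byte = pack( 'c', FLAG_PACKET_DATA | FLAG_DATA_ENQUEUE );

# Decode: the low nibble is an integer, the high nibble is a bitfield.
my ( $flags ) = unpack( 'c', $byte );
my $packet_type = $flags & 0x0f;
my $data_flags  = $flags & MASK_FLAG_DATA;
my $enqueue     = ( $data_flags & FLAG_DATA_ENQUEUE ) ? 1 : 0;

print "type=$packet_type enqueue=$enqueue\n";
```

Note that 'c' is a signed char, so a flags byte with the top bit set would unpack as a negative number; with only bit 16 defined in the high nibble that case does not arise here.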
251             # No error encountered.
252 1     1   4 use constant FLAG_RETURN_NOERROR => 0;
  1         1  
  1         33  
253             # A child was disconnected.
254 1     1   10 use constant FLAG_RETURN_CHILD_DISCONNECTED => 1;
  1         1  
  1         48  
255              
256             # Copy these two constants out into the FLAG_PACKET namespace for export use.
257 1     1   9 use constant FLAG_PACKET_NOERROR => FLAG_RETURN_NOERROR;
  1         1  
  1         51  
258 1     1   5 use constant FLAG_PACKET_CHILD_DISCONNECTED => FLAG_RETURN_CHILD_DISCONNECTED;
  1         1  
  1         45  
259              
260             # Packet contains data.
261 1     1   5 use constant FLAG_PACKET_DATA => 2;
  1         1  
  1         45  
262             # Packet contains data on all children that have connected (their cid,
263             # symbolic name, and whether or not they have finished).
264 1     1   5 use constant FLAG_PACKET_FINISHED_CHILDREN => 3;
  1         2  
  1         39  
265             # Query for children that have disconnected.
266 1     1   5 use constant FLAG_PACKET_ASK_FINISHED_CHILDREN => 4;
  1         1  
  1         47  
267             # Master asking the data handler to send all child data.
268 1     1   4 use constant FLAG_PACKET_GET_CHILD_DATA => 5;
  1         2  
  1         39  
269             # Master asking the data handler to exit after returning all data.
270 1     1   5 use constant FLAG_PACKET_GET_CHILD_DATA_AND_EXIT => 6;
  1         2  
  1         46  
271             # Master asking the data handler to send all child data, but block until there
272             # is some, if there is none.
273 1     1   5 use constant FLAG_PACKET_GET_CHILD_DATA_BLOCK => 7;
  1         1  
  1         40  
274             # Master asking the data handler to send all child data, but block until a
275             # child has exited.
276 1     1   4 use constant FLAG_PACKET_GET_CHILD_DATA_FINISHED_BLOCK => 8;
  1         2  
  1         30  
277             # Data handler reporting it has sent all data to parent and is clearing its
278             # stored data.
279 1     1   4 use constant FLAG_PACKET_DATA_HANDLER_DATA_CHECKPOINT => 9;
  1         1  
  1         61  
280             # Child connecting to master and registering its symbolic name.
281 1     1   4 use constant FLAG_PACKET_CHILD_HELLO => 10;
  1         16  
  1         30  
282             # Data for all children, sent from the data handler.
283 1     1   4 use constant FLAG_PACKET_HANDLER_DATA => 11;
  1         2  
  1         44  
284              
285             # FLAG_DATA_* values should be powers of 2 and start at 16, in order to fit
286             # into MASK_FLAG_DATA's bitmask.
287              
288             # Bit representing whether or not the contained data is to be enqueued (as
289             # opposed to overwritten).
290 1     1   6 use constant FLAG_DATA_ENQUEUE => 16;
  1         1  
  1         38  
291              
292             # Mask bits to locate the FLAG_DATA_* bits.
293 1     1   5 use constant MASK_FLAG_DATA => 0xf0;
  1         1  
  1         45  
294              
295             # Constants to improve readability. When adding these, pay attention to the
296             # regular expression for exporting these constants.
297 1     1   5 use constant BLOCK_NEVER => 0;
  1         1  
  1         39  
298 1     1   5 use constant BLOCK_UNTIL_CHILD => 1;
  1         1  
  1         65  
299 1     1   6 use constant BLOCK_UNTIL_DATA => 2;
  1         1  
  1         7336  
300              
301             {
302             my @packet_flags = map { /::([^:]+)$/; $1 }
303             grep( /^IPC::Fork::Simple::FLAG_PACKET_/, keys( %constant::declared ) );
304             my @block_flags = map { /::([^:]+)$/; $1 }
305             grep( /^IPC::Fork::Simple::BLOCK/, keys( %constant::declared ) );
306              
307             @ISA = ( 'Exporter' );
308             @EXPORT_OK = ( 'partition_list', @packet_flags, @block_flags );
309              
310             %EXPORT_TAGS = (
311             'packet_flags' => [@packet_flags],
312             'block_flags' => [@block_flags],
313             );
314             }
315              
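The export lists above are built by scanning `%constant::declared`, the hash in which the constant pragma records every constant under its fully qualified name. A small sketch of the same idea, using a hypothetical Demo::Flags package in place of IPC::Fork::Simple:

```perl
use strict;
use warnings;

package Demo::Flags;    # hypothetical package, for illustration only
use constant BLOCK_NEVER       => 0;
use constant BLOCK_UNTIL_CHILD => 1;
use constant FLAG_PACKET_DATA  => 2;

package main;

# Keep only this package's BLOCK_* constants, then strip the package prefix.
my @block_flags =
    map { /::([^:]+)$/ ? $1 : () }
    grep { /^Demo::Flags::BLOCK_/ } keys %constant::declared;
@block_flags = sort @block_flags;

# The resulting names can be used directly as an Exporter tag.
my %export_tags = ( 'block_flags' => [@block_flags] );

print join( ' ', @{ $export_tags{'block_flags'} } ), "\n";
```

This is why the comments in the source warn that new constants must match the regular expressions: a constant whose name does not start with the expected prefix is silently left out of the tag.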
316             sub ASSERT ($) {
317 0     0 0 0 my ( $cond ) = @_;
318              
319 0 0       0 if ( !$cond ) {
320 0         0 local $Carp::CarpLevel = 1;
321 0         0 confess "Assertion failed!";
322             }
323             }
324              
325             sub _new_defaults {
326 0     0   0 my ( $self ) = @_;
327              
328 0         0 $self->{'child_info'} = {};
329 0         0 $self->{'is_child'} = 0;
330 0         0 $self->{'finished_children'} = {};
331 0         0 $self->{'socket_to_cid'} = {};
332 0         0 $self->{'next_cid'} = 0;
333 0         0 $self->{'unauthenticated_clients'} = {};
334              
335             # Don't really need these here, they're just for my own knowledge.
336 0         0 $self->{'handler_port'} = undef;
337 0         0 $self->{'handler_socket'} = undef;
338 0         0 $self->{'handler_select'} = undef;
339 0         0 $self->{'handler_child_socket'} = undef;
340 0         0 $self->{'handler_pid'} = undef;
341 0         0 $self->{'is_handler_parent'} = undef;
342 0         0 $self->{'master_ip'} = '127.0.0.1';
343             }
344              
345             =head1 METHODS
346              
347             =head2 new
348              
349             Constructor for an IPC::Fork::Simple object. Takes no arguments. Returns an
350             IPC::Fork::Simple object on success, or die()'s on failure.
351              
352             =cut
353              
354             sub new {
355 0     0 1 0 my ( $class ) = @_;
356 0         0 my $self = {};
357 0         0 bless $self, $class;
358              
359 0         0 $self->_new_defaults();
360              
361 0         0 $self->{'master_socket'} =
362             IO::Socket::INET->new( Type => SOCK_STREAM, LocalAddr => $self->{'master_ip'}, Listen => 100 );
363 0 0       0 if ( !$self->{'master_socket'} ) {
364 0         0 die "Failed to create socket on " . $self->{'master_ip'} . ": $!";
365             }
366 0         0 $self->{'master_select'} = IO::Select->new( $self->{'master_socket'} );
367 0 0       0 if ( !$self->{'master_select'} ) {
368 0         0 die "Failed to create IO::Select object!";
369             }
370 0   0     0 $self->{'master_port'} = $self->{'master_socket'}->sockport() || die $!;
371 0         0 $self->{'shared_key'} = int( rand( 0xFFFFFFFF ) );
372              
373 0         0 return $self;
374             }
375              
376             =head2 new_child
377              
378             Constructor for an IPC::Fork::Simple child-only object, used for
379             bi-directional communication with a master.
380              
381             The first parameter is an opaque value containing master connection info as
382             returned by L on an existing IPC::Fork::Simple object.
383              
384             The second, optional, parameter is a symbolic name for this process. See
385             L for information on symbolic process names. If not set, defaults
386             to the process ID.
387              
388             =cut
389              
390             sub new_child {
391 0     0 1 0 my ( $class, $opaque, $symbolic ) = @_;
392 0         0 my $self = {};
393 0         0 bless $self, $class;
394              
395 0 0       0 return unless $opaque;
396              
397 0         0 $self->_new_defaults();
398              
399 0         0 my $connection_info = thaw( $opaque );
400              
401 0         0 $self->{'master_ip'} = $connection_info->{'ip'};
402 0         0 $self->{'master_port'} = $connection_info->{'port'};
403 0         0 $self->{'shared_key'} = $connection_info->{'shared_key'};
404 0 0       0 return unless $self->init_child( $symbolic );
405              
406 0         0 return $self;
407             }
408              
409             =head2 spawn_data_handler
410              
411             Only usable by the master.
412              
413             Runs the parent in data handler mode (see above). Causes the caller to
414             fork(), which may be undesirable in some circumstances. Calls die() on failure.
415              
416             =cut
417              
418             sub spawn_data_handler {
419 0     0 1 0 my ( $self ) = @_;
420 0 0       0 return if $self->{'is_child'};
421 0 0       0 return if $self->{'handler_pid'};
422              
423 0         0 local $SIG{'PIPE'} = 'IGNORE';
424              
425 0         0 $self->{'handler_socket'} =
426             IO::Socket::INET->new( Type => SOCK_STREAM, LocalAddr => $self->{'master_ip'}, Listen => 100 );
427 0 0       0 if ( !$self->{'handler_socket'} ) {
428 0         0 die "Failed to create socket on " . $self->{'master_ip'} . ": $!";
429             }
430 0         0 $self->{'handler_ip'} = $self->{'master_ip'};
431 0   0     0 $self->{'handler_port'} = $self->{'handler_socket'}->sockport() || die $!;
432 0         0 $self->{'handler_select'} = IO::Select->new();
433 0 0       0 if ( !$self->{'handler_select'} ) {
434 0         0 die "Failed to create IO::Select object";
435             }
436              
437 0         0 my ( $rh, $wh );
438 0         0 pipe( $rh, $wh );
439 0         0 $self->{'handler_pid'} = fork();
440 0 0       0 if ( !defined $self->{'handler_pid'} ) {
441 0         0 die "Fork failed: $!";
442             }
443 0 0       0 if ( $self->{'handler_pid'} ) {
444 0         0 local $SIG{PIPE} = 'IGNORE';
445 0         0 undef $self->{'handler_child_socket'};
446 0         0 foreach my $s ( $self->{'master_select'}->handles ) {
447 0         0 close $s;
448             }
449 0         0 undef $self->{'master_select'};
450 0         0 undef $self->{'master_socket'};
451 0         0 $self->{'is_child'} = 0;
452 0         0 $self->{'is_handler_parent'} = 1;
453 0         0 close( $wh );
454 0         0 <$rh>;
455 0         0 close( $rh );
456 0   0     0 $self->{'handler_socket_comm'} = $self->{'handler_socket'}->accept()
457             || die "Accept failure... I don't know what to do! $!";
458 0         0 $self->{'handler_select'}->add( $self->{'handler_socket_comm'} );
459              
460             } else {
461              
462             sub _send_finished_children {
463 0     0   0 my ( $self ) = @_;
464 0         0 my $finished_child_data = '';
465              
466 0         0 foreach my $cid ( keys( %{ $self->{'child_info'} } ) ) {
  0         0  
467 0 0       0 $finished_child_data .= pack(
468             HEADER_FINISHED_EACH_PACKING, # packing
469             $cid,
470             ( exists $self->{'finished_children'}->{$cid} ? 1 : 0 ),
471             length( $self->{'child_info'}->{$cid}->{'symbolic_name'} )
472             ) . $self->{'child_info'}->{$cid}->{'symbolic_name'};
473             }
474              
475 0         0 $self->{'handler_child_socket'}->send(
476             pack(
477             HEADER_PACKING . HEADER_FINISHED_PACKING, # packing
478             FLAG_PACKET_FINISHED_CHILDREN,
479 0 0       0 scalar( keys( %{ $self->{'child_info'} } ) )
480             )
481             . $finished_child_data
482             ) || die "Failed to report finished children to master: $!";
483              
484 0         0 $self->{'finished_children'} = {};
485             }
486              
487             sub _handler_data_to_socket {
488 0     0   0 my ( $socket, $info, $source_cid, $data_name, $queued ) = @_;
489              
490 0         0 my $source_symbolic_name = $info->{$source_cid}->{'symbolic_name'};
491 0         0 my $flags = FLAG_PACKET_HANDLER_DATA;
492 0         0 my $data;
493              
494 0 0       0 if ( $queued ) {
495 0         0 $flags |= FLAG_DATA_ENQUEUE;
496 0         0 $data = \$info->{$source_cid}->{'data_queue'}->{$data_name};
497             } else {
498 0         0 $data = \$info->{$source_cid}->{'data'}->{$data_name};
499             }
500              
501             my $r = $socket->send(
502             pack(
503             HEADER_PACKING . HEADER_HANDLER_DATA_PACKING, # packing
504             $flags,
505             $source_cid,
506             length( $source_symbolic_name ),
507             length( $data_name ),
508             length( ${$data} )
509             )
510             . $source_symbolic_name
511             . $data_name
512 0   0     0 . ${$data}
513             ) || die "Failed to send data to master: $!";
514              
515 0 0       0 return 0 unless $r;
516 0         0 return 1;
517             }
518              
519 0         0 $0 = 'data_handler';
520              
521 0         0 $self->{'handler_child_socket'} = IO::Socket::INET->new(
522             Type => SOCK_STREAM,
523             PeerAddr => $self->{'handler_ip'},
524             PeerPort => $self->{'handler_port'}
525             );
526 0 0       0 if ( !$self->{'handler_child_socket'} ) {
527 0         0 die "Failed to create client socket to " . $self->{'handler_port'} . ": $!";
528             }
529              
530 0         0 undef $self->{'handler_select'};
531 0         0 undef $self->{'handler_socket'};
532 0         0 undef $self->{'handler_socket_comm'};
533 0         0 undef $self->{'is_child'};
534              
535 0         0 $self->{'is_handler_parent'} = 0;
536 0         0 close( $wh );
537 0         0 close( $rh );
538              
539 0         0 $self->{'master_select'}->add( $self->{'handler_child_socket'} );
540              
541 0         0 while ( 1 ) {
542 0         0 my $r = $self->_data_from_socket( $self->{'master_select'}, BLOCK_UNTIL_CHILD );
543 0 0 0     0 if ( $r == FLAG_PACKET_ASK_FINISHED_CHILDREN ) {
    0 0        
    0 0        
      0        
544 0         0 $self->_send_finished_children();
545             } elsif ( ( $r == FLAG_PACKET_GET_CHILD_DATA )
546             || ( $r == FLAG_PACKET_GET_CHILD_DATA_AND_EXIT )
547             || ( $r == FLAG_PACKET_GET_CHILD_DATA_FINISHED_BLOCK )
548             || ( $r == FLAG_PACKET_GET_CHILD_DATA_BLOCK ) )
549             {
550              
551             # If we're exiting, gather all outstanding data first.
552 0 0 0     0 if ( $r == FLAG_PACKET_GET_CHILD_DATA_AND_EXIT ) {
    0          
553 0         0 while ( keys( %{ $self->{'socket_to_cid'} } ) ) {
  0         0  
554 0         0 $self->_data_from_socket( $self->{'master_select'}, BLOCK_UNTIL_CHILD );
555             }
556              
557             # If they only want us to return when we've got data...
558             } elsif ( ( $r == FLAG_PACKET_GET_CHILD_DATA_BLOCK )
559             || ( $r == FLAG_PACKET_GET_CHILD_DATA_FINISHED_BLOCK ) )
560             {
561 0         0 my $do_we_have_data;
562              
563             # Gymnastics to determine if we have something to share.
564             # If a child has sent data or exited, we can continue.
565             # Otherwise, block until we collect something. Remember,
566             # once we send something we delete it, so if we have
567             # anything at all we know it will be new to the master.
568 0         0 do {
569 0         0 $do_we_have_data = 0;
570 0 0       0 if ( scalar( keys( %{ $self->{'finished_children'} } ) ) ) {
  0 0       0  
571 0         0 $do_we_have_data = 1;
572             # Only check for new data as a condition to continue
573             # if the caller wants us to.
574             } elsif ( $r == FLAG_PACKET_GET_CHILD_DATA_BLOCK ) {
575 0         0 foreach my $cid ( keys( %{ $self->{'child_info'} } ) ) {
  0         0  
576 0 0 0     0 if ( ( keys( %{ $self->{'child_info'}->{$cid}->{'data'} } ) )
  0         0  
  0         0  
577             || ( keys( %{ $self->{'child_info'}->{$cid}->{'data_queue'} } ) ) )
578             {
579 0         0 $do_we_have_data = 1;
580             }
581             }
582             }
583 0 0       0 if ( !$do_we_have_data ) {
584 0         0 $self->_data_from_socket( $self->{'master_select'}, BLOCK_UNTIL_DATA );
585             }
586             } until ( $do_we_have_data );
587             }
588              
589 0         0 $self->_send_finished_children();
590 0         0 foreach my $cid ( keys( %{ $self->{'child_info'} } ) ) {
  0         0  
591 0         0 foreach my $data_name ( keys( %{ $self->{'child_info'}->{$cid}->{'data'} } ) ) {
  0         0  
592 0         0 _handler_data_to_socket(
593             $self->{'handler_child_socket'},
594             $self->{'child_info'},
595             $cid,
596             $data_name,
597             0 # send queued data?
598             );
599             }
600 0         0 foreach my $data_name ( keys( %{ $self->{'child_info'}->{$cid}->{'data_queue'} } ) ) {
  0         0  
601 0         0 _handler_data_to_socket(
602             $self->{'handler_child_socket'},
603             $self->{'child_info'},
604             $cid,
605             $data_name,
606             1 # send queued data?
607             );
608             }
609             }
610 0 0       0 $self->{'handler_child_socket'}
611             ->send( pack( HEADER_PACKING, FLAG_PACKET_DATA_HANDLER_DATA_CHECKPOINT ) )
612             || die "Failed to report checkpoint to master: $!";
613 0 0       0 if ( $r == FLAG_PACKET_GET_CHILD_DATA_AND_EXIT ) {
614 0         0 last;
615             }
616              
617 0         0 foreach my $cid ( keys( %{ $self->{'child_info'} } ) ) {
  0         0  
618 0         0 $self->{'child_info'}->{$cid}->{'data'} = {};
619 0         0 $self->{'child_info'}->{$cid}->{'data_queue'} = {};
620             }
621             } elsif ( ( $r != FLAG_PACKET_DATA ) && ( $r != FLAG_RETURN_CHILD_DISCONNECTED ) ) {
622 0         0 warn "Should not be here! Got packet for: $r";
623             }
624             }
625             # Data handler fork has done its job... exit!
626 0         0 exit 0;
627             }
628             }
629              
630             =head2 collect_data_from_handler
631              
632             Only usable by the master when using the data handler method.
633              
634             When using the data handler method of operation (see above), this function
635             will cause the data handler fork to return all data it has received from
636             children to the master and will cause the data handler to clear its cache
637             of child data.
638              
639             The first, optional, parameter defines whether or not the data handler
640             should stay running after returning all data. For backwards compatibility, the
641             default (false) is to exit after collecting all data.
642              
643             If this parameter is set to true, the data handler will not exit after the
644             data is sent, allowing the caller to collect data again at a later time.
645              
646             If this parameter is set to false, no more child processes will be able to
647             send data back to the master, as the data handler will have exited. This
648             should only be called after all children have ended.
649              
650             The second, optional, parameter is one of the BLOCK flags, as used by
651             L</process_child_data>. See EXAMPLES for details on the meaning of these flags.
652              
653             =cut
654              
655             sub collect_data_from_handler {
656 0     0 1 0 my ( $self, $keep_alive, $block ) = @_;
657 0         0 my ( $r, $msg );
658              
659 0 0       0 if ( !$self->{'handler_pid'} ) { return; }
  0         0  
660 0         0 local $SIG{'PIPE'} = 'IGNORE';
661              
662 0 0       0 if ( $keep_alive ) {
663 0 0       0 if ( $block == BLOCK_NEVER ) {
    0          
    0          
664 0         0 $msg = FLAG_PACKET_GET_CHILD_DATA;
665             } elsif ( $block == BLOCK_UNTIL_DATA ) {
666 0         0 $msg = FLAG_PACKET_GET_CHILD_DATA_BLOCK;
667             } elsif ( $block == BLOCK_UNTIL_CHILD ) {
668 0         0 $msg = FLAG_PACKET_GET_CHILD_DATA_FINISHED_BLOCK;
669             } else {
670 0         0 carp "Invalid value for BLOCK!";
671             }
672             } else {
673 0         0 $msg = FLAG_PACKET_GET_CHILD_DATA_AND_EXIT;
674             }
675 0 0       0 $self->{'handler_socket_comm'}->send( pack( HEADER_PACKING, $msg ) )
676             || die "Failed to send data to data handler: $!";
677              
678             # _data_from_socket will return when
679             # FLAG_PACKET_DATA_HANDLER_DATA_CHECKPOINT is received.
680 0         0 do {
681 0         0 $r = $self->_data_from_socket( $self->{'handler_select'}, BLOCK_UNTIL_CHILD );
682             } until ( $r == FLAG_PACKET_DATA_HANDLER_DATA_CHECKPOINT );
683              
684 0 0       0 if ( !$keep_alive ) {
685             # _data_from_socket will return when the remote socket is closed.
686 0         0 $self->_data_from_socket( $self->{'handler_select'}, BLOCK_UNTIL_CHILD );
687 0         0 waitpid( $self->{'handler_pid'}, 0 );
688 0         0 $self->{'handler_pid'} = 0;
689             }
690 0         0 return 1;
691             }
692              
693             =head2 init_child
694              
695             Only usable by a child.
696              
697             Only to be called by a child after a fork, this method configures this
698             child for communication with the master (or data handler). Will die on failure.
699              
700             The first, optional, parameter is a symbolic name for this child with which
701             the master can retrieve data. Each child will automatically be assigned a
702             unique id (cid), but the optional symbolic name can be used to simplify
703             development. If not set, the symbolic name will be set to the process ID. The
704             symbolic name can not be a zero-length string.
705              
706             Note: If a symbolic name is re-used, fetching data by symbolic name will fetch
707             data for one randomly chosen child that shares that name. If symbolic names
708             will be re-used, it's suggested that data is fetched instead by cid.
709              
710             Be aware that PIDs, the default symbolic name, may be re-used on a system,
711             leading to a collision of symbolic names. In order to avoid this issue, do not
712             call wait (or otherwise reap the child process) until you have fetched (and
713             then cleared) all of its data. Alternately, address child processes by cid
714             instead.
715              
716             =cut
717              
718             sub init_child {
719 0     0 1 0 my ( $self, $symbolic_name ) = @_;
720              
721             # We can't really protect against being called on the master...
722 0 0       0 return if $self->{'is_child'};
723 0         0 local $SIG{'PIPE'} = 'IGNORE';
724 0         0 delete $self->{'master_socket'};
725 0         0 delete $self->{'child_info'};
726              
727 0 0 0     0 if ( ( !defined $symbolic_name ) || ( length( $symbolic_name ) == 0 ) ) {
728 0         0 $symbolic_name = $$;
729             }
730              
731 0         0 $self->{'symbolic_name'} = $symbolic_name;
732 0         0 $self->{'is_child'} = 1;
733 0         0 $self->{'child_socket'} =
734             IO::Socket::INET->new( Type => SOCK_STREAM, PeerAddr => $self->{'master_ip'}, PeerPort => $self->{'master_port'} );
735 0 0       0 if ( !$self->{'child_socket'} ) {
736 0         0 die "Failed to connect to master socket " . $self->{'master_port'} . ": $!";
737             }
738              
739 0 0       0 $self->{'child_socket'}->send(
740             pack(
741             HEADER_PACKING . HEADER_CHILD_HELLO_PACKING, # Packing
742             FLAG_PACKET_CHILD_HELLO,
743             $self->{'shared_key'},
744             length( $self->{'symbolic_name'} )
745             )
746             . $self->{'symbolic_name'}
747             ) || die "Failed to send data to master: $!";
748 0         0 return 1;
749             }
750              
751             =head2 to_master
752              
753             Only usable by a child.
754              
755             Sends data to the master (or data handler). Takes two parameters, the first a
756             string, used as a symbolic name for the data by which it will be retrieved. The
757             second parameter is the data (a scalar) that should be sent. Data can be in any
758             format understandable by L<Storable>; however, since this data is sent between
759             forks, data containing filehandles should not be passed.
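
The serialization step can be illustrated in isolation. The listing below is a minimal sketch, assuming only what the private helpers in this file show: the child freezes a reference to the value and the master thaws it. The payload and variable names are hypothetical, and no socket is involved.

```perl
use strict;
use warnings;
use Storable qw(freeze thaw);

# Hypothetical payload a child might pass as the second argument to to_master.
my $payload = { answer => 42, list => [ 1, 2, 3 ] };

# Child side: serialize a reference to the value, as _data_to_socket does.
my $wire = freeze( \$payload );

# Master side: thaw returns a reference to the value that was sent.
my $received = thaw( $wire );

printf "outer ref type: %s\n", ref $received;
printf "answer: %d\n", ${$received}->{'answer'};
```

Because a reference to the value is what gets frozen, the master always receives a reference and must dereference it once to reach the original value, which is why the SYNOPSIS wraps from_child in ${ ... }.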
760              
761             =cut
762              
763             sub to_master {
764 0     0 1 0 my ( $self, $name, $data ) = @_;
765 0 0       0 return unless $self->{'is_child'};
766 0 0       0 if ( !$self->{'child_socket'} ) { die "Must call init_child before sending data!"; }
  0         0  
767             # Last parameter here says not to enqueue the data.
768 0         0 return $self->_data_to_socket( $self->{'child_socket'}, $name, $data, 0 );
769             }
770              
771             =head2 push_to_master
772              
773             Only usable by a child.
774              
775             Pushes data into a queue sent to the master. Unlike L</to_master>, data sent with
776             L</push_to_master> is not overwritten, but appended to, much like when working
777             with an array. Function semantics are otherwise identical to L</to_master>.
778              
779             The first parameter is the symbolic name for the data, and the second is a
780             reference to the data that will be sent.
781              
782             =cut
783              
784             sub push_to_master {
785 0     0 1 0 my ( $self, $name, $data ) = @_;
786 0 0       0 return unless $self->{'is_child'};
787 0 0       0 if ( !$self->{'child_socket'} ) { die "Must call init_child before sending data!"; }
  0         0  
788 0         0 return $self->_data_to_socket( $self->{'child_socket'}, $name, $data, FLAG_DATA_ENQUEUE );
789             }
790              
791             =head2 from_cid
792              
793             Only usable by the master.
794              
795             Retrieves data from a child after the child has sent it. Takes two parameters,
796             the first is the cid from which the data was sent, and the second is a symbolic
797             name (a string) for the data, which the child specified when the data was sent.
798              
799             Returns nothing if no data is available, or a reference to whatever data the
800             child sent. Note: You may need to use ref() in order to determine the type of
801             the data sent.
802              
803             =cut
804              
805             sub from_cid {
806 0     0 1 0 my ( $self, $cid, $name ) = @_;
807 0 0 0     0 if ( ( $self->{'is_child'} )
      0        
808             || ( !$self->{'child_info'}->{$cid} )
809             || ( !$self->{'child_info'}->{$cid}->{'data'} ) )
810             {
811 0         0 return;
812             }
813 0         0 return $self->{'child_info'}->{$cid}->{'data'}->{$name};
814             }
815              
816             =head2 from_child
817              
818             Only usable by the master.
819              
820             Semantics are the same as L</from_cid>, but searches by symbolic name instead
821             of cid.
822              
823             =cut
824              
825             sub from_child {
826 0     0 1 0 my ( $self, $sn, $name ) = @_;
827 0 0       0 return if ( $self->{'is_child'} );
828              
829 0         0 my $cid = $self->_find_cid_for_symbolic_name( $sn );
830 0 0       0 return unless defined $cid;
831 0         0 return $self->from_cid( $cid, $name );
832             }
833              
834             =head2 pop_from_cid
835              
836             Only usable by the master.
837              
838             Retrieves pushed data from a child after the child has sent it. Takes two
839             parameters, the first is the cid from which the data was sent, and the second is
840             a symbolic name (a string) for the data, which the child specified when the data
841             was sent.
842              
843             Called in scalar context, returns nothing if no data is available, or a
844             reference to the oldest data the child pushed. Called in array context, returns
845             an empty array if no data is available, or an array of references to the data
846             pushed by the child, ordered oldest to most recent.
847              
848             After the data is returned, it is removed from the internal list, so a
849             subsequent call to L</pop_from_cid> will return the next oldest set of data.
850             Note: You may need to use ref() in order to determine the type of the data sent.
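
The context-dependent return can be sketched without any sockets. This is an illustration only, assuming a plain hash of array references in place of the module's internal queue; pop_queue and %data_queue are hypothetical names, not part of IPC::Fork::Simple.

```perl
use strict;
use warnings;

# Hypothetical stand-in for one child's queue: three pushed values, oldest first.
my %data_queue = ( progress => [ \10, \20, \30 ] );

sub pop_queue {
    my ( $name ) = @_;
    my $queue = $data_queue{$name} or return;

    if ( wantarray ) {
        # List context: hand back everything that is queued and empty the queue.
        my @all = @{$queue};
        @{$queue} = ();
        return @all;
    }

    # Scalar context: hand back only the oldest entry.
    return shift @{$queue};
}

my $oldest = pop_queue( 'progress' );    # scalar context
my @rest   = pop_queue( 'progress' );    # list context drains the queue
my @none   = pop_queue( 'progress' );    # queue is now empty

printf "oldest=%d rest=%d none=%d\n", ${$oldest}, scalar @rest, scalar @none;
```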
851              
852             =cut
853              
854             sub pop_from_cid {
855 0     0 1 0 my ( $self, $cid, $name ) = @_;
856 0 0 0     0 if ( ( $self->{'is_child'} )
      0        
      0        
857             || ( !$self->{'child_info'}->{$cid} )
858             || ( !$self->{'child_info'}->{$cid}->{'data_queue'} )
859             || ( !$self->{'child_info'}->{$cid}->{'data_queue'}->{$name} ) )
860             {
861 0         0 return;
862             }
863              
864 0 0       0 if ( wantarray ) {
865 0         0 my @r = @{ $self->{'child_info'}->{$cid}->{'data_queue'}->{$name} };
  0         0  
866 0         0 $self->{'child_info'}->{$cid}->{'data_queue'}->{$name} = [];
867 0         0 return @r;
868             } else {
869 0         0 return shift @{ $self->{'child_info'}->{$cid}->{'data_queue'}->{$name} };
  0         0  
870             }
871             }
872              
873             =head2 pop_from_child
874              
875             Only usable by the master.
876              
877             Semantics are the same as L</pop_from_cid>, but searches by symbolic name
878             instead of cid.
879              
880             =cut
881              
882             sub pop_from_child {
883 0     0 1 0 my ( $self, $sn, $name ) = @_;
884 0 0       0 return if $self->{'is_child'};
885              
886 0         0 my $cid = $self->_find_cid_for_symbolic_name( $sn );
887 0 0       0 return unless defined $cid;
888              
889 0 0       0 if ( wantarray ) {
890 0         0 my @r = $self->pop_from_cid( $cid, $name );
891 0         0 return @r;
892             } else {
893 0         0 my $r = $self->pop_from_cid( $cid, $name );
894 0         0 return $r;
895             }
896             }
897              
898             =head2 finished_children
899              
900             Only usable by the master.
901              
902             In scalar context, returns the number of children who have finished.
903              
904             In list context, if the first, optional, parameter is true, returns a hash of
905             cid-to-symbolic name mappings for these children. If the first parameter is not
906             set, or is false, returns a list of CIDs that have finished.
907              
908             =cut
909              
910             sub finished_children {
911 0     0 1 0 my ( $self, $as_hash ) = @_;
912              
913             # We're the parent of a running handler fork, so we need to ask the
914             # handler to return the current total to us.
915 0 0 0     0 if ( ( $self->{'is_handler_parent'} ) && ( $self->{'handler_pid'} ) ) {
916 0         0 $self->_do_finished_children_request();
917             }
918              
919 0 0       0 if ( wantarray ) {
920 0 0       0 if ( $as_hash ) {
921 0         0 return %{ $self->{'finished_children'} };
  0         0  
922             } else {
923 0         0 return keys( %{ $self->{'finished_children'} } );
  0         0  
924             }
925             } else {
926 0         0 return scalar( keys( %{ $self->{'finished_children'} } ) );
  0         0  
927             }
928             }
929              
930             =head2 running_children
931              
932             Only usable by the master.
933              
934             In scalar context, returns the number of children who have called
935             L</init_child> but have not yet ended.
936              
937             In list context, if the first, optional, parameter is true, returns a hash of
938             cid-to-symbolic name mappings for these children. If the first parameter is
939             not set, or is false, returns a list of CIDs that have not yet finished.
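
The relationship between registered, finished, and running children can be sketched as a set difference. This is a minimal illustration, assuming two plain hashes keyed by cid; %child_info and %finished are hypothetical stand-ins for the object's internal state.

```perl
use strict;
use warnings;

# Hypothetical state: three children have registered, one has finished.
my %child_info = (
    1 => { symbolic_name => 'alpha' },
    2 => { symbolic_name => 'beta' },
    3 => { symbolic_name => 'gamma' },
);
my %finished = ( 2 => 'beta' );

# A child is running if it has registered but has no entry in the finished list.
my %running =
    map { $_ => $child_info{$_}->{'symbolic_name'} }
    grep { !exists $finished{$_} }
    keys %child_info;

print join( ', ', map { "$_ => $running{$_}" } sort keys %running ), "\n";
```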
940              
941             =cut
942              
943             sub running_children {
944 0     0 1 0 my ( $self, $as_hash ) = @_;
945              
946             # We're the parent of a running handler fork, so we need to ask the
947             # handler to return the current total to us.
948 0 0 0     0 if ( ( $self->{'is_handler_parent'} ) && ( $self->{'handler_pid'} ) ) {
949 0         0 $self->_do_finished_children_request();
950             }
951              
952 0         0 my %running_children;
953              
954 0         0 foreach my $cid ( keys( %{ $self->{'child_info'} } ) ) {
  0         0  
955 0 0       0 if ( !exists $self->{'finished_children'}->{$cid} ) {
956 0         0 $running_children{$cid} = $self->{'child_info'}->{$cid}->{'symbolic_name'};
957             }
958             }
959              
960 0 0       0 if ( wantarray ) {
961 0 0       0 if ( $as_hash ) {
962 0         0 return %running_children;
963             } else {
964 0         0 return keys( %running_children );
965             }
966             } else {
967 0         0 return scalar( keys( %running_children ) );
968             }
969             }
970              
971             =head2 process_child_data
972              
973             Only usable by the master when using the blocking wait and polling methods.
974              
975             Processes data from all children. Takes a single parameter, a BLOCK flag that
976             determines if, and how, L</process_child_data> should block. See the EXPORTS
977             section for details on these flags.
978              
979             L and L can be called between calls
980             to process_child_data, but there is no guarantee there will be any data
981             available.
982              
983             If L</process_child_data> is not called often or fast enough, children will be
984             forced to block on calls to L</to_master>, and data loss is possible.
985              
986             Returns a FLAG_PACKET flag describing the last child action. See the EXPORTS
987             section for details on these flags.
988              
989             =cut
990              
991             sub process_child_data {
992 0     0 1 0 my ( $self, $block ) = @_;
993 0 0       0 return if $self->{'is_child'};
994 0 0       0 return if $self->{'handler_pid'};
995 0         0 return $self->_data_from_socket( $self->{'master_select'}, $block );
996             }
997              
998             =head2 clear_finished_children
999              
1000             Only usable by the master.
1001              
1002             Deletes the master's copy of the list of children who have ended. If a data
1003             handler is being used, its copy of the list is not affected.
1004              
1005             The only optional parameter is the list of child cids to remove entries for. If
1006             specified, only the entries for those specified children will be removed. If no
1007             list is passed, then all data will be cleared.
1008              
1009             =cut
1010              
1011             sub clear_finished_children {
1012 0     0 1 0 my ( $self, @children ) = @_;
1013 0 0       0 if ( @children ) {
1014 0         0 foreach my $c ( @children ) {
1015 0         0 delete $self->{'finished_children'}->{$c};
1016             }
1017             } else {
1018 0         0 $self->{'finished_children'} = {};
1019             }
1020             }
1021              
1022             =head2 clear_child_data
1023              
1024             Only usable by the master.
1025              
1026             Deletes the master's copy of the data (standard and enqueued) from children who
1027             have ended. If a data handler is being used, its copy of the data is not affected.
1028              
1029             The only optional parameter is the list of child cids to remove data for. If
1030             specified, only the entries for those specified children will be removed. If no
1031             list is passed, then all data will be cleared.
1032              
1033             =cut
1034              
1035             sub clear_child_data {
1036 0     0 1 0 my ( $self, @children ) = @_;
1037 0 0       0 if ( @children ) {
1038 0         0 foreach my $c ( @children ) {
1039 0         0 delete $self->{'child_info'}->{$c};
1040             }
1041             } else {
1042 0         0 $self->{'child_info'} = {};
1043             }
1044             }
1045              
1046             =head2 get_connection_info
1047              
1048             Only usable by the master.
1049              
1050             Retrieves an opaque value representing connection data for this object (or its
1051             data handler). Only useful to pass into L.
1052              
1053             =cut
1054              
1055             sub get_connection_info {
1056 0     0 1 0 my ( $self ) = @_;
1057              
1058 0 0       0 return if $self->{'is_child'};
1059              
1060 0         0 return freeze(
1061             {
1062             'port' => $self->{'master_port'},
1063             'ip' => $self->{'master_ip'},
1064             'shared_key' => $self->{'shared_key'},
1065             }
1066             );
1067             }
1068              
1069             =head2 get_waitable_fds
1070              
1071             Only usable by the master.
1072              
1073             Returns an array of any waitable/important filehandles. Useful if the caller
1074             wants to implement their own loop and only call IPC::Fork::Simple methods when
1075             there is data waiting for IPC::Fork::Simple. The caller could select on the
1076             list of returned handles and, if one is readable, call the appropriate
1077             IPC::Fork::Simple method to allow the module to handle its data.
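
A caller-driven loop of this kind can be sketched with IO::Select. The listing below is an illustration only: a local pipe stands in for a handle returned by get_waitable_fds, so it runs without any forked children. The comments mark where a real caller would, presumably, invoke the module.

```perl
use strict;
use warnings;
use IO::Handle;
use IO::Select;

# Stand-in for one handle returned by get_waitable_fds(); a real caller
# would pass the module's handles to IO::Select->new() instead.
pipe( my $reader, my $writer ) or die "pipe failed: $!";
$writer->autoflush( 1 );

my $select = IO::Select->new( $reader );

# Nothing has been written yet, so a non-blocking poll reports no handles.
my @ready_before = $select->can_read( 0 );

# Simulate a child sending data.
print {$writer} "x";

# The handle is now readable; this is the point at which the caller would
# invoke the appropriate IPC::Fork::Simple method (e.g. process_child_data).
my @ready_after = $select->can_read( 1 );

printf "ready before: %d, ready after: %d\n", scalar @ready_before, scalar @ready_after;
```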
1078              
1079             =cut
1080              
1081             sub get_waitable_fds {
1082 0     0 1 0 my ( $self ) = @_;
1083              
1084 0 0       0 return () if $self->{'is_child'};
1085              
1086 0 0       0 if ( $self->{'is_handler_parent'} ) {
1087 0         0 return $self->{'handler_select'}->handles();
1088             } else {
1089 0         0 return $self->{'master_select'}->handles();
1090             }
1091             }
1092              
1093             ### Exportable functions
1094              
1095             =head1 USEFUL FUNCTIONS
1096              
1097             Included with IPC::Fork::Simple are some helpful functions. These are not
1098             exported by default. Note, these are not methods, they are standard functions.
1099             They must be called directly and not as methods on an IPC::Fork::Simple object.
1100              
1101             =head2 partition_list
1102              
1103             Partitions a list of length L into N pieces as evenly as possible. If even
1104             partitioning is not possible, the first L % N partitions will each be one
1105             element larger than the rest.
1106              
1107             The first parameter is the number of partitions (N), the second is an array
1108             reference to the data to partition. An array of N array references will be
1109             returned. If N is <= 1, a single element array containing a copy of
1110             the list is returned.
1111              
1112             Example:
1113              
1114             @r = partition_list( 3, [1..10] );
1115             # @r is now: [ 1, 2, 3, 4 ], [ 5, 6, 7 ], [ 8, 9, 10 ]
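
The sizing rule can also be written compactly. The following is a hypothetical re-implementation named partition_evenly, not the module's own code; it assumes the same argument order and is meant only to show how the first L % N partitions receive the extra element and how trailing partitions come out empty when N exceeds L.

```perl
use strict;
use warnings;

# Hypothetical compact equivalent of the partitioning rule described above.
sub partition_evenly {
    my ( $count, $list ) = @_;
    return ( [ @{$list} ] ) unless $count > 1;

    my $length = scalar @{$list};
    my $base   = int( $length / $count );    # minimum size of every partition
    my $extra  = $length % $count;           # partitions that get one more

    my $start = 0;
    my @parts;
    for my $i ( 0 .. $count - 1 ) {
        my $size = $base + ( $i < $extra ? 1 : 0 );
        # An empty range yields an empty slice, so size 0 gives [].
        push @parts, [ @{$list}[ $start .. $start + $size - 1 ] ];
        $start += $size;
    }
    return @parts;
}

my @thirds = partition_evenly( 3, [ 1 .. 10 ] );
my @sparse = partition_evenly( 5, [ 1 .. 3 ] );

print join( ' | ', map { join( ',', @{$_} ) } @thirds ), "\n";
print join( ' | ', map { scalar @{$_} } @sparse ), "\n";
```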
1116              
1117             =cut
1118              
1119             sub partition_list {
1120 6     6 1 419 my ( $count, $list ) = @_;
1121 6 50       17 die "Invalid parameters" if ref $count;
1122 6 100       15 return ( [@{$list}] ) unless $count > 1;
  1         12  
1123              
1124 5         5 my @final;
1125 5         7 my $start = 0;
1126 5         7 my $size_of_partition;
1127             my $leftover;
1128 0         0 my $i;
1129              
1130 5 100       7 if ( $count < scalar( @{$list} ) ) {
  5         14  
1131 3         4 $size_of_partition = int( scalar( @{$list} ) / $count );
  3         7  
1132 3         4 $leftover = scalar( @{$list} ) % $count;
  3         7  
1133 3 100       8 if ( $leftover ) {
1134 2         8 $size_of_partition++;
1135             }
1136             } else {
1137 2         4 $size_of_partition = 1;
1138             }
1139              
1140 5         14 for ( $i = 0; $i < $count; $i++ ) {
1141 15 100       18 if ( $start >= scalar( @{$list} ) ) {
  15         29  
1142 4         12 $final[$i] = [];
1143             } else {
1144             # This is weird syntax for getting an array slice out of an arrayref.
1145 11         20 $final[$i] = [@{$list}[$start .. $start + $size_of_partition - 1]];
  11         39  
1146 11         16 $start += $size_of_partition;
1147 11 100       31 if ( $leftover ) {
1148 3         4 $leftover--;
1149 3 100       12 if ( $leftover == 0 ) {
1150 2         6 $size_of_partition--;
1151             }
1152             }
1153             }
1154             }
1155              
1156 5         41 return @final;
1157             }
1158              
1159             ### End of public methods, begin private stuff...
1160              
1161             # Send data to our parent, which could be a master or a data handler. The
1162             # caller is expected to know which and set the appropriate flags.
1163             sub _data_to_socket {
1164 0     0     my ( $self, $socket, $name, $data, $data_flags ) = @_;
1165 0           local $SIG{'PIPE'} = 'IGNORE';
1166 0           $data = freeze( \$data );
1167              
1168 0 0         if ( !defined $data_flags ) {
1169 0           $data_flags = 0;
1170             }
1171              
1172 0           my $flags = ( FLAG_PACKET_DATA | $data_flags );
1173              
1174 0   0       my $r = $socket->send(
1175             pack( HEADER_PACKING . HEADER_DATA_PACKING, $flags, length( $name ), length( $data ) ) . $name . $data )
1176             || die "Failed to send data to socket: $!";
1177              
1178 0 0         return $r ? 1 : 0;
1179             }
1180              
1181             # Waits on a socket for data, or a child to disconnect. Expects caller to know
1182             # whether or not to unwrap the received data (if the client is a master).
1183             # Returns the FLAG_PACKET_* type of the packet received, usually FLAG_PACKET_DATA,
1184             # unless it's called in blocking mode
1185             sub _data_from_socket {
1186 0     0     my ( $self, $select, $block ) = @_;
1187 0           my $data;
1188              
1189             my $disconnect_client = sub {
1190 0     0     my ( $s ) = @_;
1191 0           $select->remove( $s );
1192 0 0         if ( defined $self->{'socket_to_cid'}->{$s} ) {
1193             # Don't register a "finished child" if it's the data handler that
1194             # exited. handler_socket_comm is only set on a master.
1195 0 0 0       if ( ( !$self->{'handler_socket_comm'} )
1196             || ( $s != $self->{'handler_socket_comm'} ) )
1197             {
1198 0           $self->{'finished_children'}->{ $self->{'socket_to_cid'}->{$s} } =
1199             $self->{'child_info'}->{ $self->{'socket_to_cid'}->{$s} }->{'symbolic_name'};
1200             }
1201 0           delete $self->{'socket_to_cid'}->{$s};
1202             }
1203 0           delete $self->{'unauthenticated_clients'}->{$s};
1204 0           $s->close();
1205 0           };
1206              
1207             my $flush_unauthenticated_clients = sub {
1208 0     0     my $start_ts = time();
1209 0           while ( my ( $k, $v ) = each( %{ $self->{'unauthenticated_clients'} } ) ) {
  0            
1210 0 0         if ( $start_ts > $v->{'ts'} + CLIENT_AUTHENTICATION_TIME ) {
1211 0           $disconnect_client->( $v->{'sock'} );
1212             }
1213             }
1214 0           };
1215              
1216             my $VALIDATE = sub {
1217 0     0     my ( $s, $cond ) = @_;
1218              
1219 0 0         if ( !$cond ) {
1220 0           $disconnect_client->( $s );
1221 0           return undef;
1222             }
1223 0           return 1;
1224 0           };
1225              
1226             my $recv_more = sub {
1227 0     0     my ( $socket, $more ) = @_;
1228 0           my $data = '';
1229              
1230 0           while ( length( $data ) < $more ) {
1231 0           my $r;
1232 0           $socket->recv( $r, $more - length( $data ) );
1233 0 0 0       if ( ( !defined $r ) || ( length( $r ) == 0 ) ) {
1234 0           $disconnect_client->( $socket );
1235 0           return undef;
1236             }
1237 0           $data .= $r;
1238             }
1239              
1240             # Not necessary, but we can keep it in case something goes awry above.
1241 0 0 0       if ( ( !defined $data ) || ( length( $data ) != $more ) ) {
1242 0           $disconnect_client->( $socket );
1243 0           return undef;
1244             }
1245              
1246 0           if ( DEBUG ) {
1247             my @guessconst;
1248             foreach my $c ( keys( %constant::declared ) ) {
1249             if ( $c =~ /::HEADER_.+_SIZE$/ ) {
1250             if ( length( $data ) == eval $c ) {
1251             $c =~ s/^.+:://;
1252             push @guessconst, $c;
1253             }
1254             }
1255             }
1256             warn "Read "
1257             . length( $data )
1258             . " bytes ("
1259             . join( ',', @guessconst ) . "?)\n"
1260             . hexdump( data => $data ) . "\n";
1261             }
1262 0           return $data;
1263 0           };
1264              
1265             # Wrap the select in a do/while loop so we restart after catching any
1266             # signals, regardless of any signal handlers the caller may have
1267             # installed. By using a do/while loop, we're guaranteed to run at least
1268             # once, even if we're set not to block.
1269 0   0       do {
1270             # Passing 'undef' will block indefinitely. Passing 0 will not block. We
1271             # accept a few different BLOCK values here, so what we're saying is to
1272             # pass undef (ie, block) if we're in any mode other than BLOCK_NEVER.
1273             # This probably should be re-written to be clearer.
1274 0 0         while ( my @ready = $select->can_read( ( $block != BLOCK_NEVER ? undef : 0 ) ) ) {
1275              
1276             # Only a data handler has a handler_child_socket.
1277             # Process requests from the master last, to ensure we have the
1278             # most up-to-date data from our children.
1279 0 0         if ( $self->{'handler_child_socket'} ) {
1280             # Intentionally skip the last element of @ready here!
1281 0           for ( my $i = 0; $i < $#ready; $i++ ) {
1282 0 0         if ( $ready[$i] == $self->{'handler_child_socket'} ) {
1283 0           $ready[$i] = $ready[$#ready];
1284 0           $ready[$#ready] = $self->{'handler_child_socket'};
1285             }
1286             }
1287             }
1288              
1289 0           foreach my $s ( @ready ) {
1290 0 0 0       if ( ( $self->{'master_socket'} ) && ( $s == $self->{'master_socket'} ) ) {
1291 0           my $new_sock = $s->accept();
1292 0 0         next unless $new_sock;
1293 0           $select->add( $new_sock );
1294 0           $flush_unauthenticated_clients->();
1295 0           $self->{'unauthenticated_clients'}->{$new_sock} = {
1296             sock => $new_sock,
1297             ts => time(),
1298             };
1299             } else {
1300 0           $data = $recv_more->( $s, HEADER_SIZE );
1301 0 0         if ( !defined $data ) {
1302 0 0         if ( $self->{'unauthenticated_clients'}->{$s} ) {
1303             # This isn't a condition the caller should care
1304             # about.
1305 0           next;
1306             }
1307 0           return FLAG_RETURN_CHILD_DISCONNECTED;
1308             }
1309              
1310 0           my ( $flags ) = unpack( HEADER_PACKING, $data );
1311 0           my $data_flags = ( $flags & MASK_FLAG_DATA );
1312 0           $flags = ( $flags & ~MASK_FLAG_DATA );
1313              
1314 0 0 0       if ( $flags == FLAG_PACKET_ASK_FINISHED_CHILDREN
      0        
      0        
      0        
      0        
1315             || $flags == FLAG_PACKET_GET_CHILD_DATA
1316             || $flags == FLAG_PACKET_GET_CHILD_DATA_AND_EXIT
1317             || $flags == FLAG_PACKET_GET_CHILD_DATA_BLOCK
1318             || $flags == FLAG_PACKET_GET_CHILD_DATA_FINISHED_BLOCK
1319             || $flags == FLAG_PACKET_DATA_HANDLER_DATA_CHECKPOINT )
1320             {
1321 0           return $flags;
1322             }
1323              
1324 0 0         if ( $flags == FLAG_PACKET_CHILD_HELLO ) {
    0          
    0          
    0          
1325             # Okay, lets get the length of the child's symbolic name.
1326 0           $data = $recv_more->( $s, HEADER_CHILD_HELLO_ADDITIONAL_SIZE );
1327 0 0         if ( !defined $data ) {
1328 0 0         if ( $self->{'unauthenticated_clients'}->{$s} ) {
1329             # This isn't a condition the caller should care
1330             # about.
1331 0           next;
1332             }
1333 0           return FLAG_RETURN_CHILD_DISCONNECTED;
1334             }
1335              
1336             # Unpack the shared key and symbolic name length.
1337 0           my ( $proposed_key, $name_len ) = unpack( HEADER_CHILD_HELLO_PACKING, $data );
1338 0 0         next unless $VALIDATE->( $s, $name_len > 0 );
1339 0 0         next unless $VALIDATE->( $s, $proposed_key == $self->{'shared_key'} );
1340 0           delete $self->{'unauthenticated_clients'}->{$s};
1341              
1342 0           $data = $recv_more->( $s, $name_len );
1343 0 0         next unless $VALIDATE->( $s, defined $data );
1344              
1345 0           $self->{'socket_to_cid'}->{$s} = $self->{'next_cid'};
1346 0           $self->{'child_info'}->{ $self->{'next_cid'} } = {
1347             'symbolic_name' => $data,
1348             'data' => {},
1349             'data_queue' => {},
1350             };
1351 0           $self->{'next_cid'}++;
1352              
1353             } elsif ( $flags == FLAG_PACKET_DATA ) {
1354 0           $data = $recv_more->( $s, HEADER_DATA_ADDITIONAL_SIZE );
1355 0 0         return FLAG_RETURN_CHILD_DISCONNECTED if !defined $data;
1356              
1357 0           my ( $namelen, $datalen ) = unpack( HEADER_DATA_PACKING, $data );
1358              
1359 0           ASSERT( defined $self->{'socket_to_cid'}->{$s} );
1360 0           my $cid = $self->{'socket_to_cid'}->{$s};
1361              
1362 0 0 0       if ( !$namelen || !$datalen ) {
1363 0           warn "Got badly formatted data from child.";
1364 0           next;
1365             }
1366              
1367 0           $data = $recv_more->( $s, $namelen + $datalen );
1368 0 0         return FLAG_RETURN_CHILD_DISCONNECTED if !defined $data;
1369              
1370 0           my $name = substr( $data, 0, $namelen );
1371 0           $data = substr( $data, $namelen );
1372              
1373             # If we have a handler_child_socket then we are a data
1374             # handler, so we should not thaw the data here.
1375 0 0         if ( !$self->{'handler_child_socket'} ) {
1376 0           $data = thaw( $data );
1377             }
1378              
1379 0 0         if ( $data_flags & FLAG_DATA_ENQUEUE ) {
1380 0 0         if ( !exists $self->{'child_info'}->{$cid}->{'data_queue'}->{$name} ) {
1381 0           $self->{'child_info'}->{$cid}->{'data_queue'}->{$name} = [];
1382             }
1383 0           push @{ $self->{'child_info'}->{$cid}->{'data_queue'}->{$name} }, $data;
  0            
1384             } else {
1385 0           $self->{'child_info'}->{$cid}->{'data'}->{$name} = $data;
1386             }
1387 0 0         if ( $block == BLOCK_UNTIL_DATA ) { return FLAG_PACKET_DATA; }
  0            
1388              
1389             } elsif ( $flags == FLAG_PACKET_HANDLER_DATA ) {
1390 0           $data = $recv_more->( $s, HEADER_HANDLER_DATA_ADDITIONAL_SIZE );
1391 0 0         return FLAG_RETURN_CHILD_DISCONNECTED if !defined $data;
1392              
1393 0           my ( $cid, $symboliclen, $namelen, $datalen ) = unpack( HEADER_HANDLER_DATA_PACKING, $data );
1394              
1395 0 0 0       if ( !$namelen || !$datalen || !$symboliclen ) {
      0        
1396 0           warn "Got badly formatted data from child.";
1397 0           next;
1398             }
1399              
1400 0           $data = $recv_more->( $s, $namelen + $datalen + $symboliclen );
1401 0 0         return FLAG_RETURN_CHILD_DISCONNECTED if !defined $data;
1402              
1403 0           my $symbolic = substr( $data, 0, $symboliclen );
1404 0           my $name = substr( $data, $symboliclen, $namelen );
1405 0           $data = substr( $data, $namelen + $symboliclen );
1406              
1407             # Only a master of a data handler receives this flag,
1408             # so we always thaw.
1409 0           $data = thaw( $data );
1410              
1411 0 0         if ( $data_flags & FLAG_DATA_ENQUEUE ) {
1412 0 0         if ( !exists $self->{'child_info'}->{$cid}->{'data_queue'}->{$name} ) {
1413 0           $self->{'child_info'}->{$cid}->{'data_queue'}->{$name} = [];
1414             }
1415 0           push @{ $self->{'child_info'}->{$cid}->{'data_queue'}->{$name} }, $data;
  0            
1416             } else {
1417 0           $self->{'child_info'}->{$cid}->{'data'}->{$name} = $data;
1418             }
1419 0 0         if ( $block == BLOCK_UNTIL_DATA ) { return FLAG_PACKET_DATA; }
  0            
1420              
1421             } elsif ( $flags == FLAG_PACKET_FINISHED_CHILDREN ) {
1422 0           $data = $recv_more->( $s, HEADER_FINISHED_ADDITIONAL_SIZE );
1423 0 0         return FLAG_RETURN_CHILD_DISCONNECTED if !defined $data;
1424              
1425 0           my ( $count ) = unpack( HEADER_FINISHED_PACKING, $data );
1426 0           while ( $count-- ) {
1427 0           $data = $recv_more->( $s, HEADER_FINISHED_EACH_ADDITIONAL_SIZE );
1428 0 0         return FLAG_RETURN_CHILD_DISCONNECTED if !defined $data;
1429              
1430 0           my ( $finished_cid, $is_finished, $symbolic_name_length ) =
1431             unpack( HEADER_FINISHED_EACH_PACKING, $data );
1432              
1433 0           $data = $recv_more->( $s, $symbolic_name_length );
1434 0 0         return FLAG_RETURN_CHILD_DISCONNECTED if !defined $data;
1435              
1436 0 0         if ( $is_finished ) {
1437 0           $self->{'finished_children'}->{$finished_cid} = $data;
1438             }
1439              
1440 0 0         if ( !exists $self->{'child_info'}->{$finished_cid} ) {
1441 0           $self->{'child_info'}->{$finished_cid} = {
1442             'data' => {},
1443             'data_queue' => {},
1444             };
1445             }
1446 0           $self->{'child_info'}->{$finished_cid}->{'symbolic_name'} = $data;
1447             }
1448              
1449 0           return FLAG_PACKET_FINISHED_CHILDREN;
1450             } else {
1451 0 0         if ( !exists $self->{'unauthenticated_clients'} ) {
1452 0           warn "Got packet type ($flags) that I don't know how to handle!";
1453             } else {
1454 0           $disconnect_client->( $s );
1455             }
1456 0           next;
1457             }
1458             #next;
1459             }
1460             }
1461             }
1462              
1463 0 0         if ( $select->count == 0 ) {
1464             # Technically, this could be hit any time we lose all other forks,
1465             # but various code paths have us only reaching this point when we
1466             # go to request data from the handler and he's not there any more.
1467 0           die "Data handler exited unexpectedly!";
1468             }
1469             } while ( ( $block == BLOCK_UNTIL_CHILD )
1470             || ( $block == BLOCK_UNTIL_DATA ) );
1471 0           return FLAG_RETURN_NOERROR;
1472             }
1473              
1474             sub _find_cid_for_symbolic_name {
1475 0     0     my ( $self, $name ) = @_;
1476              
1477 0           if ( DEBUG ) {
1478             warn Dumper( $self->{'child_info'} );
1479             }
1480              
1481 0           foreach my $cid ( keys( %{ $self->{'child_info'} } ) ) {
  0            
1482 0 0         return $cid if $self->{'child_info'}->{$cid}->{'symbolic_name'} eq $name;
1483             }
1484 0           return undef;
1485             }
1486              
1487             # This will update running children too.
1488             sub _do_finished_children_request {
1489 0     0     my ( $self ) = @_;
1490 0           local $SIG{'PIPE'} = 'IGNORE';
1491 0           my $r;
1492              
1493 0 0         $self->{'handler_socket_comm'}->send( pack( HEADER_PACKING, FLAG_PACKET_ASK_FINISHED_CHILDREN ) )
1494             || die "Failed to send data to parent: $$ -> $!";
1495 0           do {
1496 0           $r = $self->_data_from_socket( $self->{'handler_select'}, BLOCK_UNTIL_CHILD );
1497             } while ( $r != FLAG_PACKET_FINISHED_CHILDREN );
1498             }
1499              
1500             =head1 EXAMPLES
1501              
1502             =head2 Data Handler
1503              
1504             use warnings;
1505             use strict;
1506            
1507             use IPC::Fork::Simple;
1508            
1509             my $ipc = IPC::Fork::Simple->new();
1510             my $pid = fork();
1511            
1512             if ( $pid ) {
1513             $ipc->spawn_data_handler();
1514             waitpid( $pid, 0 );
1515             $ipc->collect_data_from_handler();
1516             warn length(${$ipc->from_child( $pid, 'test' )});
1517             } else {
1518             $ipc->init_child();
1519             $ipc->to_master( 'test', 'a' x 300 ) || die $!;
1520             }
1521              
1522             =head2 Blocking
1523              
1524             use warnings;
1525             use strict;
1526            
1527             use IPC::Fork::Simple;
1528             use POSIX ":sys_wait_h";
1529            
1530             my $ipc = IPC::Fork::Simple->new();
1531            
1532             my $pid = fork();
1533             die 'stupid fork' unless defined $pid;
1534            
1535             if ( $pid ) {
1536             $ipc->process_child_data(1);
1537             my @finished = $ipc->finished_children();
1538             die unless 1 == scalar( $ipc->finished_children() );
1539             die unless 300 == length(${$ipc->from_child( $pid, 'test' )});
1540             die unless 300 == length(${$ipc->from_cid( $finished[0], 'test' )});
1541             } else {
1542             $ipc->init_child();
1543             $ipc->to_master( 'test', 'a' x 300 ) || die $!;
1544             }
1545            
1546             =head2 Polling
1547              
1548             use warnings;
1549             use strict;
1550            
1551             use IPC::Fork::Simple;
1552             use POSIX ":sys_wait_h";
1553            
1554             my $ipc = IPC::Fork::Simple->new();
1555             my $pid = fork();
1556            
1557             if ( $pid ) {
1558             while ( !$ipc->finished_children() ) {
1559             $ipc->process_child_data(0);
1560             waitpid( -1, WNOHANG );
1561             sleep(0);
1562             }
1563             warn length(${$ipc->from_child( $pid, 'test' )});
1564             } else {
1565             $ipc->init_child();
1566             $ipc->to_master( 'test', 'a' x 300 ) || die $!;
1567             }
1568              
1569             =head2 Data queues
1570              
1571             use warnings;
1572             use strict;
1573            
1574             use IPC::Fork::Simple;
1575            
1576             my $ipc = IPC::Fork::Simple->new();
1577            
1578             my $pid = fork();
1579             die 'stupid fork' unless defined $pid;
1580            
1581             if ( $pid ) {
1582             $ipc->process_child_data(1);
1583             die unless 300 == length(${$ipc->pop_from_child( $pid, 'test' )});
1584             die unless 301 == length(${$ipc->pop_from_child( $pid, 'test' )});
1585             die unless 302 == length(${$ipc->pop_from_child( $pid, 'test' )});
1586             } else {
1587             $ipc->init_child();
1588             $ipc->push_to_master( 'test', 'a' x 300 ) || die $!;
1589             $ipc->push_to_master( 'test', 'b' x 301 ) || die $!;
1590             $ipc->push_to_master( 'test', 'c' x 302 ) || die $!;
1591             }
1592              
1593             =head2 Bi-directional communication
1594              
1595             use warnings;
1596             use strict;
1597            
1598             use IPC::Fork::Simple qw/:block_flags/;
1599            
1600             my $ipc = IPC::Fork::Simple->new();
1601             my $master_pid = $$;
1602             my $pid = fork();
1603             die 'stupid fork' unless defined $pid;
1604            
1605             if ( $pid ) {
1606             $ipc->process_child_data(BLOCK_UNTIL_DATA);
1607             my $child_connection_data = $ipc->from_child( $pid, 'connection_info' );
1608             my $ipc2 = IPC::Fork::Simple->new_child( ${$child_connection_data} ) || die;
1609             $ipc2->to_master( 'master_test', 'a' x 300 );
1610             } else {
1611             $ipc->init_child();
1612             my $ipc2 = IPC::Fork::Simple->new();
1613             $ipc->to_master( 'connection_info', $ipc2->get_connection_info() ) || die $!;
1614             $ipc2->process_child_data(BLOCK_UNTIL_DATA);
1615             die unless length( ${$ipc2->from_child( $master_pid, 'master_test' )} ) == 300;
1616             }
1617              
1618             =head2 Bi-directional communication with data handlers
1619              
1620             use warnings;
1621             use strict;
1622            
1623             use IPC::Fork::Simple qw/:block_flags/;
1624            
1625             my $ipc = IPC::Fork::Simple->new();
1626             my $master_pid = $$;
1627             my $pid = fork();
1628             die 'stupid fork' unless defined $pid;
1629            
1630             if ( $pid ) {
1631             $ipc->spawn_data_handler();
1632             my $child_connection_data;
1633            
1634             $ipc->collect_data_from_handler(1, BLOCK_UNTIL_DATA);
1635             $child_connection_data = $ipc->from_child( $pid, 'connection_info' )
1636            
1637             my $ipc2 = IPC::Fork::Simple->new_child( ${$child_connection_data} ) || die;
1638             $ipc2->to_master( 'master_test', 'a' x 300 );
1639             } else {
1640             $ipc->init_child();
1641            
1642             my $ipc2 = IPC::Fork::Simple->new();
1643             $ipc2->spawn_data_handler();
1644             $ipc->to_master( 'connection_info', $ipc2->get_connection_info() ) || die $!;
1645             my $test;
1646            
1647             do {
1648             sleep(0);
1649             $ipc2->collect_data_from_handler(1);
1650             $test = $ipc2->from_child( $master_pid, 'master_test' );
1651             } until ( $test );
1652            
1653             die unless length( ${$test} ) == 300;
1654             }
1655            
1656             =head2 Further examples
1657              
1658             Further examples can be found in the t/functional directory supplied with the
1659             distribution.
1660              
1661             =head1 NOTES
1662              
1663             =head2 Zombies
1664              
1665             Child processes are not reaped automatically by this module, so the caller
1666             will need to call wait (or similar function) as usual to reap child processes.
1667              
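The reaping described above can be sketched with core Perl alone. This is a minimal illustration, not part of this module: the helper name reap_finished is hypothetical, and it assumes the caller is happy to reap any exited child of the current process.

```perl
use strict;
use warnings;
use POSIX ":sys_wait_h";

# Hypothetical helper (not provided by IPC::Fork::Simple): reap every child
# that has already exited, without blocking. Returns the reaped pids.
sub reap_finished {
    my @reaped;

    # waitpid returns 0 while children are still running and -1 when there
    # are no children left, so only positive values are real pids.
    while ( ( my $pid = waitpid( -1, WNOHANG ) ) > 0 ) {
        push @reaped, $pid;
    }
    return @reaped;
}
```

A caller that polls with process_child_data(0) could call such a helper on each pass of its loop, so that exited children do not linger as zombies.
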
1668             =head2 Security
1669              
1670             This module creates a TCP listen socket on a random high-numbered port on
1671             127.0.0.1. If a malicious program connects to that socket, it could cause the
1672             master process to hang waiting for that socket to disconnect. This module takes
1673             basic steps to ensure this does not happen (connecting clients must present the
1674             correct 32-bit key within 30 seconds of connecting, but this is only checked
1675             when another client connects), but this is not fool-proof.
1676              
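The timeout rule described above can be sketched as a small bookkeeping routine. This is an illustration under stated assumptions, not the module's actual code: the helper name expire_unauthenticated, the AUTH_TIMEOUT constant, and the strict "older than 30 seconds" comparison are all hypothetical, and sockets are replaced by plain hash keys to keep the sketch self-contained.

```perl
use strict;
use warnings;

# Assumed timeout, taken from the 30 seconds mentioned in the text.
use constant AUTH_TIMEOUT => 30;

# Hypothetical helper: $pending maps a client id to { ts => epoch_seconds }.
# Entries that have waited longer than AUTH_TIMEOUT are deleted, and their
# ids are returned in sorted order so the caller can close those sockets.
sub expire_unauthenticated {
    my ( $pending, $now ) = @_;

    my @expired =
        sort { $a cmp $b }
        grep { $now - $pending->{$_}->{'ts'} > AUTH_TIMEOUT }
        keys %{$pending};

    delete @{$pending}{@expired};
    return @expired;
}
```

Because the check only runs when a new client connects, a stalled connection can outlive the timeout until the next connection arrives, which is one reason the protection is described as not fool-proof.
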
1677             =head2 Invalid connections
1678              
1679             If someone connects, but does not send the proper data, it is possible that we
1680             could return from L with FLAG_PACKET_CHILD_DISCONNECTED
1681             but without updating any data or the finished child list. I believe all possible
1682             causes of this have been resolved, but developers should still be aware of this
1683             potential issue.
1684              
1685             Callers checking for a return value of FLAG_PACKET_CHILD_DISCONNECTED should
1686             therefore also check L to make sure a real child actually
1687             finished.
1688              
1689             =head2 Unit tests
1690              
1691             The module currently lacks unit tests but does have a collection of functional
1692             tests. These functional tests are not run during "make test", as they can be
1693             system intensive. Ideally, unit tests will be developed for this purpose; until
1694             then, the functional tests can be run by hand. They can be found in the
1695             t/functional directory as part of the distribution.
1696              
1697             =head1 TO DO
1698              
1699             Merge the internal finished_children hash with the internal child_info hash.
1700             The child_info hash already holds most of the data, and a flag to determine
1701             whether or not that child is still connected would be simple to add, but
1702             removing the quick lookups against finished_children would make the code more
1703             verbose in places. Merging the two hashes would also reduce data duplication
1704             of the symbolic name.
1705              
1706             Add unit tests, or make functional tests run as part of "make test".
1707              
1708             =head1 CHANGES
1709              
1710             =head2 1.47 - 20110622, jeagle
1711              
1712             Implement basic integrity checks to prevent unexpected connections from
1713             interfering with normal operation.
1714              
1715             Add L function, L method.
1716              
1717             =head2 1.46 - 20100830, jeagle
1718              
1719             Version bump and repackage for CPAN.
1720              
1721             =head2 1.45 - 20100623, jeagle
1722              
1723             Clean and prepare for export to CPAN.
1724              
1725             Version bump to synchronize source repository version with module version.
1726              
1727             =head2 0.8 - 20100506, jeagle
1728              
1729             Replace MSG_NOSIGNAL with an ignored SIGPIPE, because we can't rely on
1730             MSG_NOSIGNAL to be defined everywhere.
1731              
1732             =head2 0.7 - 20100427, jeagle
1733              
1734             Disable SIGPIPE for failed send()s, returns error instead (to match
1735             documentation/intention).
1736              
1737             Correctly process large reads (>64k).
1738              
1739             =head2 0.6 - 20100309, phirince
1740              
1741             Extra check in pop_from_cid to get rid of undefined value errors.
1742              
1743             =head2 0.5 - 20100219, jeagle
1744              
1745             Correct layout issues with example documentation.
1746              
1747             Clarify the use of wait(2) in determining if a "child" has ended.
1748              
1749             =head2 0.4 - 20100219, jeagle
1750              
1751             Fix more bugs related to PID size assumptions.
1752              
1753             Fix various networking bugs that could cause data loss.
1754              
1755             Implement new bi-directional communication abilities.
1756              
1757             Implement new data queue types.
1758              
1759             Allow processes to identify themselves by a symbolic name, instead of pid (if
1760             not set, defaults to pid).
1761              
1762             =head2 0.3 - 20090512, phirince
1763              
1764             Fixed bug 2741310 - IPC::Fork::Simple assumed pids are 16 bits instead of 32
1765             bits.
1766              
1767             =head2 0.2 - 20090217, jeagle
1768              
1769             Fixed a bug with L returning early when a signal is
1770             received.
1771              
1772             =head2 0.1 - 20090130, jeagle
1773              
1774             Initial release.
1775              
1776             =cut
1777              
1778             1;