File Coverage

blib/lib/IPC/MPS.pm
Criterion Covered Total %
statement 200 298 67.1
branch 61 166 36.7
condition 10 53 18.8
subroutine 18 22 81.8
pod 0 11 0.0
total 289 550 52.5


line stmt bran cond sub pod time code
1             package IPC::MPS;
2              
3 2     2   110850 use strict;
  2         18  
  2         50  
4 2     2   8 use warnings;
  2         4  
  2         46  
5              
6 2     2   6 use Exporter;
  2         4  
  2         140  
7             our @ISA = qw(Exporter);
8             our @EXPORT = qw(spawn receive msg snd quit wt snd_wt listener open_node vpid2pid);
9              
10             our $VERSION = '0.21';
11              
12 2     2   10 use Carp;
  2         2  
  2         106  
13 2     2   828 use IO::Select;
  2         2612  
  2         78  
14 2     2   864 use Socket qw(SOCK_NONBLOCK);
  2         5908  
  2         258  
15 2     2   772 use IO::Socket;
  2         29984  
  2         12  
16 2     2   688 use Scalar::Util qw(refaddr);
  2         2  
  2         88  
17 2     2   1140 use Storable qw(freeze thaw);
  2         5208  
  2         5300  
18              
19              
20             my $DEBUG = 0;
21             $DEBUG and require Data::Dumper;
22              
23             my $sel = IO::Select->new();
24              
25             my @spawn = ();
26             my %msg = ();
27             my %fh2vpid = ();
28             my %vpid2fh = ();
29             my %fh2fh = ();
30             my $self_vpid = 0;
31             my $self_parent_fh;
32             my $self_parent_vpid = 0;
33             my $self_parent_closed = 0;
34             my %listener = ();
35             my %node = ();
36             my %snd = ();
37             my $ipc_loop = 0;
38             my $quit = 0;
39              
40             my %vpid2pid = ();
41 1     1 0 10 sub vpid2pid { my ($vpid) = @_; $vpid2pid{$vpid} }
  1         16  
42              
43             my @rcv = ();
44             my %r_bufs = ();
45             my %w_bufs = ();
46              
47             my %pack = ();
48             my %unpack = ();
49              
50             my %closed = ();
51              
52             my $need_reset = 0;
53              
54             my $blksize = 1024 * 16;
55              
56              
57             END {
58 2 50 0 2   139 $ipc_loop or @spawn and carp "Probably have forgotten to call receive.";
59 2         103 close $_ foreach values %fh2fh;
60             }
61              
62             sub spawn(&) {
63 2     2 0 132 my ($spawn) = @_;
64 2 50       120 socketpair(my $child, my $parent, AF_UNIX, SOCK_STREAM|SOCK_NONBLOCK, PF_UNSPEC) or die "socketpair: $!";
65 2         10 my $vpid = refaddr $child;
66 2         6 push @spawn, [$vpid, $child, $parent, $spawn];
67 2         6 return $vpid;
68             }
69              
70              
71             sub msg($$) {
72 3     3 0 52 my ($msg, $sub) = @_;
73 3         17 $msg{$msg} = $sub;
74             }
75              
76              
77             sub snd($$;@) {
78 9     9 0 1212 my ($vpid, $msg, @args) = @_;
79 9 50       19 defined $vpid or carp("Argument vpid required"), return;
80 9 50       16 defined $msg or carp("Argument msg required"), return;
81 9 100       18 $vpid = $self_parent_vpid if $vpid == 0;
82 9 50       17 $DEBUG and print "Send message '$msg' from $self_vpid to $vpid vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @args), ".\n";
83 9         11 push @{$snd{$vpid}}, [$self_vpid, $vpid, $msg, \@args];
  9         27  
84 9 50 33     50 $closed{$vpid} = 1 if $msg eq "close" or $msg eq "exit";
85 9         35 return 1;
86             }
87              
88              
89 0     0 0 0 sub quit() { $quit = 1 }
90              
91              
92             sub snd_wt($$;@) {
93 1     1 0 498 my ($vpid, $msg, @args) = @_;
94 1 50       3 defined $vpid or carp("Argument vpid required"), return;
95 1 50       3 defined $msg or carp("Argument msg required"), return;
96 1         2 snd($vpid, $msg, @args);
97 1         3 wt($vpid, $msg);
98             }
99              
100              
101             sub listener($$;%) {
102 0     0 0 0 my ($host, $port, %args) = @_;
103 0 0       0 defined $host or carp("Argument host required"), return;
104 0 0       0 defined $port or carp("Argument port required"), return;
105 0         0 my $sock = IO::Socket::INET->new(Proto => 'tcp', Blocking => 0, LocalHost => $host, LocalPort => $port, Listen => 20, ReuseAddr => 1);
106 0 0       0 if ($sock) {
107 0 0       0 _pack_unpack($sock, %args) or return;
108 0         0 $listener{$sock} = $sock;
109 0         0 $sel->add($sock);
110 0         0 return $sock;
111             } else {
112 0         0 carp "Cannot open socket '$host:$port' in $self_vpid: $!";
113 0         0 return;
114             }
115             }
116              
117              
118             sub open_node($$;%) {
119 0     0 0 0 my ($host, $port, %args) = @_;
120 0 0       0 defined $host or carp("Argument host required"), return;
121 0 0       0 defined $port or carp("Argument port required"), return;
122 0         0 my $sock = IO::Socket::INET->new(Proto => 'tcp', Blocking => 0);
123 0         0 my $addr = sockaddr_in($port,inet_aton($host));
124 0         0 $sock->sockopt(SO_KEEPALIVE, 1);
125 0         0 my $rv = $sock->connect($addr);
126 0 0       0 if ($rv) {
127 0 0       0 _pack_unpack($sock, %args) or return;
128 0         0 my $vpid = refaddr $sock;
129 0         0 $node{$sock} = $vpid;
130 0         0 $fh2vpid{$sock} = $vpid;
131 0         0 $vpid2fh{$vpid} = $sock;
132 0         0 $fh2fh{$sock} = $sock;
133 0         0 $sel->add($sock);
134 0         0 return $vpid;
135             } else {
136 0         0 carp "Cannot connect to socket '$host:$port' in $self_vpid: $!";
137 0         0 return;
138             }
139             }
140              
141              
142             sub _pack_unpack($%) {
143 0     0   0 my ($fh, %args) = @_;
144 0 0 0     0 if (my $pack = $args{pack} and my $unpack = $args{unpack}) {
    0 0        
145 0         0 my $r = eval {
146 0         0 my $r = $unpack->($pack->({a => ["b"]}));
147 0 0 0     0 if (ref $r eq "HASH" and ref $$r{a} eq "ARRAY" and
      0        
      0        
148             $$r{a}[0] and $$r{a}[0] eq "b")
149             {
150 0         0 return 1;
151             } else {
152 0         0 return 0;
153             }
154             };
155 0 0 0     0 if (not $r or $@) {
156 0         0 carp "False pack unpack test";
157 0         0 return;
158             }
159 0         0 $pack{$fh} = $pack;
160 0         0 $unpack{$fh} = $unpack;
161             } elsif ($args{pack} or $args{unpack}) {
162 0         0 carp "pack and unpack is pair options";
163 0         0 return;
164             }
165 0         0 return 1;
166             }
167              
168              
169             sub receive(&) {
170 3     3 0 110 my ($receive) = @_;
171              
172 3 50       11 $DEBUG > 1 and print "Call receive in $self_vpid (\$\$=$$)\n";
173              
174 3         97 local $SIG{CHLD} = "IGNORE";
175 3         53 local $SIG{PIPE} = "IGNORE";
176              
177 3         35 foreach (@spawn) {
178 2         6 my ($vpid, $child, $parent, $spawn) = @$_;
179              
180 2         1801 my $kid_pid = fork;
181 2 50       132 defined $kid_pid or die "Can't fork: $!";
182              
183 2 100       63 unless ($kid_pid) {
184              
185 1         22 foreach (@spawn) {
186 1         43 close $$_[1];
187 1 50       26 close $$_[2] if $$_[2] ne $parent;
188             }
189              
190 1         30 close $_ foreach values %fh2fh, values %listener;
191 1         62 $sel = IO::Select->new();
192 1         119 @spawn = ();
193 1         3 %listener = ();
194 1         6 %node = ();
195 1         16 %msg = ();
196 1         8 %fh2vpid = ();
197 1         2 %vpid2fh = ();
198 1         4 %fh2fh = ();
199 1         50 %snd = ();
200              
201 1         10 %vpid2pid = ();
202              
203 1         9 $ipc_loop = 0;
204              
205 1         4 @rcv = ();
206 1         3 %r_bufs = ();
207 1         2 %w_bufs = ();
208              
209 1         7 %pack = ();
210 1         4 %unpack = ();
211              
212 1         2 %closed = ();
213              
214 1         2 $need_reset = 0;
215              
216 1         13 $self_parent_fh = $parent;
217 1         3 $self_parent_vpid = $self_vpid;
218              
219 1         6 $self_vpid = $vpid;
220              
221 1         8 $fh2vpid{$self_parent_fh} = $self_parent_vpid;
222 1         14 $vpid2fh{$self_parent_vpid} = $self_parent_fh;
223 1         11 $fh2fh{$self_parent_fh} = $self_parent_fh;
224              
225 1         13 $sel->add($self_parent_fh);
226              
227 1         149 $spawn->();
228              
229 0         0 exit;
230             }
231             else {
232 1         46 $vpid2pid{$vpid} = $kid_pid;
233             }
234             }
235              
236              
237 2         44 foreach (@spawn) {
238 1         25 my ($vpid, $child, $parent, $spawn) = @$_;
239 1         28 close $parent;
240 1         13 $fh2vpid{$child} = $vpid;
241 1         21 $vpid2fh{$vpid} = $child;
242 1         8 $fh2fh{$child} = $child;
243 1         40 $sel->add($child);
244             }
245 2         235 @spawn = ();
246              
247              
248              
249 2         33 $receive->();
250              
251              
252              
253 2 50       11 unless ($ipc_loop) {
254 2         4 $ipc_loop = 1;
255 2         21 ipc_loop();
256 0         0 $ipc_loop = 0;
257             }
258             }
259              
260              
261             sub wt($$) {
262 1     1 0 35 my ($waited_vpid, $waited_msg) = @_;
263 1 50       17 defined $waited_vpid or carp("Argument vpid required"), return;
264 1 50       4 defined $waited_msg or carp("Argument msg required"), return;
265 1 50       2 $waited_vpid = $self_parent_vpid if $waited_vpid == 0;
266 1         11 foreach my $i (0 .. $#rcv) {
267 0         0 my ($from, $msg, $args)= @{$rcv[$i]};
  0         0  
268 0 0 0     0 if ($from eq $waited_vpid and $msg eq $waited_msg) {
269 0         0 splice @rcv, $i, 1;
270 0 0       0 return wantarray ? @$args : $$args[0];
271             }
272             }
273 1 50       4 $DEBUG and print "Start waiting for '$waited_vpid -> $waited_msg' in $self_vpid (\$\$=$$)\n";
274 1         21 return ipc_loop($waited_vpid, $waited_msg);
275             }
276              
277              
278             sub ipc_loop(;$$) {
279 3     3 0 10 my ($waited_vpid, $waited_msg) = @_;
280 3 50       12 $DEBUG and print "Start ipc_loop in $self_vpid (\$\$=$$)\n";
281 3   33     27 RESET: while ($sel->count() and not $quit) {
282              
283 17         158 foreach my $to (keys %snd) {
284 16 100       20 if (@{$snd{$to}}) {
  16         38  
285 8         11 my $fh = $vpid2fh{$to};
286 8 50       27 unless ($fh) {
287 0 0       0 if (@spawn) {
288 0         0 carp "Probably have forgotten to call receive.";
289 0         0 next;
290             } else {
291 0 0       0 if ($self_parent_fh) {
292 0 0       0 unless ($self_parent_closed) {
293 0         0 $fh = $self_parent_fh;
294             } else {
295 0         0 next;
296             }
297             } else {
298 0         0 carp "The addressee $to is unknown or has left in $self_vpid (\$\$=$$)\n";
299 0         0 next;
300             }
301             }
302             }
303 8 50       21 unless (exists $w_bufs{$fh}) {
304 8         11 my $packet;
305 8 50       19 if (my $pack = $pack{$fh}) {
306 0         0 $packet = $pack->(shift @{$snd{$to}});
  0         0  
307             } else {
308 8         82 $packet = freeze shift @{$snd{$to}};
  8         38  
309             }
310 8         461 my $buf = join "", pack("N", length $packet), $packet;
311 8         28 $w_bufs{$fh} = $buf;
312 8 50 0     27 $DEBUG and (@{$snd{$to}} or delete $snd{$to});
  0         0  
313             }
314             }
315             }
316              
317 17         53 my $w_sel = IO::Select->new(map { $fh2fh{$_} } keys %w_bufs);
  8         37  
318              
319 17 50       436 if ($DEBUG > 1) {
320 0 0       0 print "Select count from $self_vpid, sel->count=", $sel->count(), ", w_sel->count=", $w_sel->count(), "\n",
321             $DEBUG > 2 ? Data::Dumper::Dumper({ snd => \%snd, r_bufs => \%r_bufs, w_bufs => \%w_bufs }) : "";
322             }
323              
324 17         53 my ($can_read, $can_write, $has_exception)= IO::Select->select($sel, $w_sel, $sel);
325 17 0       8002 $DEBUG > 1 and print "Select from $self_vpid: ", scalar(@$can_read), " ", ($can_write ? scalar(@$can_write) : ""), " ", scalar(@$has_exception), "\n";
    50          
326              
327 17         44 foreach my $fh (@$can_read) {
328 9 50       23 if ($listener{$fh}) {
329 0         0 my $sock = $fh->accept;
330 0         0 $pack{$sock} = $pack{$fh};
331 0         0 $unpack{$sock} = $unpack{$fh};
332 0         0 $sock->sockopt(SO_KEEPALIVE, 1);
333 0         0 my $vpid = refaddr $sock;
334 0         0 $node{$sock} = $vpid;
335 0         0 $fh2vpid{$sock} = $vpid;
336 0         0 $vpid2fh{$vpid} = $sock;
337 0         0 $fh2fh{$sock} = $sock;
338 0         0 $sel->add($sock);
339 0         0 next;
340             }
341 9         139 my $len = sysread $fh, (my $_buf), $blksize;
342 9 100       29 if ($len) {
    50          
343 8         24 $r_bufs{$fh} .= $_buf;
344             NEXT_MSG: {
345 8         26 my $buf = $r_bufs{$fh};
  8         51  
346 8 50       19 if (length $buf >= 4) {
347 8         42 my $packet_length = unpack "N", substr $buf, 0, 4, "";
348 8 50       33 if (length $buf >= $packet_length) {
349 8         16 my $packet = substr $buf, 0, $packet_length, "";
350 8   50     43 $r_bufs{$fh} = $buf || "";
351 8 50 0     17 $DEBUG and ($r_bufs{$fh} or delete $r_bufs{$fh});
352              
353 8         12 my ($from, $to, $msg, $args);
354 8 50       15 if (my $unpack = $unpack{$fh}) {
355 0         0 ($from, $to, $msg, $args) = @{$unpack->($packet)};
  0         0  
356             } else {
357 8         15 ($from, $to, $msg, $args) = @{thaw $packet};
  8         38  
358             }
359              
360 8 50       239 if ($node{$fh}) {
361 0         0 $from = $node{$fh};
362 0         0 $to = $self_vpid;
363             }
364              
365 8 50       18 $DEBUG and print "Got message '$msg' from $from to $to vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @$args), ".\n";
366 8 50       22 if ($to == $self_vpid) {
    0          
367 8 50       12 $DEBUG and print "Run message sub '$msg' from $from to $to vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @$args), ".\n";
368 8 100 66     30 if (defined $waited_vpid and defined $waited_msg) {
369 1         3 push @rcv, [$from, $msg, $args];
370             } else {
371 7 50       13 if ($msg{$msg}) {
372 7         19 push @rcv, [$from, $msg, $args];
373             } else {
374 0 0       0 $DEBUG and print "Unknown message '$msg'\n";
375             }
376             }
377             } elsif ($vpid2fh{$to}) {
378 0 0       0 $DEBUG and print "Remittance message '$msg' from $from to $to vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @$args), ".\n";
379 0         0 push @{$snd{$to}}, [$from, $to, $msg, $args];
  0         0  
380             } else {
381 0         0 carp "Got Wandered message '$msg' from $from to $to in $self_vpid (\$\$=$$)\n";
382             }
383              
384 8 50       30 redo NEXT_MSG if $r_bufs{$fh};
385             }
386             }
387             }
388             } elsif (defined $len) {
389 1         6 $sel->remove($fh);
390 1         53 $w_sel->remove($fh);
391 1         25 my $vpid = delete $fh2vpid{$fh};
392 1         3 delete $vpid2fh{$vpid};
393 1         2 delete $r_bufs{$fh};
394 1         2 delete $w_bufs{$fh};
395 1         3 delete $fh2fh{$fh};
396 1         3 delete $vpid2pid{$vpid};
397 1         2 delete $pack{$fh};
398 1         2 delete $unpack{$fh};
399 1 50       3 if (my $node_vpid = $node{$fh}) {
400 0         0 delete $node{$fh};
401 0 0       0 if ($msg{NODE_CLOSED}) {
402 0 0       0 $msg{NODE_CLOSED}->($node_vpid, $fh->connected ? 1 : 0) unless $closed{$vpid};
    0          
403             }
404             } else {
405 1 50       5 if ($msg{SPAWN_CLOSED}) {
406 0 0       0 $msg{SPAWN_CLOSED}->($vpid) unless $closed{$vpid};
407             }
408             }
409 1         2 delete $closed{$vpid};
410 1         25 close $fh;
411 1 50 33     10 if ($self_parent_fh and $self_parent_fh eq $fh) {
412 1         1 $self_parent_closed = 1;
413 1 50 33     5 unless (defined $waited_vpid and defined $waited_msg) {
414 1 50       3 unless (@rcv) {
415 1         308 exit;
416             }
417             }
418             }
419             } else {
420 0 0       0 $DEBUG and die "Can't read '$fh': $!";
421             }
422             }
423              
424              
425 16 100 66     54 if (defined $waited_vpid and defined $waited_msg) {
426 2         6 foreach my $i (0 .. $#rcv) {
427 1         2 my ($from, $msg, $args)= @{$rcv[$i]};
  1         3  
428 1 50 33     11 if ($msg eq $waited_msg and $from eq $waited_vpid) {
429 1         3 splice @rcv, $i, 1;
430 1 50       3 $DEBUG and print "Stop waiting for '$waited_vpid -> $waited_msg' in $self_vpid (\$\$=$$)\n";
431 1         2 $need_reset = 1;
432 1 50       9 return wantarray ? @$args : $$args[0];
433             }
434             }
435 1 50       4 unless (exists $vpid2fh{$waited_vpid}) {
436 0         0 return;
437             }
438             } else {
439 14         53 while (my $rcv = shift @rcv) {
440 7         9 my ($from, $msg, $args)= @{$rcv};
  7         28  
441 7 50       43 $msg{$msg}->($from, @$args) unless $closed{$from};
442             }
443             }
444 14 50       35 if ($need_reset) {
445 0         0 $need_reset = 0;
446 0         0 next RESET;
447             }
448              
449              
450 14         40 foreach my $fh (@$can_write) {
451 8 50       21 $fh2fh{$fh} or next;
452              
453 8         12 my $buf = $w_bufs{$fh};
454 8         276 my $len = syswrite $fh, $buf, $blksize;
455 8 50       33 if ($len) {
456 8         27 substr $buf, 0, $len, "";
457 8 50       13 if (length $buf) {
458 0         0 $w_bufs{$fh} = $buf;
459             } else {
460 8         69 delete $w_bufs{$fh};
461             }
462             } else {
463 0 0         $DEBUG and die "Can't write to '$fh': $!";
464             }
465             }
466              
467             }
468 0           $need_reset = 1;
469 0           $quit = 0;
470 0           return;
471             }
472              
473              
474              
475             1;
476              
477              
478             __END__