File Coverage

blib/lib/IPC/MPS.pm
Criterion Covered Total %
statement 197 295 66.7
branch 61 166 36.7
condition 10 53 18.8
subroutine 17 21 80.9
pod 0 11 0.0
total 285 546 52.2


line stmt bran cond sub pod time code
1             package IPC::MPS;
2              
3 2     2   27482 use strict;
  2         2  
  2         50  
4 2     2   6 use warnings;
  2         2  
  2         46  
5              
6 2     2   6 use Exporter;
  2         0  
  2         140  
7             our @ISA = qw(Exporter);
8             our @EXPORT = qw(spawn receive msg snd quit wt snd_wt listener open_node vpid2pid);
9              
10             our $VERSION = '0.19';
11              
12 2     2   6 use Carp;
  2         4  
  2         106  
13 2     2   874 use IO::Select;
  2         2278  
  2         74  
14 2     2   848 use IO::Socket;
  2         35706  
  2         6  
15 2     2   766 use Scalar::Util qw(refaddr);
  2         2  
  2         160  
16 2     2   1538 use Storable qw(freeze thaw);
  2         4634  
  2         4398  
17              
18              
19             my $DEBUG = 0;
20             $DEBUG and require Data::Dumper;
21              
22             my $sel = IO::Select->new();
23              
24             my @spawn = ();
25             my %msg = ();
26             my %fh2vpid = ();
27             my %vpid2fh = ();
28             my %fh2fh = ();
29             my $self_vpid = 0;
30             my $self_parent_fh;
31             my $self_parent_vpid = 0;
32             my $self_parent_closed = 0;
33             my %listener = ();
34             my %node = ();
35             my %snd = ();
36             my $ipc_loop = 0;
37             my $quit = 0;
38              
39             my %vpid2pid = ();
40 1     1 0 16 sub vpid2pid { my ($vpid) = @_; $vpid2pid{$vpid} }
  1         17  
41              
42             my @rcv = ();
43             my %r_bufs = ();
44             my %w_bufs = ();
45              
46             my %pack = ();
47             my %unpack = ();
48              
49             my %closed = ();
50              
51             my $need_reset = 0;
52              
53             my $blksize = 1024 * 16;
54              
55              
56             END {
57 2 50 0 2   7 $ipc_loop or @spawn and carp "Probably have forgotten to call receive.";
58 2         34 close $_ foreach values %fh2fh;
59             }
60              
61             sub spawn(&) {
62 2     2 0 20 my ($spawn) = @_;
63 2 50       94 socketpair(my $child, my $parent, AF_UNIX, SOCK_STREAM, PF_UNSPEC) or die "socketpair: $!";
64 2         12 my $vpid = refaddr $child;
65 2         6 push @spawn, [$vpid, $child, $parent, $spawn];
66 2         36 return $vpid;
67             }
68              
69              
70             sub msg($$) {
71 3     3 0 33 my ($msg, $sub) = @_;
72 3         8 $msg{$msg} = $sub;
73             }
74              
75              
76             sub snd($$;@) {
77 9     9 0 1028 my ($vpid, $msg, @args) = @_;
78 9 50       23 defined $vpid or carp("Argument vpid required"), return;
79 9 50       20 defined $msg or carp("Argument msg required"), return;
80 9 100       22 $vpid = $self_parent_vpid if $vpid == 0;
81 9 50       16 $DEBUG and print "Send message '$msg' from $self_vpid to $vpid vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @args), ".\n";
82 9         8 push @{$snd{$vpid}}, [$self_vpid, $vpid, $msg, \@args];
  9         33  
83 9 50 33     45 $closed{$vpid} = 1 if $msg eq "close" or $msg eq "exit";
84 9         30 return 1;
85             }
86              
87              
88 0     0 0 0 sub quit() { $quit = 1 }
89              
90              
91             sub snd_wt($$;@) {
92 1     1 0 388 my ($vpid, $msg, @args) = @_;
93 1 50       4 defined $vpid or carp("Argument vpid required"), return;
94 1 50       2 defined $msg or carp("Argument msg required"), return;
95 1         2 snd($vpid, $msg, @args);
96 1         3 wt($vpid, $msg);
97             }
98              
99              
100             sub listener($$;%) {
101 0     0 0 0 my ($host, $port, %args) = @_;
102 0 0       0 defined $host or carp("Argument host required"), return;
103 0 0       0 defined $port or carp("Argument port required"), return;
104 0         0 my $sock = IO::Socket::INET->new(Proto => 'tcp', Blocking => 0, LocalHost => $host, LocalPort => $port, Listen => 20, ReuseAddr => 1);
105 0 0       0 if ($sock) {
106 0 0       0 _pack_unpack($sock, %args) or return;
107 0         0 $listener{$sock} = $sock;
108 0         0 $sel->add($sock);
109 0         0 return $sock;
110             } else {
111 0         0 carp "Cannot open socket '$host:$port' in $self_vpid: $!";
112 0         0 return;
113             }
114             }
115              
116              
117             sub open_node($$;%) {
118 0     0 0 0 my ($host, $port, %args) = @_;
119 0 0       0 defined $host or carp("Argument host required"), return;
120 0 0       0 defined $port or carp("Argument port required"), return;
121 0         0 my $sock = IO::Socket::INET->new(Proto => 'tcp', Blocking => 0);
122 0         0 my $addr = sockaddr_in($port,inet_aton($host));
123 0         0 $sock->sockopt(SO_KEEPALIVE, 1);
124 0         0 my $rv = $sock->connect($addr);
125 0 0       0 if ($rv) {
126 0 0       0 _pack_unpack($sock, %args) or return;
127 0         0 my $vpid = refaddr $sock;
128 0         0 $node{$sock} = $vpid;
129 0         0 $fh2vpid{$sock} = $vpid;
130 0         0 $vpid2fh{$vpid} = $sock;
131 0         0 $fh2fh{$sock} = $sock;
132 0         0 $sel->add($sock);
133 0         0 return $vpid;
134             } else {
135 0         0 carp "Cannot connect to socket '$host:$port' in $self_vpid: $!";
136 0         0 return;
137             }
138             }
139              
140              
141             sub _pack_unpack($%) {
142 0     0   0 my ($fh, %args) = @_;
143 0 0 0     0 if (my $pack = $args{pack} and my $unpack = $args{unpack}) {
    0 0        
144 0         0 my $r = eval {
145 0         0 my $r = $unpack->($pack->({a => ["b"]}));
146 0 0 0     0 if (ref $r eq "HASH" and ref $$r{a} eq "ARRAY" and
      0        
      0        
147             $$r{a}[0] and $$r{a}[0] eq "b")
148             {
149 0         0 return 1;
150             } else {
151 0         0 return 0;
152             }
153             };
154 0 0 0     0 if (not $r or $@) {
155 0         0 carp "False pack unpack test";
156 0         0 return;
157             }
158 0         0 $pack{$fh} = $pack;
159 0         0 $unpack{$fh} = $unpack;
160             } elsif ($args{pack} or $args{unpack}) {
161 0         0 carp "pack and unpack is pair options";
162 0         0 return;
163             }
164 0         0 return 1;
165             }
166              
167              
168             sub receive(&) {
169 3     3 0 46 my ($receive) = @_;
170              
171 3 50       10 $DEBUG > 1 and print "Call receive in $self_vpid (\$\$=$$)\n";
172              
173 3         63 local $SIG{CHLD} = "IGNORE";
174 3         16 local $SIG{PIPE} = "IGNORE";
175              
176 3         7 foreach (@spawn) {
177 2         6 my ($vpid, $child, $parent, $spawn) = @$_;
178              
179 2         1760 my $kid_pid = fork;
180 2 50       62 defined $kid_pid or die "Can't fork: $!";
181              
182 2 100       56 unless ($kid_pid) {
183              
184 1         19 foreach (@spawn) {
185 1         49 close $$_[1];
186 1 50       21 close $$_[2] if $$_[2] ne $parent;
187             }
188              
189 1         15 close $_ foreach values %fh2fh, values %listener;
190 1         41 $sel = IO::Select->new();
191 1         65 @spawn = ();
192 1         6 %listener = ();
193 1         1 %node = ();
194 1         9 %msg = ();
195 1         3 %fh2vpid = ();
196 1         1 %vpid2fh = ();
197 1         2 %fh2fh = ();
198 1         22 %snd = ();
199              
200 1         5 %vpid2pid = ();
201              
202 1         9 $ipc_loop = 0;
203              
204 1         4 @rcv = ();
205 1         1 %r_bufs = ();
206 1         2 %w_bufs = ();
207              
208 1         2 %pack = ();
209 1         1 %unpack = ();
210              
211 1         2 %closed = ();
212              
213 1         1 $need_reset = 0;
214              
215 1         2 $self_parent_fh = $parent;
216 1         1 $self_parent_vpid = $self_vpid;
217              
218 1         3 $self_vpid = $vpid;
219              
220 1         3 $fh2vpid{$self_parent_fh} = $self_parent_vpid;
221 1         11 $vpid2fh{$self_parent_vpid} = $self_parent_fh;
222 1         2 $fh2fh{$self_parent_fh} = $self_parent_fh;
223              
224 1         6 $sel->add($self_parent_fh);
225              
226 1         83 $spawn->();
227              
228 0         0 exit;
229             }
230             else {
231 1         27 $vpid2pid{$vpid} = $kid_pid;
232             }
233             }
234              
235              
236 2         9 foreach (@spawn) {
237 1         11 my ($vpid, $child, $parent, $spawn) = @$_;
238 1         21 close $parent;
239 1         17 $fh2vpid{$child} = $vpid;
240 1         8 $vpid2fh{$vpid} = $child;
241 1         1 $fh2fh{$child} = $child;
242 1         42 $sel->add($child);
243             }
244 2         149 @spawn = ();
245              
246              
247              
248 2         15 $receive->();
249              
250              
251              
252 2 50       6 unless ($ipc_loop) {
253 2         3 $ipc_loop = 1;
254 2         9 ipc_loop();
255 0         0 $ipc_loop = 0;
256             }
257             }
258              
259              
260             sub wt($$) {
261 1     1 0 2 my ($waited_vpid, $waited_msg) = @_;
262 1 50       3 defined $waited_vpid or carp("Argument vpid required"), return;
263 1 50       3 defined $waited_msg or carp("Argument msg required"), return;
264 1 50       3 $waited_vpid = $self_parent_vpid if $waited_vpid == 0;
265 1         12 foreach my $i (0 .. $#rcv) {
266 0         0 my ($from, $msg, $args)= @{$rcv[$i]};
  0         0  
267 0 0 0     0 if ($from eq $waited_vpid and $msg eq $waited_msg) {
268 0         0 splice @rcv, $i, 1;
269 0 0       0 return wantarray ? @$args : $$args[0];
270             }
271             }
272 1 50       2 $DEBUG and print "Start waiting for '$waited_vpid -> $waited_msg' in $self_vpid (\$\$=$$)\n";
273 1         17 return ipc_loop($waited_vpid, $waited_msg);
274             }
275              
276              
277             sub ipc_loop(;$$) {
278 3     3 0 7 my ($waited_vpid, $waited_msg) = @_;
279 3 50       7 $DEBUG and print "Start ipc_loop in $self_vpid (\$\$=$$)\n";
280 3   33     20 RESET: while ($sel->count() and not $quit) {
281              
282 17         125 foreach my $to (keys %snd) {
283 16 100       17 if (@{$snd{$to}}) {
  16         30  
284 8         12 my $fh = $vpid2fh{$to};
285 8 50       10 unless ($fh) {
286 0 0       0 if (@spawn) {
287 0         0 carp "Probably have forgotten to call receive.";
288 0         0 next;
289             } else {
290 0 0       0 if ($self_parent_fh) {
291 0 0       0 unless ($self_parent_closed) {
292 0         0 $fh = $self_parent_fh;
293             } else {
294 0         0 next;
295             }
296             } else {
297 0         0 carp "The addressee $to is unknown or has left in $self_vpid (\$\$=$$)\n";
298 0         0 next;
299             }
300             }
301             }
302 8 50       32 unless (exists $w_bufs{$fh}) {
303 8         8 my $packet;
304 8 50       16 if (my $pack = $pack{$fh}) {
305 0         0 $packet = $pack->(shift @{$snd{$to}});
  0         0  
306             } else {
307 8         6 $packet = freeze shift @{$snd{$to}};
  8         47  
308             }
309 8         358 my $buf = join "", pack("N", length $packet), $packet;
310 8         16 $w_bufs{$fh} = $buf;
311 8 50 0     29 $DEBUG and (@{$snd{$to}} or delete $snd{$to});
  0         0  
312             }
313             }
314             }
315              
316 17         34 my $w_sel = IO::Select->new(map { $fh2fh{$_} } keys %w_bufs);
  8         34  
317              
318 17 50       319 if ($DEBUG > 1) {
319 0 0       0 print "Select count from $self_vpid, sel->count=", $sel->count(), ", w_sel->count=", $w_sel->count(), "\n",
320             $DEBUG > 2 ? Data::Dumper::Dumper({ snd => \%snd, r_bufs => \%r_bufs, w_bufs => \%w_bufs }) : "";
321             }
322              
323 17         53 my ($can_read, $can_write, $has_exception)= IO::Select->select($sel, $w_sel, $sel);
324 17 0       6590 $DEBUG > 1 and print "Select from $self_vpid: ", scalar(@$can_read), " ", ($can_write ? scalar(@$can_write) : ""), " ", scalar(@$has_exception), "\n";
    50          
325              
326 17         26 foreach my $fh (@$can_read) {
327 9 50       28 if ($listener{$fh}) {
328 0         0 my $sock = $fh->accept;
329 0         0 $pack{$sock} = $pack{$fh};
330 0         0 $unpack{$sock} = $unpack{$fh};
331 0         0 $sock->sockopt(SO_KEEPALIVE, 1);
332 0         0 my $vpid = refaddr $sock;
333 0         0 $node{$sock} = $vpid;
334 0         0 $fh2vpid{$sock} = $vpid;
335 0         0 $vpid2fh{$vpid} = $sock;
336 0         0 $fh2fh{$sock} = $sock;
337 0         0 $sel->add($sock);
338 0         0 next;
339             }
340 9         130 my $len = sysread $fh, (my $_buf), $blksize;
341 9 100       22 if ($len) {
    50          
342 8         28 $r_bufs{$fh} .= $_buf;
343             NEXT_MSG: {
344 8         8 my $buf = $r_bufs{$fh};
  8         11  
345 8 50       33 if (length $buf >= 4) {
346 8         47 my $packet_length = unpack "N", substr $buf, 0, 4, "";
347 8 50       17 if (length $buf >= $packet_length) {
348 8         14 my $packet = substr $buf, 0, $packet_length, "";
349 8   50     34 $r_bufs{$fh} = $buf || "";
350 8 50 0     14 $DEBUG and ($r_bufs{$fh} or delete $r_bufs{$fh});
351              
352 8         5 my ($from, $to, $msg, $args);
353 8 50       16 if (my $unpack = $unpack{$fh}) {
354 0         0 ($from, $to, $msg, $args) = @{$unpack->($packet)};
  0         0  
355             } else {
356 8         7 ($from, $to, $msg, $args) = @{thaw $packet};
  8         36  
357             }
358              
359 8 50       177 if ($node{$fh}) {
360 0         0 $from = $node{$fh};
361 0         0 $to = $self_vpid;
362             }
363              
364 8 50       12 $DEBUG and print "Got message '$msg' from $from to $to vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @$args), ".\n";
365 8 50       29 if ($to == $self_vpid) {
    0          
366 8 50       12 $DEBUG and print "Run message sub '$msg' from $from to $to vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @$args), ".\n";
367 8 100 66     41 if (defined $waited_vpid and defined $waited_msg) {
368 1         3 push @rcv, [$from, $msg, $args];
369             } else {
370 7 50       13 if ($msg{$msg}) {
371 7         19 push @rcv, [$from, $msg, $args];
372             } else {
373 0 0       0 $DEBUG and print "Unknown message '$msg'\n";
374             }
375             }
376             } elsif ($vpid2fh{$to}) {
377 0 0       0 $DEBUG and print "Remittance message '$msg' from $from to $to vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @$args), ".\n";
378 0         0 push @{$snd{$to}}, [$from, $to, $msg, $args];
  0         0  
379             } else {
380 0         0 carp "Got Wandered message '$msg' from $from to $to in $self_vpid (\$\$=$$)\n";
381             }
382              
383 8 50       32 redo NEXT_MSG if $r_bufs{$fh};
384             }
385             }
386             }
387             } elsif (defined $len) {
388 1         7 $sel->remove($fh);
389 1         40 $w_sel->remove($fh);
390 1         31 my $vpid = delete $fh2vpid{$fh};
391 1         4 delete $vpid2fh{$vpid};
392 1         5 delete $r_bufs{$fh};
393 1         6 delete $w_bufs{$fh};
394 1         4 delete $fh2fh{$fh};
395 1         9 delete $vpid2pid{$vpid};
396 1         2 delete $pack{$fh};
397 1         1 delete $unpack{$fh};
398 1 50       4 if (my $node_vpid = $node{$fh}) {
399 0         0 delete $node{$fh};
400 0 0       0 if ($msg{NODE_CLOSED}) {
401 0 0       0 $msg{NODE_CLOSED}->($node_vpid, $fh->connected ? 1 : 0) unless $closed{$vpid};
    0          
402             }
403             } else {
404 1 50       38 if ($msg{SPAWN_CLOSED}) {
405 0 0       0 $msg{SPAWN_CLOSED}->($vpid) unless $closed{$vpid};
406             }
407             }
408 1         2 delete $closed{$vpid};
409 1         17 close $fh;
410 1 50 33     10 if ($self_parent_fh and $self_parent_fh eq $fh) {
411 1         2 $self_parent_closed = 1;
412 1 50 33     4 unless (defined $waited_vpid and defined $waited_msg) {
413 1 50       4 unless (@rcv) {
414 1         92 exit;
415             }
416             }
417             }
418             } else {
419 0 0       0 $DEBUG and die "Can't read '$fh': $!";
420             }
421             }
422              
423              
424 16 100 66     61 if (defined $waited_vpid and defined $waited_msg) {
425 2         6 foreach my $i (0 .. $#rcv) {
426 1         1 my ($from, $msg, $args)= @{$rcv[$i]};
  1         3  
427 1 50 33     10 if ($msg eq $waited_msg and $from eq $waited_vpid) {
428 1         2 splice @rcv, $i, 1;
429 1 50       3 $DEBUG and print "Stop waiting for '$waited_vpid -> $waited_msg' in $self_vpid (\$\$=$$)\n";
430 1         1 $need_reset = 1;
431 1 50       8 return wantarray ? @$args : $$args[0];
432             }
433             }
434 1 50       5 unless (exists $vpid2fh{$waited_vpid}) {
435 0         0 return;
436             }
437             } else {
438 14         49 while (my $rcv = shift @rcv) {
439 7         7 my ($from, $msg, $args)= @{$rcv};
  7         12  
440 7 50       30 $msg{$msg}->($from, @$args) unless $closed{$from};
441             }
442             }
443 14 50       25 if ($need_reset) {
444 0         0 $need_reset = 0;
445 0         0 next RESET;
446             }
447              
448              
449 14         43 foreach my $fh (@$can_write) {
450 8 50       20 $fh2fh{$fh} or next;
451              
452 8         10 my $buf = $w_bufs{$fh};
453 8         84 my $len = syswrite $fh, $buf, $blksize;
454 8 50       11 if ($len) {
455 8         13 substr $buf, 0, $len, "";
456 8 50       19 if (length $buf) {
457 0         0 $w_bufs{$fh} = $buf;
458             } else {
459 8         54 delete $w_bufs{$fh};
460             }
461             } else {
462 0 0         $DEBUG and die "Can't write to '$fh': $!";
463             }
464             }
465              
466             }
467 0           $need_reset = 1;
468 0           $quit = 0;
469 0           return;
470             }
471              
472              
473              
474             1;
475              
476              
477             __END__