File Coverage

blib/lib/IPC/MPS.pm
Criterion Covered Total %
statement 197 295 66.7
branch 61 166 36.7
condition 10 53 18.8
subroutine 17 21 80.9
pod 0 11 0.0
total 285 546 52.2


line stmt bran cond sub pod time code
1             package IPC::MPS;
2              
3 2     2   134702 use strict;
  2         22  
  2         56  
4 2     2   10 use warnings;
  2         4  
  2         48  
5              
6 2     2   8 use Exporter;
  2         4  
  2         176  
7             our @ISA = qw(Exporter);
8             our @EXPORT = qw(spawn receive msg snd quit wt snd_wt listener open_node vpid2pid);
9              
10             our $VERSION = '0.20';
11              
12 2     2   18 use Carp;
  2         2  
  2         138  
13 2     2   960 use IO::Select;
  2         3432  
  2         94  
14 2     2   936 use IO::Socket;
  2         41940  
  2         8  
15 2     2   822 use Scalar::Util qw(refaddr);
  2         6  
  2         96  
16 2     2   1312 use Storable qw(freeze thaw);
  2         6350  
  2         6578  
17              
18              
19             my $DEBUG = 0;
20             $DEBUG and require Data::Dumper;
21              
22             my $sel = IO::Select->new();
23              
24             my @spawn = ();
25             my %msg = ();
26             my %fh2vpid = ();
27             my %vpid2fh = ();
28             my %fh2fh = ();
29             my $self_vpid = 0;
30             my $self_parent_fh;
31             my $self_parent_vpid = 0;
32             my $self_parent_closed = 0;
33             my %listener = ();
34             my %node = ();
35             my %snd = ();
36             my $ipc_loop = 0;
37             my $quit = 0;
38              
39             my %vpid2pid = ();
40 1     1 0 21 sub vpid2pid { my ($vpid) = @_; $vpid2pid{$vpid} }
  1         26  
41              
42             my @rcv = ();
43             my %r_bufs = ();
44             my %w_bufs = ();
45              
46             my %pack = ();
47             my %unpack = ();
48              
49             my %closed = ();
50              
51             my $need_reset = 0;
52              
53             my $blksize = 1024 * 16;
54              
55              
56             END {
57 2 50 0 2   162 $ipc_loop or @spawn and carp "Probably have forgotten to call receive.";
58 2         123 close $_ foreach values %fh2fh;
59             }
60              
61             sub spawn(&) {
62 2     2 0 172 my ($spawn) = @_;
63 2 50       142 socketpair(my $child, my $parent, AF_UNIX, SOCK_STREAM, PF_UNSPEC) or die "socketpair: $!";
64 2         14 my $vpid = refaddr $child;
65 2         8 push @spawn, [$vpid, $child, $parent, $spawn];
66 2         8 return $vpid;
67             }
68              
69              
70             sub msg($$) {
71 3     3 0 60 my ($msg, $sub) = @_;
72 3         18 $msg{$msg} = $sub;
73             }
74              
75              
76             sub snd($$;@) {
77 9     9 0 1550 my ($vpid, $msg, @args) = @_;
78 9 50       27 defined $vpid or carp("Argument vpid required"), return;
79 9 50       17 defined $msg or carp("Argument msg required"), return;
80 9 100       21 $vpid = $self_parent_vpid if $vpid == 0;
81 9 50       20 $DEBUG and print "Send message '$msg' from $self_vpid to $vpid vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @args), ".\n";
82 9         11 push @{$snd{$vpid}}, [$self_vpid, $vpid, $msg, \@args];
  9         39  
83 9 50 33     53 $closed{$vpid} = 1 if $msg eq "close" or $msg eq "exit";
84 9         35 return 1;
85             }
86              
87              
88 0     0 0 0 sub quit() { $quit = 1 }
89              
90              
91             sub snd_wt($$;@) {
92 1     1 0 573 my ($vpid, $msg, @args) = @_;
93 1 50       3 defined $vpid or carp("Argument vpid required"), return;
94 1 50       3 defined $msg or carp("Argument msg required"), return;
95 1         4 snd($vpid, $msg, @args);
96 1         4 wt($vpid, $msg);
97             }
98              
99              
100             sub listener($$;%) {
101 0     0 0 0 my ($host, $port, %args) = @_;
102 0 0       0 defined $host or carp("Argument host required"), return;
103 0 0       0 defined $port or carp("Argument port required"), return;
104 0         0 my $sock = IO::Socket::INET->new(Proto => 'tcp', Blocking => 0, LocalHost => $host, LocalPort => $port, Listen => 20, ReuseAddr => 1);
105 0 0       0 if ($sock) {
106 0 0       0 _pack_unpack($sock, %args) or return;
107 0         0 $listener{$sock} = $sock;
108 0         0 $sel->add($sock);
109 0         0 return $sock;
110             } else {
111 0         0 carp "Cannot open socket '$host:$port' in $self_vpid: $!";
112 0         0 return;
113             }
114             }
115              
116              
117             sub open_node($$;%) {
118 0     0 0 0 my ($host, $port, %args) = @_;
119 0 0       0 defined $host or carp("Argument host required"), return;
120 0 0       0 defined $port or carp("Argument port required"), return;
121 0         0 my $sock = IO::Socket::INET->new(Proto => 'tcp', Blocking => 0);
122 0         0 my $addr = sockaddr_in($port,inet_aton($host));
123 0         0 $sock->sockopt(SO_KEEPALIVE, 1);
124 0         0 my $rv = $sock->connect($addr);
125 0 0       0 if ($rv) {
126 0 0       0 _pack_unpack($sock, %args) or return;
127 0         0 my $vpid = refaddr $sock;
128 0         0 $node{$sock} = $vpid;
129 0         0 $fh2vpid{$sock} = $vpid;
130 0         0 $vpid2fh{$vpid} = $sock;
131 0         0 $fh2fh{$sock} = $sock;
132 0         0 $sel->add($sock);
133 0         0 return $vpid;
134             } else {
135 0         0 carp "Cannot connect to socket '$host:$port' in $self_vpid: $!";
136 0         0 return;
137             }
138             }
139              
140              
141             sub _pack_unpack($%) {
142 0     0   0 my ($fh, %args) = @_;
143 0 0 0     0 if (my $pack = $args{pack} and my $unpack = $args{unpack}) {
    0 0        
144 0         0 my $r = eval {
145 0         0 my $r = $unpack->($pack->({a => ["b"]}));
146 0 0 0     0 if (ref $r eq "HASH" and ref $$r{a} eq "ARRAY" and
      0        
      0        
147             $$r{a}[0] and $$r{a}[0] eq "b")
148             {
149 0         0 return 1;
150             } else {
151 0         0 return 0;
152             }
153             };
154 0 0 0     0 if (not $r or $@) {
155 0         0 carp "False pack unpack test";
156 0         0 return;
157             }
158 0         0 $pack{$fh} = $pack;
159 0         0 $unpack{$fh} = $unpack;
160             } elsif ($args{pack} or $args{unpack}) {
161 0         0 carp "pack and unpack is pair options";
162 0         0 return;
163             }
164 0         0 return 1;
165             }
166              
167              
168             sub receive(&) {
169 3     3 0 131 my ($receive) = @_;
170              
171 3 50       9 $DEBUG > 1 and print "Call receive in $self_vpid (\$\$=$$)\n";
172              
173 3         114 local $SIG{CHLD} = "IGNORE";
174 3         63 local $SIG{PIPE} = "IGNORE";
175              
176 3         12 foreach (@spawn) {
177 2         10 my ($vpid, $child, $parent, $spawn) = @$_;
178              
179 2         2065 my $kid_pid = fork;
180 2 50       142 defined $kid_pid or die "Can't fork: $!";
181              
182 2 100       92 unless ($kid_pid) {
183              
184 1         21 foreach (@spawn) {
185 1         49 close $$_[1];
186 1 50       33 close $$_[2] if $$_[2] ne $parent;
187             }
188              
189 1         18 close $_ foreach values %fh2fh, values %listener;
190 1         78 $sel = IO::Select->new();
191 1         124 @spawn = ();
192 1         8 %listener = ();
193 1         8 %node = ();
194 1         6 %msg = ();
195 1         2 %fh2vpid = ();
196 1         3 %vpid2fh = ();
197 1         3 %fh2fh = ();
198 1         32 %snd = ();
199              
200 1         15 %vpid2pid = ();
201              
202 1         6 $ipc_loop = 0;
203              
204 1         3 @rcv = ();
205 1         10 %r_bufs = ();
206 1         3 %w_bufs = ();
207              
208 1         2 %pack = ();
209 1         2 %unpack = ();
210              
211 1         2 %closed = ();
212              
213 1         2 $need_reset = 0;
214              
215 1         2 $self_parent_fh = $parent;
216 1         2 $self_parent_vpid = $self_vpid;
217              
218 1         2 $self_vpid = $vpid;
219              
220 1         12 $fh2vpid{$self_parent_fh} = $self_parent_vpid;
221 1         22 $vpid2fh{$self_parent_vpid} = $self_parent_fh;
222 1         4 $fh2fh{$self_parent_fh} = $self_parent_fh;
223              
224 1         9 $sel->add($self_parent_fh);
225              
226 1         133 $spawn->();
227              
228 0         0 exit;
229             }
230             else {
231 1         52 $vpid2pid{$vpid} = $kid_pid;
232             }
233             }
234              
235              
236 2         28 foreach (@spawn) {
237 1         15 my ($vpid, $child, $parent, $spawn) = @$_;
238 1         38 close $parent;
239 1         18 $fh2vpid{$child} = $vpid;
240 1         16 $vpid2fh{$vpid} = $child;
241 1         4 $fh2fh{$child} = $child;
242 1         51 $sel->add($child);
243             }
244 2         284 @spawn = ();
245              
246              
247              
248 2         37 $receive->();
249              
250              
251              
252 2 50       8 unless ($ipc_loop) {
253 2         4 $ipc_loop = 1;
254 2         17 ipc_loop();
255 0         0 $ipc_loop = 0;
256             }
257             }
258              
259              
260             sub wt($$) {
261 1     1 0 3 my ($waited_vpid, $waited_msg) = @_;
262 1 50       3 defined $waited_vpid or carp("Argument vpid required"), return;
263 1 50       3 defined $waited_msg or carp("Argument msg required"), return;
264 1 50       37 $waited_vpid = $self_parent_vpid if $waited_vpid == 0;
265 1         30 foreach my $i (0 .. $#rcv) {
266 0         0 my ($from, $msg, $args)= @{$rcv[$i]};
  0         0  
267 0 0 0     0 if ($from eq $waited_vpid and $msg eq $waited_msg) {
268 0         0 splice @rcv, $i, 1;
269 0 0       0 return wantarray ? @$args : $$args[0];
270             }
271             }
272 1 50       5 $DEBUG and print "Start waiting for '$waited_vpid -> $waited_msg' in $self_vpid (\$\$=$$)\n";
273 1         29 return ipc_loop($waited_vpid, $waited_msg);
274             }
275              
276              
277             sub ipc_loop(;$$) {
278 3     3 0 11 my ($waited_vpid, $waited_msg) = @_;
279 3 50       13 $DEBUG and print "Start ipc_loop in $self_vpid (\$\$=$$)\n";
280 3   33     15 RESET: while ($sel->count() and not $quit) {
281              
282 17         219 foreach my $to (keys %snd) {
283 16 100       23 if (@{$snd{$to}}) {
  16         43  
284 8         14 my $fh = $vpid2fh{$to};
285 8 50       17 unless ($fh) {
286 0 0       0 if (@spawn) {
287 0         0 carp "Probably have forgotten to call receive.";
288 0         0 next;
289             } else {
290 0 0       0 if ($self_parent_fh) {
291 0 0       0 unless ($self_parent_closed) {
292 0         0 $fh = $self_parent_fh;
293             } else {
294 0         0 next;
295             }
296             } else {
297 0         0 carp "The addressee $to is unknown or has left in $self_vpid (\$\$=$$)\n";
298 0         0 next;
299             }
300             }
301             }
302 8 50       23 unless (exists $w_bufs{$fh}) {
303 8         25 my $packet;
304 8 50       34 if (my $pack = $pack{$fh}) {
305 0         0 $packet = $pack->(shift @{$snd{$to}});
  0         0  
306             } else {
307 8         15 $packet = freeze shift @{$snd{$to}};
  8         47  
308             }
309 8         482 my $buf = join "", pack("N", length $packet), $packet;
310 8         102 $w_bufs{$fh} = $buf;
311 8 50 0     31 $DEBUG and (@{$snd{$to}} or delete $snd{$to});
  0         0  
312             }
313             }
314             }
315              
316 17         53 my $w_sel = IO::Select->new(map { $fh2fh{$_} } keys %w_bufs);
  8         48  
317              
318 17 50       511 if ($DEBUG > 1) {
319 0 0       0 print "Select count from $self_vpid, sel->count=", $sel->count(), ", w_sel->count=", $w_sel->count(), "\n",
320             $DEBUG > 2 ? Data::Dumper::Dumper({ snd => \%snd, r_bufs => \%r_bufs, w_bufs => \%w_bufs }) : "";
321             }
322              
323 17         53 my ($can_read, $can_write, $has_exception)= IO::Select->select($sel, $w_sel, $sel);
324 17 0       9904 $DEBUG > 1 and print "Select from $self_vpid: ", scalar(@$can_read), " ", ($can_write ? scalar(@$can_write) : ""), " ", scalar(@$has_exception), "\n";
    50          
325              
326 17         41 foreach my $fh (@$can_read) {
327 9 50       33 if ($listener{$fh}) {
328 0         0 my $sock = $fh->accept;
329 0         0 $pack{$sock} = $pack{$fh};
330 0         0 $unpack{$sock} = $unpack{$fh};
331 0         0 $sock->sockopt(SO_KEEPALIVE, 1);
332 0         0 my $vpid = refaddr $sock;
333 0         0 $node{$sock} = $vpid;
334 0         0 $fh2vpid{$sock} = $vpid;
335 0         0 $vpid2fh{$vpid} = $sock;
336 0         0 $fh2fh{$sock} = $sock;
337 0         0 $sel->add($sock);
338 0         0 next;
339             }
340 9         135 my $len = sysread $fh, (my $_buf), $blksize;
341 9 100       33 if ($len) {
    50          
342 8         27 $r_bufs{$fh} .= $_buf;
343             NEXT_MSG: {
344 8         12 my $buf = $r_bufs{$fh};
  8         20  
345 8 50       24 if (length $buf >= 4) {
346 8         44 my $packet_length = unpack "N", substr $buf, 0, 4, "";
347 8 50       29 if (length $buf >= $packet_length) {
348 8         20 my $packet = substr $buf, 0, $packet_length, "";
349 8   50     48 $r_bufs{$fh} = $buf || "";
350 8 50 0     23 $DEBUG and ($r_bufs{$fh} or delete $r_bufs{$fh});
351              
352 8         14 my ($from, $to, $msg, $args);
353 8 50       28 if (my $unpack = $unpack{$fh}) {
354 0         0 ($from, $to, $msg, $args) = @{$unpack->($packet)};
  0         0  
355             } else {
356 8         30 ($from, $to, $msg, $args) = @{thaw $packet};
  8         33  
357             }
358              
359 8 50       239 if ($node{$fh}) {
360 0         0 $from = $node{$fh};
361 0         0 $to = $self_vpid;
362             }
363              
364 8 50       19 $DEBUG and print "Got message '$msg' from $from to $to vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @$args), ".\n";
365 8 50       26 if ($to == $self_vpid) {
    0          
366 8 50       17 $DEBUG and print "Run message sub '$msg' from $from to $to vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @$args), ".\n";
367 8 100 66     31 if (defined $waited_vpid and defined $waited_msg) {
368 1         4 push @rcv, [$from, $msg, $args];
369             } else {
370 7 50       16 if ($msg{$msg}) {
371 7         20 push @rcv, [$from, $msg, $args];
372             } else {
373 0 0       0 $DEBUG and print "Unknown message '$msg'\n";
374             }
375             }
376             } elsif ($vpid2fh{$to}) {
377 0 0       0 $DEBUG and print "Remittance message '$msg' from $from to $to vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @$args), ".\n";
378 0         0 push @{$snd{$to}}, [$from, $to, $msg, $args];
  0         0  
379             } else {
380 0         0 carp "Got Wandered message '$msg' from $from to $to in $self_vpid (\$\$=$$)\n";
381             }
382              
383 8 50       45 redo NEXT_MSG if $r_bufs{$fh};
384             }
385             }
386             }
387             } elsif (defined $len) {
388 1         8 $sel->remove($fh);
389 1         50 $w_sel->remove($fh);
390 1         49 my $vpid = delete $fh2vpid{$fh};
391 1         6 delete $vpid2fh{$vpid};
392 1         3 delete $r_bufs{$fh};
393 1         3 delete $w_bufs{$fh};
394 1         3 delete $fh2fh{$fh};
395 1         2 delete $vpid2pid{$vpid};
396 1         2 delete $pack{$fh};
397 1         3 delete $unpack{$fh};
398 1 50       5 if (my $node_vpid = $node{$fh}) {
399 0         0 delete $node{$fh};
400 0 0       0 if ($msg{NODE_CLOSED}) {
401 0 0       0 $msg{NODE_CLOSED}->($node_vpid, $fh->connected ? 1 : 0) unless $closed{$vpid};
    0          
402             }
403             } else {
404 1 50       4 if ($msg{SPAWN_CLOSED}) {
405 0 0       0 $msg{SPAWN_CLOSED}->($vpid) unless $closed{$vpid};
406             }
407             }
408 1         2 delete $closed{$vpid};
409 1         22 close $fh;
410 1 50 33     18 if ($self_parent_fh and $self_parent_fh eq $fh) {
411 1         3 $self_parent_closed = 1;
412 1 50 33     4 unless (defined $waited_vpid and defined $waited_msg) {
413 1 50       4 unless (@rcv) {
414 1         291 exit;
415             }
416             }
417             }
418             } else {
419 0 0       0 $DEBUG and die "Can't read '$fh': $!";
420             }
421             }
422              
423              
424 16 100 66     59 if (defined $waited_vpid and defined $waited_msg) {
425 2         7 foreach my $i (0 .. $#rcv) {
426 1         2 my ($from, $msg, $args)= @{$rcv[$i]};
  1         4  
427 1 50 33     14 if ($msg eq $waited_msg and $from eq $waited_vpid) {
428 1         3 splice @rcv, $i, 1;
429 1 50       3 $DEBUG and print "Stop waiting for '$waited_vpid -> $waited_msg' in $self_vpid (\$\$=$$)\n";
430 1         2 $need_reset = 1;
431 1 50       10 return wantarray ? @$args : $$args[0];
432             }
433             }
434 1 50       5 unless (exists $vpid2fh{$waited_vpid}) {
435 0         0 return;
436             }
437             } else {
438 14         39 while (my $rcv = shift @rcv) {
439 7         10 my ($from, $msg, $args)= @{$rcv};
  7         18  
440 7 50       33 $msg{$msg}->($from, @$args) unless $closed{$from};
441             }
442             }
443 14 50       44 if ($need_reset) {
444 0         0 $need_reset = 0;
445 0         0 next RESET;
446             }
447              
448              
449 14         44 foreach my $fh (@$can_write) {
450 8 50       24 $fh2fh{$fh} or next;
451              
452 8         16 my $buf = $w_bufs{$fh};
453 8         324 my $len = syswrite $fh, $buf, $blksize;
454 8 50       32 if ($len) {
455 8         60 substr $buf, 0, $len, "";
456 8 50       18 if (length $buf) {
457 0         0 $w_bufs{$fh} = $buf;
458             } else {
459 8         84 delete $w_bufs{$fh};
460             }
461             } else {
462 0 0         $DEBUG and die "Can't write to '$fh': $!";
463             }
464             }
465              
466             }
467 0           $need_reset = 1;
468 0           $quit = 0;
469 0           return;
470             }
471              
472              
473              
474             1;
475              
476              
477             __END__