File Coverage

blib/lib/IPC/MPS/EV.pm
Criterion Covered Total %
statement 210 307 68.4
branch 63 164 38.4
condition 9 50 18.0
subroutine 20 25 80.0
pod 0 13 0.0
total 302 559 54.0


line stmt bran cond sub pod time code
1             package IPC::MPS::EV;
2              
3 2     2   140000 use strict;
  2         6  
  2         56  
4 2     2   12 use warnings;
  2         6  
  2         50  
5              
6 2     2   10 use Exporter;
  2         2  
  2         160  
7             our @ISA = qw(Exporter);
8             our @EXPORT = qw(spawn receive msg snd quit wt snd_wt listener open_node vpid2pid);
9              
10             our $VERSION = '0.20';
11              
12 2     2   12 use Carp;
  2         4  
  2         134  
13 2     2   28 use EV;
  2         4  
  2         58  
14 2     2   1128 use Socket qw(SOCK_NONBLOCK);
  2         6968  
  2         328  
15 2     2   910 use IO::Socket;
  2         33806  
  2         10  
16 2     2   832 use Scalar::Util qw(refaddr);
  2         6  
  2         102  
17 2     2   1324 use Storable qw(freeze thaw);
  2         6284  
  2         6886  
18              
19              
20             my $DEBUG = 0;
21             $DEBUG and require Data::Dumper;
22              
23             my @spawn = ();
24             my %msg = ();
25             my %fh2vpid = ();
26             my %vpid2fh = ();
27             my %fh2fh = ();
28             my $self_vpid = 0;
29             my $self_parent_fh;
30             my $self_parent_vpid = 0;
31             my $self_parent_closed = 0;
32             my %listener = ();
33             my %node = ();
34             my %snd = ();
35             my $ipc_loop = 0;
36              
37             my %vpid2pid = ();
38 1     1 0 22 sub vpid2pid { my ($vpid) = @_; $vpid2pid{$vpid} }
  1         29  
39              
40             my @rcv = ();
41             my %r_bufs = ();
42             my %w_bufs = ();
43              
44             my %pack = ();
45             my %unpack = ();
46              
47             my %closed = ();
48              
49             my %fh2rw = ();
50             my %fh2ww = ();
51              
52             my ($waited_vpid, $waited_msg, @waited_rv);
53              
54             my $blksize = 1024 * 16;
55              
56              
57             END {
58 2 50 0 2   152 $ipc_loop or @spawn and carp "Probably have forgotten to call receive.";
59 2         121 close $_ foreach values %fh2fh;
60             }
61              
62             sub spawn(&) {
63 2     2 0 184 my ($spawn) = @_;
64 2 50       138 socketpair(my $child, my $parent, AF_UNIX, SOCK_STREAM|SOCK_NONBLOCK, PF_UNSPEC) or die "socketpair: $!";
65 2         14 my $vpid = refaddr $child;
66 2         10 push @spawn, [$vpid, $child, $parent, $spawn];
67 2         10 return $vpid;
68             }
69              
70              
71             sub msg($$) {
72 3     3 0 65 my ($msg, $sub) = @_;
73 3         20 $msg{$msg} = $sub;
74             }
75              
76              
77             sub snd($$;@) {
78 9     9 0 1560 my ($vpid, $msg, @args) = @_;
79 9 50       27 defined $vpid or carp("Argument vpid required"), return;
80 9 50       22 defined $msg or carp("Argument msg required"), return;
81 9 100       35 $vpid = $self_parent_vpid if $vpid == 0;
82 9 50       33 $DEBUG and print "Send message '$msg' from $self_vpid to $vpid vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @args), ".\n";
83 9         15 push @{$snd{$vpid}}, [$self_vpid, $vpid, $msg, \@args];
  9         38  
84 9 50 33     60 $closed{$vpid} = 1 if $msg eq "close" or $msg eq "exit";
85 9         38 w_event_cb_reg($vpid);
86 9         23 return 1;
87             }
88              
89              
90 0     0 0 0 sub quit() { EV::unloop }
91              
92              
93             sub snd_wt($$;@) {
94 1     1 0 599 my ($vpid, $msg, @args) = @_;
95 1 50       5 defined $vpid or carp("Argument vpid required"), return;
96 1 50       4 defined $msg or carp("Argument msg required"), return;
97 1         3 snd($vpid, $msg, @args);
98 1         5 wt($vpid, $msg);
99             }
100              
101              
102             sub listener($$;%) {
103 0     0 0 0 my ($host, $port, %args) = @_;
104 0 0       0 defined $host or carp("Argument host required"), return;
105 0 0       0 defined $port or carp("Argument port required"), return;
106 0         0 my $sock = IO::Socket::INET->new(Proto => 'tcp', Blocking => 0, LocalHost => $host, LocalPort => $port, Listen => 20, ReuseAddr => 1);
107 0 0       0 if ($sock) {
108 0 0       0 _pack_unpack($sock, %args) or return;
109 0         0 $listener{$sock} = $sock;
110             $fh2rw{$sock} = EV::io($sock, EV::READ, sub {
111 0     0   0 my $w = shift;
112 0         0 my $fh = $w->fh;
113 0 0       0 $DEBUG > 1 and print "Read event for listener from $self_vpid: \n";
114 0         0 my $sock = $fh->accept;
115 0         0 $pack{$sock} = $pack{$fh};
116 0         0 $unpack{$sock} = $unpack{$fh};
117 0         0 $sock->sockopt(SO_KEEPALIVE, 1);
118 0         0 my $vpid = refaddr $sock;
119 0         0 $node{$sock} = $vpid;
120 0         0 $fh2vpid{$sock} = $vpid;
121 0         0 $vpid2fh{$vpid} = $sock;
122 0         0 $fh2fh{$sock} = $sock;
123 0         0 $fh2rw{$sock} = EV::io($sock, EV::READ, \&r_event_cb);
124 0         0 });
125 0         0 return $sock;
126             } else {
127 0         0 carp "Cannot open socket '$host:$port' in $self_vpid: $!";
128 0         0 return;
129             }
130             }
131              
132              
133             sub open_node($$;%) {
134 0     0 0 0 my ($host, $port, %args) = @_;
135 0 0       0 defined $host or carp("Argument host required"), return;
136 0 0       0 defined $port or carp("Argument port required"), return;
137 0         0 my $sock = IO::Socket::INET->new(Proto => 'tcp', Blocking => 0);
138 0         0 my $addr = sockaddr_in($port,inet_aton($host));
139 0         0 $sock->sockopt(SO_KEEPALIVE, 1);
140 0         0 my $rv = $sock->connect($addr);
141 0 0       0 if ($rv) {
142 0 0       0 _pack_unpack($sock, %args) or return;
143 0         0 my $vpid = refaddr $sock;
144 0         0 $node{$sock} = $vpid;
145 0         0 $fh2vpid{$sock} = $vpid;
146 0         0 $vpid2fh{$vpid} = $sock;
147 0         0 $fh2fh{$sock} = $sock;
148 0         0 $fh2rw{$sock} = EV::io($sock, EV::READ, \&r_event_cb);
149 0         0 return $vpid;
150             } else {
151 0         0 carp "Cannot connect to socket '$host:$port' in $self_vpid: $!";
152 0         0 return;
153             }
154             }
155              
156              
157             sub _pack_unpack($%) {
158 0     0   0 my ($fh, %args) = @_;
159 0 0 0     0 if (my $pack = $args{pack} and my $unpack = $args{unpack}) {
    0 0        
160 0         0 my $r = eval {
161 0         0 my $r = $unpack->($pack->({a => ["b"]}));
162 0 0 0     0 if (ref $r eq "HASH" and ref $$r{a} eq "ARRAY" and
      0        
      0        
163             $$r{a}[0] and $$r{a}[0] eq "b")
164             {
165 0         0 return 1;
166             } else {
167 0         0 return 0;
168             }
169             };
170 0 0 0     0 if (not $r or $@) {
171 0         0 carp "False pack unpack test";
172 0         0 return;
173             }
174 0         0 $pack{$fh} = $pack;
175 0         0 $unpack{$fh} = $unpack;
176             } elsif ($args{pack} or $args{unpack}) {
177 0         0 carp "pack and unpack is pair options";
178 0         0 return;
179             }
180 0         0 return 1;
181             }
182              
183              
184             sub receive(&) {
185 3     3 0 79 my ($receive) = @_;
186              
187 3 50       12 $DEBUG > 1 and print "Call receive in $self_vpid (\$\$=$$)\n";
188              
189 3         112 local $SIG{CHLD} = "IGNORE";
190 3         59 local $SIG{PIPE} = "IGNORE";
191              
192 3         11 foreach (@spawn) {
193 2         10 my ($vpid, $child, $parent, $spawn) = @$_;
194              
195 2         1757 my $kid_pid = fork;
196 2 50       128 defined $kid_pid or die "Can't fork: $!";
197              
198 2 100       76 unless ($kid_pid) {
199              
200 1         23 foreach (@spawn) {
201 1         34 close $$_[1];
202 1 50       18 close $$_[2] if $$_[2] ne $parent;
203             }
204              
205 1         26 close $_ foreach values %fh2fh, values %listener;
206 1         5 $_->stop foreach values %fh2rw, values %fh2ww;
207 1         14 @spawn = ();
208 1         17 %listener = ();
209 1         6 %node = ();
210 1         4 %msg = ();
211 1         5 %fh2vpid = ();
212 1         3 %vpid2fh = ();
213 1         2 %fh2fh = ();
214 1         47 %snd = ();
215              
216 1         10 %vpid2pid = ();
217              
218 1         10 $ipc_loop = 0;
219              
220 1         7 @rcv = ();
221 1         3 %r_bufs = ();
222 1         2 %w_bufs = ();
223              
224 1         2 %pack = ();
225 1         1 %unpack = ();
226              
227 1         3 %closed = ();
228              
229 1         3 %fh2rw = ();
230 1         2 %fh2ww = ();
231              
232 1         5 ($waited_vpid, $waited_msg, @waited_rv) = ();
233              
234 1         6 $self_parent_fh = $parent;
235 1         8 $self_parent_vpid = $self_vpid;
236              
237 1         10 $self_vpid = $vpid;
238              
239 1         12 $fh2vpid{$self_parent_fh} = $self_parent_vpid;
240 1         19 $vpid2fh{$self_parent_vpid} = $self_parent_fh;
241 1         10 $fh2fh{$self_parent_fh} = $self_parent_fh;
242              
243 1         42 $fh2rw{$self_parent_fh} = EV::io($self_parent_fh, EV::READ, \&r_event_cb);
244              
245 1         30 $spawn->();
246              
247 0         0 exit;
248             }
249             else {
250 1         58 $vpid2pid{$vpid} = $kid_pid;
251             }
252             }
253              
254              
255 2         11 foreach (@spawn) {
256 1         12 my ($vpid, $child, $parent, $spawn, $receive) = @$_;
257 1         25 close $parent;
258 1         15 $fh2vpid{$child} = $vpid;
259 1         42 $vpid2fh{$vpid} = $child;
260 1         9 $fh2fh{$child} = $child;
261 1         74 $fh2rw{$child} = EV::io($child, EV::READ, \&r_event_cb);
262             }
263 2         100 @spawn = ();
264              
265              
266              
267 2         25 $receive->();
268              
269              
270              
271 2 50       7 unless ($ipc_loop) {
272 2         17 $ipc_loop = 1;
273 2         26 w_event_cb_reg();
274 2         1501 EV::loop;
275 0         0 $ipc_loop = 0;
276             }
277             }
278              
279              
280             sub wt($$) {
281 1     1 0 4 ($waited_vpid, $waited_msg) = @_;
282 1 50       4 defined $waited_vpid or carp("Argument vpid required"), return;
283 1 50       2 defined $waited_msg or carp("Argument msg required"), return;
284 1 50       12 $waited_vpid = $self_parent_vpid if $waited_vpid == 0;
285 1         7 foreach my $i (0 .. $#rcv) {
286 0         0 my ($from, $msg, $args)= @{$rcv[$i]};
  0         0  
287 0 0 0     0 if ($from eq $waited_vpid and $msg eq $waited_msg) {
288 0         0 splice @rcv, $i, 1;
289 0 0       0 return wantarray ? @$args : $$args[0];
290             }
291             }
292 1 50       32 $DEBUG and print "Start waiting for '$waited_vpid -> $waited_msg' in $self_vpid (\$\$=$$)\n";
293 1         33 w_event_cb_reg();
294 1         19 EV::loop;
295 1         3 my @rv = @waited_rv;
296 1         3 ($waited_vpid, $waited_msg, @waited_rv) = ();
297 1 50       8 return wantarray ? @rv : $rv[0];
298             }
299              
300              
301             sub w_event_cb_reg {
302 26     26 0 57 my ($to_vpid) = @_;
303              
304 26 100       104 foreach my $to (defined $to_vpid ? $to_vpid : keys %snd) {
305 25 100       30 if (@{$snd{$to}}) {
  25         7056  
306 10         15 my $fh = $vpid2fh{$to};
307 10 100       22 unless ($fh) {
308 2 50       6 if (@spawn) {
309 2 50       6 carp "Probably have forgotten to call receive." if not defined $to_vpid;
310 2         8 next;
311             } else {
312 0 0       0 if ($self_parent_fh) {
313 0 0       0 unless ($self_parent_closed) {
314 0         0 $fh = $self_parent_fh;
315             } else {
316 0         0 next;
317             }
318             } else {
319 0         0 carp "The addressee $to is unknown or has left in $self_vpid (\$\$=$$)\n";
320 0         0 next;
321             }
322             }
323             }
324 8 50       26 unless (exists $w_bufs{$fh}) {
325 8         13 my $packet;
326 8 50       19 if (my $pack = $pack{$fh}) {
327 0         0 $packet = $pack->(shift @{$snd{$to}});
  0         0  
328             } else {
329 8         24 $packet = freeze shift @{$snd{$to}};
  8         40  
330             }
331 8         512 my $buf = join "", pack("N", length $packet), $packet;
332 8         87 $w_bufs{$fh} = $buf;
333 8 50 0     24 $DEBUG and (@{$snd{$to}} or delete $snd{$to});
  0         0  
334 8         65 $fh2ww{$fh} = EV::io($fh, EV::WRITE, \&w_event_cb);
335             }
336             }
337             }
338             }
339              
340              
341              
342              
343             sub r_event_cb {
344 9     9 0 52 my $w = shift;
345 9         46 my $fh = $w->fh;
346              
347 9 50       41 $DEBUG > 1 and print "Read event from $self_vpid: \n";
348              
349 9         158 my $len = sysread $fh, (my $_buf), $blksize;
350 9 100       40 if ($len) {
    50          
351 8         35 $r_bufs{$fh} .= $_buf;
352             NEXT_MSG: {
353 8         13 my $buf = $r_bufs{$fh};
  8         26  
354 8 50       23 if (length $buf >= 4) {
355 8         61 my $packet_length = unpack "N", substr $buf, 0, 4, "";
356 8 50       24 if (length $buf >= $packet_length) {
357 8         25 my $packet = substr $buf, 0, $packet_length, "";
358 8   50     51 $r_bufs{$fh} = $buf || "";
359 8 50 0     18 $DEBUG and ($r_bufs{$fh} or delete $r_bufs{$fh});
360              
361 8         17 my ($from, $to, $msg, $args);
362 8 50       26 if (my $unpack = $unpack{$fh}) {
363 0         0 ($from, $to, $msg, $args) = @{$unpack->($packet)};
  0         0  
364             } else {
365 8         12 ($from, $to, $msg, $args) = @{thaw $packet};
  8         30  
366             }
367              
368 8 50       265 if ($node{$fh}) {
369 0         0 $from = $node{$fh};
370 0         0 $to = $self_vpid;
371             }
372              
373 8 50       22 $DEBUG and print "Got message '$msg' from $from to $to vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @$args), ".\n";
374 8 50       23 if ($to == $self_vpid) {
    0          
375 8 50       19 $DEBUG and print "Run message sub '$msg' from $from to $to vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @$args), ".\n";
376 8 100 66     36 if (defined $waited_vpid and defined $waited_msg) {
377 1         7 push @rcv, [$from, $msg, $args];
378             } else {
379 7 50       18 if ($msg{$msg}) {
380 7         21 push @rcv, [$from, $msg, $args];
381             } else {
382 0 0       0 $DEBUG and print "Unknown message '$msg'\n";
383             }
384             }
385             } elsif ($vpid2fh{$to}) {
386 0 0       0 $DEBUG and print "Remittance message '$msg' from $from to $to vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @$args), ".\n";
387 0         0 push @{$snd{$to}}, [$from, $to, $msg, $args];
  0         0  
388 0         0 w_event_cb_reg();
389             } else {
390 0         0 carp "Got Wandered message '$msg' from $from to $to in $self_vpid (\$\$=$$)\n";
391             }
392              
393 8 50       34 redo NEXT_MSG if $r_bufs{$fh};
394             }
395             }
396             }
397             } elsif (defined $len) {
398 1 50       5 if (exists $fh2ww{$fh}) {
399 0         0 delete $fh2ww{$fh};
400             }
401 1         4 my $vpid = delete $fh2vpid{$fh};
402 1         3 delete $vpid2fh{$vpid};
403 1         3 delete $fh2rw{$fh};
404 1         3 delete $r_bufs{$fh};
405 1         2 delete $w_bufs{$fh};
406 1         2 delete $fh2fh{$fh};
407 1         2 delete $vpid2pid{$vpid};
408 1         2 delete $pack{$fh};
409 1         3 delete $unpack{$fh};
410 1 50       4 if (my $node_vpid = $node{$fh}) {
411 0         0 delete $node{$fh};
412 0 0       0 if ($msg{NODE_CLOSED}) {
413 0 0       0 $msg{NODE_CLOSED}->($node_vpid, $fh->connected ? 1 : 0) unless $closed{$vpid};
    0          
414 0         0 w_event_cb_reg();
415             }
416             } else {
417 1 50       4 if ($msg{SPAWN_CLOSED}) {
418 0 0       0 $msg{SPAWN_CLOSED}->($vpid) unless $closed{$vpid};
419 0         0 w_event_cb_reg();
420             }
421             }
422 1         2 delete $closed{$vpid};
423 1         18 close $fh;
424 1 50 33     23 if ($self_parent_fh and $self_parent_fh eq $fh) {
425 1         3 $self_parent_closed = 1;
426 1 50 33     5 unless (defined $waited_vpid and defined $waited_msg) {
427 1 50       3 unless (@rcv) {
428 1         336 exit;
429             }
430             }
431             }
432             } else {
433 0 0       0 $DEBUG and die "Can't read '$fh': $!";
434             }
435              
436 8 100 66     29 if (defined $waited_vpid and defined $waited_msg) {
437 1         28 foreach my $i (0 .. $#rcv) {
438 1         3 my ($from, $msg, $args)= @{$rcv[$i]};
  1         5  
439 1 50 33     16 if ($msg eq $waited_msg and $from eq $waited_vpid) {
440 1         4 splice @rcv, $i, 1;
441 1 50       4 $DEBUG and print "Stop waiting for '$waited_vpid -> $waited_msg' in $self_vpid (\$\$=$$)\n";
442 1         4 @waited_rv = @$args;
443 1         4 EV::unloop();
444 1         5 return;
445             }
446             }
447 0 0       0 unless (exists $vpid2fh{$waited_vpid}) {
448 0         0 EV::unloop();
449 0         0 return;
450             }
451             } else {
452 7         21 while (my $rcv = shift @rcv) {
453 7         10 my ($from, $msg, $args)= @{$rcv};
  7         18  
454 7 50       49 $msg{$msg}->($from, @$args) unless $closed{$from};
455 6         21 w_event_cb_reg();
456             }
457             }
458             }
459              
460              
461              
462             sub w_event_cb {
463 8     8 0 23 my $w = shift;
464 8         27 my $fh = $w->fh;
465              
466 8 50       97 $DEBUG > 1 and print "Write event from $self_vpid.\n";
467 8 50       26 $fh2fh{$fh} or return;
468              
469 8         18 my $buf = $w_bufs{$fh};
470 8         311 my $len = syswrite $fh, $buf, $blksize;
471 8 50       35 if ($len) {
472 8         23 substr $buf, 0, $len, "";
473 8 50       14 if (length $buf) {
474 0         0 $w_bufs{$fh} = $buf;
475             } else {
476 8         26 delete $w_bufs{$fh};
477 8         14 delete $fh2ww{$fh};
478 8         19 w_event_cb_reg();
479             }
480             } else {
481 0 0         $DEBUG and die "Can't write to '$fh': $!";
482             }
483             }
484              
485              
486              
487             1;
488              
489              
490             __END__