File Coverage

blib/lib/IPC/MPS/EV.pm
Criterion Covered Total %
statement 207 304 68.0
branch 63 164 38.4
condition 9 50 18.0
subroutine 19 24 79.1
pod 0 13 0.0
total 298 555 53.6


line stmt bran cond sub pod time code
1             package IPC::MPS::EV;
2              
3 2     2   47736 use strict;
  2         4  
  2         60  
4 2     2   10 use warnings;
  2         2  
  2         66  
5              
6 2     2   8 use Exporter;
  2         12  
  2         222  
7             our @ISA = qw(Exporter);
8             our @EXPORT = qw(spawn receive msg snd quit wt snd_wt listener open_node vpid2pid);
9              
10             our $VERSION = '0.19';
11              
12 2     2   10 use Carp;
  2         94  
  2         174  
13 2     2   22 use EV;
  2         2  
  2         76  
14 2     2   1336 use IO::Socket;
  2         36696  
  2         6  
15 2     2   798 use Scalar::Util qw(refaddr);
  2         2  
  2         134  
16 2     2   1142 use Storable qw(freeze thaw);
  2         4410  
  2         4562  
17              
18              
19             my $DEBUG = 0;
20             $DEBUG and require Data::Dumper;
21              
22             my @spawn = ();
23             my %msg = ();
24             my %fh2vpid = ();
25             my %vpid2fh = ();
26             my %fh2fh = ();
27             my $self_vpid = 0;
28             my $self_parent_fh;
29             my $self_parent_vpid = 0;
30             my $self_parent_closed = 0;
31             my %listener = ();
32             my %node = ();
33             my %snd = ();
34             my $ipc_loop = 0;
35              
36             my %vpid2pid = ();
37 1     1 0 10 sub vpid2pid { my ($vpid) = @_; $vpid2pid{$vpid} }
  1         13  
38              
39             my @rcv = ();
40             my %r_bufs = ();
41             my %w_bufs = ();
42              
43             my %pack = ();
44             my %unpack = ();
45              
46             my %closed = ();
47              
48             my %fh2rw = ();
49             my %fh2ww = ();
50              
51             my ($waited_vpid, $waited_msg, @waited_rv);
52              
53             my $blksize = 1024 * 16;
54              
55              
56             END {
57 2 50 0 2   7 $ipc_loop or @spawn and carp "Probably have forgotten to call receive.";
58 2         51 close $_ foreach values %fh2fh;
59             }
60              
61             sub spawn(&) {
62 2     2 0 16 my ($spawn) = @_;
63 2 50       68 socketpair(my $child, my $parent, AF_UNIX, SOCK_STREAM, PF_UNSPEC) or die "socketpair: $!";
64 2         12 my $vpid = refaddr $child;
65 2         6 push @spawn, [$vpid, $child, $parent, $spawn];
66 2         4 return $vpid;
67             }
68              
69              
70             sub msg($$) {
71 3     3 0 37 my ($msg, $sub) = @_;
72 3         19 $msg{$msg} = $sub;
73             }
74              
75              
76             sub snd($$;@) {
77 9     9 0 854 my ($vpid, $msg, @args) = @_;
78 9 50       19 defined $vpid or carp("Argument vpid required"), return;
79 9 50       18 defined $msg or carp("Argument msg required"), return;
80 9 100       17 $vpid = $self_parent_vpid if $vpid == 0;
81 9 50       14 $DEBUG and print "Send message '$msg' from $self_vpid to $vpid vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @args), ".\n";
82 9         8 push @{$snd{$vpid}}, [$self_vpid, $vpid, $msg, \@args];
  9         26  
83 9 50 33     45 $closed{$vpid} = 1 if $msg eq "close" or $msg eq "exit";
84 9         33 w_event_cb_reg($vpid);
85 9         14 return 1;
86             }
87              
88              
89 0     0 0 0 sub quit() { EV::unloop }
90              
91              
92             sub snd_wt($$;@) {
93 1     1 0 443 my ($vpid, $msg, @args) = @_;
94 1 50       4 defined $vpid or carp("Argument vpid required"), return;
95 1 50       2 defined $msg or carp("Argument msg required"), return;
96 1         3 snd($vpid, $msg, @args);
97 1         4 wt($vpid, $msg);
98             }
99              
100              
101             sub listener($$;%) {
102 0     0 0 0 my ($host, $port, %args) = @_;
103 0 0       0 defined $host or carp("Argument host required"), return;
104 0 0       0 defined $port or carp("Argument port required"), return;
105 0         0 my $sock = IO::Socket::INET->new(Proto => 'tcp', Blocking => 0, LocalHost => $host, LocalPort => $port, Listen => 20, ReuseAddr => 1);
106 0 0       0 if ($sock) {
107 0 0       0 _pack_unpack($sock, %args) or return;
108 0         0 $listener{$sock} = $sock;
109             $fh2rw{$sock} = EV::io($sock, EV::READ, sub {
110 0     0   0 my $w = shift;
111 0         0 my $fh = $w->fh;
112 0 0       0 $DEBUG > 1 and print "Read event for listener from $self_vpid: \n";
113 0         0 my $sock = $fh->accept;
114 0         0 $pack{$sock} = $pack{$fh};
115 0         0 $unpack{$sock} = $unpack{$fh};
116 0         0 $sock->sockopt(SO_KEEPALIVE, 1);
117 0         0 my $vpid = refaddr $sock;
118 0         0 $node{$sock} = $vpid;
119 0         0 $fh2vpid{$sock} = $vpid;
120 0         0 $vpid2fh{$vpid} = $sock;
121 0         0 $fh2fh{$sock} = $sock;
122 0         0 $fh2rw{$sock} = EV::io($sock, EV::READ, \&r_event_cb);
123 0         0 });
124 0         0 return $sock;
125             } else {
126 0         0 carp "Cannot open socket '$host:$port' in $self_vpid: $!";
127 0         0 return;
128             }
129             }
130              
131              
132             sub open_node($$;%) {
133 0     0 0 0 my ($host, $port, %args) = @_;
134 0 0       0 defined $host or carp("Argument host required"), return;
135 0 0       0 defined $port or carp("Argument port required"), return;
136 0         0 my $sock = IO::Socket::INET->new(Proto => 'tcp', Blocking => 0);
137 0         0 my $addr = sockaddr_in($port,inet_aton($host));
138 0         0 $sock->sockopt(SO_KEEPALIVE, 1);
139 0         0 my $rv = $sock->connect($addr);
140 0 0       0 if ($rv) {
141 0 0       0 _pack_unpack($sock, %args) or return;
142 0         0 my $vpid = refaddr $sock;
143 0         0 $node{$sock} = $vpid;
144 0         0 $fh2vpid{$sock} = $vpid;
145 0         0 $vpid2fh{$vpid} = $sock;
146 0         0 $fh2fh{$sock} = $sock;
147 0         0 $fh2rw{$sock} = EV::io($sock, EV::READ, \&r_event_cb);
148 0         0 return $vpid;
149             } else {
150 0         0 carp "Cannot connect to socket '$host:$port' in $self_vpid: $!";
151 0         0 return;
152             }
153             }
154              
155              
156             sub _pack_unpack($%) {
157 0     0   0 my ($fh, %args) = @_;
158 0 0 0     0 if (my $pack = $args{pack} and my $unpack = $args{unpack}) {
    0 0        
159 0         0 my $r = eval {
160 0         0 my $r = $unpack->($pack->({a => ["b"]}));
161 0 0 0     0 if (ref $r eq "HASH" and ref $$r{a} eq "ARRAY" and
      0        
      0        
162             $$r{a}[0] and $$r{a}[0] eq "b")
163             {
164 0         0 return 1;
165             } else {
166 0         0 return 0;
167             }
168             };
169 0 0 0     0 if (not $r or $@) {
170 0         0 carp "False pack unpack test";
171 0         0 return;
172             }
173 0         0 $pack{$fh} = $pack;
174 0         0 $unpack{$fh} = $unpack;
175             } elsif ($args{pack} or $args{unpack}) {
176 0         0 carp "pack and unpack is pair options";
177 0         0 return;
178             }
179 0         0 return 1;
180             }
181              
182              
183             sub receive(&) {
184 3     3 0 72 my ($receive) = @_;
185              
186 3 50       9 $DEBUG > 1 and print "Call receive in $self_vpid (\$\$=$$)\n";
187              
188 3         59 local $SIG{CHLD} = "IGNORE";
189 3         21 local $SIG{PIPE} = "IGNORE";
190              
191 3         7 foreach (@spawn) {
192 2         4 my ($vpid, $child, $parent, $spawn) = @$_;
193              
194 2         1182 my $kid_pid = fork;
195 2 50       65 defined $kid_pid or die "Can't fork: $!";
196              
197 2 100       40 unless ($kid_pid) {
198              
199 1         17 foreach (@spawn) {
200 1         47 close $$_[1];
201 1 50       16 close $$_[2] if $$_[2] ne $parent;
202             }
203              
204 1         19 close $_ foreach values %fh2fh, values %listener;
205 1         7 $_->stop foreach values %fh2rw, values %fh2ww;
206 1         6 @spawn = ();
207 1         11 %listener = ();
208 1         3 %node = ();
209 1         3 %msg = ();
210 1         2 %fh2vpid = ();
211 1         2 %vpid2fh = ();
212 1         2 %fh2fh = ();
213 1         32 %snd = ();
214              
215 1         3 %vpid2pid = ();
216              
217 1         6 $ipc_loop = 0;
218              
219 1         3 @rcv = ();
220 1         2 %r_bufs = ();
221 1         2 %w_bufs = ();
222              
223 1         3 %pack = ();
224 1         2 %unpack = ();
225              
226 1         7 %closed = ();
227              
228 1         2 %fh2rw = ();
229 1         2 %fh2ww = ();
230              
231 1         3 ($waited_vpid, $waited_msg, @waited_rv) = ();
232              
233 1         3 $self_parent_fh = $parent;
234 1         1 $self_parent_vpid = $self_vpid;
235              
236 1         3 $self_vpid = $vpid;
237              
238 1         5 $fh2vpid{$self_parent_fh} = $self_parent_vpid;
239 1         7 $vpid2fh{$self_parent_vpid} = $self_parent_fh;
240 1         13 $fh2fh{$self_parent_fh} = $self_parent_fh;
241              
242 1         50 $fh2rw{$self_parent_fh} = EV::io($self_parent_fh, EV::READ, \&r_event_cb);
243              
244 1         31 $spawn->();
245              
246 0         0 exit;
247             }
248             else {
249 1         30 $vpid2pid{$vpid} = $kid_pid;
250             }
251             }
252              
253              
254 2         9 foreach (@spawn) {
255 1         8 my ($vpid, $child, $parent, $spawn, $receive) = @$_;
256 1         12 close $parent;
257 1         7 $fh2vpid{$child} = $vpid;
258 1         6 $vpid2fh{$vpid} = $child;
259 1         4 $fh2fh{$child} = $child;
260 1         38 $fh2rw{$child} = EV::io($child, EV::READ, \&r_event_cb);
261             }
262 2         41 @spawn = ();
263              
264              
265              
266 2         24 $receive->();
267              
268              
269              
270 2 50       6 unless ($ipc_loop) {
271 2         2 $ipc_loop = 1;
272 2         13 w_event_cb_reg();
273 2         3175 EV::loop;
274 0         0 $ipc_loop = 0;
275             }
276             }
277              
278              
279             sub wt($$) {
280 1     1 0 2 ($waited_vpid, $waited_msg) = @_;
281 1 50       4 defined $waited_vpid or carp("Argument vpid required"), return;
282 1 50       3 defined $waited_msg or carp("Argument msg required"), return;
283 1 50       3 $waited_vpid = $self_parent_vpid if $waited_vpid == 0;
284 1         8 foreach my $i (0 .. $#rcv) {
285 0         0 my ($from, $msg, $args)= @{$rcv[$i]};
  0         0  
286 0 0 0     0 if ($from eq $waited_vpid and $msg eq $waited_msg) {
287 0         0 splice @rcv, $i, 1;
288 0 0       0 return wantarray ? @$args : $$args[0];
289             }
290             }
291 1 50       3 $DEBUG and print "Start waiting for '$waited_vpid -> $waited_msg' in $self_vpid (\$\$=$$)\n";
292 1         2 w_event_cb_reg();
293 1         8 EV::loop;
294 1         2 my @rv = @waited_rv;
295 1         1 ($waited_vpid, $waited_msg, @waited_rv) = ();
296 1 50       5 return wantarray ? @rv : $rv[0];
297             }
298              
299              
300             sub w_event_cb_reg {
301 26     26 0 30 my ($to_vpid) = @_;
302              
303 26 100       64 foreach my $to (defined $to_vpid ? $to_vpid : keys %snd) {
304 25 100       39 if (@{$snd{$to}}) {
  25         11346  
305 10         20 my $fh = $vpid2fh{$to};
306 10 100       17 unless ($fh) {
307 2 50       4 if (@spawn) {
308 2 50       4 carp "Probably have forgotten to call receive." if not defined $to_vpid;
309 2         4 next;
310             } else {
311 0 0       0 if ($self_parent_fh) {
312 0 0       0 unless ($self_parent_closed) {
313 0         0 $fh = $self_parent_fh;
314             } else {
315 0         0 next;
316             }
317             } else {
318 0         0 carp "The addressee $to is unknown or has left in $self_vpid (\$\$=$$)\n";
319 0         0 next;
320             }
321             }
322             }
323 8 50       21 unless (exists $w_bufs{$fh}) {
324 8         7 my $packet;
325 8 50       14 if (my $pack = $pack{$fh}) {
326 0         0 $packet = $pack->(shift @{$snd{$to}});
  0         0  
327             } else {
328 8         4 $packet = freeze shift @{$snd{$to}};
  8         31  
329             }
330 8         310 my $buf = join "", pack("N", length $packet), $packet;
331 8         16 $w_bufs{$fh} = $buf;
332 8 50 0     17 $DEBUG and (@{$snd{$to}} or delete $snd{$to});
  0         0  
333 8         49 $fh2ww{$fh} = EV::io($fh, EV::WRITE, \&w_event_cb);
334             }
335             }
336             }
337             }
338              
339              
340              
341              
342             sub r_event_cb {
343 9     9 0 21 my $w = shift;
344 9         45 my $fh = $w->fh;
345              
346 9 50       28 $DEBUG > 1 and print "Read event from $self_vpid: \n";
347              
348 9         85 my $len = sysread $fh, (my $_buf), $blksize;
349 9 100       27 if ($len) {
    50          
350 8         22 $r_bufs{$fh} .= $_buf;
351             NEXT_MSG: {
352 8         7 my $buf = $r_bufs{$fh};
  8         13  
353 8 50       24 if (length $buf >= 4) {
354 8         42 my $packet_length = unpack "N", substr $buf, 0, 4, "";
355 8 50       30 if (length $buf >= $packet_length) {
356 8         32 my $packet = substr $buf, 0, $packet_length, "";
357 8   50     44 $r_bufs{$fh} = $buf || "";
358 8 50 0     18 $DEBUG and ($r_bufs{$fh} or delete $r_bufs{$fh});
359              
360 8         9 my ($from, $to, $msg, $args);
361 8 50       13 if (my $unpack = $unpack{$fh}) {
362 0         0 ($from, $to, $msg, $args) = @{$unpack->($packet)};
  0         0  
363             } else {
364 8         16 ($from, $to, $msg, $args) = @{thaw $packet};
  8         26  
365             }
366              
367 8 50       173 if ($node{$fh}) {
368 0         0 $from = $node{$fh};
369 0         0 $to = $self_vpid;
370             }
371              
372 8 50       13 $DEBUG and print "Got message '$msg' from $from to $to vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @$args), ".\n";
373 8 50       22 if ($to == $self_vpid) {
    0          
374 8 50       14 $DEBUG and print "Run message sub '$msg' from $from to $to vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @$args), ".\n";
375 8 100 66     34 if (defined $waited_vpid and defined $waited_msg) {
376 1         2 push @rcv, [$from, $msg, $args];
377             } else {
378 7 50       25 if ($msg{$msg}) {
379 7         22 push @rcv, [$from, $msg, $args];
380             } else {
381 0 0       0 $DEBUG and print "Unknown message '$msg'\n";
382             }
383             }
384             } elsif ($vpid2fh{$to}) {
385 0 0       0 $DEBUG and print "Remittance message '$msg' from $from to $to vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @$args), ".\n";
386 0         0 push @{$snd{$to}}, [$from, $to, $msg, $args];
  0         0  
387 0         0 w_event_cb_reg();
388             } else {
389 0         0 carp "Got Wandered message '$msg' from $from to $to in $self_vpid (\$\$=$$)\n";
390             }
391              
392 8 50       25 redo NEXT_MSG if $r_bufs{$fh};
393             }
394             }
395             }
396             } elsif (defined $len) {
397 1 50       8 if (exists $fh2ww{$fh}) {
398 0         0 delete $fh2ww{$fh};
399             }
400 1         4 my $vpid = delete $fh2vpid{$fh};
401 1         5 delete $vpid2fh{$vpid};
402 1         3 delete $fh2rw{$fh};
403 1         6 delete $r_bufs{$fh};
404 1         2 delete $w_bufs{$fh};
405 1         5 delete $fh2fh{$fh};
406 1         2 delete $vpid2pid{$vpid};
407 1         3 delete $pack{$fh};
408 1         4 delete $unpack{$fh};
409 1 50       8 if (my $node_vpid = $node{$fh}) {
410 0         0 delete $node{$fh};
411 0 0       0 if ($msg{NODE_CLOSED}) {
412 0 0       0 $msg{NODE_CLOSED}->($node_vpid, $fh->connected ? 1 : 0) unless $closed{$vpid};
    0          
413 0         0 w_event_cb_reg();
414             }
415             } else {
416 1 50       6 if ($msg{SPAWN_CLOSED}) {
417 0 0       0 $msg{SPAWN_CLOSED}->($vpid) unless $closed{$vpid};
418 0         0 w_event_cb_reg();
419             }
420             }
421 1         2 delete $closed{$vpid};
422 1         28 close $fh;
423 1 50 33     13 if ($self_parent_fh and $self_parent_fh eq $fh) {
424 1         1 $self_parent_closed = 1;
425 1 50 33     4 unless (defined $waited_vpid and defined $waited_msg) {
426 1 50       71 unless (@rcv) {
427 1         128 exit;
428             }
429             }
430             }
431             } else {
432 0 0       0 $DEBUG and die "Can't read '$fh': $!";
433             }
434              
435 8 100 66     36 if (defined $waited_vpid and defined $waited_msg) {
436 1         4 foreach my $i (0 .. $#rcv) {
437 1         1 my ($from, $msg, $args)= @{$rcv[$i]};
  1         2  
438 1 50 33     11 if ($msg eq $waited_msg and $from eq $waited_vpid) {
439 1         2 splice @rcv, $i, 1;
440 1 50       3 $DEBUG and print "Stop waiting for '$waited_vpid -> $waited_msg' in $self_vpid (\$\$=$$)\n";
441 1         2 @waited_rv = @$args;
442 1         2 EV::unloop();
443 1         3 return;
444             }
445             }
446 0 0       0 unless (exists $vpid2fh{$waited_vpid}) {
447 0         0 EV::unloop();
448 0         0 return;
449             }
450             } else {
451 7         16 while (my $rcv = shift @rcv) {
452 7         6 my ($from, $msg, $args)= @{$rcv};
  7         10  
453 7 50       27 $msg{$msg}->($from, @$args) unless $closed{$from};
454 6         13 w_event_cb_reg();
455             }
456             }
457             }
458              
459              
460              
461             sub w_event_cb {
462 8     8 0 9 my $w = shift;
463 8         17 my $fh = $w->fh;
464              
465 8 50       17 $DEBUG > 1 and print "Write event from $self_vpid: \n";
466 8 50       16 $fh2fh{$fh} or return;
467              
468 8         10 my $buf = $w_bufs{$fh};
469 8         91 my $len = syswrite $fh, $buf, $blksize;
470 8 50       22 if ($len) {
471 8         13 substr $buf, 0, $len, "";
472 8 50       14 if (length $buf) {
473 0         0 $w_bufs{$fh} = $buf;
474             } else {
475 8         17 delete $w_bufs{$fh};
476 8         5 delete $fh2ww{$fh};
477 8         12 w_event_cb_reg();
478             }
479             } else {
480 0 0         $DEBUG and die "Can't write to '$fh': $!";
481             }
482             }
483              
484              
485              
486             1;
487              
488              
489             __END__