File Coverage

blib/lib/IPC/MPS/Event.pm
Criterion Covered Total %
statement 207 305 67.8
branch 63 164 38.4
condition 9 50 18.0
subroutine 19 24 79.1
pod 0 13 0.0
total 298 556 53.6


line stmt bran cond sub pod time code
1             package IPC::MPS::Event;
2              
3 2     2   40944 use strict;
  2         2  
  2         84  
4 2     2   8 use warnings;
  2         2  
  2         40  
5              
6 2     2   6 use Exporter;
  2         2  
  2         120  
7             our @ISA = qw(Exporter);
8             our @EXPORT = qw(spawn receive msg snd quit wt snd_wt listener open_node vpid2pid);
9              
10             our $VERSION = '0.19';
11              
12 2     2   10 use Carp;
  2         2  
  2         78  
13 2     2   14 use Event;
  2         2  
  2         8  
14 2     2   3620 use IO::Socket;
  2         31250  
  2         8  
15 2     2   674 use Scalar::Util qw(refaddr);
  2         2  
  2         124  
16 2     2   1036 use Storable qw(freeze thaw);
  2         4268  
  2         4426  
17              
18              
19             my $DEBUG = 0;
20             $DEBUG and require Data::Dumper;
21              
22             my @spawn = ();
23             my %msg = ();
24             my %fh2vpid = ();
25             my %vpid2fh = ();
26             my %fh2fh = ();
27             my $self_vpid = 0;
28             my $self_parent_fh;
29             my $self_parent_vpid = 0;
30             my $self_parent_closed = 0;
31             my %listener = ();
32             my %node = ();
33             my %snd = ();
34             my $ipc_loop = 0;
35              
36             my %vpid2pid = ();
37 1     1 0 9 sub vpid2pid { my ($vpid) = @_; $vpid2pid{$vpid} }
  1         18  
38              
39             my @rcv = ();
40             my %r_bufs = ();
41             my %w_bufs = ();
42              
43             my %pack = ();
44             my %unpack = ();
45              
46             my %closed = ();
47              
48             my %fh2ww = ();
49              
50             my ($waited_vpid, $waited_msg, @waited_rv);
51              
52             my $blksize = 1024 * 16;
53              
54              
55             END {
56 2 50 0 2   8 $ipc_loop or @spawn and carp "Probably have forgotten to call receive.";
57 2         37 close $_ foreach values %fh2fh;
58             }
59              
60             sub spawn(&) {
61 2     2 0 16 my ($spawn) = @_;
62 2 50       76 socketpair(my $child, my $parent, AF_UNIX, SOCK_STREAM, PF_UNSPEC) or die "socketpair: $!";
63 2         10 my $vpid = refaddr $child;
64 2         8 push @spawn, [$vpid, $child, $parent, $spawn];
65 2         4 return $vpid;
66             }
67              
68              
69             sub msg($$) {
70 3     3 0 27 my ($msg, $sub) = @_;
71 3         12 $msg{$msg} = $sub;
72             }
73              
74              
75             sub snd($$;@) {
76 9     9 0 946 my ($vpid, $msg, @args) = @_;
77 9 50       23 defined $vpid or carp("Argument vpid required"), return;
78 9 50       22 defined $msg or carp("Argument msg required"), return;
79 9 100       23 $vpid = $self_parent_vpid if $vpid == 0;
80 9 50       17 $DEBUG and print "Send message '$msg' from $self_vpid to $vpid vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @args), ".\n";
81 9         10 push @{$snd{$vpid}}, [$self_vpid, $vpid, $msg, \@args];
  9         28  
82 9 50 33     64 $closed{$vpid} = 1 if $msg eq "close" or $msg eq "exit";
83 9         20 w_event_cb_reg($vpid);
84 9         591 return 1;
85             }
86              
87              
88 0     0 0 0 sub quit() { Event::unloop }
89              
90              
91             sub snd_wt($$;@) {
92 1     1 0 396 my ($vpid, $msg, @args) = @_;
93 1 50       5 defined $vpid or carp("Argument vpid required"), return;
94 1 50       3 defined $msg or carp("Argument msg required"), return;
95 1         3 snd($vpid, $msg, @args);
96 1         4 wt($vpid, $msg);
97             }
98              
99              
100             sub listener($$;%) {
101 0     0 0 0 my ($host, $port, %args) = @_;
102 0 0       0 defined $host or carp("Argument host required"), return;
103 0 0       0 defined $port or carp("Argument port required"), return;
104 0         0 my $sock = IO::Socket::INET->new(Proto => 'tcp', Blocking => 0, LocalHost => $host, LocalPort => $port, Listen => 20, ReuseAddr => 1);
105 0 0       0 if ($sock) {
106 0 0       0 _pack_unpack($sock, %args) or return;
107 0         0 $listener{$sock} = $sock;
108             Event->io(fd => $sock, poll => "r", cb => sub {
109 0     0   0 my ($event) = @_;
110 0         0 my $fh = $event->w->fd;
111 0 0       0 $DEBUG > 1 and print "Read event for listener from $self_vpid: \n";
112 0         0 my $sock = $fh->accept;
113 0         0 $pack{$sock} = $pack{$fh};
114 0         0 $unpack{$sock} = $unpack{$fh};
115 0         0 $sock->sockopt(SO_KEEPALIVE, 1);
116 0         0 my $vpid = refaddr $sock;
117 0         0 $node{$sock} = $vpid;
118 0         0 $fh2vpid{$sock} = $vpid;
119 0         0 $vpid2fh{$vpid} = $sock;
120 0         0 $fh2fh{$sock} = $sock;
121 0         0 Event->io(fd => $sock, poll => "r", cb => \&r_event_cb);
122 0         0 });
123 0         0 return $sock;
124             } else {
125 0         0 carp "Cannot open socket '$host:$port' in $self_vpid: $!";
126 0         0 return;
127             }
128             }
129              
130              
131             sub open_node($$;%) {
132 0     0 0 0 my ($host, $port, %args) = @_;
133 0 0       0 defined $host or carp("Argument host required"), return;
134 0 0       0 defined $port or carp("Argument port required"), return;
135 0         0 my $sock = IO::Socket::INET->new(Proto => 'tcp', Blocking => 0);
136 0         0 my $addr = sockaddr_in($port,inet_aton($host));
137 0         0 $sock->sockopt(SO_KEEPALIVE, 1);
138 0         0 my $rv = $sock->connect($addr);
139 0 0       0 if ($rv) {
140 0 0       0 _pack_unpack($sock, %args) or return;
141 0         0 my $vpid = refaddr $sock;
142 0         0 $node{$sock} = $vpid;
143 0         0 $fh2vpid{$sock} = $vpid;
144 0         0 $vpid2fh{$vpid} = $sock;
145 0         0 $fh2fh{$sock} = $sock;
146 0         0 Event->io(fd => $sock, poll => "r", cb => \&r_event_cb);
147 0         0 return $vpid;
148             } else {
149 0         0 carp "Cannot connect to socket '$host:$port' in $self_vpid: $!";
150 0         0 return;
151             }
152             }
153              
154              
155             sub _pack_unpack($%) {
156 0     0   0 my ($fh, %args) = @_;
157 0 0 0     0 if (my $pack = $args{pack} and my $unpack = $args{unpack}) {
    0 0        
158 0         0 my $r = eval {
159 0         0 my $r = $unpack->($pack->({a => ["b"]}));
160 0 0 0     0 if (ref $r eq "HASH" and ref $$r{a} eq "ARRAY" and
      0        
      0        
161             $$r{a}[0] and $$r{a}[0] eq "b")
162             {
163 0         0 return 1;
164             } else {
165 0         0 return 0;
166             }
167             };
168 0 0 0     0 if (not $r or $@) {
169 0         0 carp "False pack unpack test";
170 0         0 return;
171             }
172 0         0 $pack{$fh} = $pack;
173 0         0 $unpack{$fh} = $unpack;
174             } elsif ($args{pack} or $args{unpack}) {
175 0         0 carp "pack and unpack is pair options";
176 0         0 return;
177             }
178 0         0 return 1;
179             }
180              
181              
182             sub receive(&) {
183 3     3 0 46 my ($receive) = @_;
184              
185 3 50       9 $DEBUG > 1 and print "Call receive in $self_vpid (\$\$=$$)\n";
186              
187 3         53 local $SIG{CHLD} = "IGNORE";
188 3         20 local $SIG{PIPE} = "IGNORE";
189              
190 3         7 foreach (@spawn) {
191 2         10 my ($vpid, $child, $parent, $spawn) = @$_;
192              
193 2         1201 my $kid_pid = fork;
194 2 50       62 defined $kid_pid or die "Can't fork: $!";
195              
196 2 100       38 unless ($kid_pid) {
197              
198 1         23 foreach (@spawn) {
199 1         63 close $$_[1];
200 1 50       24 close $$_[2] if $$_[2] ne $parent;
201             }
202              
203 1         15 close $_ foreach values %fh2fh, values %listener;
204 1         17 $_->cancel foreach Event::all_watchers();
205 1         17 @spawn = ();
206 1         6 %listener = ();
207 1         4 %node = ();
208 1         2 %msg = ();
209 1         8 %fh2vpid = ();
210 1         7 %vpid2fh = ();
211 1         10 %fh2fh = ();
212 1         51 %snd = ();
213              
214 1         7 %vpid2pid = ();
215              
216 1         3 $ipc_loop = 0;
217              
218 1         3 @rcv = ();
219 1         3 %r_bufs = ();
220 1         3 %w_bufs = ();
221              
222 1         3 %pack = ();
223 1         3 %unpack = ();
224              
225 1         1 %closed = ();
226              
227 1         2 %fh2ww = ();
228              
229 1         2 ($waited_vpid, $waited_msg, @waited_rv) = ();
230              
231 1         2 $self_parent_fh = $parent;
232 1         8 $self_parent_vpid = $self_vpid;
233              
234 1         1 $self_vpid = $vpid;
235              
236 1         7 $fh2vpid{$self_parent_fh} = $self_parent_vpid;
237 1         8 $vpid2fh{$self_parent_vpid} = $self_parent_fh;
238 1         4 $fh2fh{$self_parent_fh} = $self_parent_fh;
239              
240 1         50 Event->io(fd => $self_parent_fh, poll => "r", cb => \&r_event_cb);
241              
242 1         556 $spawn->();
243              
244 0         0 exit;
245             }
246             else {
247 1         32 $vpid2pid{$vpid} = $kid_pid;
248             }
249             }
250              
251              
252 2         8 foreach (@spawn) {
253 1         10 my ($vpid, $child, $parent, $spawn, $receive) = @$_;
254 1         19 close $parent;
255 1         8 $fh2vpid{$child} = $vpid;
256 1         5 $vpid2fh{$vpid} = $child;
257 1         7 $fh2fh{$child} = $child;
258 1         35 Event->io(fd => $child, poll => "r", cb => \&r_event_cb);
259             }
260 2         397 @spawn = ();
261              
262              
263              
264 2         8 $receive->();
265              
266              
267              
268 2 50       9 unless ($ipc_loop) {
269 2         11 $ipc_loop = 1;
270 2         17 w_event_cb_reg();
271 2         121 Event::loop();
272 0         0 $ipc_loop = 0;
273             }
274             }
275              
276              
277             sub wt($$) {
278 1     1 0 2 ($waited_vpid, $waited_msg) = @_;
279 1 50       4 defined $waited_vpid or carp("Argument vpid required"), return;
280 1 50       4 defined $waited_msg or carp("Argument msg required"), return;
281 1 50       3 $waited_vpid = $self_parent_vpid if $waited_vpid == 0;
282 1         12 foreach my $i (0 .. $#rcv) {
283 0         0 my ($from, $msg, $args)= @{$rcv[$i]};
  0         0  
284 0 0 0     0 if ($from eq $waited_vpid and $msg eq $waited_msg) {
285 0         0 splice @rcv, $i, 1;
286 0 0       0 return wantarray ? @$args : $$args[0];
287             }
288             }
289 1 50       4 $DEBUG and print "Start waiting for '$waited_vpid -> $waited_msg' in $self_vpid (\$\$=$$)\n";
290 1         3 w_event_cb_reg();
291 1         12 Event::loop();
292 1         11 my @rv = @waited_rv;
293 1         2 ($waited_vpid, $waited_msg, @waited_rv) = ();
294 1 50       6 return wantarray ? @rv : $rv[0];
295             }
296              
297              
298             sub w_event_cb_reg {
299 26     26 0 32 my ($to_vpid) = @_;
300              
301 26 100       79 foreach my $to (defined $to_vpid ? $to_vpid : keys %snd) {
302 25 100       24 if (@{$snd{$to}}) {
  25         4674  
303 10         15 my $fh = $vpid2fh{$to};
304 10 100       16 unless ($fh) {
305 2 50       6 if (@spawn) {
306 2 50       4 carp "Probably have forgotten to call receive." if not defined $to_vpid;
307 2         4 next;
308             } else {
309 0 0       0 if ($self_parent_fh) {
310 0 0       0 unless ($self_parent_closed) {
311 0         0 $fh = $self_parent_fh;
312             } else {
313 0         0 next;
314             }
315             } else {
316 0         0 carp "The addressee $to is unknown or has left in $self_vpid (\$\$=$$)\n";
317 0         0 next;
318             }
319             }
320             }
321 8 50       23 unless (exists $w_bufs{$fh}) {
322 8         9 my $packet;
323 8 50       17 if (my $pack = $pack{$fh}) {
324 0         0 $packet = $pack->(shift @{$snd{$to}});
  0         0  
325             } else {
326 8         9 $packet = freeze shift @{$snd{$to}};
  8         44  
327             }
328 8         320 my $buf = join "", pack("N", length $packet), $packet;
329 8         18 $w_bufs{$fh} = $buf;
330 8 50 0     24 $DEBUG and (@{$snd{$to}} or delete $snd{$to});
  0         0  
331 8         30 $fh2ww{$fh} = Event->io(fd => $fh, poll => "w", cb => \&w_event_cb);
332             }
333             }
334             }
335             }
336              
337              
338              
339              
340             sub r_event_cb {
341 9     9 0 520 my ($event) = @_;
342 9         47 my $fh = $event->w->fd;
343              
344 9 50       36 $DEBUG > 1 and print "Read event from $self_vpid: \n";
345              
346 9         77 my $len = sysread $fh, (my $_buf), $blksize;
347 9 100       28 if ($len) {
    50          
348 8         21 $r_bufs{$fh} .= $_buf;
349             NEXT_MSG: {
350 8         8 my $buf = $r_bufs{$fh};
  8         16  
351 8 50       20 if (length $buf >= 4) {
352 8         35 my $packet_length = unpack "N", substr $buf, 0, 4, "";
353 8 50       17 if (length $buf >= $packet_length) {
354 8         15 my $packet = substr $buf, 0, $packet_length, "";
355 8   50     53 $r_bufs{$fh} = $buf || "";
356 8 50 0     12 $DEBUG and ($r_bufs{$fh} or delete $r_bufs{$fh});
357              
358 8         8 my ($from, $to, $msg, $args);
359 8 50       16 if (my $unpack = $unpack{$fh}) {
360 0         0 ($from, $to, $msg, $args) = @{$unpack->($packet)};
  0         0  
361             } else {
362 8         9 ($from, $to, $msg, $args) = @{thaw $packet};
  8         27  
363             }
364              
365 8 50       195 if ($node{$fh}) {
366 0         0 $from = $node{$fh};
367 0         0 $to = $self_vpid;
368             }
369              
370 8 50       22 $DEBUG and print "Got message '$msg' from $from to $to vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @$args), ".\n";
371 8 50       21 if ($to == $self_vpid) {
    0          
372 8 50       14 $DEBUG and print "Run message sub '$msg' from $from to $to vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @$args), ".\n";
373 8 100 66     52 if (defined $waited_vpid and defined $waited_msg) {
374 1         4 push @rcv, [$from, $msg, $args];
375             } else {
376 7 50       11 if ($msg{$msg}) {
377 7         20 push @rcv, [$from, $msg, $args];
378             } else {
379 0 0       0 $DEBUG and print "Unknown message '$msg'\n";
380             }
381             }
382             } elsif ($vpid2fh{$to}) {
383 0 0       0 $DEBUG and print "Remittance message '$msg' from $from to $to vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @$args), ".\n";
384 0         0 push @{$snd{$to}}, [$from, $to, $msg, $args];
  0         0  
385 0         0 w_event_cb_reg();
386             } else {
387 0         0 carp "Got Wandered message '$msg' from $from to $to in $self_vpid (\$\$=$$)\n";
388             }
389              
390 8 50       28 redo NEXT_MSG if $r_bufs{$fh};
391             }
392             }
393             }
394             } elsif (defined $len) {
395 1 50       8 if (exists $fh2ww{$fh}) {
396 0         0 $fh2ww{$fh}->cancel;
397 0         0 delete $fh2ww{$fh};
398             }
399 1         11 $event->w->cancel;
400 1         9 my $vpid = delete $fh2vpid{$fh};
401 1         4 delete $vpid2fh{$vpid};
402 1         4 delete $r_bufs{$fh};
403 1         3 delete $w_bufs{$fh};
404 1         54 delete $fh2fh{$fh};
405 1         3 delete $vpid2pid{$vpid};
406 1         1 delete $pack{$fh};
407 1         2 delete $unpack{$fh};
408 1 50       4 if (my $node_vpid = $node{$fh}) {
409 0         0 delete $node{$fh};
410 0 0       0 if ($msg{NODE_CLOSED}) {
411 0 0       0 $msg{NODE_CLOSED}->($node_vpid, $fh->connected ? 1 : 0) unless $closed{$vpid};
    0          
412 0         0 w_event_cb_reg();
413             }
414             } else {
415 1 50       4 if ($msg{SPAWN_CLOSED}) {
416 0 0       0 $msg{SPAWN_CLOSED}->($vpid) unless $closed{$vpid};
417 0         0 w_event_cb_reg();
418             }
419             }
420 1         1 delete $closed{$vpid};
421 1         28 close $fh;
422 1 50 33     25 if ($self_parent_fh and $self_parent_fh eq $fh) {
423 1         1 $self_parent_closed = 1;
424 1 50 33     5 unless (defined $waited_vpid and defined $waited_msg) {
425 1 50       4 unless (@rcv) {
426 1         107 exit;
427             }
428             }
429             }
430             } else {
431 0 0       0 $DEBUG and die "Can't read '$fh': $!";
432             }
433              
434 8 100 66     23 if (defined $waited_vpid and defined $waited_msg) {
435 1         4 foreach my $i (0 .. $#rcv) {
436 1         1 my ($from, $msg, $args)= @{$rcv[$i]};
  1         2  
437 1 50 33     8 if ($msg eq $waited_msg and $from eq $waited_vpid) {
438 1         2 splice @rcv, $i, 1;
439 1 50       3 $DEBUG and print "Stop waiting for '$waited_vpid -> $waited_msg' in $self_vpid (\$\$=$$)\n";
440 1         1 @waited_rv = @$args;
441 1         4 Event::unloop();
442 1         5 return;
443             }
444             }
445 0 0       0 unless (exists $vpid2fh{$waited_vpid}) {
446 0         0 Event::unloop();
447 0         0 return;
448             }
449             } else {
450 7         26 while (my $rcv = shift @rcv) {
451 7         8 my ($from, $msg, $args)= @{$rcv};
  7         14  
452 7 50       30 $msg{$msg}->($from, @$args) unless $closed{$from};
453 6         15 w_event_cb_reg();
454             }
455             }
456             }
457              
458              
459              
460             sub w_event_cb {
461 8     8 0 52 my ($event) = @_;
462 8         57 my $fh = $event->w->fd;
463              
464 8 50       20 $DEBUG > 1 and print "Write event from $self_vpid: \n";
465 8 50       18 $fh2fh{$fh} or return;
466              
467 8         14 my $buf = $w_bufs{$fh};
468 8         224 my $len = syswrite $fh, $buf, $blksize;
469 8 50       20 if ($len) {
470 8         15 substr $buf, 0, $len, "";
471 8 50       15 if (length $buf) {
472 0         0 $w_bufs{$fh} = $buf;
473             } else {
474 8         19 delete $w_bufs{$fh};
475 8         45 $event->w->cancel;
476 8         39 delete $fh2ww{$fh};
477 8         16 w_event_cb_reg();
478             }
479             } else {
480 0 0         $DEBUG and die "Can't write to '$fh': $!";
481             }
482             }
483              
484              
485              
486             1;
487              
488              
489             __END__