File Coverage

blib/lib/IPC/MPS/Event.pm
Criterion Covered Total %
statement 210 308 68.1
branch 63 164 38.4
condition 9 50 18.0
subroutine 20 25 80.0
pod 0 13 0.0
total 302 560 53.9


line stmt bran cond sub pod time code
1             package IPC::MPS::Event;
2              
3 2     2   125166 use strict;
  2         22  
  2         48  
4 2     2   8 use warnings;
  2         2  
  2         48  
5              
6 2     2   8 use Exporter;
  2         2  
  2         144  
7             our @ISA = qw(Exporter);
8             our @EXPORT = qw(spawn receive msg snd quit wt snd_wt listener open_node vpid2pid);
9              
10             our $VERSION = '0.21';
11              
12 2     2   12 use Carp;
  2         2  
  2         78  
13 2     2   26 use Event;
  2         4  
  2         22  
14 2     2   3992 use Socket qw(SOCK_NONBLOCK);
  2         5828  
  2         254  
15 2     2   766 use IO::Socket;
  2         27514  
  2         8  
16 2     2   668 use Scalar::Util qw(refaddr);
  2         4  
  2         114  
17 2     2   1066 use Storable qw(freeze thaw);
  2         5104  
  2         5618  
18              
19              
20             my $DEBUG = 0;
21             $DEBUG and require Data::Dumper;
22              
23             my @spawn = ();
24             my %msg = ();
25             my %fh2vpid = ();
26             my %vpid2fh = ();
27             my %fh2fh = ();
28             my $self_vpid = 0;
29             my $self_parent_fh;
30             my $self_parent_vpid = 0;
31             my $self_parent_closed = 0;
32             my %listener = ();
33             my %node = ();
34             my %snd = ();
35             my $ipc_loop = 0;
36              
37             my %vpid2pid = ();
38 1     1 0 17 sub vpid2pid { my ($vpid) = @_; $vpid2pid{$vpid} }
  1         25  
39              
40             my @rcv = ();
41             my %r_bufs = ();
42             my %w_bufs = ();
43              
44             my %pack = ();
45             my %unpack = ();
46              
47             my %closed = ();
48              
49             my %fh2ww = ();
50              
51             my ($waited_vpid, $waited_msg, @waited_rv);
52              
53             my $blksize = 1024 * 16;
54              
55              
56             END {
57 2 50 0 2   152 $ipc_loop or @spawn and carp "Probably have forgotten to call receive.";
58 2         57 close $_ foreach values %fh2fh;
59             }
60              
61             sub spawn(&) {
62 2     2 0 144 my ($spawn) = @_;
63 2 50       128 socketpair(my $child, my $parent, AF_UNIX, SOCK_STREAM|SOCK_NONBLOCK, PF_UNSPEC) or die "socketpair: $!";
64 2         12 my $vpid = refaddr $child;
65 2         6 push @spawn, [$vpid, $child, $parent, $spawn];
66 2         8 return $vpid;
67             }
68              
69              
70             sub msg($$) {
71 3     3 0 56 my ($msg, $sub) = @_;
72 3         10 $msg{$msg} = $sub;
73             }
74              
75              
76             sub snd($$;@) {
77 9     9 0 1281 my ($vpid, $msg, @args) = @_;
78 9 50       23 defined $vpid or carp("Argument vpid required"), return;
79 9 50       19 defined $msg or carp("Argument msg required"), return;
80 9 100       34 $vpid = $self_parent_vpid if $vpid == 0;
81 9 50       16 $DEBUG and print "Send message '$msg' from $self_vpid to $vpid vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @args), ".\n";
82 9         13 push @{$snd{$vpid}}, [$self_vpid, $vpid, $msg, \@args];
  9         31  
83 9 50 33     54 $closed{$vpid} = 1 if $msg eq "close" or $msg eq "exit";
84 9         28 w_event_cb_reg($vpid);
85 9         649 return 1;
86             }
87              
88              
89 0     0 0 0 sub quit() { Event::unloop }
90              
91              
92             sub snd_wt($$;@) {
93 1     1 0 536 my ($vpid, $msg, @args) = @_;
94 1 50       35 defined $vpid or carp("Argument vpid required"), return;
95 1 50       19 defined $msg or carp("Argument msg required"), return;
96 1         5 snd($vpid, $msg, @args);
97 1         5 wt($vpid, $msg);
98             }
99              
100              
101             sub listener($$;%) {
102 0     0 0 0 my ($host, $port, %args) = @_;
103 0 0       0 defined $host or carp("Argument host required"), return;
104 0 0       0 defined $port or carp("Argument port required"), return;
105 0         0 my $sock = IO::Socket::INET->new(Proto => 'tcp', Blocking => 0, LocalHost => $host, LocalPort => $port, Listen => 20, ReuseAddr => 1);
106 0 0       0 if ($sock) {
107 0 0       0 _pack_unpack($sock, %args) or return;
108 0         0 $listener{$sock} = $sock;
109             Event->io(fd => $sock, poll => "r", cb => sub {
110 0     0   0 my ($event) = @_;
111 0         0 my $fh = $event->w->fd;
112 0 0       0 $DEBUG > 1 and print "Read event for listener from $self_vpid: \n";
113 0         0 my $sock = $fh->accept;
114 0         0 $pack{$sock} = $pack{$fh};
115 0         0 $unpack{$sock} = $unpack{$fh};
116 0         0 $sock->sockopt(SO_KEEPALIVE, 1);
117 0         0 my $vpid = refaddr $sock;
118 0         0 $node{$sock} = $vpid;
119 0         0 $fh2vpid{$sock} = $vpid;
120 0         0 $vpid2fh{$vpid} = $sock;
121 0         0 $fh2fh{$sock} = $sock;
122 0         0 Event->io(fd => $sock, poll => "r", cb => \&r_event_cb);
123 0         0 });
124 0         0 return $sock;
125             } else {
126 0         0 carp "Cannot open socket '$host:$port' in $self_vpid: $!";
127 0         0 return;
128             }
129             }
130              
131              
132             sub open_node($$;%) {
133 0     0 0 0 my ($host, $port, %args) = @_;
134 0 0       0 defined $host or carp("Argument host required"), return;
135 0 0       0 defined $port or carp("Argument port required"), return;
136 0         0 my $sock = IO::Socket::INET->new(Proto => 'tcp', Blocking => 0);
137 0         0 my $addr = sockaddr_in($port,inet_aton($host));
138 0         0 $sock->sockopt(SO_KEEPALIVE, 1);
139 0         0 my $rv = $sock->connect($addr);
140 0 0       0 if ($rv) {
141 0 0       0 _pack_unpack($sock, %args) or return;
142 0         0 my $vpid = refaddr $sock;
143 0         0 $node{$sock} = $vpid;
144 0         0 $fh2vpid{$sock} = $vpid;
145 0         0 $vpid2fh{$vpid} = $sock;
146 0         0 $fh2fh{$sock} = $sock;
147 0         0 Event->io(fd => $sock, poll => "r", cb => \&r_event_cb);
148 0         0 return $vpid;
149             } else {
150 0         0 carp "Cannot connect to socket '$host:$port' in $self_vpid: $!";
151 0         0 return;
152             }
153             }
154              
155              
156             sub _pack_unpack($%) {
157 0     0   0 my ($fh, %args) = @_;
158 0 0 0     0 if (my $pack = $args{pack} and my $unpack = $args{unpack}) {
    0 0        
159 0         0 my $r = eval {
160 0         0 my $r = $unpack->($pack->({a => ["b"]}));
161 0 0 0     0 if (ref $r eq "HASH" and ref $$r{a} eq "ARRAY" and
      0        
      0        
162             $$r{a}[0] and $$r{a}[0] eq "b")
163             {
164 0         0 return 1;
165             } else {
166 0         0 return 0;
167             }
168             };
169 0 0 0     0 if (not $r or $@) {
170 0         0 carp "False pack unpack test";
171 0         0 return;
172             }
173 0         0 $pack{$fh} = $pack;
174 0         0 $unpack{$fh} = $unpack;
175             } elsif ($args{pack} or $args{unpack}) {
176 0         0 carp "pack and unpack is pair options";
177 0         0 return;
178             }
179 0         0 return 1;
180             }
181              
182              
183             sub receive(&) {
184 3     3 0 77 my ($receive) = @_;
185              
186 3 50       11 $DEBUG > 1 and print "Call receive in $self_vpid (\$\$=$$)\n";
187              
188 3         87 local $SIG{CHLD} = "IGNORE";
189 3         46 local $SIG{PIPE} = "IGNORE";
190              
191 3         11 foreach (@spawn) {
192 2         8 my ($vpid, $child, $parent, $spawn) = @$_;
193              
194 2         1622 my $kid_pid = fork;
195 2 50       121 defined $kid_pid or die "Can't fork: $!";
196              
197 2 100       64 unless ($kid_pid) {
198              
199 1         28 foreach (@spawn) {
200 1         38 close $$_[1];
201 1 50       26 close $$_[2] if $$_[2] ne $parent;
202             }
203              
204 1         17 close $_ foreach values %fh2fh, values %listener;
205 1         31 $_->cancel foreach Event::all_watchers();
206 1         23 @spawn = ();
207 1         6 %listener = ();
208 1         8 %node = ();
209 1         9 %msg = ();
210 1         3 %fh2vpid = ();
211 1         9 %vpid2fh = ();
212 1         6 %fh2fh = ();
213 1         53 %snd = ();
214              
215 1         3 %vpid2pid = ();
216              
217 1         12 $ipc_loop = 0;
218              
219 1         4 @rcv = ();
220 1         7 %r_bufs = ();
221 1         6 %w_bufs = ();
222              
223 1         13 %pack = ();
224 1         2 %unpack = ();
225              
226 1         2 %closed = ();
227              
228 1         2 %fh2ww = ();
229              
230 1         3 ($waited_vpid, $waited_msg, @waited_rv) = ();
231              
232 1         1 $self_parent_fh = $parent;
233 1         5 $self_parent_vpid = $self_vpid;
234              
235 1         3 $self_vpid = $vpid;
236              
237 1         6 $fh2vpid{$self_parent_fh} = $self_parent_vpid;
238 1         16 $vpid2fh{$self_parent_vpid} = $self_parent_fh;
239 1         3 $fh2fh{$self_parent_fh} = $self_parent_fh;
240              
241 1         51 Event->io(fd => $self_parent_fh, poll => "r", cb => \&r_event_cb);
242              
243 1         541 $spawn->();
244              
245 0         0 exit;
246             }
247             else {
248 1         42 $vpid2pid{$vpid} = $kid_pid;
249             }
250             }
251              
252              
253 2         16 foreach (@spawn) {
254 1         22 my ($vpid, $child, $parent, $spawn, $receive) = @$_;
255 1         14 close $parent;
256 1         15 $fh2vpid{$child} = $vpid;
257 1         12 $vpid2fh{$vpid} = $child;
258 1         8 $fh2fh{$child} = $child;
259 1         73 Event->io(fd => $child, poll => "r", cb => \&r_event_cb);
260             }
261 2         648 @spawn = ();
262              
263              
264              
265 2         8 $receive->();
266              
267              
268              
269 2 50       6 unless ($ipc_loop) {
270 2         3 $ipc_loop = 1;
271 2         34 w_event_cb_reg();
272 2         124 Event::loop();
273 0         0 $ipc_loop = 0;
274             }
275             }
276              
277              
278             sub wt($$) {
279 1     1 0 2 ($waited_vpid, $waited_msg) = @_;
280 1 50       3 defined $waited_vpid or carp("Argument vpid required"), return;
281 1 50       4 defined $waited_msg or carp("Argument msg required"), return;
282 1 50       2 $waited_vpid = $self_parent_vpid if $waited_vpid == 0;
283 1         9 foreach my $i (0 .. $#rcv) {
284 0         0 my ($from, $msg, $args)= @{$rcv[$i]};
  0         0  
285 0 0 0     0 if ($from eq $waited_vpid and $msg eq $waited_msg) {
286 0         0 splice @rcv, $i, 1;
287 0 0       0 return wantarray ? @$args : $$args[0];
288             }
289             }
290 1 50       4 $DEBUG and print "Start waiting for '$waited_vpid -> $waited_msg' in $self_vpid (\$\$=$$)\n";
291 1         3 w_event_cb_reg();
292 1         12 Event::loop();
293 1         16 my @rv = @waited_rv;
294 1         3 ($waited_vpid, $waited_msg, @waited_rv) = ();
295 1 50       6 return wantarray ? @rv : $rv[0];
296             }
297              
298              
299             sub w_event_cb_reg {
300 26     26 0 44 my ($to_vpid) = @_;
301              
302 26 100       75 foreach my $to (defined $to_vpid ? $to_vpid : keys %snd) {
303 25 100       31 if (@{$snd{$to}}) {
  25         6235  
304 10         18 my $fh = $vpid2fh{$to};
305 10 100       15 unless ($fh) {
306 2 50       4 if (@spawn) {
307 2 50       4 carp "Probably have forgotten to call receive." if not defined $to_vpid;
308 2         8 next;
309             } else {
310 0 0       0 if ($self_parent_fh) {
311 0 0       0 unless ($self_parent_closed) {
312 0         0 $fh = $self_parent_fh;
313             } else {
314 0         0 next;
315             }
316             } else {
317 0         0 carp "The addressee $to is unknown or has left in $self_vpid (\$\$=$$)\n";
318 0         0 next;
319             }
320             }
321             }
322 8 50       17 unless (exists $w_bufs{$fh}) {
323 8         18 my $packet;
324 8 50       18 if (my $pack = $pack{$fh}) {
325 0         0 $packet = $pack->(shift @{$snd{$to}});
  0         0  
326             } else {
327 8         10 $packet = freeze shift @{$snd{$to}};
  8         41  
328             }
329 8         518 my $buf = join "", pack("N", length $packet), $packet;
330 8         22 $w_bufs{$fh} = $buf;
331 8 50 0     15 $DEBUG and (@{$snd{$to}} or delete $snd{$to});
  0         0  
332 8         32 $fh2ww{$fh} = Event->io(fd => $fh, poll => "w", cb => \&w_event_cb);
333             }
334             }
335             }
336             }
337              
338              
339              
340              
341             sub r_event_cb {
342 9     9 0 1262 my ($event) = @_;
343 9         82 my $fh = $event->w->fd;
344              
345 9 50       25 $DEBUG > 1 and print "Read event from $self_vpid: \n";
346              
347 9         117 my $len = sysread $fh, (my $_buf), $blksize;
348 9 100       31 if ($len) {
    50          
349 8         27 $r_bufs{$fh} .= $_buf;
350             NEXT_MSG: {
351 8         11 my $buf = $r_bufs{$fh};
  8         18  
352 8 50       24 if (length $buf >= 4) {
353 8         37 my $packet_length = unpack "N", substr $buf, 0, 4, "";
354 8 50       18 if (length $buf >= $packet_length) {
355 8         18 my $packet = substr $buf, 0, $packet_length, "";
356 8   50     53 $r_bufs{$fh} = $buf || "";
357 8 50 0     22 $DEBUG and ($r_bufs{$fh} or delete $r_bufs{$fh});
358              
359 8         14 my ($from, $to, $msg, $args);
360 8 50       17 if (my $unpack = $unpack{$fh}) {
361 0         0 ($from, $to, $msg, $args) = @{$unpack->($packet)};
  0         0  
362             } else {
363 8         8 ($from, $to, $msg, $args) = @{thaw $packet};
  8         29  
364             }
365              
366 8 50       212 if ($node{$fh}) {
367 0         0 $from = $node{$fh};
368 0         0 $to = $self_vpid;
369             }
370              
371 8 50       16 $DEBUG and print "Got message '$msg' from $from to $to vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @$args), ".\n";
372 8 50       20 if ($to == $self_vpid) {
    0          
373 8 50       30 $DEBUG and print "Run message sub '$msg' from $from to $to vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @$args), ".\n";
374 8 100 66     28 if (defined $waited_vpid and defined $waited_msg) {
375 1         4 push @rcv, [$from, $msg, $args];
376             } else {
377 7 50       19 if ($msg{$msg}) {
378 7         19 push @rcv, [$from, $msg, $args];
379             } else {
380 0 0       0 $DEBUG and print "Unknown message '$msg'\n";
381             }
382             }
383             } elsif ($vpid2fh{$to}) {
384 0 0       0 $DEBUG and print "Remittance message '$msg' from $from to $to vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @$args), ".\n";
385 0         0 push @{$snd{$to}}, [$from, $to, $msg, $args];
  0         0  
386 0         0 w_event_cb_reg();
387             } else {
388 0         0 carp "Got Wandered message '$msg' from $from to $to in $self_vpid (\$\$=$$)\n";
389             }
390              
391 8 50       28 redo NEXT_MSG if $r_bufs{$fh};
392             }
393             }
394             }
395             } elsif (defined $len) {
396 1 50       4 if (exists $fh2ww{$fh}) {
397 0         0 $fh2ww{$fh}->cancel;
398 0         0 delete $fh2ww{$fh};
399             }
400 1         5 $event->w->cancel;
401 1         5 my $vpid = delete $fh2vpid{$fh};
402 1         2 delete $vpid2fh{$vpid};
403 1         3 delete $r_bufs{$fh};
404 1         2 delete $w_bufs{$fh};
405 1         2 delete $fh2fh{$fh};
406 1         2 delete $vpid2pid{$vpid};
407 1         2 delete $pack{$fh};
408 1         2 delete $unpack{$fh};
409 1 50       3 if (my $node_vpid = $node{$fh}) {
410 0         0 delete $node{$fh};
411 0 0       0 if ($msg{NODE_CLOSED}) {
412 0 0       0 $msg{NODE_CLOSED}->($node_vpid, $fh->connected ? 1 : 0) unless $closed{$vpid};
    0          
413 0         0 w_event_cb_reg();
414             }
415             } else {
416 1 50       5 if ($msg{SPAWN_CLOSED}) {
417 0 0       0 $msg{SPAWN_CLOSED}->($vpid) unless $closed{$vpid};
418 0         0 w_event_cb_reg();
419             }
420             }
421 1         2 delete $closed{$vpid};
422 1         17 close $fh;
423 1 50 33     23 if ($self_parent_fh and $self_parent_fh eq $fh) {
424 1         2 $self_parent_closed = 1;
425 1 50 33     4 unless (defined $waited_vpid and defined $waited_msg) {
426 1 50       3 unless (@rcv) {
427 1         223 exit;
428             }
429             }
430             }
431             } else {
432 0 0       0 $DEBUG and die "Can't read '$fh': $!";
433             }
434              
435 8 100 66     29 if (defined $waited_vpid and defined $waited_msg) {
436 1         4 foreach my $i (0 .. $#rcv) {
437 1         1 my ($from, $msg, $args)= @{$rcv[$i]};
  1         3  
438 1 50 33     14 if ($msg eq $waited_msg and $from eq $waited_vpid) {
439 1         3 splice @rcv, $i, 1;
440 1 50       3 $DEBUG and print "Stop waiting for '$waited_vpid -> $waited_msg' in $self_vpid (\$\$=$$)\n";
441 1         2 @waited_rv = @$args;
442 1         6 Event::unloop();
443 1         6 return;
444             }
445             }
446 0 0       0 unless (exists $vpid2fh{$waited_vpid}) {
447 0         0 Event::unloop();
448 0         0 return;
449             }
450             } else {
451 7         17 while (my $rcv = shift @rcv) {
452 7         8 my ($from, $msg, $args)= @{$rcv};
  7         20  
453 7 50       29 $msg{$msg}->($from, @$args) unless $closed{$from};
454 6         11 w_event_cb_reg();
455             }
456             }
457             }
458              
459              
460              
461             sub w_event_cb {
462 8     8 0 70 my ($event) = @_;
463 8         48 my $fh = $event->w->fd;
464              
465 8 50       22 $DEBUG > 1 and print "Write event from $self_vpid.\n";
466 8 50       23 $fh2fh{$fh} or return;
467              
468 8         16 my $buf = $w_bufs{$fh};
469 8         273 my $len = syswrite $fh, $buf, $blksize;
470 8 50       27 if ($len) {
471 8         18 substr $buf, 0, $len, "";
472 8 50       15 if (length $buf) {
473 0         0 $w_bufs{$fh} = $buf;
474             } else {
475 8         18 delete $w_bufs{$fh};
476 8         42 $event->w->cancel;
477 8         31 delete $fh2ww{$fh};
478 8         18 w_event_cb_reg();
479             }
480             } else {
481 0 0         $DEBUG and die "Can't write to '$fh': $!";
482             }
483             }
484              
485              
486              
487             1;
488              
489              
490             __END__