File Coverage

blib/lib/IPC/MPS/Event.pm
Criterion Covered Total %
statement 207 305 67.8
branch 63 164 38.4
condition 9 50 18.0
subroutine 19 24 79.1
pod 0 13 0.0
total 298 556 53.6


line stmt bran cond sub pod time code
1             package IPC::MPS::Event;
2              
3 2     2   242798 use strict;
  2         22  
  2         58  
4 2     2   14 use warnings;
  2         4  
  2         58  
5              
6 2     2   10 use Exporter;
  2         2  
  2         174  
7             our @ISA = qw(Exporter);
8             our @EXPORT = qw(spawn receive msg snd quit wt snd_wt listener open_node vpid2pid);
9              
10             our $VERSION = '0.20';
11              
12 2     2   14 use Carp;
  2         2  
  2         94  
13 2     2   32 use Event;
  2         4  
  2         14  
14 2     2   5320 use IO::Socket;
  2         41858  
  2         10  
15 2     2   854 use Scalar::Util qw(refaddr);
  2         4  
  2         130  
16 2     2   1232 use Storable qw(freeze thaw);
  2         6340  
  2         6938  
17              
18              
19             my $DEBUG = 0;
20             $DEBUG and require Data::Dumper;
21              
22             my @spawn = ();
23             my %msg = ();
24             my %fh2vpid = ();
25             my %vpid2fh = ();
26             my %fh2fh = ();
27             my $self_vpid = 0;
28             my $self_parent_fh;
29             my $self_parent_vpid = 0;
30             my $self_parent_closed = 0;
31             my %listener = ();
32             my %node = ();
33             my %snd = ();
34             my $ipc_loop = 0;
35              
36             my %vpid2pid = ();
37 1     1 0 27 sub vpid2pid { my ($vpid) = @_; $vpid2pid{$vpid} }
  1         27  
38              
39             my @rcv = ();
40             my %r_bufs = ();
41             my %w_bufs = ();
42              
43             my %pack = ();
44             my %unpack = ();
45              
46             my %closed = ();
47              
48             my %fh2ww = ();
49              
50             my ($waited_vpid, $waited_msg, @waited_rv);
51              
52             my $blksize = 1024 * 16;
53              
54              
55             END {
56 2 50 0 2   197 $ipc_loop or @spawn and carp "Probably have forgotten to call receive.";
57 2         70 close $_ foreach values %fh2fh;
58             }
59              
60             sub spawn(&) {
61 2     2 0 182 my ($spawn) = @_;
62 2 50       234 socketpair(my $child, my $parent, AF_UNIX, SOCK_STREAM, PF_UNSPEC) or die "socketpair: $!";
63 2         16 my $vpid = refaddr $child;
64 2         10 push @spawn, [$vpid, $child, $parent, $spawn];
65 2         8 return $vpid;
66             }
67              
68              
69             sub msg($$) {
70 3     3 0 70 my ($msg, $sub) = @_;
71 3         17 $msg{$msg} = $sub;
72             }
73              
74              
75             sub snd($$;@) {
76 9     9 0 1580 my ($vpid, $msg, @args) = @_;
77 9 50       29 defined $vpid or carp("Argument vpid required"), return;
78 9 50       21 defined $msg or carp("Argument msg required"), return;
79 9 100       25 $vpid = $self_parent_vpid if $vpid == 0;
80 9 50       20 $DEBUG and print "Send message '$msg' from $self_vpid to $vpid vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @args), ".\n";
81 9         13 push @{$snd{$vpid}}, [$self_vpid, $vpid, $msg, \@args];
  9         41  
82 9 50 33     65 $closed{$vpid} = 1 if $msg eq "close" or $msg eq "exit";
83 9         37 w_event_cb_reg($vpid);
84 9         816 return 1;
85             }
86              
87              
88 0     0 0 0 sub quit() { Event::unloop }
89              
90              
91             sub snd_wt($$;@) {
92 1     1 0 658 my ($vpid, $msg, @args) = @_;
93 1 50       5 defined $vpid or carp("Argument vpid required"), return;
94 1 50       3 defined $msg or carp("Argument msg required"), return;
95 1         3 snd($vpid, $msg, @args);
96 1         46 wt($vpid, $msg);
97             }
98              
99              
100             sub listener($$;%) {
101 0     0 0 0 my ($host, $port, %args) = @_;
102 0 0       0 defined $host or carp("Argument host required"), return;
103 0 0       0 defined $port or carp("Argument port required"), return;
104 0         0 my $sock = IO::Socket::INET->new(Proto => 'tcp', Blocking => 0, LocalHost => $host, LocalPort => $port, Listen => 20, ReuseAddr => 1);
105 0 0       0 if ($sock) {
106 0 0       0 _pack_unpack($sock, %args) or return;
107 0         0 $listener{$sock} = $sock;
108             Event->io(fd => $sock, poll => "r", cb => sub {
109 0     0   0 my ($event) = @_;
110 0         0 my $fh = $event->w->fd;
111 0 0       0 $DEBUG > 1 and print "Read event for listener from $self_vpid: \n";
112 0         0 my $sock = $fh->accept;
113 0         0 $pack{$sock} = $pack{$fh};
114 0         0 $unpack{$sock} = $unpack{$fh};
115 0         0 $sock->sockopt(SO_KEEPALIVE, 1);
116 0         0 my $vpid = refaddr $sock;
117 0         0 $node{$sock} = $vpid;
118 0         0 $fh2vpid{$sock} = $vpid;
119 0         0 $vpid2fh{$vpid} = $sock;
120 0         0 $fh2fh{$sock} = $sock;
121 0         0 Event->io(fd => $sock, poll => "r", cb => \&r_event_cb);
122 0         0 });
123 0         0 return $sock;
124             } else {
125 0         0 carp "Cannot open socket '$host:$port' in $self_vpid: $!";
126 0         0 return;
127             }
128             }
129              
130              
131             sub open_node($$;%) {
132 0     0 0 0 my ($host, $port, %args) = @_;
133 0 0       0 defined $host or carp("Argument host required"), return;
134 0 0       0 defined $port or carp("Argument port required"), return;
135 0         0 my $sock = IO::Socket::INET->new(Proto => 'tcp', Blocking => 0);
136 0         0 my $addr = sockaddr_in($port,inet_aton($host));
137 0         0 $sock->sockopt(SO_KEEPALIVE, 1);
138 0         0 my $rv = $sock->connect($addr);
139 0 0       0 if ($rv) {
140 0 0       0 _pack_unpack($sock, %args) or return;
141 0         0 my $vpid = refaddr $sock;
142 0         0 $node{$sock} = $vpid;
143 0         0 $fh2vpid{$sock} = $vpid;
144 0         0 $vpid2fh{$vpid} = $sock;
145 0         0 $fh2fh{$sock} = $sock;
146 0         0 Event->io(fd => $sock, poll => "r", cb => \&r_event_cb);
147 0         0 return $vpid;
148             } else {
149 0         0 carp "Cannot connect to socket '$host:$port' in $self_vpid: $!";
150 0         0 return;
151             }
152             }
153              
154              
155             sub _pack_unpack($%) {
156 0     0   0 my ($fh, %args) = @_;
157 0 0 0     0 if (my $pack = $args{pack} and my $unpack = $args{unpack}) {
    0 0        
158 0         0 my $r = eval {
159 0         0 my $r = $unpack->($pack->({a => ["b"]}));
160 0 0 0     0 if (ref $r eq "HASH" and ref $$r{a} eq "ARRAY" and
      0        
      0        
161             $$r{a}[0] and $$r{a}[0] eq "b")
162             {
163 0         0 return 1;
164             } else {
165 0         0 return 0;
166             }
167             };
168 0 0 0     0 if (not $r or $@) {
169 0         0 carp "False pack unpack test";
170 0         0 return;
171             }
172 0         0 $pack{$fh} = $pack;
173 0         0 $unpack{$fh} = $unpack;
174             } elsif ($args{pack} or $args{unpack}) {
175 0         0 carp "pack and unpack is pair options";
176 0         0 return;
177             }
178 0         0 return 1;
179             }
180              
181              
182             sub receive(&) {
183 3     3 0 93 my ($receive) = @_;
184              
185 3 50       10 $DEBUG > 1 and print "Call receive in $self_vpid (\$\$=$$)\n";
186              
187 3         107 local $SIG{CHLD} = "IGNORE";
188 3         52 local $SIG{PIPE} = "IGNORE";
189              
190 3         15 foreach (@spawn) {
191 2         6 my ($vpid, $child, $parent, $spawn) = @$_;
192              
193 2         1936 my $kid_pid = fork;
194 2 50       149 defined $kid_pid or die "Can't fork: $!";
195              
196 2 100       86 unless ($kid_pid) {
197              
198 1         50 foreach (@spawn) {
199 1         55 close $$_[1];
200 1 50       32 close $$_[2] if $$_[2] ne $parent;
201             }
202              
203 1         28 close $_ foreach values %fh2fh, values %listener;
204 1         33 $_->cancel foreach Event::all_watchers();
205 1         15 @spawn = ();
206 1         12 %listener = ();
207 1         4 %node = ();
208 1         4 %msg = ();
209 1         11 %fh2vpid = ();
210 1         9 %vpid2fh = ();
211 1         4 %fh2fh = ();
212 1         62 %snd = ();
213              
214 1         10 %vpid2pid = ();
215              
216 1         8 $ipc_loop = 0;
217              
218 1         8 @rcv = ();
219 1         3 %r_bufs = ();
220 1         14 %w_bufs = ();
221              
222 1         12 %pack = ();
223 1         4 %unpack = ();
224              
225 1         3 %closed = ();
226              
227 1         11 %fh2ww = ();
228              
229 1         12 ($waited_vpid, $waited_msg, @waited_rv) = ();
230              
231 1         9 $self_parent_fh = $parent;
232 1         4 $self_parent_vpid = $self_vpid;
233              
234 1         2 $self_vpid = $vpid;
235              
236 1         13 $fh2vpid{$self_parent_fh} = $self_parent_vpid;
237 1         15 $vpid2fh{$self_parent_vpid} = $self_parent_fh;
238 1         5 $fh2fh{$self_parent_fh} = $self_parent_fh;
239              
240 1         96 Event->io(fd => $self_parent_fh, poll => "r", cb => \&r_event_cb);
241              
242 1         807 $spawn->();
243              
244 0         0 exit;
245             }
246             else {
247 1         55 $vpid2pid{$vpid} = $kid_pid;
248             }
249             }
250              
251              
252 2         18 foreach (@spawn) {
253 1         32 my ($vpid, $child, $parent, $spawn, $receive) = @$_;
254 1         21 close $parent;
255 1         10 $fh2vpid{$child} = $vpid;
256 1         10 $vpid2fh{$vpid} = $child;
257 1         20 $fh2fh{$child} = $child;
258 1         73 Event->io(fd => $child, poll => "r", cb => \&r_event_cb);
259             }
260 2         788 @spawn = ();
261              
262              
263              
264 2         9 $receive->();
265              
266              
267              
268 2 50       11 unless ($ipc_loop) {
269 2         13 $ipc_loop = 1;
270 2         19 w_event_cb_reg();
271 2         199 Event::loop();
272 0         0 $ipc_loop = 0;
273             }
274             }
275              
276              
277             sub wt($$) {
278 1     1 0 19 ($waited_vpid, $waited_msg) = @_;
279 1 50       7 defined $waited_vpid or carp("Argument vpid required"), return;
280 1 50       4 defined $waited_msg or carp("Argument msg required"), return;
281 1 50       3 $waited_vpid = $self_parent_vpid if $waited_vpid == 0;
282 1         5 foreach my $i (0 .. $#rcv) {
283 0         0 my ($from, $msg, $args)= @{$rcv[$i]};
  0         0  
284 0 0 0     0 if ($from eq $waited_vpid and $msg eq $waited_msg) {
285 0         0 splice @rcv, $i, 1;
286 0 0       0 return wantarray ? @$args : $$args[0];
287             }
288             }
289 1 50       3 $DEBUG and print "Start waiting for '$waited_vpid -> $waited_msg' in $self_vpid (\$\$=$$)\n";
290 1         3 w_event_cb_reg();
291 1         18 Event::loop();
292 1         18 my @rv = @waited_rv;
293 1         2 ($waited_vpid, $waited_msg, @waited_rv) = ();
294 1 50       8 return wantarray ? @rv : $rv[0];
295             }
296              
297              
298             sub w_event_cb_reg {
299 26     26 0 54 my ($to_vpid) = @_;
300              
301 26 100       103 foreach my $to (defined $to_vpid ? $to_vpid : keys %snd) {
302 25 100       34 if (@{$snd{$to}}) {
  25         7861  
303 10         18 my $fh = $vpid2fh{$to};
304 10 100       24 unless ($fh) {
305 2 50       6 if (@spawn) {
306 2 50       6 carp "Probably have forgotten to call receive." if not defined $to_vpid;
307 2         8 next;
308             } else {
309 0 0       0 if ($self_parent_fh) {
310 0 0       0 unless ($self_parent_closed) {
311 0         0 $fh = $self_parent_fh;
312             } else {
313 0         0 next;
314             }
315             } else {
316 0         0 carp "The addressee $to is unknown or has left in $self_vpid (\$\$=$$)\n";
317 0         0 next;
318             }
319             }
320             }
321 8 50       38 unless (exists $w_bufs{$fh}) {
322 8         13 my $packet;
323 8 50       26 if (my $pack = $pack{$fh}) {
324 0         0 $packet = $pack->(shift @{$snd{$to}});
  0         0  
325             } else {
326 8         17 $packet = freeze shift @{$snd{$to}};
  8         61  
327             }
328 8         544 my $buf = join "", pack("N", length $packet), $packet;
329 8         26 $w_bufs{$fh} = $buf;
330 8 50 0     29 $DEBUG and (@{$snd{$to}} or delete $snd{$to});
  0         0  
331 8         195 $fh2ww{$fh} = Event->io(fd => $fh, poll => "w", cb => \&w_event_cb);
332             }
333             }
334             }
335             }
336              
337              
338              
339              
340             sub r_event_cb {
341 9     9 0 1241 my ($event) = @_;
342 9         76 my $fh = $event->w->fd;
343              
344 9 50       34 $DEBUG > 1 and print "Read event from $self_vpid: \n";
345              
346 9         147 my $len = sysread $fh, (my $_buf), $blksize;
347 9 100       35 if ($len) {
    50          
348 8         36 $r_bufs{$fh} .= $_buf;
349             NEXT_MSG: {
350 8         14 my $buf = $r_bufs{$fh};
  8         18  
351 8 50       23 if (length $buf >= 4) {
352 8         38 my $packet_length = unpack "N", substr $buf, 0, 4, "";
353 8 50       24 if (length $buf >= $packet_length) {
354 8         26 my $packet = substr $buf, 0, $packet_length, "";
355 8   50     76 $r_bufs{$fh} = $buf || "";
356 8 50 0     30 $DEBUG and ($r_bufs{$fh} or delete $r_bufs{$fh});
357              
358 8         16 my ($from, $to, $msg, $args);
359 8 50       28 if (my $unpack = $unpack{$fh}) {
360 0         0 ($from, $to, $msg, $args) = @{$unpack->($packet)};
  0         0  
361             } else {
362 8         14 ($from, $to, $msg, $args) = @{thaw $packet};
  8         33  
363             }
364              
365 8 50       252 if ($node{$fh}) {
366 0         0 $from = $node{$fh};
367 0         0 $to = $self_vpid;
368             }
369              
370 8 50       21 $DEBUG and print "Got message '$msg' from $from to $to vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @$args), ".\n";
371 8 50       25 if ($to == $self_vpid) {
    0          
372 8 50       24 $DEBUG and print "Run message sub '$msg' from $from to $to vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @$args), ".\n";
373 8 100 66     48 if (defined $waited_vpid and defined $waited_msg) {
374 1         5 push @rcv, [$from, $msg, $args];
375             } else {
376 7 50       19 if ($msg{$msg}) {
377 7         24 push @rcv, [$from, $msg, $args];
378             } else {
379 0 0       0 $DEBUG and print "Unknown message '$msg'\n";
380             }
381             }
382             } elsif ($vpid2fh{$to}) {
383 0 0       0 $DEBUG and print "Remittance message '$msg' from $from to $to vpid in $self_vpid (\$\$=$$) with args: ", join(", ", @$args), ".\n";
384 0         0 push @{$snd{$to}}, [$from, $to, $msg, $args];
  0         0  
385 0         0 w_event_cb_reg();
386             } else {
387 0         0 carp "Got Wandered message '$msg' from $from to $to in $self_vpid (\$\$=$$)\n";
388             }
389              
390 8 50       41 redo NEXT_MSG if $r_bufs{$fh};
391             }
392             }
393             }
394             } elsif (defined $len) {
395 1 50       6 if (exists $fh2ww{$fh}) {
396 0         0 $fh2ww{$fh}->cancel;
397 0         0 delete $fh2ww{$fh};
398             }
399 1         6 $event->w->cancel;
400 1         6 my $vpid = delete $fh2vpid{$fh};
401 1         2 delete $vpid2fh{$vpid};
402 1         3 delete $r_bufs{$fh};
403 1         2 delete $w_bufs{$fh};
404 1         4 delete $fh2fh{$fh};
405 1         2 delete $vpid2pid{$vpid};
406 1         3 delete $pack{$fh};
407 1         3 delete $unpack{$fh};
408 1 50       6 if (my $node_vpid = $node{$fh}) {
409 0         0 delete $node{$fh};
410 0 0       0 if ($msg{NODE_CLOSED}) {
411 0 0       0 $msg{NODE_CLOSED}->($node_vpid, $fh->connected ? 1 : 0) unless $closed{$vpid};
    0          
412 0         0 w_event_cb_reg();
413             }
414             } else {
415 1 50       4 if ($msg{SPAWN_CLOSED}) {
416 0 0       0 $msg{SPAWN_CLOSED}->($vpid) unless $closed{$vpid};
417 0         0 w_event_cb_reg();
418             }
419             }
420 1         2 delete $closed{$vpid};
421 1         27 close $fh;
422 1 50 33     21 if ($self_parent_fh and $self_parent_fh eq $fh) {
423 1         3 $self_parent_closed = 1;
424 1 50 33     6 unless (defined $waited_vpid and defined $waited_msg) {
425 1 50       3 unless (@rcv) {
426 1         281 exit;
427             }
428             }
429             }
430             } else {
431 0 0       0 $DEBUG and die "Can't read '$fh': $!";
432             }
433              
434 8 100 66     31 if (defined $waited_vpid and defined $waited_msg) {
435 1         6 foreach my $i (0 .. $#rcv) {
436 1         2 my ($from, $msg, $args)= @{$rcv[$i]};
  1         4  
437 1 50 33     14 if ($msg eq $waited_msg and $from eq $waited_vpid) {
438 1         3 splice @rcv, $i, 1;
439 1 50       4 $DEBUG and print "Stop waiting for '$waited_vpid -> $waited_msg' in $self_vpid (\$\$=$$)\n";
440 1         3 @waited_rv = @$args;
441 1         7 Event::unloop();
442 1         8 return;
443             }
444             }
445 0 0       0 unless (exists $vpid2fh{$waited_vpid}) {
446 0         0 Event::unloop();
447 0         0 return;
448             }
449             } else {
450 7         22 while (my $rcv = shift @rcv) {
451 7         10 my ($from, $msg, $args)= @{$rcv};
  7         16  
452 7 50       43 $msg{$msg}->($from, @$args) unless $closed{$from};
453 6         14 w_event_cb_reg();
454             }
455             }
456             }
457              
458              
459              
460             sub w_event_cb {
461 8     8 0 90 my ($event) = @_;
462 8         61 my $fh = $event->w->fd;
463              
464 8 50       24 $DEBUG > 1 and print "Write event from $self_vpid.\n";
465 8 50       32 $fh2fh{$fh} or return;
466              
467 8         18 my $buf = $w_bufs{$fh};
468 8         320 my $len = syswrite $fh, $buf, $blksize;
469 8 50       33 if ($len) {
470 8         19 substr $buf, 0, $len, "";
471 8 50       18 if (length $buf) {
472 0         0 $w_bufs{$fh} = $buf;
473             } else {
474 8         23 delete $w_bufs{$fh};
475 8         56 $event->w->cancel;
476 8         37 delete $fh2ww{$fh};
477 8         19 w_event_cb_reg();
478             }
479             } else {
480 0 0         $DEBUG and die "Can't write to '$fh': $!";
481             }
482             }
483              
484              
485              
486             1;
487              
488              
489             __END__