File Coverage

blib/lib/IPC/Messaging.pm
Criterion Covered Total %
statement 156 379 41.1
branch 47 184 25.5
condition 7 86 8.1
subroutine 38 59 64.4
pod 0 20 0.0
total 248 728 34.0


line stmt bran cond sub pod time code
1             package IPC::Messaging;
2 2     2   68379 use 5.008;
  2         8  
  2         65  
3 2     2   9 use warnings;
  2         5  
  2         50  
4 2     2   11 use strict;
  2         8  
  2         85  
5             require Exporter;
6 2     2   19 use base 'Exporter';
  2         3  
  2         265  
7 2     2   19 use vars qw(@EXPORT $VERSION);
  2         3  
  2         141  
8 2     2   2007 use B::Generate;
  2         6754  
  2         81  
9 2     2   2681 use IO::Socket::UNIX;
  2         70800  
  2         15  
10 2     2   1368 use IO::Socket::INET;
  2         4  
  2         14  
11 2     2   2157 use Socket qw(:all);
  2         6  
  2         4091  
12 2     2   2785 use Storable;
  2         8902  
  2         165  
13 2     2   3055 use Time::HiRes;
  2         4131  
  2         10  
14 2     2   231 use Carp;
  2         3  
  2         193  
15 2     2   2774 use Module::Load::Conditional "can_load";
  2         104371  
  2         145  
16 2     2   2213 use POSIX ":sys_wait_h";
  2         37730  
  2         21  
17              
18             $VERSION = '0.01_14';
19             sub spawn (&);
20             sub receive (&);
21             sub receive_loop (&);
22             sub got;
23             sub then (&);
24             sub after ($$);
25              
26             @EXPORT = qw(spawn receive receive_loop got then after);
27              
28             my $MAX_DGRAM_SIZE = 16384;
29             my $TCP_READ_SIZE = 65536;
30             my $secret;
31             my $root;
32             my $messaging_dir;
33             my $i_am_root;
34             my $my_sock;
35             my $my_sock_fileno;
36             my %their_sock;
37             my @msg_queue;
38             my @processes_to_reap;
39             my $recv;
40             my %read_socks;
41             my %write_socks;
42             my ($use_kqueue, $use_epoll, $use_select);
43             my $kq;
44             my $epfd;
45              
46             sub debug
47 0     0 0 0 {
48             # print STDERR @_;
49             }
50              
51             sub spawn (&)
52             {
53 0     0 0 0 my ($psub) = @_;
54 0         0 my $pid = CORE::fork;
55 0 0       0 die "unable to fork: $!" unless defined $pid;
56 0 0       0 if ($pid) {
57             # parent
58 0         0 my $child = IPC::Messaging::Process->_new($pid);
59             receive {
60 0     0   0 got [_READY => $child] => then {};
  0         0  
61 0         0 after 5 => then { die "child $pid not ready" };
  0         0  
62 0         0 };
63 0         0 return $child;
64             } else {
65             # child
66 0         0 $i_am_root = 0;
67 0         0 initpid();
68 0         0 my $parent = IPC::Messaging::Process->_new(getppid);
69 0         0 $parent->_READY;
70 0         0 $psub->();
71 0         0 exit 0;
72             }
73             }
74              
75             sub END
76             {
77 2 50 33 2   650 if ($messaging_dir && $i_am_root && -e $messaging_dir && !-l $messaging_dir) {
    0 33        
      33        
      0        
78             # system("ls -l $messaging_dir");
79 2         23047 system("rm -rf $messaging_dir");
80             } elsif ($messaging_dir && !$i_am_root) {
81 0         0 my $parent = IPC::Messaging::Process->_new(getppid);
82 0         0 $parent->EXIT;
83             }
84             }
85              
86             sub run_queue
87             {
88 0     0 0 0 my ($r) = @_;
89 0 0       0 return unless @{$r->{pats}};
  0         0  
90 0         0 for (my $i = 0; $i < @msg_queue; $i++) {
91 0         0 my $m = $msg_queue[$i];
92 0         0 for my $pat (@{$r->{pats}}) {
  0         0  
93 0 0 0     0 if ($pat->{name} ne $m->{m} && $pat->{name} ne "_") {
94             # does not match if message name is different
95 0         0 next;
96             }
97 0 0 0     0 if ($pat->{proc} && $m->{f} && $m->{f} != $pat->{proc}) {
      0        
98             # does not match if sender process is different
99 0         0 next;
100             }
101 0   0     0 my $msock = $m->{fsock} || $m->{sock};
102 0 0 0     0 if ($pat->{sock} && $msock && $msock->fileno != $pat->{sock}) {
      0        
103             # does not match if sender socket is different
104 0         0 next;
105             }
106 0   0     0 my $h = $pat->{match} || {};
107 0         0 my $match = 1;
108 0         0 for my $k (keys %$h) {
109 0 0 0     0 unless (exists $m->{d}{$k} && $m->{d}{$k} eq $h->{$k}) {
110 0         0 $match = 0;
111 0         0 last;
112             }
113             }
114 0 0       0 next unless $match;
115 0         0 debug "MATCH $m->{m}!\n";
116 0 0       0 if ($pat->{filter}) {
117 0 0       0 next unless $pat->{filter}->($m->{m}, $m->{d});
118             }
119 0         0 splice @msg_queue, $i, 1;
120 0   0     0 my $proc_or_sock = $m->{sock} || ($m->{f} ? IPC::Messaging::Process->_new($m->{f}) : undef);
121 0         0 $_ = $proc_or_sock;
122 0         0 my $ignore = ${$pat->{then}}->($m->{m}, $m->{d}, $proc_or_sock);
  0         0  
123 0         0 return 1;
124             }
125             }
126             }
127              
128             sub watch_fd
129             {
130 2     2 0 4 my ($fd, $write) = @_;
131 2 50       44 if ($kq) {
    50          
132 0 0       0 if ($write) {
133 0         0 $kq->EV_SET($fd, &IO::KQueue::EVFILT_WRITE, &IO::KQueue::EV_ADD, 0, 0);
134             } else {
135 0         0 $kq->EV_SET($fd, &IO::KQueue::EVFILT_READ, &IO::KQueue::EV_ADD, 0, 0);
136             }
137             } elsif ($epfd) {
138 2 50       7 if ($write) {
139 0         0 IO::Epoll::epoll_ctl($epfd, &IO::Epoll::EPOLL_CTL_ADD, $fd, &IO::Epoll::EPOLLOUT);
140             } else {
141 2         23 IO::Epoll::epoll_ctl($epfd, &IO::Epoll::EPOLL_CTL_ADD, $fd, &IO::Epoll::EPOLLIN);
142             }
143             }
144             }
145              
146             sub unwatch_fd
147             {
148 0     0 0 0 my ($fd) = @_;
149 0         0 my $rd = delete $read_socks{$fd};
150 0         0 my $wr = delete $write_socks{$fd};
151 0 0       0 if ($kq) {
    0          
152 0 0       0 $kq->EV_SET($fd, &IO::KQueue::EVFILT_READ, &IO::KQueue::EV_DELETE, 0, 0) if $rd;
153 0 0       0 $kq->EV_SET($fd, &IO::KQueue::EVFILT_WRITE, &IO::KQueue::EV_DELETE, 0, 0) if $wr;
154             } elsif ($epfd) {
155 0 0       0 IO::Epoll::epoll_ctl($epfd, &IO::Epoll::EPOLL_CTL_DEL, $fd, &IO::Epoll::EPOLLIN) if $rd;
156 0 0       0 IO::Epoll::epoll_ctl($epfd, &IO::Epoll::EPOLL_CTL_DEL, $fd, &IO::Epoll::EPOLLOUT) if $wr;
157             }
158             }
159              
160             sub reap_dead_kids
161             {
162 0     0 0 0 my @to_reap = @processes_to_reap;
163 0         0 @processes_to_reap = ();
164 0         0 for my $pid (@to_reap) {
165 0         0 my $x = waitpid($pid, WNOHANG);
166 0 0       0 push @processes_to_reap, $pid if $x == 0;
167             }
168             }
169              
170             sub pickup_one_message
171             {
172 0     0 0 0 my ($t) = @_;
173 0         0 debug "$$: select $my_sock $t\n";
174 0         0 my @fd;
175 0 0       0 reap_dead_kids() if @processes_to_reap;
176 0 0       0 if ($use_kqueue) {
    0          
177             # XXX errors are ignored, bad
178 0         0 @fd = map { $_->[&IO::KQueue::KQ_IDENT] } $kq->kevent($t*1000);
  0         0  
179             } elsif ($use_epoll) {
180             # XXX errors are ignored, bad
181 0 0       0 @fd = map { $_->[0] } @{IO::Epoll::epoll_wait($epfd, 100, $t*1000) || []};
  0         0  
  0         0  
182             } else {
183 0         0 my $to_read = IO::Select->new($my_sock,map { $_->{sock} } values %read_socks);
  0         0  
184 0         0 my $to_write = IO::Select->new(map { $_->{sock} } values %write_socks);
  0         0  
185 0         0 my ($r,$w) = IO::Select->select($to_read, $to_write, undef, $t);
186 0         0 @fd = map { $_->fileno } @$w, @$r;
  0         0  
187             }
188 0         0 for my $fd (@fd) {
189 0 0       0 if ($fd == $my_sock_fileno) {
    0          
    0          
190 0         0 my $data = "";
191 0         0 $my_sock->recv($data, $MAX_DGRAM_SIZE);
192 0 0       0 return unless $data;
193 0         0 debug "$$: got something:\n\t$data\n";
194 0         0 my $msg = eval { Storable::thaw($data) };
  0         0  
195 0 0 0     0 debug "$$: cannot thaw: $@\n" if !$msg && $@;
196 0 0       0 return unless $msg;
197 0 0 0     0 return unless $msg->{s} && $msg->{s} eq $secret && $msg->{m} && $msg->{f};
      0        
      0        
198 0   0     0 $msg->{d} ||= {};
199 0         0 push @msg_queue, $msg;
200 0 0       0 if ($msg->{m} eq "EXIT") {
201 0         0 my $x = waitpid($msg->{f}, WNOHANG);
202 0 0       0 push @processes_to_reap, $msg->{f} if $x == 0;
203             }
204             } elsif ($write_socks{$fd}) {
205 0         0 my $s = $write_socks{$fd};
206 0         0 unwatch_fd($fd);
207 0 0       0 if ($s->{type} eq "tcp_connecting") {
208 0         0 my $sock = $s->{sock};
209 0         0 my $peer = $sock->peerhost;
210 0         0 my $peer_port = $sock->peerport;
211 0         0 my $opt = getsockopt($sock, SOL_SOCKET, SO_ERROR);
212 0 0       0 $opt = unpack("I", $opt) if defined $opt;
213 0 0       0 if ($opt) {
214 0         0 push @msg_queue, {
215             m => "tcp_error",
216             sock => $sock,
217             d => {
218             errno => $opt,
219             },
220             };
221             } else {
222 0         0 push @msg_queue, {
223             m => "tcp_connected",
224             sock => $sock,
225             d => {
226             peer => $peer,
227             peer_port => $peer_port,
228             },
229             };
230 0         0 $read_socks{$fd} = {
231             sock => $s->{sock},
232             type => "tcp",
233             by_line => $s->{by_line},
234             from => $peer,
235             from_port => $peer_port,
236             };
237 0         0 watch_fd($fd);
238             }
239             }
240             } elsif ($read_socks{$fd}) {
241 0         0 my $s = $read_socks{$fd};
242 0 0       0 if ($s->{type} eq "tcp_listen") {
    0          
    0          
243 0         0 my $sock = $s->{sock}->accept;
244 0         0 my $from = $sock->peerhost;
245 0         0 my $from_port = $sock->peerport;
246 0         0 push @msg_queue, {
247             m => "tcp_connect",
248             fsock => $s->{sock},
249             sock => $sock,
250             d => {
251             from => $from,
252             from_port => $from_port,
253             },
254             };
255 0         0 $read_socks{$sock->fileno} = {
256             sock => $sock,
257             type => "tcp",
258             from => $from,
259             from_port => $from_port,
260             by_line => $s->{by_line},
261             buf => "",
262             };
263 0         0 watch_fd($sock->fileno);
264             } elsif ($s->{type} eq "tcp") {
265 0         0 my $d = "";
266 0         0 my $sock = $s->{sock};
267 0         0 my $len = sysread $sock, $d, $TCP_READ_SIZE;
268 0 0 0     0 if (!defined $len || $len <= 0) {
    0          
269 0 0 0     0 if ($s->{buf} && $s->{by_line}) {
270 0         0 push @msg_queue, {
271             m => "tcp_line",
272             sock => $sock,
273             d => {
274             from => $s->{from},
275             from_port => $s->{from_port},
276             line => $s->{buf},
277             },
278             };
279             }
280 0         0 push @msg_queue, {
281             m => "tcp_disconnect",
282             d => {
283             from => $s->{from},
284             from_port => $s->{from_port},
285             },
286             };
287 0         0 unwatch_fd($fd);
288 0         0 $sock->close;
289             } elsif ($s->{by_line}) {
290 0         0 $s->{buf} .= $d;
291 0         0 while ($s->{buf} =~ s/^(.*?\n)//) {
292 0         0 push @msg_queue, {
293             m => "tcp_line",
294             sock => $sock,
295             d => {
296             from => $s->{from},
297             from_port => $s->{from_port},
298             line => $1,
299             },
300             };
301             }
302             } else {
303 0         0 push @msg_queue, {
304             m => "tcp_data",
305             sock => $sock,
306             d => {
307             from => $s->{from},
308             from_port => $s->{from_port},
309             data => $d,
310             },
311             };
312             }
313             } elsif ($s->{type} eq "udp") {
314 0         0 my $d = "";
315 0         0 my $sock = $s->{sock};
316 0         0 $sock->recv($d, $MAX_DGRAM_SIZE);
317 0 0       0 return unless $d;
318 0         0 debug "$$: got udp\n";
319 0         0 push @msg_queue, {
320             m => "udp",
321             sock => $sock,
322             d => {
323             from => $sock->peerhost,
324             from_port => $sock->peerport,
325             data => $d,
326             },
327             };
328             } else {
329             # XXX
330             # Something is fishy, we don't know what to do with this
331             # socket, so unwatch it in order to not have the "always ready"
332             # condition.
333 0         0 unwatch_fd($fd);
334             }
335             }
336             }
337             }
338              
339             sub tcp_server
340             {
341 0     0 0 0 my (undef, $port, %p) = @_;
342 0 0 0     0 my $sock = IO::Socket::INET->new(
    0          
343             Listen => $p{listen_queue} || 5,
344             ($p{bind} ? (LocalAddr => $p{bind}) : ()),
345             LocalPort => $port,
346             Proto => "tcp",
347             ReuseAddr => 1,
348             ) or die $@;
349 0         0 $read_socks{$sock->fileno} = {
350             sock => $sock,
351             type => "tcp_listen",
352             by_line => $p{by_line},
353             };
354 0         0 watch_fd($sock->fileno);
355 0         0 return $sock;
356             }
357              
358             sub tcp_client
359             {
360 0     0 0 0 my (undef, $host, $port, %p) = @_;
361 0 0       0 my $sock = IO::Socket::INET->new(
362             Proto => "tcp",
363             PeerHost => $host,
364             PeerPort => $port,
365             Blocking => 0,
366             ) or die $@;
367 0         0 $write_socks{$sock->fileno} = {
368             sock => $sock,
369             type => "tcp_connecting",
370             by_line => $p{by_line},
371             };
372 0         0 watch_fd($sock->fileno, "write");
373 0         0 return $sock;
374             }
375              
376             sub udp
377             {
378 0     0 0 0 my (undef, $port, $bind) = @_;
379 0   0     0 $port ||= 0;
380 0 0       0 my $sock = IPC::Messaging::UDP->new(
    0          
381             Proto => "udp",
382             LocalPort => $port,
383             ($bind ? (LocalAddr => $bind) : ()),
384             ReuseAddr => 1,
385             ) or die $@;
386 0         0 $read_socks{$sock->fileno} = {
387             sock => $sock,
388             type => "udp",
389             };
390 0         0 watch_fd($sock->fileno);
391 0         0 return $sock;
392             }
393              
394             sub receive_parse
395             {
396 10     10 0 17 my ($rsub) = @_;
397 10 50       21 die "internal error: non-empty \$recv" if $recv;
398 10         23 my $r = $recv = { then_balance => 0 };
399 10         14 eval { $rsub->(); };
  10         21  
400 10         29 $recv = undef;
401 10 100       51 die $@ if $@;
402 1 50       216 croak "dangling \"then\"" if $r->{then_balance};
403 0 0 0     0 unless ($r->{pats} || $r->{timeout}) {
404 0         0 die "an empty \"receive\"";
405             }
406 0         0 $r;
407             }
408              
409             sub receive_once
410             {
411 0     0 0 0 my ($r) = @_;
412 0         0 my $start = Time::HiRes::time;
413 0         0 while (1) {
414 0 0 0     0 if (!$i_am_root && !kill 0, $root) {
415 0         0 die "root process has quit, aborting";
416             }
417 0 0       0 if (run_queue($r)) {
418 0         0 debug "$$: first pickup\n";
419 0         0 pickup_one_message(0);
420 0         0 last;
421             }
422 0 0       0 if ($r->{timeout}) {
423 0         0 debug "$$: pickup with timeout\n";
424 0         0 debug "$r->{timeout}[0] ", Time::HiRes::time(), " $start\n";
425 0 0       0 next if pickup_one_message($r->{timeout}[0]-(Time::HiRes::time()-$start));
426             } else {
427 0         0 debug "$$: indefinite pickup\n";
428 0 0       0 next if pickup_one_message(5);
429             }
430 0 0 0     0 if ($r->{timeout} && $r->{timeout}[0]-(Time::HiRes::time()-$start) < 0) {
431 0         0 debug "$$: timeout!\n";
432 0         0 ${$r->{timeout}[1]}->();
  0         0  
433 0         0 last;
434             }
435             }
436             }
437              
438             sub receive (&)
439             {
440 10     10 0 3472 my $r = receive_parse(@_);
441 0         0 receive_once($r);
442             }
443              
444             sub receive_loop (&)
445             {
446 0     0 0 0 my $r = receive_parse(@_);
447 0         0 receive_once($r) while 1;
448             }
449              
450             sub got
451             {
452 8     8 0 28 my (@p) = @_;
453 8 100       28 die "\"got\" outside \"receive\"" unless $recv;
454 7 100       21 die "invalid \"got\" syntax: not enough arguments" unless @p >= 2;
455 6         10 my $pat = {};
456 6         13 $pat->{then} = pop @p;
457 6 100       29 die "invalid \"got\" syntax: missing \"then\""
458             unless UNIVERSAL::isa($pat->{then}, "IPC::Messaging::Then");
459 5 100       17 if (UNIVERSAL::isa($p[0], "ARRAY")) {
460 3 100       7 if (@p != 1) {
461 1         7 die "invalid \"got\" syntax: arrayref not by itself";
462             }
463 2         3 @p = @{$p[0]};
  2         4  
464             }
465 4 100       16 die "invalid \"got\" syntax: missing message name" unless @p;
466 3         5 my $name = shift @p;
467 3 100       14 die "invalid \"got\" syntax: message name must not be a reference" if ref $name;
468 2         4 $pat->{name} = $name;
469 2 50       5 if (@p) {
470 2         3 my $from = $p[0];
471 2 50       10 if (UNIVERSAL::isa($from, "IPC::Messaging::Process")) {
    50          
472 0         0 $pat->{proc} = "$from";
473 0         0 shift @p;
474             } elsif (UNIVERSAL::isa($from, "IO::Handle")) {
475 0         0 $pat->{sock} = $from->fileno;
476 0         0 shift @p;
477             }
478             }
479 2 50       5 if (@p) {
480 2 50       8 if (UNIVERSAL::isa($p[0], "CODE")) {
481 0         0 $pat->{filter} = shift @p;
482             }
483             }
484 2 50       5 if (@p) {
485 2 100       18 if (UNIVERSAL::isa($p[0], "HASH")) {
    50          
486 1 50       8 die "invalid \"got\" syntax: unexpected hashref" unless @p == 1;
487 0         0 @p = %{$p[0]};
  0         0  
488             } elsif (@p % 2 != 0) {
489 1         6 die "invalid \"got\" syntax: odd number of matching elements";
490             }
491             }
492 0 0       0 $pat->{match} = {@p} if @p;
493 0         0 push @{$recv->{pats}}, $pat;
  0         0  
494 0         0 $recv->{then_balance}--;
495             }
496              
497             sub then (&)
498             {
499 9     9 0 369 my ($act) = @_;
500 9 100       27 die "\"then\" outside \"receive\"" unless $recv;
501 8         13 $recv->{then_balance}++;
502 8         32 bless \$act, "IPC::Messaging::Then";
503             }
504              
505             sub after ($$)
506             {
507 4     4 0 333 my ($t, $then) = @_;
508 4 100       12 die "\"after\" outside \"receive\"" unless $recv;
509 3 100       17 die "invalid \"after\" syntax: missing \"then\""
510             unless UNIVERSAL::isa($then, "IPC::Messaging::Then");
511 2 100       10 die "duplicate \"after\" in \"receive\"" if $recv->{timeout};
512 1         3 $recv->{then_balance}--;
513 1         3 $recv->{timeout} = [$t, $then];
514             }
515              
516             sub timer
517             {
518 0     0 0 0 my (undef, $interval, %msg) = @_;
519 0         0 die "timers are not implemented yet\n";
520 0         0 return IPC::Messaging::Timer->new($interval, %msg);
521             }
522              
523             sub global_init
524             {
525 2     2 0 136 $secret = int(rand(10000))+1;
526 2         7 $root = $$;
527 2         6 $i_am_root = 1;
528 2         18 $messaging_dir = "/tmp/ipc-messaging-$>/$root";
529 2 50 33     194 system("rm -rf $messaging_dir") if -e $messaging_dir && !-l $messaging_dir;
530 2         16804 system("mkdir -p $messaging_dir");
531 2         279 $use_kqueue = can_load(modules => { "IO::KQueue" => 0 });
532 2         2310 $use_epoll = can_load(modules => { "IO::Epoll" => 0 });
533 2   33     36067 $use_select = !$use_kqueue && !$use_epoll && can_load(modules => { "IO::Select" => 0 });
534 2 50 33     33 die "cannot find neither IO::KQueue nor IO::Epoll nor IO::Select"
      33        
535             unless $use_kqueue || $use_epoll || $use_select;
536             }
537              
538             sub initpid
539             {
540 2 50   2 0 18 return if ref $$;
541 2 50       15 global_init() unless $secret;
542 2         42 my $this = IPC::Messaging::Process->_new($$);
543 2         41 my $pid = B::svref_2object(\$$);
544 2         136 $pid->FLAGS($pid->FLAGS & ~B::SVf_READONLY);
545 2         12 $$ = $this;
546 2         13 $pid->FLAGS($pid->FLAGS | B::SVf_READONLY);
547              
548 2 50       8 $kq = IO::KQueue->new if $use_kqueue;
549 2 50       33 $epfd = IO::Epoll::epoll_create(100) if $use_epoll;
550              
551 2 50       134 $my_sock = IO::Socket::UNIX->new(
552             Local => "$messaging_dir/$$.sock",
553             Type => SOCK_DGRAM)
554             or die $@;
555 2         934 $my_sock_fileno = $my_sock->fileno;
556 2         32 watch_fd($my_sock_fileno);
557 2         104 %their_sock = ();
558 2         8 @msg_queue = ();
559 2         56 %read_socks = ();
560             }
561              
562             package IPC::Messaging::Process;
563 2     2   17298 use warnings;
  2         7  
  2         148  
564 2     2   18 use strict;
  2         2  
  2         92  
565 2     2   11 use vars qw($AUTOLOAD);
  2         3  
  2         127  
566 2     2   10 use IO::Socket::UNIX;
  2         4  
  2         25  
567 2     2   1841 use Storable;
  2         6  
  2         298  
568              
569 2     2   9877 use overload '0+' => \&_numify;
  2         3233  
  2         23  
570 2     2   169 use overload '""' => \&_stringify;
  2         4  
  2         13  
571 2     2   168 use overload '<=>' => sub { "$_[0]" <=> "$_[1]" };
  2     2   21  
  2         16  
  2         462  
572 2     2   158 use overload 'cmp' => sub { "$_[0]" cmp "$_[1]" };
  2     0   4  
  2         11  
  0         0  
573              
574             sub _new
575             {
576 2     2   20 my ($pkg, $pid) = @_;
577 2         14 my $me = {pid => $pid};
578 2         54 bless $me, $pkg;
579             }
580              
581             sub _numify
582             {
583 0     0   0 return $_[0]->{pid};
584             }
585              
586             sub _stringify
587             {
588 4     4   143 return "$_[0]->{pid}";
589             }
590              
591 0     0     sub DESTROY {}
592              
593             sub AUTOLOAD
594             {
595 0     0     my $proc = shift;
596 0           my $name = $AUTOLOAD;
597 0           $name =~ s/^IPC::Messaging::Process:://;
598 0           my $m = {
599             m => $name,
600             f => "$$",
601             s => $secret,
602             d => {@_},
603             };
604 0           my $data = Storable::freeze($m);
605 0           my $sock = $their_sock{"$proc"};
606 0 0         unless ($sock) {
607 0           $sock = $their_sock{"$proc"} = IO::Socket::UNIX->new(
608             Peer => "$messaging_dir/$proc.sock",
609             Type => SOCK_DGRAM,
610             Timeout => 10);
611             }
612 0 0         die "cannot create peer socket: $!" unless $sock;
613 0           IPC::Messaging::debug "$$: sending to $messaging_dir/$proc.sock:\n\t$data\n";
614 0           $sock->send($data);
615             }
616              
617             package IPC::Messaging::Then;
618              
619             package IPC::Messaging::UDP;
620 2     2   977 use Socket;
  2         2  
  2         3197  
621 2     2   54 use base 'IO::Socket::INET';
  2         6  
  2         1599  
622              
623             sub sendto
624             {
625 0     0     my ($socket, $data, $addr, $port) = @_;
626 0           my $iaddr = Socket::inet_aton($addr);
627 0           send $socket, $data, 0, scalar Socket::sockaddr_in($port, $iaddr);
628             }
629              
630             package IPC::Messaging::Timer;
631              
632             our $COUNT;
633             our %ACTIVE;
634             our %SUSPENDED;
635              
636             sub new
637             {
638 0     0     my ($class, $interval, %msg) = @_;
639              
640 0           my $me = {
641             interval => $interval,
642             start => Time::HiRes::time,
643             id => ++$COUNT,
644             };
645             }
646              
647             sub reset
648             {
649 0     0     my ($me) = @_;
650 0           $me->{start} = Time::HiRes::time;
651             }
652              
653             package IPC::Messaging;
654              
655             BEGIN {
656 2     2   9 initpid();
657             *CORE::GLOBAL::fork = sub {
658 0     0   0 my $r = fork;
659 0 0 0     0 if (defined $r && !$r) {
660 0         0 $secret = 0;
661 0         0 initpid();
662             }
663 0         0 $r;
664 2         269 };
665             }
666              
667             1;
668             __END__