File Coverage

blib/lib/IO/Lambda/Signal.pm
Criterion Covered Total %
statement 137 163 84.0
branch 43 78 55.1
condition n/a
subroutine 26 29 89.6
pod 3 14 21.4
total 209 284 73.5


line stmt bran cond sub pod time code
1             # $Id: Signal.pm,v 1.24 2010/03/25 12:52:36 dk Exp $
2             package IO::Lambda::Signal;
3 11     11   1860 use vars qw(@ISA %SIGDATA);
  11         22  
  11         1309  
4             @ISA = qw(Exporter);
5             @EXPORT_OK = qw(signal pid spawn new_signal new_pid new_process);
6             %EXPORT_TAGS = ( all => \@EXPORT_OK);
7              
8             our $DEBUG = $IO::Lambda::DEBUG{signal} || 0;
9              
10 11     11   65 use strict;
  11         20  
  11         289  
11 11     11   54 use Carp;
  11         22  
  11         743  
12 11     11   1839 use IO::Handle;
  11         14131  
  11         414  
13 11     11   1741 use POSIX ":sys_wait_h";
  11         13676  
  11         84  
14 11     11   8338 use IO::Lambda qw(:all :dev);
  11         23  
  11         27990  
15              
16             my $MASTER = bless {}, __PACKAGE__;
17              
18             # register yield handler
19             IO::Lambda::add_loop($MASTER);
20 11     11   15172 END { IO::Lambda::remove_loop($MASTER) };
21              
22 484892     484892 0 1509994 sub empty { 0 == keys %SIGDATA }
23              
24             sub remove
25             {
26 3     3 0 8 my $lambda = $_[1];
27 3         7 my %rec;
28 3         7 keys %SIGDATA;
29 3         14 while ( my ($id, $v) = each %SIGDATA) {
30 1         3 for my $r (@{$v-> {lambdas}}) {
  1         9  
31 1         3 push @{$rec{$id}}, $r-> [0];
  1         9  
32             }
33             }
34 3         19 while ( my ($id, $v) = each %rec) {
35 1         12 unwatch_signal( $id, $_ ) for @$v;
36             }
37             }
38              
39             sub yield
40             {
41 484782     484782 0 974656 my %v = %SIGDATA;
42 484782         890693 for my $id ( keys %v) {
43 484782         666207 my $v = $v{$id};
44             # use mutex in case signal happens right here during handling
45 484782         643064 $v-> {mutex} = 0;
46 484782 50       916108 warn " yield sig $id\n" if $DEBUG > 1;
47             AGAIN:
48 484785 100       1674919 next unless $v-> {signal};
49              
50 443         510 my @r = @{$v-> {lambdas}};
  443         987  
51 443 50       922 warn " calling ", scalar(@r), " sig handlers\n" if $DEBUG > 1;
52 443         666 for my $r ( @r) {
53 779         1219 my ( $lambda, $callback, @param) = @$r;
54 779         1513 $callback-> ( $lambda, @param);
55             }
56              
57 443         647 my $sigs = $v-> {mutex};
58 443 100       1810 if ( $sigs) {
59 3 50       18 warn " caught $sigs signals during yield\n" if $DEBUG > 1;
60 3         6 $v-> {signal} = $sigs;
61 3         15 $v-> {mutex} -= $sigs;
62 3         18 goto AGAIN;
63             }
64             }
65             }
66              
67             sub signal_handler
68             {
69 32     32 0 68 my $id = shift;
70 32 50       125 warn "SIG{$id}\n" if $DEBUG;
71 32 50       251 return unless exists $SIGDATA{$id};
72 32         82 $SIGDATA{$id}-> {signal}++;
73 32         87 $SIGDATA{$id}-> {mutex}++;
74 32 50       530 $IO::Lambda::LOOP-> signal($id) if $IO::Lambda::LOOP-> can('signal');
75             }
76              
77             sub watch_signal
78             {
79 34     34 0 109 my ($id, $lambda, $callback, @param) = @_;
80              
81 34         93 my $entry = [ $lambda, $callback, @param ];
82 34 100       145 unless ( exists $SIGDATA{$id}) {
83             $SIGDATA{$id} = {
84             mutex => 0,
85             signal => 0,
86 28         748 save => $SIG{$id},
87             lambdas => [$entry],
88             };
89 28     32   373 $SIG{$id} = sub { signal_handler($id) };
  32         128  
90 28 50       130 warn "install signal handler for $id ", _o($lambda), "\n" if $DEBUG > 1;
91             } else {
92 6         27 push @{ $SIGDATA{$id}-> {lambdas} }, $entry;
  6         15  
93 6 50       24 warn "push signal handler for $id ", _o($lambda), "\n" if $DEBUG > 2;
94             }
95             }
96              
97             sub unwatch_signal
98             {
99 34     34 0 78 my ( $id, $lambda) = @_;
100              
101 34 50       141 return unless exists $SIGDATA{$id};
102            
103 34 50       102 warn "remove signal handler for $id ", _o($lambda), "\n" if $DEBUG > 2;
104              
105 34         138 @{ $SIGDATA{$id}-> {lambdas} } =
106 43         191 grep { $$_[0] != $lambda }
107 34         76 @{ $SIGDATA{$id}-> {lambdas} };
  34         103  
108            
109 34 100       83 return if @{ $SIGDATA{$id}-> {lambdas} };
  34         187  
110            
111 28 50       94 warn "uninstall signal handler for $id\n" if $DEBUG > 1;
112              
113 28 50       107 if (defined($SIGDATA{$id}-> {save})) {
114 0         0 $SIG{$id} = $SIGDATA{$id}-> {save};
115             } else {
116 28         422 delete $SIG{$id};
117             }
118 28         147 delete $SIGDATA{$id};
119             }
120              
121             # create a lambda that either returns undef on timeout,
122             # or some custom value based on passed callback
123             sub signal_or_timeout_lambda
124             {
125 34     34 0 246 my ( $id, $deadline, $condition) = @_;
126              
127 34         51 my $t;
128 34         153 my $q = IO::Lambda-> new;
129              
130             # wait for signal
131 34         167 my $c = $q-> bind;
132             watch_signal( $id, $q, sub {
133 779     779   1244 my @ret = $condition-> ();
134 779 100       2084 return unless @ret;
135              
136 32         108 unwatch_signal( $id, $q);
137 32 100       113 $q-> cancel_event($t) if $t;
138 32         230 $q-> resolve($c);
139 32         151 $q-> terminate(@ret); # result
140 32         60 undef $c;
141 32         100 undef $q;
142 34         381 });
143              
144             # or wait for timeout
145             $t = $q-> watch_timer( $deadline, sub {
146 1     1   15 unwatch_signal( $id, $q);
147 1         10 $q-> resolve($c);
148 1         3 undef $c;
149 1         4 undef $q;
150 1         5 return undef; #result
151 34 100       111 }) if $deadline;
152              
153 34         268 return $q;
154             }
155              
156             sub new_process;
157             # condition
158 2     2 1 39 sub signal (&) { new_signal (context)-> condition(shift, \&signal, 'signal') }
159 32     32 1 112 sub pid (&) { new_pid (context)-> condition(shift, \&pid, 'pid') }
160 2     2 1 32 sub spawn (&) { new_process-> call(context)-> condition(shift, \&spawn, 'spawn') }
161              
162             sub new_signal
163             {
164 2     2 0 5 my ( $id, $deadline) = @_;
165             signal_or_timeout_lambda( $id, $deadline,
166 2     1   16 sub { 1 });
  1         4  
167             }
168              
169             sub new_pid
170             {
171 32     32 0 71 my ( $pid, $deadline) = @_;
172              
173 32 50       817 croak 'bad pid' unless $pid =~ /^\-?\d+$/;
174 32 50       102 warn "new_pid($pid) ", _t($deadline), "\n" if $DEBUG;
175            
176             # avoid race conditions
177 32         50 my ( $savesig, $early_sigchld);
178 32 100       124 unless ( defined $SIGDATA{CHLD}) {
179 26 50       77 warn "new_pid: install early SIGCHLD detector\n" if $DEBUG > 1;
180 26         90 $savesig = $SIG{CHLD};
181 26         51 $early_sigchld = 0;
182             $SIG{CHLD} = sub {
183 0 0   0   0 warn "new_pid: early SIGCHLD caught\n" if $DEBUG > 1;
184 0         0 $early_sigchld++
185 26         1008 };
186             }
187              
188             # finished already
189 32 50       244 if ( waitpid( $pid, WNOHANG) != 0) {
190 0 0       0 if ( defined $early_sigchld) {
191 0 0       0 if ( defined( $savesig)) {
192 0         0 $SIG{CHLD} = $savesig;
193             } else {
194 0         0 delete $SIG{CHLD};
195             }
196             }
197 0 0       0 warn "new_pid($pid): finished already with $?\n" if $DEBUG > 1;
198 0         0 return IO::Lambda-> new-> call($?)
199             }
200              
201             # wait
202             my $p = signal_or_timeout_lambda( 'CHLD', $deadline, sub {
203 778     778   2889 my $wp = waitpid($pid, WNOHANG);
204 778 50       1504 warn "waitpid($pid) = $wp\n" if $DEBUG > 1;
205 778 100       1904 return if $wp == 0;
206 31         387 return $?;
207 32         444 });
208              
209 32 50       94 warn "new_pid: new lambda(", _o($p), ")\n" if $DEBUG > 1;
210              
211             # don't let unwatch_signal() to restore it back to us
212 32 100       340 $SIGDATA{CHLD}-> {save} = $savesig if defined $early_sigchld;
213              
214             # possibly have a race? gracefully remove the lambda
215 32 50       172 if ( $early_sigchld) {
216              
217             # Got a signal, but that wasn't our pid. And neither it was
218             # pid that we're watching.
219 0 0       0 return $p if waitpid( $pid, WNOHANG) == 0;
220              
221             # Our pid is finished. Unwatch the signal.
222 0         0 unwatch_signal( 'CHLD', $p);
223             # Lambda will also never get executed - cancel it
224 0         0 $p-> terminate;
225            
226 0 0       0 warn "new_pid($pid): finished with race: $?, ", _o($p), " killed\n" if $DEBUG > 1;
227            
228 0         0 return IO::Lambda-> new-> call($?);
229             }
230              
231 32         363 return $p;
232             }
233              
234             sub new_process_posix
235             {
236             lambda {
237 2     2   19 my $h = IO::Handle-> new;
238 2         5413 my $pid = open( $h, '-|', @_);
239              
240 2 100       125 return undef, undef, $! unless $pid;
241              
242 1         41 this-> {pid} = $pid;
243 1         18 $h-> blocking(0);
244              
245 1         2 my $buf;
246 1         19 context readbuf, $h, \$buf, undef; # wait for EOF
247             tail {
248 1         3 my ($res, $error) = @_;
249 1 50       6 if ( defined $error) {
250 0         0 close $h;
251 0         0 return ($buf, $?, $error);
252             }
253              
254             # finished already
255 1 50       35 if (waitpid($pid, WNOHANG) != 0) {
256 1         22 my ( $exitcode, $error) = ( $?, $! );
257 1         21 close $h;
258 1         8 return ($buf, $exitcode, $error);
259             }
260             # wait for it
261 0         0 context $pid;
262             pid {
263 0         0 close $h;
264 0         0 return ($buf, shift);
265 1     2 0 35 }}}}
  0         0  
  2         19  
266              
267             sub new_process_win32
268             {
269             lambda {
270 0     0     my @cmd = @_;
271             context IO::Lambda::Thread::threaded( sub {
272 0           my $k = `@cmd`;
273 0 0         return $? ? ( undef, $?, $! ) : ( $k, 0, undef );
274 0           });
275 0           &tail();
276             }
277 0     0 0   }
278              
279              
280             if ( $^O !~ /win32/i) {
281             *new_process = \&new_process_posix;
282             } else {
283             require IO::Lambda::Thread;
284             unless ( $IO::Lambda::Thread::DISABLED) {
285             *new_process = \&new_process_win32;
286             } else {
287             *new_process = sub { lambda { undef, undef, $IO::Lambda::Thread::DISABLED } };
288             }
289             }
290              
291              
292             1;
293              
294             __DATA__