File Coverage

blib/lib/IO/Lambda/Signal.pm
Criterion Covered Total %
statement 100 163 61.3
branch 22 78 28.2
condition n/a
subroutine 22 29 75.8
pod 3 14 21.4
total 147 284 51.7


line stmt bran cond sub pod time code
1             # $Id: Signal.pm,v 1.24 2010/03/25 12:52:36 dk Exp $
2             package IO::Lambda::Signal;
3 2     2   1484 use vars qw(@ISA %SIGDATA);
  2         3  
  2         171  
4             @ISA = qw(Exporter);
5             @EXPORT_OK = qw(signal pid spawn new_signal new_pid new_process);
6             %EXPORT_TAGS = ( all => \@EXPORT_OK);
7              
8             our $DEBUG = $IO::Lambda::DEBUG{signal} || 0;
9              
10 2     2   8 use strict;
  2         2  
  2         47  
11 2     2   7 use Carp;
  2         2  
  2         120  
12 2     2   1303 use IO::Handle;
  2         14797  
  2         126  
13 2     2   1289 use POSIX ":sys_wait_h";
  2         11785  
  2         14  
14 2     2   2600 use IO::Lambda qw(:all :dev);
  2         3  
  2         3771  
15              
16             my $MASTER = bless {}, __PACKAGE__;
17              
18             # register yield handler
19             IO::Lambda::add_loop($MASTER);
20 2     2   1263 END { IO::Lambda::remove_loop($MASTER) };
21              
22 10     10 0 50 sub empty { 0 == keys %SIGDATA }
23              
24             sub remove
25             {
26 0     0 0 0 my $lambda = $_[1];
27 0         0 my %rec;
28 0         0 keys %SIGDATA;
29 0         0 while ( my ($id, $v) = each %SIGDATA) {
30 0         0 for my $r (@{$v-> {lambdas}}) {
  0         0  
31 0         0 push @{$rec{$id}}, $r-> [0];
  0         0  
32             }
33             }
34 0         0 while ( my ($id, $v) = each %rec) {
35 0         0 unwatch_signal( $id, $_ ) for @$v;
36             }
37             }
38              
39             sub yield
40             {
41 4     4 0 20 my %v = %SIGDATA;
42 4         13 for my $id ( keys %v) {
43 4         11 my $v = $v{$id};
44             # use mutex in case signal happens right here during handling
45 4         8 $v-> {mutex} = 0;
46 4 50       16 warn " yield sig $id\n" if $DEBUG > 1;
47             AGAIN:
48 4 100       19 next unless $v-> {signal};
49              
50 1         3 my @r = @{$v-> {lambdas}};
  1         5  
51 1 50       5 warn " calling ", scalar(@r), " sig handlers\n" if $DEBUG > 1;
52 1         3 for my $r ( @r) {
53 1         16 my ( $lambda, $callback, @param) = @$r;
54 1         4 $callback-> ( $lambda, @param);
55             }
56              
57 1         2 my $sigs = $v-> {mutex};
58 1 50       47 if ( $sigs) {
59 0 0       0 warn " caught $sigs signals during yield\n" if $DEBUG > 1;
60 0         0 $v-> {signal} = $sigs;
61 0         0 $v-> {mutex} -= $sigs;
62 0         0 goto AGAIN;
63             }
64             }
65             }
66              
67             sub signal_handler
68             {
69 1     1 0 4 my $id = shift;
70 1 50       9 warn "SIG{$id}\n" if $DEBUG;
71 1 50       6 return unless exists $SIGDATA{$id};
72 1         4 $SIGDATA{$id}-> {signal}++;
73 1         3 $SIGDATA{$id}-> {mutex}++;
74 1 50       51 $IO::Lambda::LOOP-> signal($id) if $IO::Lambda::LOOP-> can('signal');
75             }
76              
77             sub watch_signal
78             {
79 2     2 0 6 my ($id, $lambda, $callback, @param) = @_;
80              
81 2         6 my $entry = [ $lambda, $callback, @param ];
82 2 50       9 unless ( exists $SIGDATA{$id}) {
83             $SIGDATA{$id} = {
84             mutex => 0,
85             signal => 0,
86 2         24 save => $SIG{$id},
87             lambdas => [$entry],
88             };
89 2     1   25 $SIG{$id} = sub { signal_handler($id) };
  1         8  
90 2 50       12 warn "install signal handler for $id ", _o($lambda), "\n" if $DEBUG > 1;
91             } else {
92 0         0 push @{ $SIGDATA{$id}-> {lambdas} }, $entry;
  0         0  
93 0 0       0 warn "push signal handler for $id ", _o($lambda), "\n" if $DEBUG > 2;
94             }
95             }
96              
97             sub unwatch_signal
98             {
99 2     2 0 5 my ( $id, $lambda) = @_;
100              
101 2 50       12 return unless exists $SIGDATA{$id};
102            
103 2 50       7 warn "remove signal handler for $id ", _o($lambda), "\n" if $DEBUG > 2;
104              
105 2         17 @{ $SIGDATA{$id}-> {lambdas} } =
106 2         10 grep { $$_[0] != $lambda }
107 2         5 @{ $SIGDATA{$id}-> {lambdas} };
  2         8  
108            
109 2 50       4 return if @{ $SIGDATA{$id}-> {lambdas} };
  2         9  
110            
111 2 50       8 warn "uninstall signal handler for $id\n" if $DEBUG > 1;
112              
113 2 50       7 if (defined($SIGDATA{$id}-> {save})) {
114 0         0 $SIG{$id} = $SIGDATA{$id}-> {save};
115             } else {
116 2         30 delete $SIG{$id};
117             }
118 2         6 delete $SIGDATA{$id};
119             }
120              
121             # create a lambda that either returns undef on timeout,
122             # or some custom value based on passed callback
123             sub signal_or_timeout_lambda
124             {
125 2     2 0 6 my ( $id, $deadline, $condition) = @_;
126              
127 2         2 my $t;
128 2         7 my $q = IO::Lambda-> new;
129              
130             # wait for signal
131 2         11 my $c = $q-> bind;
132             watch_signal( $id, $q, sub {
133 1     1   4 my @ret = $condition-> ();
134 1 50       4 return unless @ret;
135              
136 1         4 unwatch_signal( $id, $q);
137 1 50       10 $q-> cancel_event($t) if $t;
138 1         6 $q-> resolve($c);
139 1         5 $q-> terminate(@ret); # result
140 1         2 undef $c;
141 1         2 undef $q;
142 2         15 });
143              
144             # or wait for timeout
145             $t = $q-> watch_timer( $deadline, sub {
146 1     1   4 unwatch_signal( $id, $q);
147 1         8 $q-> resolve($c);
148 1         2 undef $c;
149 1         2 undef $q;
150 1         3 return undef; #result
151 2 50       19 }) if $deadline;
152              
153 2         61 return $q;
154             }
155              
156             sub new_process;
157             # condition
158 2     2 1 34 sub signal (&) { new_signal (context)-> condition(shift, \&signal, 'signal') }
159 0     0 1 0 sub pid (&) { new_pid (context)-> condition(shift, \&pid, 'pid') }
160 2     2 1 26 sub spawn (&) { new_process-> call(context)-> condition(shift, \&spawn, 'spawn') }
161              
162             sub new_signal
163             {
164 2     2 0 6 my ( $id, $deadline) = @_;
165             signal_or_timeout_lambda( $id, $deadline,
166 2     1   14 sub { 1 });
  1         3  
167             }
168              
169             sub new_pid
170             {
171 0     0 0 0 my ( $pid, $deadline) = @_;
172              
173 0 0       0 croak 'bad pid' unless $pid =~ /^\-?\d+$/;
174 0 0       0 warn "new_pid($pid) ", _t($deadline), "\n" if $DEBUG;
175            
176             # avoid race conditions
177 0         0 my ( $savesig, $early_sigchld);
178 0 0       0 unless ( defined $SIGDATA{CHLD}) {
179 0 0       0 warn "new_pid: install early SIGCHLD detector\n" if $DEBUG > 1;
180 0         0 $savesig = $SIG{CHLD};
181 0         0 $early_sigchld = 0;
182             $SIG{CHLD} = sub {
183 0 0   0   0 warn "new_pid: early SIGCHLD caught\n" if $DEBUG > 1;
184 0         0 $early_sigchld++
185 0         0 };
186             }
187              
188             # finished already
189 0 0       0 if ( waitpid( $pid, WNOHANG) != 0) {
190 0 0       0 if ( defined $early_sigchld) {
191 0 0       0 if ( defined( $savesig)) {
192 0         0 $SIG{CHLD} = $savesig;
193             } else {
194 0         0 delete $SIG{CHLD};
195             }
196             }
197 0 0       0 warn "new_pid($pid): finished already with $?\n" if $DEBUG > 1;
198 0         0 return IO::Lambda-> new-> call($?)
199             }
200              
201             # wait
202             my $p = signal_or_timeout_lambda( 'CHLD', $deadline, sub {
203 0     0   0 my $wp = waitpid($pid, WNOHANG);
204 0 0       0 warn "waitpid($pid) = $wp\n" if $DEBUG > 1;
205 0 0       0 return if $wp == 0;
206 0         0 return $?;
207 0         0 });
208              
209 0 0       0 warn "new_pid: new lambda(", _o($p), ")\n" if $DEBUG > 1;
210              
211             # don't let unwatch_signal() to restore it back to us
212 0 0       0 $SIGDATA{CHLD}-> {save} = $savesig if defined $early_sigchld;
213              
214             # possibly have a race? gracefully remove the lambda
215 0 0       0 if ( $early_sigchld) {
216              
217             # Got a signal, but that wasn't our pid. And neither it was
218             # pid that we're watching.
219 0 0       0 return $p if waitpid( $pid, WNOHANG) == 0;
220              
221             # Our pid is finished. Unwatch the signal.
222 0         0 unwatch_signal( 'CHLD', $p);
223             # Lambda will also never get executed - cancel it
224 0         0 $p-> terminate;
225            
226 0 0       0 warn "new_pid($pid): finished with race: $?, ", _o($p), " killed\n" if $DEBUG > 1;
227            
228 0         0 return IO::Lambda-> new-> call($?);
229             }
230              
231 0         0 return $p;
232             }
233              
234             sub new_process_posix
235             {
236             lambda {
237 2     2   19 my $h = IO::Handle-> new;
238 2         3360 my $pid = open( $h, '-|', @_);
239              
240 2 100       58 return undef, undef, $! unless $pid;
241              
242 1         15 this-> {pid} = $pid;
243 1         14 $h-> blocking(0);
244              
245 1         5 my $buf;
246 1         8 context readbuf, $h, \$buf, undef; # wait for EOF
247             tail {
248 1         1 my ($res, $error) = @_;
249 1 50       3 if ( defined $error) {
250 0         0 close $h;
251 0         0 return ($buf, $?, $error);
252             }
253              
254             # finished already
255 1 50       18 if (waitpid($pid, WNOHANG) != 0) {
256 1         10 my ( $exitcode, $error) = ( $?, $! );
257 1         11 close $h;
258 1         6 return ($buf, $exitcode, $error);
259             }
260             # wait for it
261 0         0 context $pid;
262             pid {
263 0         0 close $h;
264 0         0 return ($buf, shift);
265 1     2 0 16 }}}}
  0         0  
  2         14  
266              
267             sub new_process_win32
268             {
269             lambda {
270 0     0     my @cmd = @_;
271             context IO::Lambda::Thread::threaded( sub {
272 0           my $k = `@cmd`;
273 0 0         return $? ? ( undef, $?, $! ) : ( $k, 0, undef );
274 0           });
275 0           &tail();
276             }
277 0     0 0   }
278              
279              
280             if ( $^O !~ /win32/i) {
281             *new_process = \&new_process_posix;
282             } else {
283             require IO::Lambda::Thread;
284             unless ( $IO::Lambda::Thread::DISABLED) {
285             *new_process = \&new_process_win32;
286             } else {
287             *new_process = sub { lambda { undef, undef, $IO::Lambda::Thread::DISABLED } };
288             }
289             }
290              
291              
292             1;
293              
294             __DATA__