File Coverage

blib/lib/IO/Lambda/Signal.pm
Criterion Covered Total %
statement 137 163 84.0
branch 43 78 55.1
condition n/a
subroutine 26 29 89.6
pod 3 14 21.4
total 209 284 73.5


line stmt bran cond sub pod time code
1             # $Id: Signal.pm,v 1.24 2010/03/25 12:52:36 dk Exp $
2             package IO::Lambda::Signal;
3 11     11   2022 use vars qw(@ISA %SIGDATA);
  11         21  
  11         1110  
4             @ISA = qw(Exporter);
5             @EXPORT_OK = qw(signal pid spawn new_signal new_pid new_process);
6             %EXPORT_TAGS = ( all => \@EXPORT_OK);
7              
8             our $DEBUG = $IO::Lambda::DEBUG{signal} || 0;
9              
10 11     11   55 use strict;
  11         12  
  11         270  
11 11     11   64 use Carp;
  11         13  
  11         675  
12 11     11   1896 use IO::Handle;
  11         14205  
  11         392  
13 11     11   1707 use POSIX ":sys_wait_h";
  11         13558  
  11         66  
14 11     11   7514 use IO::Lambda qw(:all :dev);
  11         22  
  11         28222  
15              
16             my $MASTER = bless {}, __PACKAGE__;
17              
18             # register yield handler
19             IO::Lambda::add_loop($MASTER);
20 11     11   15662 END { IO::Lambda::remove_loop($MASTER) };
21              
22 487448     487448 0 1558622 sub empty { 0 == keys %SIGDATA }
23              
24             sub remove
25             {
26 3     3 0 12 my $lambda = $_[1];
27 3         5 my %rec;
28 3         4 keys %SIGDATA;
29 3         13 while ( my ($id, $v) = each %SIGDATA) {
30 1         2 for my $r (@{$v-> {lambdas}}) {
  1         9  
31 1         3 push @{$rec{$id}}, $r-> [0];
  1         6  
32             }
33             }
34 3         16 while ( my ($id, $v) = each %rec) {
35 1         6 unwatch_signal( $id, $_ ) for @$v;
36             }
37             }
38              
39             sub yield
40             {
41 487338     487338 0 995431 my %v = %SIGDATA;
42 487338         908951 for my $id ( keys %v) {
43 487338         646357 my $v = $v{$id};
44             # use mutex in case signal happens right here during handling
45 487338         637153 $v-> {mutex} = 0;
46 487338 50       898580 warn " yield sig $id\n" if $DEBUG > 1;
47             AGAIN:
48 487341 100       1675690 next unless $v-> {signal};
49              
50 698         803 my @r = @{$v-> {lambdas}};
  698         1446  
51 698 50       1461 warn " calling ", scalar(@r), " sig handlers\n" if $DEBUG > 1;
52 698         957 for my $r ( @r) {
53 707         1114 my ( $lambda, $callback, @param) = @$r;
54 707         1428 $callback-> ( $lambda, @param);
55             }
56              
57 698         1058 my $sigs = $v-> {mutex};
58 698 100       2755 if ( $sigs) {
59 3 50       21 warn " caught $sigs signals during yield\n" if $DEBUG > 1;
60 3         27 $v-> {signal} = $sigs;
61 3         9 $v-> {mutex} -= $sigs;
62 3         21 goto AGAIN;
63             }
64             }
65             }
66              
67             sub signal_handler
68             {
69 32     32 0 91 my $id = shift;
70 32 50       134 warn "SIG{$id}\n" if $DEBUG;
71 32 50       259 return unless exists $SIGDATA{$id};
72 32         85 $SIGDATA{$id}-> {signal}++;
73 32         80 $SIGDATA{$id}-> {mutex}++;
74 32 50       585 $IO::Lambda::LOOP-> signal($id) if $IO::Lambda::LOOP-> can('signal');
75             }
76              
77             sub watch_signal
78             {
79 34     34 0 95 my ($id, $lambda, $callback, @param) = @_;
80              
81 34         87 my $entry = [ $lambda, $callback, @param ];
82 34 100       137 unless ( exists $SIGDATA{$id}) {
83             $SIGDATA{$id} = {
84             mutex => 0,
85             signal => 0,
86 28         827 save => $SIG{$id},
87             lambdas => [$entry],
88             };
89 28     32   378 $SIG{$id} = sub { signal_handler($id) };
  32         129  
90 28 50       124 warn "install signal handler for $id ", _o($lambda), "\n" if $DEBUG > 1;
91             } else {
92 6         39 push @{ $SIGDATA{$id}-> {lambdas} }, $entry;
  6         21  
93 6 50       24 warn "push signal handler for $id ", _o($lambda), "\n" if $DEBUG > 2;
94             }
95             }
96              
97             sub unwatch_signal
98             {
99 34     34 0 69 my ( $id, $lambda) = @_;
100              
101 34 50       124 return unless exists $SIGDATA{$id};
102            
103 34 50       108 warn "remove signal handler for $id ", _o($lambda), "\n" if $DEBUG > 2;
104              
105 34         133 @{ $SIGDATA{$id}-> {lambdas} } =
106 43         180 grep { $$_[0] != $lambda }
107 34         56 @{ $SIGDATA{$id}-> {lambdas} };
  34         118  
108            
109 34 100       71 return if @{ $SIGDATA{$id}-> {lambdas} };
  34         212  
110            
111 28 50       92 warn "uninstall signal handler for $id\n" if $DEBUG > 1;
112              
113 28 50       97 if (defined($SIGDATA{$id}-> {save})) {
114 0         0 $SIG{$id} = $SIGDATA{$id}-> {save};
115             } else {
116 28         440 delete $SIG{$id};
117             }
118 28         135 delete $SIGDATA{$id};
119             }
120              
121             # create a lambda that either returns undef on timeout,
122             # or some custom value based on passed callback
123             sub signal_or_timeout_lambda
124             {
125 34     34 0 223 my ( $id, $deadline, $condition) = @_;
126              
127 34         64 my $t;
128 34         150 my $q = IO::Lambda-> new;
129              
130             # wait for signal
131 34         160 my $c = $q-> bind;
132             watch_signal( $id, $q, sub {
133 707     707   1189 my @ret = $condition-> ();
134 707 100       2024 return unless @ret;
135              
136 32         121 unwatch_signal( $id, $q);
137 32 100       105 $q-> cancel_event($t) if $t;
138 32         290 $q-> resolve($c);
139 32         158 $q-> terminate(@ret); # result
140 32         66 undef $c;
141 32         101 undef $q;
142 34         384 });
143              
144             # or wait for timeout
145             $t = $q-> watch_timer( $deadline, sub {
146 1     1   6 unwatch_signal( $id, $q);
147 1         56 $q-> resolve($c);
148 1         4 undef $c;
149 1         3 undef $q;
150 1         6 return undef; #result
151 34 100       107 }) if $deadline;
152              
153 34         245 return $q;
154             }
155              
156             sub new_process;
157             # condition
158 2     2 1 41 sub signal (&) { new_signal (context)-> condition(shift, \&signal, 'signal') }
159 32     32 1 120 sub pid (&) { new_pid (context)-> condition(shift, \&pid, 'pid') }
160 2     2 1 28 sub spawn (&) { new_process-> call(context)-> condition(shift, \&spawn, 'spawn') }
161              
162             sub new_signal
163             {
164 2     2 0 6 my ( $id, $deadline) = @_;
165             signal_or_timeout_lambda( $id, $deadline,
166 2     1   17 sub { 1 });
  1         4  
167             }
168              
169             sub new_pid
170             {
171 32     32 0 64 my ( $pid, $deadline) = @_;
172              
173 32 50       858 croak 'bad pid' unless $pid =~ /^\-?\d+$/;
174 32 50       104 warn "new_pid($pid) ", _t($deadline), "\n" if $DEBUG;
175            
176             # avoid race conditions
177 32         46 my ( $savesig, $early_sigchld);
178 32 100       119 unless ( defined $SIGDATA{CHLD}) {
179 26 50       69 warn "new_pid: install early SIGCHLD detector\n" if $DEBUG > 1;
180 26         81 $savesig = $SIG{CHLD};
181 26         41 $early_sigchld = 0;
182             $SIG{CHLD} = sub {
183 0 0   0   0 warn "new_pid: early SIGCHLD caught\n" if $DEBUG > 1;
184 0         0 $early_sigchld++
185 26         899 };
186             }
187              
188             # finished already
189 32 50       250 if ( waitpid( $pid, WNOHANG) != 0) {
190 0 0       0 if ( defined $early_sigchld) {
191 0 0       0 if ( defined( $savesig)) {
192 0         0 $SIG{CHLD} = $savesig;
193             } else {
194 0         0 delete $SIG{CHLD};
195             }
196             }
197 0 0       0 warn "new_pid($pid): finished already with $?\n" if $DEBUG > 1;
198 0         0 return IO::Lambda-> new-> call($?)
199             }
200              
201             # wait
202             my $p = signal_or_timeout_lambda( 'CHLD', $deadline, sub {
203 706     706   2660 my $wp = waitpid($pid, WNOHANG);
204 706 50       1294 warn "waitpid($pid) = $wp\n" if $DEBUG > 1;
205 706 100       1771 return if $wp == 0;
206 31         372 return $?;
207 32         410 });
208              
209 32 50       132 warn "new_pid: new lambda(", _o($p), ")\n" if $DEBUG > 1;
210              
211             # don't let unwatch_signal() to restore it back to us
212 32 100       196 $SIGDATA{CHLD}-> {save} = $savesig if defined $early_sigchld;
213              
214             # possibly have a race? gracefully remove the lambda
215 32 50       160 if ( $early_sigchld) {
216              
217             # Got a signal, but that wasn't our pid. And neither it was
218             # pid that we're watching.
219 0 0       0 return $p if waitpid( $pid, WNOHANG) == 0;
220              
221             # Our pid is finished. Unwatch the signal.
222 0         0 unwatch_signal( 'CHLD', $p);
223             # Lambda will also never get executed - cancel it
224 0         0 $p-> terminate;
225            
226 0 0       0 warn "new_pid($pid): finished with race: $?, ", _o($p), " killed\n" if $DEBUG > 1;
227            
228 0         0 return IO::Lambda-> new-> call($?);
229             }
230              
231 32         336 return $p;
232             }
233              
234             sub new_process_posix
235             {
236             lambda {
237 2     2   20 my $h = IO::Handle-> new;
238 2         6758 my $pid = open( $h, '-|', @_);
239              
240 2 100       106 return undef, undef, $! unless $pid;
241              
242 1         28 this-> {pid} = $pid;
243 1         11 $h-> blocking(0);
244              
245 1         2 my $buf;
246 1         12 context readbuf, $h, \$buf, undef; # wait for EOF
247             tail {
248 1         2 my ($res, $error) = @_;
249 1 50       4 if ( defined $error) {
250 0         0 close $h;
251 0         0 return ($buf, $?, $error);
252             }
253              
254             # finished already
255 1 50       26 if (waitpid($pid, WNOHANG) != 0) {
256 1         14 my ( $exitcode, $error) = ( $?, $! );
257 1         14 close $h;
258 1         7 return ($buf, $exitcode, $error);
259             }
260             # wait for it
261 0         0 context $pid;
262             pid {
263 0         0 close $h;
264 0         0 return ($buf, shift);
265 1     2 0 25 }}}}
  0         0  
  2         15  
266              
267             sub new_process_win32
268             {
269             lambda {
270 0     0     my @cmd = @_;
271             context IO::Lambda::Thread::threaded( sub {
272 0           my $k = `@cmd`;
273 0 0         return $? ? ( undef, $?, $! ) : ( $k, 0, undef );
274 0           });
275 0           &tail();
276             }
277 0     0 0   }
278              
279              
280             if ( $^O !~ /win32/i) {
281             *new_process = \&new_process_posix;
282             } else {
283             require IO::Lambda::Thread;
284             unless ( $IO::Lambda::Thread::DISABLED) {
285             *new_process = \&new_process_win32;
286             } else {
287             *new_process = sub { lambda { undef, undef, $IO::Lambda::Thread::DISABLED } };
288             }
289             }
290              
291              
292             1;
293              
294             __DATA__