File Coverage

blib/lib/IO/Lambda.pm
Criterion Covered Total %
statement 660 750 88.0
branch 257 424 60.6
condition 49 102 48.0
subroutine 121 145 83.4
pod 60 77 77.9
total 1147 1498 76.5


line stmt bran cond sub pod time code
1             # $Id: Lambda.pm,v 1.191 2012/01/13 06:41:28 dk Exp $
2             package IO::Lambda;
3              
4 15     15   269113 use Carp qw(croak);
  15         31  
  15         1062  
5 15     15   72 use strict;
  15         21  
  15         387  
6 15     15   65 use warnings;
  15         22  
  15         417  
7 15     15   60 use Exporter;
  15         18  
  15         492  
8 15     15   7759 use Sub::Name;
  15         9577  
  15         805  
9 15     15   81 use Scalar::Util qw(weaken);
  15         20  
  15         1308  
10 15     15   11421 use Time::HiRes qw(time);
  15         14688  
  15         79  
11 15         7630 use vars qw(
12             $LOOP %EVENTS @LOOPS
13             $VERSION @ISA
14             @EXPORT_OK %EXPORT_TAGS @EXPORT_CONSTANTS @EXPORT_LAMBDA @EXPORT_STREAM
15             @EXPORT_DEV @EXPORT_MISC @EXPORT_FUNC
16             $THIS @CONTEXT $METHOD $CALLBACK $AGAIN $SIGTHROW
17             $DEBUG_IO $DEBUG_LAMBDA $DEBUG_CALLER %DEBUG
18 15     15   2531 );
  15         166  
19             $VERSION = '1.24';
20             @ISA = qw(Exporter);
21             @EXPORT_CONSTANTS = qw(
22             IO_READ IO_WRITE IO_EXCEPTION
23             WATCH_OBJ WATCH_DEADLINE WATCH_LAMBDA WATCH_CALLBACK
24             WATCH_IO_HANDLE WATCH_IO_FLAGS WATCH_CALLER WATCH_CANCEL
25             );
26             @EXPORT_STREAM = qw(
27             sysreader syswriter getline readbuf writebuf
28             );
29             @EXPORT_LAMBDA = qw(
30             this context lambda again state restartable catch autocatch
31             io readable writable rwx timeout tail tails tailo any_tail
32             );
33             @EXPORT_FUNC = qw(
34             seq par mapcar filter fold curry
35             );
36             @EXPORT_MISC = qw(
37             set_frame get_frame swap_frame sigthrow
38             );
39             @EXPORT_DEV = qw(
40             _subname _o _t
41             );
42             @EXPORT_OK = (
43             @EXPORT_LAMBDA, @EXPORT_CONSTANTS, @EXPORT_STREAM,
44             @EXPORT_DEV, @EXPORT_MISC, @EXPORT_FUNC
45             );
46             %EXPORT_TAGS = (
47             func => \@EXPORT_FUNC,
48             lambda => \@EXPORT_LAMBDA,
49             stream => \@EXPORT_STREAM,
50             constants => \@EXPORT_CONSTANTS,
51             dev => \@EXPORT_DEV,
52             all => [ @EXPORT_LAMBDA, @EXPORT_STREAM, @EXPORT_CONSTANTS, @EXPORT_FUNC ],
53             );
54              
55             if ( exists $ENV{IO_LAMBDA_DEBUG}) {
56             for my $p ( split ',', $ENV{IO_LAMBDA_DEBUG}) {
57             if ( $p =~ /^([^=]+)=(.*)$/) {
58             $DEBUG{lc $1}=$2;
59             } else {
60             $DEBUG{lc $p}++;
61             }
62             }
63             $DEBUG_IO = $DEBUG{io} || 0;
64             $DEBUG_LAMBDA = $DEBUG{lambda} || 0;
65             $DEBUG_CALLER = $DEBUG{caller} || 0;
66             $IO::Lambda::Loop::DEFAULT = $DEBUG{loop} if $DEBUG{loop};
67             $SIG{__DIE__} = sub {
68             return if $^S;
69             Carp::confess(@_);
70             } if $DEBUG{die};
71             }
72              
73 15     15   80 use constant IO_READ => 4;
  15         22  
  15         1332  
74 15     15   206 use constant IO_WRITE => 2;
  15         23  
  15         782  
75 15     15   66 use constant IO_EXCEPTION => 1;
  15         20  
  15         909  
76            
77 15     15   70 use constant WATCH_OBJ => 0;
  15         22  
  15         727  
78 15     15   65 use constant WATCH_CANCEL => 1;
  15         18  
  15         724  
79              
80 15     15   60 use constant WATCH_DEADLINE => 2;
  15         21  
  15         648  
81 15     15   61 use constant WATCH_LAMBDA => 2;
  15         21  
  15         692  
82 15     15   62 use constant WATCH_CALLBACK => 3;
  15         15  
  15         699  
83              
84 15     15   61 use constant WATCH_CALLER => 4;
  15         25  
  15         706  
85 15     15   71 use constant WATCH_IO_HANDLE => 4;
  15         18  
  15         953  
86 15     15   68 use constant WATCH_IO_FLAGS => 5;
  15         26  
  15         123578  
87              
88             sub new
89             {
90 216 100   216 1 520 IO::Lambda::Loop-> new unless $LOOP;
91 216         825 return bless {
92             in => [], # events we wait for
93             last => [], # result of the last state
94             stopped => 0, # initial state
95             start => $_[1], # kick-start coderef
96             }, $_[0];
97             }
98              
99             sub DESTROY
100             {
101 196     196   1775 my $self = $_[0];
102 196         355 $self-> cancel_all_events;
103             }
104              
105             my $_doffs = 0;
106 0     0   0 sub _d_in { $_doffs++ }
107 0 0   0   0 sub _d_out { $_doffs-- if $_doffs }
108 0     0   0 sub _d { (' ' x $_doffs), _obj(shift), ': ', @_, "\n" }
109 0     0   0 sub _o { $_[0] =~ /0x([\w]+)/; $1 }
  0         0  
110 0   0 0   0 sub _obj { "lambda(". _o($_[0]) . ")." . ( $_[0]->{caller} || '()' ) }
111 0 0   0   0 sub _t { defined($_[0]) ? ( "time(", (($_[0] < 1_000_000) ? $_[0] : $_[0]-time()), ")" ) : () }
    0          
112             sub _ev
113             {
114 0     0   0 $_[0] =~ /0x([\w]+)/;
115             "event($1) ",
116 0 0       0 (($#{$_[0]} == WATCH_IO_FLAGS) ? (
  0 0       0  
    0          
    0          
    0          
    0          
117             'fd=',
118             fileno($_[0]->[WATCH_IO_HANDLE]),
119             ' ',
120             ( $_[0]->[WATCH_IO_FLAGS] ? (
121             join('/',
122             (($_[0]->[WATCH_IO_FLAGS] & IO_READ) ? 'read' : ()),
123             (($_[0]->[WATCH_IO_FLAGS] & IO_WRITE) ? 'write' : ()),
124             (($_[0]->[WATCH_IO_FLAGS] & IO_EXCEPTION) ? 'exc' : ()),
125             )) :
126             'timeout'
127             ),
128             ' ', _t($_[0]->[WATCH_DEADLINE]),
129             ) : (
130             ref($_[0]-> [WATCH_LAMBDA]) ?
131             _obj($_[0]-> [WATCH_LAMBDA]) :
132             _t($_[0]->[WATCH_DEADLINE])
133             ))
134             }
135             sub _msg
136             {
137 0     0   0 my $self = shift;
138             _d(
139             $self,
140             "@_ >> (",
141             join(',',
142             map {
143 0 0       0 defined($_) ? $_ : 'undef'
144             }
145 0         0 @{$self->{last}}
  0         0  
146             ),
147             ')'
148             )
149             }
150              
151             #
152             # Part I - Object interface to callback and
153             # messaging interface with event loop and lambdas
154             #
155             #########################################################
156              
157             # register an IO event
158             sub watch_io
159             {
160 10     10 1 52 my ( $self, $flags, $handle, $deadline, $callback, $cancel) = @_;
161              
162 10 50       21 croak "can't register events on a stopped lambda" if $self-> {stopped};
163 10 50       23 croak "bad io flags" if 0 == ($flags & (IO_READ|IO_WRITE|IO_EXCEPTION));
164              
165 10 100 66     34 $deadline += time if defined($deadline) and $deadline < 1_000_000_000;
166            
167 10         29 my $rec = [
168             $self,
169             $cancel,
170             $deadline,
171             $callback,
172             $handle,
173             $flags,
174             ];
175 10         24 weaken $rec->[0];
176 10         9 push @{$self-> {in}}, $rec;
  10         16  
177              
178 10 50       30 warn _d( $self, "> ", _ev($rec)) if $DEBUG_IO;
179              
180 10         32 $LOOP-> watch( $rec );
181              
182 10         22 return $rec;
183             }
184              
185             # register a timeout
186             sub watch_timer
187             {
188 50     50 1 99 my ( $self, $deadline, $callback, $cancel) = @_;
189              
190 50 50       110 croak "can't register events on a stopped lambda" if $self-> {stopped};
191 50 50       110 croak "$self: time is undefined" unless defined $deadline;
192            
193 50 100       413 $deadline += time if $deadline < 1_000_000_000;
194 50         109 my $rec = [
195             $self,
196             $cancel,
197             $deadline,
198             $callback,
199             ];
200 50         147 weaken $rec->[0];
201 50         51 push @{$self-> {in}}, $rec;
  50         100  
202              
203 50 50       113 warn _d( $self, "> ", _ev($rec)) if $DEBUG_IO;
204              
205 50         174 $LOOP-> after( $rec);
206            
207 50         134 return $rec;
208             }
209              
210             # register a callback when another lambda exits
211             sub watch_lambda
212             {
213 183     183 1 271 my ( $self, $lambda, $callback, $cancel) = @_;
214 183         219 @_ = (); # perl bug http://rt.perl.org/rt3//Public/Bug/Display.html?id=70974
215              
216 183 50       334 croak "can't register events on a stopped lambda" if $self-> {stopped};
217 183 50 33     997 croak "bad lambda" unless $lambda and $lambda->isa('IO::Lambda');
218              
219 183 50       360 croak "won't watch myself" if $self == $lambda;
220             # XXX check cycling
221            
222 183 50       302 $lambda-> reset if $lambda-> is_stopped;
223              
224 183         378 my $rec = [
225             $self,
226             $cancel,
227             $lambda,
228             $callback,
229             ];
230 183         416 weaken $rec->[0];
231 183 50       295 $rec-> [WATCH_CALLER] = Carp::shortmess if $DEBUG_CALLER;
232 183         180 push @{$self-> {in}}, $rec;
  183         296  
233 183         173 push @{$EVENTS{"$lambda"}}, $rec;
  183         557  
234              
235 183 100       307 $lambda-> start if $lambda-> is_passive;
236              
237 183 50       322 warn _d( $self, "> ", _ev($rec)) if $DEBUG_LAMBDA;
238              
239 183         649 return $rec;
240             }
241              
242             # watch the watchers
243             sub override
244             {
245 33 100   33 1 1139 my ( $self, $method, $state, $cb) = ( 4 == @_) ? @_ : (@_[0,1],'*',$_[2]);
246              
247 33 100       53 if ( $cb) {
248 13   100     60 $self-> {override}->{$method} ||= [];
249 13         17 push @{$self-> {override}->{$method}}, [ $state, $cb ];
  13         56  
250             } else {
251 20         17 my $p;
252 20 100       49 return unless $p = $self-> {override}->{$method};
253 19         50 for ( my $i = $#$p; $i >= 0; $i--) {
254 21 100 33     215 if (
      33        
      66        
      33        
255             (
256             not defined ($state) and
257             not defined ($p->[$i]-> [0])
258             ) or (
259             defined($state) and
260             defined($p->[$i]-> [0]) and
261             $p->[$i]->[0] eq $state
262             )
263             ) {
264 19         33 my $ret = splice( @$p, $i, 1);
265 19 100       57 delete $self-> {override}->{$method} unless @$p;
266 19         64 return $ret->[1];
267             }
268             }
269              
270 0         0 return undef;
271             }
272             }
273              
274             sub override_handler
275             {
276 40     40 0 50 my ( $self, $method, $sub, $cb) = @_;
277              
278 40         57 my $o = $self-> {override}-> {$method}-> [-1];
279              
280             # check state match
281 40         62 my ($a, $b) = ( $self-> {state}, $o-> [0]);
282 40 100 66     433 unless (
283             ( not defined($a) and not defined ($b)) or
284             ( defined $a and defined $b and $a eq $b) or
285             ( defined $b and $b eq '*')
286             ) {
287             # state not matched
288 8 100       9 if ( 1 == @{$self-> {override}->{$method}}) {
  8         21  
289 4         9 local $self-> {override}->{$method} = undef;
290 4         10 return $sub-> ($cb);
291             } else {
292 4         11 pop @{$self-> {override}->{$method}};
  4         10  
293 4         10 my $ret = $sub-> ($cb);
294 4         6 push @{$self->{override}->{$method}}, $o;
  4         9  
295 4         14 return $ret;
296             }
297             } else {
298             # state matched
299 32         85 local $self-> {super} = [ $sub, $cb ];
300 32 100       37 if ( 1 == @{$self-> {override}->{$method}}) {
  32         66  
301 22         59 local $self-> {override}->{$method} = undef;
302 22         55 return $o-> [1]-> ( $self, $sub, $cb);
303             } else {
304 10         10 pop @{$self-> {override}->{$method}};
  10         18  
305 10         30 my $ret = $o-> [1]-> ( $self, $sub, $cb);
306 10         18 push @{$self->{override}->{$method}}, $o;
  10         22  
307 10         36 return $ret;
308             }
309             }
310             }
311              
312             # Insert a new callback to be called before original callback.
313             # Needs to insert callbacks in {override} stack in reverse order,
314             # because direct order serves LIFO order for override() callbacks, --
315             # and that means FIFO for intercept() callbacks. But we also want LIFO.
316             sub intercept
317             {
318 24 100   24 1 1981 my ( $self, $method, $state, $cb) = ( 4 == @_) ? @_ : (@_[0,1],'*',$_[2]);
319            
320 24 100       51 return $self-> override( $method, $state, undef) unless $cb;
321              
322 14         31 _subname("intercept($method:$state)" => $cb);
323              
324 14   100     42 $self-> {override}->{$method} ||= [];
325 14         57 unshift @{$self-> {override}->{$method}}, [ $state, sub {
326             # this is called when lambda calls $method with $state
327 17     17   18 my ( undef, $sub, $orig_cb) = @_;
328             # $sub is a condition, like readable(&) or tail(&)
329             $sub->( sub {
330             # that (&) is finally called when IO event is there
331 16         25 local $self-> {super} = [$orig_cb];
332 16         24 &$cb;
333 17         50 });
334 14         5 } ];
335             }
336              
337             sub super
338             {
339 25 50   25 1 76 croak "super() call outside overridden condition" unless $_[0]-> {super};
340 25         27 my $data = $_[0]-> {super};
341 25 100       43 if ( defined $data-> [1]) {
342             # override() super
343 5         12 return $data-> [0]-> ($data-> [1]);
344             } else {
345             # intercept() super
346 20         22 my $self = shift;
347 20 50       62 return defined($data->[0]) ?
    100          
348             $data-> [0]-> (@_) :
349             ( wantarray ? @_ : $_[0] );
350             }
351             }
352              
353              
354             # handle incoming asynchronous events
355             sub io_handler
356             {
357 53     53 0 80 my ( $self, $rec) = @_;
358              
359 53 50       122 warn _d( $self, '< ', _ev($rec)) if $DEBUG_IO;
360              
361 53         81 my $in = $self-> {in};
362 53         64 my $nn = @$in;
363 53         85 @$in = grep { $rec != $_ } @$in;
  59         243  
364 53 50 33     327 die _d($self, 'stray ', _ev($rec))
365             if $nn == @$in or $self != $rec->[WATCH_OBJ];
366              
367 53 50       144 _d_in if $DEBUG_IO;
368 53         138 local $self-> {cancel} = $rec-> [WATCH_CANCEL];
369 53         223 @{$self->{last}} = $rec-> [WATCH_CALLBACK]-> (
370             $self,
371             (($#$rec == WATCH_IO_FLAGS) ? $rec-> [WATCH_IO_FLAGS] : ()),
372 53 100       223 @{$self->{last}}
  53 50       180  
373             ) if $rec-> [WATCH_CALLBACK];
374              
375 53 50       124 _d_out if $DEBUG_IO;
376 53 50       120 warn $self-> _msg('io') if $DEBUG_IO;
377              
378 53 100       226 unless ( @$in) {
379 38 50       63 warn _d( $self, 'stopped') if $DEBUG_LAMBDA;
380 38         315 $self-> {stopped}++;
381             }
382             }
383              
384             # handle incoming synchronous events
385             sub lambda_handler
386             {
387 174     174 0 200 my ( $self, $rec) = @_;
388              
389 174 50       290 warn _d( $self, '< ', _ev($rec)) if $DEBUG_LAMBDA;
390              
391 174         232 my $in = $self-> {in};
392 174         197 my $nn = @$in;
393 174         196 @$in = grep { $rec != $_ } @$in;
  193         495  
394 174 50 33     793 die _d($self, 'stray ', _ev($rec))
395             if $nn == @$in or $self != $rec->[WATCH_OBJ];
396              
397 174         191 my $lambda = $rec-> [WATCH_LAMBDA];
398             die _d($self,
399             'handler called but ', _obj($lambda),
400 174 50       287 ' is not finished yet') unless $lambda-> {stopped};
401              
402 174         354 my $arr = $EVENTS{"$lambda"};
403 174         265 @$arr = grep { $_ != $rec } @$arr;
  177         359  
404 174 100       567 delete $EVENTS{"$lambda"} unless @$arr;
405              
406 174 50       285 _d_in if $DEBUG_LAMBDA;
407            
408 174         375 local $self-> {cancel} = $rec-> [WATCH_CANCEL];
409 174         397 @{$self->{last}} =
410             $rec-> [WATCH_CALLBACK] ?
411             $rec-> [WATCH_CALLBACK]-> (
412             $self,
413 156         411 @{$rec-> [WATCH_LAMBDA]-> {last}}
414             ) :
415 174 100       315 @{$rec-> [WATCH_LAMBDA]-> {last}};
  18         25  
416              
417 174 50       325 _d_out if $DEBUG_LAMBDA;
418 174 50       265 warn $self-> _msg('tail') if $DEBUG_LAMBDA;
419              
420 174 100       386 unless ( @$in) {
421 104 50       165 warn _d( $self, 'stopped') if $DEBUG_LAMBDA;
422 104         238 $self-> {stopped} = 1;
423             }
424             }
425              
426             # Removes one event from queue
427             sub cancel_event
428             {
429 7     7 0 14 my ( $self, $rec) = @_;
430              
431 7 100       9 return unless @{$self-> {in}};
  7         25  
432              
433 6 50       17 @{$self->{last}} = $rec-> [WATCH_CANCEL]->($self, @{$self->{last}})
  0         0  
  0         0  
434             if $rec-> [WATCH_CANCEL];
435              
436 6 50       37 $LOOP-> remove_event($rec) if $LOOP;
437 6         9 @{$self-> {in}} = grep { $_ != $rec } @{$self-> {in}};
  6         13  
  8         20  
  6         14  
438              
439 6 100 66     53 if ($rec->[WATCH_LAMBDA] and ref($rec->[WATCH_LAMBDA])) {
440 3         7 my $arr = $EVENTS{$rec->[WATCH_LAMBDA]};
441 3 100       9 if ( $arr) {
442 2         6 @$arr = grep { $_ != $rec } @$arr;
  2         5  
443 2 50       11 delete $EVENTS{$rec->[WATCH_LAMBDA]} unless @$arr;
444             }
445             }
446 6         117 @$rec = ();
447              
448 6 100       7 return if @{$self->{in}};
  6         26  
449             # that was the last event
450              
451 3 50       8 warn _d( $self, 'stopped') if $DEBUG_LAMBDA;
452 3         7 $self-> {stopped} = 1;
453 3         14 $_-> remove( $self) for @LOOPS;
454             }
455              
456             # Removes all events bound to the object, notifies the interested objects.
457             # The object becomes stopped immediately, so no new events will be allowed to register.
458             sub cancel_all_events
459             {
460 330     330 0 304 my $self = shift;
461              
462 330         415 $self-> {stopped} = 1;
463              
464 330 100       259 return unless @{$self-> {in}};
  330         2504  
465            
466 22         33 for ( grep { $_-> [WATCH_CANCEL] } reverse @{$self-> {in}}) {
  23         57  
  22         40  
467 14         14 my $wc = $_-> [WATCH_CANCEL];
468 14         15 $_-> [WATCH_CANCEL] = undef;
469 14         17 @{$self->{last}} = $wc-> ($self, $_, @{$self->{last}})
  14         112  
  14         41  
470             }
471              
472 22 50       107 $LOOP-> remove( $self) if $LOOP;
473 22         50 $_-> remove($self) for @LOOPS;
474              
475 22         29 for my $rec ( @{$self->{in}}) {
  22         60  
476 20 100       54 if ( ref($rec->[WATCH_LAMBDA])) {
477 7         14 my $arr = $EVENTS{$rec->[WATCH_LAMBDA]};
478 7 50       11 if ( $arr) {
479 7         9 @$arr = grep { $_ != $rec } @$arr;
  7         19  
480 7 50       25 delete $EVENTS{$rec->[WATCH_LAMBDA]} unless @$arr;
481             }
482             }
483 20         57 @$rec = ();
484             }
485              
486 22         26 @{$self-> {in}} = ();
  22         52  
487             }
488              
489             sub autorestart
490             {
491             $#_ ?
492             $_[0]-> {autorestart} = $_[1] :
493             ( exists($_[0]-> {autorestart}) ?
494 43 50   43 1 211 $_[0]-> {autorestart} : 1)
    50          
495             }
496 341     341 1 1352 sub is_stopped { $_[0]-> {stopped} }
497 4 100   4 1 18 sub is_waiting { not($_[0]->{stopped}) and @{$_[0]->{in}} }
  2         8  
498 296   100 296 1 709 sub is_passive { not($_[0]->{stopped}) and not(@{$_[0]->{in}}) }
499 611 100   611 1 1169 sub is_active { $_[0]->{stopped} or @{$_[0]->{in}} }
  609         1858  
500              
501             # reset the state machine
502             sub reset
503             {
504 82     82 1 3815 my $self = shift;
505              
506 82         145 $self-> cancel_all_events;
507 82         75 @{$self-> {last}} = ();
  82         148  
508 82         108 delete $self-> {stopped};
509 82 50       178 warn _d( $self, 'reset') if $DEBUG_LAMBDA;
510             }
511              
512             # start the state machine
513             sub start
514             {
515 271     271 1 307 my $self = shift;
516              
517 271 50       383 croak "can't start active lambda, call reset() first" if $self-> is_active;
518              
519 271 50       467 warn _d( $self, 'started') if $DEBUG_LAMBDA;
520 268         826 @{$self->{last}} = $self-> {start}-> ($self, @{$self->{last}})
  268         594  
521 271 100       550 if $self-> {start};
522 271 50       501 warn $self-> _msg('initial') if $DEBUG_LAMBDA;
523              
524 271 100       204 unless ( @{$self->{in}}) {
  271         594  
525 125 50       198 warn _d( $self, 'stopped') if $DEBUG_LAMBDA;
526 125         194 $self-> {stopped} = 1;
527             }
528             }
529              
530             # peek into the current state
531 112 100   112 1 469 sub peek { wantarray ? @{$_[0]->{last}} : $_[0]-> {last}-> [0] }
  10         60  
532              
533             # pass initial parameters to lambda
534             sub call
535             {
536 240     240 1 254 my $self = shift;
537              
538 240 50       351 croak "can't call active lambda" if $self-> is_active;
539              
540 240         283 @{$self-> {last}} = @_;
  240         385  
541 240         273 $self;
542             }
543              
544             # abandon all states and stop with constant message
545             sub terminate
546             {
547 51     51 1 70 my ( $self, @error) = @_;
548              
549 51         357 $self-> {last} = \@error;
550 51         111 $self-> cancel_all_events;
551 51 50       106 warn $self-> _msg('terminate') if $DEBUG_LAMBDA;
552             }
553              
554             # propagate event destruction on all levels
555             sub destroy
556             {
557 0     0 1 0 shift-> cancel_all_events( cascade => 1);
558             }
559              
560             # synchronisation
561              
562             # drives objects dependant on the other objects until all of them
563             # are stopped
564             sub drive
565             {
566 170     170 0 193 my $changed = 1;
567 170         160 my $executed = 0;
568 170 50       366 warn "IO::Lambda::drive --------\n" if $DEBUG_LAMBDA;
569 170         321 while ( $changed) {
570 317         303 $changed = 0;
571              
572             # dispatch
573 317         587 for my $rec ( map { @$_ } values %EVENTS) {
  388         582  
574 397 100       765 next unless $rec->[WATCH_LAMBDA]-> {stopped};
575 174         352 $rec->[WATCH_OBJ]-> lambda_handler( $rec);
576 174         168 $changed = 1;
577 174         226 $executed++;
578             }
579 317 50 33     1317 warn "IO::Lambda::drive .........\n" if $DEBUG_LAMBDA and $changed;
580             }
581 170 50       301 warn "IO::Lambda::drive +++++++++\n" if $DEBUG_LAMBDA;
582              
583 170         385 return $executed;
584             }
585              
586             # do one quant
587             sub yield
588             {
589 170     170 1 200 my $nonblocking = shift;
590 170         169 my $more_events = 0;
591              
592             # custom loops must not wait
593 170         330 for ( @LOOPS) {
594 26 100       115 next if $_-> empty;
595 20         71 $_-> yield;
596 20         37 $more_events = 1;
597             }
598              
599 170 100       290 if ( drive) {
600             # some callbacks we called, don't let them wait in sleep
601 78         224 return 1;
602             }
603              
604             # main loop waits, if anything
605 92 100       314 unless ( $LOOP-> empty) {
606 89         237 $LOOP-> yield( $nonblocking);
607 89         126 $more_events = 1;
608             }
609              
610 92 100       267 $more_events = 1 if keys %EVENTS;
611 92         371 return $more_events;
612             }
613              
614              
615             # wait for one lambda to stop
616             sub wait
617             {
618 107     107 1 185 my $self = shift;
619 107 100       245 if ( $self-> is_passive) {
620 91         210 $self-> call(@_);
621 91         199 $self-> start;
622             }
623 107         351 yield while not $self-> {stopped};
624 107         255 return $self-> peek;
625             }
626              
627             # wait for all lambdas to stop
628             sub wait_for_all
629             {
630 0     0 1 0 my @objects = @_;
631 0 0       0 return unless @objects;
632 0         0 $_-> start for grep { $_-> is_passive } @objects;
  0         0  
633 0         0 my @ret;
634 0         0 while ( 1) {
635 0         0 push @ret, map { $_-> peek } grep { $_-> {stopped} } @objects;
  0         0  
  0         0  
636 0         0 @objects = grep { not $_-> {stopped} } @objects;
  0         0  
637 0 0       0 last unless @objects;
638 0         0 yield;
639             }
640 0         0 return @ret;
641             }
642              
643             # wait for at least one lambda to stop, return those that stopped
644             sub wait_for_any
645             {
646 0     0 1 0 my @objects = @_;
647 0 0       0 return unless @objects;
648 0         0 $_-> start for grep { $_-> is_passive } @objects;
  0         0  
649 0         0 while ( 1) {
650 0         0 my @n = grep { $_-> {stopped} } @objects;
  0         0  
651 0 0       0 return @n if @n;
652 0         0 yield;
653             }
654             }
655              
656             # run the event loop until no lambdas are left in the blocking state
657 0     0 1 0 sub run { do {} while yield }
658              
659             #
660             # Part II - Procedural interface to the lambda-style programming
661             #
662             #################################################################
663              
664 0     0   0 sub _lambda_restart { die "lambda() is not restartable" }
665             sub lambda(&)
666             {
667 200     200 1 7849 my $cb = _subname(lambda => $_[0]);
668             my $l = __PACKAGE__-> new( sub {
669             # initial lambda code is usually executed by tail/tails inside another lambda,
670             # so protect the upper-level context
671 266     266   790 local *__ANON__ = "IO::Lambda::lambda::callback";
672 266         305 local $THIS = shift;
673 266         394 local @CONTEXT = ();
674 266         247 local $CALLBACK = $cb;
675 266         343 local $METHOD = \&_lambda_restart;
676 266 50       731 $cb ? $cb-> (@_) : @_;
677 200         1026 });
678 200 50       356 if ( $DEBUG_CALLER) {
679 0 0       0 if ( $DEBUG_CALLER > 1) {
680 0         0 $l-> {caller} = Carp::longmess;
681 0         0 chomp $l-> {caller};
682 0         0 $l-> {caller} =~ s/^ at //;
683             } else {
684 0         0 $l-> {caller} = join(':', (caller)[1,2]);
685             }
686             }
687 200         657 $l;
688             }
689              
690             sub _subname
691             {
692 425 0 0 425   784 subname(
      33        
      0        
693             caller(1 + ($_[2] || 0)) . '::_'. $_[0],
694             $_[1]
695             ) if $DEBUG_CALLER and $_[1] and not $AGAIN;
696 425         1043 return $_[1];
697             }
698              
699             *io = \λ
700              
701             # re-enter the latest (or other) frame
702             sub again
703             {
704 58 100   58 1 157 ( $METHOD, $CALLBACK) = @_ if 2 == @_;
705 58         82 local $AGAIN = 1;
706 58 50       141 defined($METHOD) ?
707             $METHOD-> ($CALLBACK) :
708             croak "again() outside of a restartable call"
709             }
710              
711             # define context
712 155 100   155 1 679 sub this { @_ ? ($THIS, @CONTEXT) = @_ : $THIS }
713 382 100   382 1 1207 sub context { @_ ? (@CONTEXT) = @_ : @CONTEXT }
714 1 50   1 1 8 sub restartable { @_ ? ($METHOD, $CALLBACK) = @_ : ( $METHOD, $CALLBACK) }
715 23     23 0 77 sub set_frame { ( $THIS, $METHOD, $CALLBACK, @CONTEXT) = @_ }
716 16     16 0 62 sub get_frame { ( $THIS, $METHOD, $CALLBACK, @CONTEXT) }
717 0     0 0 0 sub swap_frame { my @f = get_frame; set_frame(@_); @f }
  0         0  
  0         0  
718 1     1 0 6 sub clear { set_frame(); undef $AGAIN; }
  1         1  
719              
720 15     15   2821 END { ( $THIS, $METHOD, $CALLBACK, @CONTEXT) = (); }
721              
722             sub state($)
723             {
724 17 100 66 17 1 80 my $this = ($_[0] && ref($_[0])) ? shift(@_) : this;
725 17 100       89 @_ ? $this-> {state} = $_[0] : return $this-> {state};
726             }
727              
728             # exceptions and backtracing
729             sub catch(&$)
730             {
731 23     23 1 27 my ( $cb, $event) = @_;
732 23         136 my $who = (caller(1))[3];
733 23         51 my @ctx = @CONTEXT;
734 23 50       54 croak "catch callback already defined" if $event-> [WATCH_CANCEL];
735             $event->[WATCH_CANCEL] = $cb ? sub {
736 15 50   15   24 local *__ANON__ = "$who\:\:catch" if $DEBUG_CALLER;
737 15         18 $THIS = shift;
738 15         39 local $THIS-> {cancelled_event} = shift;
739 15         30 local $THIS-> {cancelling} = 1;
740 15         24 @CONTEXT = @ctx;
741 15         28 $METHOD = undef;
742 15         19 $CALLBACK = undef;
743 15         40 $cb-> (@_);
744 23 50       91 } : undef;
745              
746             # if throw() happened before we even get here
747 23 100 66     111 $event->[WATCH_CALLBACK] = $event->[WATCH_CANCEL]
748             if $event->[WATCH_CALLBACK] && $event->[WATCH_CALLBACK] == \&_throw;
749            
750 23         47 return $event;
751             }
752              
753             sub call_again
754             {
755 3     3 1 4 my $self = shift;
756 3 50       8 croak "called outside catch()" unless $self-> {cancelled_event};
757 3         4 my $cb = $self-> {cancelled_event}->[WATCH_CALLBACK];
758 3 50       48 $cb->($self, @_) if $cb;
759             }
760              
761             sub autocatch($)
762             {
763             catch {
764 2     2   4 this-> call_again;
765 2         9 this-> throw(@_);
766 7     7 1 30 } $_[0]
767             }
768              
769 3     3 1 5 sub is_cancelling { $_[0]-> {cancelling} }
770              
771             sub _throw
772             {
773 15     15   13 my $self = shift;
774 15 50       23 warn _d( $self, 'throw') if $DEBUG_LAMBDA;
775 15         25 $self-> throw(@_);
776             }
777              
778             sub throw
779             {
780 40     40 1 54 my ( $self, @error) = @_;
781 40         59 my @c = $self-> callers;
782 40   100     181 $_-> [WATCH_CALLBACK] = $_->[WATCH_CANCEL] || \&_throw for @c;
783 40         64 $self-> terminate(@error);
784 40 100 66     84 $SIGTHROW->($self, @error) if $SIGTHROW and not @c;
785 40         109 return @error;
786             }
787              
788             sub sigthrow
789             {
790 4 100 66 4 1 20 shift if defined($_[0]) and (not(ref $_[0]) or ref($_[0]) ne 'CODE');
      66        
791 4 50       9 $SIGTHROW = $_[0] if @_;
792 4         8 return $SIGTHROW;
793             }
794              
795 0 0   0 1 0 sub callees { @{ $EVENTS{ "$_[0]" } || [] } }
  0         0  
796 60     60 1 80 sub callers { grep { $_[0] == $_-> [WATCH_LAMBDA] } map { @$_ } values %EVENTS }
  120         219  
  102         114  
797              
798             sub backtrace
799             {
800 6     6 1 529 require IO::Lambda::Backtrace;
801 6         712 IO::Lambda::Backtrace-> new($_[0], Carp::shortmess);
802             }
803              
804             #
805             # Conditions:
806             #
807              
808             # common wrapper for declaration of handle-watching user conditions
809             sub add_watch
810             {
811 5     5 0 10 my ($self, $cb, $method, $flags, $handle, $deadline, @ctx) = @_;
812 5         3 my $who;
813 5 50       8 $who = (caller(1))[3] if $DEBUG_CALLER;
814             $self-> watch_io(
815             $flags, $handle, $deadline,
816             sub {
817 5 50   5   12 local *__ANON__ = "$who\:\:callback" if $DEBUG_CALLER;
818 5         5 $THIS = shift;
819 5         9 @CONTEXT = @ctx;
820 5         5 $METHOD = $method;
821 5         4 $CALLBACK = $cb;
822 5 50       19 $cb ? $cb-> (@_) : @_;
823             },
824 5 100       24 ($AGAIN ? delete($self-> {cancel}) : undef),
825             )
826             }
827              
828             # rwx($flags,$handle,$deadline)
829             sub rwx(&)
830             {
831             return $THIS-> override_handler('rwx', \&rwx, shift)
832 0 0   0 1 0 if $THIS-> {override}->{rwx};
833              
834 0         0 $THIS-> add_watch(
835             _subname(rwx => shift), \&rwx,
836             @CONTEXT[0,1,2,0,1,2]
837             )
838             }
839              
840             # readable($handle,$deadline)
841             sub readable(&)
842             {
843             return $THIS-> override_handler('readable', \&readable, shift)
844 5 50   5 1 21 if $THIS-> {override}->{readable};
845              
846 5         10 $THIS-> add_watch(
847             _subname(readable => shift), \&readable, IO_READ,
848             @CONTEXT[0,1,0,1]
849             )
850             }
851              
852             # writable($handle,$deadline)
853             sub writable(&)
854             {
855             return $THIS-> override_handler('writable', \&writable, shift)
856 0 0   0 1 0 if $THIS-> {override}->{writable};
857            
858 0         0 $THIS-> add_watch(
859             _subname(writable => shift), \&writable, IO_WRITE,
860             @CONTEXT[0,1,0,1]
861             )
862             }
863              
864             # common wrapper for declaration of time-watching user conditions
865             sub add_timer
866             {
867 40     40 0 84 my ($self, $cb, $method, $deadline, @ctx) = @_;
868 40         43 my $who;
869 40 50       78 $who = (caller(1))[3] if $DEBUG_CALLER;
870             $self-> watch_timer(
871             $deadline,
872             sub {
873 37 50   37   99 local *__ANON__ = "$who\:\:callback" if $DEBUG_CALLER;
874 37         52 $THIS = shift;
875 37         88 @CONTEXT = @ctx;
876 37         49 $METHOD = $method;
877 37         56 $CALLBACK = $cb;
878 37 100       248 $cb ? $cb-> (@_) : @_;
879             },
880 40 100       272 ($AGAIN ? delete($self-> {cancel}) : undef),
881             )
882             }
883              
884             # timeout($deadline)
885             sub timeout(&)
886             {
887             return $THIS-> override_handler('timeout', \&timeout, shift)
888 40 50   40 1 242 if $THIS-> {override}->{timeout};
889 40         96 $THIS-> add_timer( _subname(timeout => shift), \&timeout, @CONTEXT[0,0])
890             }
891              
892             # common wrapper for declaration of single lambda-watching user conditions
893             sub add_tail
894             {
895 151     151 0 260 my ($self, $cb, $method, $lambda, @ctx) = @_;
896 151         125 my $who;
897 151 50       265 $who = (caller(1))[3] if $DEBUG_CALLER;
898             $self-> watch_lambda(
899             $lambda,
900             ($cb ? sub {
901 112 50   112   180 local *__ANON__ = "$who\:\:callback" if $DEBUG_CALLER;
902 112         126 $THIS = shift;
903 112         192 @CONTEXT = @ctx;
904 112         107 $METHOD = $method;
905 112         98 $CALLBACK = $cb;
906 112         339 $cb-> (@_);
907             } : undef),
908 151 100       822 ($AGAIN ? delete($self-> {cancel}) : undef),
    100          
909             );
910             }
911              
912             # convert constant @param into a lambda
913             sub add_constant
914             {
915 0     0 0 0 my ( $self, $cb, $method, @param) = @_;
916             $self-> add_tail (
917             _subname(constant => $cb), $method,
918 0     0   0 lambda { @param },
919             @CONTEXT
920 0         0 );
921             }
922              
923             # handle default condition logic given a lambda
924             sub condition
925             {
926 6     6 1 14 my ( $self, $cb, $method, $name) = @_;
927              
928             return $THIS-> override_handler($name, $method, $cb)
929 6 50 66     45 if defined($name) and $THIS-> {override}->{$name};
930            
931 6         22 my @ctx = @CONTEXT;
932              
933 6         7 my $who;
934 6 50       18 if ( $DEBUG_CALLER) {
935 0 0       0 $who = defined($name) ? $name : (caller(1))[3];
936 0         0 _subname($who, $cb, 2);
937             }
938              
939             $THIS-> watch_lambda(
940             $self,
941             $cb ? sub {
942 6 50   6   13 local *__ANON__ = "$who\:\:callback" if $DEBUG_CALLER;
943 6         17 $THIS = shift;
944 6         15 @CONTEXT = @ctx;
945 6         8 $METHOD = $method;
946 6         9 $CALLBACK = $cb;
947 6         51 $cb-> (@_);
948             } : undef
949 6 50       54 );
950             }
951              
952             # dummy sub for empty calls for tails() family
953             sub _empty
954             {
955 2     2   3 my ($name, $method, $cb) = @_;
956 2         5 my @ctx = context;
957             $THIS-> watch_lambda( IO::Lambda-> new, sub {
958 2     2   24 local *__ANON__ = "IO::Lambda::".$name."::callback";
959 2         4 @CONTEXT = @ctx;
960 2         3 $METHOD = $method;
961 2         2 $CALLBACK = $cb;
962 2         6 $cb-> ();
963 2 50       9 }) if $cb;
964             }
965              
966             # tail( $lambda, @param) -- initialize $lambda with @param, and wait for it
967             sub tail(&)
968             {
969             return $THIS-> override_handler('tail', \&tail, shift)
970 192 100   192 1 795 if $THIS-> {override}->{tail};
971            
972 152         209 my ( $lambda, @param) = context;
973 152 100       262 return _empty(tail => \&tail, shift) unless $lambda;
974              
975 151 100 66     288 $lambda-> reset
976             if $lambda-> is_stopped and $lambda-> autorestart;
977 151 100       251 if ( @param) {
978 55         98 $lambda-> call( @param);
979             } else {
980 96 100       124 $lambda-> call unless $lambda-> is_active;
981             }
982 151         295 $THIS-> add_tail( _subname(tail => shift), \&tail, $lambda, $lambda, @param);
983             }
984              
985              
986             # tails(@lambdas) -- wait for all lambdas to finish
987             sub tails(&)
988             {
989             return $THIS-> override_handler('tails', \&tails, shift)
990 7 50   7 1 44 if $THIS-> {override}->{tails};
991            
992 7         17 my $cb = _subname tails => $_[0];
993 7         15 my @lambdas = context;
994 7         10 my $n = $#lambdas;
995 7 100       19 return _empty(tails => \&tails, $cb) unless @lambdas;
996              
997 6         8 my @ret;
998             my $watcher;
999             $watcher = sub {
1000 6     6   11 $THIS = shift;
1001 6         11 push @ret, @_;
1002 6 100       20 return if $n--;
1003              
1004 2         21 local *__ANON__ = "IO::Lambda::tails::callback";
1005 2         8 @CONTEXT = @lambdas;
1006 2         5 $METHOD = \&tails;
1007 2         4 $CALLBACK = $cb;
1008 2 100       13 $cb ? $cb-> (@ret) : @ret;
1009 6         18 };
1010 6         9 my $this = $THIS;
1011 6         48 $this-> watch_lambda( $_, $watcher) for @lambdas;
1012             }
1013              
1014             # tailo(@lambdas) -- wait for all lambdas to finish, return ordered results
1015             sub tailo(&)
1016             {
1017             return $THIS-> override_handler('tailo', \&tailo, shift)
1018 1 50   1 1 4 if $THIS-> {override}->{tailo};
1019            
1020 1         2 my $cb = _subname tailo => $_[0];
1021 1         1 my @lambdas = context;
1022 1         1 my $n = $#lambdas;
1023 1 50       3 return _empty(tailo => \&tailo, $cb) unless @lambdas;
1024              
1025 1         1 my @ret;
1026             my $watcher;
1027             $watcher = sub {
1028 3     3   3 my $curr = shift;
1029 3         5 $THIS = shift;
1030 3         4 $ret[ $curr ] = \@_;
1031 3 100       15 return if $n--;
1032              
1033 1         24 local *__ANON__ = "IO::Lambda::tailo::callback";
1034 1         3 @CONTEXT = @lambdas;
1035 1         3 $METHOD = \&tailo;
1036 1         1 $CALLBACK = $cb;
1037 1         3 @ret = map { @$_ } @ret;
  3         5  
1038 1 50       5 $cb ? $cb-> (@ret) : @ret;
1039 1         3 };
1040 1         1 my $this = $THIS;
1041 1         4 for ( my $i = 0; $i < @lambdas; $i++) {
1042 3         4 my $d = $i;
1043             $this-> watch_lambda(
1044             $lambdas[$i],
1045 3     3   5 sub { $watcher->($d, @_) }
1046 3         16 );
1047             };
1048             }
1049              
1050             # any_tail($deadline,@lambdas) -- wait for any lambda to finish within time
1051             sub any_tail(&)
1052             {
1053             return $THIS-> override_handler('any_tail', \&any_tail, shift)
1054 3 50   3 1 26 if $THIS-> {override}->{any_tail};
1055            
1056 3         7 my $cb = _subname any_tail => $_[0];
1057 3         7 my ( $deadline, @lambdas) = context;
1058 3         7 my $n = $#lambdas;
1059 3 50       11 return _empty(any_tail => \&any_tail, $cb) unless @lambdas;
1060              
1061 3         11 my ( @ret, @watchers);
1062 0         0 my $timer;
1063            
1064             $timer = $THIS-> watch_timer( $deadline, sub {
1065 2     2   29 local *__ANON__ = "IO::Lambda::any_tail::callback1";
1066 2         4 $THIS = shift;
1067 2         7 @CONTEXT = ($deadline, @lambdas);
1068 2         4 $METHOD = \&any_tail;
1069 2         6 $CALLBACK = $cb;
1070 2 50       19 @ret = $cb-> (@ret) if $cb;
1071 2         15 $THIS-> cancel_event($_) for @watchers;
1072 2         6 return @ret;
1073 3 50       30 }) if defined $deadline;
1074              
1075 3         4 my $watcher;
1076             $watcher = sub {
1077 3     3   5 push @ret, shift;
1078 3 100       13 return if $n--;
1079            
1080 1         7 local *__ANON__ = "IO::Lambda::any_tail::callback2";
1081 1         2 $THIS = shift;
1082 1         3 @CONTEXT = ($deadline, @lambdas);
1083 1         1 $METHOD = \&any_tail;
1084 1         2 $CALLBACK = $cb;
1085 1 50       12 @ret = $cb-> (@ret) if $cb;
1086 1 50       7 $THIS-> cancel_event( $timer) if $timer;
1087 1         3 return @ret;
1088 3         20 };
1089              
1090             @watchers = map {
1091 3         8 my $l = $_;
  5         5  
1092             $THIS-> watch_lambda( $l, sub {
1093 3     3   8 $watcher->($l, @_);
1094             })
1095 5         55 } @lambdas;
1096             }
1097              
1098             #
1099             # Part III - High order lambdas
1100             #
1101             ################################################################
1102              
1103             # fold($l) :: @b -> @c
1104             # $l :: ($a,@b) -> @c
1105             sub fold($)
1106             {
1107 1     1 1 3 my $l = shift;
1108             lambda {
1109 3     3   8 my @q = @_;
1110 3         9 context $l, shift(@q), shift(@q);
1111             tail {
1112 10 100       26 return @_ unless @q;
1113 7         17 context $l, shift(@q), @_;
1114 7         19 again;
1115 3         16 }}
1116 1         7 }
1117              
1118             # mapcar($l) :: (@p) -> @r
1119             sub mapcar($)
1120             {
1121 2     2 1 3 my $lambda = shift;
1122             lambda {
1123 3     3   6 my @ret;
1124 3         6 my @p = @_;
1125 3 50       8 return unless @p;
1126 3         10 context $lambda, shift @p;
1127             tail {
1128 15         26 push @ret, @_;
1129 15 100       29 return @ret unless @p;
1130 12         23 context $lambda, shift @p;
1131 12         19 again;
1132 3         21 }}
1133 2         19 }
1134              
1135             # filter($l) :: (@p) -> @r
1136             sub filter($)
1137             {
1138 1     1 1 3 my $lambda = shift;
1139             lambda {
1140 1     1   3 my @ret;
1141 1 50       4 return unless @_;
1142 1         3 my @p = @_;
1143 1         3 my $p = shift @p;
1144 1         4 context $lambda, $p;
1145             tail {
1146 5 100       14 push @ret, $p if shift;
1147 5 100       11 return @ret unless @p;
1148 4         6 $p = shift @p;
1149 4         9 context $lambda, $p;
1150 4         9 again;
1151 1         8 }}
1152 1         10 }
1153              
1154             # curry(@a -> $l) :: @a -> @b
1155             sub curry(&)
1156             {
1157 3     3 1 7 my $cb = $_[0];
1158             lambda {
1159 12     12   22 context $cb->(@_);
1160 12         50 &tail();
1161             }
1162 3         15 }
1163              
1164             # seq() :: (@l) -> @m
1165 10     10 1 15 sub seq { mapcar curry { shift } }
  1     1   211  
1166              
1167             # par($max = 0) :: (@l) -> @m
1168             sub par
1169             {
1170 1   50 1 1 3 my $max = $_[0] || 0;
1171              
1172             lambda {
1173 1     1   5 my @q = @_;
1174 1         2 my @ret;
1175 1 50 33     6 $max = @q if $max < 1 or $max > @q;
1176             context map {
1177 1         3 lambda {
1178 3 50       7 return unless @q;
1179 3         5 context shift @q;
1180             tail {
1181 9         18 push @ret, @_;
1182 9 100       26 return unless @q;
1183 6         10 context shift @q;
1184 6         12 again;
1185 3         10 }}
1186 3         7 } 1 .. $max;
1187 1         5 tails { @ret }
1188 1         4 }
1189 1         7 }
1190              
1191              
1192             # sysread lambda wrapper
1193             #
1194             # ioresult :: ($result, $error)
1195             # sysreader() :: ($fh, $buf, $length, $deadline) -> ioresult
1196             sub sysreader (){ lambda
1197             {
1198 2     2   3 my ( $fh, $buf, $length, $deadline) = @_;
1199 2 50       5 $$buf = '' unless defined $$buf;
1200              
1201             this-> watch_io( IO_READ, $fh, $deadline, subname _sysreader => sub {
1202 2 50       4 return undef, 'timeout' unless $_[1];
1203 2         31 local $SIG{PIPE} = 'IGNORE';
1204 2         76 my $n = sysread( $fh, $$buf, $length, length($$buf));
1205 2 50       4 if ( $DEBUG_IO) {
1206 0 0       0 warn "fh(", fileno($fh), ") read ", ( defined($n) ? "$n bytes" : "error $!"), "\n";
1207 0 0 0     0 warn substr( $$buf, length($$buf) - $n), "\n" if $DEBUG_IO > 1 and $n > 0;
1208             }
1209 2 50       6 return undef, $! unless defined $n;
1210 2         8 return $n;
1211             })
1212 2     1 1 3 }}
  1         14  
1213              
1214             # syswrite() lambda wrapper
1215             #
1216             # syswriter() :: ($fh, $buf, $length, $offset, $deadline) -> ioresult
1217             sub syswriter (){ lambda
1218             {
1219 0     0   0 my ( $fh, $buf, $length, $offset, $deadline) = @_;
1220              
1221             this-> watch_io( IO_WRITE, $fh, $deadline, subname _syswriter => sub {
1222 0 0       0 return undef, 'timeout' unless $_[1];
1223 0         0 local $SIG{PIPE} = 'IGNORE';
1224 0         0 my $n = syswrite( $fh, $$buf, $length, $offset);
1225 0 0       0 if ( $DEBUG_IO) {
1226 0 0       0 warn "fh(", fileno($fh), ") wrote ", ( defined($n) ? "$n bytes out of $length" : "error $!"), "\n";
1227 0 0 0     0 warn substr( $$buf, $offset, $n), "\n" if $DEBUG_IO > 1 and $n > 0;
1228             }
1229 0 0       0 return undef, $! unless defined $n;
1230 0         0 return $n;
1231 0         0 });
1232 0     0 1 0 }}
1233              
1234             sub _match
1235             {
1236 6     6   7 my ( $cond, $buf) = @_;
1237              
1238 6 100       15 return unless defined $cond;
1239              
1240 3 50       47 return ($$buf =~ /($cond)/)[0] if ref($cond) eq 'Regexp';
1241 0 0       0 return $cond->($buf) if ref($cond) eq 'CODE';
1242 0         0 return length($$buf) >= $cond;
1243             }
1244              
1245             # read from stream until condition is met
1246             #
1247             # readbuf($reader) :: ($fh, $$buf, $cond, $deadline) -> ioresult
1248             sub readbuf
1249             {
1250 4   66 4 1 28 my $reader = shift || sysreader;
1251              
1252             lambda {
1253 4     4   6 my ( $fh, $buf, $cond, $deadline) = @_;
1254            
1255 4 100       9 $$buf = "" unless defined $$buf;
1256              
1257 4         11 my $match = _match( $cond, $buf);
1258 4 100       11 return $match if $match;
1259            
1260 3         3 my ($maxbytes, $bufsize);
1261 3 50 66     11 $maxbytes = $cond - length($$buf)
      33        
1262             if defined($cond) and not ref($cond) and $cond > length($$buf);
1263 3 50       5 $bufsize = defined($maxbytes) ? $maxbytes : 65536;
1264            
1265 3         6 my $savepos = pos($$buf); # useful when $cond is a regexp
1266              
1267 3         10 context $reader, $fh, $buf, $bufsize, $deadline;
1268             tail {
1269 4         12 pos($$buf) = $savepos;
1270              
1271 4         7 my $bytes = shift;
1272 4 100       10 return undef, shift unless defined $bytes;
1273            
1274 3 100       6 unless ( $bytes) {
1275 1 50       4 return 1 unless defined $cond;
1276 0         0 return undef, 'eof';
1277             }
1278            
1279             # got line? return it
1280 2         6 my $match = _match( $cond, $buf);
1281 2 100       11 return $match if $match;
1282            
1283             # otherwise, just wait for more data
1284 1 50       2 $bufsize -= $bytes if defined $maxbytes;
1285              
1286 1         3 context $reader, $fh, $buf, $bufsize, $deadline;
1287 1         6 again;
1288 3         34 }}
1289 4         21 }
1290              
1291             # curry readbuf()
1292             #
1293             # getline($reader) :: ($fh, $$buf, $deadline) -> ioresult
1294             sub getline
1295             {
1296 0     0 1 0 my $reader = shift;
1297             lambda {
1298 0     0   0 my ( $fh, $buf, $deadline) = @_;
1299 0 0       0 croak "getline() needs a buffer! ( f.ex getline,\$fh,\\(my \$buf='') )"
1300             unless ref($buf);
1301 0         0 context readbuf($reader), $fh, $buf, qr/^[^\n]*\n/, $deadline;
1302             tail {
1303 0 0       0 substr( $$buf, 0, length($_[0]), '') unless defined $_[1];
1304 0         0 @_;
1305 0         0 }}
1306 0         0 }
1307              
1308             # write whole buffer to stream
1309             #
1310             # writebuf($writer) :: syswriter
1311             sub writebuf
1312             {
1313 2   33 2 1 6 my $writer = shift || syswriter;
1314              
1315             lambda {
1316 2     2   3 my ( $fh, $buf, $len, $offs, $deadline) = @_;
1317              
1318 2         3 my ( $written, $recheck_length, $olen) = (0);
1319 2 50       5 $$buf = "" unless defined $$buf;
1320 2 50       3 $offs = 0 unless defined $offs;
1321 2 50       8 unless ( defined $len) {
1322 2         3 $olen = $len = length $$buf;
1323 2         2 $recheck_length++;
1324             }
1325            
1326 2         3 context $writer, $fh, $buf, $len, $offs, $deadline;
1327             tail {
1328 13         9 my $bytes = shift;
1329 13 50       19 return undef, shift unless defined $bytes;
1330              
1331 13         9 $offs += $bytes;
1332 13         9 $written += $bytes;
1333 13         11 $len -= $bytes;
1334 13 50       18 if ( $recheck_length) {
1335 13         9 my $l = length $$buf;
1336 13 100       20 if ( $l > $olen) {
1337 1         2 $len += $l - $olen;
1338 1         1 $olen = $l;
1339             }
1340             }
1341 13 100       19 return $written if $len <= 0;
1342              
1343 11         13 context $writer, $fh, $buf, $len, $offs, $deadline;
1344 11         14 again;
1345 2         7 }}
1346 2         9 }
1347              
1348             #
1349             # Part IV - Developer API for custom condvars and event loops
1350             #
1351             ################################################################
1352              
1353             # register condvar listener
1354             sub bind
1355             {
1356 20     20 1 30 my $self = shift;
1357              
1358             # create new condition
1359 20 50       90 croak "can't register events on a stopped lambda" if $self-> {stopped};
1360              
1361 20         55 my $rec = [ $self, @_ ];
1362 20         24 push @{$self-> {in}}, $rec;
  20         46  
1363              
1364 20         57 return $rec;
1365             }
1366              
1367             # stop listening on a condvar
1368             sub resolve
1369             {
1370 10     10 1 24 my ( $self, $rec) = @_;
1371              
1372 10         20 my $in = $self-> {in};
1373 10         16 my $nn = @$in;
1374 10         21 @$in = grep { $rec != $_ } @$in;
  10         33  
1375 10 50 33     71 die _d($self, "stray condvar event $rec (@$rec)")
1376             if $nn == @$in or $self != $rec->[WATCH_OBJ];
1377              
1378 10         25 undef $rec-> [WATCH_OBJ]; # unneeded references
1379              
1380 10 50       29 unless ( @$in) {
1381 10 50       26 warn _d( $self, 'stopped') if $DEBUG_LAMBDA;
1382 10         40 $self-> {stopped} = 1;
1383             }
1384             }
1385              
1386             sub callout
1387             {
1388 6     6 0 11 my ( $self, $cb, @param) = @_;
1389 6 100       23 @{$self->{last}} = $cb ? $cb-> (@param) : @param;
  6         31  
1390             }
1391              
1392 5     5 0 14 sub add_loop { push @LOOPS, shift }
1393 5 50   5 0 15 sub remove_loop { @LOOPS = grep { defined and $_ != $_[0] } @LOOPS }
  5         126  
1394              
1395 0     0   0 sub __end { clear(); undef %EVENTS; undef @LOOPS; } # for threads
  0         0  
  0         0  
1396              
1397             package IO::Lambda::Loop;
1398 15     15   133 use vars qw($DEFAULT);
  15         24  
  15         719  
1399 15     15   67 use strict;
  15         20  
  15         358  
1400 15     15   59 use warnings;
  15         24  
  15         2787  
1401              
1402             $DEFAULT = 'Select' unless defined $DEFAULT;
1403 14     14   51 sub default { $DEFAULT = shift }
1404              
1405             sub new
1406             {
1407 14 50   14   48 return $IO::Lambda::LOOP if $IO::Lambda::LOOP;
1408              
1409 14         44 my ( $class, %opt) = @_;
1410              
1411 14   33     111 $opt{type} ||= $DEFAULT;
1412 14         59 $class .= "::$opt{type}";
1413 14     14   1138 eval "use $class;";
  14         8755  
  14         26  
  14         325  
1414 14 50       59 die $@ if $@;
1415              
1416 14         62 return $IO::Lambda::LOOP = $class-> new();
1417             }
1418              
1419             1;
1420              
1421             __DATA__