File Coverage

blib/lib/IO/Lambda.pm
Criterion Covered Total %
statement 681 750 90.8
branch 266 424 62.7
condition 50 102 49.0
subroutine 126 145 86.9
pod 60 77 77.9
total 1183 1498 78.9


line stmt bran cond sub pod time code
1             # $Id: Lambda.pm,v 1.191 2012/01/13 06:41:28 dk Exp $
2             package IO::Lambda;
3              
4 29     29   649696 use Carp qw(croak);
  29         64  
  29         1960  
5 29     29   153 use strict;
  29         44  
  29         625  
6 29     29   163 use warnings;
  29         52  
  29         799  
7 29     29   144 use Exporter;
  29         44  
  29         1013  
8 29     29   21533 use Sub::Name;
  29         17638  
  29         1595  
9 29     29   260 use Scalar::Util qw(weaken);
  29         54  
  29         2721  
10 29     29   21586 use Time::HiRes qw(time);
  29         37177  
  29         136  
11 29         17751 use vars qw(
12             $LOOP %EVENTS @LOOPS
13             $VERSION @ISA
14             @EXPORT_OK %EXPORT_TAGS @EXPORT_CONSTANTS @EXPORT_LAMBDA @EXPORT_STREAM
15             @EXPORT_DEV @EXPORT_MISC @EXPORT_FUNC
16             $THIS @CONTEXT $METHOD $CALLBACK $AGAIN $SIGTHROW
17             $DEBUG_IO $DEBUG_LAMBDA $DEBUG_CALLER %DEBUG
18 29     29   4833 );
  29         52  
19             $VERSION = '1.25';
20             @ISA = qw(Exporter);
21             @EXPORT_CONSTANTS = qw(
22             IO_READ IO_WRITE IO_EXCEPTION
23             WATCH_OBJ WATCH_DEADLINE WATCH_LAMBDA WATCH_CALLBACK
24             WATCH_IO_HANDLE WATCH_IO_FLAGS WATCH_CALLER WATCH_CANCEL
25             );
26             @EXPORT_STREAM = qw(
27             sysreader syswriter getline readbuf writebuf
28             );
29             @EXPORT_LAMBDA = qw(
30             this context lambda again state restartable catch autocatch
31             io readable writable rwx timeout tail tails tailo any_tail
32             );
33             @EXPORT_FUNC = qw(
34             seq par mapcar filter fold curry
35             );
36             @EXPORT_MISC = qw(
37             set_frame get_frame swap_frame sigthrow
38             );
39             @EXPORT_DEV = qw(
40             _subname _o _t
41             );
42             @EXPORT_OK = (
43             @EXPORT_LAMBDA, @EXPORT_CONSTANTS, @EXPORT_STREAM,
44             @EXPORT_DEV, @EXPORT_MISC, @EXPORT_FUNC
45             );
46             %EXPORT_TAGS = (
47             func => \@EXPORT_FUNC,
48             lambda => \@EXPORT_LAMBDA,
49             stream => \@EXPORT_STREAM,
50             constants => \@EXPORT_CONSTANTS,
51             dev => \@EXPORT_DEV,
52             all => [ @EXPORT_LAMBDA, @EXPORT_STREAM, @EXPORT_CONSTANTS, @EXPORT_FUNC ],
53             );
54              
55             if ( exists $ENV{IO_LAMBDA_DEBUG}) {
56             for my $p ( split ',', $ENV{IO_LAMBDA_DEBUG}) {
57             if ( $p =~ /^([^=]+)=(.*)$/) {
58             $DEBUG{lc $1}=$2;
59             } else {
60             $DEBUG{lc $p}++;
61             }
62             }
63             $DEBUG_IO = $DEBUG{io} || 0;
64             $DEBUG_LAMBDA = $DEBUG{lambda} || 0;
65             $DEBUG_CALLER = $DEBUG{caller} || 0;
66             $IO::Lambda::Loop::DEFAULT = $DEBUG{loop} if $DEBUG{loop};
67             $SIG{__DIE__} = sub {
68             return if $^S;
69             Carp::confess(@_);
70             } if $DEBUG{die};
71             }
72              
73 29     29   198 use constant IO_READ => 4;
  29         58  
  29         2683  
74 29     29   406 use constant IO_WRITE => 2;
  29         58  
  29         1502  
75 29     29   139 use constant IO_EXCEPTION => 1;
  29         54  
  29         1391  
76            
77 29     29   139 use constant WATCH_OBJ => 0;
  29         58  
  29         1375  
78 29     29   140 use constant WATCH_CANCEL => 1;
  29         50  
  29         1455  
79              
80 29     29   142 use constant WATCH_DEADLINE => 2;
  29         100  
  29         1239  
81 29     29   141 use constant WATCH_LAMBDA => 2;
  29         64  
  29         1358  
82 29     29   141 use constant WATCH_CALLBACK => 3;
  29         86  
  29         1545  
83              
84 29     29   140 use constant WATCH_CALLER => 4;
  29         43  
  29         1334  
85 29     29   139 use constant WATCH_IO_HANDLE => 4;
  29         53  
  29         1243  
86 29     29   140 use constant WATCH_IO_FLAGS => 5;
  29         48  
  29         292000  
87              
88             sub new
89             {
90 505 100   505 1 2006 IO::Lambda::Loop-> new unless $LOOP;
91 505         3992 return bless {
92             in => [], # events we wait for
93             last => [], # result of the last state
94             stopped => 0, # initial state
95             start => $_[1], # kick-start coderef
96             }, $_[0];
97             }
98              
99             sub DESTROY
100             {
101 481     481   3709 my $self = $_[0];
102 481         1518 $self-> cancel_all_events;
103             }
104              
105             my $_doffs = 0;
106 0     0   0 sub _d_in { $_doffs++ }
107 0 0   0   0 sub _d_out { $_doffs-- if $_doffs }
108 0     0   0 sub _d { (' ' x $_doffs), _obj(shift), ': ', @_, "\n" }
109 0     0   0 sub _o { $_[0] =~ /0x([\w]+)/; $1 }
  0         0  
110 0   0 0   0 sub _obj { "lambda(". _o($_[0]) . ")." . ( $_[0]->{caller} || '()' ) }
111 0 0   0   0 sub _t { defined($_[0]) ? ( "time(", (($_[0] < 1_000_000) ? $_[0] : $_[0]-time()), ")" ) : () }
    0          
112             sub _ev
113             {
114 0     0   0 $_[0] =~ /0x([\w]+)/;
115             "event($1) ",
116 0 0       0 (($#{$_[0]} == WATCH_IO_FLAGS) ? (
  0 0       0  
    0          
    0          
    0          
    0          
117             'fd=',
118             fileno($_[0]->[WATCH_IO_HANDLE]),
119             ' ',
120             ( $_[0]->[WATCH_IO_FLAGS] ? (
121             join('/',
122             (($_[0]->[WATCH_IO_FLAGS] & IO_READ) ? 'read' : ()),
123             (($_[0]->[WATCH_IO_FLAGS] & IO_WRITE) ? 'write' : ()),
124             (($_[0]->[WATCH_IO_FLAGS] & IO_EXCEPTION) ? 'exc' : ()),
125             )) :
126             'timeout'
127             ),
128             ' ', _t($_[0]->[WATCH_DEADLINE]),
129             ) : (
130             ref($_[0]-> [WATCH_LAMBDA]) ?
131             _obj($_[0]-> [WATCH_LAMBDA]) :
132             _t($_[0]->[WATCH_DEADLINE])
133             ))
134             }
135             sub _msg
136             {
137 0     0   0 my $self = shift;
138             _d(
139             $self,
140             "@_ >> (",
141             join(',',
142             map {
143 0 0       0 defined($_) ? $_ : 'undef'
144             }
145 0         0 @{$self->{last}}
  0         0  
146             ),
147             ')'
148             )
149             }
150              
151             #
152             # Part I - Object interface to callback and
153             # messaging interface with event loop and lambdas
154             #
155             #########################################################
156              
157             # register an IO event
158             sub watch_io
159             {
160 218     218 1 502 my ( $self, $flags, $handle, $deadline, $callback, $cancel) = @_;
161              
162 218 50       674 croak "can't register events on a stopped lambda" if $self-> {stopped};
163 218 50       564 croak "bad io flags" if 0 == ($flags & (IO_READ|IO_WRITE|IO_EXCEPTION));
164              
165 218 100 66     721 $deadline += time if defined($deadline) and $deadline < 1_000_000_000;
166            
167 218         677 my $rec = [
168             $self,
169             $cancel,
170             $deadline,
171             $callback,
172             $handle,
173             $flags,
174             ];
175 218         582 weaken $rec->[0];
176 218         293 push @{$self-> {in}}, $rec;
  218         655  
177              
178 218 50       680 warn _d( $self, "> ", _ev($rec)) if $DEBUG_IO;
179              
180 218         1291 $LOOP-> watch( $rec );
181              
182 218         807 return $rec;
183             }
184              
185             # register a timeout
186             sub watch_timer
187             {
188 54     54 1 168 my ( $self, $deadline, $callback, $cancel) = @_;
189              
190 54 50       265 croak "can't register events on a stopped lambda" if $self-> {stopped};
191 54 50       170 croak "$self: time is undefined" unless defined $deadline;
192            
193 54 100       396 $deadline += time if $deadline < 1_000_000_000;
194 54         176 my $rec = [
195             $self,
196             $cancel,
197             $deadline,
198             $callback,
199             ];
200 54         241 weaken $rec->[0];
201 54         88 push @{$self-> {in}}, $rec;
  54         160  
202              
203 54 50       194 warn _d( $self, "> ", _ev($rec)) if $DEBUG_IO;
204              
205 54         285 $LOOP-> after( $rec);
206            
207 54         268 return $rec;
208             }
209              
210             # register a callback when another lambda exits
211             sub watch_lambda
212             {
213 527     527 1 1099 my ( $self, $lambda, $callback, $cancel) = @_;
214 527         917 @_ = (); # perl bug http://rt.perl.org/rt3//Public/Bug/Display.html?id=70974
215              
216 527 50       1483 croak "can't register events on a stopped lambda" if $self-> {stopped};
217 527 50 33     4468 croak "bad lambda" unless $lambda and $lambda->isa('IO::Lambda');
218              
219 527 50       1284 croak "won't watch myself" if $self == $lambda;
220             # XXX check cycling
221            
222 527 50       1203 $lambda-> reset if $lambda-> is_stopped;
223              
224 527         1520 my $rec = [
225             $self,
226             $cancel,
227             $lambda,
228             $callback,
229             ];
230 527         1624 weaken $rec->[0];
231 527 50       1225 $rec-> [WATCH_CALLER] = Carp::shortmess if $DEBUG_CALLER;
232 527         787 push @{$self-> {in}}, $rec;
  527         1238  
233 527         731 push @{$EVENTS{"$lambda"}}, $rec;
  527         2955  
234              
235 527 100       1458 $lambda-> start if $lambda-> is_passive;
236              
237 522 50       1334 warn _d( $self, "> ", _ev($rec)) if $DEBUG_LAMBDA;
238              
239 522         3213 return $rec;
240             }
241              
242             # watch the watchers
243             sub override
244             {
245 33 100   33 1 674 my ( $self, $method, $state, $cb) = ( 4 == @_) ? @_ : (@_[0,1],'*',$_[2]);
246              
247 33 100       56 if ( $cb) {
248 13   100     51 $self-> {override}->{$method} ||= [];
249 13         17 push @{$self-> {override}->{$method}}, [ $state, $cb ];
  13         51  
250             } else {
251 20         23 my $p;
252 20 100       56 return unless $p = $self-> {override}->{$method};
253 19         52 for ( my $i = $#$p; $i >= 0; $i--) {
254 21 100 33     203 if (
      33        
      66        
      33        
255             (
256             not defined ($state) and
257             not defined ($p->[$i]-> [0])
258             ) or (
259             defined($state) and
260             defined($p->[$i]-> [0]) and
261             $p->[$i]->[0] eq $state
262             )
263             ) {
264 19         27 my $ret = splice( @$p, $i, 1);
265 19 100       53 delete $self-> {override}->{$method} unless @$p;
266 19         70 return $ret->[1];
267             }
268             }
269              
270 0         0 return undef;
271             }
272             }
273              
274             sub override_handler
275             {
276 40     40 0 69 my ( $self, $method, $sub, $cb) = @_;
277              
278 40         62 my $o = $self-> {override}-> {$method}-> [-1];
279              
280             # check state match
281 40         69 my ($a, $b) = ( $self-> {state}, $o-> [0]);
282 40 100 66     476 unless (
283             ( not defined($a) and not defined ($b)) or
284             ( defined $a and defined $b and $a eq $b) or
285             ( defined $b and $b eq '*')
286             ) {
287             # state not matched
288 8 100       11 if ( 1 == @{$self-> {override}->{$method}}) {
  8         23  
289 4         11 local $self-> {override}->{$method} = undef;
290 4         10 return $sub-> ($cb);
291             } else {
292 4         5 pop @{$self-> {override}->{$method}};
  4         10  
293 4         15 my $ret = $sub-> ($cb);
294 4         6 push @{$self->{override}->{$method}}, $o;
  4         9  
295 4         14 return $ret;
296             }
297             } else {
298             # state matched
299 32         92 local $self-> {super} = [ $sub, $cb ];
300 32 100       43 if ( 1 == @{$self-> {override}->{$method}}) {
  32         81  
301 22         49 local $self-> {override}->{$method} = undef;
302 22         69 return $o-> [1]-> ( $self, $sub, $cb);
303             } else {
304 10         11 pop @{$self-> {override}->{$method}};
  10         21  
305 10         25 my $ret = $o-> [1]-> ( $self, $sub, $cb);
306 10         29 push @{$self->{override}->{$method}}, $o;
  10         20  
307 10         40 return $ret;
308             }
309             }
310             }
311              
312             # Insert a new callback to be called before original callback.
313             # Needs to insert callbacks in {override} stack in reverse order,
314             # because direct order serves LIFO order for override() callbacks, --
315             # and that means FIFO for intercept() callbacks. But we also want LIFO.
316             sub intercept
317             {
318 24 100   24 1 1590 my ( $self, $method, $state, $cb) = ( 4 == @_) ? @_ : (@_[0,1],'*',$_[2]);
319            
320 24 100       62 return $self-> override( $method, $state, undef) unless $cb;
321              
322 14         37 _subname("intercept($method:$state)" => $cb);
323              
324 14   100     57 $self-> {override}->{$method} ||= [];
325 14         81 unshift @{$self-> {override}->{$method}}, [ $state, sub {
326             # this is called when lambda calls $method with $state
327 17     17   27 my ( undef, $sub, $orig_cb) = @_;
328             # $sub is a condition, like readable(&) or tail(&)
329             $sub->( sub {
330             # that (&) is finally called when IO event is there
331 16         39 local $self-> {super} = [$orig_cb];
332 16         36 &$cb;
333 17         67 });
334 14         16 } ];
335             }
336              
337             sub super
338             {
339 25 50   25 1 78 croak "super() call outside overridden condition" unless $_[0]-> {super};
340 25         35 my $data = $_[0]-> {super};
341 25 100       47 if ( defined $data-> [1]) {
342             # override() super
343 5         11 return $data-> [0]-> ($data-> [1]);
344             } else {
345             # intercept() super
346 20         26 my $self = shift;
347 20 50       78 return defined($data->[0]) ?
    100          
348             $data-> [0]-> (@_) :
349             ( wantarray ? @_ : $_[0] );
350             }
351             }
352              
353              
354             # handle incoming asynchronous events
355             sub io_handler
356             {
357 258     258 0 521 my ( $self, $rec) = @_;
358              
359 258 50       658 warn _d( $self, '< ', _ev($rec)) if $DEBUG_IO;
360              
361 258         504 my $in = $self-> {in};
362 258         399 my $nn = @$in;
363 258         590 @$in = grep { $rec != $_ } @$in;
  276         1292  
364 258 50 33     1886 die _d($self, 'stray ', _ev($rec))
365             if $nn == @$in or $self != $rec->[WATCH_OBJ];
366              
367 258 50       637 _d_in if $DEBUG_IO;
368 258         899 local $self-> {cancel} = $rec-> [WATCH_CANCEL];
369 258         1789 @{$self->{last}} = $rec-> [WATCH_CALLBACK]-> (
370             $self,
371             (($#$rec == WATCH_IO_FLAGS) ? $rec-> [WATCH_IO_FLAGS] : ()),
372 258 100       1123 @{$self->{last}}
  258 50       1018  
373             ) if $rec-> [WATCH_CALLBACK];
374              
375 258 50       841 _d_out if $DEBUG_IO;
376 258 50       585 warn $self-> _msg('io') if $DEBUG_IO;
377              
378 258 100       1258 unless ( @$in) {
379 205 50       462 warn _d( $self, 'stopped') if $DEBUG_LAMBDA;
380 205         2689 $self-> {stopped}++;
381             }
382             }
383              
384             # handle incoming synchronous events
385             sub lambda_handler
386             {
387 493     493 0 907 my ( $self, $rec) = @_;
388              
389 493 50       1036 warn _d( $self, '< ', _ev($rec)) if $DEBUG_LAMBDA;
390              
391 493         1071 my $in = $self-> {in};
392 493         711 my $nn = @$in;
393 493         944 @$in = grep { $rec != $_ } @$in;
  550         1847  
394 493 50 33     3096 die _d($self, 'stray ', _ev($rec))
395             if $nn == @$in or $self != $rec->[WATCH_OBJ];
396              
397 493         900 my $lambda = $rec-> [WATCH_LAMBDA];
398             die _d($self,
399             'handler called but ', _obj($lambda),
400 493 50       1284 ' is not finished yet') unless $lambda-> {stopped};
401              
402 493         1439 my $arr = $EVENTS{"$lambda"};
403 493         932 @$arr = grep { $_ != $rec } @$arr;
  496         1789  
404 493 100       2000 delete $EVENTS{"$lambda"} unless @$arr;
405              
406 493 50       1106 _d_in if $DEBUG_LAMBDA;
407            
408 493         1262 local $self-> {cancel} = $rec-> [WATCH_CANCEL];
409 493         12303 @{$self->{last}} =
410             $rec-> [WATCH_CALLBACK] ?
411             $rec-> [WATCH_CALLBACK]-> (
412             $self,
413 467         1714 @{$rec-> [WATCH_LAMBDA]-> {last}}
414             ) :
415 493 100       1178 @{$rec-> [WATCH_LAMBDA]-> {last}};
  26         51  
416              
417 493 50       1536 _d_out if $DEBUG_LAMBDA;
418 493 50       1001 warn $self-> _msg('tail') if $DEBUG_LAMBDA;
419              
420 493 100       1691 unless ( @$in) {
421 255 50       534 warn _d( $self, 'stopped') if $DEBUG_LAMBDA;
422 255         849 $self-> {stopped} = 1;
423             }
424             }
425              
426             # Removes one event from queue
427             sub cancel_event
428             {
429 9     9 0 29 my ( $self, $rec) = @_;
430              
431 9 100       21 return unless @{$self-> {in}};
  9         73  
432              
433 8 50       40 @{$self->{last}} = $rec-> [WATCH_CANCEL]->($self, @{$self->{last}})
  0         0  
  0         0  
434             if $rec-> [WATCH_CANCEL];
435              
436 8 50       68 $LOOP-> remove_event($rec) if $LOOP;
437 8         17 @{$self-> {in}} = grep { $_ != $rec } @{$self-> {in}};
  8         32  
  11         45  
  8         37  
438              
439 8 100 66     94 if ($rec->[WATCH_LAMBDA] and ref($rec->[WATCH_LAMBDA])) {
440 5         21 my $arr = $EVENTS{$rec->[WATCH_LAMBDA]};
441 5 100       26 if ( $arr) {
442 4         15 @$arr = grep { $_ != $rec } @$arr;
  4         15  
443 4 50       33 delete $EVENTS{$rec->[WATCH_LAMBDA]} unless @$arr;
444             }
445             }
446 8         218 @$rec = ();
447              
448 8 100       20 return if @{$self->{in}};
  8         52  
449             # that was the last event
450              
451 4 50       13 warn _d( $self, 'stopped') if $DEBUG_LAMBDA;
452 4         13 $self-> {stopped} = 1;
453 4         24 $_-> remove( $self) for @LOOPS;
454             }
455              
456             # Removes all events bound to the object, notifies the interested objects.
457             # The object becomes stopped immediately, so no new events will be allowed to register.
458             sub cancel_all_events
459             {
460 756     756 0 1136 my $self = shift;
461              
462 756         1356 $self-> {stopped} = 1;
463              
464 756 100       2086 return unless @{$self-> {in}};
  756         8715  
465            
466 46         125 for ( grep { $_-> [WATCH_CANCEL] } reverse @{$self-> {in}}) {
  53         254  
  46         376  
467 14         35 my $wc = $_-> [WATCH_CANCEL];
468 14         36 $_-> [WATCH_CANCEL] = undef;
469 14         35 @{$self->{last}} = $wc-> ($self, $_, @{$self->{last}})
  14         210  
  14         77  
470             }
471              
472 46 50       458 $LOOP-> remove( $self) if $LOOP;
473 46         186 $_-> remove($self) for @LOOPS;
474              
475 46         126 for my $rec ( @{$self->{in}}) {
  46         253  
476 50 100       262 if ( ref($rec->[WATCH_LAMBDA])) {
477 30         184 my $arr = $EVENTS{$rec->[WATCH_LAMBDA]};
478 30 50       254 if ( $arr) {
479 30         85 @$arr = grep { $_ != $rec } @$arr;
  30         177  
480 30 50       339 delete $EVENTS{$rec->[WATCH_LAMBDA]} unless @$arr;
481             }
482             }
483 50         1331 @$rec = ();
484             }
485              
486 46         121 @{$self-> {in}} = ();
  46         1263  
487             }
488              
489             sub autorestart
490             {
491             $#_ ?
492             $_[0]-> {autorestart} = $_[1] :
493             ( exists($_[0]-> {autorestart}) ?
494 151 50   151 1 1353 $_[0]-> {autorestart} : 1)
    50          
495             }
496 967     967 1 3765 sub is_stopped { $_[0]-> {stopped} }
497 5 100   5 1 24 sub is_waiting { not($_[0]->{stopped}) and @{$_[0]->{in}} }
  3         18  
498 691   100 691 1 2691 sub is_passive { not($_[0]->{stopped}) and not(@{$_[0]->{in}}) }
499 1375 100   1375 1 4484 sub is_active { $_[0]->{stopped} or @{$_[0]->{in}} }
  1373         6611  
500              
501             # reset the state machine
502             sub reset
503             {
504 192     192 1 2925 my $self = shift;
505              
506 192         478 $self-> cancel_all_events;
507 192         278 @{$self-> {last}} = ();
  192         452  
508 192         364 delete $self-> {stopped};
509 192 50       619 warn _d( $self, 'reset') if $DEBUG_LAMBDA;
510             }
511              
512             # start the state machine
513             sub start
514             {
515 637     637 1 1189 my $self = shift;
516              
517 637 50       1454 croak "can't start active lambda, call reset() first" if $self-> is_active;
518              
519 637 50       1457 warn _d( $self, 'started') if $DEBUG_LAMBDA;
520 621         2021 @{$self->{last}} = $self-> {start}-> ($self, @{$self->{last}})
  634         2149  
521 637 100       1866 if $self-> {start};
522 624 50       1670 warn $self-> _msg('initial') if $DEBUG_LAMBDA;
523              
524 624 100       835 unless ( @{$self->{in}}) {
  624         1991  
525 137 50       326 warn _d( $self, 'stopped') if $DEBUG_LAMBDA;
526 137         315 $self-> {stopped} = 1;
527             }
528             }
529              
530             # peek into the current state
531 154 100   154 1 1075 sub peek { wantarray ? @{$_[0]->{last}} : $_[0]-> {last}-> [0] }
  35         589  
532              
533             # pass initial parameters to lambda
534             sub call
535             {
536 571     571 1 851 my $self = shift;
537              
538 571 50       1285 croak "can't call active lambda" if $self-> is_active;
539              
540 571         982 @{$self-> {last}} = @_;
  571         1398  
541 571         1002 $self;
542             }
543              
544             # abandon all states and stop with constant message
545             sub terminate
546             {
547 82     82 1 509 my ( $self, @error) = @_;
548              
549 82         225 $self-> {last} = \@error;
550 82         338 $self-> cancel_all_events;
551 82 50       337 warn $self-> _msg('terminate') if $DEBUG_LAMBDA;
552             }
553              
554             # propagate event destruction on all levels
555             sub destroy
556             {
557 0     0 1 0 shift-> cancel_all_events( cascade => 1);
558             }
559              
560             # synchronisation
561              
562             # drives objects dependant on the other objects until all of them
563             # are stopped
564             sub drive
565             {
566 487789     487789 0 533830 my $changed = 1;
567 487789         533836 my $executed = 0;
568 487789 50       927527 warn "IO::Lambda::drive --------\n" if $DEBUG_LAMBDA;
569 487789         922119 while ( $changed) {
570 488137         481940 $changed = 0;
571              
572             # dispatch
573 488137         771135 for my $rec ( map { @$_ } values %EVENTS) {
  1287679         2014072  
574 1287689 100       2916013 next unless $rec->[WATCH_LAMBDA]-> {stopped};
575 493         1558 $rec->[WATCH_OBJ]-> lambda_handler( $rec);
576 493         639 $changed = 1;
577 493         907 $executed++;
578             }
579 488137 50 33     1729006 warn "IO::Lambda::drive .........\n" if $DEBUG_LAMBDA and $changed;
580             }
581 487789 50       851115 warn "IO::Lambda::drive +++++++++\n" if $DEBUG_LAMBDA;
582              
583 487789         999831 return $executed;
584             }
585              
586             # do one quant
587             sub yield
588             {
589 487789     487789 1 578073 my $nonblocking = shift;
590 487789         562909 my $more_events = 0;
591              
592             # custom loops must not wait
593 487789         780171 for ( @LOOPS) {
594 487464 100       1158004 next if $_-> empty;
595 487354         1224297 $_-> yield;
596 487354         879560 $more_events = 1;
597             }
598              
599 487789 100       833917 if ( drive) {
600             # some callbacks we called, don't let them wait in sleep
601 232         922 return 1;
602             }
603              
604             # main loop waits, if anything
605 487557 100       1261527 unless ( $LOOP-> empty) {
606 250         871 $LOOP-> yield( $nonblocking);
607 250         524 $more_events = 1;
608             }
609              
610 487557 100       1122879 $more_events = 1 if keys %EVENTS;
611 487557         1393485 return $more_events;
612             }
613              
614              
615             # wait for one lambda to stop
616             sub wait
617             {
618 157     157 1 1674 my $self = shift;
619 157 100       481 if ( $self-> is_passive) {
620 140         456 $self-> call(@_);
621 140         382 $self-> start;
622             }
623 149         710 yield while not $self-> {stopped};
624 149         527 return $self-> peek;
625             }
626              
627             # wait for all lambdas to stop
628             sub wait_for_all
629             {
630 0     0 1 0 my @objects = @_;
631 0 0       0 return unless @objects;
632 0         0 $_-> start for grep { $_-> is_passive } @objects;
  0         0  
633 0         0 my @ret;
634 0         0 while ( 1) {
635 0         0 push @ret, map { $_-> peek } grep { $_-> {stopped} } @objects;
  0         0  
  0         0  
636 0         0 @objects = grep { not $_-> {stopped} } @objects;
  0         0  
637 0 0       0 last unless @objects;
638 0         0 yield;
639             }
640 0         0 return @ret;
641             }
642              
643             # wait for at least one lambda to stop, return those that stopped
644             sub wait_for_any
645             {
646 0     0 1 0 my @objects = @_;
647 0 0       0 return unless @objects;
648 0         0 $_-> start for grep { $_-> is_passive } @objects;
  0         0  
649 0         0 while ( 1) {
650 0         0 my @n = grep { $_-> {stopped} } @objects;
  0         0  
651 0 0       0 return @n if @n;
652 0         0 yield;
653             }
654             }
655              
656             # run the event loop until no lambdas are left in the blocking state
657 0     0 1 0 sub run { do {} while yield }
658              
659             #
660             # Part II - Procedural interface to the lambda-style programming
661             #
662             #################################################################
663              
664 0     0   0 sub _lambda_restart { die "lambda() is not restartable" }
665             sub lambda(&)
666             {
667 457     457 1 12729 my $cb = _subname(lambda => $_[0]);
668             my $l = __PACKAGE__-> new( sub {
669             # initial lambda code is usually executed by tail/tails inside another lambda,
670             # so protect the upper-level context
671 632     632   2723 local *__ANON__ = "IO::Lambda::lambda::callback";
672 632         1326 local $THIS = shift;
673 632         1561 local @CONTEXT = ();
674 632         950 local $CALLBACK = $cb;
675 632         1091 local $METHOD = \&_lambda_restart;
676 632 50       2319 $cb ? $cb-> (@_) : @_;
677 457         5306 });
678 457 50       1483 if ( $DEBUG_CALLER) {
679 0 0       0 if ( $DEBUG_CALLER > 1) {
680 0         0 $l-> {caller} = Carp::longmess;
681 0         0 chomp $l-> {caller};
682 0         0 $l-> {caller} =~ s/^ at //;
683             } else {
684 0         0 $l-> {caller} = join(':', (caller)[1,2]);
685             }
686             }
687 457         2492 $l;
688             }
689              
690             sub _subname
691             {
692 1040 0 0 1040   2513 subname(
      33        
      0        
693             caller(1 + ($_[2] || 0)) . '::_'. $_[0],
694             $_[1]
695             ) if $DEBUG_CALLER and $_[1] and not $AGAIN;
696 1040         3413 return $_[1];
697             }
698              
699             *io = \λ
700              
701             # re-enter the latest (or other) frame
702             sub again
703             {
704 161 100   161 1 1222 ( $METHOD, $CALLBACK) = @_ if 2 == @_;
705 161         262 local $AGAIN = 1;
706 161 50       703 defined($METHOD) ?
707             $METHOD-> ($CALLBACK) :
708             croak "again() outside of a restartable call"
709             }
710              
711             # define context
712 438 100   438 1 5187 sub this { @_ ? ($THIS, @CONTEXT) = @_ : $THIS }
713 1097 100   1097 1 10068 sub context { @_ ? (@CONTEXT) = @_ : @CONTEXT }
714 1 50   1 1 15 sub restartable { @_ ? ($METHOD, $CALLBACK) = @_ : ( $METHOD, $CALLBACK) }
715 41     41 0 257 sub set_frame { ( $THIS, $METHOD, $CALLBACK, @CONTEXT) = @_ }
716 16     16 0 93 sub get_frame { ( $THIS, $METHOD, $CALLBACK, @CONTEXT) }
717 0     0 0 0 sub swap_frame { my @f = get_frame; set_frame(@_); @f }
  0         0  
  0         0  
718 1     1 0 9 sub clear { set_frame(); undef $AGAIN; }
  1         4  
719              
720 29     29   4716 END { ( $THIS, $METHOD, $CALLBACK, @CONTEXT) = (); }
721              
722             sub state($)
723             {
724 41 100 66 41 1 515 my $this = ($_[0] && ref($_[0])) ? shift(@_) : this;
725 41 100       315 @_ ? $this-> {state} = $_[0] : return $this-> {state};
726             }
727              
728             # exceptions and backtracing
729             sub catch(&$)
730             {
731 31     31 1 64 my ( $cb, $event) = @_;
732 31         232 my $who = (caller(1))[3];
733 31         97 my @ctx = @CONTEXT;
734 31 50       97 croak "catch callback already defined" if $event-> [WATCH_CANCEL];
735             $event->[WATCH_CANCEL] = $cb ? sub {
736 15 50   15   43 local *__ANON__ = "$who\:\:catch" if $DEBUG_CALLER;
737 15         26 $THIS = shift;
738 15         46 local $THIS-> {cancelled_event} = shift;
739 15         46 local $THIS-> {cancelling} = 1;
740 15         43 @CONTEXT = @ctx;
741 15         20 $METHOD = undef;
742 15         25 $CALLBACK = undef;
743 15         64 $cb-> (@_);
744 31 50       165 } : undef;
745              
746             # if throw() happened before we even get here
747 31 100 66     201 $event->[WATCH_CALLBACK] = $event->[WATCH_CANCEL]
748             if $event->[WATCH_CALLBACK] && $event->[WATCH_CALLBACK] == \&_throw;
749            
750 31         108 return $event;
751             }
752              
753             sub call_again
754             {
755 3     3 1 8 my $self = shift;
756 3 50       14 croak "called outside catch()" unless $self-> {cancelled_event};
757 3         7 my $cb = $self-> {cancelled_event}->[WATCH_CALLBACK];
758 3 50       18 $cb->($self, @_) if $cb;
759             }
760              
761             sub autocatch($)
762             {
763             catch {
764 2     2   8 this-> call_again;
765 2         19 this-> throw(@_);
766 15     15 1 99 } $_[0]
767             }
768              
769 3     3 1 11 sub is_cancelling { $_[0]-> {cancelling} }
770              
771             sub _throw
772             {
773 15     15   21 my $self = shift;
774 15 50       28 warn _d( $self, 'throw') if $DEBUG_LAMBDA;
775 15         31 $self-> throw(@_);
776             }
777              
778             sub throw
779             {
780 40     40 1 79 my ( $self, @error) = @_;
781 40         87 my @c = $self-> callers;
782 40   100     231 $_-> [WATCH_CALLBACK] = $_->[WATCH_CANCEL] || \&_throw for @c;
783 40         81 $self-> terminate(@error);
784 40 100 66     112 $SIGTHROW->($self, @error) if $SIGTHROW and not @c;
785 40         113 return @error;
786             }
787              
788             sub sigthrow
789             {
790 4 100 66 4 1 23 shift if defined($_[0]) and (not(ref $_[0]) or ref($_[0]) ne 'CODE');
      66        
791 4 50       16 $SIGTHROW = $_[0] if @_;
792 4         10 return $SIGTHROW;
793             }
794              
795 0 0   0 1 0 sub callees { @{ $EVENTS{ "$_[0]" } || [] } }
  0         0  
796 60     60 1 110 sub callers { grep { $_[0] == $_-> [WATCH_LAMBDA] } map { @$_ } values %EVENTS }
  120         303  
  102         173  
797              
798             sub backtrace
799             {
800 6     6 1 855 require IO::Lambda::Backtrace;
801 6         796 IO::Lambda::Backtrace-> new($_[0], Carp::shortmess);
802             }
803              
804             #
805             # Conditions:
806             #
807              
808             # common wrapper for declaration of handle-watching user conditions
809             sub add_watch
810             {
811 50     50 0 143 my ($self, $cb, $method, $flags, $handle, $deadline, @ctx) = @_;
812 50         59 my $who;
813 50 50       132 $who = (caller(1))[3] if $DEBUG_CALLER;
814             $self-> watch_io(
815             $flags, $handle, $deadline,
816             sub {
817 49 50   49   125 local *__ANON__ = "$who\:\:callback" if $DEBUG_CALLER;
818 49         127 $THIS = shift;
819 49         149 @CONTEXT = @ctx;
820 49         78 $METHOD = $method;
821 49         70 $CALLBACK = $cb;
822 49 50       376 $cb ? $cb-> (@_) : @_;
823             },
824 50 100       316 ($AGAIN ? delete($self-> {cancel}) : undef),
825             )
826             }
827              
828             # rwx($flags,$handle,$deadline)
829             sub rwx(&)
830             {
831             return $THIS-> override_handler('rwx', \&rwx, shift)
832 0 0   0 1 0 if $THIS-> {override}->{rwx};
833              
834 0         0 $THIS-> add_watch(
835             _subname(rwx => shift), \&rwx,
836             @CONTEXT[0,1,2,0,1,2]
837             )
838             }
839              
840             # readable($handle,$deadline)
841             sub readable(&)
842             {
843             return $THIS-> override_handler('readable', \&readable, shift)
844 43 50   43 1 351 if $THIS-> {override}->{readable};
845              
846 43         110 $THIS-> add_watch(
847             _subname(readable => shift), \&readable, IO_READ,
848             @CONTEXT[0,1,0,1]
849             )
850             }
851              
852             # writable($handle,$deadline)
853             sub writable(&)
854             {
855             return $THIS-> override_handler('writable', \&writable, shift)
856 7 50   7 1 60 if $THIS-> {override}->{writable};
857            
858 7         48 $THIS-> add_watch(
859             _subname(writable => shift), \&writable, IO_WRITE,
860             @CONTEXT[0,1,0,1]
861             )
862             }
863              
864             # common wrapper for declaration of time-watching user conditions
865             sub add_timer
866             {
867 41     41 0 147 my ($self, $cb, $method, $deadline, @ctx) = @_;
868 41         67 my $who;
869 41 50       130 $who = (caller(1))[3] if $DEBUG_CALLER;
870             $self-> watch_timer(
871             $deadline,
872             sub {
873 38 50   38   148 local *__ANON__ = "$who\:\:callback" if $DEBUG_CALLER;
874 38         87 $THIS = shift;
875 38         154 @CONTEXT = @ctx;
876 38         93 $METHOD = $method;
877 38         84 $CALLBACK = $cb;
878 38 100       377 $cb ? $cb-> (@_) : @_;
879             },
880 41 100       445 ($AGAIN ? delete($self-> {cancel}) : undef),
881             )
882             }
883              
884             # timeout($deadline)
885             sub timeout(&)
886             {
887             return $THIS-> override_handler('timeout', \&timeout, shift)
888 41 50   41 1 369 if $THIS-> {override}->{timeout};
889 41         150 $THIS-> add_timer( _subname(timeout => shift), \&timeout, @CONTEXT[0,0])
890             }
891              
892             # common wrapper for declaration of single lambda-watching user conditions
893             sub add_tail
894             {
895 433     433 0 1574 my ($self, $cb, $method, $lambda, @ctx) = @_;
896 433         513 my $who;
897 433 50       974 $who = (caller(1))[3] if $DEBUG_CALLER;
898             $self-> watch_lambda(
899             $lambda,
900             ($cb ? sub {
901 373 50   373   833 local *__ANON__ = "$who\:\:callback" if $DEBUG_CALLER;
902 373         547 $THIS = shift;
903 373         1060 @CONTEXT = @ctx;
904 373         549 $METHOD = $method;
905 373         930 $CALLBACK = $cb;
906 373         3007 $cb-> (@_);
907             } : undef),
908 433 100       4360 ($AGAIN ? delete($self-> {cancel}) : undef),
    100          
909             );
910             }
911              
912             # convert constant @param into a lambda
913             sub add_constant
914             {
915 0     0 0 0 my ( $self, $cb, $method, @param) = @_;
916             $self-> add_tail (
917             _subname(constant => $cb), $method,
918 0     0   0 lambda { @param },
919             @CONTEXT
920 0         0 );
921             }
922              
923             # handle default condition logic given a lambda
924             sub condition
925             {
926 38     38 1 243 my ( $self, $cb, $method, $name) = @_;
927              
928             return $THIS-> override_handler($name, $method, $cb)
929 38 50 66     373 if defined($name) and $THIS-> {override}->{$name};
930            
931 38         91 my @ctx = @CONTEXT;
932              
933 38         66 my $who;
934 38 50       92 if ( $DEBUG_CALLER) {
935 0 0       0 $who = defined($name) ? $name : (caller(1))[3];
936 0         0 _subname($who, $cb, 2);
937             }
938              
939             $THIS-> watch_lambda(
940             $self,
941             $cb ? sub {
942 37 50   37   108 local *__ANON__ = "$who\:\:callback" if $DEBUG_CALLER;
943 37         64 $THIS = shift;
944 37         128 @CONTEXT = @ctx;
945 37         64 $METHOD = $method;
946 37         115 $CALLBACK = $cb;
947 37         221 $cb-> (@_);
948             } : undef
949 38 50       559 );
950             }
951              
952             # dummy sub for empty calls for tails() family
953             sub _empty
954             {
955 2     2   7 my ($name, $method, $cb) = @_;
956 2         8 my @ctx = context;
957             $THIS-> watch_lambda( IO::Lambda-> new, sub {
958 2     2   25 local *__ANON__ = "IO::Lambda::".$name."::callback";
959 2         7 @CONTEXT = @ctx;
960 2         5 $METHOD = $method;
961 2         5 $CALLBACK = $cb;
962 2         13 $cb-> ();
963 2 50       33 }) if $cb;
964             }
965              
966             # tail( $lambda, @param) -- initialize $lambda with @param, and wait for it
967             sub tail(&)
968             {
969             return $THIS-> override_handler('tail', \&tail, shift)
970 474 100   474 1 2237 if $THIS-> {override}->{tail};
971            
972 434         993 my ( $lambda, @param) = context;
973 434 100       1154 return _empty(tail => \&tail, shift) unless $lambda;
974              
975 433 100 66     1652 $lambda-> reset
976             if $lambda-> is_stopped and $lambda-> autorestart;
977 433 100       979 if ( @param) {
978 270         968 $lambda-> call( @param);
979             } else {
980 163 100       550 $lambda-> call unless $lambda-> is_active;
981             }
982 433         1165 $THIS-> add_tail( _subname(tail => shift), \&tail, $lambda, $lambda, @param);
983             }
984              
985              
986             # tails(@lambdas) -- wait for all lambdas to finish
987             sub tails(&)
988             {
989             return $THIS-> override_handler('tails', \&tails, shift)
990 16 50   16 1 114 if $THIS-> {override}->{tails};
991            
992 16         87 my $cb = _subname tails => $_[0];
993 16         83 my @lambdas = context;
994 16         53 my $n = $#lambdas;
995 16 100       94 return _empty(tails => \&tails, $cb) unless @lambdas;
996              
997 15         29 my @ret;
998             my $watcher;
999             $watcher = sub {
1000 25     25   53 $THIS = shift;
1001 25         60 push @ret, @_;
1002 25 100       93 return if $n--;
1003              
1004 8         150 local *__ANON__ = "IO::Lambda::tails::callback";
1005 8         43 @CONTEXT = @lambdas;
1006 8         26 $METHOD = \&tails;
1007 8         18 $CALLBACK = $cb;
1008 8 100       84 $cb ? $cb-> (@ret) : @ret;
1009 15         116 };
1010 15         47 my $this = $THIS;
1011 15         84 $this-> watch_lambda( $_, $watcher) for @lambdas;
1012             }
1013              
1014             # tailo(@lambdas) -- wait for all lambdas to finish, return ordered results
1015             sub tailo(&)
1016             {
1017             return $THIS-> override_handler('tailo', \&tailo, shift)
1018 1 50   1 1 7 if $THIS-> {override}->{tailo};
1019            
1020 1         4 my $cb = _subname tailo => $_[0];
1021 1         6 my @lambdas = context;
1022 1         4 my $n = $#lambdas;
1023 1 50       6 return _empty(tailo => \&tailo, $cb) unless @lambdas;
1024              
1025 1         3 my @ret;
1026             my $watcher;
1027             $watcher = sub {
1028 3     3   8 my $curr = shift;
1029 3         21 $THIS = shift;
1030 3         11 $ret[ $curr ] = \@_;
1031 3 100       18 return if $n--;
1032              
1033 1         50 local *__ANON__ = "IO::Lambda::tailo::callback";
1034 1         7 @CONTEXT = @lambdas;
1035 1         5 $METHOD = \&tailo;
1036 1         3 $CALLBACK = $cb;
1037 1         6 @ret = map { @$_ } @ret;
  3         13  
1038 1 50       10 $cb ? $cb-> (@ret) : @ret;
1039 1         8 };
1040 1         2 my $this = $THIS;
1041 1         8 for ( my $i = 0; $i < @lambdas; $i++) {
1042 3         6 my $d = $i;
1043             $this-> watch_lambda(
1044             $lambdas[$i],
1045 3     3   21 sub { $watcher->($d, @_) }
1046 3         42 );
1047             };
1048             }
1049              
1050             # any_tail($deadline,@lambdas) -- wait for any lambda to finish within time
1051             sub any_tail(&)
1052             {
1053             return $THIS-> override_handler('any_tail', \&any_tail, shift)
1054 6 50   6 1 44 if $THIS-> {override}->{any_tail};
1055            
1056 6         23 my $cb = _subname any_tail => $_[0];
1057 6         20 my ( $deadline, @lambdas) = context;
1058 6         20 my $n = $#lambdas;
1059 6 50       36 return _empty(any_tail => \&any_tail, $cb) unless @lambdas;
1060              
1061 6         14 my ( @ret, @watchers);
1062 0         0 my $timer;
1063            
1064             $timer = $THIS-> watch_timer( $deadline, sub {
1065 3     3   77 local *__ANON__ = "IO::Lambda::any_tail::callback1";
1066 3         16 $THIS = shift;
1067 3         19 @CONTEXT = ($deadline, @lambdas);
1068 3         36 $METHOD = \&any_tail;
1069 3         17 $CALLBACK = $cb;
1070 3 50       60 @ret = $cb-> (@ret) if $cb;
1071 3         37 $THIS-> cancel_event($_) for @watchers;
1072 3         11 return @ret;
1073 6 50       83 }) if defined $deadline;
1074              
1075 6         17 my $watcher;
1076             $watcher = sub {
1077 3     3   8 push @ret, shift;
1078 3 100       17 return if $n--;
1079            
1080 1         5 local *__ANON__ = "IO::Lambda::any_tail::callback2";
1081 1         3 $THIS = shift;
1082 1         2 @CONTEXT = ($deadline, @lambdas);
1083 1         2 $METHOD = \&any_tail;
1084 1         2 $CALLBACK = $cb;
1085 1 50       9 @ret = $cb-> (@ret) if $cb;
1086 1 50       7 $THIS-> cancel_event( $timer) if $timer;
1087 1         9 return @ret;
1088 6         38 };
1089              
1090             @watchers = map {
1091 6         23 my $l = $_;
  10         20  
1092             $THIS-> watch_lambda( $l, sub {
1093 3     3   10 $watcher->($l, @_);
1094             })
1095 10         75 } @lambdas;
1096             }
1097              
1098             #
1099             # Part III - High order lambdas
1100             #
1101             ################################################################
1102              
1103             # fold($l) :: @b -> @c
1104             # $l :: ($a,@b) -> @c
1105             sub fold($)
1106             {
1107 1     1 1 3 my $l = shift;
1108             lambda {
1109 3     3   11 my @q = @_;
1110 3         12 context $l, shift(@q), shift(@q);
1111             tail {
1112 10 100       34 return @_ unless @q;
1113 7         23 context $l, shift(@q), @_;
1114 7         16 again;
1115 3         21 }}
1116 1         9 }
1117              
1118             # mapcar($l) :: (@p) -> @r
1119             sub mapcar($)
1120             {
1121 2     2 1 4 my $lambda = shift;
1122             lambda {
1123 3     3   5 my @ret;
1124 3         7 my @p = @_;
1125 3 50       9 return unless @p;
1126 3         12 context $lambda, shift @p;
1127             tail {
1128 15         30 push @ret, @_;
1129 15 100       40 return @ret unless @p;
1130 12         28 context $lambda, shift @p;
1131 12         31 again;
1132 3         18 }}
1133 2         13 }
1134              
1135             # filter($l) :: (@p) -> @r
1136             sub filter($)
1137             {
1138 1     1 1 4 my $lambda = shift;
1139             lambda {
1140 1     1   3 my @ret;
1141 1 50       7 return unless @_;
1142 1         4 my @p = @_;
1143 1         4 my $p = shift @p;
1144 1         5 context $lambda, $p;
1145             tail {
1146 5 100       20 push @ret, $p if shift;
1147 5 100       21 return @ret unless @p;
1148 4         12 $p = shift @p;
1149 4         14 context $lambda, $p;
1150 4         13 again;
1151 1         8 }}
1152 1         9 }
1153              
1154             # curry(@a -> $l) :: @a -> @b
1155             sub curry(&)
1156             {
1157 3     3 1 8 my $cb = $_[0];
1158             lambda {
1159 12     12   27 context $cb->(@_);
1160 12         93 &tail();
1161             }
1162 3         19 }
1163              
1164             # seq() :: (@l) -> @m
1165 10     10 1 32 sub seq { mapcar curry { shift } }
  1     1   207  
1166              
1167             # par($max = 0) :: (@l) -> @m
1168             sub par
1169             {
1170 1   50 1 1 5 my $max = $_[0] || 0;
1171              
1172             lambda {
1173 1     1   3 my @q = @_;
1174 1         2 my @ret;
1175 1 50 33     12 $max = @q if $max < 1 or $max > @q;
1176             context map {
1177 1         4 lambda {
1178 3 50       8 return unless @q;
1179 3         12 context shift @q;
1180             tail {
1181 9         26 push @ret, @_;
1182 9 100       38 return unless @q;
1183 6         22 context shift @q;
1184 6         24 again;
1185 3         11 }}
1186 3         11 } 1 .. $max;
1187 1         5 tails { @ret }
1188 1         5 }
1189 1         6 }
1190              
1191              
1192             # sysread lambda wrapper
1193             #
1194             # ioresult :: ($result, $error)
1195             # sysreader() :: ($fh, $buf, $length, $deadline) -> ioresult
1196             sub sysreader (){ lambda
1197             {
1198 139     139   247 my ( $fh, $buf, $length, $deadline) = @_;
1199 139 50       316 $$buf = '' unless defined $$buf;
1200              
1201             this-> watch_io( IO_READ, $fh, $deadline, subname _sysreader => sub {
1202 135 50       317 return undef, 'timeout' unless $_[1];
1203 135         1967 local $SIG{PIPE} = 'IGNORE';
1204 135         7279 my $n = sysread( $fh, $$buf, $length, length($$buf));
1205 135 50       518 if ( $DEBUG_IO) {
1206 0 0       0 warn "fh(", fileno($fh), ") read ", ( defined($n) ? "$n bytes" : "error $!"), "\n";
1207 0 0 0     0 warn substr( $$buf, length($$buf) - $n), "\n" if $DEBUG_IO > 1 and $n > 0;
1208             }
1209 135 50       349 return undef, $! unless defined $n;
1210 135         872 return $n;
1211             })
1212 139     46 1 384 }}
  46         859  
1213              
1214             # syswrite() lambda wrapper
1215             #
1216             # syswriter() :: ($fh, $buf, $length, $offset, $deadline) -> ioresult
1217             sub syswriter (){ lambda
1218             {
1219 8     8   15 my ( $fh, $buf, $length, $offset, $deadline) = @_;
1220              
1221             this-> watch_io( IO_WRITE, $fh, $deadline, subname _syswriter => sub {
1222 8 50       22 return undef, 'timeout' unless $_[1];
1223 8         126 local $SIG{PIPE} = 'IGNORE';
1224 8         469 my $n = syswrite( $fh, $$buf, $length, $offset);
1225 8 50       36 if ( $DEBUG_IO) {
1226 0 0       0 warn "fh(", fileno($fh), ") wrote ", ( defined($n) ? "$n bytes out of $length" : "error $!"), "\n";
1227 0 0 0     0 warn substr( $$buf, $offset, $n), "\n" if $DEBUG_IO > 1 and $n > 0;
1228             }
1229 8 50       21 return undef, $! unless defined $n;
1230 8         53 return $n;
1231 8         14 });
1232 8     8 1 38 }}
1233              
1234             sub _match
1235             {
1236 164     164   291 my ( $cond, $buf) = @_;
1237              
1238 164 100       529 return unless defined $cond;
1239              
1240 33 100       1207 return ($$buf =~ /($cond)/)[0] if ref($cond) eq 'Regexp';
1241 4 50       10 return $cond->($buf) if ref($cond) eq 'CODE';
1242 4         12 return length($$buf) >= $cond;
1243             }
1244              
1245             # read from stream until condition is met
1246             #
1247             # readbuf($reader) :: ($fh, $$buf, $cond, $deadline) -> ioresult
1248             sub readbuf
1249             {
1250 49   66 49 1 414 my $reader = shift || sysreader;
1251              
1252             lambda {
1253 65     65   129 my ( $fh, $buf, $cond, $deadline) = @_;
1254            
1255 65 100       213 $$buf = "" unless defined $$buf;
1256              
1257 65         182 my $match = _match( $cond, $buf);
1258 65 100       288 return $match if $match;
1259            
1260 52         67 my ($maxbytes, $bufsize);
1261 52 50 66     281 $maxbytes = $cond - length($$buf)
      33        
1262             if defined($cond) and not ref($cond) and $cond > length($$buf);
1263 52 50       129 $bufsize = defined($maxbytes) ? $maxbytes : 65536;
1264            
1265 52         150 my $savepos = pos($$buf); # useful when $cond is a regexp
1266              
1267 52         119 context $reader, $fh, $buf, $bufsize, $deadline;
1268             tail {
1269 137         368 pos($$buf) = $savepos;
1270              
1271 137         263 my $bytes = shift;
1272 137 100       321 return undef, shift unless defined $bytes;
1273            
1274 136 100       300 unless ( $bytes) {
1275 37 50       140 return 1 unless defined $cond;
1276 0         0 return undef, 'eof';
1277             }
1278            
1279             # got line? return it
1280 99         244 my $match = _match( $cond, $buf);
1281 99 100       236 return $match if $match;
1282            
1283             # otherwise, just wait for more data
1284 89 50       246 $bufsize -= $bytes if defined $maxbytes;
1285              
1286 89         253 context $reader, $fh, $buf, $bufsize, $deadline;
1287 89         293 again;
1288 52         618 }}
1289 49         646 }
1290              
1291             # curry readbuf()
1292             #
1293             # getline($reader) :: ($fh, $$buf, $deadline) -> ioresult
1294             sub getline
1295             {
1296 1     1 1 44 my $reader = shift;
1297             lambda {
1298 1     1   2 my ( $fh, $buf, $deadline) = @_;
1299 1 50       4 croak "getline() needs a buffer! ( f.ex getline,\$fh,\\(my \$buf='') )"
1300             unless ref($buf);
1301 1         4 context readbuf($reader), $fh, $buf, qr/^[^\n]*\n/, $deadline;
1302             tail {
1303 1 50       6 substr( $$buf, 0, length($_[0]), '') unless defined $_[1];
1304 1         3 @_;
1305 1         12 }}
1306 1         11 }
1307              
1308             # write whole buffer to stream
1309             #
1310             # writebuf($writer) :: syswriter
1311             sub writebuf
1312             {
1313 10   66 10 1 38 my $writer = shift || syswriter;
1314              
1315             lambda {
1316 10     10   17 my ( $fh, $buf, $len, $offs, $deadline) = @_;
1317              
1318 10         19 my ( $written, $recheck_length, $olen) = (0);
1319 10 50       25 $$buf = "" unless defined $$buf;
1320 10 100       17 $offs = 0 unless defined $offs;
1321 10 50       22 unless ( defined $len) {
1322 10         17 $olen = $len = length $$buf;
1323 10         13 $recheck_length++;
1324             }
1325            
1326 10         22 context $writer, $fh, $buf, $len, $offs, $deadline;
1327             tail {
1328 21         31 my $bytes = shift;
1329 21 50       43 return undef, shift unless defined $bytes;
1330              
1331 21         23 $offs += $bytes;
1332 21         29 $written += $bytes;
1333 21         29 $len -= $bytes;
1334 21 50       44 if ( $recheck_length) {
1335 21         30 my $l = length $$buf;
1336 21 100       54 if ( $l > $olen) {
1337 1         2 $len += $l - $olen;
1338 1         2 $olen = $l;
1339             }
1340             }
1341 21 100       55 return $written if $len <= 0;
1342              
1343 11         22 context $writer, $fh, $buf, $len, $offs, $deadline;
1344 11         22 again;
1345 10         50 }}
1346 10         78 }
1347              
1348             #
1349             # Part IV - Developer API for custom condvars and event loops
1350             #
1351             ################################################################
1352              
1353             # register condvar listener
1354             sub bind
1355             {
1356 52     52 1 111 my $self = shift;
1357              
1358             # create new condition
1359 52 50       214 croak "can't register events on a stopped lambda" if $self-> {stopped};
1360              
1361 52         181 my $rec = [ $self, @_ ];
1362 52         99 push @{$self-> {in}}, $rec;
  52         146  
1363              
1364 52         555 return $rec;
1365             }
1366              
1367             # stop listening on a condvar
1368             sub resolve
1369             {
1370 41     41 1 108 my ( $self, $rec) = @_;
1371              
1372 41         113 my $in = $self-> {in};
1373 41         79 my $nn = @$in;
1374 41         107 @$in = grep { $rec != $_ } @$in;
  41         172  
1375 41 50 33     528 die _d($self, "stray condvar event $rec (@$rec)")
1376             if $nn == @$in or $self != $rec->[WATCH_OBJ];
1377              
1378 41         120 undef $rec-> [WATCH_OBJ]; # unneeded references
1379              
1380 41 50       206 unless ( @$in) {
1381 41 50       137 warn _d( $self, 'stopped') if $DEBUG_LAMBDA;
1382 41         172 $self-> {stopped} = 1;
1383             }
1384             }
1385              
1386             sub callout
1387             {
1388 6     6 0 78 my ( $self, $cb, @param) = @_;
1389 6 100       34 @{$self->{last}} = $cb ? $cb-> (@param) : @param;
  6         51  
1390             }
1391              
1392 14     14 0 46 sub add_loop { push @LOOPS, shift }
1393 14 50   14 0 161 sub remove_loop { @LOOPS = grep { defined and $_ != $_[0] } @LOOPS }
  14         1264  
1394              
1395 0     0   0 sub __end { clear(); undef %EVENTS; undef @LOOPS; } # for threads
  0         0  
  0         0  
1396              
1397             package IO::Lambda::Loop;
1398 29     29   255 use vars qw($DEFAULT);
  29         45  
  29         1279  
1399 29     29   1306 use strict;
  29         1251  
  29         1874  
1400 29     29   154 use warnings;
  29         1226  
  29         17078  
1401              
1402             $DEFAULT = 'Select' unless defined $DEFAULT;
1403 27     27   100 sub default { $DEFAULT = shift }
1404              
1405             sub new
1406             {
1407 27 50   27   93 return $IO::Lambda::LOOP if $IO::Lambda::LOOP;
1408              
1409 27         87 my ( $class, %opt) = @_;
1410              
1411 27   33     211 $opt{type} ||= $DEFAULT;
1412 27         102 $class .= "::$opt{type}";
1413 27     27   1863 eval "use $class;";
  27         18930  
  27         77  
  27         606  
1414 27 50       131 die $@ if $@;
1415              
1416 27         171 return $IO::Lambda::LOOP = $class-> new();
1417             }
1418              
1419             1;
1420              
1421             __DATA__