File Coverage

blib/lib/IO/Lambda.pm
Criterion Covered Total %
statement 681 750 90.8
branch 266 424 62.7
condition 50 106 47.1
subroutine 126 145 86.9
pod 60 77 77.9
total 1183 1502 78.7


line stmt bran cond sub pod time code
1             # $Id: Lambda.pm,v 1.191 2012/01/13 06:41:28 dk Exp $
2             package IO::Lambda;
3              
4 29     29   694880 use Carp qw(croak);
  29         70  
  29         1971  
5 29     29   149 use strict;
  29         54  
  29         633  
6 29     29   156 use warnings;
  29         40  
  29         823  
7 29     29   139 use Exporter;
  29         55  
  29         1023  
8 29     29   20821 use Sub::Name;
  29         17132  
  29         1743  
9 29     29   189 use Scalar::Util qw(weaken);
  29         54  
  29         2737  
10 29     29   20949 use Time::HiRes qw(time);
  29         37155  
  29         150  
11 29         17259 use vars qw(
12             $LOOP %EVENTS @LOOPS
13             $VERSION @ISA
14             @EXPORT_OK %EXPORT_TAGS @EXPORT_CONSTANTS @EXPORT_LAMBDA @EXPORT_STREAM
15             @EXPORT_DEV @EXPORT_MISC @EXPORT_FUNC
16             $THIS @CONTEXT $METHOD $CALLBACK $AGAIN $SIGTHROW
17             $DEBUG_IO $DEBUG_LAMBDA $DEBUG_CALLER %DEBUG
18 29     29   5071 );
  29         48  
19             $VERSION = '1.26';
20             @ISA = qw(Exporter);
21             @EXPORT_CONSTANTS = qw(
22             IO_READ IO_WRITE IO_EXCEPTION
23             WATCH_OBJ WATCH_DEADLINE WATCH_LAMBDA WATCH_CALLBACK
24             WATCH_IO_HANDLE WATCH_IO_FLAGS WATCH_CALLER WATCH_CANCEL
25             );
26             @EXPORT_STREAM = qw(
27             sysreader syswriter getline readbuf writebuf
28             );
29             @EXPORT_LAMBDA = qw(
30             this context lambda again state restartable catch autocatch
31             io readable writable rwx timeout tail tails tailo any_tail
32             );
33             @EXPORT_FUNC = qw(
34             seq par mapcar filter fold curry
35             );
36             @EXPORT_MISC = qw(
37             set_frame get_frame swap_frame sigthrow
38             );
39             @EXPORT_DEV = qw(
40             _subname _o _t
41             );
42             @EXPORT_OK = (
43             @EXPORT_LAMBDA, @EXPORT_CONSTANTS, @EXPORT_STREAM,
44             @EXPORT_DEV, @EXPORT_MISC, @EXPORT_FUNC
45             );
46             %EXPORT_TAGS = (
47             func => \@EXPORT_FUNC,
48             lambda => \@EXPORT_LAMBDA,
49             stream => \@EXPORT_STREAM,
50             constants => \@EXPORT_CONSTANTS,
51             dev => \@EXPORT_DEV,
52             all => [ @EXPORT_LAMBDA, @EXPORT_STREAM, @EXPORT_CONSTANTS, @EXPORT_FUNC ],
53             );
54              
55             if ( exists $ENV{IO_LAMBDA_DEBUG}) {
56             for my $p ( split ',', $ENV{IO_LAMBDA_DEBUG}) {
57             if ( $p =~ /^([^=]+)=(.*)$/) {
58             $DEBUG{lc $1}=$2;
59             } else {
60             $DEBUG{lc $p}++;
61             }
62             }
63             $DEBUG_IO = $DEBUG{io} || 0;
64             $DEBUG_LAMBDA = $DEBUG{lambda} || 0;
65             $DEBUG_CALLER = $DEBUG{caller} || 0;
66             $IO::Lambda::Loop::DEFAULT = $DEBUG{loop} if $DEBUG{loop};
67             $SIG{__DIE__} = sub {
68             return if $^S;
69             Carp::confess(@_);
70             } if $DEBUG{die};
71             }
72              
73 29     29   181 use constant IO_READ => 4;
  29         55  
  29         2449  
74 29     29   397 use constant IO_WRITE => 2;
  29         54  
  29         1413  
75 29     29   140 use constant IO_EXCEPTION => 1;
  29         55  
  29         1309  
76            
77 29     29   149 use constant WATCH_OBJ => 0;
  29         50  
  29         1356  
78 29     29   167 use constant WATCH_CANCEL => 1;
  29         49  
  29         1397  
79              
80 29     29   132 use constant WATCH_DEADLINE => 2;
  29         110  
  29         1215  
81 29     29   132 use constant WATCH_LAMBDA => 2;
  29         65  
  29         1256  
82 29     29   184 use constant WATCH_CALLBACK => 3;
  29         84  
  29         1270  
83              
84 29     29   143 use constant WATCH_CALLER => 4;
  29         46  
  29         1286  
85 29     29   139 use constant WATCH_IO_HANDLE => 4;
  29         55  
  29         1176  
86 29     29   137 use constant WATCH_IO_FLAGS => 5;
  29         39  
  29         288693  
87              
88             sub new
89             {
90 505 100   505 1 1871 IO::Lambda::Loop-> new unless $LOOP;
91 505         4181 return bless {
92             in => [], # events we wait for
93             last => [], # result of the last state
94             stopped => 0, # initial state
95             start => $_[1], # kick-start coderef
96             }, $_[0];
97             }
98              
99             sub DESTROY
100             {
101 481     481   3803 my $self = $_[0];
102 481         1567 $self-> cancel_all_events;
103             }
104              
105             my $_doffs = 0;
106 0     0   0 sub _d_in { $_doffs++ }
107 0 0   0   0 sub _d_out { $_doffs-- if $_doffs }
108 0     0   0 sub _d { (' ' x $_doffs), _obj(shift), ': ', @_, "\n" }
109 0     0   0 sub _o { $_[0] =~ /0x([\w]+)/; $1 }
  0         0  
110 0   0 0   0 sub _obj { "lambda(". _o($_[0]) . ")." . ( $_[0]->{caller} || '()' ) }
111 0 0   0   0 sub _t { defined($_[0]) ? ( "time(", (($_[0] < 1_000_000) ? $_[0] : $_[0]-time()), ")" ) : () }
    0          
112             sub _ev
113             {
114 0     0   0 $_[0] =~ /0x([\w]+)/;
115             "event($1) ",
116 0 0       0 (($#{$_[0]} == WATCH_IO_FLAGS) ? (
  0 0       0  
    0          
    0          
    0          
    0          
117             'fd=',
118             fileno($_[0]->[WATCH_IO_HANDLE]),
119             ' ',
120             ( $_[0]->[WATCH_IO_FLAGS] ? (
121             join('/',
122             (($_[0]->[WATCH_IO_FLAGS] & IO_READ) ? 'read' : ()),
123             (($_[0]->[WATCH_IO_FLAGS] & IO_WRITE) ? 'write' : ()),
124             (($_[0]->[WATCH_IO_FLAGS] & IO_EXCEPTION) ? 'exc' : ()),
125             )) :
126             'timeout'
127             ),
128             ' ', _t($_[0]->[WATCH_DEADLINE]),
129             ) : (
130             ref($_[0]-> [WATCH_LAMBDA]) ?
131             _obj($_[0]-> [WATCH_LAMBDA]) :
132             _t($_[0]->[WATCH_DEADLINE])
133             ))
134             }
135             sub _msg
136             {
137 0     0   0 my $self = shift;
138             _d(
139             $self,
140             "@_ >> (",
141             join(',',
142             map {
143 0 0       0 defined($_) ? $_ : 'undef'
144             }
145 0         0 @{$self->{last}}
  0         0  
146             ),
147             ')'
148             )
149             }
150              
151             #
152             # Part I - Object interface to callback and
153             # messaging interface with event loop and lambdas
154             #
155             #########################################################
156              
157             # register an IO event
158             sub watch_io
159             {
160 183     183 1 507 my ( $self, $flags, $handle, $deadline, $callback, $cancel) = @_;
161              
162 183 50       584 croak "can't register events on a stopped lambda" if $self-> {stopped};
163 183 50       584 croak "bad io flags" if 0 == ($flags & (IO_READ|IO_WRITE|IO_EXCEPTION));
164              
165 183 100 66     680 $deadline += time if defined($deadline) and $deadline < 1_000_000_000;
166            
167 183         764 my $rec = [
168             $self,
169             $cancel,
170             $deadline,
171             $callback,
172             $handle,
173             $flags,
174             ];
175 183         552 weaken $rec->[0];
176 183         278 push @{$self-> {in}}, $rec;
  183         483  
177              
178 183 50       510 warn _d( $self, "> ", _ev($rec)) if $DEBUG_IO;
179              
180 183         1234 $LOOP-> watch( $rec );
181              
182 183         788 return $rec;
183             }
184              
185             # register a timeout
186             sub watch_timer
187             {
188 54     54 1 154 my ( $self, $deadline, $callback, $cancel) = @_;
189              
190 54 50       193 croak "can't register events on a stopped lambda" if $self-> {stopped};
191 54 50       163 croak "$self: time is undefined" unless defined $deadline;
192            
193 54 100       347 $deadline += time if $deadline < 1_000_000_000;
194 54         159 my $rec = [
195             $self,
196             $cancel,
197             $deadline,
198             $callback,
199             ];
200 54         211 weaken $rec->[0];
201 54         95 push @{$self-> {in}}, $rec;
  54         155  
202              
203 54 50       180 warn _d( $self, "> ", _ev($rec)) if $DEBUG_IO;
204              
205 54         245 $LOOP-> after( $rec);
206            
207 54         250 return $rec;
208             }
209              
210             # register a callback when another lambda exits
211             sub watch_lambda
212             {
213 492     492 1 979 my ( $self, $lambda, $callback, $cancel) = @_;
214 492         1039 @_ = (); # perl bug http://rt.perl.org/rt3//Public/Bug/Display.html?id=70974
215              
216 492 50       1342 croak "can't register events on a stopped lambda" if $self-> {stopped};
217 492 50 33     4384 croak "bad lambda" unless $lambda and $lambda->isa('IO::Lambda');
218              
219 492 50       1345 croak "won't watch myself" if $self == $lambda;
220             # XXX check cycling
221            
222 492 50       1425 $lambda-> reset if $lambda-> is_stopped;
223              
224 492         1440 my $rec = [
225             $self,
226             $cancel,
227             $lambda,
228             $callback,
229             ];
230 492         1558 weaken $rec->[0];
231 492 50       1547 $rec-> [WATCH_CALLER] = Carp::shortmess if $DEBUG_CALLER;
232 492         781 push @{$self-> {in}}, $rec;
  492         1310  
233 492         833 push @{$EVENTS{"$lambda"}}, $rec;
  492         3201  
234              
235 492 100       1466 $lambda-> start if $lambda-> is_passive;
236              
237 487 50       1401 warn _d( $self, "> ", _ev($rec)) if $DEBUG_LAMBDA;
238              
239 487         3104 return $rec;
240             }
241              
242             # watch the watchers
243             sub override
244             {
245 33 100   33 1 954 my ( $self, $method, $state, $cb) = ( 4 == @_) ? @_ : (@_[0,1],'*',$_[2]);
246              
247 33 100       58 if ( $cb) {
248 13   100     54 $self-> {override}->{$method} ||= [];
249 13         16 push @{$self-> {override}->{$method}}, [ $state, $cb ];
  13         54  
250             } else {
251 20         33 my $p;
252 20 100       55 return unless $p = $self-> {override}->{$method};
253 19         56 for ( my $i = $#$p; $i >= 0; $i--) {
254 21 100 33     222 if (
      33        
      66        
      33        
255             (
256             not defined ($state) and
257             not defined ($p->[$i]-> [0])
258             ) or (
259             defined($state) and
260             defined($p->[$i]-> [0]) and
261             $p->[$i]->[0] eq $state
262             )
263             ) {
264 19         37 my $ret = splice( @$p, $i, 1);
265 19 100       50 delete $self-> {override}->{$method} unless @$p;
266 19         81 return $ret->[1];
267             }
268             }
269              
270 0         0 return undef;
271             }
272             }
273              
274             sub override_handler
275             {
276 40     40 0 67 my ( $self, $method, $sub, $cb) = @_;
277              
278 40         69 my $o = $self-> {override}-> {$method}-> [-1];
279              
280             # check state match
281 40         73 my ($a, $b) = ( $self-> {state}, $o-> [0]);
282 40 100 66     440 unless (
283             ( not defined($a) and not defined ($b)) or
284             ( defined $a and defined $b and $a eq $b) or
285             ( defined $b and $b eq '*')
286             ) {
287             # state not matched
288 8 100       11 if ( 1 == @{$self-> {override}->{$method}}) {
  8         25  
289 4         10 local $self-> {override}->{$method} = undef;
290 4         10 return $sub-> ($cb);
291             } else {
292 4         6 pop @{$self-> {override}->{$method}};
  4         10  
293 4         10 my $ret = $sub-> ($cb);
294 4         6 push @{$self->{override}->{$method}}, $o;
  4         10  
295 4         18 return $ret;
296             }
297             } else {
298             # state matched
299 32         83 local $self-> {super} = [ $sub, $cb ];
300 32 100       44 if ( 1 == @{$self-> {override}->{$method}}) {
  32         88  
301 22         49 local $self-> {override}->{$method} = undef;
302 22         57 return $o-> [1]-> ( $self, $sub, $cb);
303             } else {
304 10         12 pop @{$self-> {override}->{$method}};
  10         21  
305 10         26 my $ret = $o-> [1]-> ( $self, $sub, $cb);
306 10         21 push @{$self->{override}->{$method}}, $o;
  10         21  
307 10         41 return $ret;
308             }
309             }
310             }
311              
312             # Insert a new callback to be called before original callback.
313             # Needs to insert callbacks in {override} stack in reverse order,
314             # because direct order serves LIFO order for override() callbacks, --
315             # and that means FIFO for intercept() callbacks. But we also want LIFO.
316             sub intercept
317             {
318 24 100   24 1 1661 my ( $self, $method, $state, $cb) = ( 4 == @_) ? @_ : (@_[0,1],'*',$_[2]);
319            
320 24 100       67 return $self-> override( $method, $state, undef) unless $cb;
321              
322 14         42 _subname("intercept($method:$state)" => $cb);
323              
324 14   100     63 $self-> {override}->{$method} ||= [];
325 14         85 unshift @{$self-> {override}->{$method}}, [ $state, sub {
326             # this is called when lambda calls $method with $state
327 17     17   30 my ( undef, $sub, $orig_cb) = @_;
328             # $sub is a condition, like readable(&) or tail(&)
329             $sub->( sub {
330             # that (&) is finally called when IO event is there
331 16         40 local $self-> {super} = [$orig_cb];
332 16         37 &$cb;
333 17         64 });
334 14         17 } ];
335             }
336              
337             sub super
338             {
339 25 50   25 1 80 croak "super() call outside overridden condition" unless $_[0]-> {super};
340 25         39 my $data = $_[0]-> {super};
341 25 100       48 if ( defined $data-> [1]) {
342             # override() super
343 5         12 return $data-> [0]-> ($data-> [1]);
344             } else {
345             # intercept() super
346 20         24 my $self = shift;
347 20 50       79 return defined($data->[0]) ?
    100          
348             $data-> [0]-> (@_) :
349             ( wantarray ? @_ : $_[0] );
350             }
351             }
352              
353              
354             # handle incoming asynchronous events
355             sub io_handler
356             {
357 223     223 0 477 my ( $self, $rec) = @_;
358              
359 223 50       628 warn _d( $self, '< ', _ev($rec)) if $DEBUG_IO;
360              
361 223         499 my $in = $self-> {in};
362 223         391 my $nn = @$in;
363 223         443 @$in = grep { $rec != $_ } @$in;
  241         1194  
364 223 50 33     1831 die _d($self, 'stray ', _ev($rec))
365             if $nn == @$in or $self != $rec->[WATCH_OBJ];
366              
367 223 50       665 _d_in if $DEBUG_IO;
368 223         888 local $self-> {cancel} = $rec-> [WATCH_CANCEL];
369 223         1792 @{$self->{last}} = $rec-> [WATCH_CALLBACK]-> (
370             $self,
371             (($#$rec == WATCH_IO_FLAGS) ? $rec-> [WATCH_IO_FLAGS] : ()),
372 223 100       1095 @{$self->{last}}
  223 50       958  
373             ) if $rec-> [WATCH_CALLBACK];
374              
375 223 50       912 _d_out if $DEBUG_IO;
376 223 50       557 warn $self-> _msg('io') if $DEBUG_IO;
377              
378 223 100       1275 unless ( @$in) {
379 170 50       423 warn _d( $self, 'stopped') if $DEBUG_LAMBDA;
380 170         2656 $self-> {stopped}++;
381             }
382             }
383              
384             # handle incoming synchronous events
385             sub lambda_handler
386             {
387 458     458 0 821 my ( $self, $rec) = @_;
388              
389 458 50       1079 warn _d( $self, '< ', _ev($rec)) if $DEBUG_LAMBDA;
390              
391 458         885 my $in = $self-> {in};
392 458         708 my $nn = @$in;
393 458         848 @$in = grep { $rec != $_ } @$in;
  515         2083  
394 458 50 33     2889 die _d($self, 'stray ', _ev($rec))
395             if $nn == @$in or $self != $rec->[WATCH_OBJ];
396              
397 458         868 my $lambda = $rec-> [WATCH_LAMBDA];
398             die _d($self,
399             'handler called but ', _obj($lambda),
400 458 50       1227 ' is not finished yet') unless $lambda-> {stopped};
401              
402 458         1444 my $arr = $EVENTS{"$lambda"};
403 458         1003 @$arr = grep { $_ != $rec } @$arr;
  461         1591  
404 458 100       2120 delete $EVENTS{"$lambda"} unless @$arr;
405              
406 458 50       1130 _d_in if $DEBUG_LAMBDA;
407            
408 458         1325 local $self-> {cancel} = $rec-> [WATCH_CANCEL];
409 458         9446 @{$self->{last}} =
410             $rec-> [WATCH_CALLBACK] ?
411             $rec-> [WATCH_CALLBACK]-> (
412             $self,
413 432         1969 @{$rec-> [WATCH_LAMBDA]-> {last}}
414             ) :
415 458 100       1165 @{$rec-> [WATCH_LAMBDA]-> {last}};
  26         62  
416              
417 458 50       1385 _d_out if $DEBUG_LAMBDA;
418 458 50       1005 warn $self-> _msg('tail') if $DEBUG_LAMBDA;
419              
420 458 100       1701 unless ( @$in) {
421 255 50       734 warn _d( $self, 'stopped') if $DEBUG_LAMBDA;
422 255         960 $self-> {stopped} = 1;
423             }
424             }
425              
426             # Removes one event from queue
427             sub cancel_event
428             {
429 9     9 0 33 my ( $self, $rec) = @_;
430              
431 9 100       26 return unless @{$self-> {in}};
  9         84  
432              
433 8 50       39 @{$self->{last}} = $rec-> [WATCH_CANCEL]->($self, @{$self->{last}})
  0         0  
  0         0  
434             if $rec-> [WATCH_CANCEL];
435              
436 8 50       67 $LOOP-> remove_event($rec) if $LOOP;
437 8         30 @{$self-> {in}} = grep { $_ != $rec } @{$self-> {in}};
  8         47  
  11         42  
  8         27  
438              
439 8 100 66     101 if ($rec->[WATCH_LAMBDA] and ref($rec->[WATCH_LAMBDA])) {
440 5         19 my $arr = $EVENTS{$rec->[WATCH_LAMBDA]};
441 5 100       25 if ( $arr) {
442 4         10 @$arr = grep { $_ != $rec } @$arr;
  4         17  
443 4 50       30 delete $EVENTS{$rec->[WATCH_LAMBDA]} unless @$arr;
444             }
445             }
446 8         208 @$rec = ();
447              
448 8 100       23 return if @{$self->{in}};
  8         60  
449             # that was the last event
450              
451 4 50       18 warn _d( $self, 'stopped') if $DEBUG_LAMBDA;
452 4         17 $self-> {stopped} = 1;
453 4         24 $_-> remove( $self) for @LOOPS;
454             }
455              
456             # Removes all events bound to the object, notifies the interested objects.
457             # The object becomes stopped immediately, so no new events will be allowed to register.
458             sub cancel_all_events
459             {
460 721     721 0 1122 my $self = shift;
461              
462 721         1335 $self-> {stopped} = 1;
463              
464 721 100       1946 return unless @{$self-> {in}};
  721         10868  
465            
466 46         126 for ( grep { $_-> [WATCH_CANCEL] } reverse @{$self-> {in}}) {
  53         243  
  46         215  
467 14         33 my $wc = $_-> [WATCH_CANCEL];
468 14         31 $_-> [WATCH_CANCEL] = undef;
469 14         28 @{$self->{last}} = $wc-> ($self, $_, @{$self->{last}})
  14         193  
  14         68  
470             }
471              
472 46 50       573 $LOOP-> remove( $self) if $LOOP;
473 46         186 $_-> remove($self) for @LOOPS;
474              
475 46         127 for my $rec ( @{$self->{in}}) {
  46         208  
476 50 100       241 if ( ref($rec->[WATCH_LAMBDA])) {
477 30         131 my $arr = $EVENTS{$rec->[WATCH_LAMBDA]};
478 30 50       121 if ( $arr) {
479 30         83 @$arr = grep { $_ != $rec } @$arr;
  30         173  
480 30 50       250 delete $EVENTS{$rec->[WATCH_LAMBDA]} unless @$arr;
481             }
482             }
483 50         1327 @$rec = ();
484             }
485              
486 46         121 @{$self-> {in}} = ();
  46         1236  
487             }
488              
489             sub autorestart
490             {
491             $#_ ?
492             $_[0]-> {autorestart} = $_[1] :
493             ( exists($_[0]-> {autorestart}) ?
494 116 50   116 1 1118 $_[0]-> {autorestart} : 1)
    50          
495             }
496 897     897 1 4140 sub is_stopped { $_[0]-> {stopped} }
497 5 100   5 1 37 sub is_waiting { not($_[0]->{stopped}) and @{$_[0]->{in}} }
  3         19  
498 656   100 656 1 2533 sub is_passive { not($_[0]->{stopped}) and not(@{$_[0]->{in}}) }
499 1305 100   1305 1 4519 sub is_active { $_[0]->{stopped} or @{$_[0]->{in}} }
  1303         6486  
500              
501             # reset the state machine
502             sub reset
503             {
504 157     157 1 4332 my $self = shift;
505              
506 157         437 $self-> cancel_all_events;
507 157         276 @{$self-> {last}} = ();
  157         522  
508 157         328 delete $self-> {stopped};
509 157 50       562 warn _d( $self, 'reset') if $DEBUG_LAMBDA;
510             }
511              
512             # start the state machine
513             sub start
514             {
515 602     602 1 1074 my $self = shift;
516              
517 602 50       1316 croak "can't start active lambda, call reset() first" if $self-> is_active;
518              
519 602 50       1500 warn _d( $self, 'started') if $DEBUG_LAMBDA;
520 586         2067 @{$self->{last}} = $self-> {start}-> ($self, @{$self->{last}})
  599         2044  
521 602 100       1828 if $self-> {start};
522 589 50       1647 warn $self-> _msg('initial') if $DEBUG_LAMBDA;
523              
524 589 100       848 unless ( @{$self->{in}}) {
  589         2043  
525 137 50       327 warn _d( $self, 'stopped') if $DEBUG_LAMBDA;
526 137         331 $self-> {stopped} = 1;
527             }
528             }
529              
530             # peek into the current state
531 154 100   154 1 1023 sub peek { wantarray ? @{$_[0]->{last}} : $_[0]-> {last}-> [0] }
  35         670  
532              
533             # pass initial parameters to lambda
534             sub call
535             {
536 536     536 1 878 my $self = shift;
537              
538 536 50       1231 croak "can't call active lambda" if $self-> is_active;
539              
540 536         988 @{$self-> {last}} = @_;
  536         1347  
541 536         1013 $self;
542             }
543              
544             # abandon all states and stop with constant message
545             sub terminate
546             {
547 82     82 1 207 my ( $self, @error) = @_;
548              
549 82         287 $self-> {last} = \@error;
550 82         286 $self-> cancel_all_events;
551 82 50       339 warn $self-> _msg('terminate') if $DEBUG_LAMBDA;
552             }
553              
554             # propagate event destruction on all levels
555             sub destroy
556             {
557 0     0 1 0 shift-> cancel_all_events( cascade => 1);
558             }
559              
560             # synchronisation
561              
562             # drives objects dependant on the other objects until all of them
563             # are stopped
564             sub drive
565             {
566 485170     485170 0 552562 my $changed = 1;
567 485170         522603 my $executed = 0;
568 485170 50       893654 warn "IO::Lambda::drive --------\n" if $DEBUG_LAMBDA;
569 485170         953267 while ( $changed) {
570 485502         533380 $changed = 0;
571              
572             # dispatch
573 485502         841618 for my $rec ( map { @$_ } values %EVENTS) {
  1281412         2163332  
574 1281422 100       2887397 next unless $rec->[WATCH_LAMBDA]-> {stopped};
575 458         1628 $rec->[WATCH_OBJ]-> lambda_handler( $rec);
576 458         674 $changed = 1;
577 458         957 $executed++;
578             }
579 485502 50 33     1692385 warn "IO::Lambda::drive .........\n" if $DEBUG_LAMBDA and $changed;
580             }
581 485170 50       965137 warn "IO::Lambda::drive +++++++++\n" if $DEBUG_LAMBDA;
582              
583 485170         1043866 return $executed;
584             }
585              
586             # do one quant
587             sub yield
588             {
589 485170     485170 1 598733 my $nonblocking = shift;
590 485170         555613 my $more_events = 0;
591              
592             # custom loops must not wait
593 485170         798451 for ( @LOOPS) {
594 484908 100       1126326 next if $_-> empty;
595 484798         1202109 $_-> yield;
596 484798         851070 $more_events = 1;
597             }
598              
599 485170 100       771576 if ( drive) {
600             # some callbacks we called, don't let them wait in sleep
601 204         833 return 1;
602             }
603              
604             # main loop waits, if anything
605 484966 100       1200651 unless ( $LOOP-> empty) {
606 218         827 $LOOP-> yield( $nonblocking);
607 218         465 $more_events = 1;
608             }
609              
610 484966 100       1065039 $more_events = 1 if keys %EVENTS;
611 484966         1410935 return $more_events;
612             }
613              
614              
615             # wait for one lambda to stop
616             sub wait
617             {
618 157     157 1 2295 my $self = shift;
619 157 100       463 if ( $self-> is_passive) {
620 140         437 $self-> call(@_);
621 140         383 $self-> start;
622             }
623 149         751 yield while not $self-> {stopped};
624 149         487 return $self-> peek;
625             }
626              
627             # wait for all lambdas to stop
628             sub wait_for_all
629             {
630 0     0 1 0 my @objects = @_;
631 0 0       0 return unless @objects;
632 0         0 $_-> start for grep { $_-> is_passive } @objects;
  0         0  
633 0         0 my @ret;
634 0         0 while ( 1) {
635 0         0 push @ret, map { $_-> peek } grep { $_-> {stopped} } @objects;
  0         0  
  0         0  
636 0         0 @objects = grep { not $_-> {stopped} } @objects;
  0         0  
637 0 0       0 last unless @objects;
638 0         0 yield;
639             }
640 0         0 return @ret;
641             }
642              
643             # wait for at least one lambda to stop, return those that stopped
644             sub wait_for_any
645             {
646 0     0 1 0 my @objects = @_;
647 0 0       0 return unless @objects;
648 0         0 $_-> start for grep { $_-> is_passive } @objects;
  0         0  
649 0         0 while ( 1) {
650 0         0 my @n = grep { $_-> {stopped} } @objects;
  0         0  
651 0 0       0 return @n if @n;
652 0         0 yield;
653             }
654             }
655              
656             # run the event loop until no lambdas are left in the blocking state
657 0     0 1 0 sub run { do {} while yield }
658              
659             #
660             # Part II - Procedural interface to the lambda-style programming
661             #
662             #################################################################
663              
664 0     0   0 sub _lambda_restart { die "lambda() is not restartable" }
665             sub lambda(&)
666             {
667 457     457 1 16646 my $cb = _subname(lambda => $_[0]);
668             my $l = __PACKAGE__-> new( sub {
669             # initial lambda code is usually executed by tail/tails inside another lambda,
670             # so protect the upper-level context
671 597     597   2713 local *__ANON__ = "IO::Lambda::lambda::callback";
672 597         1267 local $THIS = shift;
673 597         1318 local @CONTEXT = ();
674 597         1237 local $CALLBACK = $cb;
675 597         1370 local $METHOD = \&_lambda_restart;
676 597 50       2596 $cb ? $cb-> (@_) : @_;
677 457         5112 });
678 457 50       1461 if ( $DEBUG_CALLER) {
679 0 0       0 if ( $DEBUG_CALLER > 1) {
680 0         0 $l-> {caller} = Carp::longmess;
681 0         0 chomp $l-> {caller};
682 0         0 $l-> {caller} =~ s/^ at //;
683             } else {
684 0         0 $l-> {caller} = join(':', (caller)[1,2]);
685             }
686             }
687 457         2608 $l;
688             }
689              
690             sub _subname
691             {
692 1005 0 0 1005   2763 subname(
      33        
      0        
693             caller(1 + ($_[2] || 0)) . '::_'. $_[0],
694             $_[1]
695             ) if $DEBUG_CALLER and $_[1] and not $AGAIN;
696 1005         3633 return $_[1];
697             }
698              
699             *io = \λ
700              
701             # re-enter the latest (or other) frame
702             sub again
703             {
704 126 100   126 1 1408 ( $METHOD, $CALLBACK) = @_ if 2 == @_;
705 126         244 local $AGAIN = 1;
706 126 50       574 defined($METHOD) ?
707             $METHOD-> ($CALLBACK) :
708             croak "again() outside of a restartable call"
709             }
710              
711             # define context
712 403 100   403 1 4153 sub this { @_ ? ($THIS, @CONTEXT) = @_ : $THIS }
713 1027 100   1027 1 10913 sub context { @_ ? (@CONTEXT) = @_ : @CONTEXT }
714 1 50   1 1 9 sub restartable { @_ ? ($METHOD, $CALLBACK) = @_ : ( $METHOD, $CALLBACK) }
715 41     41 0 284 sub set_frame { ( $THIS, $METHOD, $CALLBACK, @CONTEXT) = @_ }
716 16     16 0 89 sub get_frame { ( $THIS, $METHOD, $CALLBACK, @CONTEXT) }
717 0     0 0 0 sub swap_frame { my @f = get_frame; set_frame(@_); @f }
  0         0  
  0         0  
718 1     1 0 11 sub clear { set_frame(); undef $AGAIN; }
  1         4  
719              
720 29     29   5832 END { ( $THIS, $METHOD, $CALLBACK, @CONTEXT) = (); }
721              
722             sub state($)
723             {
724 41 100 66 41 1 285 my $this = ($_[0] && ref($_[0])) ? shift(@_) : this;
725 41 100       387 @_ ? $this-> {state} = $_[0] : return $this-> {state};
726             }
727              
728             # exceptions and backtracing
729             sub catch(&$)
730             {
731 31     31 1 64 my ( $cb, $event) = @_;
732 31         246 my $who = (caller(1))[3];
733 31         98 my @ctx = @CONTEXT;
734 31 50       100 croak "catch callback already defined" if $event-> [WATCH_CANCEL];
735             $event->[WATCH_CANCEL] = $cb ? sub {
736 15 50   15   48 local *__ANON__ = "$who\:\:catch" if $DEBUG_CALLER;
737 15         29 $THIS = shift;
738 15         46 local $THIS-> {cancelled_event} = shift;
739 15         61 local $THIS-> {cancelling} = 1;
740 15         42 @CONTEXT = @ctx;
741 15         24 $METHOD = undef;
742 15         24 $CALLBACK = undef;
743 15         68 $cb-> (@_);
744 31 50       169 } : undef;
745              
746             # if throw() happened before we even get here
747 31 100 66     222 $event->[WATCH_CALLBACK] = $event->[WATCH_CANCEL]
748             if $event->[WATCH_CALLBACK] && $event->[WATCH_CALLBACK] == \&_throw;
749            
750 31         111 return $event;
751             }
752              
753             sub call_again
754             {
755 3     3 1 7 my $self = shift;
756 3 50       14 croak "called outside catch()" unless $self-> {cancelled_event};
757 3         9 my $cb = $self-> {cancelled_event}->[WATCH_CALLBACK];
758 3 50       18 $cb->($self, @_) if $cb;
759             }
760              
761             sub autocatch($)
762             {
763             catch {
764 2     2   9 this-> call_again;
765 2         20 this-> throw(@_);
766 15     15 1 106 } $_[0]
767             }
768              
769 3     3 1 10 sub is_cancelling { $_[0]-> {cancelling} }
770              
771             sub _throw
772             {
773 15     15   22 my $self = shift;
774 15 50       29 warn _d( $self, 'throw') if $DEBUG_LAMBDA;
775 15         30 $self-> throw(@_);
776             }
777              
778             sub throw
779             {
780 40     40 1 83 my ( $self, @error) = @_;
781 40         89 my @c = $self-> callers;
782 40   100     238 $_-> [WATCH_CALLBACK] = $_->[WATCH_CANCEL] || \&_throw for @c;
783 40         96 $self-> terminate(@error);
784 40 100 66     114 $SIGTHROW->($self, @error) if $SIGTHROW and not @c;
785 40         120 return @error;
786             }
787              
788             sub sigthrow
789             {
790 4 100 66 4 1 24 shift if defined($_[0]) and (not(ref $_[0]) or ref($_[0]) ne 'CODE');
      66        
791 4 50       14 $SIGTHROW = $_[0] if @_;
792 4         12 return $SIGTHROW;
793             }
794              
795 0 0   0 1 0 sub callees { @{ $EVENTS{ "$_[0]" } || [] } }
  0         0  
796 60     60 1 117 sub callers { grep { $_[0] == $_-> [WATCH_LAMBDA] } map { @$_ } values %EVENTS }
  120         333  
  102         180  
797              
798             sub backtrace
799             {
800 6     6 1 962 require IO::Lambda::Backtrace;
801 6         778 IO::Lambda::Backtrace-> new($_[0], Carp::shortmess);
802             }
803              
804             #
805             # Conditions:
806             #
807              
808             # common wrapper for declaration of handle-watching user conditions
809             sub add_watch
810             {
811 50     50 0 158 my ($self, $cb, $method, $flags, $handle, $deadline, @ctx) = @_;
812 50         75 my $who;
813 50 50       120 $who = (caller(1))[3] if $DEBUG_CALLER;
814             $self-> watch_io(
815             $flags, $handle, $deadline,
816             sub {
817 49 50   49   138 local *__ANON__ = "$who\:\:callback" if $DEBUG_CALLER;
818 49         91 $THIS = shift;
819 49         196 @CONTEXT = @ctx;
820 49         83 $METHOD = $method;
821 49         77 $CALLBACK = $cb;
822 49 50       422 $cb ? $cb-> (@_) : @_;
823             },
824 50 100       403 ($AGAIN ? delete($self-> {cancel}) : undef),
825             )
826             }
827              
828             # rwx($flags,$handle,$deadline)
829             sub rwx(&)
830             {
831             return $THIS-> override_handler('rwx', \&rwx, shift)
832 0 0   0 1 0 if $THIS-> {override}->{rwx};
833              
834 0         0 $THIS-> add_watch(
835             _subname(rwx => shift), \&rwx,
836             @CONTEXT[0,1,2,0,1,2]
837             )
838             }
839              
840             # readable($handle,$deadline)
841             sub readable(&)
842             {
843             return $THIS-> override_handler('readable', \&readable, shift)
844 43 50   43 1 377 if $THIS-> {override}->{readable};
845              
846 43         118 $THIS-> add_watch(
847             _subname(readable => shift), \&readable, IO_READ,
848             @CONTEXT[0,1,0,1]
849             )
850             }
851              
852             # writable($handle,$deadline)
853             sub writable(&)
854             {
855             return $THIS-> override_handler('writable', \&writable, shift)
856 7 50   7 1 64 if $THIS-> {override}->{writable};
857            
858 7         59 $THIS-> add_watch(
859             _subname(writable => shift), \&writable, IO_WRITE,
860             @CONTEXT[0,1,0,1]
861             )
862             }
863              
864             # common wrapper for declaration of time-watching user conditions
865             sub add_timer
866             {
867 41     41 0 126 my ($self, $cb, $method, $deadline, @ctx) = @_;
868 41         68 my $who;
869 41 50       115 $who = (caller(1))[3] if $DEBUG_CALLER;
870             $self-> watch_timer(
871             $deadline,
872             sub {
873 38 50   38   146 local *__ANON__ = "$who\:\:callback" if $DEBUG_CALLER;
874 38         80 $THIS = shift;
875 38         149 @CONTEXT = @ctx;
876 38         88 $METHOD = $method;
877 38         84 $CALLBACK = $cb;
878 38 100       391 $cb ? $cb-> (@_) : @_;
879             },
880 41 100       503 ($AGAIN ? delete($self-> {cancel}) : undef),
881             )
882             }
883              
884             # timeout($deadline)
885             sub timeout(&)
886             {
887             return $THIS-> override_handler('timeout', \&timeout, shift)
888 41 50   41 1 328 if $THIS-> {override}->{timeout};
889 41         148 $THIS-> add_timer( _subname(timeout => shift), \&timeout, @CONTEXT[0,0])
890             }
891              
892             # common wrapper for declaration of single lambda-watching user conditions
893             sub add_tail
894             {
895 398     398 0 1533 my ($self, $cb, $method, $lambda, @ctx) = @_;
896 398         552 my $who;
897 398 50       1019 $who = (caller(1))[3] if $DEBUG_CALLER;
898             $self-> watch_lambda(
899             $lambda,
900             ($cb ? sub {
901 338 50   338   775 local *__ANON__ = "$who\:\:callback" if $DEBUG_CALLER;
902 338         583 $THIS = shift;
903 338         1114 @CONTEXT = @ctx;
904 338         612 $METHOD = $method;
905 338         511 $CALLBACK = $cb;
906 338         3752 $cb-> (@_);
907             } : undef),
908 398 100       4377 ($AGAIN ? delete($self-> {cancel}) : undef),
    100          
909             );
910             }
911              
912             # convert constant @param into a lambda
913             sub add_constant
914             {
915 0     0 0 0 my ( $self, $cb, $method, @param) = @_;
916             $self-> add_tail (
917             _subname(constant => $cb), $method,
918 0     0   0 lambda { @param },
919             @CONTEXT
920 0         0 );
921             }
922              
923             # handle default condition logic given a lambda
924             sub condition
925             {
926 38     38 1 247 my ( $self, $cb, $method, $name) = @_;
927              
928             return $THIS-> override_handler($name, $method, $cb)
929 38 50 66     392 if defined($name) and $THIS-> {override}->{$name};
930            
931 38         112 my @ctx = @CONTEXT;
932              
933 38         65 my $who;
934 38 50       102 if ( $DEBUG_CALLER) {
935 0 0       0 $who = defined($name) ? $name : (caller(1))[3];
936 0         0 _subname($who, $cb, 2);
937             }
938              
939             $THIS-> watch_lambda(
940             $self,
941             $cb ? sub {
942 37 50   37   131 local *__ANON__ = "$who\:\:callback" if $DEBUG_CALLER;
943 37         70 $THIS = shift;
944 37         131 @CONTEXT = @ctx;
945 37         81 $METHOD = $method;
946 37         63 $CALLBACK = $cb;
947 37         272 $cb-> (@_);
948             } : undef
949 38 50       555 );
950             }
951              
952             # dummy sub for empty calls for tails() family
953             sub _empty
954             {
955 2     2   6 my ($name, $method, $cb) = @_;
956 2         7 my @ctx = context;
957             $THIS-> watch_lambda( IO::Lambda-> new, sub {
958 2     2   23 local *__ANON__ = "IO::Lambda::".$name."::callback";
959 2         7 @CONTEXT = @ctx;
960 2         5 $METHOD = $method;
961 2         4 $CALLBACK = $cb;
962 2         12 $cb-> ();
963 2 50       40 }) if $cb;
964             }
965              
966             # tail( $lambda, @param) -- initialize $lambda with @param, and wait for it
967             sub tail(&)
968             {
969             return $THIS-> override_handler('tail', \&tail, shift)
970 439 100   439 1 2224 if $THIS-> {override}->{tail};
971            
972 399         950 my ( $lambda, @param) = context;
973 399 100       1158 return _empty(tail => \&tail, shift) unless $lambda;
974              
975 398 100 66     1698 $lambda-> reset
976             if $lambda-> is_stopped and $lambda-> autorestart;
977 398 100       968 if ( @param) {
978 235         759 $lambda-> call( @param);
979             } else {
980 163 100       680 $lambda-> call unless $lambda-> is_active;
981             }
982 398         1143 $THIS-> add_tail( _subname(tail => shift), \&tail, $lambda, $lambda, @param);
983             }
984              
985              
986             # tails(@lambdas) -- wait for all lambdas to finish
987             sub tails(&)
988             {
989             return $THIS-> override_handler('tails', \&tails, shift)
990 16 50   16 1 118 if $THIS-> {override}->{tails};
991            
992 16         60 my $cb = _subname tails => $_[0];
993 16         70 my @lambdas = context;
994 16         40 my $n = $#lambdas;
995 16 100       83 return _empty(tails => \&tails, $cb) unless @lambdas;
996              
997 15         26 my @ret;
998             my $watcher;
999             $watcher = sub {
1000 25     25   45 $THIS = shift;
1001 25         58 push @ret, @_;
1002 25 100       86 return if $n--;
1003              
1004 8         110 local *__ANON__ = "IO::Lambda::tails::callback";
1005 8         40 @CONTEXT = @lambdas;
1006 8         23 $METHOD = \&tails;
1007 8         17 $CALLBACK = $cb;
1008 8 100       79 $cb ? $cb-> (@ret) : @ret;
1009 15         90 };
1010 15         44 my $this = $THIS;
1011 15         88 $this-> watch_lambda( $_, $watcher) for @lambdas;
1012             }
1013              
1014             # tailo(@lambdas) -- wait for all lambdas to finish, return ordered results
1015             sub tailo(&)
1016             {
1017             return $THIS-> override_handler('tailo', \&tailo, shift)
1018 1 50   1 1 6 if $THIS-> {override}->{tailo};
1019            
1020 1         3 my $cb = _subname tailo => $_[0];
1021 1         2 my @lambdas = context;
1022 1         2 my $n = $#lambdas;
1023 1 50       3 return _empty(tailo => \&tailo, $cb) unless @lambdas;
1024              
1025 1         2 my @ret;
1026             my $watcher;
1027             $watcher = sub {
1028 3     3   6 my $curr = shift;
1029 3         6 $THIS = shift;
1030 3         9 $ret[ $curr ] = \@_;
1031 3 100       13 return if $n--;
1032              
1033 1         54 local *__ANON__ = "IO::Lambda::tailo::callback";
1034 1         7 @CONTEXT = @lambdas;
1035 1         6 $METHOD = \&tailo;
1036 1         4 $CALLBACK = $cb;
1037 1         6 @ret = map { @$_ } @ret;
  3         11  
1038 1 50       11 $cb ? $cb-> (@ret) : @ret;
1039 1         5 };
1040 1         2 my $this = $THIS;
1041 1         5 for ( my $i = 0; $i < @lambdas; $i++) {
1042 3         16 my $d = $i;
1043             $this-> watch_lambda(
1044             $lambdas[$i],
1045 3     3   10 sub { $watcher->($d, @_) }
1046 3         14 );
1047             };
1048             }
1049              
1050             # any_tail($deadline,@lambdas) -- wait for any lambda to finish within time
1051             sub any_tail(&)
1052             {
1053             return $THIS-> override_handler('any_tail', \&any_tail, shift)
1054 6 50   6 1 52 if $THIS-> {override}->{any_tail};
1055            
1056 6         17 my $cb = _subname any_tail => $_[0];
1057 6         33 my ( $deadline, @lambdas) = context;
1058 6         16 my $n = $#lambdas;
1059 6 50       21 return _empty(any_tail => \&any_tail, $cb) unless @lambdas;
1060              
1061 6         20 my ( @ret, @watchers);
1062 0         0 my $timer;
1063            
1064             $timer = $THIS-> watch_timer( $deadline, sub {
1065 3     3   61 local *__ANON__ = "IO::Lambda::any_tail::callback1";
1066 3         14 $THIS = shift;
1067 3         12 @CONTEXT = ($deadline, @lambdas);
1068 3         32 $METHOD = \&any_tail;
1069 3         8 $CALLBACK = $cb;
1070 3 50       45 @ret = $cb-> (@ret) if $cb;
1071 3         41 $THIS-> cancel_event($_) for @watchers;
1072 3         11 return @ret;
1073 6 50       89 }) if defined $deadline;
1074              
1075 6         15 my $watcher;
1076             $watcher = sub {
1077 3     3   5 push @ret, shift;
1078 3 100       11 return if $n--;
1079            
1080 1         5 local *__ANON__ = "IO::Lambda::any_tail::callback2";
1081 1         3 $THIS = shift;
1082 1         2 @CONTEXT = ($deadline, @lambdas);
1083 1         2 $METHOD = \&any_tail;
1084 1         2 $CALLBACK = $cb;
1085 1 50       8 @ret = $cb-> (@ret) if $cb;
1086 1 50       8 $THIS-> cancel_event( $timer) if $timer;
1087 1         8 return @ret;
1088 6         32 };
1089              
1090             @watchers = map {
1091 6         15 my $l = $_;
  10         20  
1092             $THIS-> watch_lambda( $l, sub {
1093 3     3   9 $watcher->($l, @_);
1094             })
1095 10         67 } @lambdas;
1096             }
1097              
1098             #
1099             # Part III - High order lambdas
1100             #
1101             ################################################################
1102              
1103             # fold($l) :: @b -> @c
1104             # $l :: ($a,@b) -> @c
1105             sub fold($)
1106             {
1107 1     1 1 4 my $l = shift;
1108             lambda {
1109 3     3   12 my @q = @_;
1110 3         10 context $l, shift(@q), shift(@q);
1111             tail {
1112 10 100       36 return @_ unless @q;
1113 7         22 context $l, shift(@q), @_;
1114 7         20 again;
1115 3         18 }}
1116 1         10 }
1117              
1118             # mapcar($l) :: (@p) -> @r
1119             sub mapcar($)
1120             {
1121 2     2 1 4 my $lambda = shift;
1122             lambda {
1123 3     3   5 my @ret;
1124 3         17 my @p = @_;
1125 3 50       8 return unless @p;
1126 3         19 context $lambda, shift @p;
1127             tail {
1128 15         29 push @ret, @_;
1129 15 100       44 return @ret unless @p;
1130 12         33 context $lambda, shift @p;
1131 12         29 again;
1132 3         19 }}
1133 2         15 }
1134              
1135             # filter($l) :: (@p) -> @r
1136             sub filter($)
1137             {
1138 1     1 1 3 my $lambda = shift;
1139             lambda {
1140 1     1   3 my @ret;
1141 1 50       7 return unless @_;
1142 1         5 my @p = @_;
1143 1         3 my $p = shift @p;
1144 1         6 context $lambda, $p;
1145             tail {
1146 5 100       21 push @ret, $p if shift;
1147 5 100       21 return @ret unless @p;
1148 4         13 $p = shift @p;
1149 4         14 context $lambda, $p;
1150 4         15 again;
1151 1         8 }}
1152 1         11 }
1153              
1154             # curry(@a -> $l) :: @a -> @b
1155             sub curry(&)
1156             {
1157 3     3 1 7 my $cb = $_[0];
1158             lambda {
1159 12     12   27 context $cb->(@_);
1160 12         63 &tail();
1161             }
1162 3         17 }
1163              
1164             # seq() :: (@l) -> @m
1165 10     10 1 38 sub seq { mapcar curry { shift } }
  1     1   210  
1166              
1167             # par($max = 0) :: (@l) -> @m
1168             sub par
1169             {
1170 1   50 1 1 4 my $max = $_[0] || 0;
1171              
1172             lambda {
1173 1     1   3 my @q = @_;
1174 1         7 my @ret;
1175 1 50 33     8 $max = @q if $max < 1 or $max > @q;
1176             context map {
1177 1         4 lambda {
1178 3 50       12 return unless @q;
1179 3         8 context shift @q;
1180             tail {
1181 9         28 push @ret, @_;
1182 9 100       41 return unless @q;
1183 6         24 context shift @q;
1184 6         25 again;
1185 3         12 }}
1186 3         11 } 1 .. $max;
1187 1         5 tails { @ret }
1188 1         6 }
1189 1         6 }
1190              
1191              
1192             # sysread lambda wrapper
1193             #
1194             # ioresult :: ($result, $error)
1195             # sysreader() :: ($fh, $buf, $length, $deadline) -> ioresult
1196             sub sysreader (){ lambda
1197             {
1198 104     104   197 my ( $fh, $buf, $length, $deadline) = @_;
1199 104 50       275 $$buf = '' unless defined $$buf;
1200              
1201             this-> watch_io( IO_READ, $fh, $deadline, subname _sysreader => sub {
1202 100 50       276 return undef, 'timeout' unless $_[1];
1203 100         1644 local $SIG{PIPE} = 'IGNORE';
1204 100         5126 my $n = sysread( $fh, $$buf, $length, length($$buf));
1205 100 50       313 if ( $DEBUG_IO) {
1206 0 0       0 warn "fh(", fileno($fh), ") read ", ( defined($n) ? "$n bytes" : "error $!"), "\n";
1207 0 0 0     0 warn substr( $$buf, length($$buf) - $n), "\n" if $DEBUG_IO > 1 and ($n || 0) > 0;
      0        
1208             }
1209 100 50       399 return undef, $! unless defined $n;
1210 100         818 return $n;
1211             })
1212 104     46 1 236 }}
  46         823  
1213              
1214             # syswrite() lambda wrapper
1215             #
1216             # syswriter() :: ($fh, $buf, $length, $offset, $deadline) -> ioresult
1217             sub syswriter (){ lambda
1218             {
1219 8     8   20 my ( $fh, $buf, $length, $offset, $deadline) = @_;
1220              
1221             this-> watch_io( IO_WRITE, $fh, $deadline, subname _syswriter => sub {
1222 8 50       22 return undef, 'timeout' unless $_[1];
1223 8         134 local $SIG{PIPE} = 'IGNORE';
1224 8         820 my $n = syswrite( $fh, $$buf, $length, $offset);
1225 8 50       49 if ( $DEBUG_IO) {
1226 0 0       0 warn "fh(", fileno($fh), ") wrote ", ( defined($n) ? "$n bytes out of $length" : "error $!"), "\n";
1227 0 0 0     0 warn substr( $$buf, $offset, $n), "\n" if $DEBUG_IO > 1 and ($n || 0) > 0;
      0        
1228             }
1229 8 50       24 return undef, $! unless defined $n;
1230 8         94 return $n;
1231 8         18 });
1232 8     8 1 66 }}
1233              
1234             sub _match
1235             {
1236 129     129   278 my ( $cond, $buf) = @_;
1237              
1238 129 100       461 return unless defined $cond;
1239              
1240 33 100       1450 return ($$buf =~ /($cond)/)[0] if ref($cond) eq 'Regexp';
1241 4 50       16 return $cond->($buf) if ref($cond) eq 'CODE';
1242 4         18 return length($$buf) >= $cond;
1243             }
1244              
1245             # read from stream until condition is met
1246             #
1247             # readbuf($reader) :: ($fh, $$buf, $cond, $deadline) -> ioresult
1248             sub readbuf
1249             {
1250 49   66 49 1 437 my $reader = shift || sysreader;
1251              
1252             lambda {
1253 65     65   153 my ( $fh, $buf, $cond, $deadline) = @_;
1254            
1255 65 100       192 $$buf = "" unless defined $$buf;
1256              
1257 65         213 my $match = _match( $cond, $buf);
1258 65 100       270 return $match if $match;
1259            
1260 52         90 my ($maxbytes, $bufsize);
1261 52 50 66     337 $maxbytes = $cond - length($$buf)
      33        
1262             if defined($cond) and not ref($cond) and $cond > length($$buf);
1263 52 50       126 $bufsize = defined($maxbytes) ? $maxbytes : 65536;
1264            
1265 52         100 my $savepos = pos($$buf); # useful when $cond is a regexp
1266              
1267 52         128 context $reader, $fh, $buf, $bufsize, $deadline;
1268             tail {
1269 102         326 pos($$buf) = $savepos;
1270              
1271 102         227 my $bytes = shift;
1272 102 100       313 return undef, shift unless defined $bytes;
1273            
1274 101 100       272 unless ( $bytes) {
1275 37 50       155 return 1 unless defined $cond;
1276 0         0 return undef, 'eof';
1277             }
1278            
1279             # got line? return it
1280 64         239 my $match = _match( $cond, $buf);
1281 64 100       225 return $match if $match;
1282            
1283             # otherwise, just wait for more data
1284 54 50       138 $bufsize -= $bytes if defined $maxbytes;
1285              
1286 54         192 context $reader, $fh, $buf, $bufsize, $deadline;
1287 54         168 again;
1288 52         648 }}
1289 49         547 }
1290              
1291             # curry readbuf()
1292             #
1293             # getline($reader) :: ($fh, $$buf, $deadline) -> ioresult
1294             sub getline
1295             {
1296 1     1 1 41 my $reader = shift;
1297             lambda {
1298 1     1   6 my ( $fh, $buf, $deadline) = @_;
1299 1 50       4 croak "getline() needs a buffer! ( f.ex getline,\$fh,\\(my \$buf='') )"
1300             unless ref($buf);
1301 1         4 context readbuf($reader), $fh, $buf, qr/^[^\n]*\n/, $deadline;
1302             tail {
1303 1 50       5 substr( $$buf, 0, length($_[0]), '') unless defined $_[1];
1304 1         2 @_;
1305 1         11 }}
1306 1         6 }
1307              
1308             # write whole buffer to stream
1309             #
1310             # writebuf($writer) :: syswriter
1311             sub writebuf
1312             {
1313 10   66 10 1 77 my $writer = shift || syswriter;
1314              
1315             lambda {
1316 10     10   26 my ( $fh, $buf, $len, $offs, $deadline) = @_;
1317              
1318 10         20 my ( $written, $recheck_length, $olen) = (0);
1319 10 50       35 $$buf = "" unless defined $$buf;
1320 10 100       31 $offs = 0 unless defined $offs;
1321 10 50       26 unless ( defined $len) {
1322 10         19 $olen = $len = length $$buf;
1323 10         16 $recheck_length++;
1324             }
1325            
1326 10         28 context $writer, $fh, $buf, $len, $offs, $deadline;
1327             tail {
1328 21         29 my $bytes = shift;
1329 21 50       76 return undef, shift unless defined $bytes;
1330              
1331 21         33 $offs += $bytes;
1332 21         26 $written += $bytes;
1333 21         42 $len -= $bytes;
1334 21 50       60 if ( $recheck_length) {
1335 21         36 my $l = length $$buf;
1336 21 100       54 if ( $l > $olen) {
1337 1         3 $len += $l - $olen;
1338 1         2 $olen = $l;
1339             }
1340             }
1341 21 100       70 return $written if $len <= 0;
1342              
1343 11         22 context $writer, $fh, $buf, $len, $offs, $deadline;
1344 11         22 again;
1345 10         68 }}
1346 10         101 }
1347              
1348             #
1349             # Part IV - Developer API for custom condvars and event loops
1350             #
1351             ################################################################
1352              
1353             # register condvar listener
1354             sub bind
1355             {
1356 52     52 1 108 my $self = shift;
1357              
1358             # create new condition
1359 52 50       216 croak "can't register events on a stopped lambda" if $self-> {stopped};
1360              
1361 52         178 my $rec = [ $self, @_ ];
1362 52         101 push @{$self-> {in}}, $rec;
  52         158  
1363              
1364 52         166 return $rec;
1365             }
1366              
1367             # stop listening on a condvar
1368             sub resolve
1369             {
1370 41     41 1 92 my ( $self, $rec) = @_;
1371              
1372 41         141 my $in = $self-> {in};
1373 41         94 my $nn = @$in;
1374 41         88 @$in = grep { $rec != $_ } @$in;
  41         158  
1375 41 50 33     519 die _d($self, "stray condvar event $rec (@$rec)")
1376             if $nn == @$in or $self != $rec->[WATCH_OBJ];
1377              
1378 41         114 undef $rec-> [WATCH_OBJ]; # unneeded references
1379              
1380 41 50       204 unless ( @$in) {
1381 41 50       117 warn _d( $self, 'stopped') if $DEBUG_LAMBDA;
1382 41         157 $self-> {stopped} = 1;
1383             }
1384             }
1385              
1386             sub callout
1387             {
1388 6     6 0 19 my ( $self, $cb, @param) = @_;
1389 6 100       30 @{$self->{last}} = $cb ? $cb-> (@param) : @param;
  6         97  
1390             }
1391              
1392 14     14 0 63 sub add_loop { push @LOOPS, shift }
1393 14 50   14 0 144 sub remove_loop { @LOOPS = grep { defined and $_ != $_[0] } @LOOPS }
  14         1118  
1394              
1395 0     0   0 sub __end { clear(); undef %EVENTS; undef @LOOPS; } # for threads
  0         0  
  0         0  
1396              
1397             package IO::Lambda::Loop;
1398 29     29   270 use vars qw($DEFAULT);
  29         53  
  29         2455  
1399 29     29   158 use strict;
  29         55  
  29         4134  
1400 29     29   152 use warnings;
  29         42  
  29         17482  
1401              
1402             $DEFAULT = 'Select' unless defined $DEFAULT;
1403 27     27   97 sub default { $DEFAULT = shift }
1404              
1405             sub new
1406             {
1407 27 50   27   96 return $IO::Lambda::LOOP if $IO::Lambda::LOOP;
1408              
1409 27         108 my ( $class, %opt) = @_;
1410              
1411 27   33     231 $opt{type} ||= $DEFAULT;
1412 27         108 $class .= "::$opt{type}";
1413 27     27   1986 eval "use $class;";
  27         19314  
  27         68  
  27         558  
1414 27 50       121 die $@ if $@;
1415              
1416 27         181 return $IO::Lambda::LOOP = $class-> new();
1417             }
1418              
1419             1;
1420              
1421             __DATA__