File Coverage

blib/lib/Ryu/Source.pm
Criterion Covered Total %
statement 550 849 64.7
branch 175 310 56.4
condition 83 155 53.5
subroutine 129 215 60.0
pod 70 76 92.1
total 1007 1605 62.7


line stmt bran cond sub pod time code
1             package Ryu::Source;
2              
3 37     37   207772 use strict;
  37         92  
  37         1059  
4 37     37   174 use warnings;
  37         75  
  37         1121  
5              
6 37     37   15487 use parent qw(Ryu::Node);
  37         10355  
  37         206  
7              
8             our $VERSION = '3.002'; # VERSION
9             our $AUTHORITY = 'cpan:TEAM'; # AUTHORITY
10              
11             =head1 NAME
12              
13             Ryu::Source - base representation for a source of events
14              
15             =head1 SYNOPSIS
16              
17             my $src = Ryu::Source->new;
18             my $chained = $src->map(sub { $_ * $_ })->prefix('value: ')->say;
19             $src->emit($_) for 1..5;
20             $src->finish;
21              
22             =head1 DESCRIPTION
23              
24             This is probably the module you'd want to start with, if you were going to be
25             using any of this. There's a disclaimer in L that may be relevant at this
26             point.
27              
28             =head2 Quick start
29              
30             You'd normally want to start by creating a L<Ryu::Source> instance:
31              
32             my $src = Ryu::Source->new;
33              
34             If you're dealing with L<IO::Async> code, use L<Ryu::Async> to ensure that you
35             get properly awaitable L<Future> instances:
36              
37             $loop->add(my $ryu = Ryu::Async->new);
38             my $src = $ryu->source;
39              
40             Once you have a source, you'll need two things:
41              
42             =over 4
43              
44             =item * items to put into one end
45              
46             =item * processing to attach to the other end
47              
48             =back
49              
50             For the first, call L</emit>:
51              
52             use Future::AsyncAwait;
53             # 1s drifting periodic timer
54             while(1) {
55             await $loop->delay_future(after => 1);
56             $src->emit('');
57             }
58              
59             For the second, this would be L</each>:
60              
61             $src->each(sub { print "Had timer tick\n" });
62              
63             So far, not so useful - the power of this type of reactive programming is in the
64             ability to chain and combine disparate event sources.
65              
66             At this point, L<http://rxmarbles.com> is worth a visit - this provides a clear
67             visual demonstration of how to combine multiple event streams using the chaining
68             methods. Most of the API here is modelled after similar principles.
69              
70             First, the L</map> method: this provides a way to transform each item into
71             something else:
72              
73             $src->map(do { my $count = 0; sub { ++$count } })
74             ->each(sub { print "Count is now $_\n" })
75              
76             Next, L</filter> provides an equivalent to Perl's L<perlfunc/grep> functionality:
77              
78             $src->map(do { my $count = 0; sub { ++$count } })
79             ->filter(sub { $_ % 2 })
80             ->each(sub { print "Count is now at an odd number: $_\n" })
81              
82             You can stack these:
83              
84             $src->map(do { my $count = 0; sub { ++$count } })
85             ->filter(sub { $_ % 2 })
86             ->filter(sub { $_ % 5 })
87             ->each(sub { print "Count is now at an odd number which is not divisible by 5: $_\n" })
88              
89             or:
90              
91             $src->map(do { my $count = 0; sub { ++$count } })
92             ->map(sub { $_ % 3 ? 'fizz' : $_ })
93             ->map(sub { $_ % 5 ? 'buzz' : $_ })
94             ->each(sub { print "An imperfect attempt at the fizz-buzz game: $_\n" })
95              
96             =cut
97              
98 37     37   21703 no indirect;
  37         41194  
  37         183  
99 37     37   21946 use sort qw(stable);
  37         21971  
  37         225  
100              
101 37     37   1873 use Scalar::Util ();
  37         89  
  37         647  
102 37     37   18750 use Ref::Util ();
  37         61566  
  37         1256  
103 37     37   274 use List::Util ();
  37         98  
  37         927  
104 37     37   19361 use List::UtilsBy;
  37         72557  
  37         2082  
105 37     37   22135 use Encode ();
  37         556402  
  37         1561  
106 37     37   20709 use Syntax::Keyword::Try;
  37         82381  
  37         242  
107 37     37   3024 use Future;
  37         94  
  37         1125  
108 37     37   16332 use Future::Queue;
  37         15982  
  37         1387  
109 37     37   16528 use curry::weak;
  37         36093  
  37         1333  
110              
111 37     37   17397 use Ryu::Buffer;
  37         94  
  37         1602  
112              
113 37     37   19358 use Log::Any qw($log);
  37         318207  
  37         223  
114              
115             =head1 GLOBALS
116              
117             =head2 $FUTURE_FACTORY
118              
119             This is a coderef which should return a new L<Future>-compatible instance.
120              
121             Example overrides might include:
122              
123             $Ryu::Source::FUTURE_FACTORY = sub { Mojo::Future->new->set_label($_[1]) };
124              
125             =cut
126              
127             our $FUTURE_FACTORY = sub {
128             Future->new->set_label($_[1])
129             };
130              
131             =head2 %ENCODER
132              
133             An encoder is a coderef which takes input and returns output.
134              
135             =cut
136              
137             our %ENCODER = (
138             utf8 => sub {
139             sub {
140             Encode::encode_utf8($_)
141             }
142             },
143             json => sub {
144             require JSON::MaybeXS;
145             my $json = JSON::MaybeXS->new(@_);
146             sub {
147             $json->encode($_)
148             }
149             },
150             csv => sub {
151             require Text::CSV;
152             my $csv = Text::CSV->new(@_);
153             sub {
154             die $csv->error_input unless $csv->combine(@$_);
155             $csv->string
156             }
157             },
158             base64 => sub {
159             require MIME::Base64;
160             sub {
161             MIME::Base64::encode_base64($_, '');
162             }
163             },
164             );
165             # The naming of this one is a perennial source of confusion in Perl,
166             # let's just support both
167             $ENCODER{'UTF-8'} = $ENCODER{utf8};
168              
169             our %DECODER = (
170             utf8 => sub {
171             my $data = '';
172             sub {
173             $data .= $_;
174             Encode::decode_utf8($data, Encode::FB_QUIET)
175             }
176             },
177             json => sub {
178             require JSON::MaybeXS;
179             my $json = JSON::MaybeXS->new(@_);
180             sub {
181             $json->decode($_)
182             }
183             },
184             csv => sub {
185             require Text::CSV;
186             my $csv = Text::CSV->new(@_);
187             sub {
188             die $csv->error_input unless $csv->parse($_);
189             [ $csv->fields ]
190             }
191             },
192             base64 => sub {
193             require MIME::Base64;
194             sub {
195             MIME::Base64::decode_base64($_);
196             }
197             },
198             );
199             $DECODER{'UTF-8'} = $DECODER{utf8};
200              
201             =head1 METHODS
202              
203             =head2 new
204              
205             Takes named parameters, such as:
206              
207             =over 4
208              
209             =item * label - the label used in descriptions
210              
211             =back
212              
213             Note that this is rarely called directly, see L</from>, L</empty> and L</never> instead.
214              
215             =cut
216              
217             sub new {
218 132     132 1 156750 my ($self, %args) = @_;
219 132   100     629 $args{label} //= 'unknown';
220 132         754 $self->SUPER::new(%args);
221             }
222              
223             =head2 from
224              
225             Creates a new source from things.
226              
227             The precise details of what this method supports may be somewhat ill-defined at this point in time.
228             It is expected that the interface and internals of this method will vary greatly in versions to come.
229              
230             At the moment, the following inputs are supported:
231              
232             =over 4
233              
234             =item * arrayref - when called as C<< ->from([1,2,3]) >> this will emit the values from the arrayref,
235             deferring until the source is started
236              
237             =item * L<Future> - given a L<Future> instance, will emit the results when that L<Future> is marked as done
238              
239             =item * file handle - if provided a filehandle, such as C<< ->from(\*STDIN) >>, this will read bytes and
240             emit those until EOF
241              
242             =back
243              
244             =cut
245              
246             sub from {
247 0     0 1 0 my $class = shift;
248 0 0       0 my $src = (ref $class) ? $class : $class->new;
249 0 0       0 if(my $from_class = Scalar::Util::blessed($_[0])) {
    0          
250 0 0       0 if($from_class->isa('Future')) {
251             $_[0]->on_ready(sub {
252 0     0   0 my ($f) = @_;
253 0 0       0 if($f->failure) {
    0          
254 0         0 $src->fail($f->from_future);
255             } elsif(!$f->is_cancelled) {
256 0         0 $src->finish;
257             } else {
258 0         0 $src->emit($f->get);
259 0         0 $src->finish;
260             }
261 0         0 })->retain;
262 0         0 return $src;
263             } else {
264 0         0 die 'Unknown class ' . $from_class . ', cannot turn it into a source';
265             }
266             } elsif(my $ref = ref($_[0])) {
267 0 0       0 if($ref eq 'ARRAY') {
    0          
268             $src->{on_get} = sub {
269 0     0   0 $src->emit($_) for @{$_[0]};
  0         0  
270 0         0 $src->finish;
271 0         0 };
272 0         0 return $src;
273             } elsif($ref eq 'GLOB') {
274 0 0       0 if(my $fh = *{$_[0]}{IO}) {
  0         0  
275             my $code = sub {
276 0     0   0 while(read $fh, my $buf, 4096) {
277 0         0 $src->emit($buf)
278             }
279             $src->finish
280 0         0 };
  0         0  
281 0         0 $src->{on_get} = $code;
282 0         0 return $src;
283             } else {
284 0         0 die "have a GLOB with no IO entry, this is not supported"
285             }
286             }
287 0         0 die "unsupported ref type $ref";
288             } else {
289 0         0 die "unknown item in ->from";
290             }
291             }
292              
293             =head2 empty
294              
295             Creates an empty source, which finishes immediately.
296              
297             =cut
298              
299             sub empty {
300 0     0 1 0 my ($class) = @_;
301              
302 0         0 $class->new(label => (caller 0)[3] =~ /::([^:]+)$/)->finish
303             }
304              
305             =head2 never
306              
307             An empty source that never finishes.
308              
309             =cut
310              
311             sub never {
312 0     0 1 0 my ($class) = @_;
313              
314 0         0 $class->new(label => (caller 0)[3] =~ /::([^:]+)$/)
315             }
316              
317             =head1 METHODS - Instance
318              
319             =cut
320              
321             =head2 encode
322              
323             Passes each item through an encoder.
324              
325             The first parameter is the encoder to use, the remainder are
326             used as options for the selected encoder.
327              
328             Examples:
329              
330             $src->encode('json')
331             $src->encode('utf8')
332             $src->encode('base64')
333              
334             =cut
335              
336             sub encode {
337 2     2 1 13 my ($self, $type) = splice @_, 0, 2;
338 2         23 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
339 2   50     12 my $code = ($ENCODER{$type} || $self->can('encode_' . $type) or die "unsupported encoding $type")->(@_);
340             $self->each_while_source(sub {
341 2     2   5 $src->emit($code->($_))
342 2         8 }, $src);
343             }
344              
345             =head2 decode
346              
347             Passes each item through a decoder.
348              
349             The first parameter is the decoder to use, the remainder are
350             used as options for the selected decoder.
351              
352             Examples:
353              
354             $src->decode('json')
355             $src->decode('utf8')
356             $src->decode('base64')
357              
358             =cut
359              
360             sub decode {
361 0     0 1 0 my ($self, $type) = splice @_, 0, 2;
362 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
363 0   0     0 my $code = ($DECODER{$type} || $self->can('decode_' . $type) or die "unsupported encoding $type")->(@_);
364             $self->each_while_source(sub {
365 0     0   0 $src->emit($code->($_))
366 0         0 }, $src);
367             }
368              
369             =head2 print
370              
371             Shortcut for C<< ->each(sub { print }) >>, except this will
372             also save the initial state of C< $\ > and use that for each
373             call for consistency.
374              
375             =cut
376              
377             sub print {
378 0     0 1 0 my ($self) = @_;
379 0         0 my $delim = $\;
380 0     0   0 $self->each(sub { local $\ = $delim; print });
  0         0  
  0         0  
381             }
382              
383             =head2 say
384              
385             Shortcut for C<< ->each(sub { print "$_\n" }) >>.
386              
387             =cut
388              
389             sub say {
390 0     0 1 0 my ($self) = @_;
391 0     0   0 $self->each(sub { local $\; print "$_\n" });
  0         0  
  0         0  
392             }
393              
394             =head2 hexdump
395              
396             Convert input bytes to a hexdump representation, for example:
397              
398             00000000 00 00 12 04 00 00 00 00 00 00 03 00 00 00 80 00 >................<
399             00000010 04 00 01 00 00 00 05 00 ff ff ff 00 00 04 08 00 >................<
400             00000020 00 00 00 00 7f ff 00 00 >........<
401              
402             One line is emitted for each 16 bytes.
403              
404             Takes the following named parameters:
405              
406             =over 4
407              
408             =item * C<continuous> - accumulates data for a continuous stream, and
409             does not reset the offset counter. Note that this may cause the last
410             output to be delayed until the source completes.
411              
412             =back
413              
414             =cut
415              
416             sub hexdump {
417 0     0 1 0 my ($self, %args) = @_;
418              
419 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
420 0         0 my $offset = 0;
421 0         0 my $in = '';
422             $self->each_while_source(sub {
423 0     0   0 my @out;
424 0 0       0 if($args{continuous}) {
425 0         0 $in .= $_;
426 0 0       0 return if length($in) < 16;
427             } else {
428 0         0 $in = $_;
429 0         0 $offset = 0;
430             }
431 0         0 while(length(my $bytes = substr $in, 0, 16, '')) {
432 0         0 my $encoded = join '', unpack 'H*' => $bytes;
433 0         0 $encoded =~ s/[[:xdigit:]]{2}\K(?=[[:xdigit:]])/ /g;
434 0         0 my $ascii = $bytes =~ s{[^[:print:]]}{.}gr;
435 0         0 $src->emit(sprintf '%08x %-47.47s %-18.18s', $offset, $encoded, ">$ascii<");
436 0         0 $offset += length($bytes);
437 0 0 0     0 return if $args{continuous} and length($in) < 16;
438             }
439 0         0 }, $src);
440             }
441              
442             =head2 throw
443              
444             Throws something. I don't know what, maybe a chair.
445              
446             =cut
447              
448             sub throw {
449 0     0 1 0 my $src = shift->new(@_);
450 0         0 $src->fail('...');
451             }
452              
453             =head2 debounce
454              
455             Not yet implemented.
456              
457             Requires timing support, see implementations such as L<Ryu::Async> instead.
458              
459             =cut
460              
461             sub debounce {
462 0     0 1 0 my ($self, $interval) = @_;
463             ...
464 0         0 }
465              
466             =head2 chomp
467              
468             Chomps all items with the given delimiter.
469              
470             Once you've instantiated this, it will stick with the delimiter which was in force at the time of instantiation.
471             Said delimiter follows the usual rules of C<< $/ >>, whatever they happen to be.
472              
473             Example:
474              
475             $ryu->stdin
476             ->chomp("\n")
477             ->say
478              
479             =cut
480              
481             sub chomp {
482 0     0 1 0 my ($self, $delim) = @_;
483 0   0     0 $delim //= $/;
484             $self->map(sub {
485 0     0   0 local $/ = $delim;
486 0         0 chomp(my $line = $_);
487 0         0 $line
488             })
489 0         0 }
490              
491             =head2 map
492              
493             A bit like L<perlfunc/map>.
494              
495             Takes a single parameter - the coderef to execute for each item. This should return
496             a scalar value which will be used as the next item.
497              
498             Often useful in conjunction with a C<< do >> block to provide a closure.
499              
500             Examples:
501              
502             $src->map(do {
503             my $idx = 0;
504             sub {
505             [ @$_, ++$idx ]
506             }
507             })
508              
509             =cut
510              
511             sub map : method {
512 3     3 1 22 my ($self, $code) = @_;
513              
514 3         37 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
515             $self->each_while_source(sub {
516             $src->emit(Scalar::Util::blessed($_)
517             ? (scalar $_->$code)
518             : !ref($code)
519 10 100   10   41 ? $_->{$code}
    100          
520             : scalar $_->$code
521             )
522 3         16 }, $src);
523             }
524              
525             =head2 flat_map
526              
527             Similar to L</map>, but will flatten out some items:
528              
529             =over 4
530              
531             =item * an arrayref will be expanded out to emit the individual elements
532              
533             =item * for a L<Ryu::Source>, passes on any emitted elements
534              
535             =back
536              
537             This also means you can "merge" items from a series of sources.
538              
539             Note that this is not recursive - an arrayref of arrayrefs will be expanded out
540             into the child arrayrefs, but no further.
541              
542             Failure on any input source will cause this source to be marked as failed as well.
543              
544             =cut
545              
546             sub flat_map {
547 5     5 1 161 my ($self, $code) = splice @_, 0, 2;
548              
549             # Upgrade ->flat_map(method => args...) to a coderef
550 5 50       15 if(!Ref::Util::is_plain_coderef($code)) {
551 0         0 my $method = $code;
552 0         0 my @args = @_;
553 0     0   0 $code = sub { $_->$method(@args) }
554 0         0 }
555              
556 5         62 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
557              
558 5         18 Scalar::Util::weaken(my $weak_sauce = $src);
559             my $add = sub {
560 9     9   11 my $v = shift;
561 9 50       21 my $src = $weak_sauce or return;
562              
563 9         23 my $k = "$v";
564             $src->{waiting}{$k} = $v->on_ready(sub {
565 5         579 my ($f) = @_;
566 5 50       15 return unless my $src = $weak_sauce;
567              
568             # Any failed input source should propagate failure immediately
569 5 100       12 if($f->is_failed) {
570             # Clear out our waitlist, since we don't want to hold those references any more
571 2         15 delete $src->{waiting};
572 2 50       5 $src->fail($f->failure) unless $src->is_ready;
573 2         110 return;
574             }
575              
576 3         21 delete $src->{waiting}{$k};
577 3 100       4 $src->finish unless %{$src->{waiting}};
  3         14  
578 9         36 });
579 9         188 $log->tracef("Added %s which will bring our count to %d", $k, 0 + keys %{$src->{waiting}});
  9         32  
580 5         20 };
581              
582 5         15 $add->($self->_completed);
583             $self->each_while_source(sub {
584 7 50   7   15 my $src = $weak_sauce or return;
585 7         15 for ($code->($_)) {
586 7         32 my $item = $_;
587 7 100 33     46 if(Ref::Util::is_plain_arrayref($item)) {
    50          
588 3         11 $log->tracef("Have an arrayref of %d items", 0 + @$item);
589 3         27 for(@$item) {
590 9 50       16 last if $src->is_ready;
591 9         44 $src->emit($_);
592             }
593             } elsif(Scalar::Util::blessed($item) && $item->isa(__PACKAGE__)) {
594 4         17 $log->tracef("This item is a source");
595             $src->on_ready(sub {
596 2 100       209 return if $item->is_ready;
597 1         9 $log->tracef("Marking %s as ready because %s was", $item->describe, $src->describe);
598 1         107 shift->on_ready($item->_completed);
599 4         58 });
600 4         88 $add->($item->_completed);
601             $item->each_while_source(sub {
602 5 50       13 my $src = $weak_sauce or return;
603 5         10 $src->emit($_)
604             }, $src)->on_ready(sub {
605 2         180 undef $item;
606 4         50 });
607             }
608             }
609 5         61 }, $src);
610 5         24 $src
611             }
612              
613              
614             =head2 split
615              
616             Splits the input on the given delimiter.
617              
618             By default, will split into characters.
619              
620             Note that each item will be processed separately - the buffer won't be
621             retained across items, see L</by_line> for that.
622              
623             =cut
624              
625             sub split : method {
626 0     0 1 0 my ($self, $delim) = @_;
627 0   0     0 $delim //= qr//;
628              
629 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
630 0     0   0 $self->each_while_source(sub { $src->emit($_) for split $delim, $_ }, $src);
  0         0  
631             }
632              
633             =head2 chunksize
634              
635             Splits input into fixed-size chunks.
636              
637             Note that output is always guaranteed to be a full chunk - if there is partial input
638             at the time the input stream finishes, those extra bytes will be discarded.
639              
640             =cut
641              
642             sub chunksize : method {
643 0     0 1 0 my ($self, $size) = @_;
644 0 0 0     0 die 'need positive chunk size parameter' unless $size && $size > 0;
645              
646 0         0 my $buffer = '';
647 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
648             $self->each_while_source(sub {
649 0     0   0 $buffer .= $_;
650 0         0 $src->emit(substr $buffer, 0, $size, '') while length($buffer) >= $size;
651 0         0 }, $src);
652             }
653              
654             =head2 batch
655              
656             Splits input into arrayref batches of a given size.
657              
658             Note that the last item emitted may have fewer elements (or none at all).
659              
660             $src->batch(10)
661             ->map(sub { "Next 10 (or fewer) items: @$_" })
662             ->say;
663              
664             =cut
665              
666             sub batch : method {
667 1     1 1 6 my ($self, $size) = @_;
668 1 50 33     6 die 'need positive batch parameter' unless $size && $size > 0;
669              
670 1         1 my $buffer = '';
671 1         13 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
672 1         3 my @batch;
673             $self->each_while_source(sub {
674 4     4   6 push @batch, $_;
675 4   66     17 while(@batch >= $size and my (@items) = splice @batch, 0, $size) {
676 1         4 $src->emit(\@items)
677             }
678             }, $src, cleanup => sub {
679 1 50   1   3 $src->emit([ splice @batch ]) if @batch;
680 1         9 });
681             }
682              
683             =head2 by_line
684              
685             Emits one item for each line in the input. Similar to L</split> with a C<< \n >> parameter,
686             except this will accumulate the buffer over successive items and only emit when a complete
687             line has been extracted.
688              
689             =cut
690              
691             sub by_line : method {
692 0     0 1 0 my ($self, $delim) = @_;
693 0   0     0 $delim //= $/;
694              
695 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
696 0         0 my $buffer = '';
697             $self->each_while_source(sub {
698 0     0   0 $buffer .= $_;
699 0         0 while($buffer =~ s/^(.*)\Q$delim//) {
700 0         0 $src->emit($1)
701             }
702 0         0 }, $src);
703             }
704              
705             =head2 prefix
706              
707             Applies a string prefix to each item.
708              
709             =cut
710              
711             sub prefix {
712 1     1 1 7 my ($self, $txt) = @_;
713 1         16 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
714             $self->each_while_source(sub {
715 3     3   13 $src->emit($txt . $_)
716 1         9 }, $src);
717             }
718              
719             =head2 suffix
720              
721             Applies a string suffix to each item.
722              
723             =cut
724              
725             sub suffix {
726 1     1 1 9 my ($self, $txt) = @_;
727 1         15 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
728             $self->each_while_source(sub {
729 3     3   11 $src->emit($_ . $txt)
730 1         9 }, $src);
731             }
732              
733             =head2 sprintf_methods
734              
735             Convenience method for generating a string from a L<perlfunc/sprintf>-style format
736             string and a set of method names to call.
737              
738             Note that any C<undef> items will be mapped to an empty string.
739              
740             Example:
741              
742             $src->sprintf_methods('%d has name %s', qw(id name))
743             ->say
744             ->await;
745              
746             =cut
747              
748             sub sprintf_methods {
749 0     0 1 0 my ($self, $fmt, @methods) = @_;
750 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
751             $self->each_while_source(sub {
752 0     0   0 my ($item) = @_;
753 0   0     0 $src->emit(sprintf $fmt, map $item->$_ // '', @methods)
754 0         0 }, $src);
755             }
756              
757             =head2 ignore
758              
759             Receives items, but ignores them entirely.
760              
761             Emits nothing and eventually completes when the upstream L<Ryu::Source> is done.
762              
763             Might be useful for keeping a source alive.
764              
765             =cut
766              
767             sub ignore {
768 0     0 1 0 my ($self) = @_;
769 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
770             $self->_completed->on_ready(sub {
771 0 0   0   0 shift->on_ready($src->_completed) unless $src->_completed->is_ready
772 0         0 });
773 0         0 return $src;
774             }
775              
776             =head2 buffer
777              
778             Accumulate items while any downstream sources are paused.
779              
780             Takes the following named parameters:
781              
782             =over 4
783              
784             =item * C<high> - once at least this many items are buffered, will L</pause>
785             the upstream L<Ryu::Source>.
786              
787             =item * C<low> - if the buffered count drops to this number, will L</resume>
788             the upstream L<Ryu::Source>.
789              
790             =back
791              
792             =cut
793              
sub buffer {
    my $self = shift;
    # A single positional argument sets both watermarks to the same value,
    # anything else is treated as named parameters (low / high).
    my %args = @_ != 1
        ? @_
        : (
            low  => $_[0],
            high => $_[0],
        );
    $args{low} //= $args{high};
    $args{low} //= 10;
    $args{high} //= $args{low};

    my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
    $src->{pause_propagation} = 0;
    # Items received from the parent but not yet passed on to $src
    my @pending;
    $self->_completed->on_ready(sub {
        shift->on_ready($src->_completed) unless $src->_completed->is_ready or @pending;
    });
    my $item_handler = do {
        # Only weak references are captured here, so either end may have
        # been released by the time the handler fires.
        Scalar::Util::weaken(my $weak_self = $self);
        Scalar::Util::weaken(my $weak_src = $src);
        sub {
            my $self = $weak_self;
            my $src = $weak_src or return;
            if(@pending >= $args{high} and $self and not $self->is_paused($src)) {
                $self->pause($src);
            }
            # $self is checked before it is dereferenced: the surrounding
            # statements already allow for the parent having gone away, and an
            # unguarded ->{children} lookup on undef would die under strict refs.
            $src->emit(shift @pending)
                while @pending
                    and not($src->is_paused)
                    and $self
                    and @{$self->{children}};
            if($self) {
                $self->resume($src) if @pending < $args{low} and $self->is_paused($src);

                # It's common to have a situation where the parent chain completes while we're
                # paused waiting for the queue to drain. In this situation, we want to propagate
                # completion only once the queue is empty.
                $self->_completed->on_ready($src->_completed)
                    if $self->_completed->is_ready and not @pending and not $src->_completed->is_ready;
            }
        }
    };
    # Run the handler whenever the flow-control state of $src changes, so
    # queued items drain once the downstream side resumes.
    $src->flow_control
        ->each($item_handler)->retain;
    $self->each(my $code = sub {
        push @pending, $_;
        $item_handler->()
    });
    $self->_completed->on_ready(sub {
        my ($f) = @_;
        # Items are still queued: completion is propagated by the item
        # handler once the queue has drained.
        return if @pending;
        my $addr = Scalar::Util::refaddr($code);
        my $count = List::UtilsBy::extract_by { $addr == Scalar::Util::refaddr($_) } @{$self->{on_item}};
        $f->on_ready($src->_completed) unless $src->is_ready;
        $log->tracef("->buffer completed on %s for refaddr 0x%x, removed %d on_item handlers", $self->describe, Scalar::Util::refaddr($self), $count);
    });
    $src;
}
853              
sub retain {
    my $self = shift;
    # Deliberate self-reference: keeps this instance alive until its
    # completion Future is ready, at which point the reference is dropped.
    $self->{_self} = $self;
    my $completion = $self->_completed;
    $completion->on_ready(sub { delete $self->{_self} });
    return $self;
}
861              
862             =head2 as_list
863              
864             Resolves to a list consisting of all items emitted by this source.
865              
866             =cut
867              
sub as_list {
    my $self = shift;
    # Collect every item, then hand the whole list back on completion
    my @items;
    $self->each(sub { push @items, $_ });
    return $self->_completed->transform(
        done => sub { @items },
    );
}
876              
877             =head2 as_arrayref
878              
879             Resolves to a single arrayref consisting of all items emitted by this source.
880              
881             =cut
882              
sub as_arrayref {
    my $self = shift;
    # Collect every item, then resolve with a reference to the collection
    my @items;
    $self->each(sub { push @items, $_ });
    return $self->_completed->transform(
        done => sub { \@items },
    );
}
891              
892             =head2 as_string
893              
894             Concatenates all items into a single string.
895              
896             Returns a L<Future> which will resolve on completion.
897              
898             =cut
899              
sub as_string {
    my $self = shift;
    # Append each item as it arrives, resolve with the result on completion
    my $text = '';
    $self->each(sub { $text .= $_; });
    return $self->_completed->transform(
        done => sub { $text },
    );
}
908              
909             =head2 as_queue
910              
911             Returns a L<Future::Queue> instance which will
912             L<push|Future::Queue/push> items whenever the source
913             emits them.
914              
915             Unfortunately there is currently no way to tell
916             when the queue will end, so you'd need to track
917             that separately.
918              
919             =cut
920              
sub as_queue {
    my $self = shift;
    my $queue = Future::Queue->new;
    # Every emitted item is pushed straight onto the queue; nothing here
    # signals the end of the stream to the queue.
    $self->each(sub { $queue->push($_) });
    return $queue;
}
929              
930             =head2 as_buffer
931              
932             Returns a L<Ryu::Buffer> instance, which will
933             L<write|Ryu::Buffer/write> any emitted items from this
934             source to the buffer as they arrive.
935              
936             Intended for stream protocol handling - individual
937             sized packets are perhaps better suited to the
938             L<Ryu::Source> per-item behaviour.
939              
940             Supports the following named parameters:
941              
942             =over 4
943              
944             =item * C<low> - low waterlevel for buffer, start accepting more bytes
945             once the L<Ryu::Buffer> has less content than this
946              
947             =item * C<high> - high waterlevel for buffer, will pause the parent stream
948             if this is reached
949              
950             =back
951              
952             The backpressure (low/high) values default to undefined, meaning
953             no backpressure is applied: the buffer will continue to fill
954             indefinitely.
955              
956             =cut
957              
sub as_buffer {
    my ($self, %args) = @_;
    # Watermarks are ours; everything else is passed to the buffer constructor
    my ($low, $high) = delete @args{qw(low high)};
    # We're creating a source but keeping it to ourselves here
    my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);

    my $buffer = Ryu::Buffer->new(
        new_future => $self->{new_future},
        %args,
        on_change => sub {
            my ($buf) = @_;
            # No low watermark means no backpressure handling at all
            return unless $low;
            $src->resume if $buf->size <= $low;
        }
    );

    Scalar::Util::weaken(my $weak_sauce = $src);
    Scalar::Util::weaken(my $weak_buffer = $buffer);
    $self->each_while_source(sub {
        my $src = $weak_sauce or return;
        my $buf = $weak_buffer;
        unless($buf) {
            # Buffer has been released, so there is nowhere left to write to
            $src->finish;
            return;
        }
        $buf->write($_);
        $src->pause if $high and $buf->size >= $high;
        $src->resume if $low and $buf->size <= $low;
    }, $src);
    return $buffer;
}
988              
989             =head2 combine_latest
990              
991             Takes the most recent item from one or more L<Ryu::Source>s, and emits
992             an arrayref containing the values in order.
993              
994             An item is emitted for each update as soon as all sources have provided
995             at least one value. For example, given 2 sources, if the first emits C<1>
996             then C<2>, then the second emits C<a>, this would emit a single C<< [2, 'a'] >>
997             item.
998              
999             =cut
1000              
sub combine_latest : method {
    my ($self, @sources) = @_;
    # The final argument is an optional combining coderef. Fall back to the
    # identity function when it is missing - this includes being called with no
    # arguments at all, which would otherwise leave $code undefined and die as
    # soon as the first item arrived.
    push @sources, sub { @_ } if !@sources or Scalar::Util::blessed($sources[-1]);
    my $code = pop @sources;

    my $combined = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
    # When called on an instance, that instance is the first source
    unshift @sources, $self if ref $self;
    my @value;
    my %seen;
    for my $idx (0..$#sources) {
        my $src = $sources[$idx];
        $src->each_while_source(sub {
            $value[$idx] = $_;
            $seen{$idx} ||= 1;
            # Only start emitting once every source has provided a value
            $combined->emit([ $code->(@value) ]) if @sources == keys %seen;
        }, $combined);
    }
    Future->needs_any(
        map $_->completed, @sources
    )->on_ready(sub {
        @value = ();
        return if $combined->_completed->is_ready;
        shift->on_ready($combined->_completed)
    })->retain;
    $combined
}
1027              
1028             =head2 with_index
1029              
1030             Emits arrayrefs consisting of C<< [ $item, $idx ] >>.
1031              
1032             =cut
1033              
sub with_index {
    my $self = shift;
    my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
    # Zero-based position of the next item
    my $next = 0;
    return $self->each_while_source(sub {
        my $pair = [ $_, $next ];
        ++$next;
        $src->emit($pair)
    }, $src);
}
1042              
1043             =head2 with_latest_from
1044              
1045             Similar to L</combine_latest>, but will start emitting as soon as
1046             we have any values. The arrayref will contain C<< undef >> for any
1047             sources which have not yet emitted any items.
1048              
1049             =cut
1050              
sub with_latest_from : method {
    my ($self, @sources) = @_;
    # The final argument is an optional combining coderef; default to the
    # identity function when it is omitted.
    push @sources, sub { @_ } if !@sources or Scalar::Util::blessed($sources[-1]);
    my $code = pop @sources;

    my $combined = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
    my @value;
    my %seen;
    # Track the most recent value from each of the other sources...
    for my $idx (0..$#sources) {
        my $src = $sources[$idx];
        $src->each(sub {
            return if $combined->_completed->is_ready;
            $value[$idx] = $_;
            $seen{$idx} ||= 1;
        });
    }
    # ... and emit a snapshot of those values whenever we receive an item
    # ourselves, provided at least one of them has reported something.
    $self->each(sub {
        $combined->emit([ $code->(@value) ]) if keys %seen;
    });
    # Completion is propagated through a single guarded handler. An
    # unconditional ->on_ready($combined->_completed) ahead of this one would
    # make the guard below pointless, and would try to complete $combined a
    # second time if it had already finished by itself.
    $self->_completed->on_ready(sub {
        @value = ();
        return if $combined->is_ready;
        shift->on_ready($combined->_completed);
    });
    $combined
}
1078              
1079             =head2 merge
1080              
1081             Emits items as they are generated by the given sources.
1082              
1083             Example:
1084              
1085             $numbers->merge($letters)->say # 1, 'a', 2, 'b', 3, 'c'...
1086              
1087             =cut
1088              
sub merge : method {
    my ($self, @sources) = @_;

    my $combined = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
    # When called on an instance, that instance takes part in the merge too
    unshift @sources, $self if ref $self;
    foreach my $upstream (@sources) {
        $upstream->each(sub {
            $combined->emit($_) unless $combined->_completed->is_ready;
        });
    }
    # The merged source completes once all inputs have completed; the list
    # of inputs is cleared at that point.
    my $all_done = Future->needs_all(
        map $_->completed, @sources
    );
    $all_done->on_ready($combined->_completed);
    $all_done->on_ready(sub { @sources = () });
    $all_done->retain;
    return $combined;
}
1107              
1108             =head2 emit_from
1109              
1110             Emits items as they are generated by the given sources.
1111              
1112             Example:
1113              
1114             my $src = Ryu::Source->new;
1115             $src->say;
1116             $src->emit_from(
1117             $numbers,
1118             $letters
1119             );
1120              
1121             =cut
1122              
sub emit_from : method {
    my ($self, @sources) = @_;

    # Forward items from every given source into this one until we complete
    foreach my $upstream (@sources) {
        $upstream->each_while_source(sub {
            $self->emit($_) unless $self->_completed->is_ready;
        }, $self);
    }
    return $self;
}
1134              
1135             =head2 apply
1136              
1137             Used for setting up multiple streams.
1138              
1139             Accepts a variable number of coderefs, will call each one and gather L<Ryu::Source>
1140             results.
1141              
1142             =cut
1143              
sub apply : method {
    my ($self, @code) = @_;

    my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
    my @pending;
    foreach my $code (@code) {
        # Each coderef receives this source both as $_ and as its argument
        push @pending, $code->($_) for $self;
    }
    # The new source completes once everything the coderefs returned has completed
    my $all_done = Future->needs_all(
        map $_->completed, @pending
    );
    $all_done->on_ready($src->_completed);
    $all_done->retain;
    # Pass through the original events
    return $self->each_while_source(sub {
        $src->emit($_)
    }, $src);
}
1161              
1162             =head2 switch_str
1163              
1164             Given a condition, will select one of the alternatives based on stringified result.
1165              
1166             Example:
1167              
1168             $src->switch_str(
1169             sub { $_->name }, # our condition
1170             smith => sub { $_->id }, # if this matches the condition, the code will be called with $_ set to the current item
1171             jones => sub { $_->parent->id },
1172             sub { undef } # and this is our default case
1173             );
1174              
1175             =cut
1176              
sub switch_str {
    my ($self, $condition, @args) = @_;

    my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
    my @active;
    $self->_completed->on_ready(sub {
        my $outstanding = Future->needs_all(
            grep $_, @active
        );
        $outstanding->on_ready(sub { $src->finish });
        $outstanding->retain
    });

    return $self->each_while_source(sub {
        my $item = $_;
        my $rslt = $condition->($item);
        # The condition may answer directly or via a Future; wrap plain
        # values so both cases go through the same path.
        my $answer = (Scalar::Util::blessed($rslt) && $rslt->isa('Future'))
            ? $rslt
            : Future->done($rslt);
        $answer->on_done(sub {
            my ($data) = @_;
            my @cases = @args;
            while(my ($k, $v) = splice @cases, 0, 2) {
                # Only a single value (or undef)? That's our default, just use it as-is
                return $src->emit(map $k->($_), $item) unless defined $v;
                # Key matches our result? Call code with the original item
                return $src->emit(map $v->($_), $item) if $k eq $data;
            }
        })->retain
    }, $src)
}
1208              
1209             =head2 ordered_futures
1210              
1211             Given a stream of L<Future>s, will emit the results as each L<Future>
1212             is marked ready.
1213              
1214             If any L<Future> in the stream fails, that will mark this source as failed,
1215             and all remaining L<Future> instances will be cancelled. To avoid this behaviour
1216             and leave the L<Future> instances active, use:
1217              
1218             $src->map('without_cancel')
1219             ->ordered_futures
1220              
1221             See L<Future/without_cancel> for more details.
1222              
1223             Takes the following named parameters:
1224              
1225             =over 4
1226              
1227             =item * C<high> - once at least this many unresolved L<Future> instances are pending,
1228             will L</pause> the upstream L<Ryu::Source>.
1229              
1230             =item * C<low> - if the pending count drops to this number, will L</resume>
1231             the upstream L<Ryu::Source>.
1232              
1233             =back
1234              
1235             This method is also available as L</resolve>.
1236              
1237             =cut
1238              
sub ordered_futures {
    my ($self, %args) = @_;
    my $low = delete $args{low};
    my $high = delete $args{high};
    my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
    # Futures we have received which are not yet ready, keyed by refaddr
    my %pending;
    my $src_completed = $src->_completed;

    # Holds the upstream completion Future once upstream has finished,
    # undef until then.
    my $all_finished;
    $self->_completed->on_ready(sub {
        $all_finished = shift;
        # With Futures still outstanding, completion is deferred until the
        # last of them is ready (see the ->on_ready handler below).
        $all_finished->on_ready($src_completed) unless %pending or $src_completed->is_ready;
    });

    # Once our own source is complete, anything still pending is cancelled
    $src_completed->on_ready(sub {
        my @pending = values %pending;
        %pending = ();
        for(@pending) {
            $_->cancel if $_ and not $_->is_ready;
        }
    });
    $self->each(sub {
        my $f = $_;
        my $k = Scalar::Util::refaddr $f;
        # This will keep a copy of the Future around until the
        # ->is_ready callback removes it
        $pending{$k} = $f;
        $log->tracef('Ordered futures has %d pending', 0 + keys %pending);
        $src->pause if $high and keys(%pending) >= $high and not $src->is_paused;
        $f->on_done(sub {
            my @pending = @_;
            while(@pending and not $src_completed->is_ready) {
                $src->emit(shift @pending);
            }
        })
        ->on_fail(sub { $src->fail(@_) unless $src_completed->is_ready; })
        ->on_ready(sub {
            delete $pending{$k};
            $src->resume if $low and keys(%pending) <= $low and $src->is_paused;
            # $all_finished is a Future (or undef), so reduce it to a 0/1
            # flag before passing it to a %d placeholder.
            $log->tracef('Ordered futures now has %d pending after completion, upstream finish status is %d', 0 + keys(%pending), $all_finished ? 1 : 0);
            return if %pending;
            $all_finished->on_ready($src_completed) if $all_finished and not $src_completed->is_ready;
        })
    });
    return $src;
}
1285              
1286             =head2 resolve
1287              
1288             A synonym for L</ordered_futures>.
1289              
1290             =cut
1291              
1292             *resolve = *ordered_futures;
1293              
1294             =head2 concurrent
1295              
1296             =cut
1297              
sub concurrent {
    my $self = shift;
    my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
    return $self->each_while_source(sub {
        my $f = $_;
        # Weak callbacks, so an outstanding item does not hold on to $src
        $f->on_done($src->curry::weak::emit)
          ->on_fail($src->curry::weak::fail)
          ->retain
    }, $src);
}
1307              
1308             =head2 distinct
1309              
1310             Emits new distinct items, using string equality with an exception for
1311             C<undef> (i.e. C<undef> is treated differently from empty string or 0).
1312              
1313             Given 1,2,3,undef,2,3,undef,'2',2,4,1,5, you'd expect to get the sequence 1,2,3,undef,4,5.
1314              
1315             =cut
1316              
sub distinct {
    my ($self) = @_;

    my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
    # Defined values are tracked by their string form, undef gets a
    # separate counter so it never collides with the empty string.
    my %seen;
    my $seen_undef;
    return $self->each_while_source(sub {
        my $already = defined($_) ? $seen{$_}++ : $seen_undef++;
        $src->emit($_) unless $already;
    }, $src);
}
1331              
1332             =head2 distinct_until_changed
1333              
1334             Removes contiguous duplicates, defined by string equality.
1335              
1336             =cut
1337              
sub distinct_until_changed {
    my ($self) = @_;

    my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
    my $have_prev;
    my $prev;
    $self->each_while_source(sub {
        # The first item always counts as a change. After that, a switch
        # between undef and defined is a change, and two defined values are
        # compared as strings. Two consecutive undefs are not a change.
        my $changed = !$have_prev                     ? 1
                    : (defined($prev) xor defined($_)) ? 1
                    : defined($_)                      ? $prev ne $_
                    :                                    0;
        $have_prev = 1;
        $src->emit($_) if $changed;
        $prev = $_;
    }, $src);
    $src
}
1359              
1360             =head2 sort_by
1361              
1362             Emits items sorted by the given key. This is a stable sort function.
1363              
1364             The algorithm is taken from L<List::UtilsBy>.
1365              
1366             =cut
1367              
sub sort_by {
    use sort qw(stable);
    my ($self, $code) = @_;
    my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
    # Items and their sort keys, held in parallel until upstream completes
    my @items;
    my @keys;
    # Completion handling is left entirely to ->each_while_source, in line
    # with the other *sort_by methods. A separate completion handler registered
    # ahead of it would mark $src as complete before the cleanup callback had
    # emitted anything.
    # NOTE(review): this relies on ->each_while_source running `cleanup` before
    # it propagates completion to $src - confirm against its implementation.
    $self->each_while_source(sub {
        push @items, $_;
        push @keys, $_->$code;
    }, $src, cleanup => sub {
        my ($f) = @_;
        # Nothing is emitted if upstream failed or was cancelled
        return unless $f->is_done;
        $src->emit($_) for @items[sort { $keys[$a] cmp $keys[$b] } 0 .. $#items];
    });
}
1388              
1389             =head2 nsort_by
1390              
1391             Emits items numerically sorted by the given key. This is a stable sort function.
1392              
1393             See L</sort_by>.
1394              
1395             =cut
1396              
sub nsort_by {
    use sort qw(stable);
    my ($self, $code) = @_;
    my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
    my (@items, @keys);
    # Hold on to every item together with its numeric key...
    my $collect = sub {
        push @items, $_;
        push @keys, $_->$code;
    };
    # ... and emit them in ascending key order once upstream is done
    my $flush = sub {
        my ($f) = @_;
        return unless $f->is_done;
        my @order = sort { $keys[$a] <=> $keys[$b] } 0 .. $#items;
        $src->emit($_) for @items[@order];
    };
    return $self->each_while_source($collect, $src, cleanup => $flush);
}
1411              
1412             =head2 rev_sort_by
1413              
1414             Emits items sorted by the given key, in reverse (descending) order. This is a stable sort function.
1415              
1416             The algorithm is taken from L<List::UtilsBy>.
1417              
1418             =cut
1419              
sub rev_sort_by {
    use sort qw(stable);
    my ($self, $code) = @_;
    my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
    my (@items, @keys);
    # Hold on to every item together with its string key...
    my $collect = sub {
        push @items, $_;
        push @keys, $_->$code;
    };
    # ... and emit them in descending key order once upstream is done
    my $flush = sub {
        my ($f) = @_;
        return unless $f->is_done;
        my @order = sort { $keys[$b] cmp $keys[$a] } 0 .. $#items;
        $src->emit($_) for @items[@order];
    };
    return $self->each_while_source($collect, $src, cleanup => $flush);
}
1434              
1435             =head2 rev_nsort_by
1436              
1437             Emits items numerically sorted by the given key, in reverse (descending) order. This is a stable sort function.
1438              
1439             See L</rev_sort_by>.
1440              
1441             =cut
1442              
sub rev_nsort_by {
    # Request a stable sort explicitly, as the other *sort_by methods do
    use sort qw(stable);
    my ($self, $code) = @_;
    my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
    # Items and their numeric keys, held in parallel until upstream completes
    my @items;
    my @keys;
    $self->each_while_source(sub {
        push @items, $_;
        push @keys, $_->$code;
    }, $src, cleanup => sub {
        # Nothing is emitted if upstream failed or was cancelled
        return unless shift->is_done;
        $src->emit($_) for @items[sort { $keys[$b] <=> $keys[$a] } 0 .. $#items];
    });
}
1456              
1457             =head2 extract_all
1458              
1459             Expects a regular expression and emits hashrefs containing
1460             the named capture buffers.
1461              
1462             The regular expression will be applied using the m//gc operator.
1463              
1464             Example:
1465              
1466             $src->extract_all(qr{/(?<component>[^/]+)})
1467             # emits { component => '...' }, { component => '...' }
1468              
1469             =cut
1470              
sub extract_all {
    my ($self, $pattern) = @_;
    my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
    return $self->each_while_source(sub {
        # Walk through every match in the current item, emitting a copy
        # of the named captures for each one.
        while(m/$pattern/gc) {
            my %captures = %+;
            $src->emit(\%captures);
        }
    }, $src);
}
1478              
1479             =head2 skip
1480              
1481             Skips the first N items.
1482              
1483             =cut
1484              
sub skip {
    my ($self, $count) = @_;
    $count //= 0;

    my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
    # Pass upstream completion on, unless we have already finished
    $self->_completed->on_ready(sub {
        my ($f) = @_;
        $f->on_ready($src->_completed) unless $src->is_ready;
    });
    $self->each(sub {
        # Still inside the initial run of items to discard
        return if $count-- > 0;
        $src->emit($_);
    });
    $src
}
1499              
1500             =head2 skip_last
1501              
1502             Skips the last N items.
1503              
1504             =cut
1505              
sub skip_last {
    my ($self, $count) = @_;
    $count //= 0;

    my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
    # Pass upstream completion on, unless we have already finished
    $self->_completed->on_ready(sub {
        my ($f) = @_;
        $f->on_ready($src->_completed) unless $src->is_ready;
    });
    # Holds back the most recent $count items; whatever is still in here
    # when upstream completes is never emitted.
    my @held;
    $self->each(sub {
        push @held, $_;
        return unless @held > $count;
        $src->emit(shift @held);
    });
    $src
}
1522              
1523             =head2 skip_until
1524              
1525             Skips the items that arrive before a given condition is reached.
1526              
1527             =over 4
1528              
1529             =item * Either a L<Future> instance (we skip all items until it's marked as C<done>), or a coderef,
1530             which we call for each item until it first returns true
1531              
1532             =back
1533              
1534             =cut
1535              
sub skip_until {
    my ($self, $condition) = @_;

    my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
    my $handler;
    if(ref($condition) eq 'CODE') {
        # Once the coderef has returned true, it is not consulted again
        my $reached = 0;
        $handler = sub { return $src->emit($_) if $reached ||= $condition->($_); };
    } elsif(Scalar::Util::blessed($condition) && $condition->isa('Future')) {
        # A failed or cancelled condition is applied to the new source as well
        $condition->on_ready($src->$curry::weak(sub {
            my ($src, $cond) = @_;
            return if $src->is_ready;
            $src->fail($cond->failure) if $cond->is_failed;
            $src->cancel if $cond->is_cancelled
        }));
        $handler = sub { $src->emit($_) if $condition->is_done; };
    } else {
        die 'unknown type for condition: ' . $condition;
    }
    return $self->each_while_source($handler, $src);
}
1557              
1558             =head2 take_until
1559              
1560             Passes through items that arrive until a given condition is reached.
1561              
1562             Expects a single parameter, which can be one of the following:
1563              
1564             =over 4
1565              
1566             =item * a L<Future> instance - we will pass items through until it's marked as C<done>
1567              
1568             =item * a coderef, which we call for each item until it first returns true
1569              
1570             =item * or a L<Ryu::Source>, in which case we stop when that first emits a value
1571              
1572             =back
1573              
1574             =cut
1575              
sub take_until {
    my ($self, $condition) = @_;

    my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
    if(Scalar::Util::blessed($condition) && $condition->isa('Ryu::Source')) {
        # Condition is another source: stop when it emits its first item, or
        # when it completes without emitting anything.
        $condition->_completed->on_ready(sub {
            # Diagnostic output only, so this is logged at trace level
            $log->tracef('Condition completed: %s and %s', $condition->describe, $src->describe);
            return if $src->is_ready;
            $log->tracef('Mark as ready');
            shift->on_ready($src->_completed);
        });
        $condition->first->each(sub {
            $src->finish unless $src->is_ready
        });
        return $self->each_while_source($src->curry::emit, $src);
    } else {
        return $self->each_while_source(do {
            if(ref($condition) eq 'CODE') {
                # Items pass through until the coderef first returns true;
                # after that it is not consulted again.
                my $reached = 0;
                sub { return $src->emit($_) unless $reached ||= $condition->($_); }
            } elsif(Scalar::Util::blessed($condition) && $condition->isa('Future')) {
                # A failed or cancelled condition is applied to the new source as well
                $condition->on_ready($src->$curry::weak(sub {
                    my ($src, $cond) = @_;
                    return if $src->is_ready;
                    $src->fail($cond->failure) if $cond->is_failed;
                    $src->cancel if $cond->is_cancelled
                }));
                # Items pass through for as long as the Future is not done
                sub { $src->emit($_) unless $condition->is_done; }
            } else {
                die 'unknown type for condition: ' . $condition;
            }
        }, $src);
    }
}
1610              
1611             =head2 take
1612              
1613             Takes a limited number of items.
1614              
1615             Given a sequence of C< 1,2,3,4,5 > and C<< ->take(3) >>, you'd get 1,2,3 and then the stream
1616             would finish.
1617              
1618             =cut
1619              
sub take {
    my ($self, $count) = @_;
    $count //= 0;
    # Nothing to take: hand back an empty source
    return $self->empty unless $count > 0;

    my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
    return $self->each_while_source(sub {
        $log->tracef("Still alive with %d remaining", $count);
        $src->emit($_);
        $count -= 1;
        unless($count) {
            $log->tracef("Count is zero, finishing");
            $src->finish
        }
    }, $src);
}
1634              
1635             =head2 first
1636              
1637             Returns a source which provides the first item from the stream.
1638              
1639             =cut
1640              
sub first {
    my $self = shift;

    my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
    # Pass on one item, then finish straight away
    my $forward_once = sub {
        $src->emit($_);
        $src->finish
    };
    return $self->each_while_source($forward_once, $src);
}
1650              
1651             =head2 some
1652              
1653             Applies the given code to each item, and emits a single item:
1654              
1655             =over 4
1656              
1657             =item * 0 if the code never returned true or no items were received
1658              
1659             =item * 1 if the code ever returned a true value
1660              
1661             =back
1662              
1663             =cut
1664              
# Emits 1 as soon as $code returns true for an item, or 0 if the parent
# completes successfully without that ever happening.
sub some {
    my ($self, $code) = @_;

    my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);

    # Parent completion: report "no match" unless we already resolved.
    $self->_completed->on_ready(sub {
        my ($parent_f) = @_;
        my $own_f = $src->_completed;
        return if $own_f->is_ready;
        # Failure / cancellation is forwarded to our own completion future.
        return $parent_f->on_ready($own_f) unless $parent_f->is_done;
        $src->emit(0);
        $own_f->done;
    });

    # Per-item check: the first true result resolves the source.
    $self->each(sub {
        return if $src->_completed->is_ready;
        return unless $code->($_);
        $src->emit(1);
        $src->_completed->done
    });

    return $src;
}
1685              
1686             =head2 every
1687              
1688             Similar to L</some>, except this requires the coderef to return true for
1689             all values in order to emit a C<1> value.
1690              
1691             =cut
1692              
# Emits 1 if $code returned true for every item once the parent completes
# successfully, or 0 as soon as $code returns false for any item.
#
# If the parent fails or is cancelled before a result was emitted, that
# state is forwarded to the returned source (matching ->some) instead of
# leaving the returned source pending.
sub every {
    my ($self, $code) = @_;

    my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
    $self->_completed->on_ready(sub {
        my ($parent_f) = @_;
        my $own_f = $src->_completed;
        return if $own_f->is_ready;
        # Passing a Future to ->on_ready forwards fail/cancel onto it.
        return $parent_f->on_ready($own_f) unless $parent_f->is_done;
        $src->emit(1);
        $own_f->done;
    });
    $self->each(sub {
        return if $src->_completed->is_ready;
        return if $code->($_);
        # First failing item decides the answer.
        $src->emit(0);
        $src->_completed->done
    });
    return $src;
}
1710              
1711             =head2 count
1712              
1713             Emits the count of items seen once the parent source completes.
1714              
1715             =cut
1716              
# Counts items from the parent; the total is emitted from the cleanup hook
# only when the parent's completion future is done.
sub count {
    my ($self) = @_;

    my $count = 0;

    my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);

    my $tally = sub { ++$count };
    my $report = sub {
        my ($f) = @_;
        return unless $f->is_done;
        $src->emit($count)
    };
    return $self->each_while_source($tally, $src, cleanup => $report);
}
1728              
1729             =head2 sum
1730              
1731             Emits the numeric sum of items seen once the parent completes.
1732              
1733             =cut
1734              
# Accumulates a numeric total of the parent's items; the total is emitted
# from the cleanup hook only when the parent's completion future is done.
sub sum {
    my ($self) = @_;

    my $sum = 0;

    my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);

    my $accumulate = sub { $sum += $_ };
    my $report = sub {
        my ($f) = @_;
        return unless $f->is_done;
        $src->emit($sum)
    };
    return $self->each_while_source($accumulate, $src, cleanup => $report);
}
1748              
1749             =head2 mean
1750              
1751             Emits the mean (average) numerical value of all seen items.
1752              
1753             =cut
1754              
# Emits sum/count of the parent's items when the parent completes
# successfully; with no items the divisor falls back to 1, giving 0.
sub mean {
    my ($self) = @_;

    my $sum = 0;
    my $count = 0;

    my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
    $self->each(sub {
        ++$count;
        $sum += $_
    });

    my $parent_f = $self->_completed;
    $parent_f->on_done(sub {
        $src->emit($sum / ($count || 1))
    });
    # Then forward the parent's final state to our own completion future.
    $parent_f->on_ready($src->_completed);
    return $src;
}
1767              
1768             =head2 max
1769              
1770             Emits the maximum numerical value of all seen items.
1771              
1772             =cut
1773              
# Tracks the largest item seen and emits it when the parent completes
# successfully (undef if no items were seen).
sub max {
    my ($self) = @_;

    my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
    my $max;
    $self->each(sub {
        # Replace the candidate unless the current one is strictly larger.
        $max = $_ unless defined $max and $max > $_;
    });

    my $parent_f = $self->_completed;
    $parent_f->on_done(sub { $src->emit($max) });
    $parent_f->on_ready($src->_completed);
    return $src;
}
1787              
1788             =head2 min
1789              
1790             Emits the minimum numerical value of all seen items.
1791              
1792             =cut
1793              
# Tracks the smallest item seen and emits it when the parent completes
# successfully (undef if no items were seen).
sub min {
    my ($self) = @_;

    my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
    my $min;
    $self->each(sub {
        # Replace the candidate unless the current one is strictly smaller.
        $min = $_ unless defined $min and $min < $_;
    });

    my $parent_f = $self->_completed;
    $parent_f->on_done(sub { $src->emit($min) });
    $parent_f->on_ready($src->_completed);
    return $src;
}
1807              
1808             =head2 statistics
1809              
1810             Emits a single hashref of statistics once the source completes.
1811              
1812             This will contain the following keys:
1813              
1814             =over 4
1815              
1816             =item * count
1817              
1818             =item * sum
1819              
1820             =item * min
1821              
1822             =item * max
1823              
1824             =item * mean
1825              
1826             =back
1827              
1828             =cut
1829              
# Collects count, sum, min and max over the parent's items and emits them
# (plus the derived mean) as one hashref when the parent completes
# successfully.
sub statistics {
    my ($self) = @_;

    my $sum = 0;
    my $count = 0;
    my $min;
    my $max;

    my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);

    my $observe = sub {
        # First item seeds both extremes.
        $min //= $_;
        $max //= $_;
        $min = $_ if $_ < $min;
        $max = $_ if $_ > $max;
        ++$count;
        $sum += $_
    };
    $self->each($observe);

    my $parent_f = $self->_completed;
    $parent_f->on_done(sub {
        my $summary = {
            count => $count,
            sum   => $sum,
            min   => $min,
            max   => $max,
            # Divisor falls back to 1 when nothing was seen.
            mean  => ($sum / ($count || 1))
        };
        $src->emit($summary)
    });
    $parent_f->on_ready($src->_completed);
    return $src;
}
1859              
1860             =head2 filter
1861              
1862             Applies the given parameter to filter values.
1863              
1864             The parameter can be a regex or coderef. You can also
1865             pass (key, value) pairs to filter hashrefs or objects
1866             based on regex or coderef values.
1867              
1868             Examples:
1869              
1870             $src->filter(name => qr/^[A-Z]/, id => sub { $_ % 2 })
1871              
1872             =cut
1873              
# Builds a chained source which only receives items passing the filter.
#
# Two calling conventions:
#  - (key => criterion, ...) pairs: each item must be a blessed object
#    (value read via ->$key) or a hashref (value read via ->{$key}); every
#    key must satisfy its criterion. A criterion may be a regex, an arrayref
#    of allowed strings, a coderef, or a plain scalar / undef compared with
#    eq / definedness.
#  - a single regex or coderef applied to each item.
sub filter {
    my $self = shift;

    my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);

    my $on_item;
    if(@_ > 1) {
        my %args = @_;

        # Returns true when value $v satisfies the criterion stored for $k.
        my $check = sub {
            my ($k, $v) = @_;
            my $want = $args{$k};
            my $ref = ref $want;
            unless($ref) {
                # Plain scalar: undef only matches undef, otherwise string equality.
                return !defined($want) if !defined($v);
                return defined($want) && $v eq $want;
            }
            if($ref eq 'Regexp') {
                return 0 unless defined($v) && $v =~ $want;
            } elsif($ref eq 'ARRAY') {
                return 0 unless defined($v) && List::Util::any { $v eq $_ } @$want;
            } elsif($ref eq 'CODE') {
                # The coderef sees the value both as its argument and as $_.
                for ($v) {
                    return 0 unless $want->($_);
                }
            } else {
                die "Unsure what to do with $want which seems to be a $ref";
            }
            return 1;
        };

        $on_item = sub {
            my $item = shift;
            my $is_object = Scalar::Util::blessed($item);
            unless($is_object) {
                my $ref = ref $item or die 'not a ref, not sure what to do now';
                die 'not a ref we know how to handle: ' . $ref unless $ref eq 'HASH';
            }
            for my $k (keys %args) {
                my $v = $is_object ? $item->$k : $item->{$k};
                return unless $check->($k, $v);
            }
            $src->emit($item);
        };
    } else {
        my $code = shift;
        if(my $ref = ref($code)) {
            if($ref eq 'Regexp') {
                # Wrap the regex so it is matched against $_ for each item.
                my $re = $code;
                $code = sub { /$re/ };
            } else {
                # A coderef is used as-is; anything else is rejected.
                die "not sure how to handle $ref" unless $ref eq 'CODE';
            }
        }
        $on_item = sub {
            my $item = shift;
            $src->emit($item) if $code->($item);
        };
    }

    return $self->each_while_source($on_item, $src);
}
1937              
1938             =head2 filter_isa
1939              
1940             Emits only the items which C<< ->isa >> one of the given parameters.
1941             Will skip non-blessed items.
1942              
1943             =cut
1944              
# Passes through only blessed items which ->isa at least one class in @isa.
sub filter_isa {
    my ($self, @isa) = @_;

    my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);

    my $on_item = sub {
        my ($item) = @_;
        return unless Scalar::Util::blessed $item;
        # Here $_ is the value of $_ at the time the handler was invoked;
        # the grep's own $_ is scoped to the grep expression.
        if(grep $item->isa($_), @isa) {
            $src->emit($_);
        }
    };
    return $self->each_while_source($on_item, $src);
}
1955              
1956             =head2 emit
1957              
1958             Emits the given item.
1959              
1960             =cut
1961              
# Delivers each argument to every registered on_item handler and returns
# $self. Each item is passed both as the handler argument and as $_.
# Dies with 'already completed' if the completion future is ready while
# handlers exist. An exception from a handler fails the completion future
# and is rethrown.
sub emit {
    my $self = shift;
    my $completion = $self->_completed;

    # Work from a snapshot so handlers may alter the list while we iterate.
    my @subscribers = @{$self->{on_item} || []};
    return $self unless @subscribers;

    for (@_) {
        die 'already completed' if $completion->is_ready;
        for my $handler (@subscribers) {
            try {
                $handler->($_);
            } catch {
                my $ex = $@;
                $log->warnf("Exception raised in %s - %s", (eval { $self->describe } // ""), "$ex");
                $completion->fail($ex, source => 'exception in on_item callback');
                die $ex;
            }
        }
    }
    return $self;
}
1981              
1982             =head2 each
1983              
1984             =cut
1985              
# Registers $code as an on_item handler and returns $self for chaining.
# Extra named arguments are accepted but not used here.
sub each : method {
    my $self = shift;
    my $code = shift;
    my %args = @_;
    push @{$self->{on_item}}, $code;
    return $self;
}
1991              
1992             =head2 each_as_source
1993              
1994             =cut
1995              
# Runs every coderef in @code against each item, collecting ->completed of
# whatever they return, and forwards the item to a new chained source.
# When the parent completes, the chained source is finished once all the
# collected values (presumably Futures - confirm against ->completed)
# have resolved.
sub each_as_source : method {
    my ($self, @code) = @_;

    my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
    my @active;

    $self->_completed->on_ready(sub {
        my $all_done = Future->needs_all(
            grep $_, @active
        );
        $all_done->on_ready(sub {
            $src->finish
        })->retain
    });

    my $on_item = sub {
        my @pending;
        for my $code (@code) {
            push @pending, $code->($_);
        }
        push @active, map $_->completed, @pending;
        $src->emit($_);
    };
    return $self->each_while_source($on_item, $src)
}
2018              
# Releases per-source state: cancels everything registered under
# cancel_on_ready, notifies the parent (if any) that this child is finished,
# then drops the handler and cancellation lists.
sub cleanup {
    my ($self) = @_;
    $log->tracef("Cleanup for %s (f = %s)", $self->describe, 0 + $self->_completed);

    my $pending = $self->{cancel_on_ready} || {};
    $_->cancel for values %$pending;

    if($self->parent) {
        $self->parent->notify_child_completion($self);
    }

    delete @{$self}{qw(on_item cancel_on_ready)};
    $log->tracef("Finished cleanup for %s", $self->describe);
}
2027              
# Called by a child source when it has completed.
#
# Removes $child from our list of children. If that was the last child and
# we are not yet ready, this source is cancelled. If the child is not in the
# list, a warning is logged together with a trace listing of the children
# we do have. Always returns $self.
sub notify_child_completion {
    my ($self, $child) = @_;
    my $addr = Scalar::Util::refaddr($child);
    if(List::UtilsBy::extract_by { $addr == Scalar::Util::refaddr($_) } @{$self->{children}}) {
        $log->tracef(
            "Removed completed child %s, have %d left",
            $child->describe,
            0 + @{$self->{children}}
        );
        return $self if $self->is_ready;
        return $self if @{$self->{children}};

        $log->tracef(
            "This was the last child, cancelling %s",
            $self->describe
        );
        $self->cancel;
        return $self;
    }

    # The format has three placeholders, so the child's address must be
    # passed between the two descriptions.
    $log->warnf("Child %s (addr 0x%x) not found in list for %s", $child->describe, $addr, $self->describe);
    $log->tracef("* %s (addr 0x%x)", $_->describe, Scalar::Util::refaddr($_)) for @{$self->{children}};
    return $self;
}
2052              
2053             =head2 await
2054              
2055             Block until this source finishes.
2056              
2057             =cut
2058              
# Runs prepare_await, then repeatedly awaits the completion future until it
# is ready. Returns $self.
sub await {
    my ($self) = @_;
    $self->prepare_await;
    my $completion = $self->_completed;
    until($completion->is_ready) {
        $completion->await;
    }
    return $self;
}
2066              
2067             =head2 next
2068              
2069             Returns a L<Future> which will resolve to the next item emitted by this source.
2070              
2071             If the source completes before an item is emitted, the L<Future> will be cancelled.
2072              
2073             Note that these are independent - they don't stack, so if you call C<< ->next >>
2074             multiple times before an item is emitted, each of those would return the same value.
2075              
2076             See L if you're dealing with protocols and want to extract sequences of
2077             bytes or characters.
2078              
2079             To access the sequence as a discrete stream of L instances, try L
2080             which will provide a L.
2081              
2082             =cut
2083              
# Returns a future (from ->new_future) that resolves with the next emitted
# item.
#
# The future is tracked in cancel_on_ready so that ->cleanup cancels it.
# Once the future is ready (done or cancelled) the on_item handler
# installed here is removed again, so repeated ->next calls do not
# accumulate dead handlers.
sub next : method {
    my ($self) = @_;
    my $f = $self->new_future('next');

    # One-shot handler: resolve the future with the first item seen.
    my $handler = sub {
        $f->done(shift) unless $f->is_ready;
    };
    # on_item stores coderefs, so removal has to match on the handler's
    # address, not the future's. Only the number is captured below, which
    # avoids a reference cycle between the future and the handler.
    my $handler_addr = Scalar::Util::refaddr($handler);

    $f->on_ready($self->$curry::weak(sub {
        my ($self, $f) = @_;
        List::UtilsBy::extract_by { Scalar::Util::refaddr($_) == $handler_addr } @{$self->{on_item} || []};
        delete $self->{cancel_on_ready}{$f};
    }));
    $self->{cancel_on_ready}{$f} = $f;
    push @{$self->{on_item} ||= []}, $handler;
    return $f;
}
2100              
2101             =head2 finish
2102              
2103             Mark this source as completed.
2104              
2105             =cut
2106              
# Marks the completion future as done unless it is already ready.
# Returns the invocant.
sub finish {
    my ($self) = @_;
    $self->_completed->done unless $self->_completed->is_ready;
    return $self;
}
2108              
# Intentionally a no-op in this class.
sub refresh {
    return;
}
2110              
2111             =head1 METHODS - Proxied
2112              
2113             The following methods are proxied to our completion L<Future>:
2114              
2115             =over 4
2116              
2117             =item * then
2118              
2119             =item * is_ready
2120              
2121             =item * is_done
2122              
2123             =item * failure
2124              
2125             =item * is_cancelled
2126              
2127             =item * else
2128              
2129             =back
2130              
2131             =cut
2132              
# Awaits the parent (when there is one), then blocks on the completion
# future via ->get and returns the items collected from this source. Items
# are only collected when the caller wants a return value.
sub get {
    my ($self) = @_;
    my $completion = $self->_completed;
    my @items;
    if(defined wantarray) {
        $self->each(sub { push @items, $_ });
    }
    my $parent = $self->parent;
    $parent->await if $parent;

    # Swap the future's result for the collected item list.
    my $with_items = $completion->transform(done => sub { @items });
    return $with_items->get;
}
2145              
# Install one method per name which forwards the call, with its arguments,
# to the completion future obtained through ->_completed.
for my $k (qw(then fail on_ready transform is_ready is_done is_failed failure else)) {
    my $proxy = sub { shift->_completed->$k(@_) };
    no strict 'refs';
    *$k = $proxy;
}
# Cancel operations are only available through the internal state, since we don't want anything
# accidentally cancelling due to Future->wait_any(timeout, $src->_completed) or similar constructs.
# These therefore read the {completed} slot directly instead of calling ->_completed.
for my $k (qw(cancel is_cancelled)) {
    my $proxy = sub { shift->{completed}->$k(@_) };
    no strict 'refs';
    *$k = $proxy;
}
2154              
2155             =head1 METHODS - Internal
2156              
2157             =head2 prepare_await
2158              
2159             Run any pre-completion callbacks (recursively) before
2160             we go into an await cycle.
2161              
2162             Used for compatibility with sync bridges when there's
2163             no real async event loop available.
2164              
2165             =cut
2166              
# Runs (and removes) this source's on_get hook if present, then repeats the
# process on the parent via a tail call, walking up the chain.
sub prepare_await {
    my ($self) = @_;
    if($self->{on_get}) {
        # Delete the hook before invoking it.
        (delete $self->{on_get})->();
    }
    my $parent = $self->parent or return;
    my $code = $parent->can('prepare_await') or return;
    # Tail-call into the parent's implementation with the parent as invocant.
    local @_ = ($parent);
    goto &$code;
}
2175              
2176             =head2 chained
2177              
2178             Returns a new L<Ryu::Source> chained from this one.
2179              
2180             =cut
2181              
# Creates a new source. Called on an instance, the new source inherits the
# new_future factory, holds a weak reference to this instance as parent and
# is recorded in our children list. Called as a class method, a parentless
# source is built instead. Extra arguments go to the constructor.
sub chained {
    my ($self) = shift;
    my $class = ref($self);

    unless($class) {
        # Class-method call: there is no parent to link to.
        my $src = $self->new(@_);
        $log->tracef("Constructing chained source for %s with no parent", $src->label);
        return $src;
    }

    my $src = $class->new(
        new_future => $self->{new_future},
        parent     => $self,
        @_
    );
    # Only the parent holds a strong reference to the child, not vice versa.
    Scalar::Util::weaken($src->{parent});
    push @{$self->{children}}, $src;
    $log->tracef("Constructing chained source for %s from %s (%s)", $src->label, $self->label, $self->_completed->state);
    return $src;
}
2200              
2201             =head2 each_while_source
2202              
2203             Like L</each>, but removes the source from the callback list once the
2204             parent completes.
2205              
2206             =cut
2207              
# Registers $code as an item handler on this source and ties its lifetime
# to our completion. When we complete:
#  - the optional cleanup => sub is called with (completion future, $src),
#  - $code is removed from our handler list,
#  - our final state is forwarded to $src unless $src is already ready.
# Returns $src.
sub each_while_source {
    my ($self, $code, $src, %args) = @_;
    $self->each($code);

    my $on_complete = sub {
        my ($f) = @_;
        if(exists $args{cleanup}) {
            $args{cleanup}->($f, $src);
        }
        my $addr = Scalar::Util::refaddr($code);
        my $count = List::UtilsBy::extract_by { $addr == Scalar::Util::refaddr($_) } @{$self->{on_item}};
        $f->on_ready($src->_completed) unless $src->is_ready;
        $log->tracef("->each_while_source completed on %s for refaddr 0x%x, removed %d on_item handlers", $self->describe, Scalar::Util::refaddr($self), $count);
    };
    $self->_completed->on_ready($on_complete);

    return $src;
}
2221              
2222             =head2 map_source
2223              
2224             Provides a L source which has more control over what it
2225             emits than a standard L or L implementation.
2226              
2227             $original->map_source(sub {
2228             my ($item, $src) = @_;
2229             $src->emit('' . reverse $item);
2230             });
2231              
2232             =cut
2233              
# Hands each item to $code together with the chained source, leaving $code
# to decide what (if anything) to emit on it.
sub map_source {
    my ($self, $code) = @_;

    my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);

    # Forward our final state to the chained source if it is still pending.
    $self->_completed->on_ready(sub {
        my ($f) = @_;
        return if $src->is_ready;
        $f->on_ready($src->_completed);
    });

    my $on_item = sub {
        $code->($_, $src) for $_;
    };
    return $self->each_while_source($on_item, $src);
}
2246              
# Cancels the completion future of a source that is destroyed while still
# pending. Does nothing during global destruction.
sub DESTROY {
    my ($self) = @_;
    return if ${^GLOBAL_PHASE} eq 'DESTRUCT';
    $log->tracef("Destruction for %s", $self->describe);
    unless($self->_completed->is_ready) {
        $self->_completed->cancel;
    }
}
2253              
# Returns a chained source which forwards our items. If this source fails,
# $code is called with the failure details:
#  - if it returns a Ryu::Source, that source's items are forwarded to the
#    chained source;
#  - otherwise the original failure is applied to the chained source.
#
# NOTE(review): the each_while_source call at the end also forwards our
# failure to $src, so a replacement source may end up attached to an $src
# that has already failed - confirm the intended recovery semantics.
sub catch {
    my ($self, $code) = @_;
    my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
    $self->_completed->on_fail(sub {
        my @failure = @_;
        my $sub = $code->(@failure);
        if(Scalar::Util::blessed($sub) && $sub->isa('Ryu::Source')) {
            $sub->each_while_source(sub {
                $src->emit($_)
            }, $src);
        } else {
            # $sub is not a source here (it may be undef or unblessed), so
            # the failure must be reported on the chained source, not on $sub.
            $src->fail(@failure) unless $src->is_ready;
        }
    });
    $self->each_while_source(sub {
        $src->emit($_)
    }, $src);
}
2272              
2273             1;
2274              
2275             __END__