File Coverage

blib/lib/Ryu/Source.pm
Criterion Covered Total %
statement 545 844 64.5
branch 171 306 55.8
condition 83 155 53.5
subroutine 129 215 60.0
pod 70 76 92.1
total 998 1596 62.5


line stmt bran cond sub pod time code
1             package Ryu::Source;
2              
3 37     37   170654 use strict;
  37         78  
  37         897  
4 37     37   145 use warnings;
  37         51  
  37         1248  
5              
6 37     37   13083 use parent qw(Ryu::Node);
  37         9174  
  37         175  
7              
8             our $VERSION = '3.001'; # VERSION
9             our $AUTHORITY = 'cpan:TEAM'; # AUTHORITY
10              
11             =head1 NAME
12              
13             Ryu::Source - base representation for a source of events
14              
15             =head1 SYNOPSIS
16              
17             my $src = Ryu::Source->new;
18             my $chained = $src->map(sub { $_ * $_ })->prefix('value: ')->say;
19             $src->emit($_) for 1..5;
20             $src->finish;
21              
22             =head1 DESCRIPTION
23              
24             This is probably the module you'd want to start with, if you were going to be
25             using any of this. There's a disclaimer in L that may be relevant at this
26             point.
27              
28             =head2 Quick start
29              
30             You'd normally want to start by creating a L instance:
31              
32             my $src = Ryu::Source->new;
33              
34             If you're dealing with L code, use L to ensure that you
35             get properly awaitable L instances:
36              
37             $loop->add(my $ryu = Ryu::Async->new);
38             my $src = $ryu->source;
39              
40             Once you have a source, you'll need two things:
41              
42             =over 4
43              
44             =item * items to put into one end
45              
46             =item * processing to attach to the other end
47              
48             =back
49              
50             For the first, call L:
51              
52             use Future::AsyncAwait;
53             # 1s drifting periodic timer
54             while(1) {
55             await $loop->delay_future(after => 1);
56             $src->emit('');
57             }
58              
59             For the second, this would be L:
60              
61             $src->each(sub { print "Had timer tick\n" });
62              
63             So far, not so useful - the power of this type of reactive programming is in the
64             ability to chain and combine disparate event sources.
65              
66             At this point, L is worth a visit - this provides a clear
67             visual demonstration of how to combine multiple event streams using the chaining
68             methods. Most of the API here is modelled after similar principles.
69              
70             First, the L method: this provides a way to transform each item into
71             something else:
72              
73             $src->map(do { my $count = 0; sub { ++$count } })
74             ->each(sub { print "Count is now $_\n" })
75              
76             Next, L provides an equivalent to Perl's L functionality:
77              
78             $src->map(do { my $count = 0; sub { ++$count } })
79             ->filter(sub { $_ % 2 })
80             ->each(sub { print "Count is now at an odd number: $_\n" })
81              
82             You can stack these:
83              
84             $src->map(do { my $count = 0; sub { ++$count } })
85             ->filter(sub { $_ % 2 })
86             ->filter(sub { $_ % 5 })
87             ->each(sub { print "Count is now at an odd number which is not divisible by 5: $_\n" })
88              
89             or:
90              
91             $src->map(do { my $count = 0; sub { ++$count } })
92             ->map(sub { $_ % 3 ? 'fizz' : $_ })
93             ->map(sub { $_ % 5 ? 'buzz' : $_ })
94             ->each(sub { print "An imperfect attempt at the fizz-buzz game: $_\n" })
95              
96             =cut
97              
98 37     37   18559 no indirect;
  37         35875  
  37         157  
99 37     37   18714 use sort qw(stable);
  37         19028  
  37         204  
100              
101 37     37   1715 use Scalar::Util ();
  37         69  
  37         562  
102 37     37   15907 use Ref::Util ();
  37         53361  
  37         1055  
103 37     37   234 use List::Util ();
  37         82  
  37         853  
104 37     37   16327 use List::UtilsBy;
  37         62644  
  37         1884  
105 37     37   18657 use Encode ();
  37         481978  
  37         1454  
106 37     37   17696 use Syntax::Keyword::Try;
  37         71946  
  37         207  
107 37     37   2666 use Future;
  37         74  
  37         922  
108 37     37   14409 use Future::Queue;
  37         13778  
  37         1118  
109 37     37   13966 use curry::weak;
  37         31020  
  37         1160  
110              
111 37     37   14941 use Ryu::Buffer;
  37         83  
  37         1400  
112              
113 37     37   16613 use Log::Any qw($log);
  37         275638  
  37         183  
114              
115             =head1 GLOBALS
116              
117             =head2 $FUTURE_FACTORY
118              
119             This is a coderef which should return a new L-compatible instance.
120              
121             Example overrides might include:
122              
123             $Ryu::Source::FUTURE_FACTORY = sub { Mojo::Future->new->set_label($_[1]) };
124              
125             =cut
126              
127             our $FUTURE_FACTORY = sub {
128             Future->new->set_label($_[1])
129             };
130              
131             =head2 %ENCODER
132              
133             An encoder is a coderef which takes input and returns output.
134              
135             =cut
136              
137             our %ENCODER = (
138             utf8 => sub {
139             sub {
140             Encode::encode_utf8($_)
141             }
142             },
143             json => sub {
144             require JSON::MaybeXS;
145             my $json = JSON::MaybeXS->new(@_);
146             sub {
147             $json->encode($_)
148             }
149             },
150             csv => sub {
151             require Text::CSV;
152             my $csv = Text::CSV->new(@_);
153             sub {
154             die $csv->error_input unless $csv->combine(@$_);
155             $csv->string
156             }
157             },
158             base64 => sub {
159             require MIME::Base64;
160             sub {
161             MIME::Base64::encode_base64($_, '');
162             }
163             },
164             );
165             # The naming of this one is a perennial source of confusion in Perl,
166             # let's just support both
167             $ENCODER{'UTF-8'} = $ENCODER{utf8};
168              
169             our %DECODER = (
170             utf8 => sub {
171             my $data = '';
172             sub {
173             $data .= $_;
174             Encode::decode_utf8($data, Encode::FB_QUIET)
175             }
176             },
177             json => sub {
178             require JSON::MaybeXS;
179             my $json = JSON::MaybeXS->new(@_);
180             sub {
181             $json->decode($_)
182             }
183             },
184             csv => sub {
185             require Text::CSV;
186             my $csv = Text::CSV->new(@_);
187             sub {
188             die $csv->error_input unless $csv->parse($_);
189             [ $csv->fields ]
190             }
191             },
192             base64 => sub {
193             require MIME::Base64;
194             sub {
195             MIME::Base64::decode_base64($_);
196             }
197             },
198             );
199             $DECODER{'UTF-8'} = $DECODER{utf8};
200              
201             =head1 METHODS
202              
203             =head2 new
204              
205             Takes named parameters, such as:
206              
207             =over 4
208              
209             =item * label - the label used in descriptions
210              
211             =back
212              
213             Note that this is rarely called directly, see L, L and L instead.
214              
215             =cut
216              
217             sub new {
218 126     126 1 140598 my ($self, %args) = @_;
219 126   100     597 $args{label} //= 'unknown';
220 126         745 $self->SUPER::new(%args);
221             }
222              
223             =head2 from
224              
225             Creates a new source from things.
226              
227             The precise details of what this method supports may be somewhat ill-defined at this point in time.
228             It is expected that the interface and internals of this method will vary greatly in versions to come.
229              
230             At the moment, the following inputs are supported:
231              
232             =over 4
233              
234             =item * arrayref - when called as C<< ->from([1,2,3]) >> this will emit the values from the arrayref,
235             deferring until the source is started
236              
237             =item * L - given a L instance, will emit the results when that L is marked as done
238              
239             =item * file handle - if provided a filehandle, such as C<< ->from(\*STDIN) >>, this will read bytes and
240             emit those until EOF
241              
242             =back
243              
244             =cut
245              
246             sub from {
247 0     0 1 0 my $class = shift;
248 0 0       0 my $src = (ref $class) ? $class : $class->new;
249 0 0       0 if(my $from_class = Scalar::Util::blessed($_[0])) {
    0          
250 0 0       0 if($from_class->isa('Future')) {
251             $_[0]->on_ready(sub {
252 0     0   0 my ($f) = @_;
253 0 0       0 if($f->failure) {
    0          
254 0         0 $src->fail($f->from_future);
255             } elsif(!$f->is_cancelled) {
256 0         0 $src->finish;
257             } else {
258 0         0 $src->emit($f->get);
259 0         0 $src->finish;
260             }
261 0         0 })->retain;
262 0         0 return $src;
263             } else {
264 0         0 die 'Unknown class ' . $from_class . ', cannot turn it into a source';
265             }
266             } elsif(my $ref = ref($_[0])) {
267 0 0       0 if($ref eq 'ARRAY') {
    0          
268             $src->{on_get} = sub {
269 0     0   0 $src->emit($_) for @{$_[0]};
  0         0  
270 0         0 $src->finish;
271 0         0 };
272 0         0 return $src;
273             } elsif($ref eq 'GLOB') {
274 0 0       0 if(my $fh = *{$_[0]}{IO}) {
  0         0  
275             my $code = sub {
276 0     0   0 while(read $fh, my $buf, 4096) {
277 0         0 $src->emit($buf)
278             }
279             $src->finish
280 0         0 };
  0         0  
281 0         0 $src->{on_get} = $code;
282 0         0 return $src;
283             } else {
284 0         0 die "have a GLOB with no IO entry, this is not supported"
285             }
286             }
287 0         0 die "unsupported ref type $ref";
288             } else {
289 0         0 die "unknown item in ->from";
290             }
291             }
292              
293             =head2 empty
294              
295             Creates an empty source, which finishes immediately.
296              
297             =cut
298              
299             sub empty {
300 0     0 1 0 my ($class) = @_;
301              
302 0         0 $class->new(label => (caller 0)[3] =~ /::([^:]+)$/)->finish
303             }
304              
305             =head2 never
306              
307             An empty source that never finishes.
308              
309             =cut
310              
311             sub never {
312 0     0 1 0 my ($class) = @_;
313              
314 0         0 $class->new(label => (caller 0)[3] =~ /::([^:]+)$/)
315             }
316              
317             =head1 METHODS - Instance
318              
319             =cut
320              
321             =head2 encode
322              
323             Passes each item through an encoder.
324              
325             The first parameter is the encoder to use, the remainder are
326             used as options for the selected encoder.
327              
328             Examples:
329              
330             $src->encode('json')
331             $src->encode('utf8')
332             $src->encode('base64')
333              
334             =cut
335              
336             sub encode {
337 2     2 1 18 my ($self, $type) = splice @_, 0, 2;
338 2         29 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
339 2   50     18 my $code = ($ENCODER{$type} || $self->can('encode_' . $type) or die "unsupported encoding $type")->(@_);
340             $self->each_while_source(sub {
341 2     2   6 $src->emit($code->($_))
342 2         12 }, $src);
343             }
344              
345             =head2 decode
346              
347             Passes each item through a decoder.
348              
349             The first parameter is the decoder to use, the remainder are
350             used as options for the selected decoder.
351              
352             Examples:
353              
354             $src->decode('json')
355             $src->decode('utf8')
356             $src->decode('base64')
357              
358             =cut
359              
360             sub decode {
361 0     0 1 0 my ($self, $type) = splice @_, 0, 2;
362 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
363 0   0     0 my $code = ($DECODER{$type} || $self->can('decode_' . $type) or die "unsupported encoding $type")->(@_);
364             $self->each_while_source(sub {
365 0     0   0 $src->emit($code->($_))
366 0         0 }, $src);
367             }
368              
369             =head2 print
370              
371             Shortcut for C<< ->each(sub { print }) >>, except this will
372             also save the initial state of C< $\ > and use that for each
373             call for consistency.
374              
375             =cut
376              
377             sub print {
378 0     0 1 0 my ($self) = @_;
379 0         0 my $delim = $\;
380 0     0   0 $self->each(sub { local $\ = $delim; print });
  0         0  
  0         0  
381             }
382              
383             =head2 say
384              
385             Shortcut for C<< ->each(sub { print "$_\n" }) >>.
386              
387             =cut
388              
389             sub say {
390 0     0 1 0 my ($self) = @_;
391 0     0   0 $self->each(sub { local $\; print "$_\n" });
  0         0  
  0         0  
392             }
393              
394             =head2 hexdump
395              
396             Convert input bytes to a hexdump representation, for example:
397              
398             00000000 00 00 12 04 00 00 00 00 00 00 03 00 00 00 80 00 >................<
399             00000010 04 00 01 00 00 00 05 00 ff ff ff 00 00 04 08 00 >................<
400             00000020 00 00 00 00 7f ff 00 00 >........<
401              
402             One line is emitted for each 16 bytes.
403              
404             Takes the following named parameters:
405              
406             =over 4
407              
408             =item * C - accumulates data for a continuous stream, and
409             does not reset the offset counter. Note that this may cause the last
410             output to be delayed until the source completes.
411              
412             =back
413              
414             =cut
415              
416             sub hexdump {
417 0     0 1 0 my ($self, %args) = @_;
418              
419 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
420 0         0 my $offset = 0;
421 0         0 my $in = '';
422             $self->each_while_source(sub {
423 0     0   0 my @out;
424 0 0       0 if($args{continuous}) {
425 0         0 $in .= $_;
426 0 0       0 return if length($in) < 16;
427             } else {
428 0         0 $in = $_;
429 0         0 $offset = 0;
430             }
431 0         0 while(length(my $bytes = substr $in, 0, 16, '')) {
432 0         0 my $encoded = join '', unpack 'H*' => $bytes;
433 0         0 $encoded =~ s/[[:xdigit:]]{2}\K(?=[[:xdigit:]])/ /g;
434 0         0 my $ascii = $bytes =~ s{[^[:print:]]}{.}gr;
435 0         0 $src->emit(sprintf '%08x %-47.47s %-18.18s', $offset, $encoded, ">$ascii<");
436 0         0 $offset += length($bytes);
437 0 0 0     0 return if $args{continuous} and length($in) < 16;
438             }
439 0         0 }, $src);
440             }
441              
442             =head2 throw
443              
444             Throws something. I don't know what, maybe a chair.
445              
446             =cut
447              
448             sub throw {
449 0     0 1 0 my $src = shift->new(@_);
450 0         0 $src->fail('...');
451             }
452              
453             =head2 debounce
454              
455             Not yet implemented.
456              
457             Requires timing support, see implementations such as L instead.
458              
459             =cut
460              
461             sub debounce {
462 0     0 1 0 my ($self, $interval) = @_;
463             ...
464 0         0 }
465              
466             =head2 chomp
467              
468             Chomps all items with the given delimiter.
469              
470             Once you've instantiated this, it will stick with the delimiter which was in force at the time of instantiation.
471             Said delimiter follows the usual rules of C<< $/ >>, whatever they happen to be.
472              
473             Example:
474              
475             $ryu->stdin
476             ->chomp("\n")
477             ->say
478              
479             =cut
480              
481             sub chomp {
482 0     0 1 0 my ($self, $delim) = @_;
483 0   0     0 $delim //= $/;
484             $self->map(sub {
485 0     0   0 local $/ = $delim;
486 0         0 chomp(my $line = $_);
487 0         0 $line
488             })
489 0         0 }
490              
491             =head2 map
492              
493             A bit like L.
494              
495             Takes a single parameter - the coderef to execute for each item. This should return
496             a scalar value which will be used as the next item.
497              
498             Often useful in conjunction with a C<< do >> block to provide a closure.
499              
500             Examples:
501              
502             $src->map(do {
503             my $idx = 0;
504             sub {
505             [ @$_, ++$idx ]
506             }
507             })
508              
509             =cut
510              
511             sub map : method {
512 3     3 1 22 my ($self, $code) = @_;
513              
514 3         33 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
515             $self->each_while_source(sub {
516             $src->emit(Scalar::Util::blessed($_)
517             ? (scalar $_->$code)
518             : !ref($code)
519 10 100   10   39 ? $_->{$code}
    100          
520             : scalar $_->$code
521             )
522 3         46 }, $src);
523             }
524              
525             =head2 flat_map
526              
527             Similar to L, but will flatten out some items:
528              
529             =over 4
530              
531             =item * an arrayref will be expanded out to emit the individual elements
532              
533             =item * for a L, passes on any emitted elements
534              
535             =back
536              
537             This also means you can "merge" items from a series of sources.
538              
539             Note that this is not recursive - an arrayref of arrayrefs will be expanded out
540             into the child arrayrefs, but no further.
541              
542             =cut
543              
544             sub flat_map {
545 3     3 1 111 my ($self, $code) = splice @_, 0, 2;
546              
547             # Upgrade ->flat_map(method => args...) to a coderef
548 3 50       11 if(!Ref::Util::is_plain_coderef($code)) {
549 0         0 my $method = $code;
550 0         0 my @args = @_;
551 0     0   0 $code = sub { $_->$method(@args) }
552 0         0 }
553              
554 3         38 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
555              
556 3         10 Scalar::Util::weaken(my $weak_sauce = $src);
557             my $add = sub {
558 6     6   6 my $v = shift;
559 6 50       12 my $src = $weak_sauce or return;
560              
561 6         13 my $k = "$v";
562 6         9 $log->tracef("Adding %s which will bring our count to %d", $k, 0 + keys %{$src->{waiting}});
  6         21  
563             $src->{waiting}{$k} = $v->on_ready(sub {
564 2 50       169 return unless my $src = $weak_sauce;
565 2         4 delete $src->{waiting}{$k};
566 2 100       2 $src->finish unless %{$src->{waiting}};
  2         9  
567             })
568 3         11 };
  6         56  
569              
570 3         7 $add->($self->_completed);
571             $self->each_while_source(sub {
572 6 50   6   23 my $src = $weak_sauce or return;
573 6         13 for ($code->($_)) {
574 6         24 my $item = $_;
575 6 100 33     35 if(Ref::Util::is_plain_arrayref($item)) {
    50          
576 3         10 $log->tracef("Have an arrayref of %d items", 0 + @$item);
577 3         23 for(@$item) {
578 9 50       15 last if $src->is_ready;
579 9         38 $src->emit($_);
580             }
581             } elsif(Scalar::Util::blessed($item) && $item->isa(__PACKAGE__)) {
582 3         11 $log->tracef("This item is a source");
583             $src->on_ready(sub {
584 1 50       83 return if $item->is_ready;
585 1         7 $log->tracef("Marking %s as ready because %s was", $item->describe, $src->describe);
586 1         73 shift->on_ready($item->_completed);
587 3         38 });
588 3         56 $add->($item->_completed);
589             $item->each_while_source(sub {
590 5 50       10 my $src = $weak_sauce or return;
591 5         9 $src->emit($_)
592             }, $src)->on_ready(sub {
593 1         134 undef $item;
594 3         59 });
595             }
596             }
597 3         69 }, $src);
598 3         47 $src
599             }
600              
601              
602             =head2 split
603              
604             Splits the input on the given delimiter.
605              
606             By default, will split into characters.
607              
608             Note that each item will be processed separately - the buffer won't be
609             retained across items, see L for that.
610              
611             =cut
612              
613             sub split : method {
614 0     0 1 0 my ($self, $delim) = @_;
615 0   0     0 $delim //= qr//;
616              
617 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
618 0     0   0 $self->each_while_source(sub { $src->emit($_) for split $delim, $_ }, $src);
  0         0  
619             }
620              
621             =head2 chunksize
622              
623             Splits input into fixed-size chunks.
624              
625             Note that output is always guaranteed to be a full chunk - if there is partial input
626             at the time the input stream finishes, those extra bytes will be discarded.
627              
628             =cut
629              
630             sub chunksize : method {
631 0     0 1 0 my ($self, $size) = @_;
632 0 0 0     0 die 'need positive chunk size parameter' unless $size && $size > 0;
633              
634 0         0 my $buffer = '';
635 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
636             $self->each_while_source(sub {
637 0     0   0 $buffer .= $_;
638 0         0 $src->emit(substr $buffer, 0, $size, '') while length($buffer) >= $size;
639 0         0 }, $src);
640             }
641              
642             =head2 batch
643              
644             Splits input into arrayref batches of a given size.
645              
646             Note that the last item emitted may have fewer elements (or none at all).
647              
648             $src->batch(10)
649             ->map(sub { "Next 10 (or fewer) items: @$_" })
650             ->say;
651              
652             =cut
653              
654             sub batch : method {
655 1     1 1 5 my ($self, $size) = @_;
656 1 50 33     6 die 'need positive batch parameter' unless $size && $size > 0;
657              
658 1         3 my $buffer = '';
659 1         15 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
660 1         2 my @batch;
661             $self->each_while_source(sub {
662 4     4   7 push @batch, $_;
663 4   66     19 while(@batch >= $size and my (@items) = splice @batch, 0, $size) {
664 1         4 $src->emit(\@items)
665             }
666             }, $src, cleanup => sub {
667 1 50   1   3 $src->emit([ splice @batch ]) if @batch;
668 1         7 });
669             }
670              
671             =head2 by_line
672              
673             Emits one item for each line in the input. Similar to L with a C<< \n >> parameter,
674             except this will accumulate the buffer over successive items and only emit when a complete
675             line has been extracted.
676              
677             =cut
678              
679             sub by_line : method {
680 0     0 1 0 my ($self, $delim) = @_;
681 0   0     0 $delim //= $/;
682              
683 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
684 0         0 my $buffer = '';
685             $self->each_while_source(sub {
686 0     0   0 $buffer .= $_;
687 0         0 while($buffer =~ s/^(.*)\Q$delim//) {
688 0         0 $src->emit($1)
689             }
690 0         0 }, $src);
691             }
692              
693             =head2 prefix
694              
695             Applies a string prefix to each item.
696              
697             =cut
698              
699             sub prefix {
700 1     1 1 6 my ($self, $txt) = @_;
701 1         13 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
702             $self->each_while_source(sub {
703 3     3   8 $src->emit($txt . $_)
704 1         6 }, $src);
705             }
706              
707             =head2 suffix
708              
709             Applies a string suffix to each item.
710              
711             =cut
712              
713             sub suffix {
714 1     1 1 7 my ($self, $txt) = @_;
715 1         12 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
716             $self->each_while_source(sub {
717 3     3   10 $src->emit($_ . $txt)
718 1         6 }, $src);
719             }
720              
721             =head2 sprintf_methods
722              
723             Convenience method for generating a string from a L-style format
724             string and a set of method names to call.
725              
726             Note that any C items will be mapped to an empty string.
727              
728             Example:
729              
730             $src->sprintf_methods('%d has name %s', qw(id name))
731             ->say
732             ->await;
733              
734             =cut
735              
736             sub sprintf_methods {
737 0     0 1 0 my ($self, $fmt, @methods) = @_;
738 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
739             $self->each_while_source(sub {
740 0     0   0 my ($item) = @_;
741 0   0     0 $src->emit(sprintf $fmt, map $item->$_ // '', @methods)
742 0         0 }, $src);
743             }
744              
745             =head2 ignore
746              
747             Receives items, but ignores them entirely.
748              
749             Emits nothing and eventually completes when the upstream L is done.
750              
751             Might be useful for keeping a source alive.
752              
753             =cut
754              
755             sub ignore {
756 0     0 1 0 my ($self) = @_;
757 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
758             $self->_completed->on_ready(sub {
759 0 0   0   0 shift->on_ready($src->_completed) unless $src->_completed->is_ready
760 0         0 });
761 0         0 return $src;
762             }
763              
764             =head2 buffer
765              
766             Accumulate items while any downstream sources are paused.
767              
768             Takes the following named parameters:
769              
770             =over 4
771              
772             =item * C - once at least this many items are buffered, will L
773             the upstream L.
774              
775             =item * C - if the buffered count drops to this number, will L
776             the upstream L.
777              
778             =back
779              
780             =cut
781              
782             sub buffer {
783 3     3 1 1602 my $self = shift;
784 3         7 my %args;
785 3 100       20 %args = @_ != 1
786             ? @_
787             : (
788             low => $_[0],
789             high => $_[0],
790             );
791 3   33     13 $args{low} //= $args{high};
792 3   50     11 $args{low} //= 10;
793 3   33     9 $args{high} //= $args{low};
794              
795 3         46 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
796 3         8 $src->{pause_propagation} = 0;
797 3         6 my @pending;
798             $self->_completed->on_ready(sub {
799 1 50 33 1   94 shift->on_ready($src->_completed) unless $src->_completed->is_ready or @pending;
800 3         8 });
801 3         50 my $item_handler = do {
802 3         10 Scalar::Util::weaken(my $weak_self = $self);
803 3         9 Scalar::Util::weaken(my $weak_src = $src);
804             sub {
805 17     17   29 my $self = $weak_self;
806 17 50       51 my $src = $weak_src or return;
807 17 100 66     92 if(@pending >= $args{high} and $self and not $self->is_paused($src)) {
      100        
808 2         9 $self->pause($src);
809             }
810             $src->emit(shift @pending)
811             while @pending
812             and not($src->is_paused)
813 17   100     79 and @{$self->{children}};
  11   66     44  
814 17 50       63 if($self) {
815 17 100 100     64 $self->resume($src) if @pending < $args{low} and $self->is_paused($src);
816              
817             # It's common to have a situation where the parent chain completes while we're
818             # paused waiting for the queue to drain. In this situation, we want to propagate
819             # completion only once the queue is empty.
820 17 0 33     35 $self->_completed->on_ready($src->_completed)
      33        
821             if $self->_completed->is_ready and not @pending and not $src->_completed->is_ready;
822             }
823             }
824 3         15 };
825 3         29 $src->flow_control
826             ->each($item_handler)->retain;
827             $self->each(my $code = sub {
828 11     11   24 push @pending, $_;
829 11         27 $item_handler->()
830 3         15 });
831             $self->_completed->on_ready(sub {
832 1     1   49 my ($f) = @_;
833 1 50       4 return if @pending;
834 1         3 my $addr = Scalar::Util::refaddr($code);
835 1         4 my $count = List::UtilsBy::extract_by { $addr == Scalar::Util::refaddr($_) } @{$self->{on_item}};
  0         0  
  1         6  
836 1 50       14 $f->on_ready($src->_completed) unless $src->is_ready;
837 1         8 $log->tracef("->each_while_source completed on %s for refaddr 0x%x, removed %d on_item handlers", $self->describe, Scalar::Util::refaddr($self), $count);
838 3         10 });
839 3         51 $src;
840             }
841              
842             sub retain {
843 3     3 0 7 my ($self) = @_;
844 3         8 $self->{_self} = $self;
845             $self->_completed
846 3     0   14 ->on_ready(sub { delete $self->{_self} });
  0         0  
847 3         48 $self
848             }
849              
850             =head2 as_list
851              
852             Resolves to a list consisting of all items emitted by this source.
853              
854             =cut
855              
856             sub as_list {
857 3     3 1 6 my ($self) = @_;
858 3         4 my @data;
859             $self->each(sub {
860 8     8   17 push @data, $_
861 3         10 });
862 3     3   289 $self->_completed->transform(done => sub { @data })
863 3         7 }
864              
865             =head2 as_arrayref
866              
867             Resolves to a single arrayref consisting of all items emitted by this source.
868              
869             =cut
870              
871             sub as_arrayref {
872 2     2 1 3 my ($self) = @_;
873 2         4 my @data;
874             $self->each(sub {
875 6     6   17 push @data, $_
876 2         21 });
877 2     2   243 $self->_completed->transform(done => sub { \@data })
878 2         6 }
879              
880             =head2 as_string
881              
882             Concatenates all items into a single string.
883              
884             Returns a L which will resolve on completion.
885              
886             =cut
887              
888             sub as_string {
889 0     0 1 0 my ($self) = @_;
890 0         0 my $data = '';
891             $self->each(sub {
892 0     0   0 $data .= $_;
893 0         0 });
894 0     0   0 $self->_completed->transform(done => sub { $data })
895 0         0 }
896              
897             =head2 as_queue
898              
899             Returns a L instance which will
900             L items whenever the source
901             emits them.
902              
903             Unfortunately there is currently no way to tell
904             when the queue will end, so you'd need to track
905             that separately.
906              
907             =cut
908              
909             sub as_queue {
910 0     0 1 0 my ($self) = @_;
911 0         0 my $queue = Future::Queue->new;
912             $self->each(sub {
913 0     0   0 $queue->push($_)
914 0         0 });
915 0         0 return $queue;
916             }
917              
918             =head2 as_buffer
919              
920             Returns a L instance, which will
921             L any emitted items from this
922             source to the buffer as they arrive.
923              
924             Intended for stream protocol handling - individual
925             sized packets are perhaps better suited to the
926             L per-item behaviour.
927              
928             Supports the following named parameters:
929              
930             =over 4
931              
932             =item * C - low waterlevel for buffer, start accepting more bytes
933             once the L has less content than this
934              
935             =item * C - high waterlevel for buffer, will pause the parent stream
936             if this is reached
937              
938             =back
939              
940             The backpressure (low/high) values default to undefined, meaning
941             no backpressure is applied: the buffer will continue to fill
942             indefinitely.
943              
944             =cut
945              
946             sub as_buffer {
947 1     1 1 534 my ($self, %args) = @_;
948 1         3 my $low = delete $args{low};
949 1         2 my $high = delete $args{high};
950             # We're creating a source but keeping it to ourselves here
951 1         14 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
952              
953             my $buffer = Ryu::Buffer->new(
954             new_future => $self->{new_future},
955             %args,
956             on_change => sub {
957 2     2   3 my ($self) = @_;
958 2 100 66     8 $src->resume if $low and $self->size <= $low;
959             }
960 1         12 );
961              
962 1         4 Scalar::Util::weaken(my $weak_sauce = $src);
963 1         3 Scalar::Util::weaken(my $weak_buffer = $buffer);
964             $self->each_while_source(sub {
965 6 50   6   12 my $src = $weak_sauce or return;
966 6 100       14 my $buf = $weak_buffer or do {
967 1         11 $src->finish;
968 1         31 return;
969             };
970 5         13 $buf->write($_);
971 5 100 66     24 $src->pause if $high and $buf->size >= $high;
972 5 100 66     14 $src->resume if $low and $buf->size <= $low;
973 1         8 }, $src);
974 1         3 return $buffer;
975             }
976              
977             =head2 combine_latest
978              
979             Takes the most recent item from one or more Ls, and emits
980             an arrayref containing the values in order.
981              
982             An item is emitted for each update as soon as all sources have provided
983             at least one value. For example, given 2 sources, if the first emits C<1>
984             then C<2>, then the second emits C, this would emit a single C<< [2, 'a'] >>
985             item.
986              
987             =cut
988              
989             sub combine_latest : method {
990 1     1 1 10 my ($self, @sources) = @_;
991 1 50   0   6 push @sources, sub { @_ } if Scalar::Util::blessed $sources[-1];
  0         0  
992 1         2 my $code = pop @sources;
993              
994 1         13 my $combined = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
995 1 50       3 unshift @sources, $self if ref $self;
996 1         2 my @value;
997             my %seen;
998 1         3 for my $idx (0..$#sources) {
999 2         4 my $src = $sources[$idx];
1000             $src->each_while_source(sub {
1001 5     5   9 $value[$idx] = $_;
1002 5   100     14 $seen{$idx} ||= 1;
1003 5 100       15 $combined->emit([ $code->(@value) ]) if @sources == keys %seen;
1004 2         10 }, $combined);
1005             }
1006             Future->needs_any(
1007             map $_->completed, @sources
1008             )->on_ready(sub {
1009 0     0   0 @value = ();
1010 0 0       0 return if $combined->_completed->is_ready;
1011 0         0 shift->on_ready($combined->_completed)
1012 1         6 })->retain;
1013 1         254 $combined
1014             }
1015              
1016             =head2 with_index
1017              
1018             Emits arrayrefs consisting of C<< [ $item, $idx ] >>.
1019              
1020             =cut
1021              
1022             sub with_index {
1023 1     1 1 9 my ($self) = @_;
1024 1         18 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1025 1         2 my $idx = 0;
1026             $self->each_while_source(sub {
1027 3     3   10 $src->emit([ $_, $idx++ ])
1028 1         7 }, $src);
1029             }
1030              
1031             =head2 with_latest_from
1032              
1033             Similar to L, but will start emitting as soon as
1034             we have any values. The arrayref will contain C<< undef >> for any
1035             sources which have not yet emitted any items.
1036              
1037             =cut
1038              
1039             sub with_latest_from : method {
1040 0     0 1 0 my ($self, @sources) = @_;
1041 0 0   0   0 push @sources, sub { @_ } if Scalar::Util::blessed $sources[-1];
  0         0  
1042 0         0 my $code = pop @sources;
1043              
1044 0         0 my $combined = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1045 0         0 my @value;
1046             my %seen;
1047 0         0 for my $idx (0..$#sources) {
1048 0         0 my $src = $sources[$idx];
1049             $src->each(sub {
1050 0 0   0   0 return if $combined->_completed->is_ready;
1051 0         0 $value[$idx] = $_;
1052 0   0     0 $seen{$idx} ||= 1;
1053 0         0 });
1054             }
1055             $self->each(sub {
1056 0 0   0   0 $combined->emit([ $code->(@value) ]) if keys %seen;
1057 0         0 });
1058 0         0 $self->_completed->on_ready($combined->_completed);
1059             $self->_completed->on_ready(sub {
1060 0     0   0 @value = ();
1061 0 0       0 return if $combined->is_ready;
1062 0         0 shift->on_ready($combined->_completed);
1063 0         0 });
1064 0         0 $combined
1065             }
1066              
1067             =head2 merge
1068              
1069             Emits items as they are generated by the given sources.
1070              
1071             Example:
1072              
1073             $numbers->merge($letters)->say # 1, 'a', 2, 'b', 3, 'c'...
1074              
1075             =cut
1076              
1077             sub merge : method {
1078 4     4 1 46 my ($self, @sources) = @_;
1079              
1080 4         54 my $combined = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1081 4 50       26 unshift @sources, $self if ref $self;
1082 4         9 for my $src (@sources) {
1083             $src->each(sub {
1084 16 50   16   31 return if $combined->_completed->is_ready;
1085 16         83 $combined->emit($_)
1086 5         24 });
1087             }
1088             Future->needs_all(
1089             map $_->completed, @sources
1090             )->on_ready($combined->_completed)
1091 2     2   193 ->on_ready(sub { @sources = () })
1092 4         23 ->retain;
1093 4         201 $combined
1094             }
1095              
1096             =head2 emit_from
1097              
1098             Emits items as they are generated by the given sources.
1099              
1100             Example:
1101              
1102             my $src = Ryu::Source->new;
1103             $src->say;
1104             $src->emit_from(
1105             $numbers,
1106             $letters
1107             );
1108              
1109             =cut
1110              
1111             sub emit_from : method {
1112 1     1 1 7 my ($self, @sources) = @_;
1113              
1114 1         4 for my $src (@sources) {
1115             $src->each_while_source(sub {
1116 5 50   5   7 return if $self->_completed->is_ready;
1117 5         24 $self->emit($_)
1118 2         11 }, $self);
1119             }
1120             $self
1121 1         2 }
1122              
1123             =head2 apply
1124              
1125             Used for setting up multiple streams.
1126              
1127             Accepts a variable number of coderefs, will call each one and gather L
1128             results.
1129              
1130             =cut
1131              
1132             sub apply : method {
1133 0     0 1 0 my ($self, @code) = @_;
1134              
1135 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1136 0         0 my @pending;
1137 0         0 for my $code (@code) {
1138 0         0 push @pending, map $code->($_), $self;
1139             }
1140             Future->needs_all(
1141 0         0 map $_->completed, @pending
1142             )->on_ready($src->_completed)
1143             ->retain;
1144             # Pass through the original events
1145             $self->each_while_source(sub {
1146 0     0   0 $src->emit($_)
1147 0         0 }, $src)
1148             }
1149              
1150             =head2 switch_str
1151              
1152             Given a condition, will select one of the alternatives based on stringified result.
1153              
1154             Example:
1155              
1156             $src->switch_str(
1157             sub { $_->name }, # our condition
1158             smith => sub { $_->id }, # if this matches the condition, the code will be called with $_ set to the current item
1159             jones => sub { $_->parent->id },
1160             sub { undef } # and this is our default case
1161             );
1162              
1163             =cut
1164              
1165             sub switch_str {
1166 1     1 1 19 my ($self, $condition, @args) = @_;
1167              
1168 1         18 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1169 1         2 my @active;
1170             $self->_completed->on_ready(sub {
1171             Future->needs_all(
1172             grep $_, @active
1173             )->on_ready(sub {
1174 0         0 $src->finish
1175 0     0   0 })->retain
1176 1         3 });
1177              
1178             $self->each_while_source(sub {
1179 3     3   4 my ($item) = $_;
1180 3         8 my $rslt = $condition->($item);
1181             (Scalar::Util::blessed($rslt) && $rslt->isa('Future') ? $rslt : Future->done($rslt))->on_done(sub {
1182 3         93 my ($data) = @_;
1183 3         8 my @copy = @args;
1184 3         10 while(my ($k, $v) = splice @copy, 0, 2) {
1185 6 100       16 if(!defined $v) {
    100          
1186             # Only a single value (or undef)? That's our default, just use it as-is
1187 1         3 return $src->emit(map $k->($_), $item)
1188             } elsif($k eq $data) {
1189             # Key matches our result? Call code with the original item
1190 2         6 return $src->emit(map $v->($_), $item)
1191             }
1192             }
1193 3 50 33     17 })->retain
1194 1         24 }, $src)
1195             }
1196              
1197             =head2 ordered_futures
1198              
1199             Given a stream of Ls, will emit the results as each L
1200             is marked ready.
1201              
1202             If any L in the stream fails, that will mark this source as failed,
1203             and all remaining L instances will be cancelled. To avoid this behaviour
1204             and leave the L instances active, use:
1205              
1206             $src->map('without_cancel')
1207             ->ordered_futures
1208              
1209             See L for more details.
1210              
1211             Takes the following named parameters:
1212              
1213             =over 4
1214              
1215             =item * C - once at least this many unresolved L instances are pending,
1216             will L the upstream L.
1217              
1218             =item * C - if the pending count drops to this number, will L
1219             the upstream L.
1220              
1221             =back
1222              
1223             This method is also available as L.
1224              
1225             =cut
1226              
1227             sub ordered_futures {
1228 5     5 1 43 my ($self, %args) = @_;
1229 5         14 my $low = delete $args{low};
1230 5         12 my $high = delete $args{high};
1231 5         75 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1232 5         14 my %pending;
1233 5         12 my $src_completed = $src->_completed;
1234              
1235 5         8 my $all_finished;
1236             $self->_completed->on_ready(sub {
1237 5     5   446 $all_finished = shift;
1238 5 100 66     21 $all_finished->on_ready($src_completed) unless %pending or $src_completed->is_ready;
1239 5         12 });
1240              
1241             $src_completed->on_ready(sub {
1242 4     4   383 my @pending = values %pending;
1243 4         11 %pending = ();
1244 4         12 for(@pending) {
1245 3 100 66     35 $_->cancel if $_ and not $_->is_ready;
1246             }
1247 5         111 });
1248             $self->each(sub {
1249 13     13   18 my $f = $_;
1250 13         28 my $k = Scalar::Util::refaddr $f;
1251             # This will keep a copy of the Future around until the
1252             # ->is_ready callback removes it
1253 13         44 $pending{$k} = $f;
1254 13         48 $log->tracef('Ordered futures has %d pending', 0 + keys %pending);
1255 13 100 100     130 $src->pause if $high and keys(%pending) >= $high and not $src->is_paused;
      66        
1256             $_->on_done(sub {
1257 9         1788 my @pending = @_;
1258 9   66     40 while(@pending and not $src_completed->is_ready) {
1259 4         30 $src->emit(shift @pending);
1260             }
1261             })
1262 1 50       689 ->on_fail(sub { $src->fail(@_) unless $src_completed->is_ready; })
1263             ->on_ready(sub {
1264 12         308 delete $pending{$k};
1265 12 100 100     40 $src->resume if $low and keys(%pending) <= $low and $src->is_paused;
      100        
1266 12         44 $log->tracef('Ordered futures now has %d pending after completion, upstream finish status is %d', 0 + keys(%pending), $all_finished);
1267 12 100       97 return if %pending;
1268 6 100 100     25 $all_finished->on_ready($src_completed) if $all_finished and not $src_completed->is_ready;
1269             })
1270 5         132 });
  13         69  
1271 5         18 return $src;
1272             }
1273              
1274             =head2 resolve
1275              
1276             A synonym for L.
1277              
1278             =cut
1279              
1280             *resolve = *ordered_futures;
1281              
1282             =head2 concurrent
1283              
1284             =cut
1285              
1286             sub concurrent {
1287 0     0 1 0 my ($self) = @_;
1288 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1289             $self->each_while_source(sub {
1290 0     0   0 $_->on_done($src->curry::weak::emit)
1291             ->on_fail($src->curry::weak::fail)
1292             ->retain
1293 0         0 }, $src);
1294             }
1295              
1296             =head2 distinct
1297              
1298             Emits new distinct items, using string equality with an exception for
1299             C (i.e. C is treated differently from empty string or 0).
1300              
1301             Given 1,2,3,undef,2,3,undef,'2',2,4,1,5, you'd expect to get the sequence 1,2,3,undef,4,5.
1302              
1303             =cut
1304              
1305             sub distinct {
1306 1     1 1 7 my $self = shift;
1307              
1308 1         13 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1309 1         2 my %seen;
1310             my $undef;
1311             $self->each_while_source(sub {
1312 22 100   22   27 if(defined) {
1313 17 100       44 $src->emit($_) unless $seen{$_}++;
1314             } else {
1315 5 100       12 $src->emit($_) unless $undef++;
1316             }
1317 1         6 }, $src);
1318             }
1319              
1320             =head2 distinct_until_changed
1321              
1322             Removes contiguous duplicates, defined by string equality.
1323              
1324             =cut
1325              
1326             sub distinct_until_changed {
1327 1     1 1 8 my $self = shift;
1328              
1329 1         19 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1330 1         3 my $active;
1331             my $prev;
1332             $self->each_while_source(sub {
1333 18 100   18   23 if($active) {
1334 17 100       29 if(defined($prev) ^ defined($_)) {
    100          
1335 10         11 $src->emit($_)
1336             } elsif(defined($_)) {
1337 5 100       12 $src->emit($_) if $prev ne $_;
1338             }
1339             } else {
1340 1         2 $active = 1;
1341 1         5 $src->emit($_);
1342             }
1343 18         25 $prev = $_;
1344 1         9 }, $src);
1345 1         6 $src
1346             }
1347              
1348             =head2 sort_by
1349              
1350             Emits items sorted by the given key. This is a stable sort function.
1351              
1352             The algorithm is taken from L.
1353              
1354             =cut
1355              
1356             sub sort_by {
1357 37     37   430937 use sort qw(stable);
  37         92  
  37         329  
1358 0     0 1 0 my ($self, $code) = @_;
1359 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1360 0         0 my @items;
1361             my @keys;
1362       0     $self->_completed->on_done(sub {
1363             })->on_ready(sub {
1364 0 0   0   0 return if $src->is_ready;
1365 0         0 shift->on_ready($src->_completed);
1366 0         0 });
1367             $self->each_while_source(sub {
1368 0     0   0 push @items, $_;
1369 0         0 push @keys, $_->$code;
1370             }, $src, cleanup => sub {
1371 0     0   0 my ($f) = @_;
1372 0 0       0 return unless $f->is_done;
1373 0         0 $src->emit($_) for @items[sort { $keys[$a] cmp $keys[$b] } 0 .. $#items];
  0         0  
1374 0         0 });
1375             }
1376              
1377             =head2 nsort_by
1378              
1379             Emits items numerically sorted by the given key. This is a stable sort function.
1380              
1381             See L.
1382              
1383             =cut
1384              
1385             sub nsort_by {
1386 37     37   14312 use sort qw(stable);
  37         84  
  37         152  
1387 0     0 1 0 my ($self, $code) = @_;
1388 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1389 0         0 my @items;
1390             my @keys;
1391             $self->each_while_source(sub {
1392 0     0   0 push @items, $_;
1393 0         0 push @keys, $_->$code;
1394             }, $src, cleanup => sub {
1395 0 0   0   0 return unless shift->is_done;
1396 0         0 $src->emit($_) for @items[sort { $keys[$a] <=> $keys[$b] } 0 .. $#items];
  0         0  
1397 0         0 });
1398             }
1399              
1400             =head2 rev_sort_by
1401              
1402             Emits items sorted by the given key. This is a stable sort function.
1403              
1404             The algorithm is taken from L.
1405              
1406             =cut
1407              
1408             sub rev_sort_by {
1409 37     37   10601 use sort qw(stable);
  37         72  
  37         164  
1410 0     0 1 0 my ($self, $code) = @_;
1411 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1412 0         0 my @items;
1413             my @keys;
1414             $self->each_while_source(sub {
1415 0     0   0 push @items, $_;
1416 0         0 push @keys, $_->$code;
1417             }, $src, cleanup => sub {
1418 0 0   0   0 return unless shift->is_done;
1419 0         0 $src->emit($_) for @items[sort { $keys[$b] cmp $keys[$a] } 0 .. $#items];
  0         0  
1420 0         0 });
1421             }
1422              
1423             =head2 rev_nsort_by
1424              
1425             Emits items numerically sorted by the given key. This is a stable sort function.
1426              
1427             See L.
1428              
1429             =cut
1430              
1431             sub rev_nsort_by {
1432 0     0 1 0 my ($self, $code) = @_;
1433 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1434 0         0 my @items;
1435             my @keys;
1436             $self->each_while_source(sub {
1437 0     0   0 push @items, $_;
1438 0         0 push @keys, $_->$code;
1439             }, $src, cleanup => sub {
1440 0 0   0   0 return unless shift->is_done;
1441 0         0 $src->emit($_) for @items[sort { $keys[$b] <=> $keys[$a] } 0 .. $#items];
  0         0  
1442 0         0 });
1443             }
1444              
1445             =head2 extract_all
1446              
1447             Expects a regular expression and emits hashrefs containing
1448             the named capture buffers.
1449              
1450             The regular expression will be applied using the m//gc operator.
1451              
1452             Example:
1453              
1454             $src->extract_all(qr{/(?[^/]+)})
1455             # emits { component => '...' }, { component => '...' }
1456              
1457             =cut
1458              
1459             sub extract_all {
1460 1     1 1 10 my ($self, $pattern) = @_;
1461 1         16 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1462             $self->each_while_source(sub {
1463 37     37   44860 $src->emit(+{ %+ }) while m/$pattern/gc;
  37     3   14539  
  37         216600  
  3         45  
1464 1         7 }, $src);
1465             }
1466              
1467             =head2 skip
1468              
1469             Skips the first N items.
1470              
1471             =cut
1472              
1473             sub skip {
1474 1     1 1 8 my ($self, $count) = @_;
1475 1   50     2 $count //= 0;
1476              
1477 1         12 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1478             $self->_completed->on_ready(sub {
1479 1 50   1   88 return if $src->is_ready;
1480 1         7 shift->on_ready($src->_completed);
1481 1         4 });
1482             $self->each(sub {
1483 5 100   5   15 $src->emit($_) unless $count-- > 0;
1484 1         21 });
1485 1         5 $src
1486             }
1487              
1488             =head2 skip_last
1489              
1490             Skips the last N items.
1491              
1492             =cut
1493              
1494             sub skip_last {
1495 1     1 1 9 my ($self, $count) = @_;
1496 1   50     3 $count //= 0;
1497              
1498 1         17 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1499             $self->_completed->on_ready(sub {
1500 1 50   1   88 return if $src->is_ready;
1501 1         8 shift->on_ready($src->_completed);
1502 1         5 });
1503 1         19 my @pending;
1504             $self->each(sub {
1505 5     5   7 push @pending, $_;
1506 5 100       17 $src->emit(shift @pending) if @pending > $count;
1507 1         7 });
1508 1         5 $src
1509             }
1510              
1511             =head2 skip_until
1512              
1513             Skips the items that arrive before a given condition is reached.
1514              
1515             =over 4
1516              
1517             =item * Either a L instance (we skip all items until it's marked as `done`), or a coderef,
1518             which we call for each item until it first returns true
1519              
1520             =back
1521              
1522             =cut
1523              
1524             sub skip_until {
1525 2     2 1 32 my ($self, $condition) = @_;
1526              
1527 2         27 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1528 2         5 $self->each_while_source(do {
1529 2 100 33     19 if(ref($condition) eq 'CODE') {
    50          
1530 1         2 my $reached = 0;
1531 5 100 100 5   18 sub { return $src->emit($_) if $reached ||= $condition->($_); }
1532 1         8 } elsif(Scalar::Util::blessed($condition) && $condition->isa('Future')) {
1533             $condition->on_ready($src->$curry::weak(sub {
1534 1     1   57 my ($src, $cond) = @_;
1535 1 50       5 return if $src->is_ready;
1536 1 50       19 $src->fail($cond->failure) if $cond->is_failed;
1537 1 50       13 $src->cancel if $cond->is_cancelled
1538 1         10 }));
1539 4 100   4   9 sub { $src->emit($_) if $condition->is_done; }
1540 1         34 } else {
1541 0         0 die 'unknown type for condition: ' . $condition;
1542             }
1543             }, $src);
1544             }
1545              
1546             =head2 take_until
1547              
1548             Passes through items that arrive until a given condition is reached.
1549              
1550             Expects a single parameter, which can be one of the following:
1551              
1552             =over 4
1553              
1554             =item * a L instance - we will skip all items until it's marked as C
1555              
1556             =item * a coderef, which we call for each item until it first returns true
1557              
1558             =item * or a L, in which case we stop when that first emits a value
1559              
1560             =back
1561              
1562             =cut
1563              
1564             sub take_until {
1565 0     0 1 0 my ($self, $condition) = @_;
1566              
1567 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1568 0 0 0     0 if(Scalar::Util::blessed($condition) && $condition->isa('Ryu::Source')) {
1569             $condition->_completed->on_ready(sub {
1570 0     0   0 $log->warnf('Condition completed: %s and %s', $condition->describe, $src->describe);
1571 0 0       0 return if $src->is_ready;
1572 0         0 $log->warnf('Mark as ready');
1573 0         0 shift->on_ready($src->_completed);
1574 0         0 });
1575             $condition->first->each(sub {
1576 0 0   0   0 $src->finish unless $src->is_ready
1577 0         0 });
1578 0         0 return $self->each_while_source($src->curry::emit, $src);
1579             } else {
1580 0         0 return $self->each_while_source(do {
1581 0 0 0     0 if(ref($condition) eq 'CODE') {
    0          
1582 0         0 my $reached = 0;
1583 0 0 0 0   0 sub { return $src->emit($_) unless $reached ||= $condition->($_); }
1584 0         0 } elsif(Scalar::Util::blessed($condition) && $condition->isa('Future')) {
1585             $condition->on_ready($src->$curry::weak(sub {
1586 0     0   0 my ($src, $cond) = @_;
1587 0 0       0 return if $src->is_ready;
1588 0 0       0 $src->fail($cond->failure) if $cond->is_failed;
1589 0 0       0 $src->cancel if $cond->is_cancelled
1590 0         0 }));
1591 0 0   0   0 sub { $src->emit($_) unless $condition->is_done; }
1592 0         0 } else {
1593 0         0 die 'unknown type for condition: ' . $condition;
1594             }
1595             }, $src);
1596             }
1597             }
1598              
1599             =head2 take
1600              
1601             Takes a limited number of items.
1602              
1603             Given a sequence of C< 1,2,3,4,5 > and C<< ->take(3) >>, you'd get 1,2,3 and then the stream
1604             would finish.
1605              
1606             =cut
1607              
1608             sub take {
1609 2     2 1 10 my ($self, $count) = @_;
1610 2   50     7 $count //= 0;
1611 2 50       8 return $self->empty unless $count > 0;
1612              
1613 2         22 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1614             $self->each_while_source(sub {
1615 5     5   18 $log->tracef("Still alive with %d remaining", $count);
1616 5         36 $src->emit($_);
1617 5 100       12 return if --$count;
1618 2         7 $log->tracef("Count is zero, finishing");
1619 2         12 $src->finish
1620 2         11 }, $src);
1621             }
1622              
1623             =head2 first
1624              
1625             Returns a source which provides the first item from the stream.
1626              
1627             =cut
1628              
1629             sub first {
1630 0     0 1 0 my ($self) = @_;
1631              
1632 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1633             $self->each_while_source(sub {
1634 0     0   0 $src->emit($_);
1635 0         0 $src->finish
1636 0         0 }, $src);
1637             }
1638              
1639             =head2 some
1640              
1641             Applies the given code to each item, and emits a single item:
1642              
1643             =over 4
1644              
1645             =item * 0 if the code never returned true or no items were received
1646              
1647             =item * 1 if the code ever returned a true value
1648              
1649             =back
1650              
1651             =cut
1652              
1653             sub some {
1654 1     1 1 11 my ($self, $code) = @_;
1655              
1656 1         16 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1657             $self->_completed->on_ready(sub {
1658 1     1   95 my $sf = $src->_completed;
1659 1 50       3 return if $sf->is_ready;
1660 0         0 my $f = shift;
1661 0 0       0 return $f->on_ready($sf) unless $f->is_done;
1662 0         0 $src->emit(0);
1663 0         0 $sf->done;
1664 1         3 });
1665             $self->each(sub {
1666 4 50   4   7 return if $src->_completed->is_ready;
1667 4 100       15 return unless $code->($_);
1668 1         8 $src->emit(1);
1669 1         2 $src->_completed->done
1670 1         23 });
1671 1         5 $src
1672             }
1673              
1674             =head2 every
1675              
1676             Similar to L, except this requires the coderef to return true for
1677             all values in order to emit a C<1> value.
1678              
1679             =cut
1680              
1681             sub every {
1682 1     1 1 12 my ($self, $code) = @_;
1683              
1684 1         17 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1685             $self->_completed->on_done(sub {
1686 1 50   1   87 return if $src->_completed->is_ready;
1687 1         16 $src->emit(1);
1688 1         3 $src->_completed->done
1689 1         5 });
1690             $self->each(sub {
1691 5 50   5   8 return if $src->_completed->is_ready;
1692 5 50       20 return if $code->($_);
1693 0         0 $src->emit(0);
1694 0         0 $src->_completed->done
1695 1         27 });
1696 1         6 $src
1697             }
1698              
1699             =head2 count
1700              
1701             Emits the count of items seen once the parent source completes.
1702              
1703             =cut
1704              
1705             sub count {
1706 2     2 1 16 my ($self) = @_;
1707              
1708 2         4 my $count = 0;
1709              
1710 2         41 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1711 7     7   18 $self->each_while_source(sub { ++$count }, $src, cleanup => sub {
1712 2 50   2   11 return unless shift->is_done;
1713 2         21 $src->emit($count)
1714 2         18 });
1715             }
1716              
1717             =head2 sum
1718              
1719             Emits the numeric sum of items seen once the parent completes.
1720              
1721             =cut
1722              
1723             sub sum {
1724 1     1 1 7 my ($self) = @_;
1725              
1726 1         2 my $sum = 0;
1727              
1728 1         12 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1729             $self->each_while_source(sub {
1730 5     5   8 $sum += $_
1731             }, $src, cleanup => sub {
1732 1 50   1   5 return unless shift->is_done;
1733 1         9 $src->emit($sum)
1734 1         7 });
1735             }
1736              
1737             =head2 mean
1738              
1739             Emits the mean (average) numerical value of all seen items.
1740              
1741             =cut
1742              
1743             sub mean {
1744 1     1 1 7 my ($self) = @_;
1745              
1746 1         3 my $sum = 0;
1747 1         1 my $count = 0;
1748              
1749 1         13 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1750 1     4   26 $self->each(sub { ++$count; $sum += $_ });
  4         4  
  4         7  
1751 1   50 1   138 $self->_completed->on_done(sub { $src->emit($sum / ($count || 1)) })
1752 1         3 ->on_ready($src->_completed);
1753 1         23 $src
1754             }
1755              
1756             =head2 max
1757              
1758             Emits the maximum numerical value of all seen items.
1759              
1760             =cut
1761              
1762             sub max {
1763 1     1 1 9 my ($self) = @_;
1764              
1765 1         18 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1766 1         3 my $max;
1767             $self->each(sub {
1768 8 100 100 8   26 return if defined $max and $max > $_;
1769 2         7 $max = $_;
1770 1         6 });
1771 1     1   88 $self->_completed->on_done(sub { $src->emit($max) })
1772 1         4 ->on_ready($src->_completed);
1773 1         24 $src
1774             }
1775              
1776             =head2 min
1777              
1778             Emits the minimum numerical value of all seen items.
1779              
1780             =cut
1781              
1782             sub min {
1783 1     1 1 7 my ($self) = @_;
1784              
1785 1         17 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1786 1         3 my $min;
1787             $self->each(sub {
1788 8 100 100 8   24 return if defined $min and $min < $_;
1789 4         9 $min = $_;
1790 1         6 });
1791 1     1   86 $self->_completed->on_done(sub { $src->emit($min) })
1792 1         2 ->on_ready($src->_completed);
1793 1         23 $src
1794             }
1795              
1796             =head2 statistics
1797              
1798             Emits a single hashref of statistics once the source completes.
1799              
1800             This will contain the following keys:
1801              
1802             =over 4
1803              
1804             =item * count
1805              
1806             =item * sum
1807              
1808             =item * min
1809              
1810             =item * max
1811              
1812             =item * mean
1813              
1814             =back
1815              
1816             =cut
1817              
1818             sub statistics {
1819 1     1 1 7 my ($self) = @_;
1820              
1821 1         2 my $sum = 0;
1822 1         2 my $count = 0;
1823 1         2 my $min;
1824             my $max;
1825              
1826 1         14 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1827             $self->each(sub {
1828 9   66 9   16 $min //= $_;
1829 9   66     13 $max //= $_;
1830 9 50       13 $min = $_ if $_ < $min;
1831 9 100       12 $max = $_ if $_ > $max;
1832 9         10 ++$count;
1833 9         15 $sum += $_
1834 1         7 });
1835             $self->_completed->on_done(sub {
1836 1   50 1   96 $src->emit({
1837             count => $count,
1838             sum => $sum,
1839             min => $min,
1840             max => $max,
1841             mean => ($sum / ($count || 1))
1842             })
1843             })
1844 1         3 ->on_ready($src->_completed);
1845 1         22 $src
1846             }
1847              
1848             =head2 filter
1849              
1850             Applies the given parameter to filter values.
1851              
1852             The parameter can be a regex or coderef. You can also
1853             pass (key, value) pairs to filter hashrefs or objects
1854             based on regex or coderef values.
1855              
1856             Examples:
1857              
1858             $src->filter(name => qr/^[A-Z]/, id => sub { $_ % 2 })
1859              
1860             =cut
1861              
1862             sub filter {
1863 12     12 1 2950 my $self = shift;
1864              
1865 12         168 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1866             $self->each_while_source((@_ > 1) ? do {
1867 10         33 my %args = @_;
1868             my $check = sub {
1869 51     51   88 my ($k, $v) = @_;
1870 51 100       107 if(my $ref = ref $args{$k}) {
1871 37 100       77 if($ref eq 'Regexp') {
    100          
    50          
1872 15 100 100     128 return 0 unless defined($v) && $v =~ $args{$k};
1873             } elsif($ref eq 'ARRAY') {
1874 12 100 100     40 return 0 unless defined($v) && List::Util::any { $v eq $_ } @{$args{$k}};
  20         43  
  8         14  
1875             } elsif($ref eq 'CODE') {
1876 10         28 return 0 for grep !$args{$k}->($_), $v;
1877             } else {
1878 0         0 die "Unsure what to do with $args{$k} which seems to be a $ref";
1879             }
1880             } else {
1881 14 100       27 return !defined($args{$k}) if !defined($v);
1882 13   66     72 return defined($args{$k}) && $v eq $args{$k};
1883             }
1884 18         3522 return 1;
1885 10         55 };
1886             sub {
1887 51     51   60 my $item = shift;
1888 51 100       178 if(Scalar::Util::blessed $item) {
    50          
1889 15         32 for my $k (keys %args) {
1890 15         41 my $v = $item->$k;
1891 15 100       76 return unless $check->($k, $v);
1892             }
1893             } elsif(my $ref = ref $item) {
1894 36 50       73 if($ref eq 'HASH') {
1895 36         66 for my $k (keys %args) {
1896 36         54 my $v = $item->{$k};
1897 36 100       48 return unless $check->($k, $v);
1898             }
1899             } else {
1900 0         0 die 'not a ref we know how to handle: ' . $ref;
1901             }
1902             } else {
1903 0         0 die 'not a ref, not sure what to do now';
1904             }
1905 21         61 $src->emit($item);
1906             }
1907 12 100       46 } : do {
  10         72  
1908 2         5 my $code = shift;
1909 2 50       7 if(my $ref = ref($code)) {
1910 2 50       9 if($ref eq 'Regexp') {
    50          
1911 0         0 my $re = $code;
1912 0     0   0 $code = sub { /$re/ };
  0         0  
1913             } elsif($ref eq 'CODE') {
1914             # use as-is
1915             } else {
1916 0         0 die "not sure how to handle $ref";
1917             }
1918             }
1919             sub {
1920 4     4   5 my $item = shift;
1921 4 100       9 $src->emit($item) if $code->($item);
1922             }
1923 2         14 }, $src);
1924             }
1925              
1926             =head2 filter_isa
1927              
1928             Emits only the items which C<< ->isa >> one of the given parameters.
1929             Will skip non-blessed items.
1930              
1931             =cut
1932              
1933             sub filter_isa {
1934 0     0 1 0 my ($self, @isa) = @_;
1935              
1936 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1937             $self->each_while_source(sub {
1938 0     0   0 my ($item) = @_;
1939 0 0       0 return unless Scalar::Util::blessed $item;
1940 0 0       0 $src->emit($_) if grep $item->isa($_), @isa;
1941 0         0 }, $src);
1942             }
1943              
1944             =head2 emit
1945              
1946             Emits the given item.
1947              
1948             =cut
1949              
1950             sub emit {
1951 418     418 1 39295 my $self = shift;
1952 418         739 my $completion = $self->_completed;
1953 418 100       498 my @handlers = @{$self->{on_item} || []} or return $self;
  418 100       1163  
1954 387         608 for (@_) {
1955 419 50       3267 die 'already completed' if $completion->is_ready;
1956 419         1658 for my $code (@handlers) {
1957             try {
1958             $code->($_);
1959 421         737 } catch {
1960             my $ex = $@;
1961             $log->warnf("Exception raised in %s - %s", (eval { $self->describe } // ""), "$ex");
1962             $completion->fail($ex, source => 'exception in on_item callback');
1963             die $ex;
1964             }
1965             }
1966             }
1967             $self
1968 387         3334 }
1969              
1970             =head2 each
1971              
1972             =cut
1973              
1974             sub each : method {
1975 116     116 1 272 my ($self, $code, %args) = @_;
1976 116         154 push @{$self->{on_item}}, $code;
  116         349  
1977 116         214 $self;
1978             }
1979              
1980             =head2 each_as_source
1981              
1982             =cut
1983              
1984             sub each_as_source : method {
1985 0     0 1 0 my ($self, @code) = @_;
1986              
1987 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
1988 0         0 my @active;
1989             $self->_completed->on_ready(sub {
1990             Future->needs_all(
1991             grep $_, @active
1992             )->on_ready(sub {
1993 0         0 $src->finish
1994 0     0   0 })->retain
1995 0         0 });
1996              
1997             $self->each_while_source(sub {
1998 0     0   0 my @pending;
1999 0         0 for my $code (@code) {
2000 0         0 push @pending, $code->($_);
2001             }
2002 0         0 push @active, map $_->completed, @pending;
2003 0         0 $src->emit($_);
2004 0         0 }, $src)
2005             }
2006              
2007             sub cleanup {
2008 82     82 0 4594 my ($self) = @_;
2009 82         291 $log->tracef("Cleanup for %s (f = %s)", $self->describe, 0 + $self->_completed);
2010 82 100       531 $_->cancel for values %{$self->{cancel_on_ready} || {}};
  82         506  
2011 82 100       294 $self->parent->notify_child_completion($self) if $self->parent;
2012 82         313 delete @{$self}{qw(on_item cancel_on_ready)};
  82         326  
2013 82         204 $log->tracef("Finished cleanup for %s", $self->describe);
2014             }
2015              
2016             sub notify_child_completion {
2017 37     37 0 109 my ($self, $child) = @_;
2018 37         148 my $addr = Scalar::Util::refaddr($child);
2019 37 50   37   129 if(List::UtilsBy::extract_by { $addr == Scalar::Util::refaddr($_) } @{$self->{children}}) {
  37         428  
  37         186  
2020             $log->tracef(
2021             "Removed completed child %s, have %d left",
2022             $child->describe,
2023 37         424 0 + @{$self->{children}}
  37         2906  
2024             );
2025 37 100       238 return $self if $self->is_ready;
2026 6 50       29 return $self if @{$self->{children}};
  6         28  
2027              
2028 6         22 $log->tracef(
2029             "This was the last child, cancelling %s",
2030             $self->describe
2031             );
2032 6         314 $self->cancel;
2033 6         1339 return $self;
2034             }
2035              
2036 0         0 $log->warnf("Child %s (addr 0x%x) not found in list for %s", $child->describe, $self->describe);
2037 0         0 $log->tracef("* %s (addr 0x%x)", $_->describe, Scalar::Util::refaddr($_)) for @{$self->{children}};
  0         0  
2038 0         0 $self
2039             }
2040              
2041             =head2 await
2042              
2043             Block until this source finishes.
2044              
2045             =cut
2046              
2047             sub await {
2048 0     0 1 0 my ($self) = @_;
2049 0         0 $self->prepare_await;
2050 0         0 my $f = $self->_completed;
2051 0         0 $f->await until $f->is_ready;
2052 0         0 $self
2053             }
2054              
2055             =head2 next
2056              
2057             Returns a L which will resolve to the next item emitted by this source.
2058              
2059             If the source completes before an item is emitted, the L will be cancelled.
2060              
2061             Note that these are independent - they don't stack, so if you call C<< ->next >>
2062             multiple times before an item is emitted, each of those would return the same value.
2063              
2064             See L if you're dealing with protocols and want to extract sequences of
2065             bytes or characters.
2066              
2067             To access the sequence as a discrete stream of L instances, try L
2068             which will provide a L.
2069              
2070             =cut
2071              
2072             sub next : method {
2073 3     3 1 1634 my ($self) = @_;
2074             my $f = $self->new_future(
2075             'next'
2076             )->on_ready($self->$curry::weak(sub {
2077 3     3   1440 my ($self, $f) = @_;
2078 3         9 my $addr = Scalar::Util::refaddr($f);
2079 3 50       10 List::UtilsBy::extract_by { Scalar::Util::refaddr($_) == $addr } @{$self->{on_item} || []};
  3         66  
  3         19  
2080 3         30 delete $self->{cancel_on_ready}{$f};
2081 3         12 }));
2082 3         172 $self->{cancel_on_ready}{$f} = $f;
2083 3   50     18 push @{$self->{on_item} ||= []}, sub {
2084 3 100   3   6 $f->done(shift) unless $f->is_ready;
2085 3         6 };
2086 3         13 return $f;
2087             }
2088              
2089             =head2 finish
2090              
2091             Mark this source as completed.
2092              
2093             =cut
2094              
2095 34 100   34 1 5564 sub finish { $_[0]->_completed->done unless $_[0]->_completed->is_ready; $_[0] }
  34         1477  
2096              
2097       0 0   sub refresh { }
2098              
2099             =head1 METHODS - Proxied
2100              
2101             The following methods are proxied to our completion L:
2102              
2103             =over 4
2104              
2105             =item * then
2106              
2107             =item * is_ready
2108              
2109             =item * is_done
2110              
2111             =item * failure
2112              
2113             =item * is_cancelled
2114              
2115             =item * else
2116              
2117             =back
2118              
2119             =cut
2120              
2121             sub get {
2122 0     0 0 0 my ($self) = @_;
2123 0         0 my $f = $self->_completed;
2124 0         0 my @rslt;
2125 0 0   0   0 $self->each(sub { push @rslt, $_ }) if defined wantarray;
  0         0  
2126 0 0       0 if(my $parent = $self->parent) {
2127 0         0 $parent->await
2128             }
2129             $f->transform(done => sub {
2130             @rslt
2131 0     0   0 })->get
  0         0  
2132             }
2133              
2134             for my $k (qw(then fail on_ready transform is_ready is_done is_failed failure else)) {
2135 37     37   464 do { no strict 'refs'; *$k = $_ } for sub { shift->_completed->$k(@_) }
  37     89   116  
  37         4456  
  89         2975  
2136             }
2137             # Cancel operations are only available through the internal state, since we don't want anything
2138             # accidentally cancelling due to Future->wait_any(timeout, $src->_completed) or similar constructs
2139             for my $k (qw(cancel is_cancelled)) {
2140 37     37   246 do { no strict 'refs'; *$k = $_ } for sub { shift->{completed}->$k(@_) }
  37     6   84  
  37         45099  
  6         31  
2141             }
2142              
2143             =head1 METHODS - Internal
2144              
2145             =head2 prepare_await
2146              
2147             Run any pre-completion callbacks (recursively) before
2148             we go into an await cycle.
2149              
2150             Used for compatibility with sync bridges when there's
2151             no real async event loop available.
2152              
2153             =cut
2154              
2155             sub prepare_await {
2156 189     189 1 326 my ($self) = @_;
2157 189 50       449 (delete $self->{on_get})->() if $self->{on_get};
2158 189 100       670 return unless my $parent = $self->parent;
2159 63 50       273 my $code = $parent->can('prepare_await') or return;
2160 63         173 local @_ = ($parent);
2161 63         260 goto &$code;
2162             }
2163              
2164             =head2 chained
2165              
2166             Returns a new L chained from this one.
2167              
2168             =cut
2169              
2170             sub chained {
2171 58     58 1 349 my ($self) = shift;
2172 58 100       213 if(my $class = ref($self)) {
2173             my $src = $class->new(
2174             new_future => $self->{new_future},
2175 57         334 parent => $self,
2176             @_
2177             );
2178 57         307 Scalar::Util::weaken($src->{parent});
2179 57         105 push @{$self->{children}}, $src;
  57         199  
2180 57         296 $log->tracef("Constructing chained source for %s from %s (%s)", $src->label, $self->label, $self->_completed->state);
2181 57         937 return $src;
2182             } else {
2183 1         5 my $src = $self->new(@_);
2184 1         6 $log->tracef("Constructing chained source for %s with no parent", $src->label);
2185 1         5 return $src;
2186             }
2187             }
2188              
2189             =head2 each_while_source
2190              
2191             Like L, but removes the source from the callback list once the
2192             parent completes.
2193              
2194             =cut
2195              
2196             sub each_while_source {
2197 43     43 1 122 my ($self, $code, $src, %args) = @_;
2198 43         154 $self->each($code);
2199             $self->_completed->on_ready(sub {
2200 25     25   2039 my ($f) = @_;
2201 25 100       101 $args{cleanup}->($f, $src) if exists $args{cleanup};
2202 25         92 my $addr = Scalar::Util::refaddr($code);
2203 25         85 my $count = List::UtilsBy::extract_by { $addr == Scalar::Util::refaddr($_) } @{$self->{on_item}};
  0         0  
  25         222  
2204 25 100       354 $f->on_ready($src->_completed) unless $src->is_ready;
2205 25         1647 $log->tracef("->each_while_source completed on %s for refaddr 0x%x, removed %d on_item handlers", $self->describe, Scalar::Util::refaddr($self), $count);
2206 43         160 });
2207 43         861 $src
2208             }
2209              
2210             =head2 map_source
2211              
2212             Provides a L source which has more control over what it
2213             emits than a standard L or L implementation.
2214              
2215             $original->map_source(sub {
2216             my ($item, $src) = @_;
2217             $src->emit('' . reverse $item);
2218             });
2219              
2220             =cut
2221              
2222             sub map_source {
2223 0     0 1 0 my ($self, $code) = @_;
2224              
2225 0         0 my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
2226             $self->_completed->on_ready(sub {
2227 0 0   0   0 return if $src->is_ready;
2228 0         0 shift->on_ready($src->_completed);
2229 0         0 });
2230             $self->each_while_source(sub {
2231 0     0   0 $code->($_, $src) for $_;
2232 0         0 }, $src);
2233             }
2234              
2235             sub DESTROY {
2236 82     82   206044 my ($self) = @_;
2237 82 50       340 return if ${^GLOBAL_PHASE} eq 'DESTRUCT';
2238 82         342 $log->tracef("Destruction for %s", $self->describe);
2239 82 100       7348 $self->_completed->cancel unless $self->_completed->is_ready;
2240             }
2241              
2242             sub catch {
2243 0     0 0   my ($self, $code) = @_;
2244 0           my $src = $self->chained(label => (caller 0)[3] =~ /::([^:]+)$/);
2245             $self->_completed->on_fail(sub {
2246 0     0     my @failure = @_;
2247 0           my $sub = $code->(@failure);
2248 0 0 0       if(Scalar::Util::blessed $sub && $sub->isa('Ryu::Source')) {
2249             $sub->each_while_source(sub {
2250 0           $src->emit($_)
2251 0           }, $src);
2252             } else {
2253 0           $sub->fail(@failure);
2254             }
2255 0           });
2256             $self->each_while_source(sub {
2257 0     0     $src->emit($_)
2258 0           }, $src);
2259             }
2260              
2261             1;
2262              
2263             __END__