File Coverage

blib/lib/Data/Enumerable/Lazy.pm
Criterion Covered Total %
statement 178 189 94.1
branch 44 62 70.9
condition 31 49 63.2
subroutine 61 64 95.3
pod 26 28 92.8
total 340 392 86.7


line stmt bran cond sub pod time code
1             package Data::Enumerable::Lazy;
2              
3 19     19   1327059 use 5.18.2;
  19         179  
4              
5 19     19   96 use strict;
  19         36  
  19         440  
6 19     19   128 use warnings;
  19         48  
  19         2228  
7              
8             our $VERSION = '0.032';
9              
10             =pod
11              
12             =head1 NAME
13              
14             Data::Enumerable::Lazy - Lazy generator + enumerable for Perl5.
15              
16             =head1 SYNOPSIS
17              
18             A basic lazy range implementation picking even numbers only:
19              
20             my ($from, $to) = (0, 10);
21             my $current = $from;
22             my $stream = Data::Enumerable::Lazy->new({
23             on_has_next => sub { $current <= $to },
24             on_next => sub { shift->yield($current++) },
25             })->grep(sub{ shift % 2 == 0 });
26             $stream->to_list(); # generates: [0, 2, 4, 6, 8, 10]
27              
28             =head2 DESCRIPTION
29              
30             This library is yet another implementation of a lazy generator + enumerable
31             for Perl5. It might be handy if the elements of the collection are resolved on
32             the fly and the iteration itself should be hidden from the end users.
33              
34             The enumerables are single-pass composable calculation units. What it means:
35             An enumerable is stateful, once it reached the end of the sequence, it will
36             not rewind to the beginning unless explicitly forced to.
37             Enumerables are composable: one enumerable might be an extension of another by
38             applying some additional logic. Enumerables resolve steps on demand, one by one.
39             A single step might return another enumerable (micro batches). The library
40             flattens these enumerables, so for the end user this looks like a single
41             continuous sequence of elements.
42              
43              
44             [enumerable.has_next] -> [_buffer.has_next] -> yes -> return true
45             -> no -> result = [enumerable.on_has_next] -> return result
46              
47             [enumerable.next] -> [_buffer.has_next] -> yes -> return [_buffer.next]
48             -> no -> result = [enumerable.next] -> [enumerable.set_buffer(result)] -> return result
49              
50             =head1 EXAMPLES
51              
52             =head2 A basic range
53              
54             This example implements a range generator from $from until $to. In order to
55             generate this range we define 2 callbacks: C<on_has_next> and C<on_next>.
56             The first one is used as point of truth whether the sequence has any more
57             non-iterated elements, and the 2nd one is here to return the next element in
58             the sequence and the one that changes the state of the internal sequence
59             iterator.
60              
61             sub basic_range {
62             my ($from, $to) = @_;
63             $from <= $to or die '$from should be less or equal $to';
64             my $current = $from;
65             Data::Enumerable::Lazy->new({
66             on_has_next => sub {
67             return $current <= $to;
68             },
69             on_next => sub {
70             my ($self) = @_;
71             return $self->yield($current++);
72             },
73             });
74             }
75              
76             on_has_next() makes sure the current value does not exceed $to value, and
77             on_next() yields the next value of the sequence. Note the yield method.
78             An enumerable developer is expected to use this method in order to return
79             the next step value. This method does some internal bookkeeping and smart
80             caching.
81              
82             Usage:
83              
84             # We initialize a new range generator from 0 to 10 including.
85             my $range = basic_range(0, 10);
86             # We check if the sequence has elements in its tail.
87             while ($range->has_next) {
88             # In this very line the state of $range is being changed
89             say $range->next;
90             }
91              
92             is $range->has_next, 0, '$range has been iterated completely'
93             is $range->next, undef, 'A fully iterated sequence returns undef on next()'
94              
95             =head2 Prime numbers
96              
97             Prime numbers is an infinite sequence of natural numbers. This example
98             implements a very naive suboptimal prime number generator.
99              
100             my $prime_num_stream = Data::Enumerable::Lazy->new({
101             # This is an infinite sequence
102             on_has_next => sub { 1 },
103             on_next => sub {
104             my $self = shift;
105             # We save the result of the previous step
106             my $next = $self->{_prev_} // 1;
107             LOOKUP: while (1) {
108             $next++;
109             # Check all numbers from 2 to sqrt(N)
110             foreach (2..floor(sqrt($next))) {
111             ($next % $_ == 0) and next LOOKUP;
112             }
113             last LOOKUP;
114             }
115             # Save the result in order to use it in the next step
116             $self->{_prev_} = $next;
117             # Return the result
118             $self->yield($next);
119             },
120             });
121              
122             What's remarkable regarding this specific example is that one can not simply
123             call C<to_list()> in order to get all elements of the sequence. The enumerable
124             will throw an exception claiming it's an infinite sequence. Therefore, we
125             should use C<next()> in order to get elements one by one or use another handy
126             method C<take()> which returns first N results.
127              
128             =head2 Flat enumeration (Nested generators)
129              
130             In this example we will output the numbers of a 10x10 multiplication table.
131             What's interesting in this example is that there are 2 levels of sequences:
132             primary and secondary. Primary C<on_next()> returns secondary sequence, which
133             multiplies 2 numbers.
134              
135             # A new stream based on a range from 1 to 10
136             my $mult_table = Data::Enumerable::Lazy->from_list(1..10)->continue({
137             on_next => sub {
138             my ($self, $i) = @_;
139             # The primary stream returns another sequence, based on range
140             $self->yield(Data::Enumerable::Lazy->from_list(1..10)->continue({
141             on_next => sub {
142             # $_[0] is a substream self
143             # $_[1] is a next substream sequence element
144             $_[0]->yield( $_[1] * $i )
145             },
146             }));
147             },
148             });
149              
150             Another feature which is represented here is the nested result generation.
151             Let's walk through the sequence generation step by step and see what happens.
152              
153             $mult_table->has_next; # returns true based on the primary range, _buffer is
154             # empty
155             $mult_table->next; # returns 1, the secondary sequence is now stored as
156             # the primary enumerable buffer and 1 is being served
157             # from this buffer
158             $mult_table->has_next; # returns true, resolved by the state of the buffer
159             $mult_table->next; # returns 2, moves buffer iterator forward, the
160             # primary sequence on_next() is _not_ being called
161             # this time
162             $mult_table->next for (3..10); # The last iteration completes the buffer
163             # iteration cycle
164             $mult_table->has_next; # returns true, but now it calls the primary
165             # on_has_next()
166             $mult_table->next; # returns 2 as the first element in the next
167             # secondary sequence (which is 1 again) multiplied by
168             # the 2nd element of the primary sequence (which is 2)
169             $mult_table->to_list; # Generates the tail of the sequence:
170             # [4, 6, ..., 80, 90, 100]
171             $mult_table->has_next; # returns false as the buffer is empty now and the
172             # primary sequence on_has_next() says there is nothing
173             # more to iterate over.
174              
175             =head2 DBI paginator example
176              
177             As mentioned earlier, lazy enumerables are useful when the number of the
178             elements in the sequence is not known in advance. So far, we were looking at
179             some synthetic examples, but the majority of us are not being paid for prime
180             number generators. Hands on some real life example. Say, we have a table and
181             we want to iterate over all entries in the table, and we want the data to be
182             retrieved in batches by 10 elements in order to reduce the number of queries.
183             We don't want to compute the number of steps in advance, as the number might
184             be inaccurate: let's assume we're paginating over some new tweets and the new
185             entries might be created on the fly.
186              
187             use DBI;
188             my $dbh = setup_dbh(); # Some config
189              
190             my $last_id = -1;
191             my $limit = 10;
192             my $offset = 0;
193             my $tweet_enum = Data::Enumerable::Lazy->new({
194             on_has_next => sub {
195             my $sth = $dbh->prepare('SELECT count(1) from Tweets where id > ?');
196             $sth->execute($last_id);
197             my ($cnt) = $sth->fetchrow_array;
198             return int($cnt) > 0;
199             },
200             on_next => sub {
201             my ($self) = @_;
202             my $sth = $dbh->prepare('SELECT * from Tweets ORDER BY id LIMIT ? OFFSET ?');
203             $sth->execute($limit, $offset);
204             $offset += $limit;
205             my @tweets = @{ $sth->fetchall_arrayref({}) };
206             $last_id = $tweets[-1]->{id};
207             $self->yield(Data::Enumerable::Lazy->from_list(@tweets));
208             },
209             is_finite => 1,
210             });
211              
212             while ($tweet_enum->has_next) {
213             my $tweet = $tweet_enum->next;
214             # do something with this tweet
215             }
216              
217             In this example a tweet consumer is abstracted from any DBI bookkeeping and
218             consumes tweet entries one by one without any prior knowledge about the table
219             size and might work on a rapidly growing dataset.
220              
221             In order to reduce the number of queries, we query the data in batches by 10
222             elements max.
223              
224             =head2 Redis queue consumer
225              
226             use Redis;
227              
228             my $redis = Redis->new;
229             my $queue_enum = Data::Enumerable::Lazy->new({
230             on_has_next => sub { 1 },
231             on_next => sub {
232             # Blocking right POP
233             $redis->brpop();
234             },
235             });
236              
237             while (my $queue_item = $queue_enum->next) {
238             # do something with the queue item
239             }
240              
241             In this example the client is blocked until there is an element available in
242             the queue, but it's hidden away from the clients who consume the data item by
243             item.
244              
245             =head2 Kafka example
246              
247             Kafka consumer wrapper is another example of a lazy calculation application.
248             Lazy enumerables cooperate very naturally with streaming data, like
249             Kafka. In this example we're fetching batches of messages from Kafka topic,
250             grep out corrupted ones and proceed with the messages.
251              
252             use Kafka qw($DEFAULT_MAX_BYTES);
253             use Kafka::Connection;
254             use Kafka::Consumer;
255              
256             my $kafka_consumer = Kafka::Consumer->new(
257             Connection => Kafka::Connection->new( host => 'localhost', ),
258             );
259              
260             my $partition = 0;
261             my $offset = 0;
262             my $kafka_enum = Data::Enumerable::Lazy->new({
263             on_has_next => sub { 1 },
264             on_next => sub {
265             my ($self) = @_;
266             # Fetch messages in batch
267             my $messages = $kafka_consumer->fetch({
268             'topic',
269             $partition,
270             $offset,
271             $DEFAULT_MAX_BYTES
272             });
273             if ($messages) {
274             # Note the grep function applied: we're filtering away corrupted messages
275             $self->yield(Data::Enumerable::Lazy->from_list(@$messages))->grep(sub { $_[0]->valid });
276             } else {
277             # If there are no more messages, we return an empty enum, this is
278             # another handy use-case for nested enums.
279             $self->yield(Data::Enumerable::Lazy->empty);
280             }
281             },
282             });
283              
284             while (my $message = $kafka_enum->next) {
285             # handle the message
286             }
287              
288             =cut
289              
290             =head1 INSTALLATION
291              
292             To install this module type the following:
293             perl Makefile.PL
294             make
295             make test
296             make install
297              
298             =cut
299              
300 19     19   156 use Carp;
  19         45  
  19         1226  
301 19     19   132 use List::Util;
  19         57  
  19         51902  
302              
303             sub new {
304 1563     1563 0 6402 my ($class, $opts) = @_;
305 1563         7355 return bless({ _opts => $opts, _buff => undef }, $class);
306             }
307              
308             =head1 OPTIONS
309              
310             =head2 on_next($self, $element) :: CodeRef -> Data::Enumerable::Lazy | Any
311              
312             C<on_next> is a code ref, a callback which is being called every time the
313             generator is in demand for a new bit of data. Enumerable buffers up the result
314             of the previous calculation and if there are no more elements left in the
315             buffer, C<on_next()> would be called.
316              
317             C<$element> is defined when the current collection is a continuation of another
318             enumerable. I.e.:
319              
320             my $enum = Data::Enumerable::Lazy->from_list(1, 2, 3);
321             my $enum2 = $enum->continue({
322             on_next => sub { my ($self, $i) = @_; $self->yield($i * $i) }
323             });
324             $enum2->to_list; # generates 1, 4, 9
325              
326             In this case $i would be defined and it comes from the original enumerable.
327              
328             The function is supposed to return an enumerable, in this case it would be
329             kept as the buffer object. If this function returns any other value,
330             it would be wrapped in a C<< Data::Enumerable::Lazy->singular() >>. There is a
331             way to prevent an enumerable from wrapping your return value in an enum and
332             to keep it in a raw state by providing C<no_wrap=1>.
333              
334             =cut
335              
336 3229   100 3 1 7439 sub on_next { $_[0]->{_opts}->{on_next} // sub {} }
        3229      
337              
338             =head2 on_has_next($self) :: CodeRef -> Bool
339              
340             C<on_has_next> is a code ref, a callback to be called whenever the enumerable
341             is about to resolve C<has_next()> method call. Similar to C<on_next()> call,
342             this one is also triggered whenever an enumerable runs out of buffered
343             elements. The function should return boolean.
344              
345             A method that returns 1 all the time is the way to initialize an infinite
346             enumerable (see C<infinity()>). If it returns 0 no matter what, it would be
347             an empty enumerable (see C<empty()>). Normally you want to stay somewhere in
348             the middle and implement some state check logic in there.
349              
350             =cut
351              
352 46   100 46 1 158 sub on_has_next { $_[0]->{_opts}->{on_has_next} // sub {0} }
  5214     5214   11808  
353              
354             =head2 on_reset($self) :: CodeRef -> void
355              
356             This is a callback to be called in order to reset the state of the enumerable.
357             This callback should be defined in the same scope as the enumerable itself.
358             The library provides nothing magical but a callback and a handle to call it,
359             so the state cleanup is completely on the developer's side.
360              
361             =cut
362              
363 0   0 0 1 0 sub on_reset { $_[0]->{_opts}->{on_reset} // sub {} }
        0      
364              
365             =head2 is_finite :: Bool
366              
367             A boolean flag indicating whether an enumerable is finite or not. By default
368             enumerables are treated as infinite, which means some functions will throw
369             an exception, like: C<to_list()> or C<reduce()>.
370              
371             Make sure not to mark an infinite enumerable as finite and then call methods
372             defined for finite enumerables only: this creates an infinite loop on resolution.
373              
374             =cut
375              
376 78   50 78 1 372 sub is_finite { $_[0]->{_opts}->{is_finite} // 0 }
377              
378 10021   100 10021 0 30999 sub no_wrap { $_[0]->{_opts}->{no_wrap} // 0 }
379              
380             =head1 INSTANCE METHODS
381              
382             =head2 next()
383              
384             Function C<next()> is the primary interface for accessing elements of an
385             enumerable. It will do some internal checks and if there are no elements to be
386             served from an intermediate buffer, it will resolve the next step by calling
387             C<on_next()> callback.
388             Enumerables are composable: one enumerable might be based on another
389             enumeration. E.g.: a sequence of natural number squares is based on the
390             sequence of natural numbers themselves. In other words, a sequence is defined
391             as a tuple of another sequence and a function which would be lazily applied to
392             every element of this sequence.
393              
394             C<next()> accepts 0 or more arguments, which would be passed to C<on_next()>
395             callback.
396              
397             C<next()> is expected to do the heavy-lifting job as opposed to C<has_next()>,
398             which is supposed to be cheap and fast. This statement flips upside down
399             whenever C<grep()> is applied to a stream. See C<grep()> for more details.
400              
401             =cut
402              
403             sub next {
404 3431     3431 1 5639 my $self = shift;
405 3431         4271 my $res;
406 3431 100 100     7431 unless ($self->{_buff} && $self->{_buff}->has_next()) {
407 3229         5652 $res = $self->on_next()->($self, @_);
408 3229 100       5622 $self->{_buff} = $res
409             unless $self->no_wrap();
410             }
411 3431 100       5854 my $return = $self->no_wrap() ? $res : $self->{_buff}->next();
412 3431         6559 return $return;
413             }
414              
415             =head2 has_next()
416              
417             C<has_next()> is the primary entry point to get information about the state
418             of an enumerable. If the method returned false, there are no more elements to be
419             consumed. I.e. the sequence has been iterated completely. Normally it means
420             the end of an iteration cycle.
421              
422             Enumerables use internal buffers in order to support batched C<on_next()>
423             resolutions. If there are some elements left in the buffer, C<has_next()>
424             won't call C<on_has_next()> callback immediately. If the buffer has been
425             iterated completely, C<on_has_next()> would be called.
426              
427             C<has_next()> should be fast on resolving the state of an enumerable as it's going
428             to be used for a condition state check.
429              
430             =cut
431              
432             sub has_next {
433 5399     5399 1 12289 my $self = shift;
434 5399         6850 my $res;
435             eval {
436 5399   100     8120 $res = $self->_has_next_in_buffer() ||
437             $self->_has_next_in_generator();
438 5399         13191 1;
439 5399 50       6999 } or do {
440 0   0     0 croak sprintf('Problem calling on_has_next(): %s', $@ // 'zombie error');
441             };
442 5399         13765 return int $res;
443             }
444              
445             =head2 reset()
446              
447             This method is a generic entry point for an enum reset. In fact, it is basically
448             a wrapper around user-defined C<on_reset()>. Use with caution: if C<on_reset()>
449             was not defined, it will reset the buffer and might cause a partial calculation
450             skip (reset implicitly clears the internal buffer) if the buffer was not fully
451             iterated yet.
452              
453             =cut
454              
455             sub reset {
456 0     0 1 0 my $self = shift;
457 0         0 $self->{_buff} = undef;
458 0 0       0 eval { $self->on_reset(); 1 } or do {
  0         0  
  0         0  
459 0   0     0 croak sprintf('Problem calling on_reset(): %s', $@ // 'zombie error');
460             };
461             }
462              
463             =head2 to_list()
464              
465             This function transforms a lazy enumerable to a list. Only finite enumerables
466             can be transformed to a list, so the method checks if an enumerable is created
467             with C<is_finite=1> flag. An exception would be thrown otherwise.
468              
469             =cut
470              
471             sub to_list {
472 24     24 1 115 my ($self) = @_;
473 24 50       53 croak 'Only finite enumerables might be converted to list. Use is_finite=1'
474             unless $self->is_finite();
475 24         46 my @acc;
476 24         60 push @acc, $self->next() while $self->has_next();
477 24         132 return \@acc;
478             }
479              
480             =head2 map($callback)
481              
482             Creates a new enumerable by applying a user-defined function to the original
483             enumerable. Works the same way as perl map {} function but it's lazy.
484              
485             Example
486              
487             Data::Enumerable::Lazy
488             ->from_array(1, 2, 3)
489             ->map(sub {
490             my ($number) = @_;
491             return $number * $number
492             });
493              
494             =cut
495              
496             sub map {
497 1     1 1 3 my ($self, $callback) = @_;
498             Data::Enumerable::Lazy->new({
499             on_has_next => $self->on_has_next(),
500 10     10   19 on_next => sub { shift->yield($callback->($self->next())) },
501 1         2 is_finite => $self->is_finite(),
502             no_wrap => $self->no_wrap(),
503             });
504             }
505              
506             =head2 reduce($acc, $callback)
507              
508             Resolves the enumerable and returns the resulting state of the accumulator $acc
509             provided as the 1st argument. C<$callback> should always return the new state of
510             C<$acc>.
511              
512             C<reduce()> is defined for finite enumerables only.
513              
514             Example
515              
516             Data::Enumerable::Lazy
517             ->from_array(1, 2, 3)
518             ->reduce(1, sub {
519             my ($acc, $number) = @_;
520             return $acc *= $number
521             });
522              
523             =cut
524              
525             sub reduce {
526 2     2 1 4 my ($self, $acc, $callback) = @_;
527 2 50       5 croak 'Only finite enumerables might be reduced. Use is_finite=1'
528             unless $self->is_finite();
529 2         7 ($acc = $callback->($acc, $self->next())) while $self->has_next();
530 2         5 return $acc;
531             }
532              
533             =head2 grep($callback, $max_lookahead)
534              
535             C<grep()> is a function which returns a new enumerable by applying a
536             user-defined filter function.
537              
538             C<grep()> might be applied to both finite and infinite enumerables. In case of
539             an infinite enumerable there is an additional argument specifying max number
540             of lookahead steps. If an element satisfying the condition could not be found in
541             C<$max_lookahead> steps, an enumerable is considered to be completely iterated
542             and C<has_next()> will return false.
543              
544             C<grep()> returns a new enumerable with quite special properties: C<has_next()>
545             will perform a look ahead and call the original enumerable C<next()> method
546             in order to find an element for which the user-defined function will return
547             true. C<next()>, on the other side, returns the value that was pre-fetched
548             by C<has_next()>.
549              
550             Example
551              
552             Data::Enumerable::Lazy
553             ->from_list(1, 2, 3)
554             ->grep(sub {
555             my ($number) = @_;
556             return $number % 2
557             });
558              
559             =cut
560              
561             sub grep {
562 9     9 1 22 my ($self, $callback, $max_lookahead) = @_;
563 9         11 my $next;
564 9         13 my $initialized = 0;
565 9   50     83 $max_lookahead //= 0;
566 9 100       18 $max_lookahead = 0
567             if $self->is_finite;
568 9         50 my $prev_has_next;
569             Data::Enumerable::Lazy->new({
570             on_has_next => sub {
571 65 50   65   124 defined $prev_has_next
572             and return $prev_has_next;
573 65         87 my $ix = 0;
574 65         99 $initialized = 1;
575 65         90 undef $next;
576 65         113 while ($self->has_next()) {
577 84 50       152 if ($max_lookahead > 0) {
578             $ix > $max_lookahead
579 0 0       0 and do {
580 0         0 carp sprintf 'Max lookahead steps cnt reached. Bailing out';
581 0         0 return $prev_has_next = 0;
582             };
583             }
584 84         151 $next = $self->next();
585 84 100       150 $callback->($next) and last;
586 27         69 undef $next;
587 27         50 $ix++;
588             }
589 65         165 return $prev_has_next = (defined $next);
590             },
591             on_next => sub {
592 56     56   79 my $self = shift;
593 56 50       96 $initialized or $self->has_next();
594 56         80 undef $prev_has_next;
595 56         100 $self->yield($next);
596             },
597 9         88 is_finite => $self->is_finite(),
598             no_wrap => $self->no_wrap(),
599             });
600             }
601              
602             =head2 resolve()
603              
604             Resolves an enumerable completely. Applicable for finite enumerables only.
605             The method returns nothing.
606              
607             =cut
608              
609             sub resolve {
610 1     1 1 5 my ($self) = @_;
611 1 50       3 croak 'Only finite enumerables might be resolved. Use is_finite=1'
612             unless $self->is_finite();
613 1         3 $self->next() while $self->has_next();
614             }
615              
616             =head2 take($N_elements)
617              
618             Resolves first $N_elements and returns the resulting list. If there are
619             fewer than N elements in the enumerable, the entire enumerable would be
620             returned as a list.
621              
622             =cut
623              
624             sub take {
625 10     10 1 48 my ($self, $slice_size) = @_;
626 10         15 my $ix = 0;
627 10         17 my @acc;
628 10   100     23 push @acc, $self->next() while ($self->has_next() && $ix++ < $slice_size);
629 10         50 return \@acc;
630             }
631              
632             =head2 take_while($callback)
633              
634             This function takes elements until it meets the first one that does not
635             satisfy the conditional callback.
636             The callback takes only 1 argument: an element. It should return true if
637             the element should be taken. Once it returned false, the stream is over.
638              
639             =cut
640              
641             sub take_while {
642 7     7 1 15 my ($self, $callback) = @_;
643 7         11 my $next_el;
644             my $prev_has_next;
645 7         11 my $initialized = 0;
646             Data::Enumerable::Lazy->new({
647             on_has_next => sub {
648 53     53   86 $initialized = 1;
649 53 50       97 defined $prev_has_next
650             and return $prev_has_next;
651 53         69 $prev_has_next = 0;
652 53 100       79 if ($self->has_next()) {
653 52         90 $next_el = $self->next();
654 52 100       89 if ($callback->($next_el)) {
655 47         104 $prev_has_next = 1;
656             }
657             }
658 53         126 return $prev_has_next;
659             },
660             on_next => sub {
661 47     47   90 my ($new_self) = @_;
662 47 50       86 $initialized or $new_self->has_next();
663 47 50       84 $prev_has_next
664             or return $new_self->yield(Data::Enumerable::Lazy->empty());
665 47         101 undef $prev_has_next;
666 47         89 $new_self->yield($next_el);
667             },
668 7         36 is_finite => $self->is_finite(),
669             });
670             }
671              
672             =head2 continue($ext = %{ on_next => sub {}, ... })
673              
674             Creates a new enumerable by extending the existing one. on_next is
675             the only mandatory argument. on_has_next might be overridden if some
676             custom logic comes into play.
677              
678             is_finite is inherited from the parent enumerable by default. All additional
679             attributes would be transparently passed to the constructor.
680              
681             =cut
682              
683             sub continue {
684 16     16 1 34 my ($this, $ext) = @_;
685 16         49 my %ext = %$ext;
686             my $on_next = delete $ext{on_next}
687 16 50       54 or croak '`on_next` should be defined on stream continuation';
688 16 50       43 ref($on_next) eq 'CODE'
689             or croak '`on_next` should be a function';
690             Data::Enumerable::Lazy->new({
691             on_next => sub {
692 138     138   191 my $self = shift;
693 138 100       224 $self->yield(
694             $this->has_next() ?
695             $on_next->($self, $this->next()) :
696             Data::Enumerable::Lazy->empty
697             );
698             },
699             on_has_next => delete $ext->{on_has_next} // $this->on_has_next(),
700             is_finite => delete $ext->{is_finite} // $this->is_finite(),
701 16   33     130 no_wrap => delete $ext->{no_wrap} // 0,
      66        
      50        
702             %ext,
703             });
704             }
705              
706             =head2 count()
707              
708             Counts the number of elements in the stream. This method iterates through
709             the stream, so the stream is exhausted by the end of the computation.
710              
711             =cut
712              
713             sub count {
714 3     3 1 49 my ($self) = @_;
715 3 100       6 croak 'Only finite enumerables might be counted. Use is_finite=1'
716             unless $self->is_finite();
717 2         14 my $cnt = 0;
718 2         4 for (; $self->has_next(); $self->next()) {
719 10         19 $cnt++;
720             }
721 2         26 return $cnt;
722             }
723              
724             =head2 yield($result)
725              
726             This method is supposed to be called from the C<on_next> callback only. This is
727             the only valid way for an Enumerable to return the next step result.
728             Effectively, it ensures the returned result conforms to the required interface
729             and is wrapped in a lazy wrapper if needed.
730              
731             =cut
732              
733             sub yield {
734 3351     3351 1 109108 my $self = shift;
735 3351         4432 my $val = shift;
736 3351   66     10320 my $val_is_stream = $val && ref($val) eq 'Data::Enumerable::Lazy' &&
737             $val->isa('Data::Enumerable::Lazy');
738 3351 100 100     5456 if ($self->no_wrap() || $val_is_stream) {
739 1917         3710 return $val;
740             } else {
741 1434         2606 return Data::Enumerable::Lazy->singular($val);
742             }
743             }
744              
745             # Private methods
746              
747             sub _has_next_in_buffer {
748 5399     5399   7000 my $self = shift;
749 5399 100       15616 defined($self->{_buff}) && $self->{_buff}->has_next();
750             }
751              
752             sub _has_next_in_generator {
753 5197     5197   7146 my $self = shift;
754 5197         8613 $self->on_has_next()->($self, @_);
755             }
756              
757             =head1 CLASS METHODS
758              
759             =head2 empty()
760              
761             Returns an empty enumerable. Effectively it means an equivalent of an empty
762             array. C<has_next()> will return false and C<next()> will return undef. Useful
763             whenever a C<on_next()> step wants to return an empty resultset.
764              
765             =cut
766              
767             sub empty {
768 16     16 1 3569 Data::Enumerable::Lazy->new({
769             is_finite => 1,
770             no_wrap => 1,
771             });
772             }
773              
774             =head2 singular($val)
775              
776             Returns an enumerable with a single element $val. Actively used as an internal
777             data container.
778              
779             =cut
780              
781             sub singular {
782 1445     1445 1 6111 my ($class, $val) = @_;
783 1445         1969 my $resolved = 0;
784             Data::Enumerable::Lazy->new({
785 3070     3070   8107 on_has_next => sub { not $resolved },
786 1445     1445   1998 on_next => sub { $resolved = 1; shift->yield($val) },
  1445         2370  
787 1445         7179 is_finite => 1,
788             no_wrap => 1,
789             });
790             }
791              
792             =head2 from_list(@list)
793              
794             Returns a new enumerable instantiated from a list. The easiest way to
795             initialize an enumerable. In fact, all elements are already resolved
796             so this method sets C<is_finite=1> by default.
797              
798             =cut
799              
800             sub from_list {
801 32     32 1 9986 my $class = shift;
802 32         114 my @list = @_;
803 32         52 my $ix = 0;
804             Data::Enumerable::Lazy->new({
805 550     550   1547 on_has_next => sub { $ix < scalar(@list) },
806 249     249   498 on_next => sub { shift->yield($list[$ix++]) },
807 32         255 is_finite => 1,
808             no_wrap => 1,
809             });
810             }
811              
812             =head2 cycle()
813              
814             Creates an infinite enumerable by cycling the original list. E.g. if the
815             original list is [1, 2, 3], C<cycle()> will generate an infinite sequence
816             like: 1, 2, 3, 1, 2, 3, 1, ...
817              
818             =cut
819              
820             sub cycle {
821 6     6 1 90 my $class = shift;
822 6         12 my @list = @_;
823 6         9 my $ix = 0;
824 6         9 my $max_ix = scalar(@list) - 1;
825             Data::Enumerable::Lazy->new({
826 62     62   165 on_has_next => sub { 1 },
827             on_next => sub {
828 49 100   49   96 $ix = $ix > $max_ix ? 0 : $ix;
829 49         96 shift->yield($list[$ix++])
830             },
831 6         40 is_finite => 0,
832             no_wrap => 1,
833             });
834             }
835              
836             =head2 infinity()
837              
838             Returns a new infinite enumerable. C<has_next()> always returns true whereas
839             C<next()> returns undef all the time. Useful as an extension basis for infinite
840             sequences.
841              
842             =cut
843              
844             sub infinity {
845 2     2 1 1244 my $class = shift;
846             Data::Enumerable::Lazy->new({
847 11     11   23 on_has_next => sub { 1 },
848       10     on_next => sub {},
849 2         14 is_finite => 0,
850             no_wrap => 1,
851             });
852             }
853              
854             =head2 merge($tream1 [, $tream2 [, $tream3 [, ...]]])
855              
856             This function merges one or more streams together by fanning out the C<next()>
857             method call among the non-empty streams.
858             Returns a new enumerable instance, which:
859             * Has next elements as long as at least one of the streams does.
860             * Returns the next element by picking it one-by-one from the streams.
861             * Is finite if and only if all the streams are finite.
862             If one of the streams is over, it would be taken into account and
863             C<merge()> will continue choosing from non-empty ones.
864              
865             =cut
866              
867             sub merge {
868 4     4 1 11 my $class = shift;
869 4         9 my @streams = @_;
870 4 50       12 scalar @streams == 0
871             and croak '`merge` function takes at least 1 stream';
872 4 50       9 scalar @streams == 1
873             and return shift;
874             my $ixs = Data::Enumerable::Lazy->cycle(0..scalar(@streams) - 1)
875 38     38   158 -> take_while(sub { List::Util::any { $_->has_next() } @streams })
  91         149  
876 4     35   11 -> grep(sub { $streams[ shift ]->has_next() });
  35         66  
877             Data::Enumerable::Lazy->new({
878 24     24   39 on_has_next => sub { $ixs->has_next() },
879             on_next => sub {
880 20     20   38 shift->yield($streams[ $ixs->next() ]->next());
881             },
882 4 100   17   43 is_finite => (List::Util::reduce { $a || $b->is_finite() } 0, @streams),
  17         49  
883             });
884             }
885              
886             =head2 chain($tream1(, $tream2(, $tream3(, ...))))
887              
888             Executes streams sequentially, one after another: the next stream starts once
889             the previous is over.
890              
891             =cut
892              
893             sub chain {
894 3     3 1 4 my $class = shift;
895 3         7 my @streams = @_;
896 3 100       11 scalar(@streams) < 2
897             and return $streams[0];
898             Data::Enumerable::Lazy->from_list(@streams)
899             -> continue({
900 7     7   25 on_next => sub { $_[0]->yield($_[1]) }
901             })
902 29     29   68 -> grep(sub { defined $_[0] })
903 2         5 }
904              
905             =head2 from_text_file($file_handle(, $options))
906              
907             Method takes an open file handle and an optional hash of options and creates a
908             stream of it. The file would be read as a text file, line by line. For
909             additional options see C<open> perl core function reference.
910             Options is a basic hash, supported attributes are:
911             * chomp :: Bool | Whether the lines should be chomped, 0 by default.
912             * is_finite :: Bool | Forces the stream to be processed as finite, 0 by default.
913              
914             =cut
915              
916             sub from_text_file {
917 2     2 1 6040 my ($class, $file_handle, $options) = @_;
918 2   50     7 $options //= +{};
919             my $str = Data::Enumerable::Lazy->new({
920 22     22   123 on_has_next => sub { !eof($file_handle) },
921             on_next => sub {
922 20     20   44 my $line = readline($file_handle);
923 20         36 $_[0]->yield($line);
924             },
925 2   50     18 is_finite => $options->{is_finite} // 0,
926             });
927 2 100       7 if ($options->{chomp}) {
928 1     10   4 $str = $str->map(sub { my $s = $_[0]; chomp $s; $s });
  10         14  
  10         20  
  10         16  
929             }
930 2         5 return $str;
931             }
932              
933             =head2 from_bin_file($file_handle(, $options))
934              
935             Method similar to C<from_text_file> but forces binary reading from file.
936             Takes a file handle created by the C<open> function and an optional hash of
937             options. Supported attributes are:
938             * block_size :: Integer | The size of read block, 1024 bytes by default.
939             * is_finite :: Bool | Forces the stream to be processed as finite, 0 by default.
940              
941             =cut
942              
943             sub from_bin_file {
944 1     1 1 158 my ($class, $file_handle, $options) = @_;
945 1   50     7 $options //= +{};
946 1   50     6 my $block_size = $options->{block_size} // 1024;
947             Data::Enumerable::Lazy->new({
948 23     23   146 on_has_next => sub { !eof($file_handle) },
949             on_next => sub {
950 22     22   26 my $buf;
951 22         167 read($file_handle, $buf, $block_size);
952 22         48 $_[0]->yield($buf);
953             },
954 1   50     16 is_finite => $options->{is_finite} // 0,
955             })
956             }
957              
958             =head1 AUTHOR
959              
960             Oleg S
961              
962             =cut
963              
964             =head1 SEE ALSO
965              
966             =head2 Lazy evaluation in a nutshell
967              
968             L
969              
970             =head2 Library GitHub page:
971              
972             L
973              
974             =head2 Alternative implementations:
975              
976             L
977             L
978             L
979              
980             =cut
981              
982             =head1 COPYRIGHT AND LICENSE
983              
984             Copyright 2017, 2018 Oleg S
985              
986             Copying and distribution of this file, with or without modification, are
987             permitted in any medium without royalty provided the copyright notice and this
988             notice are preserved. This file is offered as-is, without any warranty.
989              
990             =cut
991              
992             1;
993              
994             __END__