File Coverage

blib/lib/Data/Enumerable/Lazy.pm
Criterion Covered Total %
statement 178 189 94.1
branch 44 62 70.9
condition 31 49 63.2
subroutine 61 64 95.3
pod 26 28 92.8
total 340 392 86.7


line stmt bran cond sub pod time code
1             package Data::Enumerable::Lazy;
2              
3 19     19   1228206 use 5.18.2;
  19         177  
4              
5 19     19   97 use strict;
  19         28  
  19         451  
6 19     19   87 use warnings;
  19         48  
  19         2031  
7              
8             our $VERSION = '0.031';
9              
10             =pod
11              
12             =head1 NAME
13              
14             Data::Enumerable::Lazy - Lazy generator + enumerable for Perl5.
15              
16             =head1 SYNOPSIS
17              
18             A basic lazy range implementation picking even numbers only:
19              
20             my ($from, $to) = (0, 10);
21             my $current = $from;
22             my $tream = Data::Enumerable::Lazy->new({
23             on_has_next => sub { $current <= $to },
24             on_next => sub { shift->yield($current++) },
25             })->grep(sub{ shift % 2 == 0 });
26             $tream->to_list(); # generates: [0, 2, 4, 6, 8, 10]
27              
28             =head1 DESCRIPTION
29              
30             This library is yet another implementation of a lazy generator + enumerable
31             for Perl5. It might be handy if the elements of the collection are resolved on
32             the fly and the iteration itself should be hidden from the end users.
33              
34             The enumerables are single-pass composable calculation units. What it means:
35             An enumerable is stateful, once it reached the end of the sequence, it will
36             not rewind to the beginning unless explicitly forced to.
37             Enumerables are composable: one enumerable might be an extension of another by
38             applying some additional logic. Enumerables resolve steps on demand, one by one.
39             A single step might return another enumerable (micro batches). The library
40             flattens these enumerables, so for the end user this looks like a single
41             continuous sequence of elements.
42              
43              
44             [enumerable.has_next] -> [_buffer.has_next] -> yes -> return true
45             -> no -> result = [enumerable.on_has_next] -> return result
46              
47             [enumerable.next] -> [_buffer.has_next] -> yes -> return [_buffer.next]
48             -> no -> result = [enumerable.next] -> [enumerable.set_buffer(result)] -> return result
49              
50             =head1 EXAMPLES
51              
52             =head2 A basic range
53              
54             This example implements a range generator from $from until $to. In order to
55             generate this range we define 2 callbacks: C<on_has_next> and C<on_next>.
56             The first one is used as point of truth whether the sequence has any more
57             non-iterated elements, and the 2nd one is here to return the next element in
58             the sequence and the one that changes the state of the internal sequence
59             iterator.
60              
61             sub basic_range {
62             my ($from, $to) = @_;
63             $from <= $to or die '$from should be less or equal $to';
64             my $current = $from;
65             Data::Enumerable::Lazy->new({
66             on_has_next => sub {
67             return $current <= $to;
68             },
69             on_next => sub {
70             my ($self) = @_;
71             return $self->yield($current++);
72             },
73             });
74             }
75              
76             on_has_next() makes sure the current value does not exceed $to value, and
77             on_next() yields the next value of the sequence. Note the yield method.
78             An enumerable developer is expected to use this method in order to return
79             the next step value. This method does some internal bookkeeping and smart
80             caching.
81              
82             Usage:
83              
84             # We initialize a new range generator from 0 to 10 including.
85             my $range = basic_range(0, 10);
86             # We check if the sequence has elements in its tail.
87             while ($range->has_next) {
88             # In this very line the state of $range is being changed
89             say $range->next;
90             }
91              
92             is $range->has_next, 0, '$range has been iterated completely';
93             is $range->next, undef, 'A fully iterated sequence returns undef on next()';
94              
95             =head2 Prime numbers
96              
97             Prime numbers form an infinite sequence of natural numbers. This example
98             implements a very naive suboptimal prime number generator.
99              
100             my $prime_num_stream = Data::Enumerable::Lazy->new({
101             # This is an infinite sequence
102             on_has_next => sub { 1 },
103             on_next => sub {
104             my $self = shift;
105             # We save the result of the previous step
106             my $next = $self->{_prev_} // 1;
107             LOOKUP: while (1) {
108             $next++;
109             # Check all numbers from 2 to sqrt(N)
110             foreach (2..floor(sqrt($next))) {
111             ($next % $_ == 0) and next LOOKUP;
112             }
113             last LOOKUP;
114             }
115             # Save the result in order to use it in the next step
116             $self->{_prev_} = $next;
117             # Return the result
118             $self->yield($next);
119             },
120             });
121              
122             What's remarkable regarding this specific example is that one can not simply
123             call C<to_list()> in order to get all elements of the sequence. The enumerable
124             will throw an exception claiming it's an infinite sequence. Therefore, we
125             should use C<next()> in order to get elements one by one or use another handy
126             method C<take()> which returns the first N results.
127              
128             =head2 Flat enumeration (Nested generators)
129              
130             In this example we will output the numbers of a 10x10 multiplication table.
131             What's interesting in this example is that there are 2 levels of sequences:
132             primary and secondary. Primary C<on_next> returns a secondary sequence, which
133             multiplies 2 numbers.
134              
135             # A new stream based on a range from 1 to 10
136             my $mult_table = Data::Enumerable::Lazy->from_list(1..10)->continue({
137             on_next => sub {
138             my ($self, $i) = @_;
139             # The primary stream returns another sequence, based on range
140             $self->yield(Data::Enumerable::Lazy->from_list(1..10)->continue({
141             on_next => sub {
142             # $_[0] is a substream self
143             # $_[1] is a next substream sequence element
144             $_[0]->yield( $_[1] * $i )
145             },
146             }));
147             },
148             });
149              
150             Another feature which is represented here is the nested result generation.
151             Let's walk through the sequence generation step by step and see what happens.
152              
153             $mult_table->has_next; # returns true based on the primary range, _buffer is
154             # empty
155             $mult_table->next; # returns 1, the secondary sequence is now stored as
156             # the primary enumerable buffer and 1 is being served
157             # from this buffer
158             $mult_table->has_next; # returns true, resolved by the state of the buffer
159             $mult_table->next; # returns 2, moves buffer iterator forward, the
160             # primary sequence on_next() is _not_ being called
161             # this time
162             $mult_table->next for (3..10); # The last iteration completes the buffer
163             # iteration cycle
164             $mult_table->has_next; # returns true, but now it calls the primary
165             # on_has_next()
166             $mult_table->next; # returns 2 as the first element in the next
167             # secondary sequence (which is 1 again) multiplied by
168             # the 2nd element of the primary sequence (which is 2)
169             $mult_table->to_list; # Generates the tail of the sequence:
170             # [4, 6, ..., 80, 90, 100]
171             $mult_table->has_next; # returns false as the buffer is empty now and the
172             # primary sequence on_has_next() says there is nothing
173             # more to iterate over.
174              
175             =head2 DBI paginator example
176              
177             As mentioned earlier, lazy enumerables are useful when the number of the
178             elements in the sequence is not known in advance. So far, we were looking at
179             some synthetic examples, but the majority of us are not being paid for prime
180             number generators. Hands on some real life example. Say, we have a table and
181             we want to iterate over all entries in the table, and we want the data to be
182             retrieved in batches by 10 elements in order to reduce the number of queries.
183             We don't want to compute the number of steps in advance, as the number might
184             be inaccurate: let's assume we're paginating over some new tweets and the new
185             entries might be created on the flight.
186              
187             use DBI;
188             my $dbh = setup_dbh(); # Some config
189              
190             my $last_id = -1;
191             my $limit = 10;
192             my $offset = 0;
193             my $tweet_enum = Data::Enumerable::Lazy->new({
194             on_has_next => sub {
195             my $sth = $dbh->prepare('SELECT count(1) from Tweets where id > ?');
196             $sth->execute($last_id);
197             my ($cnt) = $sth->fetchrow_array;
198             return int($cnt) > 0;
199             },
200             on_next => sub {
201             my ($self) = @_;
202             my $sth = $dbh->prepare('SELECT * from Tweets ORDER BY id LIMIT ? OFFSET ?');
203             $sth->execute($limit, $offset);
204             $offset += $limit;
205             my @tweets = @{ $sth->fetchall_arrayref({}) };
206             $last_id = $tweets[-1]->{id};
207             $self->yield(Data::Enumerable::Lazy->from_list(@tweets));
208             },
209             is_finite => 1,
210             });
211              
212             while ($tweet_enum->has_next) {
213             my $tweet = $tweet_enum->next;
214             # do something with this tweet
215             }
216              
217             In this example a tweet consumer is abstracted from any DBI bookkeeping and
218             consumes tweet entries one by one without any prior knowledge about the table
219             size and might work on a rapidly growing dataset.
220              
221             In order to reduce the number of queries, we query the data in batches by 10
222             elements max.
223              
224             =head2 Redis queue consumer
225              
226             use Redis;
227              
228             my $redis = Redis->new;
229             my $queue_enum = Data::Enumerable::Lazy->new({
230             on_has_next => sub { 1 },
231             on_next => sub {
232             # Blocking right POP
233             $redis->brpop();
234             },
235             });
236              
237             while (my $queue_item = $queue_enum->next) {
238             # do something with the queue item
239             }
240              
241             In this example the client is blocked until there is an element available in
242             the queue, but it's hidden away from the clients who consume the data item by
243             item.
244              
245             =head2 Kafka example
246              
247             Kafka consumer wrapper is another example of a lazy calculation application.
248             Lazy enumerables fit naturally with streaming data, like
249             Kafka. In this example we're fetching batches of messages from a Kafka topic,
250             filter out corrupted ones and proceed with the remaining messages.
251              
252             use Kafka qw($DEFAULT_MAX_BYTES);
253             use Kafka::Connection;
254             use Kafka::Consumer;
255              
256             my $kafka_consumer = Kafka::Consumer->new(
257             Connection => Kafka::Connection->new( host => 'localhost', ),
258             );
259              
260             my $partition = 0;
261             my $offset = 0;
262             my $kafka_enum = Data::Enumerable::Lazy->new({
263             on_has_next => sub { 1 },
264             on_next => sub {
265             my ($self) = @_;
266             # Fetch messages in batch
267             my $messages = $kafka_consumer->fetch(
268             'topic',
269             $partition,
270             $offset,
271             $DEFAULT_MAX_BYTES
272             );
273             if ($messages) {
274             # Note the grep function applied: we're filtering away corrupted messages
275             $self->yield(Data::Enumerable::Lazy->from_list(@$messages))->grep(sub { $_[0]->valid });
276             } else {
277             # If there are no more messages, we return an empty enum, this is
278             # another handy use-case for nested enums.
279             $self->yield(Data::Enumerable::Lazy->empty);
280             }
281             },
282             });
283              
284             while (my $message = $kafka_enum->next) {
285             # handle the message
286             }
287              
288             =cut
289              
290             =head1 INSTALLATION
291              
292             To install this module type the following:
293             perl Makefile.PL
294             make
295             make test
296             make install
297              
298             =cut
299              
300 19     19   121 use Carp;
  19         41  
  19         1206  
301 19     19   130 use List::Util;
  19         49  
  19         48795  
302              
303             sub new {
304 1563     1563 0 6676 my ($class, $opts) = @_;
305 1563         7302 return bless({ _opts => $opts, _buff => undef }, $class);
306             }
307              
308             =head1 OPTIONS
309              
310             =head2 on_next($self, $element) :: CodeRef -> Data::Enumerable::Lazy | Any
311              
312             C<on_next> is a code ref, a callback which is called every time the
313             generator is in demand for a new bit of data. Enumerable buffers up the result
314             of the previous calculation and if there are no more elements left in the
315             buffer, C<on_next> would be called.
316              
317             C<$element> is defined when the current collection is a continuation of another
318             enumerable. I.e.:
319             my $enum = Data::Enumerable::Lazy->from_list(1, 2, 3);
320             my $enum2 = $enum->continue({
321             on_next => sub { my ($self, $i) = @_; $self->yield($i * $i) }
322             });
323             $enum2->to_list; # generates 1, 4, 9
324             In this case $i would be defined and it comes from the original enumerable.
325              
326             The function is supposed to return an enumerable; in this case it would be
327             kept as the buffer object. If this function returns any other value,
328             it would be wrapped in a C<Data::Enumerable::Lazy-E<gt>singular()>. There is a
329             way to prevent an enumerable from wrapping your return value in an enum and
330             keeping it in a raw state by providing C<no_wrap =E<gt> 1>.
331              
332             =cut
333              
334 3229   100 3 1 7444 sub on_next { $_[0]->{_opts}->{on_next} // sub {} }
        3229      
335              
336             =head2 on_has_next($self) :: CodeRef -> Bool
337              
338             C is a code ref, a callback to be called whenever the enumerable
339             is about to resolve C method call. Similar to C call,
340             this one is also triggered whenever an enumerable runs out of buffered
341             elements. The function shoiuld return boolean.
342              
343             A method that returns 1 all the time is the way to initialize an infinite
344             enumerable (see C). If it returns 0 no matter what, it would be
345             an empty enumerable (see C). Normally you want to stay somewhere in
346             the middle and implement some state check login in there.
347              
348             =cut
349              
350 46   100 46 1 134 sub on_has_next { $_[0]->{_opts}->{on_has_next} // sub {0} }
  5214     5214   12119  
351              
352             =head2 on_reset($self) :: CodeRef -> void
353              
354             This is a callback to be called in order to reset the state of the enumerable.
355             This callback should be defined in the same scope as the enumerable itself.
356             The library provides nothing magical but a callback and a handle to call it,
357             so the state cleanup is completely on the developer's side.
358              
359             =cut
360              
361 0   0 0 1 0 sub on_reset { $_[0]->{_opts}->{on_reset} // sub {} }
        0      
362              
363             =head2 is_finite :: Bool
364              
365             A boolean flag indicating whether an enumerable is finite or not. By default
366             enumerables are treated as infinite, which means some functions will throw
367             an exception, like: C or C.
368              
369             Make sure not to mark an infinite enumerable as finite and then call methods
370             that require a finite enumerable: in this case resolution will loop forever.
371              
372             =cut
373              
374 78   50 78 1 444 sub is_finite { $_[0]->{_opts}->{is_finite} // 0 }
375              
376 10021   100 10021 0 30780 sub no_wrap { $_[0]->{_opts}->{no_wrap} // 0 }
377              
378             =head1 INSTANCE METHODS
379              
380             =head2 next()
381              
382             Function C<next()> is the primary interface for accessing elements of an
383             enumerable. It will do some internal checks and if there are no elements to be
384             served from an intermediate buffer, it will resolve the next step by calling
385             the C<on_next> callback.
386             Enumerables are composable: one enumerable might be based on another
387             enumeration. E.g.: a sequence of natural number squares is based on the
388             sequence of natural numbers themselves. In other words, a sequence is defined
389             as a tuple of another sequence and a function which would be lazily applied to
390             every element of this sequence.
391              
392             C<next()> accepts 0 or more arguments, which would be passed to the C<on_next>
393             callback.
394              
395             C<next()> is expected to do the heavy lifting, as opposed to C<has_next()>,
396             which is supposed to be cheap and fast. This statement flips upside down
397             whenever C<grep> is applied to a stream. See C<grep> for more details.
398              
399             =cut
400              
401             sub next {
402 3431     3431 1 5448 my $self = shift;
403 3431         4276 my $res;
404 3431 100 100     7315 unless ($self->{_buff} && $self->{_buff}->has_next()) {
405 3229         5746 $res = $self->on_next()->($self, @_);
406 3229 100       5831 $self->{_buff} = $res
407             unless $self->no_wrap();
408             }
409 3431 100       5810 my $return = $self->no_wrap() ? $res : $self->{_buff}->next();
410 3431         7090 return $return;
411             }
412              
413             =head2 has_next()
414              
415             C<has_next()> is the primary entry point to get information about the state
416             of an enumerable. If the method returned false, there are no more elements to be
417             consumed. I.e. the sequence has been iterated completely. Normally it means
418             the end of an iteration cycle.
419              
420             Enumerables use internal buffers in order to support batched C<on_next>
421             resolutions. If there are some elements left in the buffer, C<has_next()>
422             won't call the C<on_has_next> callback immediately. If the buffer has been
423             iterated completely, C<on_has_next> would be called.
424              
425             C<has_next()> should resolve the state of an enumerable quickly as it's going
426             to be used for a condition state check.
427              
428             =cut
429              
430             sub has_next {
431 5399     5399 1 12201 my $self = shift;
432 5399         6639 my $res;
433             eval {
434 5399   100     8423 $res = $self->_has_next_in_buffer() ||
435             $self->_has_next_in_generator();
436 5399         13506 1;
437 5399 50       6864 } or do {
438 0   0     0 croak sprintf('Problem calling on_has_next(): %s', $@ // 'zombie error');
439             };
440 5399         13989 return int $res;
441             }
442              
443             =head2 reset()
444              
445             This method is a generic entry point for an enum reset. In fact, it is basically
446             a wrapper around user-defined C<on_reset>. Use with caution: if C<on_reset>
447             was not defined, it will reset the buffer and might cause a partial calculation
448             skip (reset implicitly clears the internal buffer) if the buffer was not fully
449             iterated yet.
450              
451             =cut
452              
453             sub reset {
454 0     0 1 0 my $self = shift;
455 0         0 $self->{_buff} = undef;
456 0 0       0 eval { $self->on_reset(); 1 } or do {
  0         0  
  0         0  
457 0   0     0 croak sprintf('Problem calling on_reset(): %s', $@ // 'zombie error');
458             };
459             }
460              
461             =head2 to_list()
462              
463             This function transforms a lazy enumerable to a list. Only finite enumerables
464             can be transformed to a list, so the method checks if an enumerable is created
465             with the C<is_finite> flag. An exception would be thrown otherwise.
466              
467             =cut
468              
469             sub to_list {
470 24     24 1 110 my ($self) = @_;
471 24 50       52 croak 'Only finite enumerables might be converted to list. Use is_finite=1'
472             unless $self->is_finite();
473 24         50 my @acc;
474 24         59 push @acc, $self->next() while $self->has_next();
475 24         140 return \@acc;
476             }
477              
478             =head2 map($callback)
479              
480             Creates a new enumerable by applying a user-defined function to the original
481             enumerable. Works the same way as perl map {} function but it's lazy.
482              
483             Example
484             Data::Enumerable::Lazy
485             ->from_array(1, 2, 3)
486             ->map(sub {
487             my ($number) = @_;
488             return $number * $number
489             });
490              
491             =cut
492              
493             sub map {
494 1     1 1 2 my ($self, $callback) = @_;
495             Data::Enumerable::Lazy->new({
496             on_has_next => $self->on_has_next(),
497 10     10   20 on_next => sub { shift->yield($callback->($self->next())) },
498 1         2 is_finite => $self->is_finite(),
499             no_wrap => $self->no_wrap(),
500             });
501             }
502              
503             =head2 reduce($acc, $callback)
504              
505             Resolves the enumerable and returns the resulting state of the accumulator $acc
506             provided as the 1st argument. C<$callback> should always return the new state of
507             C<$acc>.
508              
509             C<reduce> is defined for finite enumerables only.
510              
511             Example
512             Data::Enumerable::Lazy
513             ->from_array(1, 2, 3)
514             ->reduce(1, sub {
515             my ($acc, $number) = @_;
516             return $acc *= $number
517             });
518              
519             =cut
520              
521             sub reduce {
522 2     2 1 5 my ($self, $acc, $callback) = @_;
523 2 50       5 croak 'Only finite enumerables might be reduced. Use is_finite=1'
524             unless $self->is_finite();
525 2         7 ($acc = $callback->($acc, $self->next())) while $self->has_next();
526 2         4 return $acc;
527             }
528              
529             =head2 grep($callback, $max_lookahead)
530              
531             C<grep> is a function which returns a new enumerable by applying a
532             user-defined filter function.
533              
534             C<grep> might be applied to both finite and infinite enumerables. In the case of
535             an infinite enumerable there is an additional argument specifying the max number
536             of lookahead steps. If an element satisfying the condition could not be found in
537             C<$max_lookahead> steps, an enumerable is considered to be completely iterated
538             and C<has_next()> will return false.
539              
540             C<grep> returns a new enumerable with quite special properties: C<has_next()>
541             will perform a look ahead and call the original enumerable C<next()> method
542             in order to find an element for which the user-defined function will return
543             true. C<next()>, on the other hand, returns the value that was pre-fetched
544             by C<has_next()>.
545              
546             Example
547             Data::Enumerable::Lazy
548             ->from_list(1, 2, 3)
549             ->grep(sub {
550             my ($number) = @_;
551             return $number % 2
552             });
553              
554             =cut
555              
556             sub grep {
557 9     9 1 18 my ($self, $callback, $max_lookahead) = @_;
558 9         12 my $next;
559 9         12 my $initialized = 0;
560 9   50     96 $max_lookahead //= 0;
561 9 100       17 $max_lookahead = 0
562             if $self->is_finite;
563 9         14 my $prev_has_next;
564             Data::Enumerable::Lazy->new({
565             on_has_next => sub {
566 65 50   65   107 defined $prev_has_next
567             and return $prev_has_next;
568 65         77 my $ix = 0;
569 65         84 $initialized = 1;
570 65         99 undef $next;
571 65         98 while ($self->has_next()) {
572 84 50       133 if ($max_lookahead > 0) {
573             $ix > $max_lookahead
574 0 0       0 and do {
575 0         0 carp sprintf 'Max lookahead steps cnt reached. Bailing out';
576 0         0 return $prev_has_next = 0;
577             };
578             }
579 84         139 $next = $self->next();
580 84 100       138 $callback->($next) and last;
581 27         61 undef $next;
582 27         43 $ix++;
583             }
584 65         154 return $prev_has_next = (defined $next);
585             },
586             on_next => sub {
587 56     56   66 my $self = shift;
588 56 50       97 $initialized or $self->has_next();
589 56         75 undef $prev_has_next;
590 56         86 $self->yield($next);
591             },
592 9         68 is_finite => $self->is_finite(),
593             no_wrap => $self->no_wrap(),
594             });
595             }
596              
597             =head2 resolve()
598              
599             Resolves an enumerable completely. Applicable for finite enumerables only.
600             The method returns nothing.
601              
602             =cut
603              
604             sub resolve {
605 1     1 1 5 my ($self) = @_;
606 1 50       2 croak 'Only finite enumerables might be resolved. Use is_finite=1'
607             unless $self->is_finite();
608 1         3 $self->next() while $self->has_next();
609             }
610              
611             =head2 take($N_elements)
612              
613             Resolves first $N_elements and returns the resulting list. If there are
614             fewer than N elements in the enumerable, the entire enumerable would be
615             returned as a list.
616              
617             =cut
618              
619             sub take {
620 10     10 1 44 my ($self, $slice_size) = @_;
621 10         14 my $ix = 0;
622 10         12 my @acc;
623 10   100     22 push @acc, $self->next() while ($self->has_next() && $ix++ < $slice_size);
624 10         39 return \@acc;
625             }
626              
627             =head2 take_while($callback)
628              
629             This function takes elements until it meets the first one that does not
630             satisfy the conditional callback.
631             The callback takes only 1 argument: an element. It should return true if
632             the element should be taken. Once it returned false, the stream is over.
633              
634             =cut
635              
636             sub take_while {
637 7     7 1 11 my ($self, $callback) = @_;
638 7         9 my $next_el;
639             my $prev_has_next;
640 7         7 my $initialized = 0;
641             Data::Enumerable::Lazy->new({
642             on_has_next => sub {
643 53     53   63 $initialized = 1;
644 53 50       71 defined $prev_has_next
645             and return $prev_has_next;
646 53         49 $prev_has_next = 0;
647 53 100       61 if ($self->has_next()) {
648 52         62 $next_el = $self->next();
649 52 100       65 if ($callback->($next_el)) {
650 47         76 $prev_has_next = 1;
651             }
652             }
653 53         88 return $prev_has_next;
654             },
655             on_next => sub {
656 47     47   59 my ($new_self) = @_;
657 47 50       58 $initialized or $new_self->has_next();
658 47 50       55 $prev_has_next
659             or return $new_self->yield(Data::Enumerable::Lazy->empty());
660 47         54 undef $prev_has_next;
661 47         61 $new_self->yield($next_el);
662             },
663 7         22 is_finite => $self->is_finite(),
664             });
665             }
666              
667             =head2 continue($ext = %{ on_next => sub {}, ... })
668              
669             Creates a new enumerable by extending the existing one. on_next is
670             the only mandatory argument. on_has_next might be overridden if some
671             custom logic comes into play.
672              
673             is_finite is inherited from the parent enumerable by default. All additional
674             attributes would be transparently passed to the constructor.
675              
676             =cut
677              
# Builds a new enumerable on top of $this: every step pulls one element from
# the parent and feeds it (with the new enumerable as first argument) to the
# mandatory `on_next` callback.
#
# Params:
#   $this - parent enumerable
#   $ext  - hashref; `on_next` (CODE) is required; `on_has_next`, `is_finite`
#           and `no_wrap` are optional (undef falls back to the parent's
#           value, or 0 for no_wrap); any other keys are passed verbatim to
#           the constructor.
# Returns: a new Data::Enumerable::Lazy instance.
# Croaks if `on_next` is missing or is not a code reference.
sub continue {
    my ($this, $ext) = @_;
    # Work on a shallow copy so the caller's hash is never modified.
    my %ext = %$ext;
    my $on_next = delete $ext{on_next}
        or croak '`on_next` should be defined on stream continuation';
    ref($on_next) eq 'CODE'
        or croak '`on_next` should be a function';

    # Take the known attributes out of the *copy*: this keeps the caller's
    # hashref intact and ensures that the %ext spread below only carries
    # genuinely extra attributes (so an explicit undef cannot override the
    # defaults computed here).
    my $on_has_next = delete $ext{on_has_next} // $this->on_has_next();
    my $is_finite   = delete $ext{is_finite}   // $this->is_finite();
    my $no_wrap     = delete $ext{no_wrap}     // 0;

    return Data::Enumerable::Lazy->new({
        on_next => sub {
            my $self = shift;
            $self->yield(
                $this->has_next()
                    ? $on_next->($self, $this->next())
                    : Data::Enumerable::Lazy->empty
            );
        },
        on_has_next => $on_has_next,
        is_finite   => $is_finite,
        no_wrap     => $no_wrap,
        %ext,
    });
}
700              
701             =head2 count()
702              
703             Counts the number of the elements in the stream. This method iterates through
704             the stream, so the stream is exhausted once the computation finishes.
705              
706             =cut
707              
# Returns the number of elements left in a finite enumerable.
# Consumes the stream: every remaining element is pulled via next().
# Croaks when the enumerable is not marked as finite.
sub count {
    my ($self) = @_;
    $self->is_finite()
        or croak 'Only finite enumerables might be counted. Use is_finite=1';
    my $total = 0;
    while ($self->has_next()) {
        $self->next();
        $total++;
    }
    return $total;
}
718              
719             =head2 yield($result)
720              
721             This method is supposed to be called from the C<on_next> callback only. This is
722             the only valid result for an Enumerable to return the next step result.
723             Effectively, it ensures the returned result conforms to the required interface
724             and is wrapped in a lazy wrapper if needed.
725              
726             =cut
727              
# Normalizes the value returned by an on_next step.
# The value is returned untouched when this enumerable has no_wrap set, or
# when it already is a Data::Enumerable::Lazy instance; any other value is
# wrapped into a single-element enumerable via singular().
sub yield {
    my ($self, $val) = @_;
    my $is_stream = $val
        && ref($val) eq 'Data::Enumerable::Lazy'
        && $val->isa('Data::Enumerable::Lazy');
    return $val if $self->no_wrap() or $is_stream;
    return Data::Enumerable::Lazy->singular($val);
}
739              
740             # Private methods
741              
# True when a buffered sub-enumerable exists and still has elements.
sub _has_next_in_buffer {
    my ($self) = @_;
    my $buffer = $self->{_buff};
    return defined($buffer) && $buffer->has_next();
}
746              
# Asks the user-supplied on_has_next callback whether the generator can
# produce more; the enumerable itself and any extra arguments are forwarded.
sub _has_next_in_generator {
    my ($self, @args) = @_;
    my $probe = $self->on_has_next();
    return $probe->($self, @args);
}
751              
752             =head1 CLASS METHODS
753              
754             =head2 empty()
755              
756             Returns an empty enumerable. Effectively it means an equivalent of an empty
757             array. C<has_next> will return false and C<next> will return undef. Useful
758             whenever an C<on_next> step wants to return an empty result set.
759              
760             =cut
761              
# Builds an enumerable with no elements: no callbacks are supplied, it is
# marked finite and its values are not wrapped.
sub empty {
    my %attrs = (
        no_wrap   => 1,
        is_finite => 1,
    );
    return Data::Enumerable::Lazy->new(\%attrs);
}
768              
769             =head2 singular($val)
770              
771             Returns an enumerable with a single element $val. Actively used as an internal
772             data container.
773              
774             =cut
775              
# Wraps a single value into a finite, non-wrapping enumerable that yields
# $val exactly once.
sub singular {
    my ($class, $val) = @_;
    my $done = 0;
    return Data::Enumerable::Lazy->new({
        on_has_next => sub { return !$done },
        on_next     => sub {
            my ($self) = @_;
            $done = 1;
            return $self->yield($val);
        },
        is_finite => 1,
        no_wrap   => 1,
    });
}
786              
787             =head2 from_list(@list)
788              
789             Returns a new enumerable instantiated from a list. The easiest way to
790             initialize an enumerable. In fact, all elements are already resolved
791             so this method sets C by default.
792              
793             =cut
794              
795             sub from_list {
796 32     32 1 8677 my $class = shift;
797 32         89 my @list = @_;
798 32         50 my $ix = 0;
799             Data::Enumerable::Lazy->new({
800 550     550   1482 on_has_next => sub { $ix < scalar(@list) },
801 249     249   480 on_next => sub { shift->yield($list[$ix++]) },
802 32         250 is_finite => 1,
803             no_wrap => 1,
804             });
805             }
806              
807             =head2 cycle()
808              
809             Creates an infinite enumerable by cycling the original list. E.g. if the
810             original list is [1, 2, 3], C<cycle> will generate an infinite sequence
811             like: 1, 2, 3, 1, 2, 3, 1, ...
812              
813             =cut
814              
# Creates an enumerable that repeats the given list endlessly:
# 1, 2, 3, 1, 2, 3, ... for (1, 2, 3).
# An empty list has nothing to repeat, so the result then reports no
# elements (and is marked finite) instead of yielding undef forever.
sub cycle {
    my ($class, @list) = @_;
    my $ix = 0;
    my $is_empty = !@list;
    return Data::Enumerable::Lazy->new({
        on_has_next => sub { return @list ? 1 : 0 },
        on_next     => sub {
            my ($self) = @_;
            # Wrap around after the last element.
            $ix = 0 if $ix > $#list;
            return $self->yield($list[$ix++]);
        },
        is_finite => $is_empty ? 1 : 0,
        no_wrap   => 1,
    });
}
830              
831             =head2 infinity()
832              
833             Returns a new infinite enumerable. C<has_next> always returns true whereas
834             C<next> returns undef all the time. Useful as an extension basis for infinite
835             sequences.
836              
837             =cut
838              
# Creates a never-ending, non-wrapping enumerable whose on_next produces
# nothing; intended as a base to extend into infinite sequences.
sub infinity {
    my ($class) = @_;
    return Data::Enumerable::Lazy->new({
        is_finite   => 0,
        no_wrap     => 1,
        on_has_next => sub { 1 },
        on_next     => sub { return },
    });
}
848              
849             =head2 merge($tream1 [, $tream2 [, $tream3 [, ...]]])
850              
851             This function merges one or more streams together by fanning out the C<next>
852             method call among the non-empty streams.
853             Returns a new enumerable instance, which:
854             * Has next elements as long as at least one of the streams does.
855             * Returns the next element by picking it one-by-one from the streams.
856             * Is finite if and only if all the streams are finite.
857             Once one of the streams is over, it is skipped and
858             C<next> will continue choosing from the non-empty ones.
859              
860             =cut
861              
# Interleaves the given streams round-robin, skipping exhausted ones.
# Returns the single stream unchanged when only one is given.
# The result is finite only if every source stream is finite: a single
# infinite source keeps the merged stream going forever.
# Croaks when called without streams.
sub merge {
    my ($class, @streams) = @_;
    @streams
        or croak '`merge` function takes at least 1 stream';
    return $streams[0] if @streams == 1;
    # Cycle over stream indexes, stop once every stream is exhausted and
    # skip indexes whose stream is already over.
    my $ixs = Data::Enumerable::Lazy->cycle(0 .. $#streams)
        -> take_while(sub { List::Util::any { $_->has_next() } @streams })
        -> grep(sub { $streams[ shift ]->has_next() });
    return Data::Enumerable::Lazy->new({
        on_has_next => sub { $ixs->has_next() },
        on_next     => sub {
            shift->yield($streams[ $ixs->next() ]->next());
        },
        # Previously computed as "any stream finite", which marked merges
        # with an infinite stream as finite.
        is_finite => (List::Util::all { $_->is_finite() } @streams) ? 1 : 0,
    });
}
880              
881             =head2 chain($tream1(, $tream2(, $tream3(, ...))))
882              
883             Executes streams sequentually, one after another: the next stream starts once
884             the previous is over.
885              
886             =cut
887              
# Concatenates streams: elements of the first stream, then the second, etc.
# With fewer than two streams, the first argument (undef if none) is
# returned as is.
# The chain is finite only if every chained stream is finite.
sub chain {
    my ($class, @streams) = @_;
    return $streams[0] if @streams < 2;
    return Data::Enumerable::Lazy->from_list(@streams)
        -> continue({
            # Yielding a stream returns it unwrapped, so each element of
            # the outer list is consumed as a sub-stream.
            on_next => sub { $_[0]->yield($_[1]) },
            # The outer list of streams is always finite, but inheriting
            # that flag would misreport chains of infinite streams.
            is_finite => (List::Util::all { $_->is_finite() } @streams) ? 1 : 0,
        })
        # NOTE(review): presumably drops undef results produced at stream
        # boundaries — confirm against the buffering logic in next().
        -> grep(sub { defined $_[0] });
}
899              
900             =head2 from_text_file($file_handle(, $options))
901              
902             Method takes an open file handle and an optional hash of options and creates a
903             stream of it. The file would be read as a text file, line by line. For
904             details on how lines are read, see the C<readline> perl core function reference.
905             Options is a basic hash, supported attributes are:
906             * chomp :: Bool | Whether the lines should be chomped, 0 by default.
907             * is_finite :: Bool | Forces the stream to be processed as finite, 0 by default.
908              
909             =cut
910              
# Streams an open text file handle line by line (via readline).
# Options (hashref, optional):
#   chomp     - strip trailing newlines from every line (default: off)
#   is_finite - mark the stream as finite (default: 0)
sub from_text_file {
    my ($class, $file_handle, $options) = @_;
    $options //= +{};
    my $lines = Data::Enumerable::Lazy->new({
        on_has_next => sub { return !eof($file_handle) },
        on_next     => sub {
            my ($self) = @_;
            return $self->yield(scalar readline($file_handle));
        },
        is_finite => $options->{is_finite} // 0,
    });
    return $lines unless $options->{chomp};
    return $lines->map(sub {
        my ($line) = @_;
        chomp $line;
        return $line;
    });
}
927              
928             =head2 from_bin_file($file_handle(, $options))
929              
930             Method similar to C<from_text_file> but forces binary reading from file.
931             Takes a file handle created by the C<open> function and an optional hash of
932             options. Supported attributes are:
933             * block_size :: Integer | The size of read block, 1024 bytes by default.
934             * is_finite :: Bool | Forces the stream to be processed as finite, 0 by default.
935              
936             =cut
937              
# Streams an open file handle in fixed-size blocks read with read().
# Options (hashref, optional):
#   block_size - bytes per read (default: 1024)
#   is_finite  - mark the stream as finite (default: 0)
sub from_bin_file {
    my ($class, $file_handle, $options) = @_;
    $options //= +{};
    my $block_size = $options->{block_size} // 1024;
    return Data::Enumerable::Lazy->new({
        is_finite   => $options->{is_finite} // 0,
        on_has_next => sub { return !eof($file_handle) },
        on_next     => sub {
            my ($self) = @_;
            my $chunk;
            read($file_handle, $chunk, $block_size);
            return $self->yield($chunk);
        },
    });
}
952              
953             =head1 AUTHOR
954              
955             Oleg S
956              
957             =cut
958              
959             =head1 SEE ALSO
960              
961             =head2 Lazy evaluation in a nutshell
962              
963             L
964              
965             =head2 Library GitHub page:
966              
967             L
968              
969             =head2 Alternative implementations:
970              
971             L
972             L
973             L
974              
975             =cut
976              
977             =head1 COPYRIGHT AND LICENSE
978              
979             Copyright 2017, 2018 Oleg S
980              
981             Copying and distribution of this file, with or without modification, are
982             permitted in any medium without royalty provided the copyright notice and this
983             notice are preserved. This file is offered as-is, without any warranty.
984              
985             =cut
986              
987             1;
988              
989             __END__