File Coverage

blib/lib/Data/Enumerable/Lazy.pm
Criterion Covered Total %
statement 178 189 94.1
branch 44 62 70.9
condition 31 49 63.2
subroutine 61 64 95.3
pod 26 28 92.8
total 340 392 86.7


line stmt bran cond sub pod time code
1             package Data::Enumerable::Lazy;
2              
3 19     19   1491881 use 5.18.2;
  19         287  
4              
5 19     19   132 use strict;
  19         48  
  19         568  
6 19     19   128 use warnings;
  19         46  
  19         2236  
7              
8             our $VERSION = '0.03';
9              
10             =pod
11              
12             =head1 NAME
13              
14             Data::Enumerable::Lazy
15              
16             =head1 SYNOPSIS
17              
18             A basic lazy range implementation picking even numbers only:
19              
20             my ($from, $to) = (0, 10);
21             my $current = $from;
22             my $stream = Data::Enumerable::Lazy->new({
23             on_has_next => sub { $current <= $to },
24             on_next => sub { shift->yield($current++) },
25             })->grep(sub{ shift % 2 == 0 });
26             $stream->to_list(); # generates: [0, 2, 4, 6, 8, 10]
27              
28             =head2 DESCRIPTION
29              
30             This library is yet another implementation of a lazy generator + enumerable
31             for Perl 5. It might be handy if the elements of the collection are resolved on
32             the fly and the iteration itself should be hidden from the end users.
33              
34             The enumerables are single-pass composable calculation units. What this means:
35             an enumerable is stateful; once it has reached the end of the sequence, it will
36             not rewind to the beginning unless explicitly forced to.
37             Enumerables are composable: one enumerable might be an extension of another by
38             applying some additional logic. Enumerables resolve steps on demand, one by one.
39             A single step might return another enumerable (micro batches). The library
40             flattens these enumerables, so for the end user this looks like a single
41             continuous sequence of elements.
42              
43              
44             [enumerable.has_next] -> [_buffer.has_next] -> yes -> return true
45             -> no -> result = [enumerable.on_has_next] -> return result
46              
47             [enumerable.next] -> [_buffer.has_next] -> yes -> return [_buffer.next]
48             -> no -> result = [enumerable.on_next] -> [enumerable.set_buffer(result)] -> return [_buffer.next]
49              
50             =head1 EXAMPLES
51              
52             =head2 A basic range
53              
54             This example implements a range generator from $from until $to. In order to
55             generate this range we define 2 callbacks: C<on_has_next> and C<on_next>.
56             The first one is used as point of truth whether the sequence has any more
57             non-iterated elements, and the 2nd one is here to return the next element in
58             the sequence and the one that changes the state of the internal sequence
59             iterator.
60              
61             sub basic_range {
62             my ($from, $to) = @_;
63             $from <= $to or die '$from should be less or equal $to';
64             my $current = $from;
65             Data::Enumerable::Lazy->new({
66             on_has_next => sub {
67             return $current <= $to;
68             },
69             on_next => sub {
70             my ($self) = @_;
71             return $self->yield($current++);
72             },
73             });
74             }
75              
76             on_has_next() makes sure the current value does not exceed $to value, and
77             on_next() yields the next value of the sequence. Note the yield method.
78             An enumerable developer is expected to use this method in order to return
79             the next step value. This method does some internal bookkeeping and smart
80             caching.
81              
82             Usage:
83              
84             # We initialize a new range generator from 0 to 10 including.
85             my $range = basic_range(0, 10);
86             # We check if the sequence has elements in its tail.
87             while ($range->has_next) {
88             # In this very line the state of $range is being changed
89             say $range->next;
90             }
91              
92             is $range->has_next, 0, '$range has been iterated completely';
93             # Note: next() does not consult has_next(); on this range it would keep yielding $current++.
94              
95             =head2 Prime numbers
96              
97             Prime numbers is an infinite sequence of natural numbers. This example
98             implements a very basic prime number generator.
99              
100             my $prime_num_stream = Data::Enumerable::Lazy->new({
101             # This is an infinite sequence
102             on_has_next => sub { 1 },
103             on_next => sub {
104             my $self = shift;
105             # We save the result of the previous step
106             my $next = $self->{_prev_} // 1;
107             LOOKUP: while (1) {
108             $next++;
109             # Check all numbers from 2 to sqrt(N)
110             foreach (2..int(sqrt($next))) {
111             ($next % $_ == 0) and next LOOKUP;
112             }
113             last LOOKUP;
114             }
115             # Save the result in order to use it in the next step
116             $self->{_prev_} = $next;
117             # Return the result
118             $self->yield($next);
119             },
120             });
121              
122             What's remarkable regarding this specific example is that one can not simply
123             call C<to_list()> in order to get all elements of the sequence. The enumerable
124             will throw an exception claiming it's an infinite sequence. Therefore, we
125             should use C<next()> in order to get elements one by one or use another handy
126             method C<take()> which returns the first N results.
127              
128             =head2 Nested enumerables
129              
130             In this example we will output the numbers of a 10x10 multiplication table.
131             What's interesting in this example is that there are 2 sequences: primary and
132             secondary. The primary C<on_next> returns a secondary sequence, which generates the
133             result of multiplication of 2 numbers.
134              
135             # A new stream based on a range from 1 to 10
136             my $mult_table = Data::Enumerable::Lazy->from_list(1..10)->continue({
137             on_next => sub {
138             my ($self, $i) = @_;
139             # The primary stream returns another sequence, based on range
140             $self->yield(Data::Enumerable::Lazy->from_list(1..10)->continue({
141             on_next => sub {
142             # $_[0] is a substream self
143             # $_[1] is a next substream sequence element
144             $_[0]->yield( $_[1] * $i )
145             },
146             }));
147             },
148             });
149              
150             Another feature which is demonstrated here is the batched result generation.
151             Let's iterate the sequence step by step and see what happens inside.
152              
153             $mult_table->has_next; # returns true based on the primary range, _buffer is
154             # empty
155             $mult_table->next; # returns 1, the secondary sequence is now stored as
156             # the primary enumerable buffer and 1 is being served
157             # from this buffer
158             $mult_table->has_next; # returns true, resolved by the state of the buffer
159             $mult_table->next; # returns 2, moves buffer iterator forward, the
160             # primary sequence on_next() is _not_ being called
161             # this time
162             $mult_table->next for (3..10); # The last iteration completes the buffer
163             # iteration cycle
164             $mult_table->has_next; # returns true, but now it calls the primary
165             # on_has_next()
166             $mult_table->next; # returns 2 as the first element in the next
167             # secondary sequence (which is 1 again) multiplied by
168             # the 2nd element of the primary sequence (which is 2)
169             $mult_table->to_list; # Generates the tail of the sequence:
170             # [4, 6, ..., 80, 90, 100]
171             $mult_table->has_next; # returns false as the buffer is empty now and the
172             # primary sequence on_has_next() says there is nothing
173             # more to iterate over.
174              
175             =head2 DBI paginator example
176              
177             As mentioned earlier, lazy enumerables are useful when the number of the
178             elements in the sequence is not known in advance. So far, we were looking at
179             some synthetic examples, but the majority of us are not being paid for prime
180             number generators. Hands on some real life example. Say, we have a table and
181             we want to iterate over all entries in the table, and we want the data to be
182             retrieved in batches by 10 elements in order to reduce the number of queries.
183             We don't want to compute the number of steps in advance, as the number might
184             be inaccurate: let's assume we're paginating over some new tweets and the new
185             entries might be created on the fly.
186              
187             use DBI;
188             my $dbh = setup_dbh(); # Some config
189              
190             my $last_id = -1;
191             my $limit = 10;
192             my $offset = 0;
193             my $tweet_enum = Data::Enumerable::Lazy->new({
194             on_has_next => sub {
195             my $sth = $dbh->prepare('SELECT count(1) from Tweets where id > ?');
196             $sth->execute($last_id);
197             my ($cnt) = $sth->fetchrow_array;
198             return int($cnt) > 0;
199             },
200             on_next => sub {
201             my ($self) = @_;
202             my $sth = $dbh->prepare('SELECT * from Tweets ORDER BY id LIMIT ? OFFSET ?');
203             $sth->execute($limit, $offset);
204             $offset += $limit;
205             my @tweets = @{ $sth->fetchall_arrayref({}) };
206             $last_id = $tweets[-1]->{id};
207             $self->yield(Data::Enumerable::Lazy->from_list(@tweets));
208             },
209             is_finite => 1,
210             });
211              
212             while ($tweet_enum->has_next) {
213             my $tweet = $tweet_enum->next;
214             # do something with this tweet
215             }
216              
217             In this example a tweet consumer is abstracted from any DBI bookkeeping and
218             consumes tweet entries one by one without any prior knowledge about the table
219             size and might work on a rapidly growing dataset.
220              
221             In order to reduce the number of queries, we query the data in batches by 10
222             elements max.
223              
224             =head2 Redis queue consumer
225              
226             use Redis;
227              
228             my $redis = Redis->new;
229             my $queue_enum = Data::Enumerable::Lazy->new({
230             on_has_next => sub { 1 },
231             on_next => sub {
232             # Blocking right POP on the 'queue' list; brpop returns (list, value)
233             $_[0]->yield(($redis->brpop('queue', 0))[1]);
234             },
235             });
236              
237             while (my $queue_item = $queue_enum->next) {
238             # do something with the queue item
239             }
240              
241             In this example the client is blocked until there is an element available in
242             the queue, but it's hidden away from the clients who consume the data item by
243             item.
244              
245             =head2 Kafka example
246              
247             A Kafka consumer wrapper is another example of a lazy calculation application.
248             Lazy enumerables fit naturally with streaming data sources like
249             Kafka. In this example we fetch batches of messages from a Kafka topic,
250             grep out the corrupted ones and proceed with the remaining messages.
251              
252             use Kafka qw($DEFAULT_MAX_BYTES);
253             use Kafka::Connection;
254             use Kafka::Consumer;
255              
256             my $kafka_consumer = Kafka::Consumer->new(
257             Connection => Kafka::Connection->new( host => 'localhost', ),
258             );
259              
260             my $partition = 0;
261             my $offset = 0;
262             my $kafka_enum = Data::Enumerable::Lazy->new({
263             on_has_next => sub { 1 },
264             on_next => sub {
265             my ($self) = @_;
266             # Fetch messages in batch
267             my $messages = $kafka_consumer->fetch(
268             'topic',
269             $partition,
270             $offset,
271             $DEFAULT_MAX_BYTES
272             );
273             if ($messages) {
274             # Note the grep function applied: we're filtering away corrupted messages
275             $self->yield(Data::Enumerable::Lazy->from_list(@$messages)->grep(sub { $_[0]->valid }));
276             } else {
277             # If there are no more messages, we return an empty enum, this is
278             # another handy use-case for nested enums.
279             $self->yield(Data::Enumerable::Lazy->empty);
280             }
281             },
282             });
283              
284             while (my $message = $kafka_enum->next) {
285             # handle the message
286             }
287              
288             =cut
289              
290             =head1 INSTALLATION
291              
292             To install this module type the following:
293             perl Makefile.PL
294             make
295             make test
296             make install
297              
298             =cut
299              
300 19     19   150 use Carp;
  19         48  
  19         1514  
301 19     19   152 use List::Util;
  19         58  
  19         55554  
302              
# Constructor. The option hash ref is stored as-is and consulted later by the
# accessors (on_next, on_has_next, on_reset, is_finite, no_wrap). The step
# buffer starts out empty.
sub new {
    my $class = shift;
    my $opts  = shift;
    my $self  = {
        _opts => $opts,
        _buff => undef,
    };
    return bless $self, $class;
}
307              
308             =head1 OPTIONS
309              
310             =head2 on_next($self, $element) :: CodeRef -> Data::Enumerable::Lazy | Any
311              
312             C<on_next> is a code ref, a callback which is being called every time the
313             generator is in demand for a new bit of data. Enumerable buffers up the result
314             of the previous calculation and if there are no more elements left in the
315             buffer, C<on_next> would be called.
316              
317             C<$element> is defined when the current collection is a continuation of another
318             enumerable. I.e.:
319             my $enum = Data::Enumerable::Lazy->from_list(1, 2, 3);
320             my $enum2 = $enum->continue({
321             on_next => sub { my ($self, $i) = @_; $self->yield($i * $i) }
322             });
323             $enum2->to_list; # generates 1, 4, 9
324             In this case $i would be defined and it comes from the original enumerable.
325              
326             The function is supposed to return its result through C<< $self->yield() >>:
327             an enumerable passed to yield is kept as the buffer object, while any other value
328             is wrapped in a C<< Data::Enumerable::Lazy->singular() >>. There is a
329             way to prevent an enumerable from wrapping your return value in an enum and
330             keeping it in a raw state by providing C<< no_wrap => 1 >>.
331              
332             =cut
333              
# Returns the user-supplied on_next callback, or a no-op sub when the option
# is missing or undef.
sub on_next {
    my ($self) = @_;
    my $callback = $self->{_opts}{on_next};
    return defined $callback ? $callback : sub {};
}
335              
336             =head2 on_has_next($self) :: CodeRef -> Bool
337              
338             C<on_has_next> is a code ref, a callback to be called whenever the enumerable
339             is about to resolve a C<has_next()> method call. Similar to the C<on_next> call,
340             this one is also triggered whenever an enumerable runs out of buffered
341             elements. The function should return a boolean.
342              
343             A callback that returns 1 all the time is the way to initialize an infinite
344             enumerable (as in the prime numbers example). If it returns 0 no matter what, it would be
345             an empty enumerable (see C<empty>). Normally you want to stay somewhere in
346             the middle and implement some state check logic in there.
347              
348             =cut
349              
# Returns the user-supplied on_has_next callback; without one the enumerable
# behaves as empty (the fallback always answers 0).
sub on_has_next {
    my ($self) = @_;
    my $callback = $self->{_opts}{on_has_next};
    return defined $callback ? $callback : sub { 0 };
}
351              
352             =head2 on_reset($self) :: CodeRef -> void
353              
354             This is a callback to be called in order to reset the state of the enumerable.
355             This callback should be defined in the same scope as the enumerable itself.
356             The library provides nothing magical but a callback and a handle to call it,
357             so the state cleanup is completely on the developer's side.
358              
359             =cut
360              
# Returns the user-supplied on_reset callback, or a no-op sub when the option
# is missing or undef.
sub on_reset {
    my ($self) = @_;
    my $callback = $self->{_opts}{on_reset};
    return defined $callback ? $callback : sub {};
}
362              
363             =head2 is_finite :: Bool
364              
365             A boolean flag indicating whether an enumerable is finite or not. By default
366             enumerables are treated as infinite, which means some functions will throw
367             an exception, like: C<to_list>, C<reduce>, C<resolve> or C<count>.
368              
369             Do not mark an infinite enumerable as finite and then call one of these
370             finite-size methods: the resolution would turn into an infinite loop.
371              
372             =cut
373              
# The is_finite option as given; 0 (infinite) when it is missing or undef.
sub is_finite {
    my ($self) = @_;
    my $flag = $self->{_opts}{is_finite};
    return defined $flag ? $flag : 0;
}
375              
# The no_wrap option as given; 0 when it is missing or undef. With no_wrap
# set, yield() and next() pass step results through without buffering them.
sub no_wrap {
    my ($self) = @_;
    my $flag = $self->{_opts}{no_wrap};
    return defined $flag ? $flag : 0;
}
377              
378             =head1 INSTANCE METHODS
379              
380             =head2 next()
381              
382             Function C<next()> is the primary interface for accessing elements of an
383             enumerable. It will do some internal checks and if there are no elements to be
384             served from an intermediate buffer, it will resolve the next step by calling
385             the C<on_next> callback.
386             Enumerables are composable: one enumerable might be based on another
387             enumeration. E.g.: a sequence of natural number squares is based on the
388             sequence of natural numbers themselves. In other words, a sequence is defined
389             as a tuple of another sequence and a function which would be lazily applied to
390             every element of this sequence.
391              
392             C<next()> accepts 0 or more arguments, which would be passed to the C<on_next>
393             callback.
394              
395             C<next()> is expected to do the heavy lifting, as opposed to C<has_next()>,
396             which is supposed to be cheap and fast. This statement flips upside down
397             whenever C<grep> is applied to a stream. See C<grep> for more details.
398              
399             =cut
400              
# Returns the next element.
# When there is no buffered step, or the buffered step is drained, on_next is
# invoked with $self plus any extra arguments (aliased, as received) to
# produce a new step. Unless no_wrap is set, that step becomes the buffer and
# the element is served from it. With no_wrap the raw step result is returned.
sub next {
    my $self   = shift;
    my $buffer = $self->{_buff};
    my $step;
    if (!($buffer && $buffer->has_next())) {
        # Scalar assignment: a single step result is expected.
        $step = $self->on_next()->($self, @_);
        $self->{_buff} = $step if !$self->no_wrap();
    }
    my $element = $self->no_wrap() ? $step : $self->{_buff}->next();
    return $element;
}
412              
413             =head2 has_next()
414              
415             C<has_next()> is the primary entry point to get information about the state
416             of an enumerable. If the method returned false, there are no more elements to be
417             consumed. I.e. the sequence has been iterated completely. Normally it means
418             the end of an iteration cycle.
419              
420             Enumerables use internal buffers in order to support batched C<on_next>
421             resolutions. If there are some elements left in the buffer, C<has_next()>
422             won't call the C<on_has_next> callback immediately. If the buffer has been
423             iterated completely, C<on_has_next> would be called.
424              
425             C<has_next()> should be fast at resolving the state of an enumerable as it's going
426             to be used for a condition state check.
427              
428             =cut
429              
# Returns 1 if another element can be fetched, 0 otherwise.
# Elements still sitting in the step buffer are consulted first; the
# on_has_next generator is asked only once the buffer is drained.
# Any exception raised while checking is rethrown via croak, prefixed with
# 'Problem calling on_has_next(): '.
sub has_next {
    my $self = shift;
    my $res;
    eval {
        $res = $self->_has_next_in_buffer()
            || $self->_has_next_in_generator();
        1;
    } or do {
        # $@ can be empty if a destructor reset it; keep a placeholder text.
        my $err = $@ || 'zombie error';
        croak sprintf('Problem calling on_has_next(): %s', $err);
    };
    # Normalize Perl truthiness to 1/0. int() would turn true non-numeric
    # answers (e.g. 'yes', 0.5) into 0 and warn on undef.
    return $res ? 1 : 0;
}
442              
443             =head2 reset()
444              
445             This method is a generic entry point for an enum reset. In fact, it is basically
446             a wrapper around the user-defined C<on_reset> callback.
447              
448             =cut
449              
# Drops the buffered step and invokes the user-defined on_reset callback
# (passing $self) so it can rewind whatever state the generator closes over.
# Exceptions from the callback are rethrown via croak.
# Returns 1 on success.
sub reset {
    my $self = shift;
    $self->{_buff} = undef;
    eval {
        # on_reset() only *returns* the callback; it has to be invoked.
        $self->on_reset()->($self);
        1;
    } or do {
        my $err = $@ || 'zombie error';
        croak sprintf('Problem calling on_reset(): %s', $err);
    };
    return 1;
}
457              
458             =head2 to_list()
459              
460             This function transforms a lazy enumerable to a list. Only finite enumerables
461             can be transformed to a list, so the method checks if an enumerable is created
462             with the C<is_finite> flag. An exception would be thrown otherwise.
463              
464             =cut
465              
# Drains a finite enumerable and returns its remaining elements as an array
# ref. Croaks when the enumerable is not flagged is_finite.
sub to_list {
    my ($self) = @_;
    if (!$self->is_finite()) {
        croak 'Only finite enumerables might be converted to list. Use is_finite=1';
    }
    my @acc;
    while ($self->has_next()) {
        push @acc, $self->next();
    }
    return \@acc;
}
474              
475             =head2 map($callback)
476              
477             Creates a new enumerable by applying a user-defined function to the original
478             enumerable. Works the same way as perl map {} function but it's lazy.
479              
480             =cut
481              
# Returns a new enumerable whose elements are $callback applied to each
# element of $self, computed lazily on demand. Finiteness and no_wrap are
# inherited from the source.
sub map {
    my ($self, $callback) = @_;
    return Data::Enumerable::Lazy->new({
        # Delegate to the source's public has_next(): it consults the source's
        # own buffer first (so batched/nested steps are not cut short) and
        # calls the source's on_has_next with the source as $self.
        on_has_next => sub { $self->has_next() },
        on_next     => sub { shift->yield($callback->($self->next())) },
        is_finite   => $self->is_finite(),
        no_wrap     => $self->no_wrap(),
    });
}
491              
492             =head2 reduce($acc, $callback)
493              
494             Resolves the enumerable and returns the resulting state of the accumulator $acc
495             provided as the 1st argument. C<$callback> should always return the new state of
496             C<$acc>.
497              
498             C<reduce> is defined for finite enumerables only.
499              
500             =cut
501              
# Folds the remaining elements into $acc. $callback receives ($acc, $element)
# and must return the new accumulator. Finite enumerables only.
sub reduce {
    my ($self, $acc, $callback) = @_;
    croak 'Only finite enumerables might be reduced. Use is_finite=1'
        if !$self->is_finite();
    while ($self->has_next()) {
        $acc = $callback->($acc, $self->next());
    }
    return $acc;
}
509              
510             =head2 grep($callback, $max_lookahead)
511              
512             C<grep> is a function which returns a new enumerable by applying a
513             user-defined filter function.
514              
515             C<grep> might be applied to both finite and infinite enumerables. In case of
516             an infinite enumerable there is an additional argument specifying the max number
517             of lookahead steps. If an element satisfying the condition could not be found in
518             C<$max_lookahead> steps, an enumerable is considered to be completely iterated
519             and C<has_next()> will return false.
520              
521             C<grep> returns a new enumerable with quite special properties: C<has_next()>
522             will perform a look ahead and call the original enumerable C<next()> method
523             in order to find an element for which the user-defined function will return
524             true. C<next()>, on the other side, returns the value that was pre-fetched
525             by C<has_next()>.
526              
527             =cut
528              
# Returns a new enumerable containing only the elements of $self for which
# $callback returns true.
# has_next() of the result performs the lookahead: it pulls elements from the
# source until one matches and caches it. next() serves the cached element.
# For infinite sources $max_lookahead (> 0) caps how many non-matching
# elements are examined before the stream is reported as exhausted. Finite
# sources ignore the cap.
sub grep {
    my ($self, $callback, $max_lookahead) = @_;
    $max_lookahead //= 0;
    $max_lookahead = 0 if $self->is_finite;
    my $next;            # element prefetched by on_has_next
    my $prev_has_next;   # memoized lookahead answer for the pending element
    return Data::Enumerable::Lazy->new({
        on_has_next => sub {
            return $prev_has_next if defined $prev_has_next;
            my $rejected = 0;
            undef $next;
            while ($self->has_next()) {
                # Give up after exactly $max_lookahead rejected elements.
                if ($max_lookahead > 0 && $rejected >= $max_lookahead) {
                    carp 'Max lookahead steps cnt reached. Bailing out';
                    return $prev_has_next = 0;
                }
                $next = $self->next();
                last if $callback->($next);
                undef $next;
                $rejected++;
            }
            return $prev_has_next = (defined $next);
        },
        on_next => sub {
            my $new_self = shift;
            # Look ahead unless has_next() already prefetched an element that
            # has not been served yet. Testing the memo (instead of a one-shot
            # "initialized" flag) stops repeated next() calls from re-serving
            # the same element.
            defined $prev_has_next or $new_self->has_next();
            undef $prev_has_next;
            return $new_self->yield($next);
        },
        is_finite => $self->is_finite(),
        no_wrap   => $self->no_wrap(),
    });
}
569              
570             =head2 resolve()
571              
572             Resolves an enumerable completely. Applicable for finite enumerables only.
573             The method returns nothing.
574              
575             =cut
576              
# Drains a finite enumerable completely, discarding the elements.
# Croaks for enumerables not flagged is_finite. Returns nothing.
sub resolve {
    my ($self) = @_;
    $self->is_finite()
        or croak 'Only finite enumerables might be resolved. Use is_finite=1';
    while ($self->has_next()) {
        $self->next();
    }
    # Explicit: a trailing loop would return an unspecified value.
    return;
}
583              
584             =head2 take($N_elements)
585              
586             Resolves first $N_elements and returns the resulting list. If there are
587             fewer than N elements in the enumerable, the entire enumerable would be
588             returned as a list.
589              
590             =cut
591              
# Returns an array ref with up to $slice_size next elements. Fewer are
# returned when the enumerable runs out first. An undef size counts as 0.
sub take {
    my ($self, $slice_size) = @_;
    $slice_size //= 0;
    my $taken = 0;
    my @acc;
    # Test the quota before has_next(): has_next() may prefetch (grep,
    # take_while) or hit an external source, which is wasted work once the
    # slice is already full.
    while ($taken < $slice_size && $self->has_next()) {
        push @acc, $self->next();
        $taken++;
    }
    return \@acc;
}
599              
600             =head2 take_while($callback)
601              
602             This function takes elements until it meets the first one that does not
603             satisfy the conditional callback.
604             The callback takes only 1 argument: an element. It should return true if
605             the element should be taken. Once it returned false, the stream is over.
606              
607             =cut
608              
# Returns a new enumerable that yields elements of $self while $callback
# returns true for them. The first element failing the predicate ends the
# stream for good; that element is consumed from the source.
# has_next() prefetches one source element and caches the verdict; next()
# serves the cached element.
sub take_while {
    my ($self, $callback) = @_;
    my $next_el;         # element prefetched by on_has_next
    my $prev_has_next;   # memoized verdict; stays 0 forever once it fails
    return Data::Enumerable::Lazy->new({
        on_has_next => sub {
            return $prev_has_next if defined $prev_has_next;
            $prev_has_next = 0;
            if ($self->has_next()) {
                $next_el = $self->next();
                $prev_has_next = 1 if $callback->($next_el);
            }
            return $prev_has_next;
        },
        on_next => sub {
            my ($new_self) = @_;
            # Prefetch unless has_next() already did so for a pending element.
            # A one-shot "initialized" flag would make a second next() without
            # has_next() return undef even though elements remain.
            defined $prev_has_next or $new_self->has_next();
            $prev_has_next
                or return $new_self->yield(Data::Enumerable::Lazy->empty());
            undef $prev_has_next;
            return $new_self->yield($next_el);
        },
        is_finite => $self->is_finite(),
    });
}
639              
640             =head2 continue($ext = %{ on_next => sub {}, ... })
641              
642             Creates a new enumerable by extending the existing one. on_next is
643             the only mandatory argument. on_has_next might be overridden if some
644             custom logic comes into play.
645              
646             is_finite is inherited from the parent enumerable by default. All additional
647             attributes would be transparently passed to the constructor.
648              
649             =cut
650              
# Creates a new enumerable whose steps are produced by applying the
# mandatory $ext->{on_next} to each element of $this, as
# on_next->($new_self, $element).
# Defaults:
#   - on_has_next delegates to $this->has_next().
#   - is_finite is inherited from $this.
#   - no_wrap is 0.
# Any other keys in $ext are passed to the constructor unchanged.
# The caller's hash is never modified.
sub continue {
    my ($this, $ext) = @_;
    # Work on a shallow copy so the caller's option hash stays intact and
    # the consumed keys cannot leak back in through %ext.
    my %ext = %{ $ext // {} };
    my $on_next = delete $ext{on_next}
        or croak '`on_next` should be defined on stream continuation';
    ref($on_next) eq 'CODE'
        or croak '`on_next` should be a function';
    my $on_has_next = delete $ext{on_has_next};
    my $is_finite   = delete $ext{is_finite};
    my $no_wrap     = delete $ext{no_wrap};
    return Data::Enumerable::Lazy->new({
        on_next => sub {
            my $self = shift;
            $self->yield(
                $this->has_next()
                    ? $on_next->($self, $this->next())
                    : Data::Enumerable::Lazy->empty
            );
        },
        # Ask the parent through has_next() so its buffered elements count.
        on_has_next => $on_has_next // sub { $this->has_next() },
        is_finite   => $is_finite   // $this->is_finite(),
        no_wrap     => $no_wrap     // 0,
        %ext,
    });
}
673              
674             =head2 count()
675              
676             Counts the number of the elements in the stream. This method iterates through
677             the stream, so the stream is exhausted by the end of the computation.
678              
679             =cut
680              
# Counts the remaining elements by draining the enumerable. Finite
# enumerables only; the enumerable is exhausted afterwards.
sub count {
    my ($self) = @_;
    $self->is_finite()
        or croak 'Only finite enumerables might be counted. Use is_finite=1';
    my $cnt = 0;
    while ($self->has_next()) {
        $cnt++;
        $self->next();
    }
    return $cnt;
}
691              
692             =head2 yield($result)
693              
694             This method is supposed to be called from the C<on_next> callback only. This is
695             the only valid result for an Enumerable to return the next step result.
696             Effectively, it ensures the returned result conforms to the required interface
697             and is wrapped in a lazy wrapper if needed.
698              
699             =cut
700              
701             sub yield {
702 3351     3351 1 155268 my $self = shift;
703 3351         5680 my $val = shift;
704 3351   66     14270 my $val_is_stream = $val && ref($val) eq 'Data::Enumerable::Lazy' &&
705             $val->isa('Data::Enumerable::Lazy');
706 3351 100 100     7765 if ($self->no_wrap() || $val_is_stream) {
707 1917         5414 return $val;
708             } else {
709 1434         4201 return Data::Enumerable::Lazy->singular($val);
710             }
711             }
712              
# Private methods

# True when a buffered sub-enumerable is present and still has elements.
sub _has_next_in_buffer {
    my ($self) = @_;
    my $buffer = $self->{_buff};
    return defined($buffer) && $buffer->has_next();
}
719              
# Asks the user-supplied on_has_next callback whether more elements can be
# generated; the enumerable itself and any extra arguments are forwarded.
sub _has_next_in_generator {
    my $self = shift;
    my $predicate = $self->on_has_next();
    return $predicate->($self, @_);
}
724              
=head1 CLASS METHODS

=head2 empty()

Returns an empty enumerable. Effectively it is an equivalent of an empty
array: C<has_next> returns false and C<next> returns undef. Useful
whenever an C<on_next> step wants to return an empty result set.

=cut

sub empty {
    # Finite and unwrapped; no generator callbacks are supplied at all.
    my %attrs = (no_wrap => 1, is_finite => 1);
    return Data::Enumerable::Lazy->new(\%attrs);
}
741              
=head2 singular($val)

Returns an enumerable with a single element $val. Actively used as an internal
data container (e.g. by C<yield>).

=cut

sub singular {
    my ($class, $val) = @_;
    my $consumed = 0;
    my $has_next = sub { return !$consumed };
    my $next = sub {
        my ($self) = @_;
        # Flip the flag first so the element is produced exactly once.
        $consumed = 1;
        return $self->yield($val);
    };
    return Data::Enumerable::Lazy->new({
        on_has_next => $has_next,
        on_next     => $next,
        is_finite   => 1,
        no_wrap     => 1,
    });
}
759              
760             =head2 from_list(@list)
761              
762             Returns a new enumerable instantiated from a list. The easiest way to
763             initialize an enumerable. In fact, all elements are already resolved
764             so this method sets C by default.
765              
766             =cut
767              
768             sub from_list {
769 32     32 1 10814 my $class = shift;
770 32         117 my @list = @_;
771 32         74 my $ix = 0;
772             Data::Enumerable::Lazy->new({
773 550     550   2153 on_has_next => sub { $ix < scalar(@list) },
774 249     249   669 on_next => sub { shift->yield($list[$ix++]) },
775 32         313 is_finite => 1,
776             no_wrap => 1,
777             });
778             }
779              
780             =head2 cycle()
781              
782             Creates an infinitive enumerable by cycling the original list. E.g. if the
783             original list is [1, 2, 3], C will generate an infinitive sequences
784             like: 1, 2, 3, 1, 2, 3, 1, ...
785              
786             =cut
787              
788             sub cycle {
789 6     6 1 139 my $class = shift;
790 6         23 my @list = @_;
791 6         14 my $ix = 0;
792 6         16 my $max_ix = scalar(@list) - 1;
793             Data::Enumerable::Lazy->new({
794 62     62   223 on_has_next => sub { 1 },
795             on_next => sub {
796 49 100   49   123 $ix = $ix > $max_ix ? 0 : $ix;
797 49         138 shift->yield($list[$ix++])
798             },
799 6         65 is_finite => 0,
800             no_wrap => 1,
801             });
802             }
803              
804             =head2 infinity()
805              
806             Returns a new infinite enumerable. C always returns true whereas
807             C returns undef all the time. Useful as an extension basis for infinite
808             sequences.
809              
810             =cut
811              
812             sub infinity {
813 2     2 1 1424 my $class = shift;
814             Data::Enumerable::Lazy->new({
815 11     11   29 on_has_next => sub { 1 },
816       10     on_next => sub {},
817 2         20 is_finite => 0,
818             no_wrap => 1,
819             });
820             }
821              
822             =head2 merge($tream1 [, $tream2 [, $tream3 [, ...]]])
823              
824             This function merges one or more streams together by fan-outing C
825             method call among the non-empty streams.
826             Returns a new enumerable instance, which:
827             * Has next elements as far as at least one of the streams does.
828             * Returns next element py picking it one-by-one from the streams.
829             * Is finite if and only if all the streams are finite.
830             If one of the streams is over, it would be taken into account and
831             C will continue choosing from non-empty ones.
832              
833             =cut
834              
835             sub merge {
836 4     4 1 20 my $class = shift;
837 4         15 my @streams = @_;
838 4 50       17 scalar @streams == 0
839             and croak '`merge` function takes at least 1 stream';
840 4 50       17 scalar @streams == 1
841             and return shift;
842             my $ixs = Data::Enumerable::Lazy->cycle(0..scalar(@streams) - 1)
843 38     38   226 -> take_while(sub { List::Util::any { $_->has_next() } @streams })
  91         216  
844 4     35   26 -> grep(sub { $streams[ shift ]->has_next() });
  35         89  
845             Data::Enumerable::Lazy->new({
846 24     24   51 on_has_next => sub { $ixs->has_next() },
847             on_next => sub {
848 20     20   47 shift->yield($streams[ $ixs->next() ]->next());
849             },
850 4 100   17   57 is_finite => (List::Util::reduce { $a || $b->is_finite() } 0, @streams),
  17         56  
851             });
852             }
853              
=head2 chain($tream1(, $tream2(, $tream3(, ...))))

Executes streams sequentially, one after another: the next stream starts once
the previous one is over. Called with a single stream, returns that stream
unchanged; called with no streams, returns an empty enumerable.

=cut

sub chain {
    my ($class, @streams) = @_;
    # Chaining nothing yields nothing; returning undef here would make the
    # caller's next method call crash.
    return Data::Enumerable::Lazy->empty if !@streams;
    return $streams[0] if scalar(@streams) == 1;
    # Each element of the outer list is itself a stream; yield() returns
    # streams unwrapped, so each one is presumably consumed in turn as the
    # continuation's next step.
    # NOTE(review): the defined-filter also drops legitimate undef elements
    # produced by the chained streams.
    return Data::Enumerable::Lazy->from_list(@streams)
        ->continue({
            on_next => sub { $_[0]->yield($_[1]) },
        })
        ->grep(sub { defined $_[0] });
}
872              
=head2 from_text_file($file_handle(, $options))

Takes an open file handle and an optional hash of options and creates a
stream of it. The file is read as a text file, line by line. For
additional details see the C<readline> perl core function reference.
Options is a basic hash; supported attributes are:

=over 4

=item * chomp :: Bool | Whether the lines should be chomped, 0 by default.

=item * is_finite :: Bool | Forces the stream to be processed as finite, 0 by default.

=back

=cut

sub from_text_file {
    my ($class, $file_handle, $options) = @_;
    my $opts  = $options // +{};
    my $lines = Data::Enumerable::Lazy->new({
        on_has_next => sub { not eof($file_handle) },
        on_next     => sub {
            my ($self) = @_;
            return $self->yield(scalar readline($file_handle));
        },
        is_finite   => $opts->{is_finite} // 0,
    });
    return $lines unless $opts->{chomp};
    # Strip trailing newlines on a copy so the raw line is left untouched.
    return $lines->map(sub { my ($line) = @_; chomp $line; return $line });
}
900              
=head2 from_bin_file($file_handle(, $options))

Method similar to C<from_text_file> but forces binary reading from file.
Takes a file handle created by the C<open> function and an optional hash of
options. Supported attributes are:

=over 4

=item * block_size :: Integer | The size of read block, 1024 bytes by default.

=item * is_finite :: Bool | Forces the stream to be processed as finite, 0 by default.

=back

Croaks if block_size is not a positive integer, or if reading from the
handle fails.

=cut

sub from_bin_file {
    my ($class, $file_handle, $options) = @_;
    $options //= +{};
    my $block_size = $options->{block_size} // 1024;
    # A zero-length read never advances the handle to EOF, so the stream
    # would never terminate.
    $block_size =~ /\A[0-9]+\z/ && $block_size > 0
        or croak '`block_size` should be a positive integer';
    return Data::Enumerable::Lazy->new({
        on_has_next => sub { !eof($file_handle) },
        on_next     => sub {
            my $buf;
            # read() returns undef on I/O failure; surface it instead of
            # silently yielding an undefined block.
            defined(read($file_handle, $buf, $block_size))
                or croak "Failed to read from file handle: $!";
            $_[0]->yield($buf);
        },
        is_finite   => $options->{is_finite} // 0,
    });
}
925              
926             =head1 AUTHOR
927              
928             Oleg S
929              
930             =cut
931              
932             =head1 SEE ALSO
933              
934             =head2 Lazy evaluation in a nutshell
935              
936             L
937              
938             =head2 Library GitHub page:
939              
940             L
941              
942             =head2 Alternative implementations:
943              
944             L
945             L
946             L
947              
948             =cut
949              
950             =head1 COPYRIGHT AND LICENSE
951              
952             Copyright 2017 Oleg S
953              
954             Copying and distribution of this file, with or without modification, are
955             permitted in any medium without royalty provided the copyright notice and this
956             notice are preserved. This file is offered as-is, without any warranty.
957              
958             =cut
959              
960             1;
961              
962             __END__