File Coverage

blib/lib/Data/Pipeline/Aggregator/Union.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             package Data::Pipeline::Aggregator::Union;
2              
3 1     1   1967 use Moose;
  0            
  0            
4             extends 'Data::Pipeline::Aggregator';
5              
6             use Data::Pipeline::Types qw( Iterator );
7              
8             has actions => (
9             isa => 'ArrayRef',
10             is => 'rw',
11             predicate => 'has_actions',
12             default => sub { [ ] },
13             lazy => 1
14             );
15              
16             sub transform {
17             my($self, @iterators) = @_;
18              
19             $_ = to_Iterator($_) for @iterators;
20              
21             my $n = scalar(@{$self -> actions});
22              
23             $n = scalar(@iterators) if scalar(@iterators) < $n;
24              
25             while( $n > 0 ) {
26             $n--;
27             $iterators[$n] = $self -> actions -> [$n] -> transform( $iterators[$n] );
28             }
29              
30             # we want to round-robin between them
31             return Data::Pipeline::Iterator -> new(
32             source => Data::Pipeline::Source::Iterator -> new(
33             has_next => sub {
34             @iterators = grep { !$_ -> finished } @iterators;
35             return 1 if @iterators;
36             return 0;
37             },
38             get_next => sub {
39             my $i;
40             $i = shift @iterators while $i && $i -> finished;
41             return unless defined $i;
42             push @iterators, $i;
43             return $i -> next;
44             }
45             )
46             );
47             }
48              
49             1;
50              
51             __END__