File Coverage

blib/lib/Data/Pipeline/Iterator/Source.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             package Data::Pipeline::Iterator::Source;
2              
3 2     2   25263 use Moose;
  0            
  0            
4             use Carp ();
5              
6             has has_next => (
7             isa => 'CodeRef',
8             is => 'rw',
9             required => 1
10             );
11              
12             has get_next => (
13             isa => 'CodeRef',
14             is => 'rw',
15             required => 1
16             );
17              
18             has _iterators => (
19             isa => 'HashRef',
20             is => 'rw',
21             default => sub{ +{ } },
22             lazy => 1
23             );
24              
25             has _data => (
26             isa => 'ArrayRef',
27             is => 'rw',
28             default => sub{ [ ] },
29             lazy => 1
30             );
31              
32             has _can_register => (
33             isa => 'Bool',
34             is => 'rw',
35             default => 1
36             );
37              
38             sub register {
39             my($self, $observer) = @_;
40              
41             if($self -> _can_register) {
42             $self -> _iterators -> {"".$observer} = undef;
43             }
44             else {
45             Carp::croak "Unable to register an iterator for a data source that has forgotten its beginning";
46             }
47             }
48              
49             sub unregister {
50             my($self, $observer) = @_;
51              
52             delete $self -> _iterators -> {"".$observer};
53             }
54              
55             sub _push_data {
56             my($self) = @_;
57              
58             if($self -> has_next -> ()) {
59             push @{$self -> _data}, $self -> get_next -> ();
60             }
61             }
62              
63             sub _prime { $_[0] -> _push_data; }
64              
65             sub _pop_data {
66             my($self) = @_;
67              
68             my $min = scalar(@{$self -> _data});
69             for my $ob (keys %{$self -> _iterators}) {
70             return unless defined $self -> _iterators -> {$ob};
71             $min = $self -> _iterators -> {$ob} if $min > $self -> _iterators -> {$ob};
72             }
73              
74             return if $min == 0;
75              
76             for my $ob (keys %{$self -> _iterators}) {
77             $self -> _iterators -> {$ob} -= $min;
78             }
79              
80             splice @{$self -> _data}, 0, $min;
81              
82             $self -> _can_register(0);
83             }
84              
85             sub finished {
86             my($self, $ob) = @_;
87              
88             return 1 unless exists $self -> _iterators -> {"".$ob};
89              
90             $self -> _iterators->{"".$ob} ||= 0;
91              
92             # if $ob is at the end of the _data and data is finished
93             if($self -> _iterators->{"".$ob} == scalar(@{$self -> _data})
94             && !$self -> has_next -> ())
95             {
96             delete $self -> _iterators->{"".$ob};
97             return 1;
98             }
99             }
100              
101             sub next {
102             my($self, $ob) = @_;
103              
104             return if $self -> finished($ob);
105              
106             if($self -> _iterators->{"".$ob} == scalar(@{$self -> _data})) {
107             $self -> _push_data;
108             }
109              
110             $self -> _pop_data if rand() > 0.7; # tunable
111              
112             return $self -> _data -> [ $self -> _iterators->{"".$ob} ++ ];
113             }
114              
115             # duplicate when you want to run through an iterator again but don't want
116             # to keep it in memory (e.g., reading from a file again)
117             sub duplicate {
118             my($self) = @_;
119              
120             # we want to build a new iterator source that should start over
121             # we avoid private attributes and {get|has}_next
122             # if those are required, we can't duplicate
123              
124             my $attrs;
125              
126             $attrs = $self -> can_duplicate or return ;
127             # Carp::croak "Unable to duplicate a source iterator (".($self -> meta -> name).")";
128              
129             my %options;
130             $options{$_} = $self -> $_
131             foreach grep { !/^_/ && !/^(get|has)_next$/ } (keys %$attrs);
132              
133             return $self -> new(%options);
134             }
135              
136             sub can_duplicate {
137             my($self) = @_;
138              
139             my $attrs = $self -> meta -> get_attribute_map;
140              
141             # we return $attrs as an optimization
142             return $attrs unless
143             $attrs -> {'get_next'} -> is_required
144             || $attrs -> {'has_next'} -> is_required;
145             return;
146             }
147              
148             1;
149              
150             __END__
151              
152             we need a way to have observers of an iterator -- basically, anyone using the
153             iterator is an observer. This allows multiple pipelines to share a common
154             pipeline (one splits into multiple) while only running the common pipeline once