File Coverage

blib/lib/Data/Tubes/Plugin/Plumbing.pm
Criterion Covered Total %
statement 161 180 89.4
branch 75 102 73.5
condition 18 30 60.0
subroutine 22 27 81.4
pod 7 7 100.0
total 283 346 81.7


line stmt bran cond sub pod time code
1             package Data::Tubes::Plugin::Plumbing;
2              
3             # vim: ts=3 sts=3 sw=3 et ai :
4              
5 12     12   90 use strict;
  12         72  
  12         429  
6 12     12   103 use warnings;
  12         68  
  12         413  
7 12     12   64 use English qw< -no_match_vars >;
  12         23  
  12         101  
8 12     12   6029 use Data::Dumper;
  12         32  
  12         690  
9 12     12   86 use Scalar::Util qw< blessed >;
  12         22  
  12         869  
10             our $VERSION = '0.737';
11              
12             use Log::Log4perl::Tiny
13 12     12   87 qw< :easy :dead_if_first get_logger LOGLEVEL LEVELID_FOR >;
  12         45  
  12         116  
14 12         938 use Data::Tubes::Util qw<
15             args_array_with_options
16             load_module
17             load_sub
18             pump
19             normalize_args
20             traverse
21 12     12   5059 >;
  12         25  
22 12     12   4988 use Data::Tubes::Plugin::Util qw< identify log_helper tubify >;
  12         32  
  12         25834  
23              
24             sub alternatives {
25 2     2 1 1018 my ($tubes, $args) =
26             args_array_with_options(@_, {name => 'alternatives'});
27 2         9 identify($args);
28 2         6 my $name = $args->{name};
29              
30 2         7 my @tubes = tubify($args, @$tubes);
31              
32             return sub {
33 2     2   5 my $record = shift;
34 2         5 for my $tube (@tubes) {
35 5 100       20 if (my @retval = $tube->($record)) {
36 2         14 return @retval;
37             }
38             }
39 0         0 return;
40 2         14 };
41             } ## end sub alternatives
42              
43             sub _get_selector {
44 14     14   28 my $args = shift;
45 14         56 my $selector = $args->{selector};
46 14 100 66     85 if (!defined($selector) && defined($args->{key})) {
47 12         30 my $key = $args->{key};
48 12         28 my $ref = ref $key;
49             $selector =
50             ($ref eq 'CODE')
51             ? $key
52 12 100   38   67 : sub { return traverse($_[0], $ref ? @$key : $key); };
  38 50       188  
53             } ## end if (!defined($selector...))
54             LOGDIE "$args->{name}: required dispatch key or selector"
55 14 0 33     44 if (! defined $selector) && (! $args->{missing_ok});
56 14         27 return $selector;
57             } ## end sub _get_selector
58              
59             sub cache {
60 9     9 1 8146 my %args = normalize_args(@_, [{name => 'cache'}, 'tube']);
61 9         62 identify(\%args);
62 9         21 my $name = $args{name};
63              
64             # the cached tube
65 9         35 my ($tube) = tubify(\%args, $args{tube});
66 9 100       31 LOGCROAK "$name: no tube to cache" unless defined $tube;
67              
68             # the cache! We will use something compatible with CHI
69 8   50     24 my $cache = $args{cache} // {};
70 8 100       24 $cache = ['^Data::Tubes::Util::Cache', repository => $cache]
71             if ref($cache) eq 'HASH';
72 8 100       35 if (!blessed($cache)) {
73 7 50       27 my ($x, @args) = ref($cache) ? @$cache : $cache;
74 7 50       33 $cache = ref($x) ? $x->(@args) : load_module($x)->new(@args);
75             }
76 8 50       120 my @get_options = $args{get_options} ? @{$args{get_options}} : ();
  0         0  
77 8 50       21 my @set_options = $args{set_options} ? @{$args{set_options}} : ();
  0         0  
78              
79             # what allows me to look in the cache?
80 8         48 my $selector = _get_selector({%args, missing_ok => 1});
81             LOGCROAK "missing key or selector, but output is set"
82 8 50 33     31 if (! defined $selector) && defined($args{output});
83              
84             # cleaning trigger, if any
85 8         16 my $cleaner = $args{cleaner};
86 8 100 66     35 $cleaner = $cache->can($cleaner) if defined($cleaner) && !ref($cleaner);
87              
88             # cloning facility, if needed
89 8         16 my $merger = $args{merger};
90 8 50 66     24 $merger = load_sub($merger) if defined($merger) && !ref($merger);
91              
92 8         15 my $output = $args{output};
93             return sub {
94 18     18   24094 my $record = shift;
95 18 50       67 my $key = $selector ? $selector->($record) : $record;
96 18         73 my $data = $cache->get($key, @get_options);
97 18 100       165 if (!$data) { # MUST be an array reference at this point
98 13         34 my @oc = $tube->($record);
99 13 100       138 if (scalar(@oc) == 2) {
    100          
100 2 100       14 my $rcs = ($oc[0] eq 'records') ? $oc[1] : pump($oc[1]);
101 2 50       7 $rcs = [map { $_->{$output} } @$rcs] if defined($output);
  6         14  
102 2         5 $data = [records => $rcs];
103             }
104             elsif (scalar @oc) {
105 10 50       31 $data = defined($output) ? [$oc[0]{$output}] : \@oc;
106             }
107             else {
108 1         5 $data = \@oc;
109             }
110              
111 13         47 $cache->set($key, $data, @set_options);
112 13 100       38 $cleaner->($cache) if $cleaner;
113             } ## end if (!$data)
114              
115 18 100       50 return unless scalar @$data;
116              
117 17 100       43 if (scalar(@$data) == 1) { # single record
118 15 100       33 return $merger->($record, $output, $data->[0]) if $merger;
119 13 50       30 return $data->[0] unless $output;
120 13         29 $record->{$output} = $data->[0];
121 13         41 return $record;
122             } ## end if (scalar(@$data) == ...)
123              
124             # array of records here
125 2         4 my $aref = $data->[1];
126             my $records =
127             $merger
128 0         0 ? [map { $merger->($record, $output, $_) } @$aref]
129             : $output ? [
130             map {
131 2 50       7 { %$record, $output => $_ }
  6 50       15  
132             } @$aref
133             ]
134             : $aref;
135 2         7 return (records => $records);
136 8         56 };
137             } ## end sub cache
138              
139             sub dispatch {
140 6     6 1 621 my %args = normalize_args(@_,
141             {default => undef, name => 'dispatch', loglevel => $INFO});
142 6         34 identify(\%args);
143 6         14 my $name = $args{name};
144              
145 6         30 my $selector = _get_selector(\%args);
146              
147 6         15 my $factory = $args{factory};
148 6 50       18 if (!defined($factory)) {
149             $factory = sub {
150 0     0   0 my ($key, $record) = @_;
151 0         0 die {
152             message => "$name: unhandled selection key '$key'",
153             record => $record,
154             };
155 0         0 };
156             } ## end if (!defined($factory))
157              
158 6 100       8 my %predefined_for = %{$args{handlers} || {}};
  6         39  
159 6         16 my %handler_for;
160 6         14 my $default = $args{default};
161             return sub {
162 24     24   240 my $record = shift;
163              
164             # get a key into the cache
165 24   33     61 my $key = $selector->($record) // $default;
166 24 50       63 die {
167             message => "$name: selector key is undefined",
168             record => $record,
169             }
170             unless defined $key;
171              
172             $handler_for{$key} //= exists $predefined_for{$key}
173 24 100 66     110 ? (tubify($predefined_for{$key}))[0]
174             : (tubify(\%args, $factory->($key, $record)))[0];
175              
176 24         77 return $handler_for{$key}->($record);
177 6         47 };
178             } ## end sub dispatch
179              
180             sub fallback {
181              
182             # we lose syntax sugar but allow for Try::Tiny to remain optional
183 7 50   7 1 12312 eval { require Try::Tiny; }
  7         1090  
184             or LOGCONFESS 'Data::Tubes::Plugin::Plumbing::fallback '
185             . 'needs Try::Tiny, please install';
186              
187 7         4212 my ($tubes, $args) = args_array_with_options(@_, {name => 'fallback'});
188 7         31 identify($args);
189 7         17 my $name = $args->{name};
190              
191 7         25 my @tubes = tubify($args, @$tubes);
192 7         14 my $catch = $args->{catch};
193             return sub {
194 9     9   17 my $record = shift;
195 9         18 for my $tube (@tubes) {
196 23         36 my (@retval, $do_fallback);
197             Try::Tiny::try(
198             sub {
199 23         2302 @retval = $tube->($record);
200             },
201             Try::Tiny::catch(
202             sub {
203 16 100       285 $catch->($_, $record) if $catch;
204 15         87 $do_fallback = 1;
205             }
206             )
207 23         119 );
208 22 100       227 return @retval unless $do_fallback;
209             } ## end for my $tube (@tubes)
210 1         4 return;
211 7         50 };
212             } ## end sub fallback
213              
214             sub logger {
215 1     1 1 9 my %args = normalize_args(@_, {name => 'log pipe', loglevel => $INFO});
216 1         8 identify(\%args);
217 1         7 my $loglevel = LEVELID_FOR($args{loglevel});
218 1         8 my $mangler = $args{target};
219 1 50       8 if (!defined $mangler) {
    50          
220 0     0   0 $mangler = sub { return shift; }
221 0         0 }
222             elsif (ref($mangler) ne 'CODE') {
223 0 0       0 my @keys = ref($mangler) ? @$mangler : ($mangler);
224             $mangler = sub {
225 0     0   0 my $record = shift;
226 0         0 return traverse($record, @keys);
227 0         0 };
228             } ## end elsif (ref($mangler) ne 'CODE')
229 1         4 my $logger = get_logger();
230             return sub {
231 3     3   5 my $record = shift;
232 3         11 $logger->log($loglevel, $mangler->($record));
233 3         19 return $record;
234 1         11 };
235             } ## end sub logger
236              
237             sub pipeline {
238 0     0 1 0 my ($tubes, $args) = args_array_with_options(@_, {name => 'pipeline'});
239 0         0 return sequence(%$args, tubes => $tubes);
240             }
241              
242             sub sequence {
243 15     15 1 1447 my %args =
244             normalize_args(@_, [{name => 'sequence', tubes => []}, 'tubes']);
245 15         98 identify(\%args);
246              
247             # cope with an empty list of tubes - equivalent to an "id" function but
248             # always returning an iterator for consistency
249 15   50     73 my $tubes = $args{tubes} || [];
250             return sub {
251 0     0   0 my @record = shift;
252             return (
253             iterator => sub {
254 0 0       0 return unless @record;
255 0         0 return shift @record;
256             }
257 0         0 );
258             }
259 15 50       54 unless @$tubes;
260              
261             # auto-generate tubes if you get definitions
262 15         81 my @tubes = tubify(\%args, @$tubes);
263              
264 15   100     82 my $gate = $args{gate} // undef;
265              
266 15         59 my $logger = log_helper(\%args);
267 15         40 my $name = $args{name};
268             return sub {
269 16     16   1037 my $record = shift;
270 16 50       43 $logger->($record, \%args) if $logger;
271              
272 16         55 my @stack = ({record => $record});
273             my $iterator = sub {
274             STEP:
275 60         50215 while (@stack) {
276 184         319 my $pos = $#stack;
277              
278 184         290 my $f = $stack[$pos];
279             my @record =
280             exists($f->{record}) ? delete $f->{record}
281             : exists($f->{iterator}) ? $f->{iterator}->()
282 184 100       522 : @{$f->{records} || []} ? shift @{$f->{records}}
  85 100       349  
  18 100       36  
    100          
283             : ();
284 184 100       420 if (!@record) { # no more at this level...
285 78         118 my $n = @stack;
286 78         323 TRACE "$name: level $n backtracking, no more records";
287 78         941 pop @stack;
288 78         282 next STEP;
289             } ## end if (!@record)
290              
291 106         188 my $record = $record[0];
292 106 100       302 return $record if @stack > @tubes; # output cache
293              
294             # cut the sequence early if the gate function says so
295 69 100 100     183 return $record if $gate && ! $gate->($record);
296              
297             # something must be done...
298 62 50       227 my @outcome = $tubes[$pos]->($record)
299             or next STEP;
300              
301 62 100       378 unshift @outcome, 'record' if @outcome == 1;
302 62         216 push @stack, {@outcome}; # and go to next level
303             } ## end STEP: while (@stack)
304              
305 16         44 return; # end of output, empty list
306 16         84 };
307 16         67 return (iterator => $iterator);
308 15         119 };
309             } ## end sub sequence
310              
311             1;