File Coverage

blib/lib/Data/Tubes/Plugin/Plumbing.pm
Criterion Covered Total %
statement 161 180 89.4
branch 75 102 73.5
condition 18 30 60.0
subroutine 22 27 81.4
pod 7 7 100.0
total 283 346 81.7


line stmt bran cond sub pod time code
1             package Data::Tubes::Plugin::Plumbing;
2              
3             # vim: ts=3 sts=3 sw=3 et ai :
4              
5 12     12   90 use strict;
  12         25  
  12         426  
6 12     12   68 use warnings;
  12         24  
  12         388  
7 12     12   93 use English qw< -no_match_vars >;
  12         24  
  12         144  
8 12     12   5131 use Data::Dumper;
  12         27  
  12         764  
9 12     12   133 use Scalar::Util qw< blessed >;
  12         27  
  12         879  
10             our $VERSION = '0.740';
11              
12             use Log::Log4perl::Tiny
13 12     12   88 qw< :easy :dead_if_first get_logger LOGLEVEL LEVELID_FOR >;
  12         34  
  12         118  
14 12         948 use Data::Tubes::Util qw<
15             args_array_with_options
16             load_module
17             load_sub
18             pump
19             normalize_args
20             traverse
21 12     12   5175 >;
  12         27  
22 12     12   5125 use Data::Tubes::Plugin::Util qw< identify log_helper tubify >;
  12         36  
  12         26912  
23              
24             sub alternatives {
25 2     2 1 604 my ($tubes, $args) =
26             args_array_with_options(@_, {name => 'alternatives'});
27 2         10 identify($args);
28 2         7 my $name = $args->{name};
29              
30 2         9 my @tubes = tubify($args, @$tubes);
31              
32             return sub {
33 2     2   6 my $record = shift;
34 2         7 for my $tube (@tubes) {
35 5 100       26 if (my @retval = $tube->($record)) {
36 2         22 return @retval;
37             }
38             }
39 0         0 return;
40 2         30 };
41             } ## end sub alternatives
42              
43             sub _get_selector {
44 14     14   22 my $args = shift;
45 14         25 my $selector = $args->{selector};
46 14 100 66     69 if (!defined($selector) && defined($args->{key})) {
47 12         21 my $key = $args->{key};
48 12         24 my $ref = ref $key;
49             $selector =
50             ($ref eq 'CODE')
51             ? $key
52 12 100   38   55 : sub { return traverse($_[0], $ref ? @$key : $key); };
  38 50       144  
53             } ## end if (!defined($selector...))
54             LOGDIE "$args->{name}: required dispatch key or selector"
55 14 0 33     40 if (! defined $selector) && (! $args->{missing_ok});
56 14         29 return $selector;
57             } ## end sub _get_selector
58              
59             sub cache {
60 9     9 1 9909 my %args = normalize_args(@_, [{name => 'cache'}, 'tube']);
61 9         49 identify(\%args);
62 9         23 my $name = $args{name};
63              
64             # the cached tube
65 9         28 my ($tube) = tubify(\%args, $args{tube});
66 9 100       31 LOGCROAK "$name: no tube to cache" unless defined $tube;
67              
68             # the cache! We will use something compatible with CHI
69 8   50     21 my $cache = $args{cache} // {};
70 8 100       26 $cache = ['^Data::Tubes::Util::Cache', repository => $cache]
71             if ref($cache) eq 'HASH';
72 8 100       37 if (!blessed($cache)) {
73 7 50       24 my ($x, @args) = ref($cache) ? @$cache : $cache;
74 7 50       35 $cache = ref($x) ? $x->(@args) : load_module($x)->new(@args);
75             }
76 8 50       122 my @get_options = $args{get_options} ? @{$args{get_options}} : ();
  0         0  
77 8 50       18 my @set_options = $args{set_options} ? @{$args{set_options}} : ();
  0         0  
78              
79             # what allows me to look in the cache?
80 8         44 my $selector = _get_selector({%args, missing_ok => 1});
81             LOGCROAK "missing key or selector, but output is set"
82 8 50 33     28 if (! defined $selector) && defined($args{output});
83              
84             # cleaning trigger, if any
85 8         17 my $cleaner = $args{cleaner};
86 8 100 66     33 $cleaner = $cache->can($cleaner) if defined($cleaner) && !ref($cleaner);
87              
88             # cloning facility, if needed
89 8         12 my $merger = $args{merger};
90 8 50 66     23 $merger = load_sub($merger) if defined($merger) && !ref($merger);
91              
92 8         13 my $output = $args{output};
93             return sub {
94 18     18   28380 my $record = shift;
95 18 50       62 my $key = $selector ? $selector->($record) : $record;
96 18         67 my $data = $cache->get($key, @get_options);
97 18 100       147 if (!$data) { # MUST be an array reference at this point
98 13         32 my @oc = $tube->($record);
99 13 100       123 if (scalar(@oc) == 2) {
    100          
100 2 100       15 my $rcs = ($oc[0] eq 'records') ? $oc[1] : pump($oc[1]);
101 2 50       10 $rcs = [map { $_->{$output} } @$rcs] if defined($output);
  6         15  
102 2         5 $data = [records => $rcs];
103             }
104             elsif (scalar @oc) {
105 10 50       31 $data = defined($output) ? [$oc[0]{$output}] : \@oc;
106             }
107             else {
108 1         3 $data = \@oc;
109             }
110              
111 13         42 $cache->set($key, $data, @set_options);
112 13 100       41 $cleaner->($cache) if $cleaner;
113             } ## end if (!$data)
114              
115 18 100       43 return unless scalar @$data;
116              
117 17 100       37 if (scalar(@$data) == 1) { # single record
118 15 100       36 return $merger->($record, $output, $data->[0]) if $merger;
119 13 50       26 return $data->[0] unless $output;
120 13         24 $record->{$output} = $data->[0];
121 13         46 return $record;
122             } ## end if (scalar(@$data) == ...)
123              
124             # array of records here
125 2         3 my $aref = $data->[1];
126             my $records =
127             $merger
128 0         0 ? [map { $merger->($record, $output, $_) } @$aref]
129             : $output ? [
130             map {
131 2 50       7 { %$record, $output => $_ }
  6 50       15  
132             } @$aref
133             ]
134             : $aref;
135 2         14 return (records => $records);
136 8         58 };
137             } ## end sub cache
138              
139             sub dispatch {
140 6     6 1 1112 my %args = normalize_args(@_,
141             {default => undef, name => 'dispatch', loglevel => $INFO});
142 6         28 identify(\%args);
143 6         16 my $name = $args{name};
144              
145 6         17 my $selector = _get_selector(\%args);
146              
147 6         10 my $factory = $args{factory};
148 6 50       15 if (!defined($factory)) {
149             $factory = sub {
150 0     0   0 my ($key, $record) = @_;
151 0         0 die {
152             message => "$name: unhandled selection key '$key'",
153             record => $record,
154             };
155 0         0 };
156             } ## end if (!defined($factory))
157              
158 6 100       9 my %predefined_for = %{$args{handlers} || {}};
  6         27  
159 6         13 my %handler_for;
160 6         9 my $default = $args{default};
161             return sub {
162 24     24   214 my $record = shift;
163              
164             # get a key into the cache
165 24   33     40 my $key = $selector->($record) // $default;
166 24 50       45 die {
167             message => "$name: selector key is undefined",
168             record => $record,
169             }
170             unless defined $key;
171              
172             $handler_for{$key} //= exists $predefined_for{$key}
173 24 100 66     99 ? (tubify($predefined_for{$key}))[0]
174             : (tubify(\%args, $factory->($key, $record)))[0];
175              
176 24         57 return $handler_for{$key}->($record);
177 6         32 };
178             } ## end sub dispatch
179              
180             sub fallback {
181              
182             # we lose syntax sugar but allow for Try::Catch to remain optional
183 7 50   7 1 15049 eval { require Try::Catch; }
  7         1198  
184             or LOGCONFESS 'Data::Tubes::Plugin::Plumbing::fallback '
185             . 'needs Try::Catch, please install';
186              
187 7         1752 my ($tubes, $args) = args_array_with_options(@_, {name => 'fallback'});
188 7         35 identify($args);
189 7         20 my $name = $args->{name};
190              
191 7         26 my @tubes = tubify($args, @$tubes);
192 7         15 my $catch = $args->{catch};
193             return sub {
194 9     9   18 my $record = shift;
195 9         21 for my $tube (@tubes) {
196 23         39 my (@retval, $do_fallback);
197             Try::Catch::try(
198             sub {
199 23         478 @retval = $tube->($record);
200             },
201             Try::Catch::catch(
202             sub {
203 16 100       247 $catch->($_, $record) if $catch;
204 15         60 $do_fallback = 1;
205             }
206             )
207 23         111 );
208 22 100       309 return @retval unless $do_fallback;
209             } ## end for my $tube (@tubes)
210 1         4 return;
211 7         53 };
212             } ## end sub fallback
213              
214             sub logger {
215 1     1 1 10 my %args = normalize_args(@_, {name => 'log pipe', loglevel => $INFO});
216 1         8 identify(\%args);
217 1         7 my $loglevel = LEVELID_FOR($args{loglevel});
218 1         10 my $mangler = $args{target};
219 1 50       8 if (!defined $mangler) {
    50          
220 0     0   0 $mangler = sub { return shift; }
221 0         0 }
222             elsif (ref($mangler) ne 'CODE') {
223 0 0       0 my @keys = ref($mangler) ? @$mangler : ($mangler);
224             $mangler = sub {
225 0     0   0 my $record = shift;
226 0         0 return traverse($record, @keys);
227 0         0 };
228             } ## end elsif (ref($mangler) ne 'CODE')
229 1         5 my $logger = get_logger();
230             return sub {
231 3     3   6 my $record = shift;
232 3         12 $logger->log($loglevel, $mangler->($record));
233 3         20 return $record;
234 1         11 };
235             } ## end sub logger
236              
237             sub pipeline {
238 0     0 1 0 my ($tubes, $args) = args_array_with_options(@_, {name => 'pipeline'});
239 0         0 return sequence(%$args, tubes => $tubes);
240             }
241              
242             sub sequence {
243 15     15 1 2237 my %args =
244             normalize_args(@_, [{name => 'sequence', tubes => []}, 'tubes']);
245 15         97 identify(\%args);
246              
247             # cope with an empty list of tubes - equivalent to an "id" function but
248             # always returning an iterator for consistency
249 15   50     77 my $tubes = $args{tubes} || [];
250             return sub {
251 0     0   0 my @record = shift;
252             return (
253             iterator => sub {
254 0 0       0 return unless @record;
255 0         0 return shift @record;
256             }
257 0         0 );
258             }
259 15 50       50 unless @$tubes;
260              
261             # auto-generate tubes if you get definitions
262 15         82 my @tubes = tubify(\%args, @$tubes);
263              
264 15   100     83 my $gate = $args{gate} // undef;
265              
266 15         60 my $logger = log_helper(\%args);
267 15         37 my $name = $args{name};
268             return sub {
269 16     16   936 my $record = shift;
270 16 50       52 $logger->($record, \%args) if $logger;
271              
272 16         61 my @stack = ({record => $record});
273             my $iterator = sub {
274             STEP:
275 60         53657 while (@stack) {
276 184         311 my $pos = $#stack;
277              
278 184         292 my $f = $stack[$pos];
279             my @record =
280             exists($f->{record}) ? delete $f->{record}
281             : exists($f->{iterator}) ? $f->{iterator}->()
282 184 100       546 : @{$f->{records} || []} ? shift @{$f->{records}}
  85 100       321  
  18 100       39  
    100          
283             : ();
284 184 100       499 if (!@record) { # no more at this level...
285 78         124 my $n = @stack;
286 78         330 TRACE "$name: level $n backtracking, no more records";
287 78         980 pop @stack;
288 78         347 next STEP;
289             } ## end if (!@record)
290              
291 106         168 my $record = $record[0];
292 106 100       301 return $record if @stack > @tubes; # output cache
293              
294             # cut the sequence early if the gate function says so
295 69 100 100     164 return $record if $gate && ! $gate->($record);
296              
297             # something must be done...
298 62 50       284 my @outcome = $tubes[$pos]->($record)
299             or next STEP;
300              
301 62 100       365 unshift @outcome, 'record' if @outcome == 1;
302 62         218 push @stack, {@outcome}; # and go to next level
303             } ## end STEP: while (@stack)
304              
305 16         52 return; # end of output, empty list
306 16         81 };
307 16         70 return (iterator => $iterator);
308 15         120 };
309             } ## end sub sequence
310              
311             1;