File Coverage

blib/lib/Data/Tubes.pm
Criterion Covered Total %
statement 92 109 84.4
branch 39 54 72.2
condition 4 6 66.6
subroutine 15 18 83.3
pod 3 3 100.0
total 153 190 80.5


line stmt bran cond sub pod time code
1             package Data::Tubes;
2              
3             # vim: ts=3 sts=3 sw=3 et ai :
4              
5 35     35   2758394 use strict;
  35         445  
  35         1072  
6 35     35   195 use warnings;
  35         66  
  35         1091  
7 35     35   17698 use English qw< -no_match_vars >;
  35         127193  
  35         221  
8             our $VERSION = '0.740';
9             our $API_VERSION = $VERSION;
10 35     35   12309 use Exporter ();
  35         82  
  35         1229  
11             our @ISA = qw< Exporter >;
12              
13 35     35   19376 use Log::Log4perl::Tiny qw< :easy :dead_if_first LOGLEVEL >;
  35         540070  
  35         188  
14 35         38858 use Data::Tubes::Util qw<
15             args_array_with_options
16             load_sub
17             normalize_args
18             pump
19             resolve_module
20             tube
21 35     35   30432 >;
  35         99  
22              
23             our @EXPORT_OK = (
24             qw<
25             drain
26             pipeline
27             summon
28             tube
29             >
30             );
31             our %EXPORT_TAGS = (all => \@EXPORT_OK,);
32              
33             sub _drain_0_734 {
34 18     18   31 my $tube = shift;
35 18         48 my @outcome = $tube->(@_);
36 18 100       104 return unless scalar @outcome;
37 16 100       41 return $outcome[0] if scalar(@outcome) == 1;
38 12 100       43 return pump($outcome[1]) if $outcome[0] eq 'iterator';
39 6         8 my $wa = wantarray();
40 6 50       13 return if !defined($wa);
41 6 100       17 return $outcome[1] unless $wa;
42 3         4 return @{$outcome[1]};
  3         10  
43             } ## end sub _drain_0_734
44              
45             sub drain {
46 36 100   36 1 24035 goto \&_drain_0_734 if $API_VERSION le '0.734';
47              
48 18         32 my $tube = shift;
49 18         43 my @outcome = $tube->(@_);
50              
51 18         89 my $retval;
52 18 100       57 if (scalar(@outcome) < 2) { # one single record inside
    100          
    50          
53 6         11 $retval = \@outcome;
54             }
55             elsif ($outcome[0] eq 'iterator') {
56 6         19 $retval = [pump($outcome[1])];
57             }
58             elsif ($outcome[0] eq 'records') {
59 6         10 $retval = $outcome[1];
60             }
61             else {
62 0         0 LOGDIE "invalid tube output";
63             }
64              
65 18         31 my $wa = wantarray();
66 18 50       37 return unless defined $wa;
67 18 100       46 return $retval unless $wa;
68 9         32 return @$retval;
69             } ## end sub drain
70              
71             sub import {
72 34     34   378 my $package = shift;
73 34         58 my @filtered;
74 34         153 while (@_) {
75 42         87 my $item = shift;
76 42 50       187 if (lc($item) eq '-api') {
77 0 0       0 LOGDIE "no API version provided for parameter -api"
78             unless @_;
79 0         0 $API_VERSION = shift;
80             }
81             else {
82 42         150 push @filtered, $item;
83             }
84             } ## end while (@_)
85 34         2984565 $package->export_to_level(1, $package, @filtered);
86             } ## end sub import
87              
88             sub pipeline {
89 6     6 1 10026 my ($tubes, $args) = args_array_with_options(@_, {name => 'sequence'});
90              
91 6         20 my $tap = delete $args->{tap};
92 6 100       22 if (defined $tap) {
93             $tap = sub {
94 2     2   4 my $iterator = shift;
95 2         7 while (my @items = $iterator->()) { }
96 2         21 return;
97             }
98 5 100       20 if $tap eq 'sink';
99             $tap = sub {
100 3     3   5 my $iterator = shift;
101 3         4 my @records;
102 3         8 while (my @items = $iterator->()) { push @records, @items; }
  13         92  
103 3 50       8 return unless @records;
104 3 100       15 return $records[0] if @records == 1;
105 2         21 return (records => \@records);
106             }
107 5 100       23 if $tap eq 'bucket';
108             $tap = sub {
109 0     0   0 my ($record) = $_[0]->();
110 0         0 return $record;
111             }
112 5 50       16 if $tap eq 'first';
113             $tap = sub {
114 0     0   0 my $iterator = shift;
115 0         0 my @records;
116 0         0 while (my @items = $iterator->()) { push @records, @items; }
  0         0  
117 0 0       0 return unless @records;
118 0         0 return \@records;
119             }
120 5 50       15 if $tap eq 'array';
121             } ## end if (defined $tap)
122              
123 6 50 66     21 if ((!defined($tap)) && (defined($args->{pump}))) {
124 0         0 my $pump = delete $args->{pump};
125             $tap = sub {
126 0     0   0 my $iterator = shift;
127 0         0 while (my ($record) = $iterator->()) {
128 0         0 $pump->($record);
129             }
130 0         0 return;
131             }
132 0         0 } ## end if ((!defined($tap)) &&...)
133 6 50 66     28 LOGDIE 'invalid tap or pump'
134             if $tap && ref($tap) ne 'CODE';
135              
136 6         24 my $sequence = tube('^Data::Tubes::Plugin::Plumbing::sequence',
137             %$args, tubes => $tubes);
138 6 100       18 return $sequence unless $tap;
139              
140             return sub {
141 5 50   5   1765 my (undef, $iterator) = $sequence->(@_) or return;
142 5         17 return $tap->($iterator);
143 5         29 };
144             } ## end sub pipeline
145              
146             sub summon { # sort-of import
147 41     41 1 8888 my ($imports, $args) = args_array_with_options(
148             @_,
149             {
150             prefix => 'Data::Tubes::Plugin',
151             package => (caller(0))[0],
152             }
153             );
154 41         166 my $prefix = $args->{prefix};
155 41         101 my $cpack = $args->{package};
156              
157 41         116 for my $r (@_) {
158 43         73 my @parts;
159 43 100       177 if (ref($r) eq 'ARRAY') {
160 8         24 @parts = $r;
161             }
162             else {
163 35         288 my ($pack, $name) = $r =~ m{\A(.*)::(\w+)\z}mxs;
164 35         140 @parts = [$pack, $name];
165             }
166 43         116 for my $part (@parts) {
167 43         127 my ($pack, @names) = @$part;
168 43         260 $pack = resolve_module($pack, $prefix);
169 43         276 (my $fpack = "$pack.pm") =~ s{::}{/}gmxs;
170 43         19711 require $fpack;
171 43         204 for my $name (@names) {
172 58 50       763 my $sub = $pack->can($name)
173             or LOGDIE "package '$pack' has no '$name' inside";
174 35     35   305 no strict 'refs';
  35         112  
  35         3754  
175 58         124 *{$cpack . '::' . $name} = $sub;
  58         496  
176             } ## end for my $name (@names)
177             } ## end for my $part (@parts)
178             } ## end for my $r (@_)
179             } ## end sub summon
180              
181             1;
182             __END__