File Coverage

blib/lib/Data/Tubes.pm
Criterion Covered Total %
statement 92 109 84.4
branch 39 54 72.2
condition 4 6 66.6
subroutine 15 18 83.3
pod 3 3 100.0
total 153 190 80.5


line stmt bran cond sub pod time code
1             package Data::Tubes;
2              
3             # vim: ts=3 sts=3 sw=3 et ai :
4              
5 35     35   2856421 use strict;
  35         464  
  35         1118  
6 35     35   201 use warnings;
  35         69  
  35         1275  
7 35     35   18324 use English qw< -no_match_vars >;
  35         131503  
  35         235  
8             our $VERSION = '0.738';
9             our $API_VERSION = $VERSION;
10 35     35   12963 use Exporter ();
  35         87  
  35         1262  
11             our @ISA = qw< Exporter >;
12              
13 35     35   20038 use Log::Log4perl::Tiny qw< :easy :dead_if_first LOGLEVEL >;
  35         557980  
  35         193  
14 35         40932 use Data::Tubes::Util qw<
15             args_array_with_options
16             load_sub
17             normalize_args
18             pump
19             resolve_module
20             tube
21 35     35   31349 >;
  35         123  
22              
23             our @EXPORT_OK = (
24             qw<
25             drain
26             pipeline
27             summon
28             tube
29             >
30             );
31             our %EXPORT_TAGS = (all => \@EXPORT_OK,);
32              
33             sub _drain_0_734 {
34 18     18   39 my $tube = shift;
35 18         47 my @outcome = $tube->(@_);
36 18 100       108 return unless scalar @outcome;
37 16 100       39 return $outcome[0] if scalar(@outcome) == 1;
38 12 100       46 return pump($outcome[1]) if $outcome[0] eq 'iterator';
39 6         10 my $wa = wantarray();
40 6 50       13 return if !defined($wa);
41 6 100       19 return $outcome[1] unless $wa;
42 3         5 return @{$outcome[1]};
  3         9  
43             } ## end sub _drain_0_734
44              
45             sub drain {
46 36 100   36 1 19469 goto \&_drain_0_734 if $API_VERSION le '0.734';
47              
48 18         34 my $tube = shift;
49 18         50 my @outcome = $tube->(@_);
50              
51 18         87 my $retval;
52 18 100       55 if (scalar(@outcome) < 2) { # one single record inside
    100          
    50          
53 6         14 $retval = \@outcome;
54             }
55             elsif ($outcome[0] eq 'iterator') {
56 6         27 $retval = [pump($outcome[1])];
57             }
58             elsif ($outcome[0] eq 'records') {
59 6         12 $retval = $outcome[1];
60             }
61             else {
62 0         0 LOGDIE "invalid tube output";
63             }
64              
65 18         37 my $wa = wantarray();
66 18 50       37 return unless defined $wa;
67 18 100       46 return $retval unless $wa;
68 9         30 return @$retval;
69             } ## end sub drain
70              
71             sub import {
72 34     34   358 my $package = shift;
73 34         68 my @filtered;
74 34         159 while (@_) {
75 42         115 my $item = shift;
76 42 50       183 if (lc($item) eq '-api') {
77 0 0       0 LOGDIE "no API version provided for parameter -api"
78             unless @_;
79 0         0 $API_VERSION = shift;
80             }
81             else {
82 42         191 push @filtered, $item;
83             }
84             } ## end while (@_)
85 34         3135378 $package->export_to_level(1, $package, @filtered);
86             } ## end sub import
87              
88             sub pipeline {
89 6     6 1 8836 my ($tubes, $args) = args_array_with_options(@_, {name => 'sequence'});
90              
91 6         22 my $tap = delete $args->{tap};
92 6 100       26 if (defined $tap) {
93             $tap = sub {
94 2     2   6 my $iterator = shift;
95 2         8 while (my @items = $iterator->()) { }
96 2         19 return;
97             }
98 5 100       19 if $tap eq 'sink';
99             $tap = sub {
100 3     3   7 my $iterator = shift;
101 3         5 my @records;
102 3         7 while (my @items = $iterator->()) { push @records, @items; }
  13         66  
103 3 50       9 return unless @records;
104 3 100       15 return $records[0] if @records == 1;
105 2         20 return (records => \@records);
106             }
107 5 100       24 if $tap eq 'bucket';
108             $tap = sub {
109 0     0   0 my ($record) = $_[0]->();
110 0         0 return $record;
111             }
112 5 50       15 if $tap eq 'first';
113             $tap = sub {
114 0     0   0 my $iterator = shift;
115 0         0 my @records;
116 0         0 while (my @items = $iterator->()) { push @records, @items; }
  0         0  
117 0 0       0 return unless @records;
118 0         0 return \@records;
119             }
120 5 50       16 if $tap eq 'array';
121             } ## end if (defined $tap)
122              
123 6 50 66     23 if ((!defined($tap)) && (defined($args->{pump}))) {
124 0         0 my $pump = delete $args->{pump};
125             $tap = sub {
126 0     0   0 my $iterator = shift;
127 0         0 while (my ($record) = $iterator->()) {
128 0         0 $pump->($record);
129             }
130 0         0 return;
131             }
132 0         0 } ## end if ((!defined($tap)) &&...)
133 6 50 66     29 LOGDIE 'invalid tap or pump'
134             if $tap && ref($tap) ne 'CODE';
135              
136 6         24 my $sequence = tube('^Data::Tubes::Plugin::Plumbing::sequence',
137             %$args, tubes => $tubes);
138 6 100       21 return $sequence unless $tap;
139              
140             return sub {
141 5 50   5   1611 my (undef, $iterator) = $sequence->(@_) or return;
142 5         15 return $tap->($iterator);
143 5         30 };
144             } ## end sub pipeline
145              
146             sub summon { # sort-of import
147 41     41 1 7996 my ($imports, $args) = args_array_with_options(
148             @_,
149             {
150             prefix => 'Data::Tubes::Plugin',
151             package => (caller(0))[0],
152             }
153             );
154 41         177 my $prefix = $args->{prefix};
155 41         89 my $cpack = $args->{package};
156              
157 41         115 for my $r (@_) {
158 43         78 my @parts;
159 43 100       164 if (ref($r) eq 'ARRAY') {
160 8         21 @parts = $r;
161             }
162             else {
163 35         260 my ($pack, $name) = $r =~ m{\A(.*)::(\w+)\z}mxs;
164 35         142 @parts = [$pack, $name];
165             }
166 43         108 for my $part (@parts) {
167 43         138 my ($pack, @names) = @$part;
168 43         194 $pack = resolve_module($pack, $prefix);
169 43         275 (my $fpack = "$pack.pm") =~ s{::}{/}gmxs;
170 43         20041 require $fpack;
171 43         189 for my $name (@names) {
172 58 50       768 my $sub = $pack->can($name)
173             or LOGDIE "package '$pack' has no '$name' inside";
174 35     35   350 no strict 'refs';
  35         136  
  35         3606  
175 58         134 *{$cpack . '::' . $name} = $sub;
  58         590  
176             } ## end for my $name (@names)
177             } ## end for my $part (@parts)
178             } ## end for my $r (@_)
179             } ## end sub summon
180              
181             1;
182             __END__