File Coverage

blib/lib/Data/Tubes.pm
Criterion Covered Total %
statement 92 109 84.4
branch 39 54 72.2
condition 4 6 66.6
subroutine 15 18 83.3
pod 3 3 100.0
total 153 190 80.5


line stmt bran cond sub pod time code
1             package Data::Tubes;
2              
3             # vim: ts=3 sts=3 sw=3 et ai :
4              
5 35     35   2702285 use strict;
  35         445  
  35         1053  
6 35     35   207 use warnings;
  35         70  
  35         1076  
7 35     35   16482 use English qw< -no_match_vars >;
  35         122279  
  35         216  
8             our $VERSION = '0.737';
9             our $API_VERSION = $VERSION;
10 35     35   12239 use Exporter ();
  35         65  
  35         1231  
11             our @ISA = qw< Exporter >;
12              
13 35     35   18699 use Log::Log4perl::Tiny qw< :easy :dead_if_first LOGLEVEL >;
  35         525414  
  35         184  
14 35         38006 use Data::Tubes::Util qw<
15             args_array_with_options
16             load_sub
17             normalize_args
18             pump
19             resolve_module
20             tube
21 35     35   29464 >;
  35         180  
22              
23             our @EXPORT_OK = (
24             qw<
25             drain
26             pipeline
27             summon
28             tube
29             >
30             );
31             our %EXPORT_TAGS = (all => \@EXPORT_OK,);
32              
33             sub _drain_0_734 {
34 18     18   37 my $tube = shift;
35 18         44 my @outcome = $tube->(@_);
36 18 100       103 return unless scalar @outcome;
37 16 100       37 return $outcome[0] if scalar(@outcome) == 1;
38 12 100       39 return pump($outcome[1]) if $outcome[0] eq 'iterator';
39 6         12 my $wa = wantarray();
40 6 50       12 return if !defined($wa);
41 6 100       14 return $outcome[1] unless $wa;
42 3         4 return @{$outcome[1]};
  3         9  
43             } ## end sub _drain_0_734
44              
45             sub drain {
46 36 100   36 1 18792 goto \&_drain_0_734 if $API_VERSION le '0.734';
47              
48 18         32 my $tube = shift;
49 18         45 my @outcome = $tube->(@_);
50              
51 18         83 my $retval;
52 18 100       55 if (scalar(@outcome) < 2) { # one single record inside
    100          
    50          
53 6         9 $retval = \@outcome;
54             }
55             elsif ($outcome[0] eq 'iterator') {
56 6         20 $retval = [pump($outcome[1])];
57             }
58             elsif ($outcome[0] eq 'records') {
59 6         11 $retval = $outcome[1];
60             }
61             else {
62 0         0 LOGDIE "invalid tube output";
63             }
64              
65 18         33 my $wa = wantarray();
66 18 50       35 return unless defined $wa;
67 18 100       44 return $retval unless $wa;
68 9         29 return @$retval;
69             } ## end sub drain
70              
71             sub import {
72 34     34   379 my $package = shift;
73 34         75 my @filtered;
74 34         144 while (@_) {
75 42         80 my $item = shift;
76 42 50       169 if (lc($item) eq '-api') {
77 0 0       0 LOGDIE "no API version provided for parameter -api"
78             unless @_;
79 0         0 $API_VERSION = shift;
80             }
81             else {
82 42         148 push @filtered, $item;
83             }
84             } ## end while (@_)
85 34         2955151 $package->export_to_level(1, $package, @filtered);
86             } ## end sub import
87              
88             sub pipeline {
89 6     6 1 9235 my ($tubes, $args) = args_array_with_options(@_, {name => 'sequence'});
90              
91 6         26 my $tap = delete $args->{tap};
92 6 100       32 if (defined $tap) {
93             $tap = sub {
94 2     2   6 my $iterator = shift;
95 2         9 while (my @items = $iterator->()) { }
96 2         22 return;
97             }
98 5 100       26 if $tap eq 'sink';
99             $tap = sub {
100 3     3   8 my $iterator = shift;
101 3         5 my @records;
102 3         12 while (my @items = $iterator->()) { push @records, @items; }
  13         70  
103 3 50       10 return unless @records;
104 3 100       22 return $records[0] if @records == 1;
105 2         24 return (records => \@records);
106             }
107 5 100       29 if $tap eq 'bucket';
108             $tap = sub {
109 0     0   0 my ($record) = $_[0]->();
110 0         0 return $record;
111             }
112 5 50       18 if $tap eq 'first';
113             $tap = sub {
114 0     0   0 my $iterator = shift;
115 0         0 my @records;
116 0         0 while (my @items = $iterator->()) { push @records, @items; }
  0         0  
117 0 0       0 return unless @records;
118 0         0 return \@records;
119             }
120 5 50       20 if $tap eq 'array';
121             } ## end if (defined $tap)
122              
123 6 50 66     29 if ((!defined($tap)) && (defined($args->{pump}))) {
124 0         0 my $pump = delete $args->{pump};
125             $tap = sub {
126 0     0   0 my $iterator = shift;
127 0         0 while (my ($record) = $iterator->()) {
128 0         0 $pump->($record);
129             }
130 0         0 return;
131             }
132 0         0 } ## end if ((!defined($tap)) &&...)
133 6 50 66     38 LOGDIE 'invalid tap or pump'
134             if $tap && ref($tap) ne 'CODE';
135              
136 6         33 my $sequence = tube('^Data::Tubes::Plugin::Plumbing::sequence',
137             %$args, tubes => $tubes);
138 6 100       27 return $sequence unless $tap;
139              
140             return sub {
141 5 50   5   2373 my (undef, $iterator) = $sequence->(@_) or return;
142 5         21 return $tap->($iterator);
143 5         37 };
144             } ## end sub pipeline
145              
146             sub summon { # sort-of import
147 41     41 1 8507 my ($imports, $args) = args_array_with_options(
148             @_,
149             {
150             prefix => 'Data::Tubes::Plugin',
151             package => (caller(0))[0],
152             }
153             );
154 41         175 my $prefix = $args->{prefix};
155 41         90 my $cpack = $args->{package};
156              
157 41         106 for my $r (@_) {
158 43         125 my @parts;
159 43 100       148 if (ref($r) eq 'ARRAY') {
160 8         22 @parts = $r;
161             }
162             else {
163 35         256 my ($pack, $name) = $r =~ m{\A(.*)::(\w+)\z}mxs;
164 35         139 @parts = [$pack, $name];
165             }
166 43         105 for my $part (@parts) {
167 43         122 my ($pack, @names) = @$part;
168 43         180 $pack = resolve_module($pack, $prefix);
169 43         264 (my $fpack = "$pack.pm") =~ s{::}{/}gmxs;
170 43         18763 require $fpack;
171 43         180 for my $name (@names) {
172 58 50       646 my $sub = $pack->can($name)
173             or LOGDIE "package '$pack' has no '$name' inside";
174 35     35   296 no strict 'refs';
  35         117  
  35         3475  
175 58         127 *{$cpack . '::' . $name} = $sub;
  58         462  
176             } ## end for my $name (@names)
177             } ## end for my $part (@parts)
178             } ## end for my $r (@_)
179             } ## end sub summon
180              
181             1;
182             __END__