File Coverage

blib/lib/ETL/Pipeline.pm
Criterion Covered Total %
statement 403 423 95.2
branch 173 226 76.5
condition 14 27 51.8
subroutine 49 49 100.0
pod 21 22 95.4
total 660 747 88.3


line stmt bran cond sub pod time code
1             =pod
2              
3             =head1 NAME
4              
5             ETL::Pipeline - Extract-Transform-Load pattern for data file conversions
6              
7             =head1 SYNOPSIS
8              
9             use ETL::Pipeline;
10              
11             # The object oriented interface...
12             ETL::Pipeline->new( {
13             work_in => {search => 'C:\Data', iname => qr/Ficticious/},
14             input => ['Excel', iname => qr/\.xlsx?$/ ],
15             mapping => {Name => 'A', Address => 'B', ID => 'C' },
16             constants => {Type => 1, Information => 'Demographic' },
17             output => ['Memory', key => 'ID' ],
18             } )->process;
19              
20             # Or using method calls...
21             my $etl = ETL::Pipeline->new;
22             $etl->work_in ( search => 'C:\Data', iname => qr/Ficticious/ );
23             $etl->input ( 'Excel', iname => qr/\.xlsx?$/i );
24             $etl->mapping ( Name => 'A', Address => 'B', ID => 'C' );
25             $etl->constants( Type => 1, Information => 'Demographic' );
26             $etl->output ( 'Memory', key => 'ID' );
27             $etl->process;
28              
29             =cut
30              
31             package ETL::Pipeline;
32              
33 14     14   5829 use 5.021000; # Required for "no warnings 'redundant'".
  14         21402  
34 14     14   184 use warnings;
  14         538  
  14         12525  
35              
36 14     14   159 use Carp;
  14         49  
  14         901  
37 14     14   5037 use Data::DPath qw/dpath/;
  14         1057442  
  14         82  
38 14     14   6702 use Data::Traverse qw/traverse/;
  14         6337  
  14         548  
39 14     14   5565 use List::AllUtils qw/any first/;
  14         154478  
  14         931  
40 14     14   5566 use Moose;
  14         4142069  
  14         91  
41 14     14   131335 use MooseX::Types::Path::Class qw/Dir/;
  14         1337804  
  14         98  
42 14     11   18371 use Path::Class::Rule;
  11         119201  
  11         383  
43 11     11   154 use Scalar::Util qw/blessed/;
  11         23  
  11         494  
44 11     11   5849 use String::Util qw/hascontent trim/;
  11         33485  
  11         32003  
45              
46              
47             our $VERSION = '3.10';
48              
49              
50             =head1 DESCRIPTION
51              
52             B<ETL> stands for I<Extract-Transform-Load>. ETL isn't just for Data
53             Warehousing. ETL works on almost any type of data conversion. You read the
54             source, translate the data for your target, and store the result.
55              
56             By dividing a conversion into 3 steps, we isolate the input from the output...
57              
58             =over
59              
60             =item * Centralizes data formatting and validation.
61              
62             =item * Makes new input formats a breeze.
63              
64             =item * Makes new outputs just as easy.
65              
66             =back
67              
68             B<ETL::Pipeline> takes your data files from extract to load. It reads an input
69             source, translates the data, and writes it to an output destination. For
70             example, this pipeline reads an Excel spread sheet (input) and saves the
71             information in a Perl hash (output).
72              
73             use ETL::Pipeline;
74             ETL::Pipeline->new( {
75             work_in => {search => 'C:\Data', find => qr/Ficticious/},
76             input => ['Excel', find => qr/\.xlsx?$/],
77             mapping => {Name => 'A', Complaint => 'B', ID => 'C'},
78             constants => {Client => 1, Type => 'Complaint'}
79             output => ['Memory', key => 'ID']
80             } )->process;
81              
82             Or like this, calling the methods instead of through the constructor...
83              
84             use ETL::Pipeline;
85             my $etl = ETL::Pipeline->new;
86             $etl->work_in ( search => 'C:\Data', find => qr/Ficticious/ );
87             $etl->input ( 'Excel', find => qr/\.xlsx?$/ );
88             $etl->mapping ( Name => 'A', Complaint => 'B', ID => 'C' );
89             $etl->constants( Client => 1, Type => 'Complaint' );
90             $etl->output ( 'Memory', key => 'ID' );
91             $etl->process;
92              
93             These are equivalent. They do exactly the same thing. You can pick whichever
94             best suits your style.
95              
96             =head2 What is a pipeline?
97              
98             The term I<pipeline> describes a complete ETL process - extract, transform,
99             and load. Or more accurately - input, mapping, output. Raw data enters one end
100             of the pipe (input) and useful information comes out the other (output). An
101             B<ETL::Pipeline> object represents a complete pipeline.
102              
103             =head2 Upgrade Warning
104              
105             B<WARNING:> The API for input sources has changed in version 3.00. Custom input
106             sources written for an earlier version will not work with version 3.00 and
107             later. You will need to re-write your custom input sources.
108              
109             See L<ETL::Pipeline::Input> for more details.
110              
111             =head1 METHODS & ATTRIBUTES
112              
113             =head2 Managing the pipeline
114              
115             =head3 new
116              
117             Create a new ETL pipeline. The constructor accepts a hash reference whose keys
118             are B<ETL::Pipeline> attributes. See the corresponding attribute documentation
119             for details about acceptable values.
120              
121             =over
122              
123             =item aliases
124              
125             =item constants
126              
127             =item data_in
128              
129             =item input
130              
131             =item mapping
132              
133             =item on_record
134              
135             =item output
136              
137             =item session
138              
139             =item work_in
140              
141             =back
142              
143             =cut
144              
145             sub BUILD {
146 73     73 0 192 my $self = shift;
147 73         139 my $arguments = shift;
148              
149             # "_chain" is a special argument to the constructor that implements the
150             # "chain" method. It copies information from an existing object. This allows
151             # pipelines to share settings.
152             #
153             # Always handle "_chain" first. That way "work_in" and "data_in" arguments
154             # can override the defaults.
155 73 100       340 if (defined $arguments->{_chain}) {
156 4         29 my $object = $arguments->{_chain};
157 4 50 33     42 croak '"chain" requires an ETL::Pipeline object' unless
158             defined( blessed( $object ) )
159             && $object->isa( 'ETL::Pipeline' )
160             ;
161 4 100       86 $self->_work_in( $object->_work_in ) if defined $object->_work_in;
162 4 100       96 $self->_data_in( $object->_data_in ) if defined $object->_data_in;
163 4         82 $self->_session( $object->_session );
164             }
165              
166             # The order of these two is important. "work_in" resets "data_in" with a
167             # trigger. "work_in" must be set first so that we don't lose the value
168             # from "data_in".
169 73 100       256 if (defined $arguments->{work_in}) {
170 65         196 my $values = $arguments->{work_in};
171 65 50       390 $self->work_in( ref( $values ) eq '' ? $values : @$values );
172             }
173 73 100       295 if (defined $arguments->{data_in}) {
174 3         47 my $values = $arguments->{data_in};
175 3 50       21 $self->data_in( ref( $values ) eq '' ? $values : @$values );
176             }
177              
178             # The input and output configurations may be single string values or an
179             # array of arguments. It depends on what each source or destination expects.
180 73 100       351 if (defined $arguments->{input}) {
181 65         190 my $values = $arguments->{input};
182 65 100       390 $self->input( ref( $values ) eq '' ? $values : @$values );
183             }
184 73 100       292 if (defined $arguments->{output}) {
185 65         202 my $values = $arguments->{output};
186 65 100       369 $self->output( ref( $values ) eq '' ? $values : @$values );
187             }
188              
189             # Save any alias definition for use in "record".
190 73 50       1726 if (defined $arguments->{aliases}) {
191 1 0       30 if (ref( $arguments->{aliases} ) eq 'ARRAY') {
192 1         9 $self->aliases( @{$arguments->{aliases}} );
  1         2  
193             } else {
194 1         40 $self->aliases( $arguments->{aliases} );
195             }
196             }
197             }
198              
199              
200             =head3 aliases
201              
202             B<aliases> defines alternate names for input fields. This is how column headers
203             work, for example. You can define your own shortcuts using this method or
204             declaring B<aliases> in L</new>. Aliases can make complex field names more
205             readable.
206              
207             B<aliases> accepts a list of hash references. Each hash reference has one or
208             more alias-to-field definitions. The hash key is the alias name. The value is
209             any field name recognized by L</get>.
210              
211             Aliases are resolved in the order they are added. That way, your pipelines know
212             where each value came from, if that's important. Aliases set by the input source
213             always sort before aliases set by the script. Within a hash, all definitions are
214             considered equal and may sort in any order.
215              
216             # Array definitions to force sorting.
217             my $etl = ETL::Pipeline->new( {aliases => [{A => '0'}, {B => '1'}], ...} );
218             $etl->aliases( {C => '2'}, {D => '3'} );
219              
220             # Hash where it can sort either way.
221             my $etl = ETL::Pipeline->new( {aliases => {A => '0', B => '1'}, ...} );
222             $etl->aliases( {C => '2', D => '3'} );
223              
224             B<aliases> returns a sorted list of all aliases for fields in this input source.
225              
226             I recommend using the hash, unless order matters. In that case, use the array
227             form instead.
228              
229             B<Special Note:> Custom input sources call B<aliases> to add their own
230             shortcuts, such as column headers. These aliases are always evaluated I<before>
231             those set by L</new> or calls to this method by the script.
232              
233             =cut
234              
235             sub aliases {
236 85     85 1 167 my $self = shift;
237              
238             # Add any new aliases first.
239 85         1888 my $list = $self->_alias->{$self->_alias_type};
240 85         356 push( @$list, $_ ) foreach (@_);
241              
242             # Update the cache, if it already exists. This should be VERY, VERY rare.
243             # But I wanted to plan for it so that things behave as expected.
244 85 100       2430 if ($self->_alias_cache_built) {
245 2         23 my $cache = $self->_alias_cache;
246 2         72 foreach my $item (@_) {
247 1         7 while (my ($alias, $location) = each %$item) {
248 1 0       138 $cache->{$alias} = [] unless exists $cache->{alias};
249 1         28 push @{$cache->{alias}}, $self->_as_dpath( $location );
  1         9  
250             }
251             }
252             }
253              
254             # Return a flattened list of aliases. Input source defined aliases first.
255             # Then user defined aliases.
256 85         142 my @all;
257 85         289 push( @all, @{$self->_alias->{$_}} ) foreach (qw/input pipeline/);
  169         3143  
258 85         234 return @all;
259             }
260              
261              
262             =head3 chain
263              
264             This method creates a new pipeline using the same L</work_in>, L</data_in>, and
265             L</session> as the current pipeline. It returns a new instance of
266             B<ETL::Pipeline>.
267              
268             B<chain> takes the same arguments as L</new>. It passes those arguments through
269             to the constructor of the new object.
270              
271             See the section on L</Multiple input sources> for examples of chaining.
272              
273             =cut
274              
275             sub chain {
276 4     4 1 58 my ($self, $arguments) = @_;
277              
278             # Create the new object. Use the internal "_chain" argument to do the
279             # actual work of chaining.
280 4 50       21 if (defined $arguments) { $arguments->{_chain} = $self ; }
  1         3  
281 4         64 else { $arguments = {_chain => $self}; }
282              
283 4         94 return ETL::Pipeline->new( $arguments );
284             }
285              
286              
287             =head3 constants
288              
289             B<constants> sets output fields to literal values. L</mapping> accepts input
290             field names as strings. Instead of obtuse Perl tricks for marking literals,
291             B<constants> explicitly handles them.
292              
293             Hash keys are output field names. The L</output> class defines acceptable
294             field names. The hash values are literals.
295              
296             # Set the output field "Name" to the string "John Doe"...
297             $etl->constants( Name => 'John Doe' );
298              
299             # Get the current list of constants...
300             my $transformation = $etl->constants;
301              
302             B<Note:> B<constants> does not accept code references, array references, or hash
303             references. It only works with literal values. Use L</mapping> instead for
304             calculated items.
305              
306             With no parameters, B<constants> returns the current hash reference. If you pass
307             in a hash reference, B<constants> replaces the current hash with this new one.
308             If you pass in a list of key value pairs, B<constants> adds them to the current
309             hash.
310              
311             =cut
312              
313             has '_constants' => (
314             handles => {_add_constants => 'set', _has_constants => 'count'},
315             init_arg => 'constants',
316             is => 'rw',
317             isa => 'HashRef[Maybe[Str]]',
318             traits => [qw/Hash/],
319             );
320              
321              
322             sub constants {
323 1     1 1 2 my $self = shift;
324 1         23 my @pairs = @_;
325              
326 1 0 0     9 if (scalar( @pairs ) == 1 && ref( $pairs[0] ) eq 'HASH') {
    0          
327 1         2 return $self->_constants( $pairs[0] );
328             } elsif (scalar @pairs) {
329 1         62 return $self->_add_constants( @pairs );
330             } else {
331 1         7 return $self->_constants;
332             }
333             }
334              
335              
336             =head3 data_in
337              
338             The working directory (L</work_in>) usually contains the raw data files. In
339             some cases, though, the actual data sits in a subdirectory underneath
340             L</work_in>. B<data_in> tells the pipeline where to find the input file.
341              
342             B<data_in> accepts a search pattern - name, glob, or regular expression. It
343             searches L</work_in> for the first matching directory. The search is case
344             insensitive.
345              
346             If you pass an empty string to B<data_in>, the pipeline resets B<data_in> to
347             the L</work_in> directory. This is useful when chaining pipelines. If one
348             changes the data directory, the next in line can change back.
349              
350             =cut
351              
352             has '_data_in' => (
353             coerce => 1,
354             init_arg => undef,
355             is => 'rw',
356             isa => Dir,
357             );
358              
359              
360             sub data_in {
361 88     88 1 3941 my $self = shift;
362              
363 88 100       314 if (scalar @_) {
364 5 50       106 croak 'The working folder was not set' unless defined $self->_work_in;
365              
366 5         16 my $name = shift;
367 5 50       72 if (hascontent( $name )) {
368 5         117 my $next = Path::Class::Rule
369             ->new
370             ->min_depth( 1 )
371             ->iname( $name )
372             ->directory
373             ->iter( $self->_work_in )
374             ;
375 5         1140 my $match = $next->();
376 5 50       11096 croak 'No matching directories' unless defined $match;
377 5         223 return $self->_data_in( $match );
378 1         3 } else { return $self->_data_in( $self->_work_in ); }
379 84         2269 } else { return $self->_data_in; }
380             }
381              
382              
383             =head3 input
384              
385             B<input> sets and returns the L<ETL::Pipeline::Input> object. This object reads
386             the data. With no parameters, B<input> returns the current
387             L<ETL::Pipeline::Input> object.
388              
389             my $source = $etl->input();
390              
391             Set the input source by calling B<input> with parameters...
392              
393             $etl->input( 'Excel', find => qr/\.xlsx/i );
394              
395             The first parameter is a class name. B<input> looks for a Perl module matching
396             this name in the C<ETL::Pipeline::Input> namespace. In this example, the actual
397             class name becomes C<ETL::Pipeline::Input::Excel>.
398              
399             The rest of the parameters are passed directly to the C<new> method of that
400             class.
401              
402             B<Technical Note:> Want to use a custom class from B<Local> instead of
403             B<ETL::Pipeline::Input>? Put a B<+> (plus sign) in front of the class name.
404             For example, this command uses the input class B<Local::CustomExtract>.
405              
406             $etl->input( '+Local::CustomExtract' );
407              
408             =cut
409              
410             has '_input' => (
411             does => 'ETL::Pipeline::Input',
412             init_arg => undef,
413             is => 'rw',
414             );
415              
416              
417             sub input {
418 208     208 1 436 my $self = shift;
419              
420 208 100       794 return $self->_input( $self->_object_of_class( 'Input', @_ ) ) if scalar @_;
421 143         3456 return $self->_input;
422             }
423              
424              
425             =head3 mapping
426              
427             B<mapping> ties the input fields with the output fields. Hash keys are output
428             field names. The L</output> class defines acceptable field names. The hash
429             values can be anything accepted by the L</get> method. See L</get> for more
430             information.
431              
432             # Add the output field "Name" with data from input column "A"...
433             $etl->mapping( Name => 'A' );
434              
435             # Change "Name" to get data from "Full Name" or "FullName"...
436             $etl->mapping( Name => qr/Full\s*Name/i );
437              
438             # "Name" gets the lower case of input column "A"...
439             $etl->mapping( Name => sub {
440             my ($etl, $record) = @_;
441             return lc $record{A};
442             } );
443              
444             If L</get> returns an ARRAY reference (aka multiple values), they will be
445             concatenated in the output with a semi-colon between values - B<; >. You can
446             override the separator by setting the value to an ARRAY reference. The first
447             element is a regular field name for L</get>. The second element is a new
448             separator string.
449              
450             # Slashes between multiple names.
451             $etl->mapping( Name => [qr/Name/i, ' / '] );
452            
453             # These will do the same thing - semi-colon between multiple names.
454             $etl->mapping( Name => [qr/Name/i, '; '] );
455             $etl->mapping( Name => qr/Name/i );
456              
457             With no parameters, B<mapping> returns the current hash reference. If you pass
458             in a hash reference, B<mapping> replaces the current hash with this new one. If
459             you pass in a list of key value pairs, B<mapping> adds them to the current hash.
460              
461             # Get the current mapping...
462             my $transformation = $etl->mapping;
463              
464             # Add the output field "Name" with data from input column "A"...
465             $etl->mapping( Name => 'A' );
466              
467             # Replace the entire mapping so only "Name" is output...
468             $etl->mapping( {Name => 'C'} );
469              
470             Want to save a literal value? Use L</constants> instead.
471              
472             =head4 Complex data structures
473              
474             B<mapping> only sets scalar values. If the matching fields contain sub-records,
475             L</record> throws an error message and sets the output field to C<undef>.
476              
477             =head4 Fully customized mapping
478              
479             B<mapping> accepts a CODE reference in place of the hash. In this case,
480             L</record> executes the code and uses the return value as the record to send
481             L</output>. The CODE should return a hash reference for success or C<undef> if
482             there is an error.
483              
484             # Execute code instead of defining the output fields.
485             $etl->mapping( sub { ... } );
486              
487             # These are the same.
488             $etl->mapping( {Name => 'A'} );
489             $etl->mapping( sub {
490             my $etl = shift;
491             return {Name => $etl->get( 'A' )};
492             } );
493              
494             C<undef> saves an empty record. To print an error message, have your code call
495             L</status> with a type of B<ERROR>.
496              
497             # Return an empty record.
498             $etl->mapping( sub { undef; } );
499            
500             # Print an error message.
501             $etl->mapping( sub {
502             ...
503             $etl->status( 'ERROR', 'There is no data!' );
504             return undef;
505             });
506              
507             The results of L</constants> are folded into the resulting hash reference.
508             Fields set by B<mapping> override constants.
509              
510             # Output record has two fields - "Extra" and "Name".
511             $etl->constants( Extra => 'ABC' );
512             $etl->mapping( sub { {Name => shift->get( 'A' )} } );
513            
514             # Output record has only one field, with the value from the input record.
515             $etl->constants( Name => 'ABC' );
516             $etl->mapping( sub { {Name => shift->get( 'A' )} } );
517              
518             L</record> passes two parameters into the CODE reference - the B<ETL::Pipeline>
519             object and L<the raw data record|/this>.
520              
521             $etl->mapping( sub {
522             my ($etl, $record) = @_;
523             ...
524             } );
525              
526             B<WARNING:> This is considered an I<advanced> feature and should be used
527             sparingly. You will find the I<< name => field >> format easier to maintain.
528              
529             =cut
530              
531             has '_mapping' => (
532             init_arg => 'mapping',
533             is => 'rw',
534             isa => 'HashRef|CodeRef',
535             );
536              
537              
538             sub mapping {
539 127     127 1 221 my $self = shift;
540 127         259 my @pairs = @_;
541              
542 127 50       427 if (scalar( @pairs) <= 0) {
    0          
543 127         2770 return $self->_mapping;
544             } elsif (scalar( @pairs ) == 1) {
545 1         3 return $self->_mapping( $pairs[0] );
546             } else {
547 1 0       22 $self->_mapping( {} ) if ref( $self->_mapping) ne 'HASH';
548 1         10 my $reference = $self->_mapping;
549 1         2 $reference = {%$reference, @pairs};
550             }
551             }
552              
553              
554             =head3 on_record
555              
556             Executes a customized subroutine on every record before any mapping. The code
557             can modify the record and your changes will feed into the mapping. You can use
558             B<on_record> for filtering, debugging, or just about anything.
559              
560             B<on_record> accepts a code reference. L</record> executes this code for every
561             input record.
562              
563             The code reference receives two parameters - the C<ETL::Pipeline> object and the
564             input record. The record is passed as a hash reference. If B<on_record> returns
565             a false value, L</record> will never send this record to the output destination.
566             It's as if this record never existed.
567              
568             ETL::Pipeline->new( {
569             ...
570             on_record => sub {
571             my ($etl, $record) = @_;
572             foreach my $field (keys %$record) {
573             my $value = $record->{$field};
574             $record->{$field} = ($value eq 'NA' ? '' : $value);
575             }
576             },
577             ...
578             } )->process;
579              
580             # -- OR --
581             $etl->on_record( sub {
582             my ($etl, $record) = @_;
583             foreach my $field (keys %$record) {
584             my $value = $record->{$field};
585             $record->{$field} = ($value eq 'NA' ? '' : $value);
586             }
587             } );
588              
589             B<Note:> L</record> automatically removes leading and trailing whitespace. You
590             do not need B<on_record> for that.
591              
592             =cut
593              
594             has 'on_record' => (
595             is => 'rw',
596             isa => 'Maybe[CodeRef]',
597             );
598              
599              
600             =head3 output
601              
602             B<output> sets and returns the L<ETL::Pipeline::Output> object. This object
603             writes records to their final destination. With no parameters, B<output> returns
604             the current L<ETL::Pipeline::Output> object.
605              
606             Set the output destination by calling B<output> with parameters...
607              
608             $etl->output( 'SQL', table => 'NewData' );
609              
610             The first parameter is a class name. B<output> looks for a Perl module
611             matching this name in the C<ETL::Pipeline::Output> namespace. In this example,
612             the actual class name becomes C<ETL::Pipeline::Output::SQL>.
613              
614             The rest of the parameters are passed directly to the C<new> method of that
615             class.
616              
617             B<Technical Note:> Want to use a custom class from B<Local> instead of
618             B<ETL::Pipeline::Output>? Put a B<+> (plus sign) in front of the class name.
619             For example, this command uses the input class B<Local::CustomLoad>.
620              
621             $etl->output( '+Local::CustomLoad' );
622              
623             =cut
624              
625             has '_output' => (
626             does => 'ETL::Pipeline::Output',
627             init_arg => undef,
628             is => 'rw',
629             );
630              
631              
632             sub output {
633 144     144 1 11079 my $self = shift;
634              
635 144 100       497 return $self->_output( $self->_object_of_class( 'Output', @_ ) ) if scalar @_;
636 79         1914 return $self->_output;
637             }
638              
639              
640             =head3 process
641              
642             B<process> kicks off the entire data conversion process. It takes no
643             parameters. All of the setup is done by the other methods.
644              
645             B<process> returns the B<ETL::Pipeline> object so you can do things like
646             this...
647              
648             ETL::Pipeline->new( {...} )->process->chain( ... )->process;
649              
650             =cut
651              
652             sub process {
653 57     57 1 218 my $self = shift;
654              
655             # Make sure we have all the required information.
656 56         271 my ($success, $error) = $self->is_valid;
657 56 50       150 croak $error unless $success;
658              
659             # Sort aliases from the input source before any that were set in the object.
660 56         1412 $self->_alias_type( 'input' );
661              
662             # Kick off the process. The input source loops over the records. It calls
663             # the "record" method, described below.
664 56         1181 $self->_output->open( $self );
665 56         254 $self->status( 'START' );
666 56         329 $self->input->run( $self );
667 55         2093 $self->_decrement_count; # "record" adds 1 at the end, so this goes one past the last record.
668 55         167 $self->status( 'END' );
669 55         1773 $self->_output->close( $self );
670              
671             # Return the pipeline object so that we can chain calls. Useful shorthand
672             # when running multiple pipelines.
673 55         201 return $self;
674             }
675              
676              
677             =head3 session
678              
679             B<ETL::Pipeline> supports sessions. A session allows input and output objects
680             to share information along a chain. For example, imagine 3 Excel files being
681             loaded into an Access database. All 3 files go into the same Access database.
682             The first pipeline creates the database and saves its path in the session. That
683             pipeline chains with a second pipeline. The second pipeline retrieves the
684             Access filename from the session.
685              
686             The B<session> method provides access to session level variables. As you write
687             your own L<ETL::Pipeline::Output> classes, they can use session variables for
688             sharing information.
689              
690             The first parameter is the variable name. If you pass only the variable name,
691             B<session> returns the value.
692              
693             my $database = $etl->session( 'access_file' );
694             my $identifier = $etl->session( 'session_identifier' );
695              
696             A second parameter is the value.
697              
698             $etl->session( access_file => 'C:\ExcelData.accdb' );
699              
700             You can set multiple variables in one call.
701              
702             $etl->session( access_file => 'C:\ExcelData.accdb', name => 'Abe' );
703              
704             If you pass in a hash reference, it completely replaces the current session with
705             the new values.
706              
707             When retrieving an array or hash reference, B<session> automatically
708             dereferences it if called in a list context. In a scalar context, B<session>
709             returns the reference.
710              
711             # Returns the list of names as a list.
712             foreach my $name ($etl->session( 'name_list' )) { ... }
713              
714             # Returns a list reference instead of a list.
715             my $reference = $etl->session( 'name_list' );
716              
717             =head3 session_has
718              
719             B<session_has> checks for a specific session variable. It returns I<true> if
720             the variable exists and I<false> if it doesn't.
721              
722             B<session_has> only checks existence. It does not tell you if the value is
723             defined.
724              
725             if ($etl->session_has( 'access_file' )) { ... }
726              
727             =cut
728              
729             # Alternate design: Use attributes for session level information.
730             # Result: Discarded
731             #
732             # Instead of keeping session variables in a hash, the class would have an
733             # attribute corresponding to the session data it can keep. Since
734             # ETL::Pipeline::Input and ETL::Pipeline::Output objects have access to the
735             # the pipeline, they can share data through the attributes.
736             #
737             # For any session information, the developer must subclass ETL::Pipeline. The
738             # ETL::Pipeline::Input or ETL::Pipeline::Output classes would be tied to that
739             # specific subclass. And if you needed to combine two sets of session
740             # variables, well that just means another class type. That's very confusing.
741             #
742             # Attributes make development of new input and output classes very difficult.
743             # The hash is simple. It decouples the input/output classes from pipeline.
744             # That keeps customization simpler.
745              
746              
# Backing storage for session level variables shared along a pipeline chain.
# "init_arg => undef" keeps constructor arguments from seeding it directly;
# scripts go through the public session() method instead. The Moose "Hash"
# trait generates the _add_session/_get_session/session_has helpers used by
# session() below.
has '_session' => (
	default => sub { {} },
	handles => {
		_add_session => 'set',     # set one key/value pair
		_get_session => 'get',     # fetch one value by key
		session_has => 'exists',   # public: does the key exist at all?
	},
	init_arg => undef,
	is => 'rw',
	isa => 'HashRef[Any]',
	traits => [qw/Hash/],
);
759              
760              
# Read or write session level variables (see the POD above for the full
# contract). Dispatches on argument count:
#   session()                 -> undef
#   session( $hashref )       -> replace the entire session
#   session( $key )           -> fetch one value (dereferenced in list context)
#   session( $k => $v, ... )  -> set one or more variables
sub session {
	my $self = shift;

	# No arguments at all: nothing to fetch or store.
	return undef if scalar( @_ ) == 0;

	if (scalar( @_ ) == 1) {
		my $key = $_[0];

		# A hash reference wholesale replaces the current session.
		return $self->_session( $key ) if ref( $key ) eq 'HASH';

		# In list context, automatically dereference stored ARRAY/HASH
		# references so callers can loop over them directly.
		if (wantarray) {
			my $stored = $self->_get_session( $key );
			return @$stored if ref( $stored ) eq 'ARRAY';
			return %$stored if ref( $stored ) eq 'HASH';
			return $stored;
		}

		# Scalar context hands back whatever is stored, reference or not.
		return $self->_get_session( $key );
	}

	# Two or more arguments: treat them as name/value pairs and store each.
	my %updates = @_;
	$self->_add_session( $_, $updates{$_} ) for keys %updates;

	# Mimic assignment semantics by returning the first value passed in.
	return $_[1];
}
782              
783              
784             =head3 work_in
785              
786             The working directory sets the default place for finding files. All searches
787             start here and only descend subdirectories. Temporary or output files go into
788             this directory as well.
789              
790             B<work_in> has two forms: C<work_in( 'C:\Data' );> or
791             C<< work_in( root => 'C:\Data', iname => 'Ficticious' ); >>.
792              
793             The first form specifies the exact directory path. In our example, the working
794             directory is F<C:\Data>.
795              
796             The second form searches the file system for a matching directory. Take this
797             example...
798              
799             $etl->work_in( root => 'C:\Data', iname => 'Ficticious' );
800              
801             It scans the F<C:\Data> directory for a subdirectory named F<Ficticious>, like
802             this: F<C:\Data\Ficticious>. The search is B<not> recursive. It locates files
803             in the B<root> folder.
804              
805             B<work_in> accepts any of the tests provided by L<Path::Iterator::Rule>. The
806             values of these arguments are passed directly into the test. For boolean tests
807             (e.g. readable, exists, etc.), pass an C<undef> value.
808              
809             B<work_in> automatically applies the C<directory> filter. Do not set it
810             yourself.
811              
812             C<iname> is the most common one that I use. It matches the file name, supports
813             wildcards and regular expressions, and is case insensitive.
814              
815             # Search using a regular expression...
816             $etl->work_in( iname => qr/\.xlsx$/, root => 'C:\Data' );
817              
818             # Search using a file glob...
819             $etl->work_in( iname => '*.xlsx', root => 'C:\Data' );
820              
821             The code throws an error if no directory matches the criteria. Only the first
822             match is used.
823              
824             B<work_in> automatically resets L</data_in>.
825              
826             =cut
827              
# Private storage behind the public work_in() method. "coerce" lets strings
# be turned into the Dir (Path::Class::Dir) type. The trigger forces the
# path absolute and resets "_data_in" — see _trigger_work_in below.
has '_work_in' => (
	coerce => 1,
	init_arg => undef,
	is => 'rw',
	isa => Dir,
	trigger => \&_trigger_work_in,
);
835              
836              
# Get or set the working directory (see the POD above).
#
#   work_in()                     - return the current directory
#   work_in( $path )              - set the directory explicitly
#   work_in( root => ..., %tests ) - search for a matching subdirectory
#
# The search form applies each remaining option as a Path::Class::Rule test
# (iname, etc.) and uses the first matching directory. Croaks when nothing
# matches. Returns the (possibly coerced) directory object.
sub work_in {
	my $self = shift;

	if (scalar( @_ ) == 1) {
		return $self->_work_in( shift );
	} elsif(scalar( @_ ) > 1) {
		my %options = @_;

		# "root" is where the search starts; default to the current directory.
		my $root = $options{root} // '.';
		delete $options{root};

		if (scalar %options) {
			my $rule = Path::Class::Rule->new->directory;
			while (my ($name, $value) = each %options) {
				# Dynamic method call instead of a string eval. The option
				# name selects the Path::Class::Rule test. A block eval still
				# traps unknown test names, but option values can no longer
				# inject arbitrary code the way a string eval allowed.
				eval { $rule = $rule->$name( $value ) };
				confess $@ unless $@ eq '';
			}
			# Only the first matching directory is used.
			my $next = $rule->iter( $root );
			my $match = $next->();
			croak 'No matching directories' unless defined $match;
			return $self->_work_in( $match );
		} else { return $self->_work_in( $root ); }
	} else { return $self->_work_in; }
}
861              
862              
# Moose trigger for "_work_in". Normalizes relative paths to absolute and
# resets "_data_in" so file searches start over in the new directory.
sub _trigger_work_in {
	my $self = shift;
	my $new = shift;   # the value just assigned to _work_in (may be undef)

	# Force absolute paths. Changing the value will fire this trigger again.
	# I only want to change "_data_in" once.
	#
	# First pass (relative path): re-assign the cleaned absolute path, which
	# re-enters this trigger. Second pass (absolute, or undef): actually
	# reset "_data_in" to the new working directory.
	if (defined( $new ) && $new->is_relative) {
		$self->_work_in( $new->cleanup->absolute );
	} else {
		$self->_data_in( $new );
	}
}
875              
876              
877             =head2 Used in mapping
878              
879             These methods may be used by code references in the L</mapping> attribute. They
880             will return information about/from the current record.
881              
882             =head3 count
883              
884             This attribute tells you how many records have been read from the input source.
885             This value is incremented before any filtering. So it even counts records that
886             are bypassed by L</on_record>.
887              
888             The first record is always number B<1>.
889              
890             =cut
891              
# How many records have been read from the input source. Starts at 1 so the
# first record is number 1. "record" increments it after every input record
# (even filtered ones) and "process" decrements once at the end to undo the
# final increment.
# NOTE(review): default is the string '1'; Moose accepts it for Int, but a
# numeric 1 would be clearer.
has 'count' => (
	default => '1',
	handles => {_decrement_count => 'dec', _increment_count => 'inc'},
	is => 'ro',
	isa => 'Int',
	traits => [qw/Counter/],
);
899              
900              
901             =head3 get
902              
903             Retrieve the value from the record for the given field name. This method accepts
904             two parameters - a field name and the current record. It returns the exact value
905             found at the matching node.
906              
907             =head4 Field names
908              
909             The field name can be...
910              
911             =over
912              
913             =item A string containing a hash key
914              
915             =item An array index (all digits)
916              
917             =item A string containing a L<Data::DPath> path (starts with B</>)
918              
919             =item A regular expression reference
920              
921             =item A code reference
922              
923             =back
924              
925             Hash keys and regular expressions match both field names and aliases. These are
926             the only types that match aliases. Hash keys cannot be all digits and cannot
927             begin with the B</> character. Otherwise B<get> mis-identifies them.
928              
929             B<get> interprets strings of all digits as array indexes (numbers). Excel files,
930             for example, return an array instead of a hash. And this is an easy way to
931             reference columns in order.
932              
933             B<get> treats a string beginning with a B</> (slash) as a L<Data::DPath> path.
934             This lets you very specifically traverse a complex data structure, such as those
935             from XML or JSON.
936              
937             For a regular expression, B<get> matches hash keys at the top level of the data
938             structure plus aliases.
939              
940             And with a code reference, B<get> executes the subroutine. The return value
941             becomes the field value. The code reference is called in a scalar context. If
942             you need to return multiple values, then return an ARRAY or HASH reference.
943              
944             A code reference receives two parameters - the B<ETL::Pipeline> object and the
945             current record.
946              
947             =head4 Current record
948              
949             The current record is optional. B<get> will use L</this> if you do not pass in a
950             record. By accepting a record, you can use B<get> on sub-records. So by default,
951             B<get> returns a value from the top record. Use the second parameter to retrieve
952             values from a sub-record.
953              
954             B<get> only applies aliases when using L</this>. Aliases do not apply to
955             sub-records.
956              
957             =head4 Return value
958              
959             B<get> always returns a scalar value, but not always a string. The return value
960             might be a string, ARRAY reference, or HASH reference.
961              
962             B<get> does not flatten out the nodes that it finds. It merely returns a
963             reference to whatever is in the data structure at the named point. The calling
964             code must account for the possibility of finding an array or hash or string.
965              
966             =cut
967              
# Retrieve the value(s) for a field from a record. See the POD above for the
# field-name shortcuts (hash key, array index, DPath, regexp, code ref).
#
# Parameters:
#   $field  - field name in any of the supported forms
#   $record - optional; defaults to the current record ("this")
#
# Returns a single scalar: the lone match, undef when nothing matched, or an
# ARRAY reference when there were multiple matches.
sub get {
	my ($self, $field, $record) = @_;

	# Because the reference may be stored, I want to force a new copy every
	# time. Otherwise scripts may get invalid values from previous records.
	my $found = [];

	# Use the current record from the attribute unless the programmer explicitly
	# sent in a record. By sending in a record, "get" works on sub-records. But
	# the default behaviour is what you would expect. "$full" remembers that we
	# are on the top-level record — aliases only apply there.
	my $full = 0;
	unless (defined $record) {
		$record = $self->this;
		$full = 1;
	}

	# Execute code reference. This is all there is to do. We send back whatever
	# the code returns (called in list context; results collected into @$found).
	if (ref( $field ) eq 'CODE') {
		@$found = $field->( $self, $record );
	}

	# Anything else we match - either a field name or an alias. The sequence is
	# the same for both.
	else {
		# Match field names first. "_as_dpath" also reports (via $check_alias)
		# whether this kind of field can match an alias at all.
		my $check_alias = 0;
		$field = $self->_as_dpath( $field, \$check_alias );
		@$found = dpath( $field )->match( $record );

		if ($check_alias && $full) {
			# Build the cache first time through. Re-use it later to save time.
			# The cache maps alias name -> list of DPath locations.
			# NOTE(review): "_alias_cache_built" is presumably a lazy-builder
			# predicate declared elsewhere in this file — confirm the cache is
			# invalidated when aliases change mid-run.
			unless ($self->_alias_cache_built) {
				my $cache = $self->_alias_cache;
				foreach my $item ($self->aliases) {
					while (my ($alias, $location) = each %$item) {
						$cache->{$alias} = [] unless exists $cache->{$alias};
						push @{$cache->{$alias}}, $self->_as_dpath( $location );
					}
				}
			}

			# Search the actual data in all of the fields from matching aliases.
			# The same DPath that matched field names is run against the alias
			# cache; each hit yields locations to pull from the real record.
			my @search = dpath( $field )->match( $self->_alias_cache );
			foreach my $list (@search) {
				foreach my $location (@$list) {
					my @values = dpath( $location )->match( $record );
					push @$found, @values;
				}
			}
		}
	}

	# Send back the final value: lone match (or undef) as a scalar, multiple
	# matches as an ARRAY reference.
	return (scalar( @$found ) <= 1 ? $found->[0] : $found);
}
1024              
1025              
1026             # Format the match string for Data::DPath. I allow scripts to use shortcut
1027             # formatting so they are easier to read. This method translates those into a
1028             # correct Data::DPath path.
# Translate the scripting shortcuts into a proper Data::DPath path string.
#
# Parameters:
#   $field - a regexp reference, an all-digit string (array index), a string
#            beginning with "/" (already a DPath), or a plain hash key
#   $alias - optional SCALAR reference; set to 1 when this kind of field may
#            also match an alias name, 0 otherwise
#
# Returns the DPath string.
sub _as_dpath {
	my ($self, $field, $alias) = @_;

	# Helper: record whether this form of field can match aliases too.
	my $note_alias = sub {
		my ($matches_aliases) = @_;
		${$alias} = $matches_aliases if ref( $alias ) eq 'SCALAR';
	};

	# Regular expression: matches top-level hash keys (and alias names).
	# Must be checked before the string patterns below.
	if (ref( $field ) eq 'Regexp') {
		$note_alias->( 1 );
		return "/*[key =~ /$field/]";
	}

	# All digits: an array index.
	if ($field =~ m/^\d+$/) {
		$note_alias->( 0 );
		return "/*[$field]";
	}

	# Leading slash: already a full DPath, pass it through verbatim.
	if ($field =~ m|^/|) {
		$note_alias->( 0 );
		return $field;
	}

	# Anything else: a plain hash key (which may also be an alias).
	$note_alias->( 1 );
	return "/$field";
}
1046              
1047              
1048             # Alternate designs...
1049             #
1050             # I considered building a temporary hash keyed by the alias names. Then I could
1051             # apply a full Data::DPath to retrieve aliased fields. But a path like "/*[0]"
1052             # would always match both the main record and the aliases. I would always be
1053             # returning multiple values when the user clearly expected one. It makes aliases
1054             # pretty much useless.
1055              
1056              
1057             =head3 this
1058              
1059             The current record. The L</record> method sets B<this> before it does anything
1060             else. L</get> will use B<this> if you don't pass in a record. It makes a
1061             convenient shortcut so you don't have to pass the record into every call.
1062              
1063             B<this> can be any valid Perl data. Usually a hash reference or array reference.
1064             The input source controls the type.
1065              
1066             =cut
1067              
# The record currently being processed. "record" sets it (via the private
# writer) before applying the mapping; "get" falls back to it when no record
# is passed in. Can hold any Perl data - the input source decides the shape.
has 'this' => (
	is => 'ro',
	isa => 'Maybe[Any]',
	writer => '_set_this',
);
1073              
1074              
1075             =head2 Used by input sources
1076              
1077             =head3 aliases (see above)
1078              
1079             Your input source can use the L</aliases> method documented above to set
1080             column headers as field names. Excel files, for example, would call L</aliases>
1081             to assign letters to column numbers, like a real spreadsheet.
1082              
1083             =head3 record
1084              
1085             The input source calls this method for each data record. This is where
1086             L<ETL::Pipeline> applies the mapping and constants, and sends the results on
1087             to the output destination.
1089              
1090             B<record> takes one parameter - the current record. The record can be any Perl
1091             data structure - hash, array, or scalar. B<record> uses L<Data::DPath> to
1092             traverse the structure.
1093              
1094             B<record> calls L</get> on each field in L</mapping>. L</get> traverses the
1095             data structure retrieving the correct values. B<record> concatenates multiple
1096             matches into a single, scalar value for the output.
1097              
1098             =cut
1099              
# Process one input record: trim whitespace, apply the on_record filter,
# merge constants, run the mapping, and hand the result to the output
# destination. Called by input sources once per record.
#
# Parameters:
#   $record - the raw record (hash ref, array ref, or scalar)
#
# Returns nothing useful; side effects are the output write and the counter.
sub record {
	my ($self, $record) = @_;

	# Save the current record so that other methods and helper functions can
	# access it without the programmer passing it around.
	$self->_set_this( $record );

	# Remove leading and trailing whitespace from all fields. We always want to
	# do this. Otherwise we end up with weird looking text. I do this first so
	# that all the customized code sees is the filtered data.
	# (Data::Traverse sets $a/$b: for hash nodes $b is the value, otherwise
	# $a is the element - hence the m/HASH/ check on $_.)
	traverse { trim( m/HASH/ ? $b : $a ) } $record;

	# Run the custom record filter, if there is one. If the filter returns
	# "false", then we bypass this entire record.
	my $code = $self->on_record;
	my $continue = 1;

	$continue = $code->( $self, $record ) if defined $code;
	unless ($continue) {
		$self->_increment_count; # Record processed.
		return;
	}

	# Insert constants into the output. Do this before the mapping. The mapping
	# will always override constants. I want data from the input.
	#
	# I had used a regular hash. Perl kept re-using the same memory location.
	# The records were overwriting each other. Switched to a hash reference so I
	# can force Perl to allocate new memory for every record.
	my $save = {};
	if ($self->_has_constants) {
		my $constants = $self->_constants;
		%$save = %$constants;
	}

	# This is the transform step. It converts the input record into an output
	# record. The mapping can be either a hash reference of transformations or
	# a code reference that does all of the transformations.
	my $mapping = $self->mapping;
	if (ref( $mapping ) eq 'CODE') {
		my $return = $mapping->( $self, $record );

		# Merge with constants that might have been set above.
		$save = {%$save, %$return} if defined $return;
	} else {
		while (my ($to, $from) = each %$mapping) {
			# An ARRAY value means [field, separator]; default separator "; ".
			my $seperator = '; ';
			if (ref( $from ) eq 'ARRAY') {
				$seperator = $from->[1];
				$from = $from->[0]; # Do this LAST!
			}

			# Plain scalar: store as-is. ARRAY of scalars: join with the
			# separator, skipping blanks. Anything nested (hash/array inside
			# the matches) is reported as an error and stored as undef.
			my $values = $self->get( $from );
			if (ref( $values ) eq '' ) { $save->{$to} = $values; }
			elsif (ref( $values ) eq 'ARRAY') {
				my $invalid = first { defined( $_ ) && ref( $_ ) ne '' } @$values;
				if (defined $invalid) {
					my $type = ref( $invalid );
					$self->status( 'ERROR', "Data structure of type $type found by mapping '$from' to '$to'" );
					$save->{$to} = undef;
				} else {
					my @usable = grep { hascontent( $_ ) } @$values;
					if(scalar @usable) { $save->{$to} = join( $seperator, @usable ); }
					else { $save->{$to} = undef; }
				}
			} else { $save->{$to} = undef; }
		}
	}

	# We're done with this record. Finish up.
	$self->_output->write( $self, $save );
	$self->status( 'STATUS' );

	# Increase the record count. Do this last so that any status messages from
	# the input source reflect the correct record number.
	$self->_increment_count;
}
1177              
1178              
1179             =head3 status
1180              
1181             This method displays a status message. B<ETL::Pipeline> calls this method to
1182             report on the progress of pipeline. It takes one or two parameters - the message
1183             type (required) and the message itself (optional).
1184              
1185             The type can be anything. These are the ones that B<ETL::Pipeline> uses...
1186              
1187             =over
1188              
1189             =item DEBUG
1190              
1191             Messages used for debugging problems. You should only use these temporarily to
1192             look for specific issues. Otherwise they clog up the display for the end user.
1193              
1194             =item END
1195              
1196             The pipeline has finished. The input source is closed. The output destination
1197             is still open. It will be closed immediately after. There is no message text.
1198              
1199             =item ERROR
1200              
1201             Report an error message to the user. These are not necessarily fatal errors.
1202              
1203             =item INFO
1204              
1205             An informational message to the user.
1206              
1207             =item START
1208              
1209             The pipeline is just starting. The output destination is open. But the input
1210             source is not. There is no message text.
1211              
1212             =item STATUS
1213              
1214             Progress update. This is sent after every input record.
1215              
1216             =back
1217              
1218             See L</Custom logging> for information about adding your own log method.
1219              
1220             =cut
1221              
# Display a status message for the pipeline (see the POD above for types).
#
# Parameters:
#   $type    - message type (START, END, STATUS, ERROR, INFO, DEBUG, ...);
#              case insensitive
#   $message - message text; unused for START/END/STATUS
#
# STATUS messages are throttled to every 50th record. Everything besides the
# recognized types is printed with the record number and input source.
sub status {
	my ($self, $type, $message) = @_;
	$type = uc( $type );

	# Removed two unused "my $name;" declarations that were dead code in the
	# START and END branches.
	if ($type eq 'START') {
		say 'Processing...';
	} elsif ($type eq 'END') {
		say 'Finished!';
	} elsif ($type eq 'STATUS') {
		my $count = $self->count;
		say "Processed record #$count..." unless $count % 50;
	} else {
		my $count = $self->count;
		my $source = $self->input->source;

		# Include the input source position when the source provides one.
		if (hascontent( $source )) {
			say "$type [record #$count at $source] $message";
		} else {
			say "$type [record #$count] $message";
		}
	}
}
1246              
1247              
1248             =head2 Utility Functions
1249              
1250             These methods can be used inside L</mapping> code references. Unless otherwise
1251             noted, these all work on L<the current record|/this>.
1252              
1253             my $etl = ETL::Pipeline->new( {
1254             ...
1255             mapping => {A => sub { shift->function( ... ) }},
1256             ...
1257             } );
1258              
1259             =head3 coalesce
1260              
1261             Emulates the SQL Server C<COALESCE> command. It takes a list of field names for
1262             L</get> and returns the value of the first non-blank field.
1263              
1264             # First non-blank field
1265             $etl->coalesce( 'Patient', 'Complainant', 'From' );
1266              
1267             # Actual value if no non-blank fields
1268             $etl->coalesce( 'Date', \$today );
1269              
1270             In the first example, B<coalesce> looks at the B<Patient> field first. If it's
1271             blank, then B<coalesce> looks at the B<Complainant> field. Same thing - if it's
1272             blank, B<coalesce> returns the B<From> field.
1273              
1274             I<Blank> means C<undef>, empty string, or all whitespace. This is different
1275             than the SQL version.
1276              
1277             The second examples shows an actual value passed as a scalar reference. Because
1278             it's a reference, B<coalesce> recognizes that it is not a field name for
1279             L</get>. B<coalesce> uses the value in C<$today> if the B<Date> field is blank.
1280              
1281             B<coalesce> returns an empty string if all of the fields are blank.
1282              
1283             =cut
1284              
# Return the first non-blank value from a list of fields, mimicking SQL's
# COALESCE. Each argument is either a field name for get(), or a SCALAR
# reference holding a literal value. "Blank" means undef, empty, or all
# whitespace. Returns the empty string when every candidate is blank.
sub coalesce {
	my ($self, @candidates) = @_;

	foreach my $candidate (@candidates) {
		# A scalar reference is a literal value; anything else goes to get().
		my $value = ref( $candidate ) eq 'SCALAR'
			? ${$candidate}
			: $self->get( $candidate );
		return $value if hascontent( $value );
	}

	# Nothing usable found.
	return '';
}
1298              
1299              
1300             =head3 foreach
1301              
1302             Executes a CODE reference against repeating sub-records. XML files, for example,
1303             have repeating nodes. B<foreach> allows you to format multiple fields from the
1304             same record. It looks like this...
1305              
1306             # Capture the resulting strings.
1307             my @results = $etl->foreach( sub { ... }, '/File/People' );
1308              
1309             # Combine the resulting strings.
1310             join( '; ', $etl->foreach( sub { ... }, '/File/People' ) );
1311              
1312             B<foreach> calls L</get> to retrieve a list of sub-records. It replaces L</this>
1313             with each sub-record in turn and executes the code reference. You can use any of
1314             the standard utility functions inside the code reference. They will operate
1315             only on the current sub-record.
1316              
1317             B<foreach> returns a single string per sub-record. Blank strings are discarded.
1318             I<Blank> means C<undef>, empty strings, or all whitespace. You can filter
1319             sub-records by returning C<undef> from the code reference.
1320              
1321             For example, you might do something like this to format names from XML...
1322              
1323             # Format names "last, first" and put a semi-colon between multiple names.
1324             $etl->format( '; ', $etl->foreach(
1325             sub { $etl->format( ', ', '/Last', '/First' ) },
1326             '/File/People'
1327             ) );
1328              
1329             # Same thing, but using parameters.
1330             $etl->format( '; ', $etl->foreach(
1331             sub {
1332             my ($object, $record) = @_;
1333             $object->format( ', ', '/Last', '/First' )
1334             },
1335             '/File/People'
1336             ) );
1337              
1338             B<foreach> passed two parameters to the code reference...
1339              
1340             =over
1341              
1342             =item The current B<ETL::Pipeline> object.
1343              
1344             =item The current sub-record. This will be the same value as L</this>.
1345              
1346             =back
1347              
1348             The code reference should return a string. If it returns an ARRAY reference,
1349             B<foreach> flattens it, discarding any blank elements. So if you have to return
1350             multiple values, B<foreach> tries to do something intelligent.
1351              
1352             B<foreach> sets L</this> before executing the CODE reference. The code can call
1353             any of the other utility functions with field names relative to the sub-record.
1354             I<Please note, the code cannot access fields outside of the sub-record>.
1355             Instead, cache these in a local variable before calling B<foreach>.
1356              
1357             my $x = $etl->get( '/File/MainPerson' );
1358             join( '; ', $etl->foreach( sub {
1359             my $y = $etl->format( ', ', '/Last', '/First' );
1360             "$y is with $x";
1361             }, '/File/People' );
1362              
1363             =head4 Calling foreach
1364              
1365             B<foreach> accepts the code reference as the first parameter. All remaining
1366             parameters are field names. B<foreach> passes them through L</get> one at a
1367             time. Each field should resolve to a repeating node.
1368              
1369             B<foreach> returns a list. The list may be empty or have one element. But it is
1370             always a list. You can use Perl functions such as C<join> to convert the list
1371             into a single value.
1372              
1373             =cut
1374              
# Apply a code reference against every element of one or more repeating
# sub-records, temporarily making each element the "current" record.
# Returns the flattened, non-blank string results as a list.
sub foreach {
    my $self = shift;
    my $code = shift;   # CODE ref; called as $code->( $self, $record ), $_ also set

    # Cache the current record. I need to restore this later so other function
    # calls work normally.
    my $this = $self->this;

    # Retrieve the repeating sub-records. Each remaining argument is a field
    # name passed through "get". ARRAY results are flattened into one list.
    my $all = [];
    foreach my $item (@_) {
        my $current = $self->get( $item );
        if (ref( $current ) eq 'ARRAY') { push @$all, @$current; }
        else { push @$all, $current; }
    }

    # Execute the code reference against each sub-record. "this" points at
    # the sub-record so relative field names resolve inside the callback.
    my @results;
    foreach my $record (@$all) {
        $self->_set_this( $record );
        local $_ = $record;
        my @values = $code->( $self, $_ );

        # A single ARRAY reference return value is flattened into its elements.
        if (scalar( @values ) == 1 && ref( $values[0] ) eq 'ARRAY') {
            push @results, @{$values[0]};
        } else { push @results, @values; }
    }

    # Restore the current record and return all of the results. Only plain,
    # non-blank scalars survive - references and blank strings are dropped.
    $self->_set_this( $this );
    return grep { ref( $_ ) eq '' && hascontent( $_ ) } @results;
}
1407              
1408              
1409             =head3 format
1410              
1411             Builds a string from a list of fields, discarding blank fields. That's the main
1412             purpose of the function - don't use entirely blank strings. This prevents things
1413             like orphaned commas from showing up in your data.
1414              
1415             B<format> can both concatenate (C<join>) fields or format them (C<sprintf>).
1416             A SCALAR reference signifies a format. A regular string indicates concatenation.
1417              
1418             # Concatenate fields (aka join)
1419             $etl->format( "\n\n", 'D', 'E', 'F' );
1420              
1421             # Format fields (aka sprintf)
1422             $etl->format( \'%s, %s (%s)', 'D', 'E', 'F' );
1423              
1424             You can nest constructs with an ARRAY reference. The separator or format string
1425             is the first element. The remaining elements are more fields (or other nested
1426             ARRAY references). Basically, B<format> recursively calls itself passing the
1427             array as parameters.
1428              
1429             # Blank lines between. Third line is two fields separated by a space.
1430             $etl->format( "\n\n", 'D', 'E', [' ', 'F', 'G'] );
1431              
1432             # Blank lines between. Third line is formatted.
1433             $etl->format( "\n\n", 'D', 'E', [\'-- from %s %s', 'F', 'G'] );
1434              
1435             I<Blank> means C<undef>, empty string, or all whitespace. B<format> returns an
1436             empty string if all of fields are blank.
1437              
1438             =head4 Format until
1439              
1440             B<format> optionally accepts a CODE reference to stop processing early.
1441             B<format> passes each value into the code reference. If the code returns
1442             B<true>, then B<format> stops processing fields and returns. The code reference
1443             comes before the separator/format.
1444              
1445             # Concatenate fields until one of them is the word "END".
1446             $etl->format( sub { $_ eq 'END' }, "\n\n", '/*[idx > 8]' );
1447              
1448             B<format> sets C<$_> to the field value. It also passes the value as the first
1449             and only parameter. Your code can use either C<$_> or C<shift> to access the
1450             value.
1451              
1452             You can include code references inside an ARRAY reference too. The code only
1453             stops processing inside that substring. It continues processing the outer set of
1454             fields after the ARRAY.
1455              
1456             # The last line concatenates fields until one of them is the word "END".
1457             $etl->format( "\n\n", 'A', 'B', [sub { $_ eq 'END' }, ' ', '/*[idx > 8]'] );
1458              
1459             # Do the conditional concatenate in the middle. Results in 3 lines.
1460             $etl->format( "\n\n", 'A', [sub { $_ eq 'END' }, ' ', '/*[idx > 8]'], 'B' );
1461              
1462             What happens if you have a CODE reference and an ARRAY reference, like this?
1463              
1464             $etl->format( sub { $_ eq 'END' }, "\n\n", 'A', [' ', 'B', 'C'], 'D' );
1465              
1466             B<format> retrieves the ARRAY reference as a single string. It then sends that
1467             entire string through the CODE reference. If the code returns B<true>,
1468             processing stops. In other words, B<format> treats the results of an ARRAY
1469             reference just like any other field.
1470              
1471             =cut
1472              
# Build a string from a list of fields, either joined (plain string
# separator) or formatted (SCALAR reference = sprintf format). An optional
# leading CODE reference stops processing early. See the POD above.
sub format {
    my $self = shift;
    my $conditional = shift;
    my $seperator;

    # Process the fixed parameters. A leading CODE reference is a "stop"
    # condition; otherwise the first parameter is the separator/format.
    if (ref( $conditional ) eq 'CODE') {
        $seperator = shift;
    } else {
        $seperator = $conditional;
        $conditional = undef ;
    }

    # Retrieve the fields.
    my @results;
    my $stop = 0;

    foreach my $name (@_) {
        # Retrieve the value for this field. An ARRAY reference is a nested
        # "format" construct and collapses to a single string first.
        my $values;
        if (ref( $name ) eq 'ARRAY') {
            $values = $self->format( @$name );
        } else {
            $values = $self->get( $name );
        }

        # Check the results. With a stop condition, each value is tested
        # before being kept; a true result ends processing early.
        $values = [$values] unless ref( $values ) eq 'ARRAY';
        if (defined $conditional) {
            foreach my $item (@$values) {
                local $_ = $item;   # the callback may use $_ or its parameter
                if ($conditional->( $_ )) {
                    $stop = 1;
                    last;
                } else { push @results, $item; }
            }
        } else { push @results, @$values; }

        # Terminate the loop early.
        last if $stop;
    }

    # Return the formatted results. A SCALAR ref separator means sprintf;
    # anything else is a plain join that skips blank values. The sprintf
    # path returns '' when every value is blank (no orphaned punctuation).
    if (ref( $seperator ) eq 'SCALAR') {
        if (any { hascontent( $_ ) } @results) {
            no warnings 'redundant';    # extra values beyond the format are OK
            return sprintf( $$seperator, @results );
        } else { return ''; }
    } else { return join( $seperator, grep { hascontent( $_ ) } @results ); }
}
1523              
1524              
1525             =head3 from
1526              
1527             Return data from a hash, like the one from L<ETL::Pipeline::Output::Memory>. The
1528             first parameter is the hash reference. The remaining parameters are field names
1529             whose values become the hash keys. It's a convenient shorthand for accessing
1530             a hash, with all of the error checking built in.
1531              
1532             $etl->from( $etl->output->hash, qr/myID/i, qr/Site/i );
1533              
1534             To pass a string literal, use a scalar reference.
1535              
1536             $etl->from( \%hash, qr/myID/i, \'Date' );
1537              
1538             This is equivalent to...
1539              
1540             $hash{$etl->get( qr/myID/i )}->{'Date'}
1541              
1542             B<from> returns C<undef> if any one key does not exist.
1543              
1544             B<from> automatically dereferences arrays. So if you store multiple values, the
1545             function returns them as a list instead of the list reference. Scalar values and
1546             hash references are returned as-is.
1547              
1548             =cut
1549              
# Walk a nested hash using a chain of keys. Each key is either a scalar
# reference (a literal key) or a field name resolved through "get". Returns
# undef as soon as any step cannot be resolved. A final ARRAY reference is
# flattened into a list; scalars and hash references pass through as-is.
sub from {
    my ($self, $current, @fields) = @_;

    foreach my $lookup (@fields) {
        # Walking stops as soon as a key cannot be resolved.
        return undef unless ref( $current ) eq 'HASH';
        return undef unless defined $lookup;

        if (ref( $lookup ) eq 'SCALAR') {
            # A scalar reference holds a literal hash key.
            $current = $current->{$$lookup};
        } else {
            # Anything else is a field name for the current input record.
            my $key = $self->get( $lookup );
            return undef unless hascontent( $key );
            $current = $current->{$key};
        }
    }

    # Dereference arrays so multiple stored values come back as a list.
    return ref( $current ) eq 'ARRAY' ? @{$current} : $current;
}
1566              
1567              
1568             =head3 name
1569              
1570             Format fields as a person's name. Names are common data elements. This function
1571             provides a common format. Yet is flexible enough to handle customization.
1572              
1573             # Simple name formatted as "last, first".
1574             $etl->name( 'Last', 'First' );
1575              
1576             # Simple name formatted "first last". The format is the first element.
1577             $etl->name( \'%s %s', 'First', 'Last' );
1578              
1579             # Add a role or description in parenthesis, if it's there.
1580             $etl->name( 'Last', 'First', ['Role'] );
1581              
1582             # Add two fields with a custom format if at least one exists.
1583             $etl->name( 'Last', 'First', [\'(%s; %s)', 'Role', 'Type'] );
1584              
1585             # Same thing, but only adds the semi-colon if both values are there.
1586             $etl->name( 'Last', 'First', [['; ', 'Role', 'Type']] );
1587              
1588             # Long hand way of writing the above.
1589             $etl->name( 'Last', 'First', [\'(%s)', ['; ', 'Role', 'Type']] );
1590              
1591             If B<name> doesn't do what you want, try L</build>. L</build> is more flexible.
1592             As a matter of fact, B<name> calls L</build> internally.
1593              
1594             =cut
1595              
# Format fields as a person's name - "last, first" by default, with an
# optional parenthesized role. Delegates all of the real work to "format",
# which drops blank pieces so missing fields leave no stray punctuation.
sub name {
    my $self = shift;

    # A leading scalar reference overrides the default "last, first" layout.
    my $name_format = (ref( $_[0] ) eq 'SCALAR') ? shift : ', ';

    my @name_fields;
    my $role_format = \'(%s)';
    my @role_fields;

    # Collect name fields up to the first ARRAY reference. That array holds
    # the role fields, optionally preceded by a scalar-ref format string.
    # Anything left over afterwards is extra text appended to the result.
    while (@_) {
        my $piece = shift;
        last unless defined $piece;
        if (ref( $piece ) eq 'ARRAY') {
            $role_format = shift @$piece if ref( $piece->[0] ) eq 'SCALAR';
            @role_fields = @$piece;
            last;
        }
        push @name_fields, $piece;
    }
    my $last_name = shift @name_fields;

    # Concatenate the parts with single spaces. "format" skips any pieces
    # that come back blank.
    return $self->format( ' ',
        [$name_format, $last_name, [' ', @name_fields]],
        [$role_format, @role_fields],
        @_
    );
}
1624              
1625              
1626             =head3 piece
1627              
1628             Split a string and extract one or more of the individual pieces. This can come
1629             in handy with file names, for example. A file split on the period has two pieces
1630             - the name and the extension, piece 1 and piece 2 respectively. Here are some
1631             examples...
1632              
1633             # File name: Example.JPG
1634             # Returns: Example
1635             $etl->piece( 'Filename', qr|\.|, 1 );
1636              
1637             # Returns: JPG
1638             $etl->piece( 'Filename', qr|\.|, 2 );
1639              
1640             B<piece> takes a minimum of 3 parameters...
1641              
1642             =over
1643              
1644             =item 1. Any field name valid for L</get>
1645              
1646             =item 2. Regular expression for splitting the field
1647              
1648             =item 3. Piece number to extract (the first piece is B<1>, not B<0>)
1649              
1650             =back
1651              
1652             B<piece> accepts any field name valid with L</get>. Multiple values are
1653             concatenated with a single space. You can specify a different separator using
1654             the same syntax as L</mapping> - an array reference. In that array, the first
1655             element is the field name and the second is the separator string.
1656              
1657             The second parameter for B<piece> is a regular expression. B<piece> passes this
1658             to C<split> and breaks apart the field value.
1659              
1660             The third parameter returns one or more pieces from the split string. In the
1661             simplest form, this is a single number. And B<piece> returns that piece from the
1662             split string. Note that pieces start at number 1, not 0 like array indexes.
1663              
1664             A negative piece number starts from the end of the string. For example, B<-2>
1665             returns the second to last piece. You can also include a length - number of
1666             pieces to return starting at the given position. The default length is B<1>.
1667              
1668             # Filename: abc_def_ghi_jkl_mno_pqr
1669             # Returns: abc def
1670             $etl->piece( 'Filename', qr/_/, '1,2' );
1671              
1672             # Returns: ghi jkl mno
1673             $etl->piece( 'Filename', qr/_/, '3,3' );
1674              
1675             # Returns: mno pqr
1676             $etl->piece( 'Filename', qr/_/, '-2,2' );
1677              
1678             Notice that the multiple pieces are re-joined using a space. You can specify the
1679             separator string after the length. Do not put spaces after the commas. B<piece>
1680             will mistakenly use them as part of the separator.
1681              
1682             # Filename: abc_def_ghi_jkl_mno_pqr
1683             # Returns: abc+def
1684             $etl->piece( 'Filename', qr/_/, '1,2,+' );
1685              
1686             # Returns: ghi,jkl,mno
1687             $etl->piece( 'Filename', qr/_/, '3,3,,' );
1688              
1689             # Returns: ghi -jkl -mno
1690             $etl->piece( 'Filename', qr/_/, '3,3, -' );
1691              
1692             A blank length returns all pieces from the start position to the end, just like
1693             the Perl C<splice> function.
1694              
1695             # Filename: abc_def_ghi_jkl_mno_pqr
1696             # Returns: ghi jkl mno pqr
1697             $etl->piece( 'Filename', qr/_/, '3,' );
1698              
1699             # Returns: ghi+jkl+mno+pqr
1700             $etl->piece( 'Filename', qr/_/, '3,,+' );
1701              
1702             =head4 Recursive pieces
1703              
1704             Imagine a name like I<Public, John Q., MD>. How would you parse out the middle
1705             initial by hand? First, you piece the string by comma. Next you split the
1706             second piece of that by a space. B<piece> lets you do the same thing.
1707              
1708             # Name: Public, John Q., MD
1709             # Returns: Q.
1710             $etl->piece( 'Name', qr/,/, 2, qr/ /, 2 );
1711              
1712             # Returns: John
1713             $etl->piece( 'Name', qr/,/, 2, qr/ /, 1 );
1714              
1715             B<piece> will take the results from the first split and use it as the input to
1716             the second split. It will continue to do this for as many pairs of expressions
1717             and piece numbers as you send.
1718              
1719             =cut
1720              
# Split a string and extract one or more pieces. Arguments after the field
# name come in pairs: a regular expression for "split" plus a piece
# specification "start[,length[,glue]]". Pairs are applied recursively -
# the extracted value feeds the next split. See the POD above for examples.
sub piece {
    my $self = shift;
    my $field = shift;

    # Retrieve the initial value from the field. An ARRAY reference holds
    # the field name plus an optional separator for joining multiple values.
    my $seperator = ' ';
    if (ref( $field ) eq 'ARRAY') {
        $seperator = $field->[1] // ' ';
        $field = $field->[0];
    }
    my $value = $self->get( $field );
    $value = trim( join( $seperator, @$value ) ) if ref( $value ) eq 'ARRAY';

    # Recursively split the string.
    while (scalar @_) {
        my $split = shift;
        my @location = split /,/, shift, 3;

        my @pieces = split( $split, $value );
        if (scalar( @location ) == 0) {
            # An empty specification falls back to the first piece.
            $value = $pieces[0];
        } elsif (scalar( @location ) == 1) {
            # A single piece number. Positive numbers start at 1. Negative
            # numbers count backwards from the end, like array indexes.
            my $index = $location[0] > 0 ? $location[0] - 1 : $location[0];
            $value = $pieces[$index];
        } else {
            # A range of pieces. BUG FIX: negative start positions previously
            # had 1 subtracted too, shifting the range one piece left of the
            # documented behavior ('-2,2' returned the wrong pieces). Use the
            # same positive/negative handling as the single-piece case.
            my $start = $location[0] > 0 ? $location[0] - 1 : $location[0];

            # A blank length takes everything to the end, like "splice".
            my @parts = hascontent( $location[1] )
                ? splice( @pieces, $start, $location[1] )
                : splice( @pieces, $start );

            # Re-join with the given glue string, defaulting to a space.
            my $glue = (scalar( @location ) > 2) ? $location[2] : ' ';
            $value = join( $glue, @parts );
        }
        $value = trim( $value );
    }

    # Return the value extracted from the last split, never undef.
    return $value // '';
}
1768              
1769              
1770             =head3 replace
1771              
1772             Substitute one string for another. This function uses the C<s///> operator and
1773             returns the modified string. B<replace> accepts a field name for L</get>. A
1774             little more convenient that calling L</get> and applying C<s///> yourself.
1775              
1776             B<replace> takes three parameters...
1777              
1778             =over
1779              
1780             =item The field to change
1781              
1782             =item The regular expression to match against
1783              
1784             =item The string to replace the match with
1785              
1786             =back
1787              
1788             All instances of the matching pattern are replaced. For the patterns, you can
1789             use strings or regular expression references.
1790              
1791             =cut
1792              
# Retrieve a field value and globally substitute one pattern for another.
# Returns the modified copy; the record itself is not changed. Both the
# match and the replacement may be plain strings or regex references.
sub replace {
    my $self = shift;
    my ($field, $pattern, $replacement) = @_;

    my $text = $self->get( $field );
    $text =~ s/$pattern/$replacement/g;
    return $text;
}
1800              
1801              
1802             =head2 Other
1803              
1804             =head3 is_valid
1805              
1806             This method returns true or false. True means that the pipeline is ready to
1807             go. False, of course, means that there's a problem. In a list context,
1808             B<is_valid> returns the false value and an error message. On success, the
1809             error message is C<undef>.
1810              
1811             =cut
1812              
# Check that the pipeline is fully configured: working folder, input,
# output, and at least one of constants/mapping. In scalar context returns
# a boolean; in list context returns (boolean, error message or undef).
sub is_valid {
    my $self = shift;
    my $error = undef;

    if (!defined $self->_work_in) {
        $error = 'The working folder was not set';
    } elsif (!defined $self->_input) {
        $error = 'The "input" object was not set';
    } elsif (!defined $self->_output) {
        $error = 'The "output" object was not set';
    } else {
        # Constants alone are enough - a pipeline may copy fixed values only.
        my $found = $self->_has_constants;

        my $mapping = $self->_mapping;
        if (ref( $mapping ) eq 'CODE' ) { $found++; }
        # BUG FIX: the old test was "scalar( $mapping ) > 0", which numifies
        # the hash *reference* (its address) and is therefore always true -
        # an empty mapping hash wrongly passed validation. Count the keys so
        # an empty hash is correctly reported as "not set".
        elsif (ref( $mapping ) eq 'HASH' && scalar( keys %$mapping ) > 0) { $found++; }

        $error = 'The mapping was not set' unless $found;
    }

    if (wantarray) {
        return ((defined( $error ) ? 0 : 1), $error);
    } else {
        return (defined( $error ) ? 0 : 1);
    }
}
1839              
1840              
#----------------------------------------------------------------------
# Internal methods and attributes.

# These attributes define field aliases. This is how column names work for Excel
# and CSV. The script may also define aliases to shortcut long names.

# Alias definitions, keyed by origin: those registered by the input source
# versus those registered by the pipeline script. Each entry is a list of
# one-key hashes mapping alias name to field name.
has '_alias' => (
    default => sub { {input => [], pipeline => []} },
    init_arg => undef,
    is => 'ro',
    isa => 'HashRef[ArrayRef[HashRef[Str]]]',
);

# Cache of resolved alias lookups. "_alias_cache_built" reports whether any
# lookups have been memoized yet (a key count). NOTE(review): presumably
# invalidated/rebuilt when aliases change - confirm against the code that
# populates it (outside this view).
has '_alias_cache' => (
    default => sub { {} },
    handles => {_alias_cache_built => 'count'},
    is => 'ro',
    isa => 'HashRef[ArrayRef[Str]]',
    traits => [qw/Hash/],
);

# Which bucket of "_alias" new aliases are added to ('pipeline' by default;
# input sources switch this to 'input' while they register their own).
has '_alias_type' => (
    default => 'pipeline',
    init_arg => undef,
    is => 'rw',
    isa => 'Str',
);
1868              
1869              
# This private method creates the ETL::Pipeline::Input and ETL::Pipeline::Output
# objects. It allows me to centralize the error handling. The program dies if
# there's an error. It means that something is wrong with the corresponding
# class. And I don't want to hide those errors. You can only fix errors if you
# know about them.
#
# Override or modify this method if you want to perform extra checks.
#
# The first parameter is a string with either "Input" or "Output".
# The method appends this value onto "ETL::Pipeline". For example, "Input"
# becomes "ETL::Pipeline::Input".
#
# The rest of the parameters are passed directly into the constructor for the
# class this method instantiates.
sub _object_of_class {
    my $self = shift;
    my $action = shift;

    # Accept either a flat argument list or a single ARRAY reference holding
    # the same list (the OO constructor style shown in the SYNOPSIS).
    my @arguments = @_;
    @arguments = @{$arguments[0]} if
        scalar( @arguments ) == 1
        && ref( $arguments[0] ) eq 'ARRAY'
    ;

    # Resolve the class name. A leading "+" means "use this name verbatim";
    # otherwise short names are prefixed with "ETL::Pipeline::<action>::".
    my $class = shift @arguments;
    if (substr( $class, 0, 1 ) eq '+') {
        $class = substr( $class, 1 );
    } else {
        my $base = "ETL::Pipeline::$action";
        $class = "${base}::$class" if substr( $class, 0, length( $base ) ) ne $base;
    }

    # Remaining arguments become constructor attributes, plus a back
    # reference to this pipeline.
    my %attributes = @arguments;
    $attributes{pipeline} = $self;

    # String eval loads the class at runtime and constructs the object.
    # NOTE(review): $class is interpolated into eval'd code - callers supply
    # it from the script, not external input, but treat it as trusted only.
    my $object = eval "use $class; $class->new( \%attributes )";
    croak "Error creating $class...\n$@\n" unless defined $object;
    return $object;
}
1909              
1910              
1911             =head1 ADVANCED TOPICS
1912              
1913             =head2 Multiple input sources
1914              
1915             It is not uncommon to receive your data spread across more than one file. How
1916             do you guarantee that each pipeline pulls files from the same working directory
1917             (L</work_in>)? You L</chain> the pipelines together.
1918              
1919             The L</chain> method works like this...
1920              
1921             ETL::Pipeline->new( {
1922             work_in => {search => 'C:\Data', find => qr/Ficticious/},
1923             input => ['Excel', iname => 'main.xlsx' ],
1924             mapping => {Name => 'A', Address => 'B', ID => 'C' },
1925             constants => {Type => 1, Information => 'Demographic' },
1926             output => ['SQL', table => 'NewData' ],
1927             } )->process->chain( {
1928             input => ['Excel', iname => 'notes.xlsx' ],
1929             mapping => {User => 'A', Text => 'B', Date => 'C' },
1930             constants => {Type => 2, Information => 'Note' },
1931             output => ['SQL', table => 'OtherData' ],
1932             } )->process;
1933              
1934             When the first pipeline finishes, it creates a new object with the same
1935             L</work_in>. The code then calls L</process> on the new object. The second
1936             pipeline copies L</work_in> from the first pipeline.
1937              
1938             =head2 Writing an input source
1939              
1940             B<ETL::Pipeline> provides some basic, generic input sources. Inevitably, you
1941             will come across data that doesn't fit one of these. No problem.
1942             B<ETL::Pipeline> lets you create your own input sources.
1943              
1944             An input source is a L<Moose> class that implements the L<ETL::Pipeline::Input>
1945             role. The role requires that you define the L<ETL::Pipeline::Input/run> method.
1946             B<ETL::Pipeline> calls that method. Name your class B<ETL::Pipeline::Input::*>
1947             and the L</input> method can find it automatically.
1948              
1949             See L<ETL::Pipeline::Input> for more details.
1950              
1951             =head2 Writing an output destination
1952              
1953             B<ETL::Pipeline> does not have any default output destinations. Output
1954             destinations are customized. You have something you want done with the data.
1955             And that something intimately ties into your specific business. You will have
1956             to write at least one output destination to do anything useful.
1957              
1958             An output destination is a L<Moose> class that implements the
1959             L<ETL::Pipeline::Output> role. The role defines required methods.
1960             B<ETL::Pipeline> calls those methods. Name your class
1961             B<ETL::Pipeline::Output::*> and the L</output> method can find it automatically.
1962              
1963             See L<ETL::Pipeline::Output> for more details.
1964              
1965             =head2 Why are the inputs and outputs separate?
1966              
1967             Wouldn't it make sense to have an input source for Excel and an output
1968             destination for Excel?
1969              
1970             Input sources are generic. It takes the same code to read from one Excel file
1971             as another. Output destinations, on the other hand, are customized for your
1972             business - with data validation and business logic.
1973              
1974             B<ETL::Pipeline> assumes that you have multiple input sources. Different
1975             feeds use different formats. But output destinations will be much fewer.
1976             You're writing data into a centralized place.
1977              
1978             For these reasons, it makes sense to keep the input sources and output
1979             destinations separate. You can easily add more inputs without affecting the
1980             outputs.
1981              
1982             =head2 Custom logging
1983              
1984             The default L<status> method send updates to STDOUT. If you want to add log
1985             files or integrate with a GUI, then subclass B<ETL::Pipeline> and
1986             L<override|Moose::Manual::MethodModifiers/OVERRIDE-AND-SUPER> the L</status>
1987             method.
1988              
1989             =head1 SEE ALSO
1990              
1991             L<ETL::Pipeline::Input>, L<ETL::Pipeline::Output>
1992              
1993             =head2 Input Source Formats
1994              
1995             L<ETL::Pipeline::Input::Excel>, L<ETL::Pipeline::Input::DelimitedText>,
1996             L<ETL::Pipeline::Input::JsonFiles>, L<ETL::Pipeline::Input::Xml>,
1997             L<ETL::Pipeline::Input::XmlFiles>
1998              
1999             =head1 REPOSITORY
2000              
2001             L<https://github.com/rbwohlfarth/ETL-Pipeline>
2002              
2003             =head1 AUTHOR
2004              
2005             Robert Wohlfarth <robert.j.wohlfarth@vumc.org>
2006              
2007             =head1 COPYRIGHT AND LICENSE
2008              
2009             Copyright (c) 2021 Robert Wohlfarth
2010              
2011             This module is free software; you can redistribute it and/or modify it
2012             under the same terms as Perl 5.10.0. For more details, see the full text
2013             of the licenses in the directory LICENSES.
2014              
2015             This program is distributed in the hope that it will be useful, but
2016             without any warranty; without even the implied
2017              
2018             =cut
2019              
# Remove the Moose sugar keywords from the namespace and make the metaclass
# immutable for faster object construction. Returns a true value, satisfying
# Perl's requirement that a module end with one.
no Moose;
__PACKAGE__->meta->make_immutable;