File Coverage

blib/lib/ETL/Pipeline.pm
Criterion Covered Total %
statement 174 184 94.5
branch 94 116 81.0
condition 10 15 66.6
subroutine 24 24 100.0
pod 12 13 92.3
total 314 352 89.2


line stmt bran cond sub pod time code
1             =pod
2              
3             =head1 NAME
4              
5             ETL::Pipeline - Extract-Transform-Load pattern for data file conversions
6              
7             =head1 SYNOPSIS
8              
9             use ETL::Pipeline;
10              
11             # The object oriented interface...
12             ETL::Pipeline->new( {
13             work_in => {search => 'C:\Data', find => qr/Ficticious/},
14             input => ['Excel', find => qr/\.xlsx?$/ ],
15             mapping => {Name => 'A', Address => 'B', ID => 'C' },
16             constants => {Type => 1, Information => 'Demographic' },
17             output => ['SQL', table => 'NewData' ],
18             } )->process;
19              
20             # Or using method calls...
21             my $pipeline = ETL::Pipeline->new;
22             $pipeline->work_in ( search => 'C:\Data', find => qr/Ficticious/ );
23             $pipeline->input ( 'Excel', find => qr/\.xlsx?$/i );
24             $pipeline->mapping ( Name => 'A', Address => 'B', ID => 'C' );
25             $pipeline->constants( Type => 1, Information => 'Demographic' );
26             $pipeline->output ( 'SQL', table => 'NewData' );
27             $pipeline->process;
28              
29             =cut
30              
31             package ETL::Pipeline;
32              
33 21     21   7657 use 5.014000;
  21         1164  
34 21     20   233 use Carp;
  20         846  
  20         18088  
35 20     20   6177 use Moose;
  20         3838937  
  20         5538  
36 20     19   65677 use MooseX::Types::Path::Class qw/Dir File/;
  19         1239367  
  19         72  
37 19     18   18723 use Path::Class::Rule;
  18         113603  
  18         322  
38 18     18   222 use Scalar::Util qw/blessed/;
  18         42  
  18         542  
39 18     18   6300 use String::Util qw/hascontent nocontent/;
  18         30908  
  18         22559  
40              
41              
42             our $VERSION = '2.02';
43              
44              
45             =head1 DESCRIPTION
46              
47             B<ETL> stands for I<Extract-Transform-Load>. ETL isn't just for Data
48             Warehousing. ETL works on almost any type of data conversion. You read the
49             source, translate the data for your target, and store the result.
50              
51             By dividing a conversion into 3 steps, we isolate the input from the output...
52              
53             =over
54              
55             =item * Centralizes data formatting and validation.
56              
57             =item * Makes new input formats a breeze.
58              
59             =item * Makes new outputs just as easy.
60              
61             =back
62              
63             B<ETL::Pipeline> takes your data files from extract to load. It reads an input
64             source, translates the data, and writes it to an output destination. For
65             example, I use these pipelines for reading an Excel spreadsheet (input)
66             and saving the information in an SQL database (output).
67              
68             use ETL::Pipeline;
69             ETL::Pipeline->new( {
70             work_in => {search => 'C:\Data', find => qr/Ficticious/},
71             input => ['Excel', find => qr/\.xlsx?$/],
72             mapping => {Name => 'A', Complaint => 'B', ID => 'C'},
73             constants => {Client => 1, Type => 'Complaint'},
74             output => ['SQL', table => 'NewData']
75             } )->process;
76              
77             Or like this, calling the methods instead of through the constructor...
78              
79             use ETL::Pipeline;
80             my $etl = ETL::Pipeline->new;
81             $etl->work_in ( search => 'C:\Data', find => qr/Ficticious/ );
82             $etl->input ( 'Excel', find => qr/\.xlsx?$/ );
83             $etl->mapping ( Name => 'A', Complaint => 'B', ID => 'C' );
84             $etl->constants( Client => 1, Type => 'Complaint' );
85             $etl->output ( 'SQL', table => 'NewData' );
86             $etl->process;
87              
88             =head2 What is a pipeline?
89              
90             The term I<pipeline> describes a complete ETL process - extract, transform,
91             and load. Or more accurately - input, mapping, output. Raw data enters one end
92             of the pipe (input) and useful information comes out the other (output). An
93             B<ETL::Pipeline> object represents a complete pipeline.
94              
95             =head1 METHODS & ATTRIBUTES
96              
97             =head3 new
98              
99             Create a new ETL pipeline. The constructor accepts these values...
100              
101             =over
102              
103             =item chain
104              
105             This optional attribute copies L</work_in>, L</data_in>, and L</session> from
106             another object. B<chain> accepts an B<ETL::Pipeline> object. The constructor
107             copies L</work_in>, L</data_in>, and L</session> from that object. It helps
108             scripts process multiple files from the same place.
109              
110             See the section L</Multiple input sources> for an example.
111              
112             =item constants
113              
114             Assigns constant values to output fields. Since B<mapping> accepts input
115             field names, B<constants> assigns literal strings or numbers to fields. The
116             constructor calls the L</constants> method. Assign a hash reference to this
117             attribute.
118              
119             constants => {Type => 1, Information => 'Demographic'},
120              
121             =item input
122              
123             Setup the L<ETL::Pipeline::Input> object for retrieving the raw data. The
124             constructor calls the L</input> method. Assign an array reference to this
125             attribute. The array is passed directly to L</input> as parameters.
126              
127             input => ['Excel', find => qr/\.xlsx?$/],
128              
129             =item output
130              
131             Setup the L<ETL::Pipeline::Output> object for writing the processed data. The
132             constructor calls the L</output> method. Assign an array reference to this
133             attribute. The array is passed directly to L</output> as parameters.
134              
135             output => ['SQL', table => 'NewData'],
136              
137             =item mapping
138              
139             Move data from the input to the output. This attribute maps the input to the
140             output. The constructor calls the L</mapping> method. Assign a hash
141             reference to the attribute.
142              
143             mapping => {Name => 'A', Address => 'B', ID => 'C'},
144              
145             =item work_in
146              
147             Sets the working directory. All files - input, output, or temporary - reside
148             in this directory. The constructor accepts the same value as the parameters
149             to the L</work_in> method. As a matter of fact, the constructor just calls the
150             L</work_in> method.
151              
152             =back
153              
154             When creating the pipeline, B<ETL::Pipeline> sets up arguments in this order...
155              
156             =over
157              
158             =item 1. work_in
159              
160             =item 2. data_in
161              
162             =item 3. input
163              
164             =item 4. constants
165              
166             =item 5. mapping
167              
168             =item 6. output
169              
170             =back
171              
172             Later parts (e.g. output) can depend on earlier parts (e.g. input). For
173             example, the B<input> will use B<data_in> in its constructor.
174              
175             =cut
176              
177             sub BUILD {
178 51     51 0 125 my $self = shift;
179 51         73 my $arguments = shift;
180              
181             # The order of these blocks is important. ETL::Pipeline::Input and
182             # ETL::Pipeline::Output objects depend on work_in and data_in being set.
183             # And I want parameters to override chained values.
184              
185             # Copy information from an existing object. This allows objects to share
186             # settings or information.
187             #
188             # NOTE: Always copy "work_in" before "data_in". The trigger on "work_in"
189             # will change "data_in" if you don't.
190 51 100       113 if (defined $arguments->{chain}) {
191 6         67 my $object = $arguments->{chain};
192 6 50       24 croak '"link" requires an ETL::Pipeline object' unless defined blessed( $object );
193 6 50       13 croak '"link" requires an ETL::Pipeline object' unless $object->isa( 'ETL::Pipeline' );
194 6 100       59 $self->_set_work_in( $object->work_in ) if defined $object->work_in;
195 4 100       9 $self->_set_data_in( $object->data_in ) if defined $object->data_in;
196 4         72 $self->_set_session( $object->_get_session );
197             }
198              
199             # The order of these two is important. "work_in" resets "data_in" with a
200             # trigger. "work_in" must be set first so that we don't lose the value
201             # from "data_in".
202 49 100       133 if (defined $arguments->{work_in}) {
203 32         42 my $values = $arguments->{work_in};
204 32 50       112 $self->work_in( ref( $values ) eq '' ? $values : @$values );
205             }
206 48 100       105 if (defined $arguments->{data_in}) {
207 2         3 my $values = $arguments->{data_in};
208 2 50       6 $self->data_in( ref( $values ) eq '' ? $values : @$values );
209             }
210              
211             # Configure the object in one fell swoop. This always happens AFTER copying
212             # the linked object. Normal setup overrides the linked object.
213             #
214             # The order of the object creation matches the order of execution -
215             # Extract, Transform, Load. Later parts on the pipeline can depend on
216             # the configuration of earlier parts.
217 48 100       103 if (defined $arguments->{input}) {
218 34         55 my $values = $arguments->{input};
219 34 100       115 $self->input( ref( $values ) eq '' ? $values : @$values );
220             }
221 46 100       121 if (defined $arguments->{constants}) {
222 19         26 my $values = $arguments->{constants};
223 19         68 $self->constants( %$values );
224             }
225 46 100       107 if (defined $arguments->{mapping}) {
226 11         14 my $values = $arguments->{mapping};
227 11         30 $self->mapping( %$values );
228             }
229 46 100       379 if (defined $arguments->{output}) {
230 31         36 my $values = $arguments->{output};
231 31 100       119 $self->output( ref( $values ) eq '' ? $values : @$values );
232             }
233             }
234              
235              
236             =head3 chain
237              
238             This method creates a new pipeline using the same L</work_in> and L</data_in>
239             directories. It accepts the same arguments as L</new>. Use B<chain> when
240             linking multiple pipelines together. See the section L</Multiple input sources>
241             for more details.
242              
243             =cut
244              
245             sub chain {
246 3     6 1 11 my ($self, %arguments) = @_;
247 3 50       8 $arguments{chain} = $self unless exists $arguments{chain};
248 3         64 return __PACKAGE__->new( \%arguments );
249             }
250              
251              
252             =head2 Reading the input
253              
254             =head3 input
255              
256             B<input> sets and returns the L<ETL::Pipeline::Input> object. The pipeline uses
257             this object for reading the input records.
258              
259             With no parameters, B<input> returns the current L<ETL::Pipeline::Input> object.
260              
261             You tie in a new input source by calling B<input> with parameters...
262              
263             $pipeline->input( 'Excel', find => qr/\.xlsx/i );
264              
265             The first parameter is a class name. B<input> looks for a Perl module matching
266             this name in the C<ETL::Pipeline::Input> namespace. In this example, the actual
267             class name becomes C<ETL::Pipeline::Input::Excel>.
268              
269             The rest of the parameters are passed directly to the C<new> method of that
270             class.
271              
272             B<Technical Note:> Want to use a custom class from B<Local> instead of
273             B<ETL::Pipeline::Input>? Put a B<+> (plus sign) in front of the class name.
274             For example, this command uses the input class B<Local::CustomExtract>.
275              
276             $pipeline->input( '+Local::CustomExtract' );
277              
278             =head3 get
279              
280             The B<get> method returns the value of a single field from the input. It maps
281             directly to the L<get method from ETL::Pipeline::Input|ETL::Pipeline::Input/get>.
282             See L<ETL::Pipeline::Input/get> for more information.
283              
284             $pipeline->get( 'A' );
285             # -or-
286             $pipeline->mapping( Name => sub { lc $_->get( 'A' ) } );
287              
288             When you use a code reference, B<ETL::Pipeline> passes itself in C<$_>. B<get>
289             provides a convenient shortcut. Instead of writing C<< $_->input->get >>, you
290             can write C<< $_->get >>.
291              
292             =head3 record_number
293              
294             The B<record_number> method returns the current record number. It maps directly
295             to the L<record_number method from ETL::Pipeline::Input|ETL::Pipeline::Input/record_number>.
296             See L<ETL::Pipeline::Input/record_number> for more information.
297              
298             $pipeline->record_number;
299             # -or-
300             $pipeline->mapping( Row => sub { $_->record_number } );
301              
302             =cut
303              
304             has 'input' => (
305             does => 'ETL::Pipeline::Input',
306             handles => {get => 'get', record_number => 'record_number'},
307             init_arg => undef,
308             is => 'bare',
309             reader => '_get_input',
310             writer => '_set_input',
311             );
312              
313              
314             sub input {
315 290     291 1 15470 my $self = shift;
316              
317 290 100       578 $self->_set_input( $self->_object_of_class( 'Input', @_ ) ) if (scalar @_);
318 288         7186 return $self->_get_input;
319             }
320              
321              
322             =head2 Translating the data
323              
324             =head3 mapping
325              
326             B<mapping> ties the input fields with the output fields. If you call
327             B<mapping> with no parameters, it returns the hash reference. Call B<mapping>
328             with a hash or hash reference and it replaces the entire mapping with the new
329             one.
330              
331             Hash keys are output field names. The L</output> class defines acceptable field
332             names. The hash values can be...
333              
334             =over
335              
336             =item A string
337              
338             =item A regular expression reference (with C<qr/.../>)
339              
340             =item A code reference
341              
342             =back
343              
344             Strings and regular expressions are passed to L<ETL::Pipeline::Input/get>.
345             They must refer to an input field.
346              
347             A code reference is executed in a scalar context. Its return value goes into
348             the output field. The subroutine receives this B<ETL::Pipeline> object as its
349             first parameter B<and> in the C<$_> variable.
350              
351             # Get the current mapping...
352             my $transformation = $pipeline->mapping;
353              
354             # Set the output field "Name" to the input column "A"...
355             $pipeline->mapping( Name => 'A' );
356              
357             # Set "Name" from "Full Name" or "FullName"...
358             $pipeline->mapping( Name => qr/Full\s*Name/i );
359              
360             # Use the lower case of input column "A"...
361             $pipeline->mapping( Name => sub { lc $_->get( 'A' ) } );
362              
363             Want to save a literal value? Use L</constants> instead.
364              
365             =head3 add_mapping
366              
367             B<add_mapping> adds new fields to the current mapping. L</mapping> replaces
368             the entire mapping. B<add_mapping> modifies it, leaving all of your old
369             transformations in place.
370              
371             B<add_mapping> accepts key/value pairs as parameters.
372              
373             $pipeline->add_mapping( Address => 'B' );
374              
375             =cut
376              
377             has 'mapping' => (
378             handles => {add_mapping => 'set', has_mapping => 'count'},
379             init_arg => undef,
380             is => 'bare',
381             isa => 'HashRef',
382             reader => '_get_mapping',
383             traits => [qw/Hash/],
384             writer => '_set_mapping',
385             );
386              
387              
388             sub mapping {
389 17     17 1 20 my $self = shift;
390 17         27 my @pairs = @_;
391              
392 17 50 33     55 if (scalar( @pairs ) == 1 && ref( $pairs[0] ) eq 'HASH') {
    100          
393 0         0 $self->_set_mapping( $pairs[0] );
394             } elsif (scalar @pairs) {
395 12         23 my %new = @_;
396 12         304 $self->_set_mapping( \%new );
397             }
398 17         387 return $self->_get_mapping;
399             }
400              
401              
402             =head3 constants
403              
404             B<constants> sets output fields to literal values. L</mapping> accepts input
405             field names as strings. Instead of obtuse Perl tricks for marking literals,
406             B<constants> explicitly handles them.
407              
408             If you call B<constants> with no parameters, it returns the hash reference.
409             Call B<constants> with a hash or hash reference and it replaces the entire
410             hash with the new one.
411              
412             Hash keys are output field names. The L</output> class defines acceptable
413             field names. The hash values are literals.
414              
415             # Get the current mapping...
416             my $transformation = $pipeline->constants;
417              
418             # Set the output field "Name" to the string "John Doe"...
419             $pipeline->constants( Name => 'John Doe' );
420              
421             =head3 add_constant
422              
423             =head3 add_constants
424              
425             B<add_constant> adds new fields to the current hash of literal values.
426             L</constants> replaces the entire hash. B<add_constant> and B<add_constants>
427             modify the hash, leaving all of your old literals in place.
428              
429             B<add_constant> accepts key/value pairs as parameters.
430              
431             $pipeline->add_constant( Address => 'B' );
432              
433             =cut
434              
435             has 'constants' => (
436             handles => {add_constant => 'set', add_constants => 'set', has_constants => 'count'},
437             init_arg => undef,
438             is => 'bare',
439             isa => 'HashRef',
440             reader => '_get_constants',
441             traits => [qw/Hash/],
442             writer => '_set_constants',
443             );
444              
445              
446             sub constants {
447 25     25 1 95 my $self = shift;
448 25         40 my @pairs = @_;
449              
450 25 50 33     111 if (scalar( @pairs ) == 1 && ref( $pairs[0] ) eq 'HASH') {
    100          
451 0         0 $self->_set_constants( $pairs[0] );
452             } elsif (scalar @pairs) {
453 20         39 my %new = @_;
454 20         511 $self->_set_constants( \%new );
455             }
456 25         609 return $self->_get_constants;
457             }
458              
459              
460             =head2 Saving the output
461              
462             =head3 output
463              
464             B<output> sets and returns the L<ETL::Pipeline::Output> object. The pipeline
465             uses this object for creating output records.
466              
467             With no parameters, B<output> returns the current L<ETL::Pipeline::Output>
468             object.
469              
470             You tie in a new output destination by calling B<output> with parameters...
471              
472             $pipeline->output( 'SQL', table => 'NewData' );
473              
474             The first parameter is a class name. B<output> looks for a Perl module
475             matching this name in the C<ETL::Pipeline::Output> namespace. In this example,
476             the actual class name becomes C<ETL::Pipeline::Output::SQL>.
477              
478             The rest of the parameters are passed directly to the C<new> method of that
479             class.
480              
481             B<Technical Note:> Want to use a custom class from B<Local> instead of
482             B<ETL::Pipeline::Output>? Put a B<+> (plus sign) in front of the class name.
483             For example, this command uses the output class B<Local::CustomLoad>.
484              
485             $pipeline->output( '+Local::CustomLoad' );
486              
487             =head3 set
488              
489             B<set> assigns a value to an output field. The L<ETL::Pipeline::Output> class
490             defines the valid field names.
491              
492             B<set> accepts two parameters...
493              
494             =over
495              
496             =item field
497              
498             =item value
499              
500             =back
501              
502             B<set> places I<value> into the output I<field>.
503              
504             =head3 write_record
505              
506             B<write_record> outputs the current record. It is normally called by
507             L</process>. The pipeline makes it available in case you need to do something
508             special. B<write_record> takes no parameters.
509              
510             =cut
511              
512             has 'output' => (
513             does => 'ETL::Pipeline::Output',
514             handles => {set => 'set', write_record => 'write_record'},
515             init_arg => undef,
516             is => 'bare',
517             reader => '_get_output',
518             writer => '_set_output',
519             );
520              
521              
522             sub output {
523 149     149 1 10989 my $self = shift;
524              
525 149 100       359 $self->_set_output( $self->_object_of_class( 'Output', @_ ) ) if (scalar @_);
526 149         3668 return $self->_get_output;
527             }
528              
529              
530             =head2 The rest of the pipeline
531              
532             =head3 process
533              
534             B<process> kicks off the entire data conversion process. It takes no
535             parameters. All of the setup is done by the other methods.
536              
537             B<process> returns the B<ETL::Pipeline> object so you can do things like
538             this...
539              
540             ETL::Pipeline->new( {...} )->process->chain( ... )->process;
541              
542             =cut
543              
544             sub process {
545 5     5 1 28 my $self = shift;
546              
547 5         13 my ($success, $error) = $self->is_valid;
548 5 50       10 croak $error unless $success;
549              
550             # Configure the input and output objects. I expect them to "die" if they
551             # encounter errors. Always configure the input first. The output may use
552             # information from it.
553 5         10 $self->input->configure;
554 5         7 $self->output->configure;
555              
556             # The actual ETL process...
557 5         25 my $constants = $self->constants;
558 5         12 my $mapping = $self->mapping ;
559              
560 5         13 $self->progress( 'start' );
561 5         20 while ($self->input->next_record) {
562             # User defined, record level logic...
563 14         21 $self->execute_code_ref( $self->input->debug );
564 14 100       17 last if $self->execute_code_ref( $self->input->stop_if );
565 13 100       37 next if $self->execute_code_ref( $self->input->skip_if );
566              
567             # "constants" values...
568 12         69 while (my ($field, $value) = each %$constants) {
569 6 100       13 $value = $self->execute_code_ref( $value ) if ref( $value ) eq 'CODE';
570 6         16 $self->output->set( $field, $value );
571             }
572              
573             # "mapping" values...
574 12         28 while (my ($to, $from) = each %$mapping) {
575 24 100       44 if (ref( $from ) eq 'CODE') {
    50          
576 6         8 $self->output->set( $to, $self->execute_code_ref( $from ) );
577             } elsif (ref( $from ) eq 'ARRAY') {
578 0         0 $self->output->set( $to, $self->input->get( @$from ) );
579             } else {
580 18         28 $self->output->set( $to, $self->input->get( $from ) );
581             }
582             }
583              
584             # "output"...
585 12         26 $self->output->write_record;
586 13         66 } continue { $self->progress( '' ); }
587 5         27 $self->progress( 'end' );
588              
589             # Close the input and output in the opposite order we created them. This
590             # safely unwinds any dependencies.
591 5         19 $self->output->finish;
592 5         12 $self->input->finish;
593              
594             # Return the pipeline object so that we can chain calls. Useful shorthand
595             # when running multiple pipelines.
596 5         11 return $self;
597             }
598              
599              
600             =head3 work_in
601              
602             The working directory sets the default place for finding files. All searches
603             start here and only descend subdirectories. Temporary or output files go into
604             this directory as well.
605              
606             B<work_in> has two forms: C<work_in( 'C:\Data' );> or
607             C<< work_in( search => 'C:\Data', matching => 'Ficticious' ); >>.
608              
609             The first form specifies the exact directory path. In our example, the working
610             directory is F<C:\Data>.
611              
612             The second form searches the file system for a matching directory. Take this
613             example...
614              
615             $etl->work_in( search => 'C:\Data', matching => 'Ficticious' );
616              
617             It scans the F<C:\Data> directory for a subdirectory named F<Ficticious>, like
618             this: F<C:\Data\Ficticious>. The search is B<not> recursive. It locates files
619             in the B<search> folder.
620              
621             =over
622              
623             =item search
624              
625             Search inside this directory for a matching subdirectory. The search is not
626             recursive.
627              
628             =item matching
629              
630             Look for a subdirectory that matches this name. Wildcards and regular
631             expressions are supported. Searches are case insensitive.
632              
633             =back
634              
635             B<work_in> automatically resets L</data_in>.
636              
637             =cut
638              
639             has 'work_in' => (
640             coerce => 1,
641             init_arg => undef,
642             is => 'bare',
643             isa => Dir,
644             reader => '_get_work_in',
645             trigger => \&_trigger_work_in,
646             writer => '_set_work_in',
647             );
648              
649              
650             sub work_in {
651 79     79 1 2496 my $self = shift;
652              
653 79 100       173 if (scalar( @_ ) == 1) {
    100          
654 34         920 $self->_set_work_in( shift );
655             } elsif(scalar( @_ ) > 1) {
656 4         10 my %options = @_;
657              
658 4 50       9 if (defined $options{matching}) {
659             my $search = hascontent( $options{search} )
660             ? $options{search}
661 4 100       10 : $self->_default_search
662             ;
663             my $next = Path::Class::Rule
664             ->new
665             ->max_depth( 1 )
666             ->min_depth( 1 )
667             ->iname( $options{matching} )
668 4         45 ->directory
669             ->iter( $search )
670             ;
671 4         987 my $match = $next->();
672 4 50       7208 croak 'No matching directories' unless defined $match;
673 4         120 $self->_set_work_in( $match );
674 0         0 } else { $self->_set_work_in( $options{search} ); }
675             }
676 79         1943 return $self->_get_work_in;
677             }
678              
679              
680             sub _trigger_work_in {
681 40     40   41 my $self = shift;
682 40         41 my $new = shift;
683 40         1011 $self->_set_data_in( $new );
684             }
685              
686              
687             =head3 data_in
688              
689             The working directory (L</work_in>) usually contains the raw data files. In
690             some cases, though, the actual data sits in a subdirectory underneath
691             L</work_in>. B<data_in> tells the pipeline where to find the input file.
692              
693             B<data_in> accepts a search pattern - name, glob, or regular expression. It
694             searches L</work_in> for the first matching directory. The search is case
695             insensitive.
696              
697             If you pass an empty string to B<data_in>, the pipeline resets B<data_in> to
698             the L</work_in> directory. This is useful when chaining pipelines. If one
699             changes the data directory, the next in line can change back.
700              
701             =cut
702              
703             has 'data_in' => (
704             coerce => 1,
705             init_arg => undef,
706             is => 'bare',
707             isa => Dir,
708             reader => '_get_data_in',
709             writer => '_set_data_in',
710             );
711              
712              
# Accessor for the "data_in" attribute. With no arguments it returns the
# current data directory. With a search pattern (name, glob, or regex) it
# performs a case insensitive search under "work_in" for the first matching
# subdirectory. An empty pattern resets "data_in" back to "work_in".
# Croaks when "work_in" is unset or when no directory matches.
sub data_in {
	my $self = shift;

	if (@_) {
		croak 'The working folder was not set' unless defined $self->work_in;

		my $pattern = shift;
		if (!hascontent( $pattern )) {
			# Empty string: chained pipelines use this to reset the
			# data directory back to the working folder.
			$self->_set_data_in( $self->work_in );
		} else {
			my $rule = Path::Class::Rule
				->new
				->min_depth( 1 )
				->iname( $pattern )
				->directory
			;
			my $found = $rule->iter( $self->work_in )->();
			croak 'No matching directories' unless defined $found;
			$self->_set_data_in( $found );
		}
	}
	return $self->_get_data_in;
}
737              
738              
739             =head3 session
740              
741             B<ETL::Pipeline> supports sessions. A session allows input and output objects
742             to share information along a chain. For example, imagine 3 Excel files being
743             loaded into an Access database. All 3 files go into the same Access database.
744             The first pipeline creates the database and saves its path in the session. That
745             pipeline chains with a second pipeline. The second pipeline retrieves the
746             Access filename from the session.
747              
748             The B<session> method provides access to session level variables. As you write
749             your own L<ETL::Pipeline::Output> classes, they can use session variables for
750             sharing information.
751              
752             The first parameter is the variable name. If you pass only the variable name,
753             B<session> returns the value.
754              
755             my $database = $etl->session( 'access_file' );
756             my $identifier = $etl->session( 'session_identifier' );
757              
758             A second parameter is the value.
759              
760             $etl->session( access_file => 'C:\ExcelData.accdb' );
761              
762             You can set multiple variables in one call.
763            
764             $etl->session( access_file => 'C:\ExcelData.accdb', name => 'Abe' );
765              
When retrieving an array or hash reference, B<session> automatically
dereferences it if called in a list context. In a scalar context, B<session>
returns the reference.
769              
770             # Returns the list of names as a list.
771             foreach my $name ($etl->session( 'name_list' )) { ... }
772            
773             # Returns a list reference instead of a list.
774             my $reference = $etl->session( 'name_list' );
775              
776             =head3 session_has
777              
778             B<session_has> checks for a specific session variable. It returns I<true> if
779             the variable exists and I<false> if it doesn't.
780              
781             B<session_has> only checks existence. It does not tell you if the value is
782             defined.
783              
784             if ($etl->session_has( 'access_file' )) { ... }
785              
786             =cut
787              
# Hash of session level variables shared along a pipeline chain. The
# attribute is "bare"; all access goes through the "session" method and the
# delegated "session_has" predicate. The Hash trait supplies the private
# get/set/exists handlers.
has 'session' => (
	default => sub { {} },
	handles => {
		_get_variable => 'get',
		session_has   => 'exists',
		_set_variable => 'set',
	},
	init_arg => undef,
	is       => 'bare',
	isa      => 'HashRef[Any]',
	reader   => '_get_session',
	traits   => [qw/Hash/],
	writer   => '_set_session',
);
802              
803              
# Read or write session level variables. With one argument, returns that
# variable's value. With key/value pairs, stores every pair and then returns
# the value of the first key. In list context, array and hash references are
# automatically dereferenced; in scalar context the reference comes back
# unchanged.
sub session {
	my $self = shift;

	# More than one argument means key/value pairs to store.
	if (scalar( @_ ) > 1) {
		my %pairs = @_;
		$self->_set_variable( $_, $pairs{$_} ) foreach keys %pairs;
	}

	my $name = shift;
	return $self->_get_variable( $name ) unless wantarray;

	my $value = $self->_get_variable( $name );
	my $type  = ref( $value );
	if    ($type eq 'ARRAY') { return @$value; }
	elsif ($type eq 'HASH' ) { return %$value; }
	else                     { return $value;  }
}
822              
823              
824             # Alternate design: Use attributes for session level information.
825             # Result: Discarded
826             #
# Instead of keeping session variables in a hash, the class would have an
# attribute corresponding to the session data it can keep. Since
# ETL::Pipeline::Input and ETL::Pipeline::Output objects have access to
# the pipeline, they can share data through the attributes.
831             #
832             # For any session information, the developer must subclass ETL::Pipeline. The
833             # ETL::Pipeline::Input or ETL::Pipeline::Output classes would be tied to that
834             # specific subclass. And if you needed to combine two sets of session
835             # variables, well that just means another class type. That's very confusing.
836             #
837             # Attributes make development of new input and output classes very difficult.
838             # The hash is simple. It decouples the input/output classes from pipeline.
839             # That keeps customization simpler.
840              
841              
842             =head2 Other methods & attributes
843              
844             =head3 is_valid
845              
This method returns true or false. True means that the pipeline is ready to
go. False, of course, means that there's a problem. In a list context,
B<is_valid> returns the false value and an error message. On success, the
error message is the empty string.
851             =cut
852              
# Verify that the pipeline has everything it needs before processing: a
# working folder, an input source, an output destination, and at least one
# of a mapping or constants. Scalar context returns 1/0; list context
# returns (1/0, $error_message) where the message is '' on success.
sub is_valid {
	my $self = shift;

	# Check prerequisites in order, keeping the first failure message.
	my $problem = '';
	if (!defined $self->work_in) {
		$problem = 'The working folder was not set';
	} elsif (!defined $self->input) {
		$problem = 'The "input" object was not set';
	} elsif (!defined $self->output) {
		$problem = 'The "output" object was not set';
	} elsif (!$self->has_mapping && !$self->has_constants) {
		$problem = 'The mapping was not set';
	}

	my $okay = ($problem eq '' ? 1 : 0);
	return wantarray ? ($okay, $problem) : $okay;
}
873              
874              
875             =head3 progress
876              
877             This method displays the current upload progress. It is called automatically
878             by L</process>.
879              
880             B<progress> takes one parameter - a status...
881              
882             =over
883              
884             =item start
885              
886             The ETL process is just beginning. B<progress> displays the input file name,
887             if L</input> supports the L<ETL::Pipeline::Input::File> role. Otherwise,
888             B<progress> displays the L<ETL::Pipeline::Input> class name.
889              
890             =item end
891              
892             The ETL process is complete.
893              
894             =item (blank)
895              
896             B<progress> displays a count every 50 records, so you know that it's working.
897              
898             =back
899              
900             =cut
901              
# Display upload progress. Called automatically by "process". The single
# parameter is a status marker: blank shows a record count every 50 records,
# 'start' announces the input (file name when available), 'end' announces
# completion, and any other string is displayed verbatim.
sub progress {
	my ($self, $mark) = @_;

	if (nocontent( $mark )) {
		# Periodic heartbeat so long uploads show signs of life.
		my $count = $self->input->record_number;
		say "Processed record #$count..." unless $count % 50;
	} elsif ($mark eq 'start') {
		my $name;
		# BUG FIX: the role is named "ETL::Pipeline::Input::File", not
		# "Data::Pipeline::Input::File". The old check could never match
		# (coverage showed the branch was never taken), so file based
		# inputs always displayed the class name instead of the file.
		if ($self->input->does( 'ETL::Pipeline::Input::File' )) {
			$name = $self->input->path->relative( $self->work_in );
		} else {
			$name = ref( $self->input );
			$name =~ s/^ETL::Pipeline::Input:://;
		}
		say "Processing '$name'...";
	} elsif ($mark eq 'end') {
		say 'Finished, cleaning up...';
	} else {
		say $mark;
	}
}
923              
924              
925             =head3 execute_code_ref
926              
927             This method runs arbitrary Perl code. B<ETL::Pipeline> itself,
928             L<input sources|ETL::Pipeline::Input>, and
929             L<output destinations|ETL::Pipeline::Output> call this method.
930              
931             The first parameter is the code reference. Any additional parameters are
932             passed directly to the code reference.
933              
934             The code reference receives the B<ETL::Pipeline> object as its first parameter,
935             plus any additional parameters. B<execute_code_ref> also puts the
936             B<ETL::Pipeline> object into C<$_>;
937              
938             =cut
939              
# Run an arbitrary code reference on behalf of the pipeline. The code ref
# receives the pipeline object as its first argument (and in $_), followed
# by any extra parameters. Anything that is not a code reference quietly
# yields "undef" instead of crashing.
sub execute_code_ref {
	my $self = shift;
	my $code = shift;

	return undef unless defined( $code ) && ref( $code ) eq 'CODE';

	# Make the pipeline available both as a parameter and as the topic
	# variable, restoring the caller's $_ afterwards.
	local $_ = $self;
	return $code->( $self, @_ );
}
950              
951              
952             =head2 For overriding in a subclass
953              
954             =head3 _default_search
955              
956             L</work_in> searches inside this directory if you do not specify a B<search>
957             parameter. It defaults to the current directory. Override this in the subclass
958             with the correct B<default> for your environment.
959              
960             =cut
961              
# Directory that "work_in" searches when the caller gives no "search"
# parameter. Defaults to the current directory; subclasses override the
# "default" for their own environment.
has '_default_search' => (
	default  => '.',
	init_arg => undef,
	is       => 'ro',
	isa      => 'Str',
);
968              
969              
970             =head3 _object_of_class
971              
972             This private method creates the L<ETL::Pipeline::Input> and
973             L<ETL::Pipeline::Output> objects. It allows me to centralize the error
974             handling. The program dies if there's an error. It means that something is
975             wrong with the corresponding class. And I don't want to hide those errors.
976             You can only fix errors if you know about them.
977              
978             Override or modify this method if you want to perform extra checks.
979              
980             The first parameter is a string with either I<Input> or I<Output>.
981             B<_object_of_class> appends this value onto C<ETL::Pipeline>. For example,
982             I<'Input'> becomes C<ETL::Pipeline::Input>.
983              
984             The rest of the parameters are passed directly into the constructor for
985             the class B<_object_of_class> instantiates.
986              
987             =cut
988              
# Instantiate an ETL::Pipeline::Input or ETL::Pipeline::Output object.
# $action is either 'Input' or 'Output'; the remaining arguments (flat list
# or one array reference) start with the class name followed by constructor
# attributes. A leading "+" marks a fully qualified class name. Dies loudly
# on failure - errors in input/output classes should never be hidden.
sub _object_of_class {
	my $self   = shift;
	my $action = shift;

	# Accept either a flat list or a single array reference of arguments.
	my @arguments = @_;
	@arguments = @{$arguments[0]} if (scalar( @arguments ) == 1 && ref( $arguments[0] ) eq 'ARRAY');

	my $class = shift @arguments;
	if ($class =~ m/^\+/) {
		$class =~ s/^\+//;
	} elsif ($class !~ m/^ETL::Pipeline::$action/) {
		$class = "ETL::Pipeline::$action::$class";
	}

	# SECURITY: the class name feeds the string "eval" below. Reject
	# anything that is not a plain package name so no arbitrary code can
	# be injected through a malformed class name.
	croak "Invalid class name '$class'"
		unless $class =~ m/\A\w+(?:::\w+)*\z/;

	my %attributes = @arguments;
	$attributes{pipeline} = $self;

	# Load the class at run time and construct the object.
	my $object = eval "use $class; $class->new( \%attributes )";
	croak "Error creating $class...\n$@\n" unless defined $object;
	return $object;
}
1010              
1011              
1012             =head1 ADVANCED TOPICS
1013              
1014             =head2 Multiple input sources
1015              
1016             It is not uncommon to receive your data spread across more than one file. How
1017             do you guarantee that each pipeline pulls files from the same working directory
1018             (L</work_in>)? You L</chain> the pipelines together.
1019              
1020             The L</chain> method works like this...
1021              
1022             ETL::Pipeline->new( {
1023             work_in => {search => 'C:\Data', find => qr/Ficticious/},
1024             input => ['Excel', find => 'main.xlsx' ],
1025             mapping => {Name => 'A', Address => 'B', ID => 'C' },
1026             constants => {Type => 1, Information => 'Demographic' },
1027             output => ['SQL', table => 'NewData' ],
1028             } )->process->chain( {
1029             input => ['Excel', find => 'notes.xlsx' ],
1030             mapping => {User => 'A', Text => 'B', Date => 'C' },
1031             constants => {Type => 2, Information => 'Note' },
1032             output => ['SQL', table => 'NewData' ],
1033             } )->process;
1034              
1035             When the first pipeline finishes, it creates a new object with the same
1036             L</work_in>. The code then calls L</process> on the new object. You can also
1037             use the B<chain> constructor argument...
1038              
1039             my $pipeline1 = ETL::Pipeline->new( {
1040             work_in => {search => 'C:\Data', find => qr/Ficticious/},
1041             input => ['Excel', find => 'main.xlsx' ],
1042             mapping => {Name => 'A', Address => 'B', ID => 'C' },
1043             constants => {Type => 1, Information => 'Demographic' },
1044             output => ['SQL', table => 'NewData' ],
1045             } )->process;
1046             my $pipeline2 = ETL::Pipeline->new( {
1047             input => ['Excel', find => 'notes.xlsx' ],
1048             chain => $pipeline1,
1049             mapping => {User => 'A', Text => 'B', Date => 'C' },
1050             constants => {Type => 2, Information => 'Note' },
1051             output => ['SQL', table => 'NewData' ],
1052             } )->process;
1053              
1054             In both of these styles, the second pipeline copies L</work_in> from the first
1055             pipeline. There is no difference between the L</chain> method or B<chain>
1056             constructor argument. Pick the one that best suits your programming style.
1057              
1058             =head2 Writing an input source
1059              
B<ETL::Pipeline> provides some basic, generic input sources. Invariably, you
will come across data that doesn't fit one of these. No problem.
1062             B<ETL::Pipeline> lets you create your own input sources.
1063              
1064             An input source is a L<Moose> class that implements the L<ETL::Pipeline::Input>
1065             role. The role requires that you define certain methods. B<ETL::Pipeline> makes
1066             use of those methods. Name your class B<ETL::Pipeline::Input::*> and the
1067             L</input> method can find it automatically.
1068              
1069             See L<ETL::Pipeline::Input> for more details.
1070              
1071             =head2 Writing an output destination
1072              
1073             B<ETL::Pipeline> does not have any default output destinations. Output
1074             destinations are customized. You have something you want done with the data.
1075             And that something intimately ties into your specific business. You will have
1076             to write at least one output destination to do anything useful.
1077              
1078             An output destination is a L<Moose> class that implements the
1079             L<ETL::Pipeline::Output> role. The role defines required methods and a simple
1080             hash for storing the new record in memory. B<ETL::Pipeline> makes use of the
1081             methods. Name your class B<ETL::Pipeline::Output::*> and the L</output> method
1082             can find it automatically.
1083              
1084             See L<ETL::Pipeline::Output> for more details.
1085              
1086             =head2 Why are the inputs and outputs separate?
1087              
1088             Wouldn't it make sense to have an input source for Excel and an output
1089             destination for Excel?
1090              
1091             Input sources are generic. It takes the same code to read from one Excel file
1092             as another. Output destinations, on the other hand, are customized for your
1093             business - with data validation and business logic.
1094              
1095             B<ETL::Pipeline> assumes that you have multiple input sources. Different
1096             feeds use different formats. But output destinations will be much fewer.
1097             You're writing data into a centralized place.
1098              
1099             For these reasons, it makes sense to keep the input sources and output
1100             destinations separate. You can easily add more inputs without affecting the
1101             outputs.
1102              
1103             =head1 SEE ALSO
1104              
1105             L<ETL::Pipeline::Input>, L<ETL::Pipeline::Output>, L<ETL::Pipeline::Mapping>
1106              
1107             =head2 Input Source Formats
1108              
1109             L<ETL::Pipeline::Input::Excel>, L<ETL::Pipeline::Input::DelimitedText>
1110              
1111             =head1 REPOSITORY
1112              
1113             L<https://github.com/rbwohlfarth/ETL-Pipeline>
1114              
1115             =head1 AUTHOR
1116              
1117             Robert Wohlfarth <robert.j.wohlfarth@vanderbilt.edu>
1118              
1119             =head1 COPYRIGHT AND LICENSE
1120              
1121             Copyright (c) 2016 Robert Wohlfarth
1122              
1123             This module is free software; you can redistribute it and/or modify it
1124             under the same terms as Perl 5.10.0. For more details, see the full text
1125             of the licenses in the directory LICENSES.
1126              
This program is distributed in the hope that it will be useful, but
without any warranty; without even the implied warranty of
merchantability or fitness for a particular purpose.
1129              
1130             =cut
1131              
# Remove the Moose sugar keywords from the namespace and freeze the class's
# metaobject for faster method dispatch and object construction.
no Moose;
__PACKAGE__->meta->make_immutable;