File Coverage

blib/lib/Pipeline.pm
Criterion Covered Total %
statement 94 98 95.9
branch 22 24 91.6
condition 5 11 45.4
subroutine 22 23 95.6
pod 16 16 100.0
total 159 172 92.4


line stmt bran cond sub pod time code
1             package Pipeline;
2              
3 11     11   69921 use strict;
  11         24  
  11         402  
4 11     11   61 use warnings::register;
  11         73  
  11         2168  
5              
6 11     11   5431 use Pipeline::Segment;
  11         35  
  11         412  
7 11     11   4704 use Pipeline::Dispatch;
  11         27  
  11         295  
8 11     11   5825 use Pipeline::Store::Simple;
  11         27  
  11         362  
9 11     11   62 use Scalar::Util qw( blessed weaken );
  11         110  
  11         890  
10 11     11   59 use base qw( Pipeline::Segment );
  11         17  
  11         416591  
11              
12             our $VERSION = "3.12";
13              
14             sub init {
15 25     25 1 45 my $self = shift;
16 25 50       150 if ($self->SUPER::init( @_ )) {
17 25         261 $self->debug( 0 );
18 25         177 $self->store( Pipeline::Store::Simple->new() );
19 25         154 $self->dispatcher( Pipeline::Dispatch->new() );
20 25         105 $self->segments( [] );
21 25         83 return $self;
22             } else {
23 0         0 return undef;
24             }
25             }
26              
27             sub add_segment {
28 24     24 1 2933 my $self = shift;
29 24         66 $self->dispatcher->add( @_ );
30 24         49 $self;
31             }
32              
33             sub get_segment {
34 0     0 1 0 my $self = shift;
35 0         0 my $idx = shift;
36 0         0 return $self->dispatcher()->get( $idx );
37             }
38              
39             sub del_segment {
40 1     1 1 3 my $self = shift;
41 1         3 my $idx = shift;
42 1         3 my $seg = $self->segments()->[ $idx ];
43 1         4 $self->dispatcher()->delete( $idx );
44 1         13 $seg;
45             }
46              
47             sub segments {
48 31     31 1 685 my $self = shift;
49 31         74 return $self->dispatcher()->segments( @_ );
50             }
51              
52             sub dispatch {
53 17     17 1 4060 my $self = shift;
54              
55 17         59 my $result = $self->dispatch_loop();
56 17         49 my $cleanup_result = $self->cleanup;
57              
58 17         47 $self->dispatcher()->reset();
59              
60 17 100       67 if (blessed( $result )) {
61 4 50 0     21 return $result->isa('Pipeline::Production') ?
62             $result->contents :
63             $result || 1;
64             } else {
65 13   50     370 return $result || 1;
66             }
67             }
68              
69             sub start_dispatch {
70 23     23 1 31 my $self = shift;
71 23         64 $self->store->start_transaction;
72             }
73              
74             sub end_dispatch {
75 23     23 1 32 my $self = shift;
76 23         93 $self->store->end_transaction;
77             }
78              
79             sub process_indv_result {
80 20     20 1 27 my $self = shift;
81 20         23 my $thing = shift;
82 20         24 my $production = undef;
83 20 100       112 return $production unless blessed( $thing );
84 7 100       331 if ($thing->isa( 'Pipeline::Segment' )) {
    100          
85 1         4 $self->cleanups->add_segment( $thing );
86             } elsif ($thing->isa('Pipeline::Production')) {
87 4         31 $production = $thing;
88 4         12 $self->store->set( $thing->contents );
89             } else {
90 2         9 $self->store->set( $thing );
91             }
92 7   100     40 return $production || undef;
93             }
94              
95             sub process_results {
96 20     20 1 27 my $self = shift;
97 20         28 my $args = shift;
98 20         27 my $final;
99 20         50 foreach my $result ( @$args ) {
100 20         66 my $product = $self->process_indv_result( $result );
101 20 100       84 $final = $product if $product;
102             }
103 20 100       69 return $final if $final;
104 16         39 return undef;
105             }
106              
107             sub dispatch_loop {
108 17     17 1 26 my $self = shift;
109              
110             ## turn on debugging for the dispatcher if we need to
111 17         44 $self->dispatcher->debug( $self->debug );
112              
113 17         47 while($self->dispatcher->segment_available) {
114 20         61 my $unrefined = [ $self->dispatcher->next( $self ) ];
115 20         79 my $refined = $self->process_results( $unrefined );
116 20 100       122 if (defined( $refined )) {
117 4         10 return $refined
118             }
119             }
120 13         49 return 1;
121             }
122              
123             ## be careful here
124             sub cleanup {
125 17     17 1 44 my $self = shift;
126 17 100       99 if ($self->{ cleanup_pipeline }) {
127             return (
128 2   50     8 $self->{ cleanup_pipeline }->debug( $self->debug || 0 )
129             ->parent( $self )
130             ->store( $self->store() )
131             ->dispatch()
132             );
133             }
134             # $self->end_dispatch();
135             }
136              
137             sub dispatcher {
138 169     169 1 205 my $self = shift;
139 169         178 my $obj = shift;
140 169 100       325 if (defined( $obj )) {
141 25         69 $self->{ dispatcher } = $obj;
142 25         46 return $self;
143             } else {
144 144         656 return $self->{ dispatcher };
145             }
146             }
147              
148             sub cleanups {
149 2     2 1 4 my $self = shift;
150 2   33     39 $self->{ cleanup_pipeline } ||= ref($self)->new();
151             }
152              
153             sub debug {
154 48     48 1 70 my $self = shift;
155 48         205 $self->SUPER::debug( @_ );
156             }
157              
158             sub debug_all {
159 2     2 1 4 my $self = shift;
160 2         3 my $debug = shift;
161 2         4 foreach my $segment (@{ $self->segments }) {
  2         4  
162 2 100       20 $segment->isa( 'Pipeline' )
163             ? $segment->debug_all( $debug )
164             : $segment->debug( $debug );
165             }
166              
167 2         6 $self->debug( $debug );
168             }
169              
170             1;
171              
172             =head1 NAME
173              
174             Pipeline - Generic pipeline interface
175              
176             =head1 SYNOPSIS
177              
178             use Pipeline;
179             my $pipeline = Pipeline->new();
180             $pipeline->add_segment( @segments );
181             $pipeline->dispatch();
182              
183             =head1 DESCRIPTION
184              
185             C are a mechanism to process data. They are designed to
186             be plugged together to make fairly complex operations act in a
187             fairly straightforward manner, cleanly, and simply.
188              
189             =head1 USING THE PIPELINE MODULE
190              
191             The usage of the generic pipeline module is fairly simple. You
192             instantiate a Pipeline object by using the I constructor.
193              
194             Segments can be added to the pipeline with the add_segment method.
195              
196             The store that the Pipeline will use can be set by calling the
197             I method later on. If a store is not set by the time
198             a pipeline is executing then it will use a store of the type
199             C.
200              
201             To start the pipeline running call the I method on your
202             Pipeline object.
203              
204             If a segment returns a Pipeline::Production object then the pipeline
205             will be terminated early and the production will be returned to the
206             user. Regardless of when the pipeline is terminated the pipeline's
207             cleanup pipeline is executed. Segments can be added to the cleanup
208             pipeline either explicitly by calling the cleanups method to get the
209             cleanup pipeline and then adding the segment, or implicitly by
210             returning a segment object from a segment.
211              
212             To see what is being dispatched within a pipeline dispatch set the
213             pipeline's debug_all value to true.
214              
215             =head2 INHERITANCE
216              
217             Pipelines are designed to be inherited from. The inheritance tree is
218             somewhat warped and should look a little like this:
219              
220             MySegment --> Pipeline::Segment <--- Pipeline
221              
222             In other words, everything is a pipeline segment.
223              
224             =head1 METHODS
225              
226             The Pipeline class inherits from the C class and
227             therefore also has any additional methods that its superclass may have.
228              
229             =over 4
230              
231             =item init( @_ )
232              
233             Things to do at construction time. If you do override this, it will
234             often be fairly important that you call and check the value of
235             $self->SUPER::init(@_) to make sure that the setup is done correctly.
236             Returns itself on success, undef on failure. The constructor will
237             fail if you return a false value.
238              
239             =item add_segment( LIST )
240              
241             Adds a segment or segments to the pipeline. Returns itself.
242              
243             =item get_segment( INTEGER )
244              
245             Returns the segment located at the index specified by INTEGER
246              
247             =item del_segment( INTEGER )
248              
249             Deletes and returns the segment located at the index specified
250             by INTEGER
251              
252             =item process_results( ARRAYREF )
253              
254             Examines each result of a segment and calls process_indv_result with
255             each element of ARRAYREF. In the case that process_indv_result returns
256             a production then it is returned to the caller.
257              
258             =item process_indv_result( SCALAR )
259              
260             Examines a single result and does the appripriate thing with it (ie, if it
261             is an object it puts it into the store, if it is a production it returns
262             it to the caller, and if it is a simple value it gets thrown away. In
263             the case that a value is returned from process_indv_result the pipeline
264             should terminate.
265              
266             =item dispatch()
267              
268             Starts the pipeline execution. It calls process_results on anything
269             that a segment returns. The pipeline always returns the production
270             or true.
271              
272             =item dispatch_loop( Pipeline, [ ARRAYREF ] )
273              
274             The C method performs the processing for the pipeline
275              
276             =item start_dispatch
277              
278             Prepares all elements of the pipeline to begin processing a segment.
279              
280             =item end_dispatch
281              
282             Cleans up all elements of the pipeline after processing a segment.
283              
284             =item dispatch_segment( Pipeline::Segment )
285              
286             The C method handles the execution of an individual
287             segment object.
288              
289             =item dispatcher( [Pipeline::Dispatch] )
290              
291             The C method gets and sets the pipeline dispatcher object
292             that will be used to traverse the pipeline.
293              
294             =item cleanups()
295              
296             Returns the cleanup pipeline. This is a pipeline in and of itself,
297             and all the methods you can call on a pipeline can also be called on
298             this.
299              
300             =item cleanup()
301              
302             Calls the dispatch method on the cleanup pipeline.
303              
304             =item segments( [ value ] )
305              
306             C gets and sets the value of the pipeline list. At
307             initialization this is set to an array reference.
308              
309             =item debug_all( value )
310              
311             Sets debug( value ) recursively for each segment in this pipeline.
312              
313             =back
314              
315             =head1 SEE ALSO
316              
317             C, C, C
318             C, C
319              
320             =head1 AUTHORS
321              
322             James A. Duncan
323             Leon Brocard
324              
325             =head1 COPYRIGHT
326              
327             Copyright 2003 Fotango Ltd.
328             Licensed under the same terms as Perl itself.
329              
330             =cut
331