File Coverage

blib/lib/Piper.pm
Criterion Covered Total %
statement 32 34 94.1
branch 1 2 50.0
condition n/a
subroutine 12 12 100.0
pod n/a
total 45 48 93.7


line stmt bran cond sub pod time code
1             #####################################################################
2             ## AUTHOR: Mary Ehlers, regina.verbae@gmail.com
3             ## ABSTRACT: Flexible, iterable pipeline engine with automatic batching
4             #####################################################################
5              
6             package Piper;
7              
8 2     2   183641 use v5.10;
  2         5  
9 2     2   8 use strict;
  2         3  
  2         30  
10 2     2   5 use warnings;
  2         2  
  2         48  
11              
12 2     2   6 use Carp;
  2         2  
  2         92  
13 2     2   708 use Piper::Instance;
  2         7  
  2         77  
14 2     2   995 use Piper::Process;
  2         6  
  2         5  
15 2     2   10 use Types::Standard qw(ArrayRef ConsumerOf Tuple slurpy);
  2         2  
  2         11  
16              
17 2     2   988 use Moo;
  2         2  
  2         8  
18 2     2   631 use namespace::clean;
  2         4  
  2         8  
19              
20             with qw(Piper::Role::Segment);
21              
22             use overload (
23 2     2   2401 q{""} => sub { $_[0]->label },
24 2         11 fallback => 1,
25 2     2   534 );
  2         2  
26              
27             our $VERSION = '0.04'; # from Piper-0.04.tar.gz
28              
29             my $CONFIG;
30              
31             sub import {
32 2     2   12 my $class = shift;
33 2 50       6101 if (@_) {
34 0           require Piper::Config;
35 0           $CONFIG = Piper::Config->new(@_);
36             }
37             }
38              
39             #pod =head1 SYNOPSIS
40             #pod
41             #pod =for stopwords clearers dataflow injectAfter iterable mentorship queueing
42             #pod
43             #pod use Piper;
44             #pod
45             #pod my $pipeline = Piper->new(
46             #pod first_process => sub {
47             #pod my ($instance, $batch) = @_;
48             #pod $instance->emit( map { ... } @$batch );
49             #pod },
50             #pod second_processes => Piper->new(...),
51             #pod final_process => sub { ... },
52             #pod )->init;
53             #pod
54             #pod $pipeline->enqueue(@data);
55             #pod
56             #pod while ($pipeline->isnt_exhausted) {
57             #pod my $item = $pipeline->dequeue;
58             #pod ...
59             #pod }
60             #pod
61             #pod =head1 DESCRIPTION
62             #pod
63             #pod The software engineering concept known as a pipeline is a chain of processing segments, arranged such that the output of each segment is the input of the next.
64             #pod
65             #pod L<Piper> is a pipeline builder. It composes arbitrary processing segments into a single pipeline instance with the following features:
66             #pod
67             #pod =over
68             #pod
69             #pod =item *
70             #pod
71             #pod Pipeline instances are iterators, only processing data as needed.
72             #pod
73             #pod =item *
74             #pod
75             #pod Data is automatically processed in batches for each segment (with configurable batch sizes).
76             #pod
77             #pod =item *
78             #pod
79             #pod Built-in support exists for non-linear and/or recursive pipelines.
80             #pod
81             #pod =item *
82             #pod
83             #pod Processing segments are pluggable and reusable.
84             #pod
85             #pod =back
86             #pod
87             #pod =head1 CONSTRUCTOR
88             #pod
89             #pod =head2 new(@segments)
90             #pod
91             #pod Create a container pipeline segment (parent) from the provided child C<@segments>.
92             #pod
93             #pod Additionally, a single hashref of attributes for the container/parent segment may be included as an argument to the constructor (anywhere in the argument list). See the L</SEGMENT ATTRIBUTES> section for a description of attributes available for both parent and child segments.
94             #pod
95             #pod Accepted segment types are as follows:
96             #pod
97             #pod =over
98             #pod
99             #pod =item L object
100             #pod
101             #pod Creates a sub-container of pipeline segments. There is no (explicit) limit to the number of nested containers a pipeline may contain.
102             #pod
103             #pod =item L object
104             #pod
105             #pod See the L</PROCESS HANDLER> section for a description of L<Piper::Process> objects.
106             #pod
107             #pod =item A coderef (which will be coerced into a L object).
108             #pod
109             #pod =item A hashref that can be coerced into a L object.
110             #pod
111             #pod In order to be considered a candidate for coercion, the hashref must contain (at a minimum) the 'handler' key.
112             #pod
113             #pod =item L object
114             #pod
115             #pod In this case, the associated L<Piper> or L<Piper::Process> object is extracted from the L<Piper::Instance> object for use in the new pipeline segment.
116             #pod
117             #pod See L</INITIALIZATION> for a description of L<Piper::Instance> objects.
118             #pod
119             #pod =item A C<< $label => $segment >> pair
120             #pod
121             #pod For such pairs, the C<$segment> can be any of the above segment types, and C<$label> is a simple scalar which will be used as C<$segment>'s label.
122             #pod
123             #pod If the C<$segment> already has a label, C<$label> will override it.
124             #pod
125             #pod =back
126             #pod
127             #pod =head2 Constructor Example
128             #pod
129             #pod my $pipe = Piper->new(
130             #pod \%main_opts,
131             #pod subpipe_label => Piper->new(
132             #pod first_handler => Piper::Process->new(sub { ... }),
133             #pod second_handler => sub { ... },
134             #pod third_handler => {
135             #pod handler => sub { ... },
136             #pod },
137             #pod another_subpipe => Piper->new(...),
138             #pod \%subpipe_opts,
139             #pod ),
140             #pod Piper::Process->new({
141             #pod label => 'another_handler',
142             #pod handler => sub { ... },
143             #pod }),
144             #pod sub {
145             #pod # An un-labeled handler
146             #pod ...
147             #pod },
148             #pod {
149             #pod label => 'final_handler',
150             #pod handler => sub { ... },
151             #pod },
152             #pod );
153             #pod
154             #pod =head1 INITIALIZATION
155             #pod
156             #pod Piper segments were designed to be easily reusable. Prior to initialization, L and L objects do not process data; they simply contain the blueprint for creating the pipeline. As such, blueprints for commonly-used pipeline segments can be stored in package libraries and imported wherever needed.
157             #pod
158             #pod To create a functioning pipeline from one such blueprint, simply call the C<init> method on the outermost segment. The C<init> method returns a L<Piper::Instance> object of the outermost segment, which is the realization of the pipeline design, and which contains L<Piper::Instance> objects created from all its contained segments.
159             #pod
160             #pod Initialization fuses the pipeline segments together, establishes the relationships between the segments, and initializes the dataflow infrastructure.
161             #pod
162             #pod The C method may be chained from the constructor if the blueprint object is not needed:
163             #pod
164             #pod my $instance = Piper->new(...)->init;
165             #pod
166             #pod Any arguments passed to the C method will be cached and made available to each handler in the pipeline (see the L section for full description of handlers). This is a great way to share a resource (such as a database handle) among process handlers.
167             #pod
168             #pod my $pipe = Piper->new(
169             #pod query => sub {
170             #pod my ($instance, $batch, $dbh) = @_;
171             #pod $instance->emit(
172             #pod $dbh->do_query(@$batch)
173             #pod );
174             #pod },
175             #pod ...
176             #pod );
177             #pod my $instance = $pipe->init($dbh);
178             #pod
179             #pod Instances are ready to accept data for processing:
180             #pod
181             #pod $instance->enqueue(@data);
182             #pod while ($instance->isnt_exhausted) {
183             #pod my $result = $instance->dequeue;
184             #pod }
185             #pod
186             #pod =head1 PROCESS HANDLER
187             #pod
188             #pod L<Piper::Process> objects have the same L</SEGMENT ATTRIBUTES> as L<Piper> objects, but have an additional required attribute known as its C<handler>.
189             #pod
190             #pod A process C is the data-processing subroutine for the segment.
191             #pod
192             #pod In its simplest form, the process handler takes input from the previous pipeline segment, processes it, and passes it on to the next segment; but handlers also have built-in support for non-linear and recursive dataflow (see L).
193             #pod
194             #pod The arguments provided to the C subroutine are:
195             #pod
196             #pod =over
197             #pod
198             #pod =item C<$instance>
199             #pod
200             #pod The instance (a L object) corresponding to the segment.
201             #pod
202             #pod =item C<$batch>
203             #pod
204             #pod An arrayref of data items to process.
205             #pod
206             #pod =item C<@args>
207             #pod
208             #pod Any arguments provided to the C method during the L of the pipeline.
209             #pod
210             #pod =back
211             #pod
212             #pod After processing a batch of data, the C<handler> may pass the results to the next segment using the C<emit> method called from the handler's C<$instance>.
213             #pod
214             #pod =head2 Example:
215             #pod
216             #pod sub {
217             #pod my ($instance, $batch) = @_;
218             #pod $instance->emit( map { ... } @$batch );
219             #pod }
220             #pod
221             #pod =head1 FLOW CONTROL
222             #pod
223             #pod Since L has built-in support for non-linear and/or recursive pipelines, a L may send data to any other segment in the pipeline, including itself.
224             #pod
225             #pod The following methods may be called from the C<$instance> object passed as the first argument to a C:
226             #pod
227             #pod =head2 emit(@data)
228             #pod
229             #pod Send C<@data> to the next segment in the pipeline. If the instance is the last in the pipeline, emits to the drain, making the C<@data> ready for C<dequeue>.
230             #pod
231             #pod =head2 recycle(@data)
232             #pod
233             #pod Re-queue C<@data> to the top of the current segment in an order such that C would subsequently return C<$data[0]> and so forth.
234             #pod
235             #pod =head2 injectAt($location, @data)
236             #pod
237             #pod =head2 injectAfter($location, @data)
238             #pod
239             #pod Send C<@data> to the segment I<at> or I<after> the specified C<$location>.
240             #pod
241             #pod For each of the above methods, C<$location> must be the label of a segment in the pipeline or a path-like representation of an hierarchy of labels.
242             #pod
243             #pod For example, in the following pipeline, a few possible C<$location> values include C<a>, C<subpipe/b>, or C<main/subpipe/c>.
244             #pod
245             #pod my $pipe = Piper->new(
246             #pod { label => 'main' },
247             #pod subpipe => Piper->new(
248             #pod a => sub { ... },
249             #pod b => sub { ... },
250             #pod c => sub { ... },
251             #pod ),
252             #pod );
253             #pod
254             #pod If a label is unique within the pipeline, only the label is required. For non-unique labels, searches are performed in a nearest-neighbor, depth-first manner.
255             #pod
256             #pod For example, in the following pipeline, searching for C<processA> from the handler of C<processB> would find C<main/pipeA/processA>, not C<main/processA>. So to reach C<main/processA> from C<processB>, the handler would need to search for C<main/processA>.
257             #pod
258             #pod my $pipe = Piper->new(
259             #pod { label => 'main' },
260             #pod pipeA => Piper->new(
261             #pod processA => sub { ... },
262             #pod processB => sub { ... },
263             #pod ),
264             #pod processA => sub { ... },
265             #pod );
266             #pod
267             #pod =head2 inject(@data)
268             #pod
269             #pod If the segment has a parent, enqueues C<@data> to its parent. Otherwise, enqueues C<@data> to itself.
270             #pod
271             #pod =head2 eject(@data)
272             #pod
273             #pod If the segment has a parent, send C<@data> to the drain of its parent. Otherwise, enqueues C<@data> to the segment's drain.
274             #pod
275             #pod =head1 SEGMENT ATTRIBUTES
276             #pod
277             #pod All of the following attributes are available for both container (L) and processor (L) segment types.
278             #pod
279             #pod Each attribute is equipped with an accessor of the same name.
280             #pod
281             #pod A star (*) indicates that the attribute is writable, and can be modified at runtime by passing a value as an argument to the method of the same name.
282             #pod
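283             #pod All attributes (except C<label>) have an associated predicate method called C<has_$attribute> which returns a boolean indicating whether the attribute has been set for the segment.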
284             #pod
285             #pod All writable attributes (indicated by *) can be cleared by passing an explicit C<undef> to the writer method or by calling the appropriate clearer method called C<clear_$attribute>.
286             #pod
287             #pod All accessors, writers, predicates, and clearers are available for each segment before and after L.
288             #pod
289             #pod =head2 allow
290             #pod
291             #pod A coderef which can be used to subset the items which are I to be processed by the segment.
292             #pod
293             #pod The coderef executes on each item attempting to queue to the segment. If it returns true, the item is queued. Otherwise, the item skips the segment and proceeds to the next adjacent segment.
294             #pod
295             #pod Each item is localized to C<$_>, and is also passed in as the first argument.
296             #pod
297             #pod These example C subroutines are equivalent:
298             #pod
299             #pod # This segment only accepts digit inputs
300             #pod allow => sub { /^\d+$/ }
301             #pod allow => sub { $_ =~ /^\d+$/ }
302             #pod allow => sub { $_[0] =~ /^\d+$/ }
303             #pod
304             #pod =head2 *batch_size
305             #pod
306             #pod The number of items to process at a time for the segment.
307             #pod
308             #pod Once initialized (see L</INITIALIZATION>), a segment inherits the C<batch_size> of any existing parent(s) if not provided. If the segment has no parents, or if none of the parents have a C<batch_size> defined, the default C<batch_size> will be used. The default C<batch_size> is 200, but can be configured in the import statement (see the L</GLOBAL CONFIGURATION> section).
309             #pod
310             #pod =head2 *debug
311             #pod
312             #pod The debug level for the segment.
313             #pod
314             #pod Once initialized (see L), a segment inherits the debug level of any existing parent(s) if not specified. The default level is 0, but can be globally overridden by the environment variable C.
315             #pod
316             #pod See the L section for specifics about debug and verbosity levels.
317             #pod
318             #pod =head2 *enabled
319             #pod
320             #pod A boolean indicating that the segment is enabled and can accept items for processing.
321             #pod
322             #pod Once initialized (see L), a segment inherits this attribute from any existing parent(s). The default is true.
323             #pod
324             #pod If a segment is disabled (C), all items attempting to queue to the segment are forwarded to the next adjacent segment.
325             #pod
326             #pod =head2 label
327             #pod
328             #pod A label for the segment. If no label is provided, a globally unique ID will be used.
329             #pod
330             #pod Labels are necessary for certain types of L (for example, L or L). For pipelines that do not utilize L features, labels are primarily useful for L.
331             #pod
332             #pod =head2 *verbose
333             #pod
334             #pod The verbosity level for the segment.
335             #pod
336             #pod Once initialized (see L), a segment inherits the verbosity level of any existing parent(s) if not specified. The default level is 0, but can be globally overridden by the environment variable C.
337             #pod
338             #pod See the L section for specifics about debug and verbosity levels.
339             #pod
340             #pod =head2 INSTANCE ATTRIBUTES
341             #pod
342             #pod The following attributes have read-only accessors (of the same name).
343             #pod
344             #pod =head3 children
345             #pod
346             #pod For container instances (made from L objects, not L objects), holds an arrayref of the contained instance objects.
347             #pod
348             #pod =head3 main
349             #pod
350             #pod For any instance in the pipeline, this attribute holds a reference to the outermost container instance.
351             #pod
352             #pod =head3 parent
353             #pod
354             #pod For all instances in the pipeline except the outermost container (C<main>), this attribute holds a reference to the instance's immediate container segment.
355             #pod
356             #pod =head3 path
357             #pod
358             #pod The full path to the instance, built as the concatenation of all the parent(s) labels and the instance's label, joined by C</>. Instances stringify to this attribute.
359             #pod
360             #pod =head2 INSTANCE METHODS
361             #pod
362             #pod Methods marked with a (*) should only be called from the outermost instance.
363             #pod
364             #pod =head3 *dequeue([$num])
365             #pod
366             #pod Remove at most C<$num> S<(default 1)> processed items from the end of the pipeline.
367             #pod
368             #pod =head3 *enqueue(@data)
369             #pod
370             #pod Queue C<@data> for processing by the pipeline.
371             #pod
372             #pod =head3 find_segment($location)
373             #pod
374             #pod Find and return the segment instance according to C<$location>, which can be a label or a path-like hierarchy of labels. See L for a detailed description of C<$location>.
375             #pod
376             #pod =head3 *flush
377             #pod
378             #pod Process batches until there are no more items pending.
379             #pod
380             #pod =head3 has_children
381             #pod
382             #pod A boolean indicating whether the instance has any children.
383             #pod
384             #pod =head3 has_parent
385             #pod
386             #pod A boolean indicating whether the instance has a parent.
387             #pod
388             #pod =head3 has_pending
389             #pod
390             #pod Returns a boolean indicating whether there are any items that are queued at some level of the segment but have not completed processing.
391             #pod
392             #pod =head3 *is_exhausted
393             #pod
394             #pod Returns a boolean indicating whether there are any items left to process or dequeue.
395             #pod
396             #pod =head3 *isnt_exhausted
397             #pod
398             #pod Returns the opposite of C<is_exhausted>.
399             #pod
400             #pod =head3 next_segment
401             #pod
402             #pod Returns the next adjacent segment from the calling segment. Returns undef for the outermost container.
403             #pod
404             #pod =head3 pending
405             #pod
406             #pod Returns the number of items that are queued at some level of the pipeline segment but have not completed processing.
407             #pod
408             #pod =head3 *prepare([$num])
409             #pod
410             #pod Process batches while data is still C<pending> until at least C<$num> S<(default 1)> items are C<ready> for C<dequeue>.
411             #pod
412             #pod =head3 ready
413             #pod
414             #pod Returns the number of items that have finished processing and are ready for C<dequeue> from the pipeline segment.
415             #pod
416             #pod =head1 GLOBAL CONFIGURATION
417             #pod
418             #pod The following global attributes are configurable from the Piper import statement.
419             #pod
420             #pod Ex:
421             #pod # Change the default batch_size to 50
422             #pod use Piper batch_size => 50;
423             #pod
424             #pod =head2 batch_size
425             #pod
426             #pod The default batch size used by pipeline segments which do not have a locally defined C and do not have a parent segment with a defined C.
427             #pod
428             #pod The C attribute must be a positive integer.
429             #pod
430             #pod The default C is 200.
431             #pod
432             #pod =head1 LOGGING AND DEBUGGING
433             #pod
434             #pod Logging and debugging facilities are available upon L of a pipeline.
435             #pod
436             #pod Warnings and errors are issued regardless of debug and verbosity levels via C and C from the L module, and are therefore configurable with any of L's global options or environment variables.
437             #pod
438             #pod Debugging and/or informational messages are printed to STDERR if debug and/verbosity levels have been set. There are three levels used by L for each of C/C: S<0, 1, or 2>. The default is S<0 (off)>.
439             #pod
440             #pod =head2 Levels
441             #pod
442             #pod Levels can be set by any of the following mechanisms: at construction of the L/L objects, dynamically via the C and C methods of segments, or with the environment variables C and C.
443             #pod
444             #pod Levels can be set local to specific segments. The default levels of a sub-segment are inherited from its parent.
445             #pod
446             #pod Ex:
447             #pod # main verbose => 0 (default)
448             #pod # main/subpipe verbose => 1
449             #pod # main/subpipe/normal verbose => 1 (inherited)
450             #pod # main/subpipe/loud verbose => 2
451             #pod # main/subpipe/quiet verbose => 0
452             #pod
453             #pod my $pipe = Piper->new(
454             #pod { label => 'main' },
455             #pod subpipe => Piper->new(
456             #pod { verbose => 1 },
457             #pod normal => sub {...},
458             #pod loud => {
459             #pod verbose => 2,
460             #pod handler => sub {...},
461             #pod },
462             #pod quiet => {
463             #pod verbose => 0,
464             #pod handler => sub {...},
465             #pod },
466             #pod ),
467             #pod );
468             #pod
469             #pod Levels set via the environment variables C and C are global. If set, these environment variables override any and all settings defined in the source code.
470             #pod
471             #pod =head2 Messages
472             #pod
473             #pod All messages include information about the segment which called the logger.
474             #pod
475             #pod Existing informational (C<verbose> or S<< C<debug> > 0 >>) messages describe data processing steps, such as noting when items are queueing or being processed by specific segments. Increasing the S<< level > 1 >> simply adds more detail to the printed messages.
476             #pod
477             #pod Existing debug messages describe the decision actions of the pipeline engine itself. Examples include logging its search steps when locating a named segment or explaining how it chooses which batch to process. Increasing the debug S<< level > 1 >> simply adds more detail to the printed messages.
478             #pod
479             #pod =head2 Custom messaging
480             #pod
481             #pod User-defined errors, warnings, and debug or informational messages can use the same logging system as L itself.
482             #pod
483             #pod The first argument passed to a L is the L object associated with that segment, which has the below-described methods available for logging, debugging, warning, or throwing errors.
484             #pod
485             #pod In each of the below methods, the C<@items> are optional and only printed if the verbosity level for the segment S<< is > 1 >>. They can be used to pass additional context or detail about the data being processed or which caused the message to print (for conditional messages).
486             #pod
487             #pod The built-in messaging only uses debug/verbosity levels S<1 and 2>, but there are no explicit rules enforced on maximum debug/verbosity levels, so users may explicitly require higher levels for custom messages to heighten the required levels for any custom message.
488             #pod
489             #pod =head3 ERROR($message, [@items])
490             #pod
491             #pod Throws an error with C<$message> via C.
492             #pod
493             #pod =head3 WARN($message, [@items])
494             #pod
495             #pod Issues a warning with C<$message> via C.
496             #pod
497             #pod =head3 INFO($message, [@items])
498             #pod
499             #pod Prints an informational C<$message> to STDERR if either the debug or verbosity level for the segment S<< is > 0 >>.
500             #pod
501             #pod =head3 DEBUG($message, [@items])
502             #pod
503             #pod Prints a debug C<$message> to STDERR if the debug level for the segment S<< is > 0 >>.
504             #pod
505             #pod =head3 Example:
506             #pod
507             #pod my $pipe = Piper->new(
508             #pod messenger => sub {
509             #pod my ($instance, $batch) = @_;
510             #pod for my $data (@$batch) {
511             #pod if ($data->is_bad) {
512             #pod $instance->ERROR("Data <$data> is bad!");
513             #pod }
514             #pod }
515             #pod # User-heightened verbosity level
516             #pod $instance->INFO('Data all good!', @$batch)
517             #pod if $instance->verbose > 2;
518             #pod ...
519             #pod },
520             #pod ...
521             #pod );
522             #pod
523             #pod =head1 ACKNOWLEDGEMENTS
524             #pod
525             #pod Much of the concept and API for this project was inspired by the work of L.
526             #pod
527             #pod Special thanks to L for his encouragement and mentorship.
528             #pod
529             #pod =cut
530              
531             around BUILDARGS => sub {
532             my ($orig, $self, @args) = @_;
533              
534             my $opts;
535             my @children;
536             my $label;
537             for my $i (0..$#args) {
538             # Label
539             if (!ref $args[$i]) {
540             croak 'ERROR: Label ('.($label // $args[$i]).') missing a segment'
541             if defined $label or !exists $args[$i+1];
542             $label = $args[$i];
543             next;
544             }
545              
546             # Options hash
547             if (!defined $opts and ref $args[$i] eq 'HASH'
548             # Options should not be labeled
549             and !defined $label
550             # Options shouldn't have a handler
551             and !exists $args[$i]->{handler}
552             ) {
553             $opts = $args[$i];
554             next;
555             }
556              
557             # Segment
558             my $thing = $args[$i];
559             if (eval { $thing->isa('Piper') }
560             or eval { $thing->isa('Piper::Process') }
561             ) {
562             $thing->_set_label($label) if $label;
563             push @children, $thing;
564             }
565             elsif (eval { $thing->isa('Piper::Instance') }) {
566             $thing = $thing->segment;
567             $thing->_set_label($label) if $label;
568             push @children, $thing;
569             }
570             elsif ((ref $thing eq 'CODE') or (ref $thing eq 'HASH')) {
571             croak 'ERROR: Segment is missing a handler [ '
572             . ($label ? "label => $label" : "position => $i") . ' ]'
573             if ref $thing eq 'HASH' and !exists $thing->{handler};
574              
575             $thing = Piper::Process->new(
576             ($label ? $label : ()),
577             $thing
578             );
579             push @children, $thing;
580             }
581             else {
582             croak 'ERROR: Cannot coerce type ('.(ref $thing).') into a segment [ '
583             . ($label ? "label => $label" : "position => $i") . ' ]';
584             }
585              
586             undef $label;
587             }
588              
589             croak 'ERROR: No segments provided to constructor' unless @children;
590              
591             $opts->{config} = $CONFIG if defined $CONFIG;
592              
593             return $self->$orig(
594             %$opts,
595             children => \@children,
596             );
597             };
598              
599             has children => (
600             is => 'rwp',
601             # Force to contain at least one child
602             isa => Tuple[ConsumerOf['Piper::Role::Segment'],
603             slurpy ArrayRef[ConsumerOf['Piper::Role::Segment']]
604             ],
605             required => 1,
606             );
607              
608             sub init {
609             my $self = shift;
610              
611             my $instance = Piper::Instance->new(
612             segment => $self,
613             children => [
614             map { $_->init } @{$self->children}
615             ],
616             );
617              
618             # Set parents for children
619             for my $child (@{$instance->children}) {
620             $child->_set_parent($instance);
621             }
622              
623             return $instance;
624             }
625              
626             1;
627              
628             __END__