File Coverage

lib/DataFlow.pm
Criterion Covered Total %
statement 7 9 77.7
branch n/a
condition n/a
subroutine 3 3 100.0
pod n/a
total 10 12 83.3


line stmt bran cond sub pod time code
1             package DataFlow;
2              
3 1     1   8561 use strict;
  1         4  
  1         51  
4 1     1   6 use warnings;
  1         3  
  1         97  
5              
6             # ABSTRACT: A framework for dataflow processing
7              
8             our $VERSION = '1.121830'; # VERSION
9              
10 1     1   822 use Moose;
  0            
  0            
11             use Moose::Exporter;
12             with 'DataFlow::Role::Processor';
13             with 'DataFlow::Role::Dumper';
14              
15             use DataFlow::Types qw(WrappedProcList);
16              
17             use Moose::Autobox;
18              
19             use namespace::autoclean;
20             use Queue::Base 2.1;
21              
22             with 'MooseX::OneArgNew' => { 'type' => 'Str', 'init_arg' => 'procs', };
23             with 'MooseX::OneArgNew' => { 'type' => 'ArrayRef', 'init_arg' => 'procs', };
24             with 'MooseX::OneArgNew' => { 'type' => 'CodeRef', 'init_arg' => 'procs', };
25             with 'MooseX::OneArgNew' => { 'type' => 'DataFlow', 'init_arg' => 'procs', };
26             with 'MooseX::OneArgNew' =>
27             { 'type' => 'DataFlow::Proc', 'init_arg' => 'procs', };
28              
29             Moose::Exporter->setup_import_methods( as_is => ['dataflow'] );
30              
31             # attributes
32             has 'default_channel' => (
33             'is' => 'ro',
34             'isa' => 'Str',
35             'lazy' => 1,
36             'default' => 'default',
37             );
38              
39             has 'auto_process' => (
40             'is' => 'ro',
41             'isa' => 'Bool',
42             'lazy' => 1,
43             'default' => 1,
44             );
45              
46             has '_procs' => (
47             'is' => 'ro',
48             'isa' => 'WrappedProcList',
49             'required' => 1,
50             'coerce' => 1,
51             'builder' => '_build_procs',
52             'init_arg' => 'procs',
53             );
54              
55             has '_queues' => (
56             'is' => 'ro',
57             'isa' => 'ArrayRef[Queue::Base]',
58             'lazy' => 1,
59             'default' => sub { return shift->_make_queues(); },
60             'handles' => {
61             '_firstq' => sub { return shift->_queues->[0] },
62             'has_queued_data' =>
63             sub { return _count_queued_items( shift->_queues ) },
64             '_make_queues' => sub {
65             shift->_procs->map( sub { Queue::Base->new() } );
66             },
67             },
68             );
69              
70             has '_lastq' => (
71             'is' => 'ro',
72             'isa' => 'Queue::Base',
73             'lazy' => 1,
74             'default' => sub { return Queue::Base->new },
75             );
76              
77             ##############################################################################
78              
79             sub _build_procs {
80             return;
81             }
82              
83             sub procs {
84             return @{ [ shift->_procs ]->map( sub { $_->on_proc } ) };
85             }
86              
87             # functions
88             sub _count_queued_items {
89             my $q = shift;
90             my $count = 0;
91              
92             $q->map( sub { $count = $count + $_->size } );
93              
94             return $count;
95             }
96              
97             sub _process_queues {
98             my ( $proc, $inputq, $outputq ) = @_;
99              
100             my $item = $inputq->remove;
101             my @res = $proc->process($item);
102             $outputq->add(@res);
103             return;
104             }
105              
106             sub _reduce {
107             my ( $p, @q ) = @_;
108             [ 0 .. $#q - 1 ]
109             ->map( sub { _process_queues( $p->[$_], $q[$_], $q[ $_ + 1 ] ) } );
110             return;
111             }
112              
113             # methods
114             sub clone {
115             my $self = shift;
116             return DataFlow->new( procs => $self->_procs );
117             }
118              
119             sub channel_input {
120             my ( $self, $channel, @args ) = @_;
121             $self->prefix_dumper( $self->has_name ? $self->name . ' <<' : '<<', @args )
122             if $self->dump_input;
123              
124             $self->_firstq->add(
125             @{ @args->map( sub { DataFlow::Item->itemize( $channel, $_ ) } ) } );
126             return;
127             }
128              
129             sub input {
130             my ( $self, @args ) = @_;
131             $self->channel_input( $self->default_channel, @args );
132             return;
133             }
134              
135             sub process_input {
136             my $self = shift;
137             my @q = ( @{ $self->_queues }, $self->_lastq );
138             _reduce( $self->_procs, @q );
139             return;
140             }
141              
142             sub _unitem {
143             my ( $item, $channel ) = @_;
144             return unless defined $item;
145             return $item->get_data($channel);
146             }
147              
148             sub _output_items {
149             my $self = shift;
150             $self->process_input if ( $self->_lastq->empty && $self->auto_process );
151             return wantarray ? $self->_lastq->remove_all : $self->_lastq->remove;
152             }
153              
154             sub output_items {
155             my $self = shift;
156             my @res = wantarray ? $self->_output_items : scalar( $self->_output_items );
157             $self->prefix_dumper( $self->has_name ? $self->name . ' >>' : '>>', @res )
158             if $self->dump_output;
159             return wantarray ? @res : $res[0];
160             }
161              
162             sub output {
163             my $self = shift;
164             my $channel = shift || $self->default_channel;
165              
166             my @res = wantarray ? $self->_output_items : scalar( $self->_output_items );
167             $self->prefix_dumper( $self->has_name ? $self->name . ' >>' : '>>', @res )
168             if $self->dump_output;
169             return wantarray
170             ? @{ @res->map( sub { _unitem( $_, $channel ) } ) }
171             : _unitem( $res[0], $channel );
172             }
173              
174             sub reset { ## no critic
175             return shift->_queues->map( sub { $_->clear } );
176             }
177              
178             sub flush {
179             my $self = shift;
180             while ( $self->has_queued_data ) {
181             $self->process_input;
182             }
183             return $self->output;
184             }
185              
186             sub process {
187             my ( $self, @args ) = @_;
188              
189             my $flow = $self->clone();
190             $flow->input(@args);
191             return $flow->flush;
192             }
193              
194             sub proc_by_index {
195             my ( $self, $index ) = @_;
196             return unless $self->_procs->[$index];
197             return $self->_procs->[$index]->on_proc;
198             }
199              
200             sub proc_by_name {
201             my ( $self, $name ) = @_;
202             return $self->_procs->map( sub { $_->on_proc } )
203             ->grep( sub { $_->name eq $name } )->[0];
204             }
205              
206             sub dataflow (@) { ## no critic
207             my @args = @_;
208             return __PACKAGE__->new( procs => [@args] );
209             }
210              
211             __PACKAGE__->meta->make_immutable;
212              
213             1;
214              
215              
216              
217             __END__
218             =pod
219              
220             =encoding utf-8
221              
222             =head1 NAME
223              
224             DataFlow - A framework for dataflow processing
225              
226             =head1 VERSION
227              
228             version 1.121830
229              
230             =head1 SYNOPSIS
231              
232             use DataFlow;
233              
234             my $flow = DataFlow->new(
235             procs => [
236             DataFlow::Proc->new( p => sub { do this thing } ), # a Proc
237             sub { ... do something }, # a code ref
238             'UC', # named Proc
239             [ # named Proc, with parameters
240             CSV => {
241             direction => 'CONVERT_TO',
242             text_csv_opts => { binary => 1 },
243             }
244             ],
245             # named Proc, named "Proc"
246             [ Proc => { p => sub { do this other thing }, deref => 1 } ],
247             DataFlow->new( ... ), # another flow
248             ]
249             );
250              
251             $flow->input( <some input> );
252             my $output = $flow->output();
253              
254             my $output = $flow->process( <some other input> );
255              
256             # other ways to invoke the constructor
257             my $flow = DataFlow->new( sub { .. do something } ); # pass a sub
258             my $flow = DataFlow->new( [ # pass an array
259             sub { ... do this },
260             'UC',
261             [
262             HTMLFilter => (
263             search_xpath => '//td',
264             result_type => 'VALUE'
265             )
266             ]
267             ] );
268             my $flow = DataFlow->new( $another_flow ); # pass another DataFlow or Proc
269              
270             # other way to pass the data through
271             my $output = $flow->process( qw/long list of data/ );
272              
273             =head1 DESCRIPTION
274              
275             A C<DataFlow> object is able to accept data, feed it into an array of
276             processors (L<DataFlow::Proc> objects), and return the result(s) back to the
277             caller.
278              
279             =head1 ATTRIBUTES
280              
281             =head2 name
282              
283             (Str) A descriptive name for the dataflow. (OPTIONAL)
284              
285             =head2 default_channel
286              
287             (Str) The name of the default communication channel. (DEFAULT: 'default')
288              
289             =head2 auto_process
290              
291             (Bool) If there is no data available in the output queue, and one calls the
292             C<output()> method, this attribute will flag whether the dataflow should
293             attempt to automatically process queued data. (DEFAULT: true)
294              
295             =head2 procs
296              
297             (ArrayRef[DataFlow::Role::Processor]) The list of processors that make this
298             DataFlow. Optionally, you may pass CodeRefs that will be automatically
299             converted to L<DataFlow::Proc> objects. (REQUIRED)
300              
301             The C<procs> parameter will accept some variations in its value. Any
302             C<ArrayRef> passed will be parsed and, in addition to plain
303             C<DataFlow::Proc> objects, it will accept: C<DataFlow> objects (so one can
304             nest flows), code references (C<sub{}> blocks), array references and plain
305             text strings. Refer to L<DataFlow::Types> for more information on these
306             different forms of passing the C<procs> parameter.
307              
308             Additionally, one may pass any of these forms as a single argument to the
309             constructor C<new>, plus a single C<DataFlow>, or C<DataFlow::Proc> or string.
310              
311             =head1 METHODS
312              
313             =head2 has_queued_data
314              
315             Returns true if the dataflow contains any queued data within.
316              
317             =head2 clone
318              
319             Returns another instance of a C<DataFlow> using the same array of processors.
320              
321             =head2 input
322              
323             Accepts input data for the data flow. It will gladly accept anything passed as
324             parameters. However, it must be noted that it will not be able to make a
325             distinction between arrays and hashes. Both forms below will render the exact
326             same results:
327              
328             $flow->input( qw/all the simple things/ );
329             $flow->input( all => 'the', simple => 'things' );
330              
331             If you do want to handle arrays and hashes differently, we strongly suggest
332             that you use references:
333              
334             $flow->input( [ qw/all the simple things/ ] );
335             $flow->input( { all => 'the', simple => 'things' } );
336              
337             Processors using the L<DataFlow::Policy::ProcessInto> policy (default) will
338             process the items inside an array reference, and the values (not the keys)
339             inside a hash reference.
340              
341             =head2 channel_input
342              
343             Accepts input data into a specific channel for the data flow:
344              
345             $flow->channel_input( 'mydatachannel', qw/all the simple things/ );
346              
347             =head2 process_input
348              
349             Processes items in the array of queues and places at least one item in the
350             output (last) queue. One will typically call this to flush out some unwanted
351             data and/or if C<auto_process> has been disabled.
352              
353             =head2 output_items
354              
355             Fetches items, more specifically objects of the type L<DataFlow::Item>, from
356             the data flow. If called in scalar context it will return one processed item
357             from the flow. If called in list context it will return all the items from
358             the last queue.
359              
360             =head2 output
361              
362             Fetches data from the data flow. It accepts a parameter that names the data
363             channel from which the data must be fetched. If no channel is specified, it
364             will default to the flow's C<default_channel> ('default' unless overridden).
365             If called in scalar context it will return one processed item from the flow.
366             If called in list context it will return all the elements in the last queue.
367              
368             =head2 reset
369              
370             Clears all data in the dataflow and makes it ready for a new run.
371              
372             =head2 flush
373              
374             Flushes all the data through the dataflow, and returns the complete result set.
375              
376             =head2 process
377              
378             Immediately processes a bunch of data, without touching the object queues. It
379             will process all the provided data and return the complete result set for it.
380              
381             =head2 proc_by_index
382              
383             Expects an index (Num) as parameter. Returns the index-th processor in this
384             data flow, or C<undef> otherwise.
385              
386             =head2 proc_by_name
387              
388             Expects a name (Str) as parameter. Returns the first processor in this
389             data flow, for which the C<name> attribute has the same value of the C<name>
390             parameter, or C<undef> otherwise.
391              
392             =head1 FUNCTIONS
393              
394             =head2 dataflow
395              
396             Syntax sugar function that can be used to instantiate a new flow. It can be
397             used like this:
398              
399             my $flow = dataflow
400             [ 'Proc' => p => sub { ... } ],
401             ...
402             [ 'CSV' => direction => 'CONVERT_TO' ];
403              
404             $flow->process('bananas');
405              
406             =head1 HISTORY
407              
408             This is a framework for data flow processing. It started as a spin-off project
409             from the L<OpenData-BR|http://www.opendatabr.org/> initiative.
410              
411             As of now (Mar, 2011) it is still a 'work in progress', and there is a lot of
412             progress to make. It is highly recommended that you read the tests, and the
413             documentation of L<DataFlow::Proc>, to start with.
414              
415             An article has been recently written in Brazilian Portuguese about this
416             framework, per the São Paulo Perl Mongers "Equinócio" (Equinox) virtual event.
417             Although an English version of the article is in the plans, you can figure
418             a good deal out of the original one at
419              
420             L<http://sao-paulo.pm.org/equinocio/2011/mar/5>
421              
422             B<UPDATE:> L<DataFlow> is a fast-evolving project, and this article, as
423             it was published there, refers to versions 0.91.x of the framework. There has
424             been a big refactor since then and, although the concept remains the same,
425             since version 0.950000 the programming interface has been changed violently.
426              
427             If you have any doubts, feel free to get in touch.
428              
429             =for :stopwords cpan testmatrix url annocpan anno bugtracker rt cpants kwalitee diff irc mailto metadata placeholders metacpan
430              
431             =head1 SUPPORT
432              
433             =head2 Perldoc
434              
435             You can find documentation for this module with the perldoc command.
436              
437             perldoc DataFlow
438              
439             =head2 Websites
440              
441             The following websites have more information about this module, and may be of help to you. As always,
442             in addition to those websites please use your favorite search engine to discover more resources.
443              
444             =over 4
445              
446             =item *
447              
448             Search CPAN
449              
450             The default CPAN search engine, useful to view POD in HTML format.
451              
452             L<http://search.cpan.org/dist/DataFlow>
453              
454             =item *
455              
456             AnnoCPAN
457              
458             The AnnoCPAN is a website that allows community annotations of Perl module documentation.
459              
460             L<http://annocpan.org/dist/DataFlow>
461              
462             =item *
463              
464             CPAN Ratings
465              
466             The CPAN Ratings is a website that allows community ratings and reviews of Perl modules.
467              
468             L<http://cpanratings.perl.org/d/DataFlow>
469              
470             =item *
471              
472             CPAN Forum
473              
474             The CPAN Forum is a web forum for discussing Perl modules.
475              
476             L<http://cpanforum.com/dist/DataFlow>
477              
478             =item *
479              
480             CPANTS
481              
482             The CPANTS is a website that analyzes the Kwalitee ( code metrics ) of a distribution.
483              
484             L<http://cpants.perl.org/dist/overview/DataFlow>
485              
486             =item *
487              
488             CPAN Testers
489              
490             The CPAN Testers is a network of smokers who run automated tests on uploaded CPAN distributions.
491              
492             L<http://www.cpantesters.org/distro/D/DataFlow>
493              
494             =item *
495              
496             CPAN Testers Matrix
497              
498             The CPAN Testers Matrix is a website that provides a visual overview of the test results for a distribution on various Perls/platforms.
499              
500             L<http://matrix.cpantesters.org/?dist=DataFlow>
501              
502             =back
503              
504             =head2 Email
505              
506             You can email the author of this module at C<RUSSOZ at cpan.org> asking for help with any problems you have.
507              
508             =head2 Internet Relay Chat
509              
510             You can get live help by using IRC ( Internet Relay Chat ). If you don't know what IRC is,
511             please read this excellent guide: L<http://en.wikipedia.org/wiki/Internet_Relay_Chat>. Please
512             be courteous and patient when talking to us, as we might be busy or sleeping! You can join
513             those networks/channels and get help:
514              
515             =over 4
516              
517             =item *
518              
519             irc.perl.org
520              
521             You can connect to the server at 'irc.perl.org' and join this channel: #sao-paulo.pm then talk to this person for help: russoz.
522              
523             =back
524              
525             =head2 Bugs / Feature Requests
526              
527             Please report any bugs or feature requests by email to C<bug-dataflow at rt.cpan.org>, or through
528             the web interface at L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=DataFlow>. You will be automatically notified of any
529             progress on the request by the system.
530              
531             =head2 Source Code
532              
533             The code is open to the world, and available for you to hack on. Please feel free to browse it and play
534             with it, or whatever. If you want to contribute patches, please send me a diff or prod me to pull
535             from your repository :)
536              
537             L<https://github.com/russoz/DataFlow>
538              
539             git clone https://github.com/russoz/DataFlow.git
540              
541             =head1 AUTHOR
542              
543             Alexei Znamensky <russoz@cpan.org>
544              
545             =head1 COPYRIGHT AND LICENSE
546              
547             This software is copyright (c) 2011 by Alexei Znamensky.
548              
549             This is free software; you can redistribute it and/or modify it under
550             the same terms as the Perl 5 programming language system itself.
551              
552             =head1 BUGS AND LIMITATIONS
553              
554             You can make new bug reports, and view existing ones, through the
555             web interface at L<http://rt.cpan.org>.
556              
557             =head1 DISCLAIMER OF WARRANTY
558              
559             BECAUSE THIS SOFTWARE IS LICENSED FREE OF CHARGE, THERE IS NO WARRANTY
560             FOR THE SOFTWARE, TO THE EXTENT PERMITTED BY APPLICABLE LAW. EXCEPT
561             WHEN OTHERWISE STATED IN WRITING THE COPYRIGHT HOLDERS AND/OR OTHER
562             PARTIES PROVIDE THE SOFTWARE "AS IS" WITHOUT WARRANTY OF ANY KIND,
563             EITHER EXPRESSED OR IMPLIED, INCLUDING, BUT NOT LIMITED TO, THE
564             IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
565             PURPOSE. THE ENTIRE RISK AS TO THE QUALITY AND PERFORMANCE OF THE
566             SOFTWARE IS WITH YOU. SHOULD THE SOFTWARE PROVE DEFECTIVE, YOU ASSUME
567             THE COST OF ALL NECESSARY SERVICING, REPAIR, OR CORRECTION.
568              
569             IN NO EVENT UNLESS REQUIRED BY APPLICABLE LAW OR AGREED TO IN WRITING
570             WILL ANY COPYRIGHT HOLDER, OR ANY OTHER PARTY WHO MAY MODIFY AND/OR
571             REDISTRIBUTE THE SOFTWARE AS PERMITTED BY THE ABOVE LICENCE, BE LIABLE
572             TO YOU FOR DAMAGES, INCLUDING ANY GENERAL, SPECIAL, INCIDENTAL, OR
573             CONSEQUENTIAL DAMAGES ARISING OUT OF THE USE OR INABILITY TO USE THE
574             SOFTWARE (INCLUDING BUT NOT LIMITED TO LOSS OF DATA OR DATA BEING
575             RENDERED INACCURATE OR LOSSES SUSTAINED BY YOU OR THIRD PARTIES OR A
576             FAILURE OF THE SOFTWARE TO OPERATE WITH ANY OTHER SOFTWARE), EVEN IF
577             SUCH HOLDER OR OTHER PARTY HAS BEEN ADVISED OF THE POSSIBILITY OF SUCH
578             DAMAGES.
579              
580             =cut
581