File Coverage

blib/lib/BusyBird/Flow.pm
Criterion Covered Total %
statement 51 52 98.0
branch 3 4 75.0
condition 3 3 100.0
subroutine 13 13 100.0
pod 3 3 100.0
total 73 75 97.3


line stmt bran cond sub pod time code
1             package BusyBird::Flow;
2 12     12   19047 use strict;
  12         17  
  12         382  
3 12     12   45 use warnings;
  12         19  
  12         277  
4 12     12   5975 use Async::Queue;
  12         12988  
  12         341  
5 12     12   362 use BusyBird::Log qw(bblog);
  12         21  
  12         523  
6 12     12   5944 use CPS qw(kforeach);
  12         36907  
  12         724  
7 12     12   81 use Carp;
  12         13  
  12         560  
8 12     12   56 use Scalar::Util qw(weaken);
  12         18  
  12         454  
9 12     12   505 use Try::Tiny;
  12         875  
  12         4904  
10              
11             sub new {
12 230     230 1 332 my ($class) = @_;
13 230         923 my $self = bless {
14             filters => [],
15             }, $class;
16 230         715 $self->{queue} = $self->_create_queue();
17 230         16621 return $self;
18             }
19              
20             sub _create_queue {
21 230     230   307 my ($self) = @_;
22 230         800 weaken $self;
23             return Async::Queue->new(concurrency => 1, worker => sub {
24 183     183   10259 my ($data, $done) = @_;
25             kforeach $self->{filters}, sub {
26 89         4740 my ($filter, $knext) = @_;
27             try {
28             $filter->($data, sub {
29 87         21612 my ($result) = @_;
30 87 100 100     537 if(ref($result) && ref($result) eq 'ARRAY') {
31 71         103 $data = $result;
32             }else {
33 16         69 bblog('warn', 'The filter did not return an array-ref. Ignored.');
34             }
35 87         276 $knext->();
36 89         2967 });
37             }catch {
38 2         49 my ($e) = @_;
39 2         11 bblog('error', "Filter dies: $e");
40 2         10 $knext->();
41 89         610 };
42             }, sub {
43 183         11003 $done->($data);
44 183         1726 };
45 230         1898 });
46             }
47              
48             sub add {
49 57     57 1 91 my ($self, $async_filter) = @_;
50 57 50       198 if($self->{queue}->running) {
51 0         0 croak "You cannot add a filter while there is a status running in it."
52             }
53 57         294 push(@{$self->{filters}}, $async_filter);
  57         182  
54             }
55              
56             sub execute {
57 183     183 1 419 my ($self, $data, $callback) = @_;
58 183         980 $self->{queue}->push($data, $callback);
59             }
60              
61              
62             1;
63              
64             __END__