File Coverage

blib/lib/BusyBird/Flow.pm
Criterion Covered Total %
statement 51 52 98.0
branch 3 4 75.0
condition 3 3 100.0
subroutine 13 13 100.0
pod 3 3 100.0
total 73 75 97.3


line stmt bran cond sub pod time code
1             package BusyBird::Flow;
2 12     12   22864 use strict;
  12         16  
  12         355  
3 12     12   41 use warnings;
  12         15  
  12         241  
4 12     12   5303 use Async::Queue;
  12         11725  
  12         306  
5 12     12   405 use BusyBird::Log qw(bblog);
  12         13  
  12         475  
6 12     12   5433 use CPS qw(kforeach);
  12         33608  
  12         661  
7 12     12   65 use Carp;
  12         15  
  12         502  
8 12     12   50 use Scalar::Util qw(weaken);
  12         13  
  12         398  
9 12     12   506 use Try::Tiny;
  12         902  
  12         4425  
10              
11             sub new {
12 230     230 1 471 my ($class) = @_;
13 230         888 my $self = bless {
14             filters => [],
15             }, $class;
16 230         637 $self->{queue} = $self->_create_queue();
17 230         16490 return $self;
18             }
19              
20             sub _create_queue {
21 230     230   287 my ($self) = @_;
22 230         758 weaken $self;
23             return Async::Queue->new(concurrency => 1, worker => sub {
24 183     183   10392 my ($data, $done) = @_;
25             kforeach $self->{filters}, sub {
26 89         4809 my ($filter, $knext) = @_;
27             try {
28             $filter->($data, sub {
29 87         27015 my ($result) = @_;
30 87 100 100     498 if(ref($result) && ref($result) eq 'ARRAY') {
31 71         108 $data = $result;
32             }else {
33 16         57 bblog('warn', 'The filter did not return an array-ref. Ignored.');
34             }
35 87         254 $knext->();
36 89         2942 });
37             }catch {
38 2         70 my ($e) = @_;
39 2         12 bblog('error', "Filter dies: $e");
40 2         12 $knext->();
41 89         588 };
42             }, sub {
43 183         11177 $done->($data);
44 183         1503 };
45 230         1842 });
46             }
47              
48             sub add {
49 57     57 1 81 my ($self, $async_filter) = @_;
50 57 50       168 if($self->{queue}->running) {
51 0         0 croak "You cannot add a filter while there is a status running in it."
52             }
53 57         252 push(@{$self->{filters}}, $async_filter);
  57         174  
54             }
55              
56             sub execute {
57 183     183 1 431 my ($self, $data, $callback) = @_;
58 183         931 $self->{queue}->push($data, $callback);
59             }
60              
61              
62             1;
63              
64             __END__