File Coverage

blib/lib/BusyBird/Flow.pm
Criterion Covered Total %
statement 54 55 98.1
branch 3 4 75.0
condition 3 3 100.0
subroutine 14 14 100.0
pod 3 3 100.0
total 77 79 97.4


line stmt bran cond sub pod time code
1             package BusyBird::Flow;
2 12     12   30980 use v5.8.0;
  12         36  
  12         527  
3 12     12   55 use strict;
  12         20  
  12         364  
4 12     12   54 use warnings;
  12         21  
  12         378  
5 12     12   6715 use Async::Queue;
  12         15108  
  12         408  
6 12     12   596 use BusyBird::Log qw(bblog);
  12         18  
  12         624  
7 12     12   6413 use CPS qw(kforeach);
  12         40784  
  12         833  
8 12     12   92 use Carp;
  12         24  
  12         650  
9 12     12   99 use Scalar::Util qw(weaken);
  12         17  
  12         494  
10 12     12   994 use Try::Tiny;
  12         1394  
  12         5075  
11              
12             sub new {
13 230     230 1 449 my ($class) = @_;
14 230         1111 my $self = bless {
15             filters => [],
16             }, $class;
17 230         713 $self->{queue} = $self->_create_queue();
18 230         18912 return $self;
19             }
20              
21             sub _create_queue {
22 230     230   346 my ($self) = @_;
23 230         821 weaken $self;
24             return Async::Queue->new(concurrency => 1, worker => sub {
25 183     183   11731 my ($data, $done) = @_;
26             kforeach $self->{filters}, sub {
27 89         5034 my ($filter, $knext) = @_;
28             try {
29             $filter->($data, sub {
30 87         21506 my ($result) = @_;
31 87 100 100     594 if(ref($result) && ref($result) eq 'ARRAY') {
32 71         107 $data = $result;
33             }else {
34 16         76 bblog('warn', 'The filter did not return an array-ref. Ignored.');
35             }
36 87         308 $knext->();
37 89         2981 });
38             }catch {
39 2         53 my ($e) = @_;
40 2         11 bblog('error', "Filter dies: $e");
41 2         11 $knext->();
42 89         638 };
43             }, sub {
44 183         12010 $done->($data);
45 183         1840 };
46 230         2233 });
47             }
48              
49             sub add {
50 57     57 1 83 my ($self, $async_filter) = @_;
51 57 50       234 if($self->{queue}->running) {
52 0         0 croak "You cannot add a filter while there is a status running in it."
53             }
54 57         286 push(@{$self->{filters}}, $async_filter);
  57         188  
55             }
56              
57             sub execute {
58 183     183 1 316 my ($self, $data, $callback) = @_;
59 183         1018 $self->{queue}->push($data, $callback);
60             }
61              
62              
63             1;
64              
65             __END__