File Coverage

blib/lib/Pipeline/Segment/Async.pm
Criterion Covered Total %
statement 55 65 84.6
branch 8 12 66.6
condition 3 6 50.0
subroutine 15 17 88.2
pod 8 9 88.8
total 89 109 81.6


line stmt bran cond sub pod time code
1             package Pipeline::Segment::Async;
2              
3 1     1   73664 use strict;
  1         3  
  1         35  
4 1     1   6 use warnings;
  1         2  
  1         29  
5              
6 1     1   4 use Pipeline::Segment;
  1         2  
  1         28  
7 1     1   636 use Pipeline::Error::AsyncResults;
  1         3  
  1         6  
8 1     1   831 use Pipeline::Segment::Async::Fork;
  1         3  
  1         30  
9 1     1   756 use Pipeline::Segment::Async::IThreads;
  1         2  
  1         24  
10              
11 1     1   5 use base qw( Pipeline::Segment );
  1         1  
  1         777  
12              
13             our $VERSION = "3.12";
14              
15             sub init {
16 2     2 1 4 my $self = shift;
17 2 50       12 if ($self->SUPER::init( @_ )) {
18 2         13 $self->threading_models(
19             [
20             'Pipeline::Segment::Async::IThreads',
21             'Pipeline::Segment::Async::Fork',
22             ]
23             );
24             }
25             }
26              
27             sub threading_models {
28 3     3 1 5 my $self = shift;
29 3         3 my $list = shift;
30 3 100       7 if ( defined( $list ) ) {
31 2         4 $self->{ threading_models_available } = $list;
32 2         9 return $self;
33             } else {
34 1         1 my $retval = $self->{ threading_models_available };
35 1 50       2 if (wantarray()) {
36 1         3 return @$retval;
37             } else {
38 0         0 return $retval;
39             }
40             }
41             }
42              
43             sub predispatch {
44 1     1 1 1 my $self = shift;
45 1         12 my $sub = $self->can('dispatch');
46             my $outer = sub {
47 0     0   0 my $self = shift;
48 0         0 my @results = $sub->($self, $self->parent);
49 0         0 return [$self, [ @results ]];
50 1         5 };
51 1         6 $self->model->run( $outer, $self );
52 1         90 $self->place_in_store();
53 1         37 return 1;
54             }
55              
56             sub place_in_store {
57 1     1 0 8 my $self = shift;
58 1         40 $self->store->set( $self );
59             }
60              
61             sub model {
62 3     3 1 5 my $self = shift;
63 3         4 my $obj = shift;
64 3 50       7 if (defined( $obj )) {
65 0         0 $self->{ threading_model } = $obj;
66 0         0 return $self;
67             } else {
68 3   66     13 $self->{ threading_model } ||= $self->determine_threading_model();
69 3         21 return $self->{ threading_model };
70             }
71             }
72              
73             sub determine_threading_model {
74 1     1 1 2 my $self = shift;
75 1         3 foreach my $model ($self->threading_models) {
76 2 100       16 if ( $model->canop() ) {
77 1         8 return $model->new();
78             }
79             }
80 0         0 return undef;
81             }
82              
83             sub reattach {
84 1     1 1 1 my $self = shift;
85 1         3 my $results = $self->model->reattach;
86 1 50 33     14 if (defined($results) && ref($results) eq 'ARRAY') {
87 1         2 return @{ $results->[1] };
  1         20  
88             } else {
89 0         0 throw Pipeline::Error::AsyncResults;
90             }
91             }
92              
93             sub discard {
94 0     0 1 0 my $self = shift;
95 0         0 $self->model->discard;
96             }
97              
98             sub dispatch_method {
99 1     1 1 4 return "predispatch";
100             }
101              
102             1;
103              
104             __END__