File Coverage

blib/lib/Coro/ProcessPool/Pipeline.pm
Criterion Covered Total %
statement 11 33 33.3
branch 0 12 0.0
condition 2 8 25.0
subroutine 4 7 57.1
pod 3 4 75.0
total 20 64 31.2


line stmt bran cond sub pod time code
1              
2             package Coro::ProcessPool::Pipeline;
3             # ABSTRACT: A producer/consumer pipeline for Coro::ProcessPool
4             $Coro::ProcessPool::Pipeline::VERSION = '0.28';
5 1     1   1184 use Carp;
  1         5  
  1         267  
6 1     1   10 use Coro;
  1         3  
  1         525  
7              
8             sub new {
9 1     1 0 4412   my ($class, %param) = @_;
10 1   33     25   my $pool = $param{pool} || croak 'expected parameter "pool"';
11              
12               bless {
13                 pool => $pool,
14 1   50     58     auto_shutdown => $param{auto_shutdown} || 0,
15                 shutting_down => 0,
16                 is_shutdown => 0,
17                 num_pending => 0,
18                 complete => Coro::Channel->new,
19               }, $class;
20             }
21              
22              
23             sub next {
24 1     1 1 1419   my $self = shift;
25 1 0       25   my $finished = $self->{complete}->get or return;
26 0             my ($result, $error) = @$finished;
27 0 0           if ($error) {
28 0               croak $error;
29               } else {
30 0               return $result;
31               }
32             }
33              
34              
35             sub queue {
36 0     0 1     my ($self, @args) = @_;
37 0 0           croak 'pipeline is shut down' if $self->{is_shutdown};
38 0 0           croak 'pipeline is shutting down' if $self->{shutting_down};
39              
40 0             my $deferred = $self->{pool}->defer(@args);
41              
42               async_pool {
43 0     0         my ($self, $deferred) = @_;
44 0               my $result = eval { $deferred->recv };
  0            
45              
46 0               $self->{complete}->put([$result, $@]);
47 0               --$self->{num_pending};
48              
49 0 0             if ($self->{num_pending} == 0) {
50 0 0 0             if ($self->{shutting_down} || $self->{auto_shutdown}) {
51 0                   $self->{complete}->shutdown;
52 0                   $self->{is_shutdown} = 1;
53 0                   $self->{shutting_down} = 0;
54                   }
55                 }
56 0             } $self, $deferred;
57              
58 0             ++$self->{num_pending};
59             }
60              
61              
62             sub shutdown {
63 0     0 1     my $self = shift;
64 0             $self->{shutting_down} = 1;
65             }
66              
67             1;
68              
69             __END__
70            
71             =pod
72            
73             =encoding UTF-8
74            
75             =head1 NAME
76            
77             Coro::ProcessPool::Pipeline - A producer/consumer pipeline for Coro::ProcessPool
78            
79             =head1 VERSION
80            
81             version 0.28
82            
83             =head1 SYNOPSIS
84            
85             my $pool = Coro::ProcesPool->new();
86             my $pipe = $pool->pipeline;
87            
88             # Start producer thread to queue tasks
89             my $producer = async {
90             while (my $task = get_next_task()) {
91             $pipe->queue('Some::TaskClass', $task);
92             }
93            
94             # Let the pipeline know no more tasks are coming
95             $pipe->shutdown;
96             };
97            
98             # Collect the results of each task as they are received
99             while (my $result = $pipe->next) {
100             do_stuff_with($result);
101             }
102            
103             =head1 DESCRIPTION
104            
105             Provides an iterative mechanism for feeding tasks into the process pool and
106             collecting results. A pool may have multiple pipelines.
107            
108             =head1 NAME
109            
110             Coro::ProcessPool::Pipeline
111            
112             =head1 ATTRIBUTES
113            
114             =head2 pool (required)
115            
116             The L<Coro::ProcessPool> in which to queue tasks.
117            
118             =head2 auto_shutdown (default: false)
119            
120             When set to true, the pipeline will shut itself down as soon as the number of
121             pending tasks hits zero. At least one task must be sent for this to be
122             triggered.
123            
124             =head1 METHODS
125            
126             =head2 next
127            
128             Cedes control until a previously queued task is complete and the result is
129             available.
130            
131             =head2 queue($task, $args)
132            
133             Queues a new task. Arguments are identical to L<Coro::ProcessPool/process> and
134             L<Coro::ProcessPool/defer>.
135            
136             =head2 shutdown
137            
138             Signals shutdown of the pipeline. A shutdown pipeline may not be reused.
139            
140             =head1 AUTHOR
141            
142             Jeff Ober <sysread@fastmail.fm>
143            
144             =head1 COPYRIGHT AND LICENSE
145            
146             This software is copyright (c) 2017 by Jeff Ober.
147            
148             This is free software; you can redistribute it and/or modify it under
149             the same terms as the Perl 5 programming language system itself.
150            
151             =cut
152