File Coverage

blib/lib/Coro/ProcessPool/Pipeline.pm
Criterion Covered Total %
statement 25 39 64.1
branch 3 12 25.0
condition 2 8 25.0
subroutine 8 9 88.8
pod 3 4 75.0
total 41 72 56.9


line stmt bran cond sub pod time code
1              
2             package Coro::ProcessPool::Pipeline;
3             # ABSTRACT: A producer/consumer pipeline for Coro::ProcessPool
4             $Coro::ProcessPool::Pipeline::VERSION = '0.30';
5 1     1   563 use common::sense;
  1         2  
  1         6  
6 1     1   48 use Carp;
  1         2  
  1         64  
7 1     1   5 use Coro;
  1         2  
  1         59  
8 1     1   5 use Try::Catch;
  1         2  
  1         484  
9              
10             sub new {
11 1     1 0 1120   my ($class, %param) = @_;
12 1   33     7   my $pool = $param{pool} || croak 'expected parameter "pool"';
13              
14               bless {
15                 pool => $pool,
16 1   50     10     auto_shutdown => $param{auto_shutdown} || 0,
17                 shutting_down => 0,
18                 is_shutdown => 0,
19                 num_pending => 0,
20                 complete => Coro::Channel->new,
21               }, $class;
22             }
23              
24              
25             sub next {
26 1     1 1 5913   my $self = shift;
27 1 0       6   my $finished = $self->{complete}->get or return;
28 0         0   my ($result, $error) = @$finished;
29 0 0       0   if ($error) {
30 0         0     croak $error;
31               } else {
32 0         0     return $result;
33               }
34             }
35              
36              
37             sub queue {
38 11     11 1 125   my ($self, @args) = @_;
39 11 50       27   croak 'pipeline is shut down' if $self->{is_shutdown};
40 11 100       195   croak 'pipeline is shutting down' if $self->{shutting_down};
41              
42 10         28   my $deferred = $self->{pool}->defer(@args);
43              
44               async_pool {
45 0     0   0     my ($self, $deferred) = @_;
46              
47                 try {
48 0         0       my $result = $deferred->recv;
49 0         0       $self->{complete}->put([$result, undef]);
50                 }
51                 catch {
52 0         0       $self->{complete}->put([undef, $_]);
53                 }
54                 finally {
55 0 0       0       if (--$self->{num_pending} == 0) {
56 0 0 0     0         if ($self->{shutting_down} || $self->{auto_shutdown}) {
57 0         0           $self->{complete}->shutdown;
58 0         0           $self->{is_shutdown} = 1;
59 0         0           $self->{shutting_down} = 0;
60                     }
61                   }
62 0         0     };
63              
64 10         17270   } $self, $deferred;
65              
66 10         34   ++$self->{num_pending};
67             }
68              
69              
70             sub shutdown {
71 1     1 1 6   my $self = shift;
72 1         3   $self->{shutting_down} = 1;
73             }
74              
75             1;
76              
77             __END__
78            
79             =pod
80            
81             =encoding UTF-8
82            
83             =head1 NAME
84            
85             Coro::ProcessPool::Pipeline - A producer/consumer pipeline for Coro::ProcessPool
86            
87             =head1 VERSION
88            
89             version 0.30
90            
91             =head1 SYNOPSIS
92            
93             my $pool = Coro::ProcesPool->new();
94             my $pipe = $pool->pipeline;
95            
96             # Start producer thread to queue tasks
97             my $producer = async {
98             while (my $task = get_next_task()) {
99             $pipe->queue('Some::TaskClass', $task);
100             }
101            
102             # Let the pipeline know no more tasks are coming
103             $pipe->shutdown;
104             };
105            
106             # Collect the results of each task as they are received
107             while (my $result = $pipe->next) {
108             do_stuff_with($result);
109             }
110            
111             =head1 DESCRIPTION
112            
113             Provides an iterative mechanism for feeding tasks into the process pool and
114             collecting results. A pool may have multiple pipelines.
115            
116             =head1 ATTRIBUTES
117            
118             =head2 pool (required)
119            
120             The L<Coro::ProcessPool> in which to queue tasks.
121            
122             =head2 auto_shutdown (default: false)
123            
124             When set to true, the pipeline will shut itself down as soon as the number of
125             pending tasks hits zero. At least one task must be sent for this to be
126             triggered.
127            
128             =head1 METHODS
129            
130             =head2 next
131            
132             Cedes control until a previously queued task is complete and the result is
133             available.
134            
135             =head2 queue($task, $args)
136            
137             Queues a new task. Arguments are identical to L<Coro::ProcessPool/process> and
138             L<Coro::ProcessPool/defer>.
139            
140             =head2 shutdown
141            
142             Signals shutdown of the pipeline. A shutdown pipeline may not be reused.
143            
144             =head1 AUTHOR
145            
146             Jeff Ober <sysread@fastmail.fm>
147            
148             =head1 COPYRIGHT AND LICENSE
149            
150             This software is copyright (c) 2017 by Jeff Ober.
151            
152             This is free software; you can redistribute it and/or modify it under
153             the same terms as the Perl 5 programming language system itself.
154            
155             =cut
156