File Coverage

blib/lib/Coro/ProcessPool/Pipeline.pm
Criterion Covered Total %
statement 28 42 66.6
branch 3 12 25.0
condition 2 8 25.0
subroutine 9 10 90.0
pod 3 4 75.0
total 45 76 59.2


line stmt bran cond sub pod time code
1              
2             package Coro::ProcessPool::Pipeline;
3             # ABSTRACT: A producer/consumer pipeline for Coro::ProcessPool
4             $Coro::ProcessPool::Pipeline::VERSION = '0.29';
5 1     1   557 use strict;
  1         2  
  1         24  
6 1     1   5 use warnings;
  1         2  
  1         20  
7 1     1   3 use Carp;
  1         2  
  1         46  
8 1     1   5 use Coro;
  1         2  
  1         59  
9 1     1   5 use Try::Catch;
  1         2  
  1         393  
10              
11             sub new {
12 1     1 0 1044   my ($class, %param) = @_;
13 1   33     6   my $pool = $param{pool} || croak 'expected parameter "pool"';
14              
15               bless {
16                 pool => $pool,
17 1   50     10     auto_shutdown => $param{auto_shutdown} || 0,
18                 shutting_down => 0,
19                 is_shutdown => 0,
20                 num_pending => 0,
21                 complete => Coro::Channel->new,
22               }, $class;
23             }
24              
25              
26             sub next {
27 1     1 1 5469   my $self = shift;
28 1 0       6   my $finished = $self->{complete}->get or return;
29 0         0   my ($result, $error) = @$finished;
30 0 0       0   if ($error) {
31 0         0     croak $error;
32               } else {
33 0         0     return $result;
34               }
35             }
36              
37              
38             sub queue {
39 11     11 1 112   my ($self, @args) = @_;
40 11 50       25   croak 'pipeline is shut down' if $self->{is_shutdown};
41 11 100       195   croak 'pipeline is shutting down' if $self->{shutting_down};
42              
43 10         29   my $deferred = $self->{pool}->defer(@args);
44              
45               async_pool {
46 0     0   0     my ($self, $deferred) = @_;
47              
48                 try {
49 0         0       my $result = $deferred->recv;
50 0         0       $self->{complete}->put([$result, undef]);
51                 }
52                 catch {
53 0         0       $self->{complete}->put([undef, $_]);
54                 }
55                 finally {
56 0 0       0       if (--$self->{num_pending} == 0) {
57 0 0 0     0         if ($self->{shutting_down} || $self->{auto_shutdown}) {
58 0         0           $self->{complete}->shutdown;
59 0         0           $self->{is_shutdown} = 1;
60 0         0           $self->{shutting_down} = 0;
61                     }
62                   }
63 0         0     };
64              
65 10         17964   } $self, $deferred;
66              
67 10         35   ++$self->{num_pending};
68             }
69              
70              
71             sub shutdown {
72 1     1 1 9   my $self = shift;
73 1         6   $self->{shutting_down} = 1;
74             }
75              
76             1;
77              
78             __END__
79            
80             =pod
81            
82             =encoding UTF-8
83            
84             =head1 NAME
85            
86             Coro::ProcessPool::Pipeline - A producer/consumer pipeline for Coro::ProcessPool
87            
88             =head1 VERSION
89            
90             version 0.29
91            
92             =head1 SYNOPSIS
93            
94             my $pool = Coro::ProcessPool->new();
95             my $pipe = $pool->pipeline;
96            
97             # Start producer thread to queue tasks
98             my $producer = async {
99             while (my $task = get_next_task()) {
100             $pipe->queue('Some::TaskClass', $task);
101             }
102            
103             # Let the pipeline know no more tasks are coming
104             $pipe->shutdown;
105             };
106            
107             # Collect the results of each task as they are received
108             while (my $result = $pipe->next) {
109             do_stuff_with($result);
110             }
111            
112             =head1 DESCRIPTION
113            
114             Provides an iterative mechanism for feeding tasks into the process pool and
115             collecting results. A pool may have multiple pipelines.
116            
117             =head1 ATTRIBUTES
118            
119             =head2 pool (required)
120            
121             The L<Coro::ProcessPool> in which to queue tasks.
122            
123             =head2 auto_shutdown (default: false)
124            
125             When set to true, the pipeline will shut itself down as soon as the number of
126             pending tasks hits zero. At least one task must be sent for this to be
127             triggered.
128            
129             =head1 METHODS
130            
131             =head2 next
132            
133             Cedes control until a previously queued task is complete and the result is
134             available.
135            
136             =head2 queue($task, $args)
137            
138             Queues a new task. Arguments are identical to L<Coro::ProcessPool/process> and
139             L<Coro::ProcessPool/defer>.
140            
141             =head2 shutdown
142            
143             Signals shutdown of the pipeline. A shut-down pipeline may not be reused.
144            
145             =head1 AUTHOR
146            
147             Jeff Ober <sysread@fastmail.fm>
148            
149             =head1 COPYRIGHT AND LICENSE
150            
151             This software is copyright (c) 2017 by Jeff Ober.
152            
153             This is free software; you can redistribute it and/or modify it under
154             the same terms as the Perl 5 programming language system itself.
155            
156             =cut
157