File Coverage

blib/lib/Future/Workflow/Pipeline.pm
Criterion Covered Total %
statement 100 100 100.0
branch 19 22 86.3
condition 5 5 100.0
subroutine 18 18 100.0
pod 5 5 100.0
total 147 150 98.0


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2021 -- leonerd@leonerd.org.uk
5              
6 5     5   371217 use v5.26;
  5         68  
7 5     5   2843 use Object::Pad 0.43;
  5         30513  
  5         25  
8              
9             package Future::Workflow::Pipeline 0.01;
10             class Future::Workflow::Pipeline;
11              
12 5     5   1697 use Carp;
  5         12  
  5         309  
13              
14 5     5   2598 use Future::AsyncAwait;
  5         87093  
  5         32  
15              
16             =head1 NAME
17              
18             C - a pipeline of processing stages
19              
20             =head1 DESCRIPTION
21              
22             Instances of this class implement a "pipeline", a sequence of data-processing
23             stages. Each stage is represented by a function that is passed a single
24             argument and should return a result. The pipeline itself stores a function
25             that will be passed each eventual result.
26              
27             =head2 Queueing
28              
29             In front of every stage there exists a queue of pending items. If the first
30             stage is currently busy when C is called, the item is accepted
31             into its queue instead. Items will be taken from the queue in the order they
32             were pushed when the stage's work function finishes with prior items.
33              
34             If the queue between stages is full, then items will remain pending in prior
35             stages. Ultimately this back-pressure will make its way back to the
36             C method at the beginning of the pipeline.
37              
38             =cut
39              
40             =head1 CONSTRUCTOR
41              
42             $pipeline = Future::Workflow::Pipeline->new;
43              
44             The constructor takes no additional parameters.
45              
46             =cut
47              
48             has $_output;
49             has @_stages;
50              
51             =head1 METHODS
52              
53             =cut
54              
55             =head2 set_output
56              
57             $pipeline->set_output( $code );
58              
59             await $code->( $result );
60              
61             Sets the destination output for the pipeline. Each completed work item will be
62             passed to the invoked function, which is expected to return a C.
63              
64             =cut
65              
66 12         79 method set_output ( $code )
  12         23  
  12         17  
67 12     12 1 5558 {
68 12         21 $_output = $code;
69 12 50       47 $_stages[-1]->set_output( $_output ) if @_stages;
70             }
71              
72             =head2 set_output_sync
73              
74             $pipeline->set_output_sync( $code );
75              
76             $code->( $result );
77              
78             Similar to L, where the output function is called synchronously,
79             returning when it has finished.
80              
81             =cut
82              
83 3         23 method set_output_sync ( $code )
  3         6  
  3         5  
84 3     3 1 1897 {
85 3     4   13 $self->set_output( async sub ( $result ) { $code->( $result ) } );
  4         1313  
  4         7  
  4         8  
  4         4  
  4         13  
86             }
87              
88             =head2 append_stage
89              
90             $pipeline->append_stage( $code, %args );
91              
92             $result = await $code->( $item );
93              
94             Appends a pipeline stage that is implemented by an asynchronous function. Each
95             work item will be passed in by invoking the function, and it is expected to
96             return a C which will eventually yield the result of that stage.
97              
98             The following optional named args are recognised:
99              
100             =over 4
101              
102             =item concurrent => NUM
103              
104             Allow this number of outstanding items concurrently.
105              
106             =item max_queue => NUM
107              
108             If defined, no more than this number of items can be enqueued. If undefined,
109             no limit is applied.
110              
111             This value can be zero, which means that any attempts to push more items will
112             remain pending until the work function is free to deal with it; i.e. no
113             queueing will be permitted.
114              
115             =item on_failure => CODE
116              
117             $on_failure->( $f )
118              
119             Provides a callback event function for handling a failure thrown by the stage
120             code. If not provided, the default behaviour is to print the failure message
121             as a warning.
122              
123             Note that this handler cannot turn a failure into a successful result or
124             otherwise resume or change behaviour of the pipeline. For error-correction you
125             will have to handle that inside the stage function code itself. This handler
126             is purely the last stop of error handling, informing the user of an
127             otherwise-unhandled error before ignoring it.
128              
129             =back
130              
131             =cut
132              
133 13         25 method append_stage ( $code, %args )
  13         18  
  13         25  
  13         22  
134 13     13 1 94 {
135 13 100       36 my $old_tail = @_stages ? $_stages[-1] : undef;
136              
137 13         101 push @_stages, my $new_tail = Future::Workflow::Pipeline::_Stage->new(
138             code => $code,
139             %args,
140             );
141 13 50       331 $new_tail->set_output( $_output ) if $_output;
142              
143 4     4   554 $old_tail->set_output( async sub ( $item ) {
  4         8  
  4         8  
  4         5  
144 4         18 await $new_tail->push_input( $item );
145 13 100       68 } ) if $old_tail;
146             }
147              
148             =head2 append_stage_sync
149              
150             $pipeline->append_stage_sync( $code, %args )
151              
152             $result = $code->( $item );
153              
154             Similar to L, where the stage function is called synchronously,
155             returning its result immediately.
156              
157             Because of this, the C named parameter is not permitted.
158              
159             =cut
160              
161 2         5 method append_stage_sync ( $code, %args )
  2         2  
  2         4  
  2         3  
162 2     2 1 14 {
163             defined $args{concurrent} and
164 2 50       6 croak "->append_stage_sync does not permit the 'concurrent' parameter";
165              
166 2         3 return $self->append_stage(
167 2     2   11 async sub ( $item ) { return $code->( $item ) },
  2         3  
  2         4  
  2         3  
  2         7  
168             %args,
169             );
170             }
171              
172             =head2 push_input
173              
174             await $pipeline->push_input( $item );
175              
176             Adds a new work item into the pipeline, which will pass through each of the
177             stages and eventually invoke the output function.
178              
179             =cut
180              
181 25         43 async method push_input ( $item )
  25         39  
  25         40  
182 25         63 {
183             # TODO: this feels like a weird specialcase for no stages
184 25 100       56 if( @_stages ) {
185 21         61 await $_stages[0]->push_input( $item );
186             }
187             else {
188 4         10 await $_output->( $item );
189             }
190 25     25 1 2443 }
191              
192             class Future::Workflow::Pipeline::_Stage :strict(params) {
193              
194 5     5   5950 use Future;
  5         14  
  5         5808  
195              
196             has $_code :param;
197 16     16   35 has $_output :writer;
  16         39  
198              
199             has $_on_failure :param = sub ( $f ) {
200             warn "Pipeline stage failed: ", scalar $f->failure;
201             };
202              
203             # $_concurrent == maximum size of @_work_f
204             has $_concurrent :param = 1;
205             has @_work_f;
206              
207             # $_max_queue == maximum size of @_queue, or undef for unbounded
208             has $_max_queue :param = undef;
209             has @_queue;
210              
211             has @_awaiting_input;
212              
213 23         39 async method _do ( $item )
  23         49  
  23         36  
214 23         39 {
215 23         62 await $_output->( await $_code->( $item ) );
216 23     23   38 }
217              
218 23         37 method _schedule ( $item, $i )
  23         34  
  23         35  
  23         28  
219 23     23   53 {
220 23         54 my $f = $_work_f[$i] = $self->_do( $item );
221 21     21   34 $f->on_ready( sub ( $f ) {
  21         7492  
  21         34  
222 21 100       53 $_on_failure->( $f ) if $f->is_failed;
223              
224 21 100       1166 if( @_queue ) {
225 5         16 $self->_schedule( shift @_queue, $i );
226 5 100       131 ( shift @_awaiting_input )->done if @_awaiting_input;
227             }
228             else {
229 16         42 undef $_work_f[$i];
230             }
231 23         2800 } );
232             }
233              
234 25         37 async method push_input ( $item )
  25         34  
  25         38  
235 25         54 {
236 25         37 my $i;
237             defined $_work_f[$_] or ( $i = $_ ), last
238 25   100     146 for 0 .. $_concurrent-1;
239              
240 25 100       61 if( defined $i ) {
241 18         49 $self->_schedule( $item, $i );
242             }
243             else {
244 7 100 100     26 if( defined $_max_queue and @_queue >= $_max_queue ) {
245             # TODO: Maybe we should clone one of the work futures?
246 3         10 push @_awaiting_input, my $enqueue_f = Future->new;
247 3         28 await $enqueue_f;
248             }
249 5         106 push @_queue, $item;
250             }
251 25     25   40 }
252             }
253              
254             =head1 AUTHOR
255              
256             Paul Evans
257              
258             =cut
259              
260             0x55AA;