File Coverage

blib/lib/Future/Queue.pm
Criterion Covered Total %
statement 77 77 100.0
branch 22 28 78.5
condition 19 24 79.1
subroutine 13 13 100.0
pod 5 5 100.0
total 136 147 92.5


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2019-2023 -- leonerd@leonerd.org.uk
5              
6             package Future::Queue 0.51;
7              
8 6     6   1172431 use v5.14;
  6         52  
9 6     6   34 use warnings;
  6         11  
  6         147  
10              
11 6     6   30 use Carp;
  6         11  
  6         5579  
12              
13             =head1 NAME
14              
15             C - a FIFO queue of values that uses Ls
16              
17             =head1 SYNOPSIS
18              
19             use Future::Queue;
20             use Future::AsyncAwait;
21              
22             my $queue = Future::Queue->new;
23              
24             async sub process_queue
25             {
26             while(1) {
27             my $thing = await $queue->shift;
28             ...
29             }
30             }
31              
32             my $f = process_queue();
33             $queue->push( "a thing" );
34              
35             =head1 DESCRIPTION
36              
37             Objects in this class provide a simple FIFO queue the stores arbitrary perl
38             values. Values may be added into the queue using the L method, and
39             retrieved from it using the L method.
40              
41             Values may be stored within the queue object for C to retrieve later,
42             or if the queue is empty then the future that C returns will be
43             completed once an item becomes available.
44              
45             =cut
46              
47             =head1 CONSTRUCTOR
48              
49             =cut
50              
51             =head2 new
52              
53             $queue = Future::Queue->new( %params );
54              
55             Returns a new C instance.
56              
57             Takes the following named arguments:
58              
59             =over 4
60              
61             =item max_items => INT
62              
63             I
64              
65             Optional. If defined, there can be at most the given number of items in the
66             queue. Attempts to call L beyond that will yield a future that remains
67             pending, until a subsequent L operation makes enough space.
68              
69             =item prototype => STRING or OBJECT or CODE
70              
71             I
72              
73             Optional. If defined, gives either a class name, an object instance to clone
74             or a code reference to invoke when a new pending C instance is needed
75             by the C or C methods when they cannot complete immediately.
76              
77             $f = $prototype->(); # if CODE reference
78             $f = $prototype->new; # otherwise
79              
80             If not provided, a default of C will be used.
81              
82             =back
83              
84             =cut
85              
86             sub new
87             {
88 14     14 1 18764 my $class = shift;
89 14         41 my %params = @_;
90              
91 14         30 my $prototype = $params{prototype};
92              
93             return bless {
94             items => [],
95             max_items => $params{max_items},
96 14 100 100     152 shift_waiters => [],
97             ( ref $prototype eq "CODE" ) ?
98             ( f_factory => $prototype ) :
99             ( f_prototype => $prototype // "Future" ),
100             }, $class;
101             }
102              
103             =head2 push
104              
105             $queue->push( @items );
106              
107             await $queue->push( @items );
108              
109             Adds more items into the queue. If the queue was previously empty and there is
110             at least one C future waiting, then the next one will be completed by
111             this method.
112              
113             I this can take multiple items; earlier versions can only
114             take one value at once.
115              
116             This method always returns a L instance. If C is defined
117             then it is possible that this future will be in a still-pending state;
118             indicating that there was not yet space in the queue to add the items. It will
119             become completed once enough L calls have been made to make space for
120             them.
121              
122             If C is not defined then these instances will always be immediately
123             complete; it is safe to drop or ignore it, or call the method in void context.
124              
125             If the queue has been finished then more items cannot be pushed and an
126             exception will be thrown.
127              
128             =cut
129              
130             sub _manage_shift_waiters
131             {
132 21     21   32 my $self = shift;
133              
134 21         30 my $items = $self->{items};
135 21         32 my $shift_waiters = $self->{shift_waiters};
136              
137 21   100     84 ( shift @$shift_waiters )->()
138             while @$shift_waiters and @$items;
139             }
140              
141             sub push :method
142             {
143 22     22 1 7196 my $self = shift;
144 22         47 my @more = @_;
145              
146             $self->{finished} and
147 22 100       227 croak "Cannot ->push more items to a Future::Queue that has been finished";
148              
149 21         32 my $items = $self->{items};
150 21         37 my $max = $self->{max_items};
151              
152 21 100       39 if( defined $max ) {
153 7         12 my $count = $max - @$items;
154 7         17 push @$items, splice @more, 0, $count;
155             }
156             else {
157 14         31 push @$items, @more;
158 14         25 @more = ();
159             }
160              
161 21         53 $self->_manage_shift_waiters;
162 21 100       104 return Future->done if !@more;
163              
164 2 50       12 my $f = $self->{f_factory} ? $self->{f_factory}->() : $self->{f_prototype}->new;
165 2   50     21 push @{ $self->{push_waiters} //= [] }, sub {
166 3     3   8 my $count = $max - @$items;
167 3         9 push @$items, splice @more, 0, $count;
168 3         12 $self->_manage_push_waiters;
169              
170 3 100       11 return 0 if @more;
171              
172 2         10 $f->done;
173 2         92 return 1;
174 2         13 };
175 2         7 return $f;
176             }
177              
178             =head2 shift
179              
180             $item = await $queue->shift;
181              
182             Returns a C that will yield the next item from the queue. If there is
183             already an item then this will be taken and the returned future will be
184             immediate. If not, then the returned future will be pending, and the next
185             C method will complete it.
186              
187             If the queue has been finished then the future will yield an empty list, or
188             C in scalar context.
189              
190             If C is a valid item in your queue, make sure to test this condition
191             carefully. For example:
192              
193             while( ( my $item ) = await $queue->shift ) {
194             ...
195             }
196              
197             Here, the C expression and the assignment are in list context, so the
198             loop will continue to iterate while I value is assigned, even if that
199             value is C. The loop will only stop once no items are returned,
200             indicating the end of the queue.
201              
202             =cut
203              
204             sub _manage_push_waiters
205             {
206 22     22   43 my $self = shift;
207              
208 22         34 my $items = $self->{items};
209 22         64 my $max_items = $self->{max_items};
210 22   100     76 my $push_waiters = $self->{push_waiters} || [];
211              
212 22   66     132 shift @$push_waiters
      100        
      100        
213             while @$push_waiters and
214             ( !defined $max_items or @$items < $max_items )
215             and $push_waiters->[0]->();
216             }
217              
218             sub shift :method
219             {
220 18     18 1 13042 my $self = shift;
221              
222 18         37 my $items = $self->{items};
223              
224 18 100       54 if( @$items ) {
225 8         38 my @more = shift @$items;
226 8         28 $self->_manage_push_waiters;
227 8         31 return Future->done( @more );
228             }
229              
230 10 100       32 return Future->done if $self->{finished};
231              
232 9 100       52 my $f = $self->{f_factory} ? $self->{f_factory}->() : $self->{f_prototype}->new;
233 9         42 push @{ $self->{shift_waiters} }, sub {
234 6 50 66 6   34 return $f->done if !@$items and $self->{finished};
235 5         23 $f->done( shift @$items );
236 5         250 $self->_manage_push_waiters;
237 9         67 };
238 9         32 return $f;
239             }
240              
241             =head2 shift_atmost
242              
243             @items = await $queue->shift_atmost( $count );
244              
245             I
246              
247             A bulk version of L that can return multiple items at once.
248              
249             Returns a C that will yield the next few items from the queue. If
250             there is already at least one item in the queue then up to C<$count> items
251             will be taken, and the returned future will be immediate. If not, then the
252             returned future will be pending and the next C method will complete it.
253              
254             =cut
255              
256             sub shift_atmost
257             {
258 7     7 1 2241 my $self = shift;
259 7         13 my ( $count ) = @_;
260              
261 7         12 my $items = $self->{items};
262              
263 7 100       16 if( @$items ) {
264 2         7 my @more = splice @$items, 0, $count;
265 2         6 $self->_manage_push_waiters;
266 2         8 return Future->done( @more );
267             }
268              
269 5 50       15 return Future->done if $self->{finished};
270              
271 5 50       16 my $f = $self->{f_factory} ? $self->{f_factory}->() : $self->{f_prototype}->new;
272 5         23 push @{ $self->{shift_waiters} }, sub {
273 4 0 33 4   11 return $f->done if !@$items and $self->{finished};
274 4         13 $f->done( splice @$items, 0, $count );
275 4         125 $self->_manage_push_waiters;
276 5         30 };
277 5         17 return $f;
278             }
279              
280             =head2 finish
281              
282             $queue->finish;
283              
284             I
285              
286             Marks that the queue is now finished. Once the current list of items has been
287             exhausted, any further attempts to C more will yield empty.
288              
289             =cut
290              
291             sub finish
292             {
293 2     2 1 61 my $self = shift;
294 2         5 $self->{finished}++;
295              
296 2         3 ( shift @{ $self->{shift_waiters} } )->() while @{ $self->{shift_waiters} };
  3         41  
  1         3  
297             }
298              
299             =head1 AUTHOR
300              
301             Paul Evans
302              
303             =cut
304              
305             0x55AA;