File Coverage

blib/lib/Future/Queue.pm
Criterion Covered Total %
statement 76 76 100.0
branch 16 20 80.0
condition 17 22 77.2
subroutine 13 13 100.0
pod 5 5 100.0
total 127 136 93.3


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2019-2023 -- leonerd@leonerd.org.uk
5              
6             package Future::Queue 0.50;
7              
8 5     5   348858 use v5.14;
  5         66  
9 5     5   27 use warnings;
  5         9  
  5         137  
10              
11 5     5   27 use Carp;
  5         10  
  5         4504  
12              
13             =head1 NAME
14              
15             C - a FIFO queue of values that uses Ls
16              
17             =head1 SYNOPSIS
18              
19             use Future::Queue;
20             use Future::AsyncAwait;
21              
22             my $queue = Future::Queue->new;
23              
24             async sub process_queue
25             {
26             while(1) {
27             my $thing = await $queue->shift;
28             ...
29             }
30             }
31              
32             my $f = process_queue();
33             $queue->push( "a thing" );
34              
35             =head1 DESCRIPTION
36              
37             Objects in this class provide a simple FIFO queue the stores arbitrary perl
38             values. Values may be added into the queue using the L method, and
39             retrieved from it using the L method.
40              
41             Values may be stored within the queue object for C to retrieve later,
42             or if the queue is empty then the future that C returns will be
43             completed once an item becomes available.
44              
45             =cut
46              
47             =head1 CONSTRUCTOR
48              
49             =cut
50              
51             =head2 new
52              
53             $queue = Future::Queue->new( %params );
54              
55             Returns a new C instance.
56              
57             Takes the following named arguments:
58              
59             =over 4
60              
61             =item max_items => INT
62              
63             I
64              
65             Optional. If defined, there can be at most the given number of items in the
66             queue. Attempts to call L beyond that will yield a future that remains
67             pending, until a subsequent L operation makes enough space.
68              
69             =back
70              
71             =cut
72              
73             sub new
74             {
75 11     11 1 7169 my $class = shift;
76 11         28 my %params = @_;
77              
78             return bless {
79             items => [],
80             max_items => $params{max_items},
81 11         65 shift_waiters => [],
82             }, $class;
83             }
84              
85             =head2 push
86              
87             $queue->push( @items );
88              
89             await $queue->push( @items );
90              
91             Adds more items into the queue. If the queue was previously empty and there is
92             at least one C future waiting, then the next one will be completed by
93             this method.
94              
95             I this can take multiple items; earlier versions can only
96             take one value at once.
97              
98             This method always returns a L instance. If C is defined
99             then it is possible that this future will be in a still-pending state;
100             indicating that there was not yet space in the queue to add the items. It will
101             become completed once enough L calls have been made to make space for
102             them.
103              
104             If C is not defined then these instances will always be immediately
105             complete; it is safe to drop or ignore it, or call the method in void context.
106              
107             If the queue has been finished then more items cannot be pushed and an
108             exception will be thrown.
109              
110             =cut
111              
112             sub _manage_shift_waiters
113             {
114 21     21   28 my $self = shift;
115              
116 21         33 my $items = $self->{items};
117 21         29 my $shift_waiters = $self->{shift_waiters};
118              
119 21   100     77 ( shift @$shift_waiters )->()
120             while @$shift_waiters and @$items;
121             }
122              
123             sub push :method
124             {
125 22     22 1 2012 my $self = shift;
126 22         50 my @more = @_;
127              
128             $self->{finished} and
129 22 100       255 croak "Cannot ->push more items to a Future::Queue that has been finished";
130              
131 21         37 my $items = $self->{items};
132 21         33 my $max = $self->{max_items};
133              
134 21 100       44 if( defined $max ) {
135 7         14 my $count = $max - @$items;
136 7         17 push @$items, splice @more, 0, $count;
137             }
138             else {
139 14         28 push @$items, @more;
140 14         25 @more = ();
141             }
142              
143 21         52 $self->_manage_shift_waiters;
144 21 100       95 return Future->done if !@more;
145              
146 2         9 my $f = Future->new;
147 2   50     21 push @{ $self->{push_waiters} //= [] }, sub {
148 3     3   7 my $count = $max - @$items;
149 3         11 push @$items, splice @more, 0, $count;
150 3         13 $self->_manage_push_waiters;
151              
152 3 100       11 return 0 if @more;
153              
154 2         10 $f->done;
155 2         91 return 1;
156 2         14 };
157 2         7 return $f;
158             }
159              
160             =head2 shift
161              
162             $item = await $queue->shift;
163              
164             Returns a C that will yield the next item from the queue. If there is
165             already an item then this will be taken and the returned future will be
166             immediate. If not, then the returned future will be pending, and the next
167             C method will complete it.
168              
169             If the queue has been finished then the future will yield an empty list, or
170             C in scalar context.
171              
172             If C is a valid item in your queue, make sure to test this condition
173             carefully. For example:
174              
175             while( ( my $item ) = await $queue->shift ) {
176             ...
177             }
178              
179             Here, the C expression and the assignment are in list context, so the
180             loop will continue to iterate while I value is assigned, even if that
181             value is C. The loop will only stop once no items are returned,
182             indicating the end of the queue.
183              
184             =cut
185              
186             sub _manage_push_waiters
187             {
188 22     22   38 my $self = shift;
189              
190 22         71 my $items = $self->{items};
191 22         32 my $max_items = $self->{max_items};
192 22   100     84 my $push_waiters = $self->{push_waiters} || [];
193              
194 22   66     124 shift @$push_waiters
      100        
      100        
195             while @$push_waiters and
196             ( !defined $max_items or @$items < $max_items )
197             and $push_waiters->[0]->();
198             }
199              
200             sub shift :method
201             {
202 15     15 1 2729 my $self = shift;
203              
204 15         30 my $items = $self->{items};
205              
206 15 100       38 if( @$items ) {
207 8         24 my @more = shift @$items;
208 8         24 $self->_manage_push_waiters;
209 8         53 return Future->done( @more );
210             }
211              
212 7 100       21 return Future->done if $self->{finished};
213              
214 6         22 my $f = Future->new;
215 6         41 push @{ $self->{shift_waiters} }, sub {
216 6 50 66 6   26 return $f->done if !@$items and $self->{finished};
217 5         18 $f->done( shift @$items );
218 5         210 $self->_manage_push_waiters;
219 6         37 };
220 6         15 return $f;
221             }
222              
223             =head2 shift_atmost
224              
225             @items = await $queue->shift_atmost( $count );
226              
227             I
228              
229             A bulk version of L that can return multiple items at once.
230              
231             Returns a C that will yield the next few items from the queue. If
232             there is already at least one item in the queue then up to C<$count> items
233             will be taken, and the returned future will be immediate. If not, then the
234             returned future will be pending and the next C method will complete it.
235              
236             =cut
237              
238             sub shift_atmost
239             {
240 7     7 1 1164 my $self = shift;
241 7         13 my ( $count ) = @_;
242              
243 7         11 my $items = $self->{items};
244              
245 7 100       19 if( @$items ) {
246 2         5 my @more = splice @$items, 0, $count;
247 2         7 $self->_manage_push_waiters;
248 2         9 return Future->done( @more );
249             }
250              
251 5 50       11 return Future->done if $self->{finished};
252              
253 5         15 my $f = Future->new;
254 5         22 push @{ $self->{shift_waiters} }, sub {
255 4 0 33 4   10 return $f->done if !@$items and $self->{finished};
256 4         12 $f->done( splice @$items, 0, $count );
257 4         128 $self->_manage_push_waiters;
258 5         30 };
259 5         18 return $f;
260             }
261              
262             =head2 finish
263              
264             $queue->finish;
265              
266             I
267              
268             Marks that the queue is now finished. Once the current list of items has been
269             exhausted, any further attempts to C more will yield empty.
270              
271             =cut
272              
273             sub finish
274             {
275 2     2 1 61 my $self = shift;
276 2         4 $self->{finished}++;
277              
278 2         4 ( shift @{ $self->{shift_waiters} } )->() while @{ $self->{shift_waiters} };
  3         48  
  1         3  
279             }
280              
281             =head1 AUTHOR
282              
283             Paul Evans
284              
285             =cut
286              
287             0x55AA;