| line | stmt | bran | cond | sub | pod | time | code | 
| 1 |  |  |  |  |  |  | #  You may distribute under the terms of either the GNU General Public License | 
| 2 |  |  |  |  |  |  | #  or the Artistic License (the same terms as Perl itself) | 
| 3 |  |  |  |  |  |  | # | 
| 4 |  |  |  |  |  |  | #  (C) Paul Evans, 2019-2023 -- leonerd@leonerd.org.uk | 
| 5 |  |  |  |  |  |  |  | 
| 6 |  |  |  |  |  |  | package Future::Queue 0.50; | 
| 7 |  |  |  |  |  |  |  | 
| 8 | 5 |  |  | 5 |  | 348858 | use v5.14; | 
|  | 5 |  |  |  |  | 66 |  | 
| 9 | 5 |  |  | 5 |  | 27 | use warnings; | 
|  | 5 |  |  |  |  | 9 |  | 
|  | 5 |  |  |  |  | 137 |  | 
| 10 |  |  |  |  |  |  |  | 
| 11 | 5 |  |  | 5 |  | 27 | use Carp; | 
|  | 5 |  |  |  |  | 10 |  | 
|  | 5 |  |  |  |  | 4504 |  | 
| 12 |  |  |  |  |  |  |  | 
| 13 |  |  |  |  |  |  | =head1 NAME | 
| 14 |  |  |  |  |  |  |  | 
| 15 |  |  |  |  |  |  | C<Future::Queue> - a FIFO queue of values that uses L<Future>s | 
| 16 |  |  |  |  |  |  |  | 
| 17 |  |  |  |  |  |  | =head1 SYNOPSIS | 
| 18 |  |  |  |  |  |  |  | 
| 19 |  |  |  |  |  |  | use Future::Queue; | 
| 20 |  |  |  |  |  |  | use Future::AsyncAwait; | 
| 21 |  |  |  |  |  |  |  | 
| 22 |  |  |  |  |  |  | my $queue = Future::Queue->new; | 
| 23 |  |  |  |  |  |  |  | 
| 24 |  |  |  |  |  |  | async sub process_queue | 
| 25 |  |  |  |  |  |  | { | 
| 26 |  |  |  |  |  |  | while(1) { | 
| 27 |  |  |  |  |  |  | my $thing = await $queue->shift; | 
| 28 |  |  |  |  |  |  | ... | 
| 29 |  |  |  |  |  |  | } | 
| 30 |  |  |  |  |  |  | } | 
| 31 |  |  |  |  |  |  |  | 
| 32 |  |  |  |  |  |  | my $f = process_queue(); | 
| 33 |  |  |  |  |  |  | $queue->push( "a thing" ); | 
| 34 |  |  |  |  |  |  |  | 
| 35 |  |  |  |  |  |  | =head1 DESCRIPTION | 
| 36 |  |  |  |  |  |  |  | 
| 37 |  |  |  |  |  |  | Objects in this class provide a simple FIFO queue that stores arbitrary perl | 
| 38 |  |  |  |  |  |  | values. Values may be added into the queue using the L</push> method, and | 
| 39 |  |  |  |  |  |  | retrieved from it using the L</shift> method. | 
| 40 |  |  |  |  |  |  |  | 
| 41 |  |  |  |  |  |  | Values may be stored within the queue object for C<shift> to retrieve later, | 
| 42 |  |  |  |  |  |  | or if the queue is empty then the future that C<shift> returns will be | 
| 43 |  |  |  |  |  |  | completed once an item becomes available. | 
| 44 |  |  |  |  |  |  |  | 
| 45 |  |  |  |  |  |  | =cut | 
| 46 |  |  |  |  |  |  |  | 
| 47 |  |  |  |  |  |  | =head1 CONSTRUCTOR | 
| 48 |  |  |  |  |  |  |  | 
| 49 |  |  |  |  |  |  | =cut | 
| 50 |  |  |  |  |  |  |  | 
| 51 |  |  |  |  |  |  | =head2 new | 
| 52 |  |  |  |  |  |  |  | 
| 53 |  |  |  |  |  |  | $queue = Future::Queue->new( %params ); | 
| 54 |  |  |  |  |  |  |  | 
| 55 |  |  |  |  |  |  | Returns a new C<Future::Queue> instance. | 
| 56 |  |  |  |  |  |  |  | 
| 57 |  |  |  |  |  |  | Takes the following named arguments: | 
| 58 |  |  |  |  |  |  |  | 
| 59 |  |  |  |  |  |  | =over 4 | 
| 60 |  |  |  |  |  |  |  | 
| 61 |  |  |  |  |  |  | =item max_items => INT | 
| 62 |  |  |  |  |  |  |  | 
| 63 |  |  |  |  |  |  | I | 
| 64 |  |  |  |  |  |  |  | 
| 65 |  |  |  |  |  |  | Optional. If defined, there can be at most the given number of items in the | 
| 66 |  |  |  |  |  |  | queue. Attempts to call L</push> beyond that will yield a future that remains | 
| 67 |  |  |  |  |  |  | pending, until a subsequent L</shift> operation makes enough space. | 
| 68 |  |  |  |  |  |  |  | 
| 69 |  |  |  |  |  |  | =back | 
| 70 |  |  |  |  |  |  |  | 
| 71 |  |  |  |  |  |  | =cut | 
| 72 |  |  |  |  |  |  |  | 
|  |  |  |  |  |  |  | # Constructor. Takes named arguments as a flat key/value list and returns a | 
|  |  |  |  |  |  |  | # new hash-based object blessed into $class. Only the 'max_items' key of | 
|  |  |  |  |  |  |  | # %params is read here. | 
| 73 |  |  |  |  |  |  | sub new | 
| 74 |  |  |  |  |  |  | { | 
| 75 | 11 |  |  | 11 | 1 | 7169 | my $class = shift; | 
| 76 | 11 |  |  |  |  | 28 | my %params = @_; | 
| 77 |  |  |  |  |  |  |  | 
|  |  |  |  |  |  |  | # items:         values currently held in the queue, oldest first | 
|  |  |  |  |  |  |  | # max_items:     optional capacity limit; undef means unbounded | 
|  |  |  |  |  |  |  | # shift_waiters: callbacks registered by shift/shift_atmost calls that | 
|  |  |  |  |  |  |  | #                found the queue empty | 
|  |  |  |  |  |  |  | # (push_waiters is not created here; push creates it on first use) | 
| 78 |  |  |  |  |  |  | return bless { | 
| 79 |  |  |  |  |  |  | items => [], | 
| 80 |  |  |  |  |  |  | max_items => $params{max_items}, | 
| 81 | 11 |  |  |  |  | 65 | shift_waiters => [], | 
| 82 |  |  |  |  |  |  | }, $class; | 
| 83 |  |  |  |  |  |  | } | 
| 84 |  |  |  |  |  |  |  | 
| 85 |  |  |  |  |  |  | =head2 push | 
| 86 |  |  |  |  |  |  |  | 
| 87 |  |  |  |  |  |  | $queue->push( @items ); | 
| 88 |  |  |  |  |  |  |  | 
| 89 |  |  |  |  |  |  | await $queue->push( @items ); | 
| 90 |  |  |  |  |  |  |  | 
| 91 |  |  |  |  |  |  | Adds more items into the queue. If the queue was previously empty and there is | 
| 92 |  |  |  |  |  |  | at least one C<shift> future waiting, then the next one will be completed by | 
| 93 |  |  |  |  |  |  | this method. | 
| 94 |  |  |  |  |  |  |  | 
| 95 |  |  |  |  |  |  | I this can take multiple items; earlier versions can only | 
| 96 |  |  |  |  |  |  | take one value at once. | 
| 97 |  |  |  |  |  |  |  | 
| 98 |  |  |  |  |  |  | This method always returns a L<Future> instance. If C<max_items> is defined | 
| 99 |  |  |  |  |  |  | then it is possible that this future will be in a still-pending state; | 
| 100 |  |  |  |  |  |  | indicating that there was not yet space in the queue to add the items. It will | 
| 101 |  |  |  |  |  |  | become completed once enough L</shift> calls have been made to make space for | 
| 102 |  |  |  |  |  |  | them. | 
| 103 |  |  |  |  |  |  |  | 
| 104 |  |  |  |  |  |  | If C<max_items> is not defined then these instances will always be immediately | 
| 105 |  |  |  |  |  |  | complete; it is safe to drop or ignore it, or call the method in void context. | 
| 106 |  |  |  |  |  |  |  | 
| 107 |  |  |  |  |  |  | If the queue has been finished then more items cannot be pushed and an | 
| 108 |  |  |  |  |  |  | exception will be thrown. | 
| 109 |  |  |  |  |  |  |  | 
| 110 |  |  |  |  |  |  | =cut | 
| 111 |  |  |  |  |  |  |  | 
|  |  |  |  |  |  |  | # Internal helper: runs pending shift-waiter callbacks, oldest first, for as | 
|  |  |  |  |  |  |  | # long as there is both a waiter left and at least one item stored. Each | 
|  |  |  |  |  |  |  | # callback is removed from the list before it is invoked. | 
| 112 |  |  |  |  |  |  | sub _manage_shift_waiters | 
| 113 |  |  |  |  |  |  | { | 
| 114 | 21 |  |  | 21 |  | 28 | my $self = shift; | 
| 115 |  |  |  |  |  |  |  | 
| 116 | 21 |  |  |  |  | 33 | my $items = $self->{items}; | 
| 117 | 21 |  |  |  |  | 29 | my $shift_waiters = $self->{shift_waiters}; | 
| 118 |  |  |  |  |  |  |  | 
|  |  |  |  |  |  |  | # Statement-modifier loop: the condition is re-evaluated after every | 
|  |  |  |  |  |  |  | # callback, since a callback may consume items. | 
| 119 | 21 |  | 100 |  |  | 77 | ( shift @$shift_waiters )->() | 
| 120 |  |  |  |  |  |  | while @$shift_waiters and @$items; | 
| 121 |  |  |  |  |  |  | } | 
| 122 |  |  |  |  |  |  |  | 
|  |  |  |  |  |  |  | # Adds @_ to the queue and returns a Future. | 
|  |  |  |  |  |  |  | # | 
|  |  |  |  |  |  |  | # Croaks if the queue has already been finished. Without max_items every | 
|  |  |  |  |  |  |  | # value is stored at once and an already-done Future is returned. With | 
|  |  |  |  |  |  |  | # max_items only as many values as currently fit are stored; if some are | 
|  |  |  |  |  |  |  | # left over, a pending Future is returned and a callback is queued on | 
|  |  |  |  |  |  |  | # push_waiters to store the remainder as space becomes available. | 
| 123 |  |  |  |  |  |  | sub push :method | 
| 124 |  |  |  |  |  |  | { | 
| 125 | 22 |  |  | 22 | 1 | 2012 | my $self = shift; | 
| 126 | 22 |  |  |  |  | 50 | my @more = @_; | 
| 127 |  |  |  |  |  |  |  | 
| 128 |  |  |  |  |  |  | $self->{finished} and | 
| 129 | 22 | 100 |  |  |  | 255 | croak "Cannot ->push more items to a Future::Queue that has been finished"; | 
| 130 |  |  |  |  |  |  |  | 
| 131 | 21 |  |  |  |  | 37 | my $items = $self->{items}; | 
| 132 | 21 |  |  |  |  | 33 | my $max = $self->{max_items}; | 
| 133 |  |  |  |  |  |  |  | 
| 134 | 21 | 100 |  |  |  | 44 | if( defined $max ) { | 
|  |  |  |  |  |  |  | # Move only as many values as there is free space for; the rest stay | 
|  |  |  |  |  |  |  | # in @more. | 
| 135 | 7 |  |  |  |  | 14 | my $count = $max - @$items; | 
| 136 | 7 |  |  |  |  | 17 | push @$items, splice @more, 0, $count; | 
| 137 |  |  |  |  |  |  | } | 
| 138 |  |  |  |  |  |  | else { | 
| 139 | 14 |  |  |  |  | 28 | push @$items, @more; | 
| 140 | 14 |  |  |  |  | 25 | @more = (); | 
| 141 |  |  |  |  |  |  | } | 
| 142 |  |  |  |  |  |  |  | 
|  |  |  |  |  |  |  | # Hand the newly stored items to any shift calls that were waiting. | 
| 143 | 21 |  |  |  |  | 52 | $self->_manage_shift_waiters; | 
| 144 | 21 | 100 |  |  |  | 95 | return Future->done if !@more; | 
| 145 |  |  |  |  |  |  |  | 
| 146 | 2 |  |  |  |  | 9 | my $f = Future->new; | 
|  |  |  |  |  |  |  | # Callback run by _manage_push_waiters when space appears. It returns | 
|  |  |  |  |  |  |  | # true once every remaining value has been stored (so it can be removed | 
|  |  |  |  |  |  |  | # from the list), or false if values are still left over. | 
| 147 | 2 |  | 50 |  |  | 21 | push @{ $self->{push_waiters} //= [] }, sub { | 
| 148 | 3 |  |  | 3 |  | 7 | my $count = $max - @$items; | 
| 149 | 3 |  |  |  |  | 11 | push @$items, splice @more, 0, $count; | 
|  |  |  |  |  |  |  | # Wake shift waiters, as the synchronous path above does. This must | 
|  |  |  |  |  |  |  | # not call _manage_push_waiters: this callback is still the head of | 
|  |  |  |  |  |  |  | # push_waiters while it runs, so that call would re-invoke it and | 
|  |  |  |  |  |  |  | # recurse without end whenever space remains after @more is drained. | 
| 150 | 3 |  |  |  |  | 13 | $self->_manage_shift_waiters; | 
| 151 |  |  |  |  |  |  |  | 
| 152 | 3 | 100 |  |  |  | 11 | return 0 if @more; | 
| 153 |  |  |  |  |  |  |  | 
| 154 | 2 |  |  |  |  | 10 | $f->done; | 
| 155 | 2 |  |  |  |  | 91 | return 1; | 
| 156 | 2 |  |  |  |  | 14 | }; | 
| 157 | 2 |  |  |  |  | 7 | return $f; | 
| 158 |  |  |  |  |  |  | } | 
| 159 |  |  |  |  |  |  |  | 
| 160 |  |  |  |  |  |  | =head2 shift | 
| 161 |  |  |  |  |  |  |  | 
| 162 |  |  |  |  |  |  | $item = await $queue->shift; | 
| 163 |  |  |  |  |  |  |  | 
| 164 |  |  |  |  |  |  | Returns a C<Future> that will yield the next item from the queue. If there is | 
| 165 |  |  |  |  |  |  | already an item then this will be taken and the returned future will be | 
| 166 |  |  |  |  |  |  | immediate. If not, then the returned future will be pending, and the next | 
| 167 |  |  |  |  |  |  | C<push> method will complete it. | 
| 168 |  |  |  |  |  |  |  | 
| 169 |  |  |  |  |  |  | If the queue has been finished then the future will yield an empty list, or | 
| 170 |  |  |  |  |  |  | C<undef> in scalar context. | 
| 171 |  |  |  |  |  |  |  | 
| 172 |  |  |  |  |  |  | If C<undef> is a valid item in your queue, make sure to test this condition | 
| 173 |  |  |  |  |  |  | carefully. For example: | 
| 174 |  |  |  |  |  |  |  | 
| 175 |  |  |  |  |  |  | while( ( my $item ) = await $queue->shift ) { | 
| 176 |  |  |  |  |  |  | ... | 
| 177 |  |  |  |  |  |  | } | 
| 178 |  |  |  |  |  |  |  | 
| 179 |  |  |  |  |  |  | Here, the C<await> expression and the assignment are in list context, so the | 
| 180 |  |  |  |  |  |  | loop will continue to iterate while I<any> value is assigned, even if that | 
| 181 |  |  |  |  |  |  | value is C<undef>. The loop will only stop once no items are returned, | 
| 182 |  |  |  |  |  |  | indicating the end of the queue. | 
| 183 |  |  |  |  |  |  |  | 
| 184 |  |  |  |  |  |  | =cut | 
| 185 |  |  |  |  |  |  |  | 
|  |  |  |  |  |  |  | # Internal helper: gives pending push-waiter callbacks the chance to store | 
|  |  |  |  |  |  |  | # their remaining values. The callback at the head of the list is invoked | 
|  |  |  |  |  |  |  | # while there is a waiter and the queue has room (max_items undefined, or | 
|  |  |  |  |  |  |  | # fewer items than max_items). A callback that returns true is removed and | 
|  |  |  |  |  |  |  | # the next one is considered; a false return ends the loop and leaves that | 
|  |  |  |  |  |  |  | # callback at the head. | 
| 186 |  |  |  |  |  |  | sub _manage_push_waiters | 
| 187 |  |  |  |  |  |  | { | 
| 188 | 22 |  |  | 22 |  | 38 | my $self = shift; | 
| 189 |  |  |  |  |  |  |  | 
| 190 | 22 |  |  |  |  | 71 | my $items = $self->{items}; | 
| 191 | 22 |  |  |  |  | 32 | my $max_items = $self->{max_items}; | 
|  |  |  |  |  |  |  | # push_waiters may not exist yet, so fall back to an empty list. | 
| 192 | 22 |  | 100 |  |  | 84 | my $push_waiters = $self->{push_waiters} || []; | 
| 193 |  |  |  |  |  |  |  | 
|  |  |  |  |  |  |  | # The callback is called in place as the last term of the condition; it | 
|  |  |  |  |  |  |  | # is only shifted off the list once it has returned true. | 
| 194 | 22 |  | 66 |  |  | 124 | shift @$push_waiters | 
|  |  |  | 100 |  |  |  |  | 
|  |  |  | 100 |  |  |  |  | 
| 195 |  |  |  |  |  |  | while @$push_waiters and | 
| 196 |  |  |  |  |  |  | ( !defined $max_items or @$items < $max_items ) | 
| 197 |  |  |  |  |  |  | and $push_waiters->[0]->(); | 
| 198 |  |  |  |  |  |  | } | 
| 199 |  |  |  |  |  |  |  | 
|  |  |  |  |  |  |  | # Returns a Future for the next single item. | 
|  |  |  |  |  |  |  | # | 
|  |  |  |  |  |  |  | # If an item is stored it is removed and returned in an already-done | 
|  |  |  |  |  |  |  | # Future. If the queue is empty and finished, an already-done Future with | 
|  |  |  |  |  |  |  | # no values is returned. Otherwise a pending Future is returned and a | 
|  |  |  |  |  |  |  | # callback is appended to shift_waiters to complete it later. | 
| 200 |  |  |  |  |  |  | sub shift :method | 
| 201 |  |  |  |  |  |  | { | 
| 202 | 15 |  |  | 15 | 1 | 2729 | my $self = shift; | 
| 203 |  |  |  |  |  |  |  | 
| 204 | 15 |  |  |  |  | 30 | my $items = $self->{items}; | 
| 205 |  |  |  |  |  |  |  | 
| 206 | 15 | 100 |  |  |  | 38 | if( @$items ) { | 
| 207 | 8 |  |  |  |  | 24 | my @more = shift @$items; | 
|  |  |  |  |  |  |  | # A slot was freed, so give waiting push calls the chance to use it. | 
| 208 | 8 |  |  |  |  | 24 | $self->_manage_push_waiters; | 
| 209 | 8 |  |  |  |  | 53 | return Future->done( @more ); | 
| 210 |  |  |  |  |  |  | } | 
| 211 |  |  |  |  |  |  |  | 
| 212 | 7 | 100 |  |  |  | 21 | return Future->done if $self->{finished}; | 
| 213 |  |  |  |  |  |  |  | 
| 214 | 6 |  |  |  |  | 22 | my $f = Future->new; | 
| 215 | 6 |  |  |  |  | 41 | push @{ $self->{shift_waiters} }, sub { | 
|  |  |  |  |  |  |  | # Run by finish with nothing left to deliver: complete with no values. | 
| 216 | 6 | 50 | 66 | 6 |  | 26 | return $f->done if !@$items and $self->{finished}; | 
| 217 | 5 |  |  |  |  | 18 | $f->done( shift @$items ); | 
| 218 | 5 |  |  |  |  | 210 | $self->_manage_push_waiters; | 
| 219 | 6 |  |  |  |  | 37 | }; | 
| 220 | 6 |  |  |  |  | 15 | return $f; | 
| 221 |  |  |  |  |  |  | } | 
| 222 |  |  |  |  |  |  |  | 
| 223 |  |  |  |  |  |  | =head2 shift_atmost | 
| 224 |  |  |  |  |  |  |  | 
| 225 |  |  |  |  |  |  | @items = await $queue->shift_atmost( $count ); | 
| 226 |  |  |  |  |  |  |  | 
| 227 |  |  |  |  |  |  | I | 
| 228 |  |  |  |  |  |  |  | 
| 229 |  |  |  |  |  |  | A bulk version of L</shift> that can return multiple items at once. | 
| 230 |  |  |  |  |  |  |  | 
| 231 |  |  |  |  |  |  | Returns a C<Future> that will yield the next few items from the queue. If | 
| 232 |  |  |  |  |  |  | there is already at least one item in the queue then up to C<$count> items | 
| 233 |  |  |  |  |  |  | will be taken, and the returned future will be immediate. If not, then the | 
| 234 |  |  |  |  |  |  | returned future will be pending and the next C<push> method will complete it. | 
| 235 |  |  |  |  |  |  |  | 
| 236 |  |  |  |  |  |  | =cut | 
| 237 |  |  |  |  |  |  |  | 
|  |  |  |  |  |  |  | # Bulk form of shift: returns a Future yielding up to $count items. | 
|  |  |  |  |  |  |  | # | 
|  |  |  |  |  |  |  | # If any items are stored, up to $count are removed from the front and | 
|  |  |  |  |  |  |  | # returned in an already-done Future. If the queue is empty and finished, | 
|  |  |  |  |  |  |  | # an already-done Future with no values is returned. Otherwise a pending | 
|  |  |  |  |  |  |  | # Future is returned and a callback is appended to shift_waiters. | 
| 238 |  |  |  |  |  |  | sub shift_atmost | 
| 239 |  |  |  |  |  |  | { | 
| 240 | 7 |  |  | 7 | 1 | 1164 | my $self = shift; | 
| 241 | 7 |  |  |  |  | 13 | my ( $count ) = @_; | 
| 242 |  |  |  |  |  |  |  | 
| 243 | 7 |  |  |  |  | 11 | my $items = $self->{items}; | 
| 244 |  |  |  |  |  |  |  | 
| 245 | 7 | 100 |  |  |  | 19 | if( @$items ) { | 
| 246 | 2 |  |  |  |  | 5 | my @more = splice @$items, 0, $count; | 
|  |  |  |  |  |  |  | # Space was freed, so give waiting push calls the chance to use it. | 
| 247 | 2 |  |  |  |  | 7 | $self->_manage_push_waiters; | 
| 248 | 2 |  |  |  |  | 9 | return Future->done( @more ); | 
| 249 |  |  |  |  |  |  | } | 
| 250 |  |  |  |  |  |  |  | 
| 251 | 5 | 50 |  |  |  | 11 | return Future->done if $self->{finished}; | 
| 252 |  |  |  |  |  |  |  | 
| 253 | 5 |  |  |  |  | 15 | my $f = Future->new; | 
| 254 | 5 |  |  |  |  | 22 | push @{ $self->{shift_waiters} }, sub { | 
|  |  |  |  |  |  |  | # Run by finish with nothing left to deliver: complete with no values. | 
| 255 | 4 | 0 | 33 | 4 |  | 10 | return $f->done if !@$items and $self->{finished}; | 
| 256 | 4 |  |  |  |  | 12 | $f->done( splice @$items, 0, $count ); | 
| 257 | 4 |  |  |  |  | 128 | $self->_manage_push_waiters; | 
| 258 | 5 |  |  |  |  | 30 | }; | 
| 259 | 5 |  |  |  |  | 18 | return $f; | 
| 260 |  |  |  |  |  |  | } | 
| 261 |  |  |  |  |  |  |  | 
| 262 |  |  |  |  |  |  | =head2 finish | 
| 263 |  |  |  |  |  |  |  | 
| 264 |  |  |  |  |  |  | $queue->finish; | 
| 265 |  |  |  |  |  |  |  | 
| 266 |  |  |  |  |  |  | I | 
| 267 |  |  |  |  |  |  |  | 
| 268 |  |  |  |  |  |  | Marks that the queue is now finished. Once the current list of items has been | 
| 269 |  |  |  |  |  |  | exhausted, any further attempts to C<shift> more will yield empty. | 
| 270 |  |  |  |  |  |  |  | 
| 271 |  |  |  |  |  |  | =cut | 
| 272 |  |  |  |  |  |  |  | 
|  |  |  |  |  |  |  | # Marks the queue as finished, then removes and runs every callback still | 
|  |  |  |  |  |  |  | # in shift_waiters. The shift_waiters list is the only waiter list touched | 
|  |  |  |  |  |  |  | # here; push_waiters is not examined. | 
| 273 |  |  |  |  |  |  | sub finish | 
| 274 |  |  |  |  |  |  | { | 
| 275 | 2 |  |  | 2 | 1 | 61 | my $self = shift; | 
| 276 | 2 |  |  |  |  | 4 | $self->{finished}++; | 
| 277 |  |  |  |  |  |  |  | 
|  |  |  |  |  |  |  | # With the finished flag set, a waiter that finds no items completes its | 
|  |  |  |  |  |  |  | # future with no values. | 
| 278 | 2 |  |  |  |  | 4 | ( shift @{ $self->{shift_waiters} } )->() while @{ $self->{shift_waiters} }; | 
|  | 3 |  |  |  |  | 48 |  | 
|  | 1 |  |  |  |  | 3 |  | 
| 279 |  |  |  |  |  |  | } | 
| 280 |  |  |  |  |  |  |  | 
| 281 |  |  |  |  |  |  | =head1 AUTHOR | 
| 282 |  |  |  |  |  |  |  | 
| 283 |  |  |  |  |  |  | Paul Evans | 
| 284 |  |  |  |  |  |  |  | 
| 285 |  |  |  |  |  |  | =cut | 
| 286 |  |  |  |  |  |  |  | 
|  |  |  |  |  |  |  | # A module file must end with a true value; any non-zero literal will do. | 
| 287 |  |  |  |  |  |  | 0x55AA; |