File Coverage

blib/lib/Future/Buffer.pm
Criterion Covered Total %
statement 110 112 98.2
branch 28 38 73.6
condition 22 55 40.0
subroutine 25 25 100.0
pod 9 9 100.0
total 194 239 81.1


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2020-2022 -- leonerd@leonerd.org.uk
5              
6             package Future::Buffer;
7              
8 7     7   364490 use 5.010; # //
  7         86  
9 7     7   32 use strict;
  7         12  
  7         128  
10 7     7   27 use warnings;
  7         10  
  7         246  
11              
12             our $VERSION = '0.03';
13              
14 7     7   4434 use Future;
  7         88581  
  7         247  
15              
16 7     7   50 use Scalar::Util qw( weaken );
  7         13  
  7         5884  
17              
18             =head1 NAME
19              
20             C<Future::Buffer> - a string buffer that uses Futures
21              
22             =head1 SYNOPSIS
23              
24             use Future::Buffer;
25              
26             use Future::AsyncAwait;
27             use Future::IO;
28              
29             my $buffer = Future::Buffer->new(
30             fill => sub { Future::IO->sysread( $socket, 8192 ) }
31             );
32              
33             async sub print_lines
34             {
35             while(1) {
36             my $line = await $buffer->read_until( "\n" );
37             chomp $line;
38              
39             say "Got a line: $line";
40             }
41             }
42              
43             print_lines()->get;
44              
45             =head1 DESCRIPTION
46              
47             Objects in this class provide a string buffer, on which operations return
48             L<Future> instances which will complete when data is available. Data can be
49             inserted into the buffer either in a push-based manner by calling the C<write>
50             method, or in a pull-based manner by providing it with a C<fill> callback by
51             which it can request data itself. This flexibility allows the buffer to act as
52             an adapter between push- and pull-based providers and consumers.
53              
54             Each C<read>-like method returns a L<Future> which will complete once there
55             are enough bytes in the buffer to satisfy the required condition. The buffer
56             behaves somewhat like a pipe, where bytes provided at the writing end (either
57             by the C<write> method or the C<fill> callback) are eventually consumed at the
58             reading end by one of the C<read> futures.
59              
60             Multiple C<read> futures can remain pending at once, and will be completed in
61             the order they were created when more data is eventually available. Thus, any
62             call to the C<write> method to provide more data can potentially result in
63             multiple futures becoming ready.
64              
65             =cut
66              
67             =head1 CONSTRUCTOR
68              
69             =cut
70              
71             =head2 new
72              
73             $buffer = Future::Buffer->new( %args )
74              
75             Returns a new L<Future::Buffer> instance.
76              
77             Takes the following named arguments:
78              
79             =over 4
80              
81             =item fill => CODE
82              
83             $f = $fill->()
84              
85             $data = $f->get
86              
87             Optional callback which the buffer will invoke when it needs more data.
88              
89             Any read futures which are waiting on the fill future are constructed by using
90             the fill future as a prototype, ensuring they have the correct type.
91              
92             =back
93              
94             =cut
95              
96             sub new
97             {
98 7     7 1 962 my $class = shift;
99 7         20 my %args = @_;
100              
101             return bless {
102             pending => [],
103             data => "",
104             fill => $args{fill},
105 7         46 }, $class;
106             }
107              
108             =head1 METHODS
109              
110             =cut
111              
112             sub _fill
113             {
114 12     12   14 my $self = shift;
115 12   66     28 return $self->{fill_f} //= do {
116 10         26 weaken( my $weakself = $self );
117 10         34 my $fill = $self->{fill};
118              
119             # Arm the fill loop
120             $fill->() # TODO: give it a size hint?
121             ->on_done( sub {
122 8     8   1345 my ( $data ) = @_;
123 8 50       18 $weakself or return;
124              
125 8         13 $weakself->{data} .= $data;
126 8         11 undef $self->{fill_f};
127              
128 8         19 $weakself->_invoke_pending;
129              
130 8 100       190 $weakself->_fill if @{ $self->{pending} };
  8         22  
131 10         24 });
132             };
133             }
134              
135             sub _new_read_future
136             {
137 26     26   43 my $self = shift;
138 26         40 my ( $code ) = @_;
139              
140 26         40 my $pending = $self->{pending};
141              
142             # First see if the buffer is already sufficient;
143 26 100 100     94 if( !@$pending and
144             ( my @ret = $code->( \$self->{data} ) ) ) {
145 10         32 return Future->done( @ret );
146             }
147              
148 16         26 my $f;
149 16 100 66     56 if( $self->{fill} and my $fill_f = $self->_fill ) {
150 10         310 $f = $fill_f->new;
151             }
152             else {
153 6         18 $f = Future->new;
154             }
155              
156 16         101 push @$pending, [ $code, $f ];
157              
158 16 50       36 $self->_invoke_pending if length $self->{data};
159              
160             $f->on_cancel( sub {
161 3   100 3   57 shift @$pending while @$pending and $pending->[0]->[1]->is_cancelled;
162 3 100 66     45 return if @$pending or !$self->{fill_f};
163              
164 1         5 $self->{fill_f}->cancel;
165 1         32 undef $self->{fill_f};
166 16         81 } );
167              
168 16         276 return $f;
169             }
170              
171             sub _invoke_pending
172             {
173 15     15   20 my $self = shift;
174              
175 15         22 my $pending = $self->{pending};
176              
177 15   66     72 while( @$pending and length $self->{data} ) {
178 16         55 my $p = $pending->[0];
179 16 50 0     43 shift @$pending and next if $p->[1]->is_cancelled;
180              
181 16 100       83 defined( my $ret = $p->[0]->( \$self->{data} ) )
182             or last;
183              
184 12         20 shift @$pending;
185 12         40 $p->[1]->done( $ret );
186             }
187             }
188              
189             =head2 length
190              
191             $len = $buffer->length
192              
193             Returns the length of the currently-stored data; that is, data that has been
194             provided by C<write> calls or the C<fill> callback but not yet consumed by a
195             C<read> future.
196              
197             =cut
198              
199 8     8 1 46 sub length :method { length $_[0]->{data} }
200              
201             =head2 is_empty
202              
203             $empty = $buffer->is_empty
204              
205             Returns true if the stored length is zero.
206              
207             =cut
208              
209 6     6 1 3775 sub is_empty { shift->length == 0 }
210              
211             =head2 write
212              
213             $f = $buffer->write( $data )
214              
215             Appends to the stored data, invoking any pending C<read> futures that are
216             outstanding and can now complete.
217              
218             Currently this method returns an already-completed C<Future>. Some later
219             version may implement a buffer maximum size, and choose not to complete this
220             future until there is enough space to accept the new data. For now it is safe
221             for the caller to ignore the return value, but it may become not so.
222              
223             =cut
224              
225             sub write
226             {
227 14     14 1 3143 my $self = shift;
228 14         41 $self->{data} .= $_[0];
229              
230 14 100       21 $self->_invoke_pending if @{ $self->{pending} };
  14         49  
231              
232 14         226 return Future->done;
233             }
234              
235             =head2 read_atmost
236              
237             $f = $buffer->read_atmost( $len )
238              
239             $data = $f->get
240              
241             Returns a future which will complete when there is some data available in the
242             buffer and will yield I<at most> the given length. Note that, analogous to
243             calling the C<read> IO method on a filehandle, this can still complete and
244             yield a shorter length if less is currently available.
245              
246             =cut
247              
248             sub read_atmost
249             {
250 20     20 1 5379 my $self = shift;
251 20         37 my ( $maxlen ) = @_;
252              
253             return $self->_new_read_future(
254             sub {
255 26     26   38 my ( $dref ) = @_;
256 26 100       101 return unless length $$dref;
257              
258 16         78 return substr( $$dref, 0, $maxlen, "" );
259             }
260 20         86 );
261             }
262              
263             =head2 read_exactly
264              
265             $f = $buffer->read_exactly( $len )
266              
267             $data = $f->get
268              
269             Returns a future which will complete when there is enough data available in
270             the buffer to yield exactly the length given.
271              
272             =cut
273              
274             sub read_exactly
275             {
276 3     3 1 1228 my $self = shift;
277 3         7 my ( $len ) = @_;
278              
279             return $self->_new_read_future(
280             sub {
281 8     8   12 my ( $dref ) = @_;
282 8 100       29 return unless length $$dref >= $len;
283              
284 3         26 return substr( $$dref, 0, $len, "" );
285             }
286 3         19 );
287             }
288              
289             =head2 read_until
290              
291             $f = $buffer->read_until( $pattern )
292              
293             $data = $f->get
294              
295             Returns a future which will complete when the buffer contains a match for the
296             given pattern (which may either be a plain string or a compiled C<Regexp>).
297             The future will yield the contents of the buffer up to and including this
298             match.
299              
300             For example, a C<readline>-like operation can be performed by
301              
302             $f = $buffer->read_until( "\x0d\x0a" );
303              
304             =cut
305              
306             sub read_until
307             {
308 2     2 1 268 my $self = shift;
309 2         4 my ( $pattern ) = @_;
310              
311 2 50       29 $pattern = qr/\Q$pattern/ unless ref $pattern eq "Regexp";
312              
313             return $self->_new_read_future(
314             sub {
315 4     4   6 my ( $dref ) = @_;
316 4 100       36 return unless $$dref =~ m/$pattern/;
317              
318 2         15 return substr( $$dref, 0, $+[0], "" );
319             }
320 2         12 );
321             }
322              
323             =head2 read_unpacked
324              
325             $f = $buffer->read_unpacked( $pack_format )
326              
327             @fields = $f->get
328              
329             I
330              
331             Returns a future which will complete when the buffer contains enough data to
332             unpack all of the requested fields using the given C<pack> format. The
333             future will yield a list of all the fields extracted by the format.
334              
335             Note that because the implementation is shamelessly stolen from
336             L<IO::Handle::Packable> the same limitations on what pack formats are
337             recognized will apply.
338              
339             =cut
340              
341             # Gratuitously stolen from IO::Handle::Packable
342              
343             use constant {
344 7         3938 BYTES_FMT_i => length( pack "i", 0 ),
345             BYTES_FMT_f => length( pack "f", 0 ),
346             BYTES_FMT_d => length( pack "d", 0 ),
347 7     7   47 };
  7         13  
348              
349             sub _length_of_packformat
350             {
351 1     1   2 my ( $format ) = @_;
352 1         1 local $_ = $format;
353              
354 1         1 my $bytes = 0;
355 1         3 while( length ) {
356 3         6 s/^\s+//;
357 3 50       5 length or last;
358              
359 3         12 my $this;
360              
361             # Basic template
362 3 0 100     18 s/^[aAcC]// and $this = 1 or
      50        
      66        
      0        
      33        
      0        
      0        
      0        
      0        
      0        
      0        
      0        
      0        
363             s/^[sSnv]// and $this = 2 or
364             s/^[iI]// and $this = BYTES_FMT_i or
365             s/^[lLNV]// and $this = 4 or
366             s/^[qQ]// and $this = 8 or
367             s/^f// and $this = BYTES_FMT_f or
368             s/^d// and $this = BYTES_FMT_d or
369 0         0 die "TODO: unrecognised template char ${\substr $_, 0, 1}\n";
370              
371             # Ignore endian specifiers
372 3         6 s/^[<>]//;
373              
374             # Repeat count
375 3 50       8 s/^(\d+)// and $this *= $1;
376              
377 3         6 $bytes += $this;
378             }
379              
380 1         2 return $bytes;
381             }
382              
383             sub read_unpacked
384             {
385 1     1 1 25 my $self = shift;
386 1         2 my ( $format ) = @_;
387              
388 1         2 my $len = _length_of_packformat $format;
389             return $self->_new_read_future(
390             sub {
391 1     1   2 my ( $dref ) = @_;
392 1 50       2 return unless length $$dref >= $len;
393              
394 1         10 return unpack $format, substr( $$dref, 0, $len, "" );
395             }
396 1         6 );
397             }
398              
399             =head2 unread
400              
401             $buffer->unread( $data )
402              
403             I
404              
405             Prepends more data back into the buffer.
406              
407             It is uncommon to need this method, but it may be useful in certain situations
408             such as when it is hard to determine upfront how much data needs to be read
409             for a single operation, and it turns out too much was read. The trailing
410             content past what is needed can be put back for a later operation.
411              
412             Note that use of this method causes an inherent race condition between
413             outstanding read futures and existing data in the buffer. If there are no
414             pending futures then this is safe. If there is no existing data already in the
415             buffer this is also safe. If neither of these is true then a warning is
416             printed indicating that the logic of the caller is not well-defined.
417              
418             =cut
419              
420             sub unread
421             {
422 2     2 1 461 my $self = shift;
423 2         4 my ( $data ) = @_;
424              
425 2 50 66     3 if( @{ $self->{pending} } and length $self->{data} ) {
  2         10  
426 0         0 warn "Racy use of ->unread with both pending read futures and existing data";
427             }
428              
429 2         5 $self->{data} = $data . $self->{data};
430 2 100       3 $self->_invoke_pending if @{ $self->{pending} };
  2         7  
431              
432 2         43 return Future->done;
433             }
434              
435             =head1 TODO
436              
437             =over 4
438              
439             =item *
440              
441             An "on-read" event, taking maybe inspiration from L<IO::Async::Stream>. This
442             would allow both pull- and push-based consumers.
443              
444             =item *
445              
446             Size limitation. Allow an upper bound of stored data, make C<write> calls
447             return pending futures until buffer can accept it. Needs consideration of
448             unbounded C<read_until> though.
449              
450             =item *
451              
452             Consider extensions of the L</read_unpacked> method to handle more situations.
453             This may require building a shared CPAN module for doing streaming-unpack
454             along with C<IO::Handle::Packable> and other situations.
455              
456             =item *
457              
458             Consider what happens at EOF. Add a C<close> method for producers to call.
459             Understand what C<fill> would do there. Have all the pending C<read> futures
460             yield an empty list maybe?
461              
462             =back
463              
464             =head1 AUTHOR
465              
466             Paul Evans
467              
468             Inspired by L<Ryu::Buffer> by Tom Molesworth
469              
470             =cut
471              
472             0x55AA;