File Coverage

blib/lib/Future/Buffer.pm
Criterion Covered Total %
statement 79 79 100.0
branch 21 24 87.5
condition 6 9 66.6
subroutine 19 19 100.0
pod 7 7 100.0
total 132 138 95.6


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2020 -- leonerd@leonerd.org.uk
5              
6             package Future::Buffer;
7              
8 6     6   354611 use 5.010; # //
  6         68  
9 6     6   31 use strict;
  6         12  
  6         120  
10 6     6   28 use warnings;
  6         12  
  6         242  
11              
12             our $VERSION = '0.01';
13              
14 6     6   4660 use Future;
  6         88095  
  6         218  
15              
16 6     6   49 use Scalar::Util qw( weaken );
  6         12  
  6         5527  
17              
18             =head1 NAME
19              
20             C - a string buffer that uses Futures
21              
22             =head1 SYNOPSIS
23              
24             use Future::Buffer;
25              
26             use Future::AsyncAwait;
27             use Future::IO;
28              
29             my $buffer = Future::Buffer->new(
30             fill => sub { Future::IO->sysread( $socket, 8192 ) }
31             );
32              
33             async sub print_lines
34             {
35             while(1) {
36             my $line = await $buffer->read_until( "\n" );
37             chomp $line;
38              
39             say "Got a line: $line";
40             }
41             }
42              
43             print_lines()->get;
44              
45             =head1 DESCRIPTION
46              
47             Objects in this class provide a string buffer, on which operations return
48             L instances which will complete when data is available. Data can be
49             inserted into the buffer either in a push-based manner by calling the C
50             method, or in a pull-based manner by providing it with a C callback by
51             which it can request data itself. This flexibility allows the buffer to act as
52             an adapter between push- and pull-based providers and consumers.
53              
54             Each C-like method returns a L which will complete once there
55             are enough bytes in the buffer to satisfy the required condition. The buffer
56             behaves somewhat like a pipe, where bytes provided at the writing end (either
57             by the C method or the C callback) are eventually consumed at the
58             reading end by one of the C futures.
59              
60             Multiple C futures can remain pending at once, and will be completed in
61             the order they were created when more data is eventually available. Thus, any
62             call to the C method to provide more data can potentially result in
63             multiple futures becoming ready.
64              
65             =cut
66              
67             =head1 CONSTRUCTOR
68              
69             =cut
70              
71             =head2 new
72              
73             $buffer = Future::Buffer->new( %args )
74              
75             Returns a new L instance.
76              
77             Takes the following named arguments:
78              
79             =over 4
80              
81             =item fill => CODE
82              
83             $f = $fill->()
84              
85             $data = $f->get
86              
87             Optional callback which the buffer will invoke when it needs more data.
88              
89             Any read futures which are waiting on the fill future are constructed by using
90             the fill future as a prototype, ensuring they have the correct type.
91              
92             =back
93              
94             =cut
95              
96             sub new
97             {
98 6     6 1 1022 my $class = shift;
99 6         20 my %args = @_;
100              
101             return bless {
102             pending => [],
103             data => "",
104             fill => $args{fill},
105 6         46 }, $class;
106             }
107              
108             =head1 METHODS
109              
110             =cut
111              
112             sub _fill
113             {
114 7     7   12 my $self = shift;
115 7   66     18 return $self->{fill_f} //= do {
116 6         18 weaken( my $weakself = $self );
117 6         9 my $fill = $self->{fill};
118              
119             # Arm the fill loop
120             $self->{fill_f} = $fill->() # TODO: give it a size hint?
121             ->on_done( sub {
122 5     5   1171 my ( $data ) = @_;
123 5 50       12 $weakself or return;
124              
125 5         34 $weakself->{data} .= $data;
126 5         9 undef $self->{fill_f};
127              
128 5         12 $weakself->_invoke_pending;
129              
130 5 100       7 $weakself->_fill if @{ $self->{pending} };
  5         15  
131 6         15 });
132             };
133             }
134              
135             sub _new_read_future
136             {
137 15     15   32 my $self = shift;
138              
139 15 100 66     63 if( $self->{fill} and my $fill_f = $self->_fill ) {
140 5         198 return $fill_f->new;
141             }
142              
143 10         31 return Future->new;
144             }
145              
146             sub _invoke_pending
147             {
148 17     17   30 my $self = shift;
149              
150 17         28 my $pending = $self->{pending};
151              
152 17   66     100 while( @$pending and length $self->{data} ) {
153             $pending->[0]->( \$self->{data} )
154 18 100       78 or last;
155              
156 14         593 shift @$pending;
157             }
158             }
159              
160             =head2 length
161              
162             $len = $buffer->length
163              
164             Returns the length of the currently-stored data; that is, data that has been
165             provided by C calls but not yet consumed by C.
166              
167             =cut
168              
169 7     7 1 46 sub length :method { length $_[0]->{data} }
170              
171             =head2 is_empty
172              
173             $empty = $buffer->is_empty
174              
175             Returns true if the stored length is zero.
176              
177             =cut
178              
179 5     5 1 3244 sub is_empty { shift->length == 0 }
180              
181             =head2 write
182              
183             $f = $buffer->write( $data )
184              
185             Appends to the stored data, invoking any pending C futures that are
186             outstanding and can now complete.
187              
188             Currently this method returns an already-completed C. Some later
189             version may implement a buffer maximum size, and choose not to complete this
190             future until there is enough space to accept the new data. For now it is safe
191             for the caller to ignore the return value, but it may become not so.
192              
193             =cut
194              
195             sub write
196             {
197 12     12 1 3071 my $self = shift;
198 12         43 $self->{data} .= $_[0];
199              
200 12 100       20 $self->_invoke_pending if @{ $self->{pending} };
  12         46  
201              
202 12         49 return Future->done;
203             }
204              
205             =head2 read
206              
207             $f = $buffer->read( $len )
208              
209             $data = $f->get
210              
211             Returns a future which will complete when there is some data available in the
212             buffer and will yield I the given length. Note that, analogous to
213             calling the C IO method on a filehandle, this can still complete and
214             yield a shorter length if less is currently available.
215              
216             =cut
217              
218             sub read
219             {
220 10     10 1 2439 my $self = shift;
221 10         19 my ( $maxlen ) = @_;
222              
223 10         26 my $f = $self->_new_read_future;
224              
225 10         51 push @{ $self->{pending} }, sub {
226 9     9   24 my ( $dref ) = @_;
227 9 50       23 return unless length $$dref;
228              
229 9         27 my $ret = substr( $$dref, 0, $maxlen, "" );
230 9         28 $f->done( $ret );
231 10         63 };
232              
233 10 100       42 $self->_invoke_pending if length $self->{data};
234              
235 10         29 return $f;
236             }
237              
238             =head2 read_exactly
239              
240             $f = $buffer->read_exactly( $len )
241              
242             $data = $f->get
243              
244             Returns a future which will complete when there is enough data available in
245             the buffer to yield exactly the length given.
246              
247             =cut
248              
249             sub read_exactly
250             {
251 3     3 1 1366 my $self = shift;
252 3         9 my ( $len ) = @_;
253              
254 3         10 my $f = $self->_new_read_future;
255              
256 3         16 push @{ $self->{pending} }, sub {
257 6     6   10 my ( $dref ) = @_;
258 6 100       22 return unless length $$dref >= $len;
259              
260 3         13 my $ret = substr( $$dref, 0, $len, "" );
261 3         10 $f->done( $ret );
262 3         23 };
263              
264 3 100       16 $self->_invoke_pending if length $self->{data};
265              
266 3         8 return $f;
267             }
268              
269             =head2 read_until
270              
271             $f = $buffer->read_until( $pattern )
272              
273             $data = $f->get
274              
275             Returns a future which will complete when the buffer contains a match for the
276             given pattern (which may either be a plain string or a compiled C).
277             The future will yield the contents of the buffer up to and including this
278             match.
279              
280             For example, a C-like operation can be performed by
281              
282             $f = $buffer->read_until( "\x0d\x0a" );
283              
284             =cut
285              
286             sub read_until
287             {
288 2     2 1 295 my $self = shift;
289 2         5 my ( $pattern ) = @_;
290              
291 2 50       31 $pattern = qr/\Q$pattern/ unless ref $pattern eq "Regexp";
292              
293 2         8 my $f = $self->_new_read_future;
294              
295 2         12 push @{ $self->{pending} }, sub {
296 3     3   6 my ( $dref ) = @_;
297 3 100       19 return unless $$dref =~ m/$pattern/;
298              
299 2         12 my $ret = substr( $$dref, 0, $+[0], "" );
300 2         8 $f->done( $ret );
301 2         16 };
302              
303 2 100       13 $self->_invoke_pending if length $self->{data};
304              
305 2         6 return $f;
306             }
307              
308             =head1 TODO
309              
310             =over 4
311              
312             =item *
313              
314             An "on-read" event, taking maybe inspiration from L. This
315             would allow both pull- and push-based consumers.
316              
317             =item *
318              
319             Size limitation. Allow an upper bound of stored data, make C calls
320             return pending futures until buffer can accept it. Needs consideration of
321             unbounded C though.
322              
323             =item *
324              
325             Consider some C assistance, to allow nice handling of binary
326             protocols by unpacking out of the buffer directly.
327              
328             =item *
329              
330             Consider what happens at EOF. Add a C method for producers to call.
331             Understand what C would do there. Have all the pending C futures
332             yield an empty list maybe?
333              
334             =back
335              
336             =head1 AUTHOR
337              
338             Paul Evans
339              
340             Inspired by L by Tom Molesworth
341              
342             =cut
343              
344             0x55AA;