File Coverage

blib/lib/Future/Buffer.pm
Criterion Covered Total %
statement 79 79 100.0
branch 21 24 87.5
condition 6 9 66.6
subroutine 19 19 100.0
pod 7 7 100.0
total 132 138 95.6


line stmt bran cond sub pod time code
1             # You may distribute under the terms of either the GNU General Public License
2             # or the Artistic License (the same terms as Perl itself)
3             #
4             # (C) Paul Evans, 2020 -- leonerd@leonerd.org.uk
5              
6             package Future::Buffer;
7              
8 6     6   346235 use 5.010; # //
  6         64  
9 6     6   31 use strict;
  6         12  
  6         118  
10 6     6   28 use warnings;
  6         8  
  6         236  
11              
12             our $VERSION = '0.02';
13              
14 6     6   4290 use Future;
  6         85760  
  6         246  
15              
16 6     6   76 use Scalar::Util qw( weaken );
  6         13  
  6         5349  
17              
18             =head1 NAME
19              
20             C<Future::Buffer> - a string buffer that uses Futures
21              
22             =head1 SYNOPSIS
23              
24             use Future::Buffer;
25              
26             use Future::AsyncAwait;
27             use Future::IO;
28              
29             my $buffer = Future::Buffer->new(
30             fill => sub { Future::IO->sysread( $socket, 8192 ) }
31             );
32              
33             async sub print_lines
34             {
35             while(1) {
36             my $line = await $buffer->read_until( "\n" );
37             chomp $line;
38              
39             say "Got a line: $line";
40             }
41             }
42              
43             print_lines()->get;
44              
45             =head1 DESCRIPTION
46              
47             Objects in this class provide a string buffer, on which operations return
48             L<Future> instances which will complete when data is available. Data can be
49             inserted into the buffer either in a push-based manner by calling the C<write>
50             method, or in a pull-based manner by providing it with a C<fill> callback by
51             which it can request data itself. This flexibility allows the buffer to act as
52             an adapter between push- and pull-based providers and consumers.
53              
54             Each C<read>-like method returns a L<Future> which will complete once there
55             are enough bytes in the buffer to satisfy the required condition. The buffer
56             behaves somewhat like a pipe, where bytes provided at the writing end (either
57             by the C<write> method or the C<fill> callback) are eventually consumed at the
58             reading end by one of the C<read> futures.
59              
60             Multiple C<read> futures can remain pending at once, and will be completed in
61             the order they were created when more data is eventually available. Thus, any
62             call to the C<write> method to provide more data can potentially result in
63             multiple futures becoming ready.
64              
65             =cut
66              
67             =head1 CONSTRUCTOR
68              
69             =cut
70              
71             =head2 new
72              
73             $buffer = Future::Buffer->new( %args )
74              
75             Returns a new L<Future::Buffer> instance.
76              
77             Takes the following named arguments:
78              
79             =over 4
80              
81             =item fill => CODE
82              
83             $f = $fill->()
84              
85             $data = $f->get
86              
87             Optional callback which the buffer will invoke when it needs more data.
88              
89             Any read futures which are waiting on the fill future are constructed by using
90             the fill future as a prototype, ensuring they have the correct type.
91              
92             =back
93              
94             =cut
95              
96             sub new
97             {
98 6     6 1 1063 my $class = shift;
99 6         22 my %args = @_;
100              
101             return bless {
102             pending => [],
103             data => "",
104             fill => $args{fill},
105 6         48 }, $class;
106             }
107              
108             =head1 METHODS
109              
110             =cut
111              
112             sub _fill
113             {
114 7     7   9 my $self = shift;
115 7   66     20 return $self->{fill_f} //= do {
116 6         17 weaken( my $weakself = $self );
117 6         9 my $fill = $self->{fill};
118              
119             # Arm the fill loop
120             $self->{fill_f} = $fill->() # TODO: give it a size hint?
121             ->on_done( sub {
122 5     5   1299 my ( $data ) = @_;
123 5 50       14 $weakself or return;
124              
125 5         23 $weakself->{data} .= $data;
126 5         8 undef $self->{fill_f};
127              
128 5         12 $weakself->_invoke_pending;
129              
130 5 100       6 $weakself->_fill if @{ $self->{pending} };
  5         28  
131 6         14 });
132             };
133             }
134              
135             sub _new_read_future
136             {
137 15     15   35 my $self = shift;
138              
139 15 100 66     57 if( $self->{fill} and my $fill_f = $self->_fill ) {
140 5         196 return $fill_f->new;
141             }
142              
143 10         32 return Future->new;
144             }
145              
146             sub _invoke_pending
147             {
148 17     17   26 my $self = shift;
149              
150 17         29 my $pending = $self->{pending};
151              
152 17   66     102 while( @$pending and length $self->{data} ) {
153             $pending->[0]->( \$self->{data} )
154 18 100       84 or last;
155              
156 14         608 shift @$pending;
157             }
158             }
159              
160             =head2 length
161              
162             $len = $buffer->length
163              
164             Returns the length of the currently-stored data; that is, data that has been
165             provided by C<write> calls or the C<fill> callback but not yet consumed by a
166             C<read> future.
167              
168             =cut
169              
170 7     7 1 46 sub length :method { length $_[0]->{data} }
171              
172             =head2 is_empty
173              
174             $empty = $buffer->is_empty
175              
176             Returns true if the stored length is zero.
177              
178             =cut
179              
180 5     5 1 3152 sub is_empty { shift->length == 0 }
181              
182             =head2 write
183              
184             $f = $buffer->write( $data )
185              
186             Appends to the stored data, invoking any pending C<read> futures that are
187             outstanding and can now complete.
188              
189             Currently this method returns an already-completed C<Future>. Some later
190             version may implement a buffer maximum size, and choose not to complete this
191             future until there is enough space to accept the new data. For now it is safe
192             for the caller to ignore the return value, but it may become not so.
193              
194             =cut
195              
196             sub write
197             {
198 12     12 1 3224 my $self = shift;
199 12         40 $self->{data} .= $_[0];
200              
201 12 100       20 $self->_invoke_pending if @{ $self->{pending} };
  12         49  
202              
203 12         52 return Future->done;
204             }
205              
206             =head2 read_atmost
207              
208             $f = $buffer->read_atmost( $len )
209              
210             $data = $f->get
211              
212             Returns a future which will complete when there is some data available in the
213             buffer and will yield I<up to> the given length. Note that, analogous to
214             calling the C<read> IO method on a filehandle, this can still complete and
215             yield a shorter length if less is currently available.
216              
217             =cut
218              
219             sub read_atmost
220             {
221 10     10 1 2328 my $self = shift;
222 10         23 my ( $maxlen ) = @_;
223              
224 10         24 my $f = $self->_new_read_future;
225              
226 10         54 push @{ $self->{pending} }, sub {
227 9     9   17 my ( $dref ) = @_;
228 9 50       27 return unless length $$dref;
229              
230 9         30 my $ret = substr( $$dref, 0, $maxlen, "" );
231 9         24 $f->done( $ret );
232 10         66 };
233              
234 10 100       41 $self->_invoke_pending if length $self->{data};
235              
236 10         28 return $f;
237             }
238              
239             =head2 read_exactly
240              
241             $f = $buffer->read_exactly( $len )
242              
243             $data = $f->get
244              
245             Returns a future which will complete when there is enough data available in
246             the buffer to yield exactly the length given.
247              
248             =cut
249              
250             sub read_exactly
251             {
252 3     3 1 1437 my $self = shift;
253 3         9 my ( $len ) = @_;
254              
255 3         10 my $f = $self->_new_read_future;
256              
257 3         18 push @{ $self->{pending} }, sub {
258 6     6   13 my ( $dref ) = @_;
259 6 100       23 return unless length $$dref >= $len;
260              
261 3         13 my $ret = substr( $$dref, 0, $len, "" );
262 3         10 $f->done( $ret );
263 3         22 };
264              
265 3 100       15 $self->_invoke_pending if length $self->{data};
266              
267 3         9 return $f;
268             }
269              
270             =head2 read_until
271              
272             $f = $buffer->read_until( $pattern )
273              
274             $data = $f->get
275              
276             Returns a future which will complete when the buffer contains a match for the
277             given pattern (which may either be a plain string or a compiled C<Regexp>).
278             The future will yield the contents of the buffer up to and including this
279             match.
280              
281             For example, a C<readline>-like operation can be performed by
282              
283             $f = $buffer->read_until( "\x0d\x0a" );
284              
285             =cut
286              
287             sub read_until
288             {
289 2     2 1 324 my $self = shift;
290 2         5 my ( $pattern ) = @_;
291              
292 2 50       30 $pattern = qr/\Q$pattern/ unless ref $pattern eq "Regexp";
293              
294 2         7 my $f = $self->_new_read_future;
295              
296 2         11 push @{ $self->{pending} }, sub {
297 3     3   6 my ( $dref ) = @_;
298 3 100       20 return unless $$dref =~ m/$pattern/;
299              
300 2         12 my $ret = substr( $$dref, 0, $+[0], "" );
301 2         9 $f->done( $ret );
302 2         15 };
303              
304 2 100       13 $self->_invoke_pending if length $self->{data};
305              
306 2         6 return $f;
307             }
308              
309             =head1 TODO
310              
311             =over 4
312              
313             =item *
314              
315             An "on-read" event, taking maybe inspiration from L<IO::Async::Stream>. This
316             would allow both pull- and push-based consumers.
317              
318             =item *
319              
320             Size limitation. Allow an upper bound of stored data, make C<write> calls
321             return pending futures until buffer can accept it. Needs consideration of
322             unbounded C<read_until> though.
323              
324             =item *
325              
326             Consider some C<unpack> assistance, to allow nice handling of binary
327             protocols by unpacking out of the buffer directly.
328              
329             =item *
330              
331             Consider what happens at EOF. Add a C<close> method for producers to call.
332             Understand what C<fill> would do there. Have all the pending C<read> futures
333             yield an empty list maybe?
334              
335             =back
336              
337             =head1 AUTHOR
338              
339             Paul Evans <leonerd@leonerd.org.uk>
340              
341             Inspired by L<Ryu::Buffer> by Tom Molesworth
342              
343             =cut
344              
345             0x55AA;