File Coverage

blib/lib/Ryu/Buffer.pm
Criterion Covered Total %
statement 110 123 89.4
branch 26 38 68.4
condition 4 7 57.1
subroutine 21 23 91.3
pod 12 13 92.3
total 173 204 84.8


line stmt bran cond sub pod time code
1             package Ryu::Buffer;
2              
3 37     37   70950 use strict;
  37         77  
  37         1187  
4 37     37   202 use warnings;
  37         76  
  37         1939  
5              
6             our $VERSION = '3.000'; # VERSION
7             our $AUTHORITY = 'cpan:TEAM'; # AUTHORITY
8              
9 37     37   729 use parent qw(Ryu::Node);
  37         380  
  37         210  
10              
11             =encoding utf8
12              
13             =head1 NAME
14              
15             Ryu::Buffer - accumulate data
16              
17             =head1 DESCRIPTION
18              
19             Provides a simple way to push bytes or characters into a buffer,
20             and get them back out again.
21              
22             Typically of use for delimiter-based or fixed-size protocols.
23              
24             See also L<Future::Buffer>, if you're dealing exclusively with L<Future>
25             instances and don't need the L<Ryu::Source> functionality then that's
26             likely to be a better option.
27              
28             =cut
29              
30 37     37   2919 use curry;
  37         384  
  37         845  
31 37     37   195 use List::Util qw(min max);
  37         68  
  37         61384  
32              
33             =head1 METHODS
34              
35             =cut
36              
37             =head2 new
38              
39             Instantiates a new, empty L<Ryu::Buffer>.
40              
41             =cut
42              
43             sub new {
44 5     5 1 9741 my ($class, %args) = @_;
45 5   50     44 $args{data} //= '';
46 5   50     35 $args{ops} //= [];
47 5         28 my $self = $class->next::method(%args);
48 5         17 return $self;
49             }
50              
51             =head1 METHODS - Reading data
52              
53             These methods provide ways of accessing the buffer either
54             destructively (C<read*>) or non-destructively (C<peek*>).
55              
56             =cut
57              
58             =head2 read_exactly
59              
60             Reads exactly the given number of bytes or characters.
61              
62             Takes the following parameters:
63              
64             =over 4
65              
66             =item * C<$size> - number of characters or bytes to return
67              
68             =back
69              
70             Returns a L<Future> which will resolve to a scalar containing the requested data.
71              
72             =cut
73              
74             sub read_exactly {
75 5     5 1 2031 my ($self, $size) = @_;
76 5         12 my $f = $self->new_future;
77 5         55 push @{$self->{ops}}, $self->$curry::weak(sub {
78 7     7   34 my ($self) = @_;
79 7 50       23 return $f if $f->is_ready;
80 7 100       50 return $f unless $size <= length($self->{data});
81 5         18 my $data = substr($self->{data}, 0, $size, '');
82 5         17 $f->done($data);
83 5         200 $self->on_change;
84 5         9 return $f;
85 5         138 });
86 5         82 $self->process_pending;
87 5         32 $f;
88             }
89              
90             =head2 read_atmost
91              
92             Reads up to the given number of bytes or characters - if
93             we have at least one byte or character in the buffer, we'll
94             return that even if it's shorter than the requested C<$size>.
95             This method is guaranteed not to return B<more> than the
96             C<$size>.
97              
98             Takes the following parameters:
99              
100             =over 4
101              
102             =item * C<$size> - maximum number of characters or bytes to return
103              
104             =back
105              
106             Returns a L<Future> which will resolve to a scalar containing the requested data.
107              
108             =cut
109              
110             sub read_atmost {
111 0     0 1 0 my ($self, $size) = @_;
112 0         0 my $f = $self->new_future;
113 0         0 push @{$self->{ops}}, $self->$curry::weak(sub {
114 0     0   0 my ($self) = @_;
115 0 0       0 return $f if $f->is_ready;
116 0 0       0 return $f unless length($self->{data});
117 0         0 my $data = substr($self->{data}, 0, min($size, length($self->{data})), '');
118 0         0 $f->done($data);
119 0         0 $self->on_change;
120 0         0 return $f;
121 0         0 });
122 0         0 $self->process_pending;
123 0         0 $f;
124             }
125              
126             =head2 read_atleast
127              
128             Reads at least the given number of bytes or characters - if
129             we have a buffer that's the given size or larger, we'll
130             return everything available, even if it's larger than the
131             requested C<$size>.
132              
133             Takes the following parameters:
134              
135             =over 4
136              
137             =item * C<$size> - minimum number of characters or bytes to return
138              
139             =back
140              
141             Returns a L<Future> which will resolve to a scalar containing the requested data.
142              
143             =cut
144              
145             sub read_atleast {
146 2     2 1 977 my ($self, $size) = @_;
147 2         6 my $f = $self->new_future;
148 2         15 push @{$self->{ops}}, $self->$curry::weak(sub {
149 2     2   13 my ($self) = @_;
150 2 50       8 return $f if $f->is_ready;
151 2 50       21 return $f unless length($self->{data}) >= $size;
152 2         13 my $data = substr($self->{data}, 0, max($size, length($self->{data})), '');
153 2         7 $f->done($data);
154 2         67 $self->on_change;
155 2         5 return $f;
156 2         30 });
157 2         31 $self->process_pending;
158 2         9 $f;
159             }
160              
161             =head2 read_until
162              
163             Reads up to the given string or regex match.
164              
165             Pass a C<< qr// >> instance if you want to use a regular expression to match,
166             or a plain string if you want exact-string matching behaviour.
167              
168             The data returned will B<include> the match.
169              
170             Takes the following parameters:
171              
172             =over 4
173              
174             =item * C<$match> - the string or regex to match against
175              
176             =back
177              
178             Returns a L<Future> which will resolve to the requested bytes or characters.
179              
180             =cut
181              
182             sub read_until {
183 1     1 1 445 my ($self, $match) = @_;
184 1 50       22 $match = qr/\Q$match/ unless ref($match) eq 'Regexp';
185 1         6 my $f = $self->new_future;
186 1         10 push @{$self->{ops}}, $self->$curry::weak(sub {
187 5     5   23 my ($self) = @_;
188 5 50       44 return $f if $f->is_ready;
189 5 100       35 return $f unless length($self->{data});
190 4 100       28 return $f unless $self->{data} =~ /$match/g;
191 1         6 my $data = substr($self->{data}, 0, pos($self->{data}), '');
192 1         4 $f->done($data);
193 1         37 $self->on_change;
194 1         2 return $f;
195 1         17 });
196 1         17 $self->process_pending;
197 1         11 $f;
198             }
199              
200             my $pack_characters = q{aAZbBhHcCWsSlLqQiInNvVjJfdFpPUwx};
201             my %character_sizes = map {
202             $_ => length(pack("x[$_]", ""))
203             } split //, $pack_characters;
204              
205             =head2 read_packed
206              
207             Uses L<pack|perlfunc/pack> template notation to define a pattern to extract.
208             Will attempt to accumulate enough bytes to fulfill the request,
209             then unpack and extract from the buffer.
210              
211             This method only supports a B<very limited> subset of the
212             full L<pack|perlfunc/pack> functionality - currently, this includes
213             sequences such as C<A4> or C<N1n1>, but does B<not> handle
214             multi-stage templates such as C<N/a*>.
215              
216             These would need to parse the initial C<N1> bytes to
217             determine the full extent of the data to be processed, and
218             the logic for handling this is not yet implemented.
219              
220             Takes the following parameters:
221              
222             =over 4
223              
224             =item * C<$format> - a L<pack|perlfunc/pack>-style format string
225              
226             =back
227              
228             Returns a L<Future> which will resolve to the requested items,
229             of which there can be more than one depending on the format string.
230              
231             =cut
232              
233             sub read_packed {
234 3     3 1 1702 my ($self, $format) = @_;
235 3         9 my $f = $self->new_future;
236 3         50 my @handler;
237 3         4 my $simple_format = $format;
238              
239             # Might as well avoid too much complexity
240             # in the parser
241 3         7 $simple_format =~ s{\[([0-9]+)\]}{$1}g;
242 3         14 $simple_format =~ s{\s+}{}g;
243             PARSER:
244 3         5 while(1) {
245 3         5 for($simple_format) {
246 3 50       69 if(my ($char, $count) = /\G([$pack_characters])[!><]?([0-9]*)/gc) {
247 3         12 $count *= $character_sizes{$char};
248 3         31 push @handler, {
249             regex => qr/(.{$count})/,
250             }
251             }
252 3 50       17 last PARSER unless pos($_) < length($_);
253             }
254             }
255 3         10 my $re = join '', map { $_->{regex} } @handler;
  3         11  
256 3         20 push @{$self->{ops}}, $self->$curry::weak(sub {
257 9     9   45 my ($self) = @_;
258 9 50       25 return $f if $f->is_ready;
259 9 100       57 return $f unless length($self->{data});
260              
261 6 100       73 return $f unless $self->{data} =~ m{^$re};
262 3         21 my @items = unpack $format, $self->{data};
263 3         30 $self->{data} =~ s{^$re}{};
264 3         13 $f->done(@items);
265 3         105 $self->on_change;
266 3         6 return $f;
267 3         5 });
268 3         43 $self->process_pending;
269 3         33 $f;
270             }
271              
272             =head2 write
273              
274             Add more data to the buffer.
275              
276             Call this with a single scalar, and the results will be appended
277             to the internal buffer, triggering any callbacks for read activity
278             as required.
279              
280             =cut
281              
282             sub write {
283 19     19 1 4573 my ($self, $data) = @_;
284 19         40 $self->{data} .= $data;
285 19 100       25 $self->process_pending if @{$self->{ops}};
  19         90  
286 19         65 return $self;
287             }
288              
289             =head2 size
290              
291             Returns the current buffer size.
292              
293             =cut
294              
295 17     17 1 610 sub size { length(shift->{data}) }
296              
297             =head2 is_empty
298              
299             Returns true if the buffer is currently empty (size = 0), false otherwise.
300              
301             =cut
302              
303 5     5 1 1674 sub is_empty { !length(shift->{data}) }
304              
305             =head1 METHODS - Internal
306              
307             These are documented for convenience, but generally not recommended
308             to call any of these directly.
309              
310             =head2 data
311              
312             Accessor for the internal buffer. Not recommended to use this,
313             but if you break it you get to keep all the pieces.
314              
315             =cut
316              
317 2     2 1 12 sub data { shift->{data} }
318              
319             =head2 process_pending
320              
321             Used internally to trigger callbacks once L</write> has been called.
322              
323             =cut
324              
325             sub process_pending {
326 23     23 1 43 my ($self) = @_;
327 23         26 while(1) {
328 34 100       48 my ($op) = @{$self->{ops}} or return;
  34         95  
329 23         53 my $f = $op->();
330 23 100       49 return unless $f->is_ready;
331 11         49 shift @{$self->{ops}};
  11         87  
332             }
333             }
334              
335             sub on_change {
336 11     11 0 20 my ($self) = @_;
337 11 100       42 $self->{on_change}->($self) if $self->{on_change};
338 11         18 return;
339             }
340              
341             =head2 new_future
342              
343             Instantiates a new L<Future>, used to ensure we get something awaitable.
344              
345             Can be overridden using C<$Ryu::FUTURE_FACTORY>.
346              
347             =cut
348              
349             sub new_future {
350 11     11 1 17 my $self = shift;
351 11         528 require Ryu;
352             (
353 11   66     60 $self->{new_future} //= $Ryu::FUTURE_FACTORY
354             )->($self, @_)
355             }
356              
357             1;
358              
359             __END__