File Coverage

blib/lib/Ryu/Buffer.pm
Criterion Covered Total %
statement 110 123 89.4
branch 26 38 68.4
condition 4 7 57.1
subroutine 21 23 91.3
pod 12 13 92.3
total 173 204 84.8


line stmt bran cond sub pod time code
1             package Ryu::Buffer;
2              
3 37     37   68322 use strict;
  37         75  
  37         1200  
4 37     37   160 use warnings;
  37         55  
  37         2083  
5              
6             our $VERSION = '3.001'; # VERSION
7             our $AUTHORITY = 'cpan:TEAM'; # AUTHORITY
8              
9 37     37   653 use parent qw(Ryu::Node);
  37         386  
  37         192  
10              
11             =encoding utf8
12              
13             =head1 NAME
14              
15             Ryu::Buffer - accumulate data
16              
17             =head1 DESCRIPTION
18              
19             Provides a simple way to push bytes or characters into a buffer,
20             and get them back out again.
21              
22             Typically of use for delimiter-based or fixed-size protocols.
23              
24             See also L<Future::Buffer>, if you're dealing exclusively with L<Future>
25             instances and don't need the L<Ryu::Source> functionality then that's
26             likely to be a better option.
27              
28             =cut
29              
30 37     37   2794 use curry;
  37         337  
  37         779  
31 37     37   186 use List::Util qw(min max);
  37         71  
  37         54043  
32              
33             =head1 METHODS
34              
35             =cut
36              
37             =head2 new
38              
39             Instantiates a new, empty L<Ryu::Buffer>.
40              
41             =cut
42              
43             sub new {
44 5     5 1 8791 my ($class, %args) = @_;
45 5   50     59 $args{data} //= '';
46 5   50     26 $args{ops} //= [];
47 5         36 my $self = $class->next::method(%args);
48 5         16 return $self;
49             }
50              
51             =head1 METHODS - Reading data
52              
53             These methods provide ways of accessing the buffer either
54             destructively (C<read>) or non-destructively (C<peek>).
55              
56             =cut
57              
58             =head2 read_exactly
59              
60             Reads exactly the given number of bytes or characters.
61              
62             Takes the following parameters:
63              
64             =over 4
65              
66             =item * C<$size> - number of characters or bytes to return
67              
68             =back
69              
70             Returns a L<Future> which will resolve to a scalar containing the requested data.
71              
72             =cut
73              
74             sub read_exactly {
75 5     5 1 1784 my ($self, $size) = @_;
76 5         16 my $f = $self->new_future;
77 5         52 push @{$self->{ops}}, $self->$curry::weak(sub {
78 7     7   33 my ($self) = @_;
79 7 50       20 return $f if $f->is_ready;
80 7 100       44 return $f unless $size <= length($self->{data});
81 5         13 my $data = substr($self->{data}, 0, $size, '');
82 5         19 $f->done($data);
83 5         234 $self->on_change;
84 5         6 return $f;
85 5         96 });
86 5         85 $self->process_pending;
87 5         26 $f;
88             }
89              
90             =head2 read_atmost
91              
92             Reads up to the given number of bytes or characters - if
93             we have at least one byte or character in the buffer, we'll
94             return that even if it's shorter than the requested C<$size>.
95             This method is guaranteed not to return B<more> than the
96             C<$size>.
97              
98             Takes the following parameters:
99              
100             =over 4
101              
102             =item * C<$size> - maximum number of characters or bytes to return
103              
104             =back
105              
106             Returns a L<Future> which will resolve to a scalar containing the requested data.
107              
108             =cut
109              
110             sub read_atmost {
111 0     0 1 0 my ($self, $size) = @_;
112 0         0 my $f = $self->new_future;
113 0         0 push @{$self->{ops}}, $self->$curry::weak(sub {
114 0     0   0 my ($self) = @_;
115 0 0       0 return $f if $f->is_ready;
116 0 0       0 return $f unless length($self->{data});
117 0         0 my $data = substr($self->{data}, 0, min($size, length($self->{data})), '');
118 0         0 $f->done($data);
119 0         0 $self->on_change;
120 0         0 return $f;
121 0         0 });
122 0         0 $self->process_pending;
123 0         0 $f;
124             }
125              
126             =head2 read_atleast
127              
128             Reads at least the given number of bytes or characters - if
129             we have a buffer that's the given size or larger, we'll
130             return everything available, even if it's larger than the
131             requested C<$size>.
132              
133             Takes the following parameters:
134              
135             =over 4
136              
137             =item * C<$size> - minimum number of characters or bytes to return
138              
139             =back
140              
141             Returns a L<Future> which will resolve to a scalar containing the requested data.
142              
143             =cut
144              
145             sub read_atleast {
146 2     2 1 827 my ($self, $size) = @_;
147 2         8 my $f = $self->new_future;
148 2         19 push @{$self->{ops}}, $self->$curry::weak(sub {
149 2     2   10 my ($self) = @_;
150 2 50       6 return $f if $f->is_ready;
151 2 50       15 return $f unless length($self->{data}) >= $size;
152 2         13 my $data = substr($self->{data}, 0, max($size, length($self->{data})), '');
153 2         7 $f->done($data);
154 2         60 $self->on_change;
155 2         3 return $f;
156 2         32 });
157 2         30 $self->process_pending;
158 2         7 $f;
159             }
160              
161             =head2 read_until
162              
163             Reads up to the given string or regex match.
164              
165             Pass a C<< qr// >> instance if you want to use a regular expression to match,
166             or a plain string if you want exact-string matching behaviour.
167              
168             The data returned will B<include> the match.
169              
170             Takes the following parameters:
171              
172             =over 4
173              
174             =item * C<$match> - the string or regex to match against
175              
176             =back
177              
178             Returns a L<Future> which will resolve to the requested bytes or characters.
179              
180             =cut
181              
182             sub read_until {
183 1     1 1 374 my ($self, $match) = @_;
184 1 50       30 $match = qr/\Q$match/ unless ref($match) eq 'Regexp';
185 1         5 my $f = $self->new_future;
186 1         10 push @{$self->{ops}}, $self->$curry::weak(sub {
187 5     5   20 my ($self) = @_;
188 5 50       14 return $f if $f->is_ready;
189 5 100       28 return $f unless length($self->{data});
190 4 100       24 return $f unless $self->{data} =~ /$match/g;
191 1         6 my $data = substr($self->{data}, 0, pos($self->{data}), '');
192 1         6 $f->done($data);
193 1         42 $self->on_change;
194 1         2 return $f;
195 1         19 });
196 1         17 $self->process_pending;
197 1         7 $f;
198             }
199              
200             my $pack_characters = q{aAZbBhHcCWsSlLqQiInNvVjJfdFpPUwx};
201             my %character_sizes = map {
202             $_ => length(pack("x[$_]", ""))
203             } split //, $pack_characters;
204              
205             =head2 read_packed
206              
207             Uses L<perlfunc/pack> template notation to define a pattern to extract.
208             Will attempt to accumulate enough bytes to fulfill the request,
209             then unpack and extract from the buffer.
210              
211             This method only supports a B<very limited> subset of the
212             full L<perlfunc/pack> functionality - currently, this includes
213             sequences such as C<A4> or C<N1n1>, but does B<not> handle
214             multi-stage templates such as C<N/a*>.
215              
216             These would need to parse the initial C<N> bytes to
217             determine the full extent of the data to be processed, and
218             the logic for handling this is not yet implemented.
219              
220             Takes the following parameters:
221              
222             =over 4
223              
224             =item * C<$format> - a L<perlfunc/pack>-style format string
225              
226             =back
227              
228             Returns a L<Future> which will resolve to the requested items,
229             of which there can be more than one depending on the format string.
230              
231             =cut
232              
233             sub read_packed {
234 3     3 1 4015 my ($self, $format) = @_;
235 3         12 my $f = $self->new_future;
236 3         55 my @handler;
237 3         6 my $simple_format = $format;
238              
239             # Might as well avoid too much complexity
240             # in the parser
241 3         10 $simple_format =~ s{\[([0-9]+)\]}{$1}g;
242 3         8 $simple_format =~ s{\s+}{}g;
243             PARSER:
244 3         5 while(1) {
245 3         7 for($simple_format) {
246 3 50       91 if(my ($char, $count) = /\G([$pack_characters])[!><]?([0-9]*)/gc) {
247 3         13 $count *= $character_sizes{$char};
248 3         45 push @handler, {
249             regex => qr/(.{$count})/,
250             }
251             }
252 3 50       20 last PARSER unless pos($_) < length($_);
253             }
254             }
255 3         9 my $re = join '', map { $_->{regex} } @handler;
  3         12  
256 3         28 push @{$self->{ops}}, $self->$curry::weak(sub {
257 9     9   47 my ($self) = @_;
258 9 50       30 return $f if $f->is_ready;
259 9 100       51 return $f unless length($self->{data});
260              
261 6 100       78 return $f unless $self->{data} =~ m{^$re};
262 3         21 my @items = unpack $format, $self->{data};
263 3         30 $self->{data} =~ s{^$re}{};
264 3         16 $f->done(@items);
265 3         120 $self->on_change;
266 3         6 return $f;
267 3         7 });
268 3         57 $self->process_pending;
269 3         30 $f;
270             }
271              
272             =head2 write
273              
274             Add more data to the buffer.
275              
276             Call this with a single scalar, and the results will be appended
277             to the internal buffer, triggering any callbacks for read activity
278             as required.
279              
280             =cut
281              
282             sub write {
283 19     19 1 3953 my ($self, $data) = @_;
284 19         38 $self->{data} .= $data;
285 19 100       24 $self->process_pending if @{$self->{ops}};
  19         62  
286 19         61 return $self;
287             }
288              
289             =head2 size
290              
291             Returns the current buffer size.
292              
293             =cut
294              
295 17     17 1 542 sub size { length(shift->{data}) }
296              
297             =head2 is_empty
298              
299             Returns true if the buffer is currently empty (size = 0), false otherwise.
300              
301             =cut
302              
303 5     5 1 1561 sub is_empty { !length(shift->{data}) }
304              
305             =head1 METHODS - Internal
306              
307             These are documented for convenience, but generally not recommended
308             to call any of these directly.
309              
310             =head2 data
311              
312             Accessor for the internal buffer. Not recommended to use this,
313             but if you break it you get to keep all the pieces.
314              
315             =cut
316              
317 2     2 1 11 sub data { shift->{data} }
318              
319             =head2 process_pending
320              
321             Used internally to trigger callbacks once L</write> has been called.
322              
323             =cut
324              
325             sub process_pending {
326 23     23 1 38 my ($self) = @_;
327 23         32 while(1) {
328 34 100       42 my ($op) = @{$self->{ops}} or return;
  34         89  
329 23         45 my $f = $op->();
330 23 100       47 return unless $f->is_ready;
331 11         44 shift @{$self->{ops}};
  11         75  
332             }
333             }
334              
335             sub on_change {
336 11     11 0 23 my ($self) = @_;
337 11 100       33 $self->{on_change}->($self) if $self->{on_change};
338 11         16 return;
339             }
340              
341             =head2 new_future
342              
343             Instantiates a new L<Future>, used to ensure we get something awaitable.
344              
345             Can be overridden using C<$Ryu::FUTURE_FACTORY>.
346              
347             =cut
348              
349             sub new_future {
350 11     11 1 20 my $self = shift;
351 11         695 require Ryu;
352             (
353 11   66     352 $self->{new_future} //= $Ryu::FUTURE_FACTORY
354             )->($self, @_)
355             }
356              
357             1;
358              
359             __END__