File Coverage

blib/lib/MojoX/NetstringStream.pm
Criterion Covered Total %
statement 55 63 87.3
branch 12 20 60.0
condition 3 5 60.0
subroutine 7 10 70.0
pod 3 3 100.0
total 80 101 79.2


line stmt bran cond sub pod time code
1             package MojoX::NetstringStream;
2              
3 4     4   768122 use Mojo::Base 'Mojo::EventEmitter';
  4         32  
  4         22  
4              
5 4     4   1516 use Carp;
  4         8  
  4         2161  
6              
7             our $VERSION = '0.05';
8              
9             has [qw(buf debug stream want)];
10              
11             sub new {
12 4     4 1 1309 my ($class, %args) = @_;
13 4         9 my $stream = $args{stream};
14 4 50       14 croak 'no stream?' unless $stream;
15 4         21 my $self = $class->SUPER::new();
16 4         25 my $buf = '';
17 4         7 my $want = 0;
18 4         16 $self->{buf} = \$buf; # buffer for incomple chunks
19 4         10 $self->{want} = \$want; # if set: number of bytes expected
20 4         6 $self->{stream} = $stream;
21 4   50     23 $self->{debug} = $args{debug} // 0;
22 4         10 $self->{maxsize} = $args{maxsize};
23 4         15 $stream->timeout(0);
24 4     4   111 $stream->on(read => sub{ $self->_on_read(@_); });
  4         2513  
25 4     0   49 $stream->on(close => sub{ $self->_on_close(@_); });
  0         0  
26 4         25 return $self;
27             }
28              
29             sub _on_read {
30 4     4   45 my ($self, $stream, $bytes) = @_;
31 4         11 my $buf = $self->{buf};
32 4         9 my $want = $self->{want};
33 4         8 my $maxsize = $self->{maxsize};
34              
35 4         11 $$buf .= $bytes;
36 4 50       17 say "on_read: bytes: $bytes buf now: $$buf" if $self->{debug};
37            
38 4         10 while (1) { # fixme: does this always end?
39 6 50       106 if (!$$want) {
40 6 100       19 return unless $$buf;
41             #return if $$buf !~ /^(\d*):/;
42 4 50       35 return unless (my $i = index($$buf, ':')) > 0;
43             # fixme: we don't detect a framing error this way
44             # but just hang when that happens
45             #$$want = $1;
46 4         13 $$want = substr($$buf, 0, $i);
47 4 100 66     22 if ($maxsize and $$want > $maxsize) {
48 1         7 $self->emit(nserr => "netstring too big: $$want > $maxsize");
49 1         586 return;
50             }
51             #substr($$buf, 0, length($1)+1, ''); # 123:
52 3         12 substr($$buf, 0, $i+1, '');
53 3         7 $$want++; # include trailing ,
54 3         24 say "on_read: want: $$want buf now: $$buf";
55             }
56              
57 3 50       17 return if $$want > length($$buf);
58              
59 3         16 my $chunk = substr($$buf, 0, $$want, '');
60 3 100       15 if (chop $chunk ne ',') {
61 1         14 $self->emit(nserr => 'no trailing , in chunk');
62 1         687 return;
63             }
64 2         5 $$want = 0;
65 2         9 say "on_read: chunk: $chunk buf now: $$buf";
66              
67 2         15 $self->emit(chunk => $chunk);
68             }
69             }
70              
71             sub _on_close {
72 0     0   0 my ($self, $stream) = @_;
73 0         0 $self->emit(close => $stream);
74 0 0       0 say 'got close!' if $self->{debug};
75 0         0 delete $self->{stream};
76             }
77              
78             sub close {
79 0     0 1 0 my ($self) = @_;
80 0         0 $self->stream->close;
81 0         0 %$self = ();
82             }
83              
84             sub write {
85 4     4   29 use bytes;
  4         9  
  4         30  
86 2     2 1 788 my ($self, $chunk) = @_;
87 2         4 my $len = length($chunk);
88 2         12 my $out = sprintf('%u:%s,', $len, $chunk);
89 2 50       8 say "write: $out" if $self->{debug};
90 2         9 $self->{stream}->write($out);
91             }
92              
93             #sub DESTROY {
94             # my $self = shift;
95             # say 'destroying ', $self;
96             #}
97              
98             1;
99              
100              
101             =encoding utf8
102              
103             =head1 NAME
104              
105             MojoX::NetstringStream - Turn a (tcp) stream into a NetstringStream
106              
107             =head1 SYNOPSIS
108              
109             use MojoX::NetstringStream;
110              
111             my $clientid = Mojo::IOLoop->client({
112             port => $port,
113             } => sub {
114             my ($loop, $err, $stream) = @_;
115             my $ns = MojoX::NetstringStream->new(stream => $stream);
116             $ns->on(chunk => sub {
117             my ($ns, $chunk) = @_;
118             say 'got chunk: ', $chunk;
119             ...
120             });
121             $ns->on(close => sub {
122             say 'got close';
123             ...
124             });
125             });
126              
127             =head1 DESCRIPTION
128              
129             L is a wrapper around L that
130             adds framing using the netstring encoding.
131              
132             =head1 ATTRIBUTES
133              
134             =head2 stream
135              
136             The underlying Mojo::IOLoop stream to use for reading and writing
137              
138             =head2 debug
139              
140             Enables debugging
141              
142             =head2 maxsize
143              
144             Maximum size of the accepted netstring frames, if set. A nserr event is
145             raised when a oversized frame is received.
146              
147             Default: none
148              
149             =head1 EVENTS
150              
151             L inherits all events from L and can
152             emit the following new ones.
153              
154             =head2 chunk
155              
156             $ns->on(chunk => sub {
157             my ($ns, $chunk) = @_;
158             ...
159             });
160              
161             Emitted for every (full) netstring received on the underlying stream.
162              
163             =head2 close
164              
165             $ns->on(close => sub {
166             my $ns = shift;
167             ...
168             });
169              
170             Emitted if the underlying stream gets closed.
171              
172             =head2 nserr
173              
174             $ns->on(nserr => sub {
175             my ($ns, $err) = @_;
176             ...
177             });
178              
179             Emitted if there was some kind of framing error, currenty either a missing
180             ',' at the end or a oversized frame.
181              
182             =head1 ATTRIBUTES
183              
184             L implements the following attributes.
185              
186             =head2 stream
187              
188             my $stream = $ns->stream;
189              
190             The underlying L-like stream
191              
192             =head2 debug
193              
194             $ls->debug = 1;
195              
196             Enables or disables debugging output.
197              
198             =head1 METHODS
199              
200             L inherits all methods from
201             L and implements the following new ones.
202              
203             =head2 new
204              
205             my $ns = MojoX::NetstringStream->new(
206             stream => $stream,
207             debug => $debug,
208             );
209              
210             Construct a new L object. The stream argument must
211             behave like a L object. The debug argument is
212             optional and just sets the debug attribute.
213              
214             =head2 write
215              
216             $ns->write($chunk);
217              
218             Writes chunk to the underlying stream as a netstring.
219              
220             =head1 SEE ALSO
221              
222             =over
223              
224             =item *
225              
226             L, L, L: the L Web framework
227              
228             =item *
229              
230             L: netstrings specification.
231              
232             =back
233              
234             =head1 ACKNOWLEDGEMENT
235              
236             This software has been developed with support from L.
237             In German: Diese Software wurde mit Unterstützung von L entwickelt.
238              
239             =head1 AUTHORS
240              
241             =over 4
242              
243             =item *
244              
245             Wieger Opmeer
246              
247             =back
248              
249             =head1 COPYRIGHT AND LICENSE
250              
251             This software is copyright (c) 2017 by Wieger Opmeer.
252              
253             This is free software; you can redistribute it and/or modify it under
254             the same terms as the Perl 5 programming language system itself.
255              
256             =cut
257