File Coverage

blib/lib/MojoX/NetstringStream.pm
Criterion Covered Total %
statement 48 55 87.2
branch 9 16 56.2
condition 1 2 50.0
subroutine 8 11 72.7
pod 3 3 100.0
total 69 87 79.3


line stmt bran cond sub pod time code
1             package MojoX::NetstringStream;
2              
3 4     4   493080 use Mojo::Base 'Mojo::EventEmitter';
  4         5  
  4         20  
4              
5 4     4   1589 use Carp;
  4         6  
  4         169  
6 4     4   506 use Encode;
  4         6316  
  4         1851  
7              
8             our $VERSION = '0.04';
9              
10             has [qw(_buf debug stream _want)];
11              
12             sub new {
13 3     3 1 809 my ($class, %args) = @_;
14 3         9 my $stream = $args{stream};
15 3 50       10 croak 'no stream?' unless $stream;
16 3         22 my $self = $class->SUPER::new();
17 3         15 my $buf = '';
18 3         6 my $want = 0;
19 3         11 $self->{buf} = \$buf; # buffer for incomple chunks
20 3         6 $self->{want} = \$want; # if set: number of bytes expected
21 3         7 $self->{stream} = $stream;
22 3   50     18 $self->{debug} = $args{debug} // 0;
23 3         11 $stream->timeout(0);
24 3     3   81 $stream->on(read => sub{ $self->_on_read(@_); });
  3         2715  
25 3     0   31 $stream->on(close => sub{ $self->_on_close(@_); });
  0         0  
26 3         16 return $self;
27             }
28              
29             sub _on_read {
30 3     3   7 my ($self, $stream, $bytes) = @_;
31 3         7 my $buf = $self->{buf};
32 3         5 my $want = $self->{want};
33              
34 3         9 $$buf .= $bytes;
35 3 50       14 say "on_read: bytes: $bytes buf now: $$buf" if $self->debug;
36            
37 3         33 while (1) { # fixme: does this always end?
38 5 50       107 if (!$$want) {
39 5 100       31 return if $$buf !~ /^(\d*):/;
40             # fixme: we don't detect a framing error this way
41             # but just hang when that happens
42 3         11 $$want = $1;
43 3         16 substr($$buf, 0, length($1)+1, ''); # 123:
44 3         9 $$want++; # inlclude trailing ,
45             #say "on_read: want: $$want buf now: $$buf";
46             }
47              
48 3 50       13 return if $$want > length($$buf);
49              
50 3         12 my $chunk = substr($$buf, 0, $$want, '');
51 3 100       246 croak 'no trailing , in chunk' if chop $chunk ne ',';
52 2         4 $$want = 0;
53             #say "on_read: chunk: $chunk buf now: $$buf";
54              
55 2         23 $self->emit(chunk => $chunk);
56             }
57             }
58              
59             sub _on_close {
60 0     0   0 my ($self, $stream) = @_;
61 0         0 $self->emit(close => $stream);
62 0 0       0 say 'got close!' if $self->debug;
63 0         0 delete $self->{stream};
64             }
65              
66             sub close {
67 0     0 1 0 my ($self) = @_;
68 0         0 $self->stream->close;
69             }
70              
71             sub write {
72 4     4   20 use bytes;
  4         4  
  4         23  
73 2     2 1 783 my ($self, $chunk) = @_;
74 2         5 my $len = length($chunk);
75 2         12 my $out = sprintf('%u:%s,', $len, $chunk);
76 2 50       7 say "write: $out" if $self->debug;
77 2         17 $self->stream->write($out);
78             }
79              
80             1;
81              
82              
83             =encoding utf8
84              
85             =head1 NAME
86              
87             MojoX::NetstringStream - Turn a (tcp) stream into a NetstringStream
88              
89             =head1 SYNOPSIS
90              
91             use MojoX::NetstringStream;
92              
93             my $clientid = Mojo::IOLoop->client({
94             port => $port,
95             } => sub {
96             my ($loop, $err, $stream) = @_;
97             my $ns = MojoX::NetstringStream->new(stream => $stream);
98             $ns->on(chunk => sub {
99             my ($ns, $chunk) = @_;
100             say 'got chunk: ', $chunk;
101             ...
102             });
103             $ns->on(close => sub {
104             say 'got close';
105             ...
106             });
107             });
108              
109             =head1 DESCRIPTION
110              
111             L is a wrapper around L that
112             adds framing using the netstring encoding.
113              
114             =head1 EVENTS
115              
116             L inherits all events from L and can
117             emit the following new ones.
118              
119             =head2 chunk
120              
121             $ns->on(chunk => sub {
122             my ($ns, $chunk) = @_;
123             ...
124             });
125              
126             Emitted for every (full) netstring received on the underlying stream.
127              
128             =head2 close
129              
130             $ns->on(close => sub {
131             my $ns = shift;
132             ...
133             });
134              
135             Emitted if the underlying stream gets closed.
136              
137             =head1 ATTRIBUTES
138              
139             L implements the following attributes.
140              
141             =head2 stream
142              
143             my $stream = $ns->stream;
144              
145             The underlying L-like stream
146              
147             =head2 debug
148              
149             $ls->debug = 1;
150              
151             Enables or disables debugging output.
152              
153             =head1 METHODS
154              
155             L inherits all methods from
156             L and implements the following new ones.
157              
158             =head2 new
159              
160             my $ns = MojoX::NetstringStream->new(
161             stream => $stream,
162             debug => $debug,
163             );
164              
165             Construct a new L object. The stream argument must
166             behave like a L object. The debug argument is
167             optional and just sets the debug attribute.
168              
169             =head2 write
170              
171             $ns->write($chunk);
172              
173             Writes chunk to the underlying stream as a netstring.
174              
175             =head1 SEE ALSO
176              
177             =over
178              
179             =item *
180              
181             L, L, L: the L Web framework
182              
183             =item *
184              
185             L: netstrings specification.
186              
187             =back
188              
189             =head1 ACKNOWLEDGEMENT
190              
191             This software has been developed with support from L.
192             In German: Diese Software wurde mit Unterstützung von L entwickelt.
193              
194             =head1 AUTHORS
195              
196             =over 4
197              
198             =item *
199              
200             Wieger Opmeer
201              
202             =back
203              
204             =head1 COPYRIGHT AND LICENSE
205              
206             This software is copyright (c) 2016 by Wieger Opmeer.
207              
208             This is free software; you can redistribute it and/or modify it under
209             the same terms as the Perl 5 programming language system itself.
210              
211             =cut
212