File Coverage

blib/lib/Kamaitachi/IOStream.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             package Kamaitachi::IOStream;
2 1     1   524 use Moose;
  0            
  0            
3              
4             use constant ENDIAN => unpack('S', pack('C2', 0, 1)) == 1 ? 'BIG' : 'LITTLE';
5              
6             extends 'Data::AMF::IO';
7              
8             use Carp;
9             use Kamaitachi::Packet;
10              
11             has socket => (
12             is => 'rw',
13             isa => 'Object',
14             weak_ref => 1,
15             required => 1,
16             );
17              
18             has buffer => (
19             is => 'rw',
20             isa => 'Str',
21             default => sub { q[] },
22             trigger => sub {
23             my $self = shift;
24             $self->buffer_length( bytes::length($self->buffer) );
25             },
26             );
27              
28             has buffer_length => (
29             is => 'rw',
30             isa => 'Int',
31             default => sub { 0 },
32             );
33              
34             has cursor => (
35             is => 'rw',
36             isa => 'Int',
37             default => sub { 0 },
38             );
39              
40             has chunk_size => (
41             is => 'rw',
42             isa => 'Int',
43             default => sub { 128 },
44             );
45              
46             has packets => (
47             is => 'rw',
48             isa => 'ArrayRef',
49             lazy => 1,
50             default => sub { [] },
51             );
52              
53             no Moose;
54              
55             =head1 NAME
56              
57             Kamaitachi::IOStream - RTMP stream reader/writer
58              
59             =head1 DESCRIPTION
60              
61             See L<Kamaitachi>.
62              
63             =head1 METHODS
64              
65             =head2 new
66              
67             =head2 push
68              
69             =cut
70              
71             sub push {
72             my ($self, $data) = @_;
73              
74             $self->buffer( $self->buffer . $data );
75             }
76              
77             =head2 reset
78              
79             =cut
80              
81             sub reset {
82             my $self = shift;
83              
84             $self->cursor(0);
85             return;
86             }
87              
88             =head2 spin
89              
90             =cut
91              
92             sub spin {
93             my $self = shift;
94              
95             my $read = substr $self->buffer, 0, $self->cursor;
96             $self->{buffer} = substr $self->buffer, $self->cursor;
97             $self->buffer_length( bytes::length($self->buffer) );
98             $self->cursor(0);
99              
100             $read;
101             }
102              
103             =head2 clear
104              
105             =cut
106              
107             sub clear {
108             my $self = shift;
109             $self->buffer(q[]);
110             $self->buffer_length(0);
111             $self->cursor(0);
112             }
113              
114             =head2 read
115              
116             =cut
117              
118             sub read {
119             my ($self, $len) = @_;
120              
121             return if $self->buffer_length < ($self->cursor + $len);
122              
123             my $data = substr $self->buffer, $self->cursor, $len;
124             $self->{cursor} += $len;
125              
126             \$data;
127             }
128              
129             =head2 read_u8
130              
131             =cut
132              
133             sub read_u8 {
134             my $self = shift;
135              
136             my $bref = $self->read(1) or return;
137             \unpack('C', $$bref);
138             }
139              
140             =head2 read_u16
141              
142             =cut
143              
144             sub read_u16 {
145             my $self = shift;
146              
147             my $data = $self->read(2) or return;
148             \unpack('n', $$data);
149             }
150              
151             =head2 read_s16
152              
153             =cut
154              
155             sub read_s16 {
156             my $self = shift;
157              
158             my $data = $self->read(2) or return;
159              
160             return \unpack('s>', $$data) if $] >= 5.009002;
161             return \unpack('s', $$data) if ENDIAN eq 'BIG';
162             return \unpack('s', swap($$data));
163             }
164              
165             =head2 read_u24
166              
167             =cut
168              
169             sub read_u24 {
170             my $self = shift;
171              
172             my $data = $self->read(3) or return;
173             \unpack('N', "\0".$$data);
174             }
175              
176             =head2 read_u32
177              
178             =cut
179              
180             sub read_u32 {
181             my $self = shift;
182              
183             my $data = $self->read(4) or return;
184             \unpack('N', $$data);
185             }
186              
187             =head2 read_double
188              
189             =cut
190              
191             sub read_double {
192             my $self = shift;
193              
194             my $data = $self->read(8) or return;
195              
196             return \unpack('d>', $$data) if $] >= 5.009002;
197             return \unpack('d', $$data) if ENDIAN eq 'BIG';
198             return \unpack('d', swap($$data));
199             }
200              
201             =head2 read_utf8
202              
203             =cut
204              
205             sub read_utf8 {
206             my $self = shift;
207              
208             my $len = $self->read_u16 or return;
209             my $bref = $self->read($len) or return;
210             }
211              
212             =head2 read_utf8_long
213              
214             =cut
215              
216             sub read_utf8_long {
217             my $self = shift;
218              
219             my $len = $self->read_u32 or return;
220             my $bref = $self->read($len) or return;
221             }
222              
223             =head2 write
224              
225             =cut
226              
227             sub write {
228             my ($self, $data) = @_;
229              
230             if (ref $data) {
231             confess qq{Can't write this object: "@{[ ref $data ]}"} unless $data->can('serialize');
232             $data = $data->serialize($self->chunk_size);
233             }
234              
235             $self->socket->write($data) if $self->socket;
236             }
237              
238             =head2 close
239              
240             =cut
241              
242             sub close {
243             my $self = shift;
244             $self->socket->close if $self->socket;
245             }
246              
247             =head2 get_packet
248              
249             =cut
250              
251             sub get_packet {
252             my ($self) = @_;
253             my $bref;
254              
255             my $chunk_size = $self->chunk_size;
256             my $packet_list = $self->packets;
257              
258             $bref = $self->read_u8 or return $self->reset;
259             my $first = $$bref;
260              
261             my $header_size = $first >> 6;
262             my $amf_number = $first & 0x3f;
263              
264             if ($amf_number == 0) {
265             $bref = $self->read_u8 or return $self->reset;
266             $amf_number = $$bref;
267             }
268             elsif ($amf_number == 1) {
269             $bref = $self->read_u16 or return $self->reset;
270             $amf_number = $$bref;
271             }
272              
273             my $packet = $packet_list->[ $amf_number ] || Kamaitachi::Packet->new( socket => $self->socket, number => $amf_number );
274              
275             if ($header_size <= 2) {
276             if ($header_size == 2 and !$packet->size) { # XXX
277             warn 'skip packet';
278             $self->clear;
279             return;
280             }
281              
282             $bref = $self->read_u24 or return $self->reset;
283             $packet->timer( $$bref );
284             $packet->partial(1);
285             }
286             if ($header_size <= 1) {
287             $bref = $self->read_u24 or return $self->reset;
288             if ($$bref >= 100000) { # XXX: might be invalid packet...
289             warn 'skip packet, invalid size:' . $$bref;
290             $self->clear;
291             return;
292             }
293             $packet->size( $$bref );
294              
295             $bref = $self->read_u8 or return $self->reset;
296             $packet->type( $$bref );
297              
298             $packet->data(q[]);
299             $packet->raw(q[]);
300             $packet->partial(0);
301             }
302             if ($header_size <= 0) {
303             $bref = $self->read_u32 or return $self->reset;
304             $packet->obj( $$bref );
305             }
306              
307             my $data = q[];
308             my $size = $packet->size;
309              
310             if ($packet->data and bytes::length($packet->data) < $size) {
311             $data = $packet->data;
312             $size -= bytes::length($packet->data);
313             }
314              
315             if ($size > 0) {
316             my $want = $size <= $chunk_size ? $size : $chunk_size;
317              
318             $bref = $self->read($want) or return $self->reset;
319             $packet->partial_data( $$bref );
320             $data .= $packet->partial_data;
321             }
322              
323             $packet->data($data);
324             $packet->{raw} = $self->spin;
325              
326             $packet_list->[ $amf_number ] = $packet;
327              
328             $packet;
329             }
330              
331             =head1 AUTHOR
332              
333             Daisuke Murase <typester@cpan.org>
334              
335             Hideo Kimura <hide@cpan.org>
336              
337             =head1 COPYRIGHT
338              
339             This program is free software; you can redistribute
340             it and/or modify it under the same terms as Perl itself.
341              
342             The full text of the license can be found in the
343             LICENSE file included with this module.
344              
345             =cut
346              
347             __PACKAGE__->meta->make_immutable;