File Coverage

blib/lib/Protocol/SPDY/Base.pm
Criterion Covered Total %
statement 82 145 56.5
branch 13 36 36.1
condition 5 11 45.4
subroutine 23 46 50.0
pod 38 38 100.0
total 161 276 58.3


line stmt bran cond sub pod time code
1             package Protocol::SPDY::Base;
2             $Protocol::SPDY::Base::VERSION = '1.001';
3 3     3   1389 use strict;
  3         6  
  3         107  
4 3     3   15 use warnings;
  3         4  
  3         90  
5 3     3   13 use parent qw(Mixin::Event::Dispatch);
  3         4  
  3         19  
6              
7             =head1 NAME
8              
9             Protocol::SPDY::Base - abstract support for the SPDY protocol
10              
11             =head1 VERSION
12              
13             version 1.001
14              
15             =head1 DESCRIPTION
16              
17             Provides the base class for client, server and generic (proxy/analysis)
18             SPDY handling.
19              
20             =cut
21              
22 3     3   166 use Protocol::SPDY::Constants ':all';
  3         5  
  3         585  
23              
24 3     3   17 use List::UtilsBy qw(extract_by nsort_by);
  3         5  
  3         5415  
25              
26             =head1 METHODS
27              
28             =cut
29              
30             =head2 new
31              
32             Instantiates a new SPDY-handling object. Applies any attributes
33             passed as named parameters.
34              
35             =cut
36              
37             sub new {
38 4     4 1 1783 my $class = shift;
39 4         20 bless {
40             initial_window_size => 65536,
41             pending_send => [ ],
42             @_
43             }, $class
44             }
45              
46             =head2 sender_zlib
47              
48             The compression instance used for sending data.
49              
50             =cut
51              
52 20   66 20 1 131 sub sender_zlib { shift->{sender_zlib} ||= Protocol::SPDY::Compress->new }
53              
54             =head2 receiver_zlib
55              
56             Compression instance used for receiving (decompressing) data.
57              
58             =cut
59              
60 20   66 20 1 114 sub receiver_zlib { shift->{receiver_zlib} ||= Protocol::SPDY::Compress->new }
61              
62             =head2 request_close
63              
64             Issue a close request by sending a GOAWAY message.
65              
66             =cut
67              
68             sub request_close {
69 0     0 1 0 my $self = shift;
70 0   0     0 my $reason = shift || 'OK';
71 0         0 $self->goaway($reason);
72             }
73              
74             =head2 restore_initial_settings
75              
76             Send back the list of settings we'd previously persisted.
77              
78             Typically called immediately after establishing the connection.
79              
80             =cut
81              
82             sub restore_initial_settings {
83 0     0 1 0 my $self = shift;
84 0         0 my %args = @_;
85              
86             # Each key-value pair, in ascending numeric order.
87             # We set the "persisted" flag to notify the other
88             # side that we're handing back the values we stashed
89             # from the last time around.
90             my @pending = nsort_by {
91 0     0   0 $_->[0]
92 0         0 } map [
93             SETTINGS_BY_NAME->{uc $_},
94             $args{$_},
95             FLAG_SETTINGS_PERSISTED
96             ], keys %args;
97              
98 0         0 $self->queue_frame(
99             Protocol::SPDY::Frame::Control::SETTINGS->new(
100             version => $self->version,
101             settings => \@pending,
102             )
103             );
104             }
105              
106             =head2 send_settings
107              
108             Sends a SETTINGS frame generated from the key/value pairs passed
109             to this method.
110              
111             Typically called immediately after establishing the connection.
112              
113             Example:
114              
115             $spdy->send_settings(
116             initial_window_size => 32768,
117             max_concurrent_streams => 16,
118             );
119              
120             =cut
121              
122             sub send_settings {
123 0     0 1 0 my $self = shift;
124 0         0 my %args = @_;
125              
126             # Each key-value pair, in ascending numeric order.
127             my @pending = nsort_by {
128 0     0   0 $_->[0]
129 0         0 } map [
130             SETTINGS_BY_NAME->{uc $_},
131             $args{$_},
132             0,
133             ], keys %args;
134              
135 0         0 $self->queue_frame(
136             Protocol::SPDY::Frame::Control::SETTINGS->new(
137             version => $self->version,
138             settings => \@pending,
139             )
140             );
141             }
142              
143             =head2 check_version
144              
145             Called before we do anything with a control frame.
146              
147             Returns true if it's supported, false if not.
148              
149             =cut
150              
151             sub check_version {
152 0     0 1 0 my ($self, $frame) = @_;
153 0 0       0 if($frame->version > MAX_SUPPORTED_VERSION) {
154             # Send a reset if this was a SYN_STREAM
155 0 0       0 $self->send_frame(RST_STREAM => {
156             status => UNSUPPORTED_VERSION
157             }) if $frame->type == FRAME_TYPE_BY_ID->{SYN_STREAM};
158             # then bail out (we do this for any frame type)
159 0         0 return 0;
160             }
161 0         0 return 1;
162             }
163              
164             =head2 next_stream_id
165              
166             Generate the next stream ID for this connection.
167              
168             Returns the next available stream ID, or 0 if we're out of available streams.
169              
170             =cut
171              
172             sub next_stream_id {
173 8     8 1 14 my $self = shift;
174             # 2.3.2 - server streams are even, client streams are odd
175 8 100       36 if(defined $self->{last_stream_id}) {
176 6         16 $self->{last_stream_id} += 2;
177             } else {
178 2         8 $self->{last_stream_id} = $self->initial_stream_id;
179             }
180 8 50       55 return $self->{last_stream_id} if $self->{last_stream_id} <= 0x7FFFFFFF;
181 0         0 return 0;
182             }
183              
184             =head2 queue_frame
185              
186             Requests sending the given C< $frame > at the earliest opportunity.
187              
188             =cut
189              
190             sub queue_frame {
191 10     10 1 18 my $self = shift;
192 10         13 my $frame = shift;
193 10         46 $self->invoke_event(send_frame => $frame);
194 10         205 $self->write($frame->as_packet($self->sender_zlib));
195             }
196              
197              
198             =head2 on_read
199              
200             This is the method that an external transport would call when it has
201             some data received from the other side of the SPDY connection. It
202             expects to be called with a scalar containing bytes which can be
203             decoded as SPDY frames; any SSL/TLS decoding should happen before
204             passing data to this method.
205              
206             Will call L</dispatch_frame> for any valid frames that can be
207             extracted from the stream.
208              
209             =cut
210              
211             sub on_read {
212 10     10 1 14 my $self = shift;
213 10         24 $self->{input_buffer} .= shift;
214 10         12 my @frames;
215 10         31 while(defined(my $bytes = $self->extract_frame(\($self->{input_buffer})))) {
216 10         29 push @frames, my $f = $self->parse_frame($bytes);
217 10 50       37 die "$bytes generated undef frame" unless $f;
218 10         34 $self->invoke_event(receive_frame => $f);
219             }
220 10 50       25 return $self unless @frames;
221              
222             # Get ourselves a temp copy for reentrancy protection
223 10         22 local $self->{batch};
224 10         27 $self->dispatch_frame($_) for $self->prioritise_incoming_frames(@frames);
225             # Process any tasks we queued up
226 10 50       54 $self->batch->done if exists $self->{batch};
227 10         471 $self
228             }
229              
230             =head2 prioritise_incoming_frames
231              
232             Given a list of L<Protocol::SPDY::Frame> instances, returns them
233             reordered so that higher-priority items such as PING are handled
234             first.
235              
236             Does not yet support stream priority.
237              
238             =cut
239              
240             sub prioritise_incoming_frames {
241 10     10 1 15 my $self = shift;
242 10         14 my @frames = @_;
243 10     10   80 my @ping = extract_by { $_->type_name eq 'PING' } @frames;
  10         95  
244 10         107 return @ping, @frames;
245             }
246              
247             =head2 dispatch_frame
248              
249             Dispatches the given frame to appropriate handlers - this will
250             be the matching L<Protocol::SPDY::Stream> if one exists, or
251             connection-level handling for SYN_STREAM/PING/SETTINGS frames.
252              
253             =cut
254              
255             sub dispatch_frame {
256 10     10 1 11 my $self = shift;
257 10         22 my $frame = shift;
258             # If we already have a stream for this frame, it probably
259             # knows better than we do how we should be handling it
260 10 50       29 if(my $stream = $self->related_stream($frame)) {
261 10         43 $stream->handle_frame($frame);
262             } else {
263             # This is either a frame without a stream ID, or we don't
264             # have that stream yet.
265 0 0       0 if($frame->type_name eq 'SYN_STREAM') {
    0          
    0          
266 0         0 $self->incoming_stream($frame);
267             } elsif($frame->type_name eq 'PING') {
268 0         0 $self->invoke_event(ping => $frame);
269             # Bounce it straight back
270 0         0 $self->queue_frame($frame);
271             } elsif($frame->type_name eq 'SETTINGS') {
272 0         0 $self->apply_settings($frame);
273             } else {
274             # Give subclasses a chance to try this one
275 0         0 return $self->dispatch_unhandled_frame($frame);
276             }
277             }
278 10         1051 return $self;
279             }
280              
281             =head2 dispatch_unhandled_frame
282              
283             Called when we receive a frame that's not been picked up by the
284             usual handlers - could be a SYN_REPLY on a stream that we don't
285             have, for example.
286              
287             =cut
288              
289             sub dispatch_unhandled_frame {
290 0     0 1 0 my $self = shift;
291 0         0 my $frame = shift;
292 0         0 die "We do not know what to do with $frame yet";
293             }
294              
295             =head2 incoming_stream
296              
297             Called when a new SYN_STREAM frame is received.
298              
299             =cut
300              
301             sub incoming_stream {
302 0     0 1 0 my $self = shift;
303 0         0 my $frame = shift;
304 0         0 my $stream = Protocol::SPDY::Stream->new_from_syn(
305             $frame,
306             connection => $self
307             );
308 0         0 $self->{streams}{$stream->id} = $stream;
309 0         0 $self->invoke_event(stream => $stream);
310 0         0 $self;
311             }
312              
313             =head2 related_stream
314              
315             Returns the L<Protocol::SPDY::Stream> matching the stream_id
316             for this frame (if it has one).
317              
318             Will return undef if we have no stream yet or this frame
319             does not have a stream_id.
320              
321             =cut
322              
323             sub related_stream {
324 10     10 1 11 my $self = shift;
325 10         14 my $frame = shift;
326 10 50       63 return undef unless my $m = $frame->can('stream_id');
327 10         24 my $stream_id = $m->($frame);
328 10 50       27 return undef unless my $stream = $self->stream_by_id($stream_id);
329 10         22 return $stream;
330             }
331              
332             =head2 apply_settings
333              
334             Applies the given settings to our internal state.
335              
336             =cut
337              
338             sub apply_settings {
339 0     0 1 0 my $self = shift;
340 0         0 my $frame = shift;
341              
342 0         0 foreach my $setting ($frame->all_settings) {
343 0         0 my ($id, $flags, $value) = @$setting;
344 0 0       0 my $k = lc(SETTINGS_BY_ID->{$id}) or die 'unknown setting ' . $id;
345 0         0 $self->invoke_event(setting => $k => $value, $flags);
346 0         0 $self->{$k} = $value;
347             }
348             $self
349 0         0 }
350              
351             =head2 extract_frame
352              
353             Given a scalar reference to a byte buffer, this will extract the first frame if possible
354             and return the bytes if it succeeded, undef if not. No frame validation is performed: the
355             bytes are extracted based on the length information only.
356              
357             =cut
358              
359             sub extract_frame {
360 30     30 1 190 my $self = shift;
361 30         33 my $buffer = shift;
362             # 2.2 Frames always have a common header which is 8 bytes in length
363 30 100       79 return undef unless length $$buffer >= 8;
364              
365 20         75 (undef, my $len) = unpack 'N1N1', $$buffer;
366 20         18 $len &= 0x00FFFFFF;
367 20 50       45 return undef unless length($$buffer) >= (8 + $len);
368 20         46 my $bytes = substr $$buffer, 0, 8 + $len, '';
369 20         66 return $bytes;
370             }
371              
372             =head2 parse_frame
373              
374             Parse a frame extracted by L</extract_frame>. Returns an appropriate subclass of L<Protocol::SPDY::Frame>
375             if this succeeded, dies if it fails.
376              
377             =cut
378              
379             sub parse_frame {
380 20     20 1 25 my $self = shift;
381 20         23 my $pkt = shift;
382 20         51 return Protocol::SPDY::Frame->parse(
383             \$pkt,
384             zlib => $self->receiver_zlib
385             );
386             }
387              
388             =head2 goaway
389              
390             Requests termination of the connection.
391              
392             =cut
393              
394             sub goaway {
395 0     0 1 0 my $self = shift;
396 0         0 my $status = shift;
397              
398             # We accept numeric or string status codes at this level
399 0 0       0 $status = {
400             OK => 0,
401             PROTOCOL_ERROR => 1,
402             INTERNAL_ERROR => 2,
403             }->{$status} unless 0+$status eq $status;
404              
405 0         0 $self->queue_frame(
406             Protocol::SPDY::Frame::GOAWAY->new(
407             last_stream => $self->last_accepted_stream_id,
408             status => $status,
409             )
410             );
411             }
412              
413             =head2 ping
414              
415             Sends a ping request. We should get a PING packet back as a high-priority reply.
416              
417             =cut
418              
419             sub ping {
420 0     0 1 0 my $self = shift;
421 0         0 $self->queue_frame(
422             Protocol::SPDY::Frame::PING->new(
423             id => $self->next_ping_id,
424             )
425             );
426             }
427              
428             =head2 settings
429              
430             Send settings to the remote.
431              
432             =cut
433              
434             sub settings {
435 0     0 1 0 my $self = shift;
436 0         0 $self->queue_frame(
437             Protocol::SPDY::Frame::SETTINGS->new(
438             id => $self->next_ping_id,
439             settings => \@_,
440             )
441             );
442             }
443              
444             =head2 credential
445              
446             Sends credential information to the remote.
447              
448             =cut
449              
450             sub credential {
451 0     0 1 0 my $self = shift;
452 0         0 die "Credential frames are not yet implemented";
453             }
454              
455             =head2 version
456              
457             Returns the version supported by this instance. Currently, this is
458             always 3.
459              
460             =cut
461              
462 26     26 1 147 sub version { 3 }
463              
464             =head2 last_stream_id
465              
466             The ID for the last stream we created.
467              
468             =cut
469              
470 0     0 1 0 sub last_stream_id { shift->{last_stream_id} }
471              
472             =head2 write
473              
474             Calls the external code which is expected to handle writes.
475              
476             =cut
477              
478             sub write {
479 10     10 1 13 my $self = shift;
480 10         42 $self->{on_write}->(@_)
481             }
482              
483             =head2 create_stream
484              
485             Instantiate a new stream, returning the L<Protocol::SPDY::Stream> instance.
486              
487             =cut
488              
489             sub create_stream {
490 8     8 1 10541 my $self = shift;
491 8         19 my %args = @_;
492 8         38 my $stream = Protocol::SPDY::Stream->new(
493             %args,
494             id => $self->next_stream_id,
495             connection => $self,
496             version => $self->version,
497             );
498 8         32 $self->{streams}{$stream->id} = $stream;
499 8         40 return $stream;
500             }
501              
502             =head2 pending_send
503              
504             Returns a count of the frames that are waiting to be sent.
505              
506             =cut
507              
508             sub pending_send {
509 0     0 1 0 scalar @{ shift->{pending_send} }
  0         0  
510             }
511              
512             =head2 has_stream
513              
514             Returns true if we have a stream matching the ID on the
515             provided L<Protocol::SPDY::Stream> instance.
516              
517             =cut
518              
519             sub has_stream {
520 8     8 1 12 my $self = shift;
521 8         13 my $stream = shift;
522 8 50       25 return exists $self->{streams}{$stream->id} ? 1 : 0;
523             }
524              
525             =head2 stream_by_id
526              
527             Returns the L<Protocol::SPDY::Stream> matching the given ID.
528              
529             =cut
530              
531             sub stream_by_id {
532 10     10 1 14 my $self = shift;
533 10         10 my $id = shift;
534 10         41 return $self->{streams}{$id}
535             }
536              
537             =head2 expected_upload_bandwidth
538              
539             The expected rate (kilobyte/sec) we can send data to the other side.
540              
541             =cut
542              
543 0     0 1 0 sub expected_upload_bandwidth { shift->{expected_upload_bandwidth} }
544              
545             =head2 expected_download_bandwidth
546              
547             The rate (kilobyte/sec) we expect to be able to receive data from the other side.
548              
549             =cut
550              
551 0     0 1 0 sub expected_download_bandwidth { shift->{expected_download_bandwidth} }
552              
553             =head2 expected_round_trip_time
554              
555             The expected round-trip time (in milliseconds) between us and the other side.
556              
557             =cut
558              
559 0     0 1 0 sub expected_round_trip_time { shift->{expected_round_trip_time} }
560              
561             =head2 max_concurrent_streams
562              
563             The maximum number of concurrent streams allowed on this connection.
564              
565             =cut
566              
567 0     0 1 0 sub max_concurrent_streams { shift->{max_concurrent_streams} }
568              
569             =head2 current_cwnd
570              
571             The current TCP congestion window (CWND) value.
572              
573             =cut
574              
575 0     0 1 0 sub current_cwnd { shift->{current_cwnd} }
576              
577             =head2 download_retrans_rate
578              
579             The download retransmission rate (bytes retransmitted / total bytes transmitted).
580              
581             =cut
582              
583 0     0 1 0 sub download_retrans_rate { shift->{download_retrans_rate} }
584              
585             =head2 initial_window_size
586              
587             The initial flow-control window size (in bytes) for new streams.
588              
589             =cut
590              
591 0     0 1 0 sub initial_window_size { shift->{initial_window_size} }
592              
593             =head2 client_certificate_vector_size
594              
595             The size of the client certificate vector.
596              
597             =cut
598              
599 0     0 1 0 sub client_certificate_vector_size { shift->{client_certificate_vector_size} }
600              
601             =head1 METHODS - Futures
602              
603             =head2 batch
604              
605             Future representing the current batch of frames being processed. Used
606             for deferring window updates.
607              
608             =cut
609              
610 10   33 10 1 54 sub batch { shift->{batch} ||= Future->new }
611              
612             1;
613              
614             __END__