File Coverage

blib/lib/Protocol/SPDY/Base.pm
Criterion Covered Total %
statement 82 145 56.5
branch 13 36 36.1
condition 5 11 45.4
subroutine 23 46 50.0
pod 38 38 100.0
total 161 276 58.3


line stmt bran cond sub pod time code
1             package Protocol::SPDY::Base;
2             {
3             $Protocol::SPDY::Base::VERSION = '1.000';
4             }
5 3     3   1898 use strict;
  3         7  
  3         97  
6 3     3   15 use warnings;
  3         6  
  3         91  
7 3     3   19 use parent qw(Mixin::Event::Dispatch);
  3         7  
  3         25  
8              
9             =head1 NAME
10              
11             Protocol::SPDY::Base - abstract support for the SPDY protocol
12              
13             =head1 VERSION
14              
15             version 1.000
16              
17             =head1 DESCRIPTION
18              
19             Provides the base class for client, server and generic (proxy/analysis)
20             SPDY handling.
21              
22             =cut
23              
24 3     3   216 use Protocol::SPDY::Constants ':all';
  3         7  
  3         673  
25              
26 3     3   17 use List::UtilsBy qw(extract_by nsort_by);
  3         8  
  3         6896  
27              
28             =head1 METHODS
29              
30             =cut
31              
32             =head2 new
33              
34             Instantiates a new SPDY-handling object. Applies any attributes
35             passed as named parameters.
36              
37             =cut
38              
39             sub new {
40 4     4 1 2149 my $class = shift;
41 4         26 bless {
42             initial_window_size => 65536,
43             pending_send => [ ],
44             @_
45             }, $class
46             }
47              
48             =head2 sender_zlib
49              
50             The compression instance used for sending data.
51              
52             =cut
53              
54 20   66 20 1 141 sub sender_zlib { shift->{sender_zlib} ||= Protocol::SPDY::Compress->new }
55              
56             =head2 receiver_zlib
57              
58             Compression instance used for receiving (decompressing) data.
59              
60             =cut
61              
62 20   66 20 1 144 sub receiver_zlib { shift->{receiver_zlib} ||= Protocol::SPDY::Compress->new }
63              
64             =head2 request_close
65              
66             Issue a close request by sending a GOAWAY message.
67              
68             =cut
69              
70             sub request_close {
71 0     0 1 0 my $self = shift;
72 0   0     0 my $reason = shift || 'OK';
73 0         0 $self->goaway($reason);
74             }
75              
76             =head2 restore_initial_settings
77              
78             Send back the list of settings we'd previously persisted.
79              
80             Typically called immediately after establishing the connection.
81              
82             =cut
83              
84             sub restore_initial_settings {
85 0     0 1 0 my $self = shift;
86 0         0 my %args = @_;
87              
88             # Each key-value pair, in ascending numeric order.
89             # We set the "persisted" flag to notify the other
90             # side that we're handing back the values we stashed
91             # from the last time around.
92             my @pending = nsort_by {
93 0     0   0 $_->[0]
94 0         0 } map [
95             SETTINGS_BY_NAME->{uc $_},
96             $args{$_},
97             FLAG_SETTINGS_PERSISTED
98             ], keys %args;
99              
100 0         0 $self->queue_frame(
101             Protocol::SPDY::Frame::Control::SETTINGS->new(
102             version => $self->version,
103             settings => \@pending,
104             )
105             );
106             }
107              
108             =head2 send_settings
109              
110             Sends a SETTINGS frame generated from the key/value pairs passed
111             to this method.
112              
113             Typically called immediately after establishing the connection.
114              
115             Example:
116              
117             $spdy->send_settings(
118             initial_window_size => 32768,
119             max_concurrent_streams => 16,
120             );
121              
122             =cut
123              
124             sub send_settings {
125 0     0 1 0 my $self = shift;
126 0         0 my %args = @_;
127              
128             # Each key-value pair, in ascending numeric order.
129             my @pending = nsort_by {
130 0     0   0 $_->[0]
131 0         0 } map [
132             SETTINGS_BY_NAME->{uc $_},
133             $args{$_},
134             0,
135             ], keys %args;
136              
137 0         0 $self->queue_frame(
138             Protocol::SPDY::Frame::Control::SETTINGS->new(
139             version => $self->version,
140             settings => \@pending,
141             )
142             );
143             }
144              
145             =head2 check_version
146              
147             Called before we do anything with a control frame.
148              
149             Returns true if it's supported, false if not.
150              
151             =cut
152              
153             sub check_version {
154 0     0 1 0 my ($self, $frame) = @_;
155 0 0       0 if($frame->version > MAX_SUPPORTED_VERSION) {
156             # Send a reset if this was a SYN_STREAM
157 0 0       0 $self->send_frame(RST_STREAM => {
158             status => UNSUPPORTED_VERSION
159             }) if $frame->type == FRAME_TYPE_BY_ID->{SYN_STREAM};
160             # then bail out (we do this for any frame type)
161 0         0 return 0;
162             }
163 0         0 return 1;
164             }
165              
166             =head2 next_stream_id
167              
168             Generate the next stream ID for this connection.
169              
170             Returns the next available stream ID, or 0 if we're out of available streams
171              
172             =cut
173              
174             sub next_stream_id {
175 8     8 1 15 my $self = shift;
176             # 2.3.2 - server streams are even, client streams are odd
177 8 100       45 if(defined $self->{last_stream_id}) {
178 6         13 $self->{last_stream_id} += 2;
179             } else {
180 2         13 $self->{last_stream_id} = $self->initial_stream_id;
181             }
182 8 50       57 return $self->{last_stream_id} if $self->{last_stream_id} <= 0x7FFFFFFF;
183 0         0 return 0;
184             }
185              
186             =head2 queue_frame
187              
188             Requests sending the given C< $frame > at the earliest opportunity.
189              
190             =cut
191              
192             sub queue_frame {
193 10     10 1 14 my $self = shift;
194 10         11 my $frame = shift;
195 10         52 $self->invoke_event(send_frame => $frame);
196 10         392 $self->write($frame->as_packet($self->sender_zlib));
197             }
198              
199              
200             =head2 on_read
201              
202             This is the method that an external transport would call when it has
203             some data received from the other side of the SPDY connection. It
204             expects to be called with a scalar containing bytes which can be
205             decoded as SPDY frames; any SSL/TLS decoding should happen before
206             passing data to this method.
207              
208             Will call L</dispatch_frame> for any valid frames that can be
209             extracted from the stream.
210              
211             =cut
212              
213             sub on_read {
214 10     10 1 16 my $self = shift;
215 10         27 $self->{input_buffer} .= shift;
216 10         15 my @frames;
217 10         35 while(defined(my $bytes = $self->extract_frame(\($self->{input_buffer})))) {
218 10         30 push @frames, my $f = $self->parse_frame($bytes);
219 10 50       53 die "$bytes generated undef frame" unless $f;
220 10         46 $self->invoke_event(receive_frame => $f);
221             }
222 10 50       25 return $self unless @frames;
223              
224             # Get ourselves a temp copy for reentrancy protection
225 10         22 local $self->{batch};
226 10         41 $self->dispatch_frame($_) for $self->prioritise_incoming_frames(@frames);
227             # Process any tasks we queued up
228 10 50       62 $self->batch->done if exists $self->{batch};
229 10         466 $self
230             }
231              
232             =head2 prioritise_incoming_frames
233              
234             Given a list of L<Protocol::SPDY::Frame> instances, returns them
235             reordered so that higher-priority items such as PING are handled
236             first.
237              
238             Does not yet support stream priority.
239              
240             =cut
241              
242             sub prioritise_incoming_frames {
243 10     10 1 11 my $self = shift;
244 10         17 my @frames = @_;
245 10     10   73 my @ping = extract_by { $_->type_name eq 'PING' } @frames;
  10         101  
246 10         110 return @ping, @frames;
247             }
248              
249             =head2 dispatch_frame
250              
251             Dispatches the given frame to appropriate handlers - this will
252             be the matching L<Protocol::SPDY::Stream> if one exists, or
253             internal connection state handling for GOAWAY/SETTINGS frames.
254              
255             =cut
256              
257             sub dispatch_frame {
258 10     10 1 14 my $self = shift;
259 10         14 my $frame = shift;
260             # If we already have a stream for this frame, it probably
261             # knows better than we do how we should be handling it
262 10 50       37 if(my $stream = $self->related_stream($frame)) {
263 10         58 $stream->handle_frame($frame);
264             } else {
265             # This is either a frame without a stream ID, or we don't
266             # have that stream yet.
267 0 0       0 if($frame->type_name eq 'SYN_STREAM') {
    0          
    0          
268 0         0 $self->incoming_stream($frame);
269             } elsif($frame->type_name eq 'PING') {
270 0         0 $self->invoke_event(ping => $frame);
271             # Bounce it straight back
272 0         0 $self->queue_frame($frame);
273             } elsif($frame->type_name eq 'SETTINGS') {
274 0         0 $self->apply_settings($frame);
275             } else {
276             # Give subclasses a chance to try this one
277 0         0 return $self->dispatch_unhandled_frame($frame);
278             }
279             }
280 10         1153 return $self;
281             }
282              
283             =head2 dispatch_unhandled_frame
284              
285             Called when we receive a frame that's not been picked up by the
286             usual handlers - could be a SYN_REPLY on a stream that we don't
287             have, for example.
288              
289             =cut
290              
291             sub dispatch_unhandled_frame {
292 0     0 1 0 my $self = shift;
293 0         0 my $frame = shift;
294 0         0 die "We do not know what to do with $frame yet";
295             }
296              
297             =head2 incoming_stream
298              
299             Called when a new SYN_STREAM frame is received.
300              
301             =cut
302              
303             sub incoming_stream {
304 0     0 1 0 my $self = shift;
305 0         0 my $frame = shift;
306 0         0 my $stream = Protocol::SPDY::Stream->new_from_syn(
307             $frame,
308             connection => $self
309             );
310 0         0 $self->{streams}{$stream->id} = $stream;
311 0         0 $self->invoke_event(stream => $stream);
312 0         0 $self;
313             }
314              
315             =head2 related_stream
316              
317             Returns the L<Protocol::SPDY::Stream> matching the stream_id
318             for this frame (if it has one).
319              
320             Will return undef if we have no stream yet or this frame
321             does not have a stream_id.
322              
323             =cut
324              
325             sub related_stream {
326 10     10 1 13 my $self = shift;
327 10         13 my $frame = shift;
328 10 50       70 return undef unless my $m = $frame->can('stream_id');
329 10         28 my $stream_id = $m->($frame);
330 10 50       36 return undef unless my $stream = $self->stream_by_id($stream_id);
331 10         27 return $stream;
332             }
333              
334             =head2 apply_settings
335              
336             Applies the given settings to our internal state.
337              
338             =cut
339              
340             sub apply_settings {
341 0     0 1 0 my $self = shift;
342 0         0 my $frame = shift;
343              
344 0         0 foreach my $setting ($frame->all_settings) {
345 0         0 my ($id, $flags, $value) = @$setting;
346 0 0       0 my $k = lc(SETTINGS_BY_ID->{$id}) or die 'unknown setting ' . $id;
347 0         0 $self->invoke_event(setting => $k => $value, $flags);
348 0         0 $self->{$k} = $value;
349             }
350             $self
351 0         0 }
352              
353             =head2 extract_frame
354              
355             Given a scalar reference to a byte buffer, this will extract the first frame if possible
356             and return the bytes if it succeeded, undef if not. No frame validation is performed: the
357             bytes are extracted based on the length information only.
358              
359             =cut
360              
361             sub extract_frame {
362 30     30 1 214 my $self = shift;
363 30         33 my $buffer = shift;
364             # 2.2 Frames always have a common header which is 8 bytes in length
365 30 100       93 return undef unless length $$buffer >= 8;
366              
367 20         70 (undef, my $len) = unpack 'N1N1', $$buffer;
368 20         30 $len &= 0x00FFFFFF;
369 20 50       49 return undef unless length($$buffer) >= (8 + $len);
370 20         52 my $bytes = substr $$buffer, 0, 8 + $len, '';
371 20         75 return $bytes;
372             }
373              
374             =head2 parse_frame
375              
376             Parse a frame extracted by L</extract_frame>. Returns an appropriate subclass of L<Protocol::SPDY::Frame>
377             if this succeeded, dies if it fails.
378              
379             =cut
380              
381             sub parse_frame {
382 20     20 1 32 my $self = shift;
383 20         29 my $pkt = shift;
384 20         71 return Protocol::SPDY::Frame->parse(
385             \$pkt,
386             zlib => $self->receiver_zlib
387             );
388             }
389              
390             =head2 goaway
391              
392             Requests termination of the connection.
393              
394             =cut
395              
396             sub goaway {
397 0     0 1 0 my $self = shift;
398 0         0 my $status = shift;
399              
400             # We accept numeric or string status codes at this level
401 0 0       0 $status = {
402             OK => 0,
403             PROTOCOL_ERROR => 1,
404             INTERNAL_ERROR => 2,
405             }->{$status} unless 0+$status eq $status;
406              
407 0         0 $self->queue_frame(
408             Protocol::SPDY::Frame::GOAWAY->new(
409             last_stream => $self->last_accepted_stream_id,
410             status => $status,
411             )
412             );
413             }
414              
415             =head2 ping
416              
417             Sends a ping request. We should get a PING packet back as a high-priority reply.
418              
419             =cut
420              
421             sub ping {
422 0     0 1 0 my $self = shift;
423 0         0 $self->queue_frame(
424             Protocol::SPDY::Frame::PING->new(
425             id => $self->next_ping_id,
426             )
427             );
428             }
429              
430             =head2 settings
431              
432             Send settings to the remote.
433              
434             =cut
435              
436             sub settings {
437 0     0 1 0 my $self = shift;
438 0         0 $self->queue_frame(
439             Protocol::SPDY::Frame::SETTINGS->new(
440             id => $self->next_ping_id,
441             settings => \@_,
442             )
443             );
444             }
445              
446             =head2 credential
447              
448             Sends credential information to the remote.
449              
450             =cut
451              
452             sub credential {
453 0     0 1 0 my $self = shift;
454 0         0 die "Credential frames are not yet implemented";
455             }
456              
457             =head2 version
458              
459             Returns the version supported by this instance. Currently, this is
460             always 3.
461              
462             =cut
463              
464 26     26 1 161 sub version { 3 }
465              
466             =head2 last_stream_id
467              
468             The ID for the last stream we created.
469              
470             =cut
471              
472 0     0 1 0 sub last_stream_id { shift->{last_stream_id} }
473              
474             =head2 write
475              
476             Calls the external code which is expected to handle writes.
477              
478             =cut
479              
480             sub write {
481 10     10 1 19 my $self = shift;
482 10         40 $self->{on_write}->(@_)
483             }
484              
485             =head2 create_stream
486              
487             Instantiate a new stream, returning the L<Protocol::SPDY::Stream> instance.
488              
489             =cut
490              
491             sub create_stream {
492 8     8 1 11874 my $self = shift;
493 8         21 my %args = @_;
494 8         41 my $stream = Protocol::SPDY::Stream->new(
495             %args,
496             id => $self->next_stream_id,
497             connection => $self,
498             version => $self->version,
499             );
500 8         37 $self->{streams}{$stream->id} = $stream;
501 8         42 return $stream;
502             }
503              
504             =head2 pending_send
505              
506             Returns a count of the frames that are waiting to be sent.
507              
508             =cut
509              
510             sub pending_send {
511 0     0 1 0 scalar @{ shift->{pending_send} }
  0         0  
512             }
513              
514             =head2 has_stream
515              
516             Returns true if we have a stream matching the ID on the
517             provided L<Protocol::SPDY::Stream> instance.
518              
519             =cut
520              
521             sub has_stream {
522 8     8 1 18 my $self = shift;
523 8         15 my $stream = shift;
524 8 50       32 return exists $self->{streams}{$stream->id} ? 1 : 0;
525             }
526              
527             =head2 stream_by_id
528              
529             Returns the L<Protocol::SPDY::Stream> matching the given ID.
530              
531             =cut
532              
533             sub stream_by_id {
534 10     10 1 14 my $self = shift;
535 10         19 my $id = shift;
536 10         49 return $self->{streams}{$id}
537             }
538              
539             =head2 expected_upload_bandwidth
540              
541             The expected rate (kilobyte/sec) we can send data to the other side.
542              
543             =cut
544              
545 0     0 1 0 sub expected_upload_bandwidth { shift->{expected_upload_bandwidth} }
546              
547             =head2 expected_download_bandwidth
548              
549             The rate (kilobyte/sec) we expect to be able to receive data from the other side.
550              
551             =cut
552              
553 0     0 1 0 sub expected_download_bandwidth { shift->{expected_download_bandwidth} }
554              
555             =head2 expected_round_trip_time
556              
557             The round-trip time (milliseconds, per the SPDY ROUND_TRIP_TIME setting) we expect between us and the other side.
558              
559             =cut
560              
561 0     0 1 0 sub expected_round_trip_time { shift->{expected_round_trip_time} }
562              
563             =head2 max_concurrent_streams
564              
565             The maximum number of concurrent streams permitted on this connection.
566              
567             =cut
568              
569 0     0 1 0 sub max_concurrent_streams { shift->{max_concurrent_streams} }
570              
571             =head2 current_cwnd
572              
573             The current congestion window (CWND) value reported for this connection.
574              
575             =cut
576              
577 0     0 1 0 sub current_cwnd { shift->{current_cwnd} }
578              
579             =head2 download_retrans_rate
580              
581             The download retransmission rate (bytes retransmitted relative to total bytes transmitted).
582              
583             =cut
584              
585 0     0 1 0 sub download_retrans_rate { shift->{download_retrans_rate} }
586              
587             =head2 initial_window_size
588              
589             The initial flow-control window size (in bytes) for new streams. Defaults to 65536.
590              
591             =cut
592              
593 0     0 1 0 sub initial_window_size { shift->{initial_window_size} }
594              
595             =head2 client_certificate_vector_size
596              
597             The size of the client certificate vector used for CREDENTIAL frames.
598              
599             =cut
600              
601 0     0 1 0 sub client_certificate_vector_size { shift->{client_certificate_vector_size} }
602              
603             =head1 METHODS - Futures
604              
605             =head2 batch
606              
607             Future representing the current batch of frames being processed. Used
608             for deferring window updates.
609              
610             =cut
611              
612 10   33 10 1 57 sub batch { shift->{batch} ||= Future->new }
613              
614             1;
615              
616             __END__