File Coverage

blib/lib/Protocol/HTTP2/Connection.pm
Criterion Covered Total %
statement 149 202 73.7
branch 68 112 60.7
condition 42 90 46.6
subroutine 35 39 89.7
pod 0 29 0.0
total 294 472 62.2


line stmt bran cond sub pod time code
1             package Protocol::HTTP2::Connection;
2 12     12   3926 use strict;
  12         19  
  12         268  
3 12     12   33 use warnings;
  12         10  
  12         266  
4             use Protocol::HTTP2::Constants
5 12         3642 qw(const_name :frame_types :errors :settings :flags :states
6 12     12   1935 :limits :endpoints);
  12         16  
7 12     12   4232 use Protocol::HTTP2::HeaderCompression qw(headers_encode);
  12         20  
  12         643  
8 12     12   4255 use Protocol::HTTP2::Frame;
  12         24  
  12         316  
9 12     12   4471 use Protocol::HTTP2::Stream;
  12         22  
  12         314  
10 12     12   4194 use Protocol::HTTP2::Upgrade;
  12         19  
  12         305  
11 12     12   58 use Protocol::HTTP2::Trace qw(tracer);
  12         10  
  12         23193  
12              
13             # Mixin
14             our @ISA =
15             qw(Protocol::HTTP2::Frame Protocol::HTTP2::Stream Protocol::HTTP2::Upgrade);
16              
17             # Default settings
18             my %default_settings = (
19             &SETTINGS_HEADER_TABLE_SIZE => DEFAULT_HEADER_TABLE_SIZE,
20             &SETTINGS_ENABLE_PUSH => DEFAULT_ENABLE_PUSH,
21             &SETTINGS_MAX_CONCURRENT_STREAMS => DEFAULT_MAX_CONCURRENT_STREAMS,
22             &SETTINGS_INITIAL_WINDOW_SIZE => DEFAULT_INITIAL_WINDOW_SIZE,
23             &SETTINGS_MAX_FRAME_SIZE => DEFAULT_MAX_FRAME_SIZE,
24             &SETTINGS_MAX_HEADER_LIST_SIZE => DEFAULT_MAX_HEADER_LIST_SIZE,
25             );
26              
27             sub new {
28 45     45 0 12263 my ( $class, $type, %opts ) = @_;
29 45 100       4325 my $self = bless {
30             type => $type,
31              
32             streams => {},
33              
34             last_stream => $type == CLIENT ? 1 : 2,
35             last_peer_stream => 0,
36             active_peer_streams => 0,
37              
38             encode_ctx => {
39              
40             # HPACK. Header Table
41             header_table => [],
42              
43             # HPACK. Header Table size
44             ht_size => 0,
45             max_ht_size => DEFAULT_HEADER_TABLE_SIZE,
46              
47             settings => {%default_settings},
48              
49             },
50              
51             decode_ctx => {
52              
53             # HPACK. Header Table
54             header_table => [],
55              
56             # HPACK. Header Table size
57             ht_size => 0,
58             max_ht_size => DEFAULT_HEADER_TABLE_SIZE,
59              
60             # HPACK. Emitted headers
61             emitted_headers => [],
62              
63             # last frame
64             frame => {},
65              
66             settings => {%default_settings},
67             },
68              
69             # Current error
70             error => 0,
71              
72             # Output frames queue
73             queue => [],
74              
75             # Connection must be shutdown
76             shutdown => 0,
77              
78             # issued GOAWAY: no new streams on this connection
79             goaway => 0,
80              
81             # get preface
82             preface => 0,
83              
84             # perform upgrade
85             upgrade => 0,
86              
87             # flow control
88             fcw_send => DEFAULT_INITIAL_WINDOW_SIZE,
89             fcw_recv => DEFAULT_INITIAL_WINDOW_SIZE,
90              
91             # stream where expected CONTINUATION frames
92             pending_stream => undef,
93              
94             }, $class;
95              
96 45         3298 for (qw(on_change_state on_new_peer_stream on_error upgrade)) {
97 180 100       6634 $self->{$_} = $opts{$_} if exists $opts{$_};
98             }
99              
100 45 100       3187 if ( exists $opts{settings} ) {
101 19         1555 for ( keys %{ $opts{settings} } ) {
  19         3163  
102 19         4720 $self->{decode_ctx}->{settings}->{$_} = $opts{settings}{$_};
103             }
104             }
105              
106             # Sync decode context max_ht_size
107             $self->{decode_ctx}->{max_ht_size} =
108 45         3281 $self->{decode_ctx}->{settings}->{&SETTINGS_HEADER_TABLE_SIZE};
109              
110 45         6384 $self;
111             }
112              
113             sub decode_context {
114 634     634 0 77231 shift->{decode_ctx};
115             }
116              
117             sub encode_context {
118 74     74 0 6501 shift->{encode_ctx};
119             }
120              
121             sub pending_stream {
122 207     207 0 28762 shift->{pending_stream};
123             }
124              
125             sub dequeue {
126 407     407 0 26113 my $self = shift;
127 407         25746 shift @{ $self->{queue} };
  407         76610  
128             }
129              
130             sub enqueue_raw {
131 19     19 0 1598 my $self = shift;
132 19         1581 push @{ $self->{queue} }, @_;
  19         4765  
133             }
134              
135             sub enqueue {
136 210     210 0 14387 my $self = shift;
137 210         14750 while ( my ( $type, $flags, $stream_id, $data_ref ) = splice( @_, 0, 4 ) ) {
138 212         14290 push @{ $self->{queue} },
  212         28940  
139             $self->frame_encode( $type, $flags, $stream_id, $data_ref );
140 212         14596 $self->state_machine( 'send', $type, $flags, $stream_id );
141             }
142             }
143              
144             sub enqueue_first {
145 2     2 0 321 my $self = shift;
146 2         3 my $i = 0;
147 2         2 for ( 0 .. $#{ $self->{queue} } ) {
  2         7  
148             my $type =
149 2         6 ( $self->frame_header_decode( \$self->{queue}->[$_], 0 ) )[1];
150 2 100 66     9 last if $type != CONTINUATION && $type != PING;
151 1         2 $i++;
152             }
153 2         8 while ( my ( $type, $flags, $stream_id, $data_ref ) = splice( @_, 0, 4 ) ) {
154 2         1 splice @{ $self->{queue} }, $i++, 0,
  2         7  
155             $self->frame_encode( $type, $flags, $stream_id, $data_ref );
156 2         10 $self->state_machine( 'send', $type, $flags, $stream_id );
157             }
158             }
159              
160             sub finish {
161 11     11 0 797 my $self = shift;
162             $self->enqueue( GOAWAY, 0, 0,
163 11 50       811 [ $self->{last_peer_stream}, $self->{error} ] )
164             unless $self->shutdown;
165 11         824 $self->shutdown(1);
166             }
167              
168             sub shutdown {
169 30     30 0 1615 my $self = shift;
170 30 100       1665 $self->{shutdown} = shift if @_;
171 30         7255 $self->{shutdown};
172             }
173              
174             sub goaway {
175 104     104 0 4831 my $self = shift;
176 104 100       4892 $self->{goaway} = shift if @_;
177 104         9675 $self->{goaway};
178             }
179              
180             sub preface {
181 435     435 0 27587 my $self = shift;
182 435 100       27430 $self->{preface} = shift if @_;
183 435         57997 $self->{preface};
184             }
185              
186             sub upgrade {
187 535     535 0 33656 my $self = shift;
188 535 50       34265 $self->{upgrade} = shift if @_;
189 535         68495 $self->{upgrade};
190             }
191              
192             sub state_machine {
193 421     421 0 28976 my ( $self, $act, $type, $flags, $stream_id ) = @_;
194              
195             return
196 421 50 66     61921 if $stream_id == 0
      66        
      33        
      33        
197             || $type == SETTINGS
198             || $type == GOAWAY
199             || $self->upgrade
200             || !$self->preface;
201              
202 260         14913 my $promised_sid = $self->stream_promised_sid($stream_id);
203              
204 260   33     15099 my $prev_state = $self->{streams}->{ $promised_sid || $stream_id }->{state};
205              
206             # REFUSED_STREAM error
207 260 0 33     15043 return if !defined $prev_state && $type == RST_STREAM && $act eq 'send';
      33        
208              
209             # Direction server->client
210             my $srv2cln = ( $self->{type} == SERVER && $act eq 'send' )
211 260   66     15417 || ( $self->{type} == CLIENT && $act eq 'recv' );
212              
213             # Direction client->server
214             my $cln2srv = ( $self->{type} == SERVER && $act eq 'recv' )
215 260   66     15402 || ( $self->{type} == CLIENT && $act eq 'send' );
216              
217             # Do we expect CONTINUATION after this frame?
218 260   100     15126 my $pending = ( $type == HEADERS || $type == PUSH_PROMISE )
219             && !( $flags & END_HEADERS );
220              
221             #tracer->debug(
222             # sprintf "\e[0;31mStream state: frame %s is %s%s on %s stream %i\e[m\n",
223             # const_name( "frame_types", $type ),
224             # $act,
225             # $pending ? "*" : "",
226             # const_name( "states", $prev_state ),
227             # $promised_sid || $stream_id,
228             #);
229              
230             # Wait until all CONTINUATION frames arrive
231 260 100       14860 if ( my $ps = $self->stream_pending_state($stream_id) ) {
    50          
    100          
    100          
    50          
    100          
    50          
232 1 50       5 if ( $type != CONTINUATION ) {
    50          
233 0         0 tracer->error(
234             sprintf "invalid frame type %s. Expected CONTINUATION frame\n",
235             const_name( "frame_types", $type )
236             );
237 0         0 $self->error(PROTOCOL_ERROR);
238             }
239             elsif ( $flags & END_HEADERS ) {
240 1 50       2 $self->stream_promised_sid( $stream_id, undef ) if $promised_sid;
241 1   33     6 $self->stream_pending_state( $promised_sid || $stream_id, undef );
242 1   33     6 $self->stream_state( $promised_sid || $stream_id, $ps );
243             }
244             }
245              
246             # Unexpected CONTINUATION frame
247             elsif ( $type == CONTINUATION ) {
248 0         0 tracer->error("Unexpected CONTINUATION frame\n");
249 0         0 $self->error(PROTOCOL_ERROR);
250             }
251              
252             # State machine
253             # IDLE
254             elsif ( $prev_state == IDLE ) {
255 76 100 66     3392 if ( $type == HEADERS && $cln2srv ) {
    50 33        
    50 33        
    50          
256 75 100       3355 $self->stream_state( $stream_id,
257             ( $flags & END_STREAM ) ? HALF_CLOSED : OPEN, $pending );
258             }
259             elsif ( $type == PUSH_PROMISE && $srv2cln ) {
260 0         0 $self->stream_state( $promised_sid, RESERVED, $pending );
261 0 0       0 $self->stream_promised_sid( $stream_id, undef )
262             if $flags & END_HEADERS;
263             }
264              
265             # first frame in stream is invalid, so state is yet IDLE
266             elsif ( $type == RST_STREAM && $act eq 'send' ) {
267 0         0 tracer->notice('send RST_STREAM on IDLE state. possible bug?');
268 0         0 $self->stream_state( $stream_id, CLOSED );
269             }
270             elsif ( $type != PRIORITY ) {
271 0         0 tracer->error(
272             sprintf "invalid frame type %s for current stream state %s\n",
273             const_name( "frame_types", $type ),
274             const_name( "states", $prev_state )
275             );
276 0         0 $self->error(PROTOCOL_ERROR);
277             }
278             }
279              
280             # OPEN
281             elsif ( $prev_state == OPEN ) {
282 13 100 33     74 if ( ( $flags & END_STREAM )
    50 66        
    50 33        
      33        
283             && ( $type == DATA || $type == HEADERS ) )
284             {
285 4         11 $self->stream_state( $stream_id, HALF_CLOSED, $pending );
286             }
287             elsif ( $type == RST_STREAM ) {
288 0         0 $self->stream_state( $stream_id, CLOSED );
289             }
290             elsif ($type == HEADERS
291             && !$pending
292             && $self->stream_trailer($stream_id) )
293             {
294 0         0 tracer->error("expected END_STREAM flag for trailer HEADERS frame");
295 0         0 $self->error(PROTOCOL_ERROR);
296             }
297             }
298              
299             # RESERVED (local/remote)
300             elsif ( $prev_state == RESERVED ) {
301 0 0 0     0 if ( $type == RST_STREAM ) {
    0 0        
    0          
302 0         0 $self->stream_state( $stream_id, CLOSED );
303             }
304             elsif ( $type == HEADERS && $srv2cln ) {
305 0 0       0 $self->stream_state( $stream_id,
306             ( $flags & END_STREAM ) ? CLOSED : HALF_CLOSED, $pending );
307             }
308             elsif ( $type != PRIORITY && $cln2srv ) {
309 0         0 tracer->error("invalid frame $type for state RESERVED");
310 0         0 $self->error(PROTOCOL_ERROR);
311             }
312             }
313              
314             # HALF_CLOSED (local/remote)
315             elsif ( $prev_state == HALF_CLOSED ) {
316 165 100 66     10906 if ( ( $type == RST_STREAM )
    50 66        
      33        
317             || ( ( $flags & END_STREAM ) && $srv2cln ) )
318             {
319 70         3351 $self->stream_state( $stream_id, CLOSED, $pending );
320             }
321 190         51686 elsif ( ( !grep { $type == $_ } ( WINDOW_UPDATE, PRIORITY ) )
322             && $cln2srv )
323             {
324 0         0 tracer->error( sprintf "invalid frame %s for state HALF CLOSED\n",
325             const_name( "frame_types", $type ) );
326 0         0 $self->error(PROTOCOL_ERROR);
327             }
328             }
329              
330             # CLOSED
331             elsif ( $prev_state == CLOSED ) {
332 5 50 33     2431 if ( $type != PRIORITY && ( $type != WINDOW_UPDATE && $cln2srv ) ) {
      33        
333              
334 0         0 tracer->error("stream is closed\n");
335 0         0 $self->error(STREAM_CLOSED);
336             }
337             }
338             else {
339 0         0 tracer->error("oops!\n");
340 0         0 $self->error(INTERNAL_ERROR);
341             }
342             }
343              
344             # TODO: move this to some other module
345             sub send_headers {
346 72     72 0 3237 my ( $self, $stream_id, $headers, $end ) = @_;
347 72         3256 my $max_size = $self->enc_setting(SETTINGS_MAX_FRAME_SIZE);
348              
349 72         3270 my $header_block = headers_encode( $self->encode_context, $headers );
350              
351 72 100       3290 my $flags = $end ? END_STREAM : 0;
352 72 50       3290 $flags |= END_HEADERS if length($header_block) <= $max_size;
353              
354 72         3453 $self->enqueue( HEADERS, $flags, $stream_id,
355             { hblock => \substr( $header_block, 0, $max_size, '' ) } );
356 72         6576 while ( length($header_block) > 0 ) {
357 0 0       0 my $flags = length($header_block) <= $max_size ? 0 : END_HEADERS;
358 0         0 $self->enqueue( CONTINUATION, $flags,
359             $stream_id, \substr( $header_block, 0, $max_size, '' ) );
360             }
361             }
362              
363             sub send_pp_headers {
364 0     0 0 0 my ( $self, $stream_id, $promised_id, $headers ) = @_;
365 0         0 my $max_size = $self->enc_setting(SETTINGS_MAX_FRAME_SIZE);
366              
367 0         0 my $header_block = headers_encode( $self->encode_context, $headers );
368              
369 0 0       0 my $flags = length($header_block) <= $max_size ? END_HEADERS : 0;
370              
371 0         0 $self->enqueue( PUSH_PROMISE, $flags, $stream_id,
372             [ $promised_id, \substr( $header_block, 0, $max_size - 4, '' ) ] );
373              
374 0         0 while ( length($header_block) > 0 ) {
375 0 0       0 my $flags = length($header_block) <= $max_size ? 0 : END_HEADERS;
376 0         0 $self->enqueue( CONTINUATION, $flags,
377             $stream_id, \substr( $header_block, 0, $max_size, '' ) );
378             }
379             }
380              
381             sub send_data {
382 47     47 0 3212 my ( $self, $stream_id, $chunk, $end ) = @_;
383 47         3295 my $data = $self->stream_blocked_data($stream_id);
384 47 100       3277 $data .= defined $chunk ? $chunk : '';
385 47 100       3296 $self->stream_end( $stream_id, $end ) if defined $end;
386 47         3260 $end = $self->stream_end($stream_id);
387              
388 47         3219 while (1) {
389 51         3196 my $l = length($data);
390 51         3258 my $size = $self->enc_setting(SETTINGS_MAX_FRAME_SIZE);
391 51         3286 for ( $l, $self->fcw_send, $self->stream_fcw_send($stream_id) ) {
392 153 100       6520 $size = $_ if $size > $_;
393             }
394              
395             # Flow control
396 51 100 100     3290 last if $l != 0 && $size <= 0;
397 50         3255 $self->fcw_send( -$size );
398 50         3273 $self->stream_fcw_send( $stream_id, -$size );
399              
400 50 100 100     3491 $self->enqueue(
401             DATA, $end && $l == $size ? END_STREAM : 0,
402             $stream_id, \substr( $data, 0, $size, '' )
403             );
404 50 100       6449 last if $l == $size;
405             }
406 47         3281 $self->stream_blocked_data( $stream_id, $data );
407             }
408              
409             sub send_blocked {
410 1     1 0 2 my $self = shift;
411 1         2 for my $stream_id ( keys %{ $self->{streams} } ) {
  1         4  
412 3         10 $self->stream_send_blocked($stream_id);
413             }
414             }
415              
416             sub error {
417 0     0 0 0 my $self = shift;
418 0 0 0     0 if ( @_ && !$self->{shutdown} ) {
419 0         0 $self->{error} = shift;
420 0 0       0 $self->{on_error}->( $self->{error} ) if exists $self->{on_error};
421 0         0 $self->finish;
422             }
423 0         0 $self->{error};
424             }
425              
426             sub setting {
427 0     0 0 0 require Carp;
428 0         0 Carp::confess("setting is deprecated\n");
429             }
430              
431             sub _setting {
432 675     675   38473 my ( $ctx, $self, $setting ) = @_;
433 675         38617 my $s = $self->{$ctx}->{settings};
434 675 50       38844 return undef unless exists $s->{$setting};
435 675 100       38775 $s->{$setting} = pop if @_ > 3;
436 675         117097 $s->{$setting};
437             }
438              
439             sub enc_setting {
440 239     239 0 12905 _setting( 'encode_ctx', @_ );
441             }
442              
443             sub dec_setting {
444 436     436 0 25874 _setting( 'decode_ctx', @_ );
445             }
446              
447             sub accept_settings {
448 33     33 0 3164 my $self = shift;
449 33         3205 $self->enqueue( SETTINGS, ACK, 0, {} );
450             }
451              
452             # Flow control windown of connection
453             sub _fcw {
454 155     155   9621 my $dir = shift;
455 155         9594 my $self = shift;
456              
457 155 100       9681 if (@_) {
458 104         6451 $self->{$dir} += shift;
459 104         6505 tracer->debug( "$dir now is " . $self->{$dir} . "\n" );
460             }
461 155         28715 $self->{$dir};
462             }
463              
464             sub fcw_send {
465 102     102 0 6427 _fcw( 'fcw_send', @_ );
466             }
467              
468             sub fcw_recv {
469 53     53 0 3228 _fcw( 'fcw_recv', @_ );
470             }
471              
472             sub fcw_update {
473 1     1 0 2 my $self = shift;
474              
475             # TODO: check size of data in memory
476 1         2 my $size = $self->dec_setting(SETTINGS_INITIAL_WINDOW_SIZE);
477 1         2 tracer->debug("update fcw recv of connection with $size b.\n");
478 1         2 $self->fcw_recv($size);
479 1         2 $self->enqueue( WINDOW_UPDATE, 0, 0, $size );
480             }
481              
482             sub fcw_initial_change {
483 0     0 0 0 my ( $self, $size ) = @_;
484 0         0 my $prev_size = $self->enc_setting(SETTINGS_INITIAL_WINDOW_SIZE);
485 0         0 my $diff = $size - $prev_size;
486 0         0 tracer->debug(
487             "Change flow control window on not closed streams with diff $diff\n");
488 0         0 for my $stream_id ( keys %{ $self->{streams} } ) {
  0         0  
489 0 0       0 next if $self->stream_state($stream_id) == CLOSED;
490 0         0 $self->stream_fcw_send( $stream_id, $diff );
491             }
492             }
493              
494             sub ack_ping {
495 1     1 0 1 my ( $self, $payload_ref ) = @_;
496 1         4 $self->enqueue_first( PING, ACK, 0, $payload_ref );
497             }
498              
499             sub send_ping {
500 1     1 0 2 my ( $self, $payload ) = @_;
501 1 50       5 if ( !defined $payload ) {
    50          
502 0         0 $payload = pack "C*", map { rand(256) } 1 .. PING_PAYLOAD_SIZE;
  0         0  
503             }
504             elsif ( length($payload) != PING_PAYLOAD_SIZE ) {
505 0         0 $payload = sprintf "%*.*s",
506             -PING_PAYLOAD_SIZE(), PING_PAYLOAD_SIZE, $payload;
507             }
508 1         3 $self->enqueue( PING, 0, 0, \$payload );
509             }
510              
511             1;