File Coverage

blib/lib/Protocol/HTTP2/Connection.pm
Criterion Covered Total %
statement 133 190 70.0
branch 61 104 58.6
condition 34 81 41.9
subroutine 32 38 84.2
pod 0 28 0.0
total 260 441 58.9


line stmt bran cond sub pod time code
1             package Protocol::HTTP2::Connection;
2 10     10   4575 use strict;
  10         19  
  10         363  
3 10     10   53 use warnings;
  10         13  
  10         307  
4             use Protocol::HTTP2::Constants
5 10         3565 qw(const_name :frame_types :errors :settings :flags :states
6 10     10   1495 :limits :endpoints);
  10         22  
7 10     10   4167 use Protocol::HTTP2::HeaderCompression qw(headers_encode);
  10         85  
  10         673  
8 10     10   4393 use Protocol::HTTP2::Frame;
  10         28  
  10         303  
9 10     10   4082 use Protocol::HTTP2::Stream;
  10         21  
  10         313  
10 10     10   3987 use Protocol::HTTP2::Upgrade;
  10         24  
  10         323  
11 10     10   58 use Protocol::HTTP2::Trace qw(tracer);
  10         15  
  10         21664  
12              
13             # Mixin
14             our @ISA =
15             qw(Protocol::HTTP2::Frame Protocol::HTTP2::Stream Protocol::HTTP2::Upgrade);
16              
17             # Default settings
18             my %default_settings = (
19             &SETTINGS_HEADER_TABLE_SIZE => DEFAULT_HEADER_TABLE_SIZE,
20             &SETTINGS_ENABLE_PUSH => DEFAULT_ENABLE_PUSH,
21             &SETTINGS_MAX_CONCURRENT_STREAMS => DEFAULT_MAX_CONCURRENT_STREAMS,
22             &SETTINGS_INITIAL_WINDOW_SIZE => DEFAULT_INITIAL_WINDOW_SIZE,
23             &SETTINGS_MAX_FRAME_SIZE => DEFAULT_MAX_FRAME_SIZE,
24             &SETTINGS_MAX_HEADER_LIST_SIZE => DEFAULT_MAX_HEADER_LIST_SIZE,
25             );
26              
27             sub new {
28 37     37 0 13310 my ( $class, $type, %opts ) = @_;
29 37 100       5175 my $self = bless {
30             type => $type,
31              
32             streams => {},
33              
34             last_stream => $type == CLIENT ? 1 : 2,
35             last_peer_stream => 0,
36             active_peer_streams => 0,
37              
38             encode_ctx => {
39              
40             # HPACK. Header Table
41             header_table => [],
42              
43             # HPACK. Header Table size
44             ht_size => 0,
45              
46             settings => {%default_settings},
47              
48             },
49              
50             decode_ctx => {
51              
52             # HPACK. Header Table
53             header_table => [],
54              
55             # HPACK. Header Table size
56             ht_size => 0,
57              
58             # HPACK. Emitted headers
59             emitted_headers => [],
60              
61             # last frame
62             frame => {},
63              
64             settings => {%default_settings},
65             },
66              
67             # Current error
68             error => 0,
69              
70             # Output frames queue
71             queue => [],
72              
73             # Connection must be shutdown
74             shutdown => 0,
75              
76             # issued GOAWAY: no new streams on this connection
77             goaway => 0,
78              
79             # get preface
80             preface => 0,
81              
82             # perform upgrade
83             upgrade => 0,
84              
85             # flow control
86             fcw_send => DEFAULT_INITIAL_WINDOW_SIZE,
87             fcw_recv => DEFAULT_INITIAL_WINDOW_SIZE,
88              
89             # stream where expected CONTINUATION frames
90             pending_stream => undef,
91              
92             }, $class;
93              
94 37         4151 for (qw(on_change_state on_new_peer_stream on_error upgrade)) {
95 148 100       8449 $self->{$_} = $opts{$_} if exists $opts{$_};
96             }
97              
98 37 100       3950 if ( exists $opts{settings} ) {
99 15         1935 for ( keys %{ $opts{settings} } ) {
  15         3978  
100 15         5875 $self->{decode_ctx}->{settings}->{$_} = $opts{settings}{$_};
101             }
102             }
103              
104 37         7780 $self;
105             }
106              
107             sub decode_context {
108 292     292 0 97043 shift->{decode_ctx};
109             }
110              
111             sub encode_context {
112 26     26 0 8134 shift->{encode_ctx};
113             }
114              
115             sub pending_stream {
116 108     108 0 36289 shift->{pending_stream};
117             }
118              
119             sub dequeue {
120 202     202 0 34418 my $self = shift;
121 202         32318 shift @{ $self->{queue} };
  202         96395  
122             }
123              
124             sub enqueue_raw {
125 15     15 0 2062 my $self = shift;
126 15         1954 push @{ $self->{queue} }, @_;
  15         5712  
127             }
128              
129             sub enqueue {
130 110     110 0 17847 my $self = shift;
131 110         17942 while ( my ( $type, $flags, $stream_id, $data_ref ) = splice( @_, 0, 4 ) ) {
132 112         17834 push @{ $self->{queue} },
  112         36551  
133             $self->frame_encode( $type, $flags, $stream_id, $data_ref );
134 112         17926 $self->state_machine( 'send', $type, $flags, $stream_id );
135             }
136             }
137              
138             sub enqueue_first {
139 2     2 0 231 my $self = shift;
140 2         2 my $i = 0;
141 2         3 for ( 0 .. $#{ $self->{queue} } ) {
  2         7  
142             last
143 2 100       7 if ( ( $self->frame_header_decode( \$self->{queue}->[$_], 0 ) )[1] !=
144             CONTINUATION );
145 1         2 $i++;
146             }
147 2         10 while ( my ( $type, $flags, $stream_id, $data_ref ) = splice( @_, 0, 4 ) ) {
148 2         2 splice @{ $self->{queue} }, $i++, 0,
  2         6  
149             $self->frame_encode( $type, $flags, $stream_id, $data_ref );
150 2         5 $self->state_machine( 'send', $type, $flags, $stream_id );
151             }
152             }
153              
154             sub finish {
155 7     7 0 986 my $self = shift;
156 7 50       1052 $self->enqueue( GOAWAY, 0, 0,
157             [ $self->{last_peer_stream}, $self->{error} ] )
158             unless $self->shutdown;
159 7         1014 $self->shutdown(1);
160             }
161              
162             sub shutdown {
163 22     22 0 2054 my $self = shift;
164 22 100       2011 $self->{shutdown} = shift if @_;
165 22         8979 $self->{shutdown};
166             }
167              
168             sub goaway {
169 46     46 0 6936 my $self = shift;
170 46 100       7750 $self->{goaway} = shift if @_;
171 46         13976 $self->{goaway};
172             }
173              
174             sub preface {
175 201     201 0 35322 my $self = shift;
176 201 100       35120 $self->{preface} = shift if @_;
177 201         74798 $self->{preface};
178             }
179              
180             sub upgrade {
181 248     248 0 42894 my $self = shift;
182 248 50       43784 $self->{upgrade} = shift if @_;
183 248         87839 $self->{upgrade};
184             }
185              
186             sub state_machine {
187 222     222 0 36603 my ( $self, $act, $type, $flags, $stream_id ) = @_;
188              
189             return
190 222 50 66     75631 if $stream_id == 0
      66        
      33        
      33        
191             || $type == SETTINGS
192             || $type == GOAWAY
193             || $self->upgrade
194             || !$self->preface;
195              
196 104         18497 my $promised_sid = $self->stream_promised_sid($stream_id);
197              
198 104   33     18646 my $prev_state = $self->{streams}->{ $promised_sid || $stream_id }->{state};
199              
200             # REFUSED_STREAM error
201 104 0 33     18525 return if !defined $prev_state && $type == RST_STREAM && $act eq 'send';
      33        
202              
203             # Direction server->client
204 104   66     19276 my $srv2cln = ( $self->{type} == SERVER && $act eq 'send' )
205             || ( $self->{type} == CLIENT && $act eq 'recv' );
206              
207             # Direction client->server
208 104   66     18863 my $cln2srv = ( $self->{type} == SERVER && $act eq 'recv' )
209             || ( $self->{type} == CLIENT && $act eq 'send' );
210              
211             # Do we expect CONTINUATION after this frame?
212 104   100     19456 my $pending = ( $type == HEADERS || $type == PUSH_PROMISE )
213             && !( $flags & END_HEADERS );
214              
215             #tracer->debug(
216             # sprintf "\e[0;31mStream state: frame %s is %s%s on %s stream %i\e[m\n",
217             # const_name( "frame_types", $type ),
218             # $act,
219             # $pending ? "*" : "",
220             # const_name( "states", $prev_state ),
221             # $promised_sid || $stream_id,
222             #);
223              
224             # Wait until all CONTINUATION frames arrive
225 104 100       18902 if ( my $ps = $self->stream_pending_state($stream_id) ) {
    100          
    100          
    50          
    100          
    50          
226 1 50       5 if ( $type != CONTINUATION ) {
    50          
227 0         0 tracer->error(
228             sprintf "invalid frame type %s. Expected CONTINUATION frame\n",
229             const_name( "frame_types", $type )
230             );
231 0         0 $self->error(PROTOCOL_ERROR);
232             }
233             elsif ( $flags & END_HEADERS ) {
234 1 50       3 $self->stream_promised_sid( $stream_id, undef ) if $promised_sid;
235 1   33     7 $self->stream_pending_state( $promised_sid || $stream_id, undef );
236 1   33     6 $self->stream_state( $promised_sid || $stream_id, $ps );
237             }
238             }
239              
240             # State machine
241             # IDLE
242             elsif ( $prev_state == IDLE ) {
243 28 100 66     4423 if ( $type == HEADERS && $cln2srv ) {
    50 33        
    50 33        
    50          
244 27 100       4089 $self->stream_state( $stream_id,
245             ( $flags & END_STREAM ) ? HALF_CLOSED : OPEN, $pending );
246             }
247             elsif ( $type == PUSH_PROMISE && $srv2cln ) {
248 0         0 $self->stream_state( $promised_sid, RESERVED, $pending );
249 0 0       0 $self->stream_promised_sid( $stream_id, undef )
250             if $flags & END_HEADERS;
251             }
252              
253             # first frame in stream is invalid, so state is yet IDLE
254             elsif ( $type == RST_STREAM && $act eq 'send' ) {
255 0         0 tracer->notice('send RST_STREAM on IDLE state. possible bug?');
256 0         0 $self->stream_state( $stream_id, CLOSED );
257             }
258             elsif ( $type != PRIORITY ) {
259 0         0 tracer->error(
260             sprintf "invalid frame type %s for current stream state %s\n",
261             const_name( "frame_types", $type ),
262             const_name( "states", $prev_state )
263             );
264 0         0 $self->error(PROTOCOL_ERROR);
265             }
266             }
267              
268             # OPEN
269             elsif ( $prev_state == OPEN ) {
270 1 50 0     9 if ( ( $flags & END_STREAM )
    50 33        
271             && ( $type == DATA || $type == HEADERS ) )
272             {
273 0         0 $self->stream_state( $stream_id, HALF_CLOSED, $pending );
274             }
275             elsif ( $type == RST_STREAM ) {
276 0         0 $self->stream_state( $stream_id, CLOSED );
277             }
278             }
279              
280             # RESERVED (local/remote)
281             elsif ( $prev_state == RESERVED ) {
282 0 0 0     0 if ( $type == RST_STREAM ) {
    0 0        
    0          
283 0         0 $self->stream_state( $stream_id, CLOSED );
284             }
285             elsif ( $type == HEADERS && $srv2cln ) {
286 0 0       0 $self->stream_state( $stream_id,
287             ( $flags & END_STREAM ) ? CLOSED : HALF_CLOSED, $pending );
288             }
289             elsif ( $type != PRIORITY && $cln2srv ) {
290 0         0 tracer->error("invalid frame $type for state RESERVED");
291 0         0 $self->error(PROTOCOL_ERROR);
292             }
293             }
294              
295             # HALF_CLOSED (local/remote)
296             elsif ( $prev_state == HALF_CLOSED ) {
297 69 100 66     13462 if ( ( $type == RST_STREAM )
    50 66        
      33        
298             || ( ( $flags & END_STREAM ) && $srv2cln ) )
299 94         75923 {
300 22         4088 $self->stream_state( $stream_id, CLOSED, $pending );
301             }
302             elsif ( ( !grep { $type == $_ } ( WINDOW_UPDATE, PRIORITY ) )
303             && $cln2srv )
304             {
305 0         0 tracer->error( sprintf "invalid frame %s for state HALF CLOSED\n",
306             const_name( "frame_types", $type ) );
307 0         0 $self->error(PROTOCOL_ERROR);
308             }
309             }
310              
311             # CLOSED
312             elsif ( $prev_state == CLOSED ) {
313 5 50 33     3090 if ( $type != PRIORITY && ( $type != WINDOW_UPDATE && $cln2srv ) ) {
      33        
314              
315 0         0 tracer->error("stream is closed\n");
316 0         0 $self->error(STREAM_CLOSED);
317             }
318             }
319             else {
320 0         0 tracer->error("oops!\n");
321 0         0 $self->error(INTERNAL_ERROR);
322             }
323             }
324              
325             # TODO: move this to some other module
326             sub send_headers {
327 24     24 0 4098 my ( $self, $stream_id, $headers, $end ) = @_;
328 24         3981 my $max_size = $self->enc_setting(SETTINGS_MAX_FRAME_SIZE);
329              
330 24         3909 my $header_block = headers_encode( $self->encode_context, $headers );
331              
332 24 100       3982 my $flags = $end ? END_STREAM : 0;
333 24 50       3971 $flags |= END_HEADERS if length($header_block) <= $max_size;
334              
335 24         4166 $self->enqueue( HEADERS, $flags, $stream_id,
336             { hblock => \substr( $header_block, 0, $max_size, '' ) } );
337 24         10185 while ( length($header_block) > 0 ) {
338 0 0       0 my $flags = length($header_block) <= $max_size ? 0 : END_HEADERS;
339 0         0 $self->enqueue( CONTINUATION, $flags,
340             $stream_id, \substr( $header_block, 0, $max_size, '' ) );
341             }
342             }
343              
344             sub send_pp_headers {
345 0     0 0 0 my ( $self, $stream_id, $promised_id, $headers ) = @_;
346 0         0 my $max_size = $self->enc_setting(SETTINGS_MAX_FRAME_SIZE);
347              
348 0         0 my $header_block = headers_encode( $self->encode_context, $headers );
349              
350 0 0       0 my $flags = length($header_block) <= $max_size ? END_HEADERS : 0;
351              
352 0         0 $self->enqueue( PUSH_PROMISE, $flags, $stream_id,
353             [ $promised_id, \substr( $header_block, 0, $max_size - 4, '' ) ] );
354              
355 0         0 while ( length($header_block) > 0 ) {
356 0 0       0 my $flags = length($header_block) <= $max_size ? 0 : END_HEADERS;
357 0         0 $self->enqueue( CONTINUATION, $flags,
358             $stream_id, \substr( $header_block, 0, $max_size, '' ) );
359             }
360             }
361              
362             sub send_data {
363 20     20 0 4195 my ( $self, $stream_id, $chunk, $end ) = @_;
364 20         4298 my $data = $self->stream_blocked_data($stream_id);
365 20 100       4238 $data .= defined $chunk ? $chunk : '';
366 20 100       4248 $self->stream_end( $stream_id, $end ) if defined $end;
367 20         4035 $end = $self->stream_end($stream_id);
368              
369 20         3990 while (1) {
370 20         4210 my $l = length($data);
371 20         4449 my $size = $self->enc_setting(SETTINGS_MAX_FRAME_SIZE);
372 20         4149 for ( $l, $self->fcw_send, $self->stream_fcw_send($stream_id) ) {
373 60 100       7646 $size = $_ if $size > $_;
374             }
375              
376             # Flow control
377 20 50 66     4243 if ( $l != 0 && $size == 0 ) {
378 0         0 $self->stream_blocked_data( $stream_id, $data );
379 0         0 last;
380             }
381 20         4082 $self->fcw_send( -$size );
382 20         3990 $self->stream_fcw_send( $stream_id, -$size );
383              
384 20 100 66     3976 $self->enqueue(
385             DATA, $end && $l == $size ? END_STREAM : 0,
386             $stream_id, \substr( $data, 0, $size, '' )
387             );
388 20 50       23841 last if $l == $size;
389             }
390             }
391              
392             sub send_blocked {
393 0     0 0 0 my $self = shift;
394 0         0 for my $stream_id ( keys %{ $self->{streams} } ) {
  0         0  
395 0         0 $self->stream_send_blocked($stream_id);
396             }
397             }
398              
399             sub error {
400 0     0 0 0 my $self = shift;
401 0 0 0     0 if ( @_ && !$self->{shutdown} ) {
402 0         0 $self->{error} = shift;
403 0 0       0 $self->{on_error}->( $self->{error} ) if exists $self->{on_error};
404 0         0 $self->finish;
405             }
406 0         0 $self->{error};
407             }
408              
409             sub setting {
410 0     0 0 0 require Carp;
411 0         0 Carp::confess("setting is deprecated\n");
412             }
413              
414             sub _setting {
415 308     308   49449 my ( $ctx, $self, $setting ) = @_;
416 308         49394 my $s = $self->{$ctx}->{settings};
417 308 50       49571 return undef unless exists $s->{$setting};
418 308 100       48998 $s->{$setting} = pop if @_ > 3;
419 308         148740 $s->{$setting};
420             }
421              
422             sub enc_setting {
423 104     104 0 16433 _setting( 'encode_ctx', @_ );
424             }
425              
426             sub dec_setting {
427 204     204 0 33165 _setting( 'decode_ctx', @_ );
428             }
429              
430             sub accept_settings {
431 25     25 0 3842 my $self = shift;
432 25         3865 $self->enqueue( SETTINGS, ACK, 0, {} );
433             }
434              
435             # Flow control windown of connection
436             sub _fcw {
437 62     62   11839 my $dir = shift;
438 62         11775 my $self = shift;
439              
440 62 100       11662 if (@_) {
441 42         8160 $self->{$dir} += shift;
442 42         8192 tracer->debug( "$dir now is " . $self->{$dir} . "\n" );
443             }
444 62         35884 $self->{$dir};
445             }
446              
447             sub fcw_send {
448 40     40 0 7644 _fcw( 'fcw_send', @_ );
449             }
450              
451             sub fcw_recv {
452 22     22 0 4020 _fcw( 'fcw_recv', @_ );
453             }
454              
455             sub fcw_update {
456 0     0 0 0 my $self = shift;
457              
458             # TODO: check size of data in memory
459 0         0 tracer->debug("update fcw recv of connection\n");
460 0         0 $self->fcw_recv(DEFAULT_INITIAL_WINDOW_SIZE);
461 0         0 $self->enqueue( WINDOW_UPDATE, 0, 0, DEFAULT_INITIAL_WINDOW_SIZE );
462             }
463              
464             sub fcw_initial_change {
465 0     0 0 0 my ( $self, $size ) = @_;
466 0         0 my $prev_size = $self->enc_setting(SETTINGS_INITIAL_WINDOW_SIZE);
467 0         0 my $diff = $size - $prev_size;
468 0         0 tracer->debug(
469             "Change flow control window on not closed streams with diff $diff\n");
470 0         0 for my $stream_id ( keys %{ $self->{streams} } ) {
  0         0  
471 0 0       0 next if $self->stream_state($stream_id) == CLOSED;
472 0         0 $self->stream_fcw_send( $stream_id, $diff );
473             }
474             }
475              
476             sub ack_ping {
477 1     1 0 1 my ( $self, $payload_ref ) = @_;
478 1         11 $self->enqueue_first( PING, ACK, 0, $payload_ref );
479             }
480              
481             1;