File Coverage

blib/lib/Protocol/HTTP2/Connection.pm
Criterion Covered Total %
statement 144 190 75.7
branch 64 104 61.5
condition 38 81 46.9
subroutine 34 38 89.4
pod 0 28 0.0
total 280 441 63.4


line stmt bran cond sub pod time code
1             package Protocol::HTTP2::Connection;
2 11     11   6588 use strict;
  11         36  
  11         317  
3 11     11   56 use warnings;
  11         20  
  11         353  
4             use Protocol::HTTP2::Constants
5 11         5325 qw(const_name :frame_types :errors :settings :flags :states
6 11     11   3253 :limits :endpoints);
  11         37  
7 11     11   6482 use Protocol::HTTP2::HeaderCompression qw(headers_encode);
  11         30  
  11         672  
8 11     11   6303 use Protocol::HTTP2::Frame;
  11         33  
  11         332  
9 11     11   6549 use Protocol::HTTP2::Stream;
  11         30  
  11         412  
10 11     11   6061 use Protocol::HTTP2::Upgrade;
  11         33  
  11         445  
11 11     11   71 use Protocol::HTTP2::Trace qw(tracer);
  11         22  
  11         30201  
12              
13             # Mixin
14             our @ISA =
15             qw(Protocol::HTTP2::Frame Protocol::HTTP2::Stream Protocol::HTTP2::Upgrade);
16              
17             # Default settings
18             my %default_settings = (
19             &SETTINGS_HEADER_TABLE_SIZE => DEFAULT_HEADER_TABLE_SIZE,
20             &SETTINGS_ENABLE_PUSH => DEFAULT_ENABLE_PUSH,
21             &SETTINGS_MAX_CONCURRENT_STREAMS => DEFAULT_MAX_CONCURRENT_STREAMS,
22             &SETTINGS_INITIAL_WINDOW_SIZE => DEFAULT_INITIAL_WINDOW_SIZE,
23             &SETTINGS_MAX_FRAME_SIZE => DEFAULT_MAX_FRAME_SIZE,
24             &SETTINGS_MAX_HEADER_LIST_SIZE => DEFAULT_MAX_HEADER_LIST_SIZE,
25             );
26              
27             sub new {
28 39     39 0 15367 my ( $class, $type, %opts ) = @_;
29 39 100       7207 my $self = bless {
30             type => $type,
31              
32             streams => {},
33              
34             last_stream => $type == CLIENT ? 1 : 2,
35             last_peer_stream => 0,
36             active_peer_streams => 0,
37              
38             encode_ctx => {
39              
40             # HPACK. Header Table
41             header_table => [],
42              
43             # HPACK. Header Table size
44             ht_size => 0,
45              
46             settings => {%default_settings},
47              
48             },
49              
50             decode_ctx => {
51              
52             # HPACK. Header Table
53             header_table => [],
54              
55             # HPACK. Header Table size
56             ht_size => 0,
57              
58             # HPACK. Emitted headers
59             emitted_headers => [],
60              
61             # last frame
62             frame => {},
63              
64             settings => {%default_settings},
65             },
66              
67             # Current error
68             error => 0,
69              
70             # Output frames queue
71             queue => [],
72              
73             # Connection must be shutdown
74             shutdown => 0,
75              
76             # issued GOAWAY: no new streams on this connection
77             goaway => 0,
78              
79             # get preface
80             preface => 0,
81              
82             # perform upgrade
83             upgrade => 0,
84              
85             # flow control
86             fcw_send => DEFAULT_INITIAL_WINDOW_SIZE,
87             fcw_recv => DEFAULT_INITIAL_WINDOW_SIZE,
88              
89             # stream where expected CONTINUATION frames
90             pending_stream => undef,
91              
92             }, $class;
93              
94 39         5617 for (qw(on_change_state on_new_peer_stream on_error upgrade)) {
95 156 100       11537 $self->{$_} = $opts{$_} if exists $opts{$_};
96             }
97              
98 39 100       5464 if ( exists $opts{settings} ) {
99 16         2636 for ( keys %{ $opts{settings} } ) {
  16         5454  
100 16         7974 $self->{decode_ctx}->{settings}->{$_} = $opts{settings}{$_};
101             }
102             }
103              
104 39         10876 $self;
105             }
106              
107             sub decode_context {
108 352     352 0 131367 shift->{decode_ctx};
109             }
110              
111             sub encode_context {
112 32     32 0 10979 shift->{encode_ctx};
113             }
114              
115             sub pending_stream {
116 129     129 0 49009 shift->{pending_stream};
117             }
118              
119             sub dequeue {
120 234     234 0 43967 my $self = shift;
121 234         43529 shift @{ $self->{queue} };
  234         130287  
122             }
123              
124             sub enqueue_raw {
125 16     16 0 2726 my $self = shift;
126 16         2701 push @{ $self->{queue} }, @_;
  16         8063  
127             }
128              
129             sub enqueue {
130 131     131 0 24472 my $self = shift;
131 131         24792 while ( my ( $type, $flags, $stream_id, $data_ref ) = splice( @_, 0, 4 ) ) {
132 133         24370 push @{ $self->{queue} },
  133         48996  
133             $self->frame_encode( $type, $flags, $stream_id, $data_ref );
134 133         24944 $self->state_machine( 'send', $type, $flags, $stream_id );
135             }
136             }
137              
138             sub enqueue_first {
139 2     2 0 278 my $self = shift;
140 2         3 my $i = 0;
141 2         4 for ( 0 .. $#{ $self->{queue} } ) {
  2         7  
142             last
143 2 100       10 if ( ( $self->frame_header_decode( \$self->{queue}->[$_], 0 ) )[1] !=
144             CONTINUATION );
145 1         2 $i++;
146             }
147 2         9 while ( my ( $type, $flags, $stream_id, $data_ref ) = splice( @_, 0, 4 ) ) {
148 2         4 splice @{ $self->{queue} }, $i++, 0,
  2         9  
149             $self->frame_encode( $type, $flags, $stream_id, $data_ref );
150 2         5 $self->state_machine( 'send', $type, $flags, $stream_id );
151             }
152             }
153              
154             sub finish {
155 8     8 0 1383 my $self = shift;
156             $self->enqueue( GOAWAY, 0, 0,
157 8 50       1396 [ $self->{last_peer_stream}, $self->{error} ] )
158             unless $self->shutdown;
159 8         1396 $self->shutdown(1);
160             }
161              
162             sub shutdown {
163 24     24 0 2772 my $self = shift;
164 24 100       2788 $self->{shutdown} = shift if @_;
165 24         12284 $self->{shutdown};
166             }
167              
168             sub goaway {
169 54     54 0 8189 my $self = shift;
170 54 100       8158 $self->{goaway} = shift if @_;
171 54         16369 $self->{goaway};
172             }
173              
174             sub preface {
175 249     249 0 46520 my $self = shift;
176 249 100       46441 $self->{preface} = shift if @_;
177 249         98273 $self->{preface};
178             }
179              
180             sub upgrade {
181 304     304 0 57104 my $self = shift;
182 304 50       57453 $self->{upgrade} = shift if @_;
183 304         114946 $self->{upgrade};
184             }
185              
186             sub state_machine {
187 264     264 0 49191 my ( $self, $act, $type, $flags, $stream_id ) = @_;
188              
189             return
190 264 50 66     104387 if $stream_id == 0
      66        
      33        
      33        
191             || $type == SETTINGS
192             || $type == GOAWAY
193             || $self->upgrade
194             || !$self->preface;
195              
196 134         25093 my $promised_sid = $self->stream_promised_sid($stream_id);
197              
198 134   33     25494 my $prev_state = $self->{streams}->{ $promised_sid || $stream_id }->{state};
199              
200             # REFUSED_STREAM error
201 134 0 33     24868 return if !defined $prev_state && $type == RST_STREAM && $act eq 'send';
      33        
202              
203             # Direction server->client
204             my $srv2cln = ( $self->{type} == SERVER && $act eq 'send' )
205 134   66     25479 || ( $self->{type} == CLIENT && $act eq 'recv' );
206              
207             # Direction client->server
208             my $cln2srv = ( $self->{type} == SERVER && $act eq 'recv' )
209 134   66     25503 || ( $self->{type} == CLIENT && $act eq 'send' );
210              
211             # Do we expect CONTINUATION after this frame?
212 134   100     25209 my $pending = ( $type == HEADERS || $type == PUSH_PROMISE )
213             && !( $flags & END_HEADERS );
214              
215             #tracer->debug(
216             # sprintf "\e[0;31mStream state: frame %s is %s%s on %s stream %i\e[m\n",
217             # const_name( "frame_types", $type ),
218             # $act,
219             # $pending ? "*" : "",
220             # const_name( "states", $prev_state ),
221             # $promised_sid || $stream_id,
222             #);
223              
224             # Wait until all CONTINUATION frames arrive
225 134 100       24995 if ( my $ps = $self->stream_pending_state($stream_id) ) {
    100          
    100          
    50          
    100          
    50          
226 1 50       7 if ( $type != CONTINUATION ) {
    50          
227 0         0 tracer->error(
228             sprintf "invalid frame type %s. Expected CONTINUATION frame\n",
229             const_name( "frame_types", $type )
230             );
231 0         0 $self->error(PROTOCOL_ERROR);
232             }
233             elsif ( $flags & END_HEADERS ) {
234 1 50       3 $self->stream_promised_sid( $stream_id, undef ) if $promised_sid;
235 1   33     8 $self->stream_pending_state( $promised_sid || $stream_id, undef );
236 1   33     7 $self->stream_state( $promised_sid || $stream_id, $ps );
237             }
238             }
239              
240             # State machine
241             # IDLE
242             elsif ( $prev_state == IDLE ) {
243 34 100 66     5565 if ( $type == HEADERS && $cln2srv ) {
    50 33        
    50 33        
    50          
244 33 100       5702 $self->stream_state( $stream_id,
245             ( $flags & END_STREAM ) ? HALF_CLOSED : OPEN, $pending );
246             }
247             elsif ( $type == PUSH_PROMISE && $srv2cln ) {
248 0         0 $self->stream_state( $promised_sid, RESERVED, $pending );
249 0 0       0 $self->stream_promised_sid( $stream_id, undef )
250             if $flags & END_HEADERS;
251             }
252              
253             # first frame in stream is invalid, so state is yet IDLE
254             elsif ( $type == RST_STREAM && $act eq 'send' ) {
255 0         0 tracer->notice('send RST_STREAM on IDLE state. possible bug?');
256 0         0 $self->stream_state( $stream_id, CLOSED );
257             }
258             elsif ( $type != PRIORITY ) {
259 0         0 tracer->error(
260             sprintf "invalid frame type %s for current stream state %s\n",
261             const_name( "frame_types", $type ),
262             const_name( "states", $prev_state )
263             );
264 0         0 $self->error(PROTOCOL_ERROR);
265             }
266             }
267              
268             # OPEN
269             elsif ( $prev_state == OPEN ) {
270 13 100 33     83 if ( ( $flags & END_STREAM )
    50 66        
271             && ( $type == DATA || $type == HEADERS ) )
272             {
273 4         13 $self->stream_state( $stream_id, HALF_CLOSED, $pending );
274             }
275             elsif ( $type == RST_STREAM ) {
276 0         0 $self->stream_state( $stream_id, CLOSED );
277             }
278             }
279              
280             # RESERVED (local/remote)
281             elsif ( $prev_state == RESERVED ) {
282 0 0 0     0 if ( $type == RST_STREAM ) {
    0 0        
    0          
283 0         0 $self->stream_state( $stream_id, CLOSED );
284             }
285             elsif ( $type == HEADERS && $srv2cln ) {
286 0 0       0 $self->stream_state( $stream_id,
287             ( $flags & END_STREAM ) ? CLOSED : HALF_CLOSED, $pending );
288             }
289             elsif ( $type != PRIORITY && $cln2srv ) {
290 0         0 tracer->error("invalid frame $type for state RESERVED");
291 0         0 $self->error(PROTOCOL_ERROR);
292             }
293             }
294              
295             # HALF_CLOSED (local/remote)
296             elsif ( $prev_state == HALF_CLOSED ) {
297 81 100 66     18128 if ( ( $type == RST_STREAM )
    50 66        
      33        
298             || ( ( $flags & END_STREAM ) && $srv2cln ) )
299             {
300 28         5577 $self->stream_state( $stream_id, CLOSED, $pending );
301             }
302 106         87454 elsif ( ( !grep { $type == $_ } ( WINDOW_UPDATE, PRIORITY ) )
303             && $cln2srv )
304             {
305 0         0 tracer->error( sprintf "invalid frame %s for state HALF CLOSED\n",
306             const_name( "frame_types", $type ) );
307 0         0 $self->error(PROTOCOL_ERROR);
308             }
309             }
310              
311             # CLOSED
312             elsif ( $prev_state == CLOSED ) {
313 5 50 33     4125 if ( $type != PRIORITY && ( $type != WINDOW_UPDATE && $cln2srv ) ) {
      33        
314              
315 0         0 tracer->error("stream is closed\n");
316 0         0 $self->error(STREAM_CLOSED);
317             }
318             }
319             else {
320 0         0 tracer->error("oops!\n");
321 0         0 $self->error(INTERNAL_ERROR);
322             }
323             }
324              
325             # TODO: move this to some other module
326             sub send_headers {
327 30     30 0 5441 my ( $self, $stream_id, $headers, $end ) = @_;
328 30         5422 my $max_size = $self->enc_setting(SETTINGS_MAX_FRAME_SIZE);
329              
330 30         5488 my $header_block = headers_encode( $self->encode_context, $headers );
331              
332 30 100       5533 my $flags = $end ? END_STREAM : 0;
333 30 50       5602 $flags |= END_HEADERS if length($header_block) <= $max_size;
334              
335 30         5582 $self->enqueue( HEADERS, $flags, $stream_id,
336             { hblock => \substr( $header_block, 0, $max_size, '' ) } );
337 30         11000 while ( length($header_block) > 0 ) {
338 0 0       0 my $flags = length($header_block) <= $max_size ? 0 : END_HEADERS;
339 0         0 $self->enqueue( CONTINUATION, $flags,
340             $stream_id, \substr( $header_block, 0, $max_size, '' ) );
341             }
342             }
343              
344             sub send_pp_headers {
345 0     0 0 0 my ( $self, $stream_id, $promised_id, $headers ) = @_;
346 0         0 my $max_size = $self->enc_setting(SETTINGS_MAX_FRAME_SIZE);
347              
348 0         0 my $header_block = headers_encode( $self->encode_context, $headers );
349              
350 0 0       0 my $flags = length($header_block) <= $max_size ? END_HEADERS : 0;
351              
352 0         0 $self->enqueue( PUSH_PROMISE, $flags, $stream_id,
353             [ $promised_id, \substr( $header_block, 0, $max_size - 4, '' ) ] );
354              
355 0         0 while ( length($header_block) > 0 ) {
356 0 0       0 my $flags = length($header_block) <= $max_size ? 0 : END_HEADERS;
357 0         0 $self->enqueue( CONTINUATION, $flags,
358             $stream_id, \substr( $header_block, 0, $max_size, '' ) );
359             }
360             }
361              
362             sub send_data {
363 26     26 0 5478 my ( $self, $stream_id, $chunk, $end ) = @_;
364 26         5490 my $data = $self->stream_blocked_data($stream_id);
365 26 100       5726 $data .= defined $chunk ? $chunk : '';
366 26 100       5515 $self->stream_end( $stream_id, $end ) if defined $end;
367 26         5493 $end = $self->stream_end($stream_id);
368              
369 26         5506 while (1) {
370 30         5424 my $l = length($data);
371 30         5436 my $size = $self->enc_setting(SETTINGS_MAX_FRAME_SIZE);
372 30         5502 for ( $l, $self->fcw_send, $self->stream_fcw_send($stream_id) ) {
373 90 100       11116 $size = $_ if $size > $_;
374             }
375              
376             # Flow control
377 30 100 100     5609 if ( $l != 0 && $size == 0 ) {
378 1         5 $self->stream_blocked_data( $stream_id, $data );
379 1         7 last;
380             }
381 29         5441 $self->fcw_send( -$size );
382 29         5511 $self->stream_fcw_send( $stream_id, -$size );
383              
384 29 100 100     5700 $self->enqueue(
385             DATA, $end && $l == $size ? END_STREAM : 0,
386             $stream_id, \substr( $data, 0, $size, '' )
387             );
388 29 100       27186 last if $l == $size;
389             }
390             }
391              
392             sub send_blocked {
393 1     1 0 3 my $self = shift;
394 1         2 for my $stream_id ( keys %{ $self->{streams} } ) {
  1         4  
395 3         16 $self->stream_send_blocked($stream_id);
396             }
397             }
398              
399             sub error {
400 0     0 0 0 my $self = shift;
401 0 0 0     0 if ( @_ && !$self->{shutdown} ) {
402 0         0 $self->{error} = shift;
403 0 0       0 $self->{on_error}->( $self->{error} ) if exists $self->{on_error};
404 0         0 $self->finish;
405             }
406 0         0 $self->{error};
407             }
408              
409             sub setting {
410 0     0 0 0 require Carp;
411 0         0 Carp::confess("setting is deprecated\n");
412             }
413              
414             sub _setting {
415 380     380   65703 my ( $ctx, $self, $setting ) = @_;
416 380         65582 my $s = $self->{$ctx}->{settings};
417 380 50       65905 return undef unless exists $s->{$setting};
418 380 100       65918 $s->{$setting} = pop if @_ > 3;
419 380         199214 $s->{$setting};
420             }
421              
422             sub enc_setting {
423 128     128 0 21912 _setting( 'encode_ctx', @_ );
424             }
425              
426             sub dec_setting {
427 252     252 0 43934 _setting( 'decode_ctx', @_ );
428             }
429              
430             sub accept_settings {
431 27     27 0 5404 my $self = shift;
432 27         5460 $self->enqueue( SETTINGS, ACK, 0, {} );
433             }
434              
435             # Flow control windown of connection
436             sub _fcw {
437 92     92   16428 my $dir = shift;
438 92         16345 my $self = shift;
439              
440 92 100       16494 if (@_) {
441 62         10868 $self->{$dir} += shift;
442 62         11164 tracer->debug( "$dir now is " . $self->{$dir} . "\n" );
443             }
444 92         49277 $self->{$dir};
445             }
446              
447             sub fcw_send {
448 60     60 0 11053 _fcw( 'fcw_send', @_ );
449             }
450              
451             sub fcw_recv {
452 32     32 0 5491 _fcw( 'fcw_recv', @_ );
453             }
454              
455             sub fcw_update {
456 1     1 0 2 my $self = shift;
457              
458             # TODO: check size of data in memory
459 1         4 tracer->debug("update fcw recv of connection\n");
460 1         3 $self->fcw_recv(DEFAULT_INITIAL_WINDOW_SIZE);
461 1         4 $self->enqueue( WINDOW_UPDATE, 0, 0, DEFAULT_INITIAL_WINDOW_SIZE );
462             }
463              
464             sub fcw_initial_change {
465 0     0 0 0 my ( $self, $size ) = @_;
466 0         0 my $prev_size = $self->enc_setting(SETTINGS_INITIAL_WINDOW_SIZE);
467 0         0 my $diff = $size - $prev_size;
468 0         0 tracer->debug(
469             "Change flow control window on not closed streams with diff $diff\n");
470 0         0 for my $stream_id ( keys %{ $self->{streams} } ) {
  0         0  
471 0 0       0 next if $self->stream_state($stream_id) == CLOSED;
472 0         0 $self->stream_fcw_send( $stream_id, $diff );
473             }
474             }
475              
476             sub ack_ping {
477 1     1 0 2 my ( $self, $payload_ref ) = @_;
478 1         5 $self->enqueue_first( PING, ACK, 0, $payload_ref );
479             }
480              
481             1;