File Coverage

blib/lib/Protocol/HTTP2/Stream.pm
Criterion Covered Total %
statement 188 221 85.0
branch 89 128 69.5
condition 17 27 62.9
subroutine 28 30 93.3
pod 0 24 0.0
total 322 430 74.8


line stmt bran cond sub pod time code
1             package Protocol::HTTP2::Stream;
2 11     11   54 use strict;
  11         19  
  11         277  
3 11     11   51 use warnings;
  11         20  
  11         328  
4 11         4463 use Protocol::HTTP2::Constants qw(:states :endpoints :settings :frame_types
5 11     11   53 :limits :errors);
  11         19  
6 11     11   62 use Protocol::HTTP2::HeaderCompression qw( headers_decode );
  11         19  
  11         515  
7 11     11   65 use Protocol::HTTP2::Trace qw(tracer);
  11         20  
  11         30998  
8              
9             # Streams related part of Protocol::HTTP2::Conntection
10              
11             sub new_stream {
12 19     19 0 2661 my $self = shift;
13 19 50       2788 return undef if $self->goaway;
14              
15             $self->{last_stream} += 2
16 19 50       2781 if exists $self->{streams}->{ $self->{type} == CLIENT ? 1 : 2 };
    100          
17             $self->{streams}->{ $self->{last_stream} } = {
18 19         2816 'state' => IDLE,
19             'weight' => DEFAULT_WEIGHT,
20             'stream_dep' => 0,
21             'fcw_recv' => $self->dec_setting(SETTINGS_INITIAL_WINDOW_SIZE),
22             'fcw_send' => $self->enc_setting(SETTINGS_INITIAL_WINDOW_SIZE),
23             };
24 19         5421 return $self->{last_stream};
25             }
26              
27             sub new_peer_stream {
28 21     21 0 2741 my $self = shift;
29 21         2719 my $stream_id = shift;
30 21 50 33     2927 if ( $stream_id < $self->{last_peer_stream}
    50          
31             || ( $stream_id % 2 ) == ( $self->{type} == CLIENT ) ? 1 : 0
32             || $self->goaway )
33             {
34 0         0 tracer->error("Peer send invalid stream id: $stream_id\n");
35 0         0 $self->error(PROTOCOL_ERROR);
36 0         0 return undef;
37             }
38 21         2760 $self->{last_peer_stream} = $stream_id;
39 21 50       2771 if ( $self->dec_setting(SETTINGS_MAX_CONCURRENT_STREAMS) <=
40             $self->{active_peer_streams} )
41             {
42 0         0 tracer->warning("SETTINGS_MAX_CONCURRENT_STREAMS exceeded\n");
43 0         0 $self->stream_error( $stream_id, REFUSED_STREAM );
44 0         0 return undef;
45             }
46 21         2706 $self->{active_peer_streams}++;
47 21         2742 tracer->debug("Active streams: $self->{active_peer_streams}");
48 21         2739 $self->{streams}->{$stream_id} = {
49             'state' => IDLE,
50             'weight' => DEFAULT_WEIGHT,
51             'stream_dep' => 0,
52             'fcw_recv' => $self->dec_setting(SETTINGS_INITIAL_WINDOW_SIZE),
53             'fcw_send' => $self->enc_setting(SETTINGS_INITIAL_WINDOW_SIZE),
54             };
55             $self->{on_new_peer_stream}->($stream_id)
56 21 100       2812 if exists $self->{on_new_peer_stream};
57              
58 21         5477 return $self->{last_peer_stream};
59             }
60              
61             sub stream {
62 76     76 0 12285 my ( $self, $stream_id ) = @_;
63 76 100       15236 return undef unless exists $self->{streams}->{$stream_id};
64              
65 62         19429 $self->{streams}->{$stream_id};
66             }
67              
68             # stream_state ( $self, $stream_id, $new_state?, $pending? )
69              
70             sub stream_state {
71 67     67 0 11118 my $self = shift;
72 67         10920 my $stream_id = shift;
73 67 50       11144 return undef unless exists $self->{streams}->{$stream_id};
74 67         11065 my $s = $self->{streams}->{$stream_id};
75              
76 67 50       11019 if (@_) {
77 67         11102 my ( $new_state, $pending ) = @_;
78              
79 67 100       10957 if ($pending) {
80 1         4 $self->stream_pending_state( $stream_id, $new_state );
81             }
82             else {
83             $self->{on_change_state}->( $stream_id, $s->{state}, $new_state )
84 66 50       11001 if exists $self->{on_change_state};
85              
86 66         11157 $s->{state} = $new_state;
87              
88             # Exec callbacks for new state
89 66 100 100     11221 if ( exists $s->{cb} && exists $s->{cb}->{ $s->{state} } ) {
90 28         5477 for my $cb ( @{ $s->{cb}->{ $s->{state} } } ) {
  28         11025  
91 28         5573 $cb->();
92             }
93             }
94              
95             # Cleanup
96 66 100       28533 if ( $new_state == CLOSED ) {
97             $self->{active_peer_streams}--
98             if $self->{active_peer_streams}
99 28 100 66     5528 && ( ( $stream_id % 2 ) ^ ( $self->{type} == CLIENT ) );
100 28         5581 tracer->info(
101             "Active streams: $self->{active_peer_streams} $stream_id");
102 28         5592 for my $key ( keys %$s ) {
103 246 100       46640 next if grep { $key eq $_ } (
  1230         500774  
104             qw(state weight stream_dep
105             fcw_recv fcw_send )
106             );
107 106         30209 delete $s->{$key};
108             }
109             }
110             }
111             }
112              
113 67         72053 $s->{state};
114             }
115              
116             sub stream_pending_state {
117 136     136 0 24607 my $self = shift;
118 136         24667 my $stream_id = shift;
119 136 50       24774 return undef unless exists $self->{streams}->{$stream_id};
120 136         24931 my $s = $self->{streams}->{$stream_id};
121 136 100       24813 if (@_) {
122 2         3 $s->{pending_state} = shift;
123             $self->{pending_stream} =
124 2 100       8 defined $s->{pending_state} ? $stream_id : undef;
125             }
126 136         50177 $s->{pending_state};
127             }
128              
129             sub stream_promised_sid {
130 134     134 0 24709 my $self = shift;
131 134         24658 my $stream_id = shift;
132 134 50       24837 return undef unless exists $self->{streams}->{$stream_id};
133 134         24641 my $s = $self->{streams}->{$stream_id};
134 134 50       24720 $s->{promised_sid} = shift if @_;
135 134         49282 $s->{promised_sid};
136             }
137              
138             sub stream_cb {
139 29     29 0 5492 my ( $self, $stream_id, $state, $cb ) = @_;
140              
141 29 50       5463 return undef unless exists $self->{streams}->{$stream_id};
142              
143 29         5463 push @{ $self->{streams}->{$stream_id}->{cb}->{$state} }, $cb;
  29         20639  
144             }
145              
146             sub stream_frame_cb {
147 10     10 0 2747 my ( $self, $stream_id, $frame, $cb ) = @_;
148              
149 10 50       2729 return undef unless exists $self->{streams}->{$stream_id};
150              
151 10         2685 push @{ $self->{streams}->{$stream_id}->{frame_cb}->{$frame} }, $cb;
  10         8101  
152             }
153              
154             sub stream_data {
155 48     48 0 8157 my $self = shift;
156 48         8257 my $stream_id = shift;
157 48 50       8312 return undef unless exists $self->{streams}->{$stream_id};
158 48         8149 my $s = $self->{streams}->{$stream_id};
159              
160 48 100       8288 if (@_) {
161              
162             # Exec callbacks for data
163 23 100 66     4225 if ( exists $s->{frame_cb} && exists $s->{frame_cb}->{&DATA} ) {
164 5         1370 for my $cb ( @{ $s->{frame_cb}->{&DATA} } ) {
  5         2833  
165 5         1384 $cb->( $_[0] );
166             }
167             }
168             else {
169 18         8198 $s->{data} .= shift;
170             }
171             }
172              
173 48         16686 $s->{data};
174             }
175              
176             # Header Block -- The entire set of encoded header field representations
177             sub stream_header_block {
178 30     30 0 5486 my $self = shift;
179 30         5491 my $stream_id = shift;
180 30 50       5560 return undef unless exists $self->{streams}->{$stream_id};
181 30         5528 my $s = $self->{streams}->{$stream_id};
182              
183 30 50       5691 $s->{header_block} .= shift if @_;
184              
185 30         10959 $s->{header_block};
186             }
187              
188             sub stream_headers {
189 28     28 0 5441 my $self = shift;
190 28         5500 my $stream_id = shift;
191 28 50       5618 return undef unless exists $self->{streams}->{$stream_id};
192 28 50       5415 $self->{streams}->{$stream_id}->{headers} = shift if @_;
193 28         11073 $self->{streams}->{$stream_id}->{headers};
194             }
195              
196             sub stream_pp_headers {
197 0     0 0 0 my $self = shift;
198 0         0 my $stream_id = shift;
199 0 0       0 return undef unless exists $self->{streams}->{$stream_id};
200 0         0 $self->{streams}->{$stream_id}->{pp_headers};
201             }
202              
203             # Explicit content-length in headers
204             sub stream_length {
205 6     6 0 17 my $self = shift;
206 6         13 my $stream_id = shift;
207 6 50       28 return undef unless exists $self->{streams}->{$stream_id};
208 6 100       32 $self->{streams}->{$stream_id}->{length} = shift if @_;
209 6         26 $self->{streams}->{$stream_id}->{length};
210             }
211              
212             sub stream_headers_done {
213 30     30 0 5500 my $self = shift;
214 30         5486 my $stream_id = shift;
215 30 50       5571 return undef unless exists $self->{streams}->{$stream_id};
216 30         5514 my $s = $self->{streams}->{$stream_id};
217              
218             my $res =
219             headers_decode( $self, \$s->{header_block}, 0,
220 30         5618 length $s->{header_block}, $stream_id );
221              
222 30         5546 tracer->debug("Headers done for stream $stream_id\n");
223              
224 30 50       5616 return undef unless defined $res;
225              
226             # Clear header_block
227 30         5477 $s->{header_block} = '';
228              
229 30         5552 my $eh = $self->decode_context->{emitted_headers};
230 30   66     5664 my $is_response = $self->{type} == CLIENT && !$s->{promised_sid};
231              
232             return undef
233 30 50       5584 unless $self->validate_headers( $eh, $stream_id, $is_response );
234              
235 30 50       5539 if ( $s->{promised_sid} ) {
236 0         0 $self->{streams}->{ $s->{promised_sid} }->{pp_headers} = $eh;
237             }
238             else {
239 30         11018 $s->{headers} = $eh;
240             }
241              
242             # Exec callbacks for headers
243 30 100 66     5549 if ( exists $s->{frame_cb} && exists $s->{frame_cb}->{&HEADERS} ) {
244 5         1364 for my $cb ( @{ $s->{frame_cb}->{&HEADERS} } ) {
  5         2789  
245 5         1414 $cb->($eh);
246             }
247             }
248              
249             # Clear emitted headers
250 30         5659 $self->decode_context->{emitted_headers} = [];
251              
252 30         10959 return 1;
253             }
254              
255             sub validate_headers {
256 30     30 0 5505 my ( $self, $headers, $stream_id, $is_response ) = @_;
257 30         5475 my $pseudo_flag = 1;
258 30         5504 my %pseudo_hash = ();
259 30 100       5542 my @h = $is_response ? (qw(:status)) : (
260             qw(:method :scheme :authority
261             :path)
262             );
263 30         5711 for my $i ( 0 .. @$headers / 2 - 1 ) {
264 128         25262 my ( $h, $v ) = ( $headers->[ $i * 2 ], $headers->[ $i * 2 + 1 ] );
265 128 100       24957 if ( $h =~ /^\:/ ) {
266 75 50       13876 if ( !$pseudo_flag ) {
    50          
    50          
267 0         0 tracer->warning(
268             "pseudo-header <$h> appears after a regular header");
269 0         0 $self->stream_error( $stream_id, PROTOCOL_ERROR );
270 0         0 return undef;
271             }
272 255         92943 elsif ( !grep { $_ eq $h } @h ) {
273 0         0 tracer->warning("invalid pseudo-header <$h>");
274 0         0 $self->stream_error( $stream_id, PROTOCOL_ERROR );
275 0         0 return undef;
276             }
277             elsif ( exists $pseudo_hash{$h} ) {
278 0         0 tracer->warning("repeated pseudo-header <$h>");
279 0         0 $self->stream_error( $stream_id, PROTOCOL_ERROR );
280 0         0 return undef;
281             }
282              
283 75         13719 $pseudo_hash{$h} = $v;
284 75         27289 next;
285             }
286              
287 53 100       11071 $pseudo_flag = 0 if $pseudo_flag;
288              
289 53 50 33     16664 if ( $h eq 'connection' ) {
    50          
    100          
290 0         0 tracer->warning("connection header are not valid in http/2");
291 0         0 $self->stream_error( $stream_id, PROTOCOL_ERROR );
292 0         0 return undef;
293             }
294             elsif ( $h eq 'te' && $v ne 'trailers' ) {
295 0         0 tracer->warning("TE header can contain only value 'trailers'");
296 0         0 $self->stream_error( $stream_id, PROTOCOL_ERROR );
297 0         0 return undef;
298             }
299             elsif ( $h eq 'content-length' ) {
300 2         26 $self->stream_length( $stream_id, $v );
301             }
302             }
303              
304 30         5464 for my $h (@h) {
305 75 50       27329 next if exists $pseudo_hash{$h};
306              
307 0         0 tracer->warning("missed mandatory pseudo-header $h");
308 0         0 $self->stream_error( $stream_id, PROTOCOL_ERROR );
309 0         0 return undef;
310             }
311              
312 30         11044 1;
313             }
314              
315             # RST_STREAM for stream errors
316             sub stream_error {
317 5     5 0 1379 my ( $self, $stream_id, $error ) = @_;
318 5         1369 $self->enqueue( RST_STREAM, 0, $stream_id, $error );
319             }
320              
321             # Flow control windown of stream
322             sub _stream_fcw {
323 91     91   16443 my $dir = shift;
324 91         16401 my $self = shift;
325 91         16417 my $stream_id = shift;
326 91 50       16512 return undef unless exists $self->{streams}->{$stream_id};
327 91         16607 my $s = $self->{streams}->{$stream_id};
328              
329 91 100       16494 if (@_) {
330 60         10903 $s->{$dir} += shift;
331 60         11078 tracer->debug( "Stream $stream_id $dir now is " . $s->{$dir} . "\n" );
332             }
333 91         49212 $s->{$dir};
334             }
335              
336             sub stream_fcw_send {
337 60     60 0 11003 _stream_fcw( 'fcw_send', @_ );
338             }
339              
340             sub stream_fcw_recv {
341 31     31 0 5547 _stream_fcw( 'fcw_recv', @_ );
342             }
343              
344             sub stream_fcw_update {
345 0     0 0 0 my ( $self, $stream_id ) = @_;
346              
347             # TODO: check size of data of stream in memory
348 0         0 tracer->debug("update fcw recv of stream $stream_id\n");
349 0         0 $self->stream_fcw_recv( $stream_id, DEFAULT_INITIAL_WINDOW_SIZE );
350 0         0 $self->enqueue( WINDOW_UPDATE, 0, $stream_id, DEFAULT_INITIAL_WINDOW_SIZE );
351             }
352              
353             sub stream_blocked_data {
354 27     27 0 5473 my $self = shift;
355 27         5471 my $stream_id = shift;
356 27 50       5486 my $s = $self->{streams}->{$stream_id} or return undef;
357              
358 27 100       5534 $s->{blocked_data} = shift if @_;
359 27         10863 $s->{blocked_data};
360             }
361              
362             sub stream_send_blocked {
363 3     3 0 6 my ( $self, $stream_id ) = @_;
364 3 50       9 my $s = $self->{streams}->{$stream_id} or return undef;
365              
366 3 100 66     17 if ( length( $s->{blocked_data} )
367             && $self->stream_fcw_send($stream_id) != 0 )
368             {
369 1         3 $self->send_data($stream_id);
370             }
371             }
372              
373             sub stream_weight {
374 2     2 0 5 my ( $self, $stream_id, $weight ) = @_;
375 2 50       7 return undef unless exists $self->{streams}->{$stream_id};
376 2         4 my $s = $self->{streams}->{$stream_id};
377              
378 2 50       5 $s->{weight} = $weight if defined $weight;
379 2         5 $s->{weight};
380             }
381              
382             sub stream_end {
383 36     36 0 6881 my ( $self, $stream_id, $end_flag ) = @_;
384 36 50       6927 return undef unless exists $self->{streams}->{$stream_id};
385 36         6885 my $s = $self->{streams}->{$stream_id};
386              
387 36 100       6908 $s->{end} = $end_flag if defined $end_flag;
388 36         13672 $s->{end};
389             }
390              
391             sub stream_reprio {
392 7     7 0 16 my ( $self, $stream_id, $exclusive, $stream_dep ) = @_;
393 7 50       19 return undef unless exists $self->{streams}->{$stream_id};
394 7         10 my $s = $self->{streams};
395              
396 7 100       22 if ( $s->{$stream_id}->{stream_dep} != $stream_dep ) {
397              
398             # check if new stream_dep is stream child
399 4 50       11 if ( $stream_dep != 0 ) {
400 4         5 my $sid = $stream_dep;
401 4         11 while ( $sid = $s->{$sid}->{stream_dep} ) {
402 5 100       19 next unless $sid == $stream_id;
403              
404             # Child take my stream dep
405             $s->{$stream_dep}->{stream_dep} =
406 1         3 $s->{$stream_id}->{stream_dep};
407 1         3 last;
408             }
409             }
410              
411             # Set new stream dep
412 4         8 $s->{$stream_id}->{stream_dep} = $stream_dep;
413             }
414              
415 7 100       15 if ($exclusive) {
416              
417             # move all siblings to childs
418 1         4 for my $sid ( keys %$s ) {
419             next
420 3 100 66     17 if $s->{$sid}->{stream_dep} != $stream_dep
421             || $sid == $stream_id;
422              
423 2         5 $s->{$sid}->{stream_dep} = $stream_id;
424             }
425             }
426              
427 7         22 return 1;
428             }
429              
430             1;