File Coverage

blib/lib/Protocol/HTTP2/Stream.pm
Criterion Covered Total %
statement 157 203 77.3
branch 79 112 70.5
condition 21 36 58.3
subroutine 23 24 95.8
pod 0 16 0.0
total 280 391 71.6


line stmt bran cond sub pod time code
1             package Protocol::HTTP2::Stream;
2 12     12   53 use strict;
  12         14  
  12         272  
3 12     12   36 use warnings;
  12         11  
  12         292  
4 12         3234 use Protocol::HTTP2::Constants qw(:states :endpoints :settings :frame_types
5 12     12   36 :limits :errors);
  12         13  
6 12     12   55 use Protocol::HTTP2::HeaderCompression qw( headers_decode );
  12         14  
  12         483  
7 12     12   42 use Protocol::HTTP2::Trace qw(tracer);
  12         16  
  12         398  
8              
9             # Streams related part of Protocol::HTTP2::Conntection
10              
11             # Autogen properties
12             {
13 12     12   39 no strict 'refs';
  12         21  
  12         22860  
14             for my $prop (
15             qw(promised_sid headers pp_headers header_block trailer
16             trailer_headers length blocked_data weight end reset)
17             )
18             {
19             *{ __PACKAGE__ . '::stream_' . $prop } = sub {
20             return
21             !exists $_[0]->{streams}->{ $_[1] } ? undef
22             : @_ == 2 ? $_[0]->{streams}->{ $_[1] }->{$prop}
23 736 100   736   90613 : ( $_[0]->{streams}->{ $_[1] }->{$prop} = $_[2] );
    50          
24             }
25             }
26             }
27              
28             sub new_stream {
29 42     42 0 1588 my $self = shift;
30 42 100       1662 return undef if $self->goaway;
31              
32             $self->{last_stream} += 2
33 40 50       1690 if exists $self->{streams}->{ $self->{type} == CLIENT ? 1 : 2 };
    100          
34             $self->{streams}->{ $self->{last_stream} } = {
35 40         1694 'state' => IDLE,
36             'weight' => DEFAULT_WEIGHT,
37             'stream_dep' => 0,
38             'fcw_recv' => $self->dec_setting(SETTINGS_INITIAL_WINDOW_SIZE),
39             'fcw_send' => $self->enc_setting(SETTINGS_INITIAL_WINDOW_SIZE),
40             };
41 40         3196 return $self->{last_stream};
42             }
43              
44             sub new_peer_stream {
45 42     42 0 1626 my $self = shift;
46 42         1606 my $stream_id = shift;
47 42 50 33     1838 if ( $stream_id < $self->{last_peer_stream}
    50          
48             || ( $stream_id % 2 ) == ( $self->{type} == CLIENT ) ? 1 : 0
49             || $self->goaway )
50             {
51 0         0 tracer->error("Peer send invalid stream id: $stream_id\n");
52 0         0 $self->error(PROTOCOL_ERROR);
53 0         0 return undef;
54             }
55 42         1624 $self->{last_peer_stream} = $stream_id;
56 42 50       1661 if ( $self->dec_setting(SETTINGS_MAX_CONCURRENT_STREAMS) <=
57             $self->{active_peer_streams} )
58             {
59 0         0 tracer->warning("SETTINGS_MAX_CONCURRENT_STREAMS exceeded\n");
60 0         0 $self->stream_error( $stream_id, REFUSED_STREAM );
61 0         0 return undef;
62             }
63 42         1625 $self->{active_peer_streams}++;
64 42         1652 tracer->debug("Active streams: $self->{active_peer_streams}");
65 42         1660 $self->{streams}->{$stream_id} = {
66             'state' => IDLE,
67             'weight' => DEFAULT_WEIGHT,
68             'stream_dep' => 0,
69             'fcw_recv' => $self->dec_setting(SETTINGS_INITIAL_WINDOW_SIZE),
70             'fcw_send' => $self->enc_setting(SETTINGS_INITIAL_WINDOW_SIZE),
71             };
72             $self->{on_new_peer_stream}->($stream_id)
73 42 100       1708 if exists $self->{on_new_peer_stream};
74              
75 42         3284 return $self->{last_peer_stream};
76             }
77              
78             sub stream {
79 139     139 0 7302 my ( $self, $stream_id ) = @_;
80 139 100       9140 return undef unless exists $self->{streams}->{$stream_id};
81              
82 104         11494 $self->{streams}->{$stream_id};
83             }
84              
85             # stream_state ( $self, $stream_id, $new_state?, $pending? )
86              
87             sub stream_state {
88 151     151 0 6466 my $self = shift;
89 151         6450 my $stream_id = shift;
90 151 50       6563 return undef unless exists $self->{streams}->{$stream_id};
91 151         6478 my $s = $self->{streams}->{$stream_id};
92              
93 151 50       6555 if (@_) {
94 151         6497 my ( $new_state, $pending ) = @_;
95              
96 151 100       6528 if ($pending) {
97 1         3 $self->stream_pending_state( $stream_id, $new_state );
98             }
99             else {
100             $self->{on_change_state}->( $stream_id, $s->{state}, $new_state )
101 150 50       6549 if exists $self->{on_change_state};
102              
103 150         6511 $s->{state} = $new_state;
104              
105             # Exec callbacks for new state
106 150 100 100     6754 if ( exists $s->{cb} && exists $s->{cb}->{ $s->{state} } ) {
107 70         3290 for my $cb ( @{ $s->{cb}->{ $s->{state} } } ) {
  70         6462  
108 70         3326 $cb->();
109             }
110             }
111              
112             # Cleanup
113 150 100       16963 if ( $new_state == CLOSED ) {
114             $self->{active_peer_streams}--
115             if $self->{active_peer_streams}
116 70 100 66     3402 && ( ( $stream_id % 2 ) ^ ( $self->{type} == CLIENT ) );
117 70         3345 tracer->info(
118             "Active streams: $self->{active_peer_streams} $stream_id");
119 70         3490 for my $key ( keys %$s ) {
120 624 100       30777 next if grep { $key eq $_ } (
  3744         395595  
121             qw(state weight stream_dep
122             fcw_recv fcw_send reset)
123             );
124 264         13303 delete $s->{$key};
125             }
126             }
127             }
128             }
129              
130 151         42562 $s->{state};
131             }
132              
133             sub stream_pending_state {
134 262     262 0 14468 my $self = shift;
135 262         14768 my $stream_id = shift;
136 262 50       14668 return undef unless exists $self->{streams}->{$stream_id};
137 262         14479 my $s = $self->{streams}->{$stream_id};
138 262 100       14682 if (@_) {
139 2         3 $s->{pending_state} = shift;
140             $self->{pending_stream} =
141 2 100       3 defined $s->{pending_state} ? $stream_id : undef;
142             }
143 262         29674 $s->{pending_state};
144             }
145              
146             sub stream_cb {
147 71     71 0 3249 my ( $self, $stream_id, $state, $cb ) = @_;
148              
149 71 50       3264 return undef unless exists $self->{streams}->{$stream_id};
150              
151 71         3192 push @{ $self->{streams}->{$stream_id}->{cb}->{$state} }, $cb;
  71         12073  
152             }
153              
154             sub stream_frame_cb {
155 10     10 0 1596 my ( $self, $stream_id, $frame, $cb ) = @_;
156              
157 10 50       1592 return undef unless exists $self->{streams}->{$stream_id};
158              
159 10         1590 push @{ $self->{streams}->{$stream_id}->{frame_cb}->{$frame} }, $cb;
  10         4786  
160             }
161              
162             sub stream_data {
163 90     90 0 4803 my $self = shift;
164 90         4800 my $stream_id = shift;
165 90 50       4918 return undef unless exists $self->{streams}->{$stream_id};
166 90         4852 my $s = $self->{streams}->{$stream_id};
167              
168 90 100       4894 if (@_) {
169              
170             # Exec callbacks for data
171 23 100 66     2458 if ( exists $s->{frame_cb} && exists $s->{frame_cb}->{&DATA} ) {
172 5         801 for my $cb ( @{ $s->{frame_cb}->{&DATA} } ) {
  5         1635  
173 5         817 $cb->( $_[0] );
174             }
175             }
176             else {
177 18         4845 $s->{data} .= shift;
178             }
179             }
180              
181 90         9738 $s->{data};
182             }
183              
184             sub stream_headers_done {
185 72     72 0 3215 my $self = shift;
186 72         3259 my $stream_id = shift;
187 72 50       3291 return undef unless exists $self->{streams}->{$stream_id};
188 72         3214 my $s = $self->{streams}->{$stream_id};
189              
190             my $res =
191             headers_decode( $self, \$s->{header_block}, 0,
192 72         3365 length $s->{header_block}, $stream_id );
193              
194 72         3290 tracer->debug("Headers done for stream $stream_id\n");
195              
196 72 50       3301 return undef unless defined $res;
197              
198             # Clear header_block
199 72         3268 $s->{header_block} = '';
200              
201 72         3333 my $eh = $self->decode_context->{emitted_headers};
202 72   66     3399 my $is_response = $self->{type} == CLIENT && !$s->{promised_sid};
203 72         3308 my $is_trailer = !!$self->stream_trailer($stream_id);
204              
205             return undef
206 72 50       3288 unless $self->validate_headers( $eh, $stream_id, $is_response );
207              
208 72 50       3330 if ( $s->{promised_sid} ) {
    50          
209 0         0 $self->{streams}->{ $s->{promised_sid} }->{pp_headers} = $eh;
210             }
211             elsif ($is_trailer) {
212 0         0 $self->stream_trailer_headers( $stream_id, $eh );
213             }
214             else {
215 72         6582 $s->{headers} = $eh;
216             }
217              
218             # Exec callbacks for headers
219 72 100 66     3327 if ( exists $s->{frame_cb} && exists $s->{frame_cb}->{&HEADERS} ) {
220 5         801 for my $cb ( @{ $s->{frame_cb}->{&HEADERS} } ) {
  5         1901  
221 5         823 $cb->($eh);
222             }
223             }
224              
225             # Clear emitted headers
226 72         3380 $self->decode_context->{emitted_headers} = [];
227              
228 72         6808 return 1;
229             }
230              
231             sub validate_headers {
232 72     72 0 3242 my ( $self, $headers, $stream_id, $is_response ) = @_;
233 72         3225 my $pseudo_flag = 1;
234 72         3272 my %pseudo_hash = ();
235 72 100       3340 my @h = $is_response ? (qw(:status)) : (
236             qw(:method :scheme :authority
237             :path)
238             );
239              
240             # Trailer headers ?
241 72 50       3255 if ( my $t = $self->stream_trailer($stream_id) ) {
242 0         0 for my $i ( 0 .. @$headers / 2 - 1 ) {
243 0         0 my ( $h, $v ) = ( $headers->[ $i * 2 ], $headers->[ $i * 2 + 1 ] );
244 0 0       0 if ( !exists $t->{$h} ) {
245 0         0 tracer->warning(
246             "header <$h> doesn't listed in the trailer header");
247 0         0 $self->stream_error( $stream_id, PROTOCOL_ERROR );
248 0         0 return undef;
249             }
250             }
251 0         0 return 1;
252             }
253              
254 72         3397 for my $i ( 0 .. @$headers / 2 - 1 ) {
255 233         14908 my ( $h, $v ) = ( $headers->[ $i * 2 ], $headers->[ $i * 2 + 1 ] );
256 233 100       14873 if ( $h =~ /^\:/ ) {
257 180 50       8200 if ( !$pseudo_flag ) {
    50          
    50          
258 0         0 tracer->warning(
259             "pseudo-header <$h> appears after a regular header");
260 0         0 $self->stream_error( $stream_id, PROTOCOL_ERROR );
261 0         0 return undef;
262             }
263 612         54748 elsif ( !grep { $_ eq $h } @h ) {
264 0         0 tracer->warning("invalid pseudo-header <$h>");
265 0         0 $self->stream_error( $stream_id, PROTOCOL_ERROR );
266 0         0 return undef;
267             }
268             elsif ( exists $pseudo_hash{$h} ) {
269 0         0 tracer->warning("repeated pseudo-header <$h>");
270 0         0 $self->stream_error( $stream_id, PROTOCOL_ERROR );
271 0         0 return undef;
272             }
273              
274 180         8145 $pseudo_hash{$h} = $v;
275 180         16051 next;
276             }
277              
278 53 100       6412 $pseudo_flag = 0 if $pseudo_flag;
279              
280 53 50 33     9660 if ( $h eq 'connection' ) {
    50          
    100          
    50          
281 0         0 tracer->warning("connection header is not valid in http/2");
282 0         0 $self->stream_error( $stream_id, PROTOCOL_ERROR );
283 0         0 return undef;
284             }
285             elsif ( $h eq 'te' && $v ne 'trailers' ) {
286 0         0 tracer->warning("TE header can contain only value 'trailers'");
287 0         0 $self->stream_error( $stream_id, PROTOCOL_ERROR );
288 0         0 return undef;
289             }
290             elsif ( $h eq 'content-length' ) {
291 2         11 $self->stream_length( $stream_id, $v );
292             }
293             elsif ( $h eq 'trailer' ) {
294 0         0 my %th = map { $_ => 1 } split /\s*,\s*/, lc($v);
  0         0  
295 0 0       0 if (
296 0         0 grep { exists $th{$_} } (
297             qw(transfer-encoding content-length host authentication
298             cache-control expect max-forwards pragma range te
299             content-encoding content-type content-range trailer)
300             )
301             )
302             {
303 0         0 tracer->warning("trailer header contain forbidden headers");
304 0         0 $self->stream_error( $stream_id, PROTOCOL_ERROR );
305 0         0 return undef;
306             }
307 0         0 $self->stream_trailer( $stream_id, {%th} );
308             }
309             }
310              
311 72         3263 for my $h (@h) {
312 180 50       16143 next if exists $pseudo_hash{$h};
313              
314 0         0 tracer->warning("missed mandatory pseudo-header $h");
315 0         0 $self->stream_error( $stream_id, PROTOCOL_ERROR );
316 0         0 return undef;
317             }
318              
319 72         6531 1;
320             }
321              
322             # RST_STREAM for stream errors
323             sub stream_error {
324 5     5 0 813 my ( $self, $stream_id, $error ) = @_;
325 5         815 $self->enqueue( RST_STREAM, 0, $stream_id, $error );
326             }
327              
328             # Flow control windown of stream
329             sub _stream_fcw {
330 154     154   9694 my $dir = shift;
331 154         9587 my $self = shift;
332 154         9894 my $stream_id = shift;
333 154 50       9785 return undef unless exists $self->{streams}->{$stream_id};
334 154         9573 my $s = $self->{streams}->{$stream_id};
335              
336 154 100       9769 if (@_) {
337 102         6740 $s->{$dir} += shift;
338 102         6475 tracer->debug( "Stream $stream_id $dir now is " . $s->{$dir} . "\n" );
339             }
340 154         28768 $s->{$dir};
341             }
342              
343             sub stream_fcw_send {
344 102     102 0 6730 _stream_fcw( 'fcw_send', @_ );
345             }
346              
347             sub stream_fcw_recv {
348 52     52 0 3258 _stream_fcw( 'fcw_recv', @_ );
349             }
350              
351             sub stream_fcw_update {
352 0     0 0 0 my ( $self, $stream_id ) = @_;
353              
354             # TODO: check size of data of stream in memory
355 0         0 my $size = $self->dec_setting(SETTINGS_INITIAL_WINDOW_SIZE);
356 0         0 tracer->debug("update fcw recv of stream $stream_id with $size b.\n");
357 0         0 $self->stream_fcw_recv( $stream_id, $size );
358 0         0 $self->enqueue( WINDOW_UPDATE, 0, $stream_id, $size );
359             }
360              
361             sub stream_send_blocked {
362 3     3 0 4 my ( $self, $stream_id ) = @_;
363 3 50       5 my $s = $self->{streams}->{$stream_id} or return undef;
364              
365 3 100 66     14 if ( length( $s->{blocked_data} )
366             && $self->stream_fcw_send($stream_id) > 0 )
367             {
368 1         2 $self->send_data($stream_id);
369             }
370             }
371              
372             sub stream_reprio {
373 7     7 0 10 my ( $self, $stream_id, $exclusive, $stream_dep ) = @_;
374             return undef
375             unless exists $self->{streams}->{$stream_id}
376 7 50 66     47 && ( $stream_dep == 0 || exists $self->{streams}->{$stream_dep} )
      33        
      33        
377             && $stream_id != $stream_dep;
378 7         6 my $s = $self->{streams};
379              
380 7 100       12 if ( $s->{$stream_id}->{stream_dep} != $stream_dep ) {
381              
382             # check if new stream_dep is stream child
383 4 50       7 if ( $stream_dep != 0 ) {
384 4         5 my $sid = $stream_dep;
385 4         8 while ( $sid = $s->{$sid}->{stream_dep} ) {
386 5 100       12 next unless $sid == $stream_id;
387              
388             # Child take my stream dep
389             $s->{$stream_dep}->{stream_dep} =
390 1         1 $s->{$stream_id}->{stream_dep};
391 1         1 last;
392             }
393             }
394              
395             # Set new stream dep
396 4         6 $s->{$stream_id}->{stream_dep} = $stream_dep;
397             }
398              
399 7 100       10 if ($exclusive) {
400              
401             # move all siblings to childs
402 1         4 for my $sid ( keys %$s ) {
403             next
404 3 100 66     14 if $s->{$sid}->{stream_dep} != $stream_dep
405             || $sid == $stream_id;
406              
407 2         2 $s->{$sid}->{stream_dep} = $stream_id;
408             }
409             }
410              
411 7         18 return 1;
412             }
413              
414             1;