File Coverage

blib/lib/Protocol/SPDY/Stream.pm
Criterion Covered Total %
statement 76 147 51.7
branch 19 62 30.6
condition 6 19 31.5
subroutine 26 46 56.5
pod 37 37 100.0
total 164 311 52.7


line stmt bran cond sub pod time code
1             package Protocol::SPDY::Stream;
2             {
3             $Protocol::SPDY::Stream::VERSION = '0.999_007';
4             }
5 4     4   419 use strict;
  4         4  
  4         91  
6 4     4   13 use warnings;
  4         3  
  4         82  
7 4     4   390 use parent qw(Mixin::Event::Dispatch);
  4         213  
  4         14  
8              
9             =head1 NAME
10              
11             Protocol::SPDY::Stream - single stream representation within a L<Protocol::SPDY> connection
12              
13             =head1 VERSION
14              
15             version 0.999_007
16              
17             =head1 SYNOPSIS
18              
19             # You'd likely be using a subclass or other container here instead
20             my $spdy = Protocol::SPDY->new;
21             # Create initial stream - this example is for an HTTP request
22             my $stream = $spdy->create_stream(
23             # 0 is the default, use 1 if you don't want anything back from the
24             # other side, for example server push
25             unidirectional => 0,
26             # Set to 1 if we're not expecting to send any further frames on this stream
27             # - a GET request with no additional headers for example
28             fin => 0,
29             # Normally headers are provided as an arrayref to preserve order,
30             # but for convenience you could use a hashref instead
31             headers => [
32             ':method' => 'PUT',
33             ':path' => '/some/path?some=param',
34             ':version' => 'HTTP/1.1',
35             ':host' => 'localhost:1234',
36             ':scheme' => 'https',
37             ]
38             );
39             # Update the headers - regular HTTP allows trailing headers, with SPDY
40             # you can send additional headers at any time
41             $stream->headers(
42             # There's more to come
43             fin => 0,
44             # Again, arrayref or hashref are allowed here
45             headers => [
46             'content-length' => 5,
47             ]
48             );
49             # Normally scalar (byte) data here, although scalar ref (\'something')
50             # and Future are also allowed
51             $stream->send_data('hello');
52             # as a scalar ref:
53             # $stream->send_data(\(my $buffer = "some data"));
54             # as a Future:
55             # $stream->send_data(my $f = Future->new);
56             # $f->done('the data you expected');
57             # If you want to cancel the stream at any time, use ->reset
58             $stream->reset('CANCEL'); # or STREAM_CANCEL if you've imported the constants
59             # Normally you'd indicate finished by marking a data packet as the final one:
60             $stream->send_data('</html>', fin => 1);
61             # ... and an empty data packet should also be fine:
62             # $stream->send_data('', fin => 1);
63              
64             =head1 DESCRIPTION
65              
66             =head2 HTTP semantics
67              
68             Each stream corresponds to a single HTTP request/response exchange. The request
69             is contained within the SYN_STREAM frame, with optional additional HEADERS
70             after the initial stream creation, and the response will be in the SYN_REPLY,
71             which must at least include the C<:status> and C<:version> headers (so
72             the SYN_REPLY must contain the C<200 OK> response, you can't send that in
73             a later HEADERS packet).
74              
75             =head2 Window handling
76              
77             Each outgoing data frame will decrement the window size; a data frame
78             can only be sent if the data length is less than or equal to the remaining
79             window size. Sending will thus be paused if the window size is insufficient;
80             note that it may be possible for the window size to be less than zero.
81              
82             * Each frame we receive and process will trigger a window update response.
83             This applies to data frames only; windowing does not apply to control frames.
84             If we have several frames queued up for processing, we will defer the window
85             update until we know the total buffer space freed by processing those frames.
86             * Each data frame we send will cause an equivalent reduction in our window
87             size
88              
89             * Extract all frames from buffer
90             * For each frame:
91             * If we have a stream ID for the frame, pass it to that stream
92             * Stream processing for new data
93             * Calculate total from all new data frames
94             * Send window update if required
95              
96             =head2 Error handling
97              
98             There are two main types of error case: stream-level errors, which can
99             be handled by closing that stream, or connection-level errors, where
100             things have gone so badly wrong that the entire connection needs to be
101             dropped.
102              
103             Stream-level errors are handled by RST_STREAM frames.
104              
105             Connection-level errors are typically cases where framing has gone out
106             of sync (compression failures, incorrect packet lengths, etc.) and
107             these are handled by sending a single GOAWAY frame then closing the
108             connection immediately.
109              
110             =head2 Server push support
111              
112             The server can push additional streams to the client to avoid the unnecessary
113             extra SYN_STREAM request/response cycle for additional resources that the server
114             knows will be needed to fulfil the main request.
115              
116             A server push response is requested with L</push_stream> - this example involves
117             a single associated stream:
118              
119             try {
120             my $assoc = $stream->push_stream;
121             $assoc->closed->on_ready(sub {
122             # Associated stream completed or failed - either way,
123             # we can now start sending the main data
124             $stream->send_data($html);
125             })->on_fail(sub {
126             # The other side might already have the data or not
127             # support server push, so don't panic if our associated
128             # stream closes before we expected it
129             warn "Associated stream was rejected";
130             });
131             } catch {
132             # We'll get an exception if we tried to push data on a stream
133             # we'd already marked as FIN on our side.
134             warn "Our code is broken";
135             $stream->connection->goaway;
136             };
137              
138             You can then send that stream using L</start> as usual:
139              
140             $assoc->start(
141             headers => {
142             ':scheme' => 'https',
143             ':host' => 'localhost',
144             ':path' => '/image/logo.png',
145             }
146             );
147              
148             Note that associated streams can only be initiated before the
149             main stream is in FIN state.
150              
151             Generally it's safest to create all the associated streams immediately
152             after the initial SYN_STREAM request has been received from the client,
153             since that will pass enough information back that the client will know
154             how to start arranging the responses for caching. You should then be
155             able to send data on the streams as and when it becomes available. The
156             L<Future> C<needs_all> method may be useful here.
157              
158             Attempting to initiate server-pushed streams after sending content is
159             liable to hit race conditions - see section 3.3.1 in the SPDY spec.
160              
161             =cut
162              
163 4     4   15407 use Protocol::SPDY::Constants ':all';
  4         8  
  4         541  
164 4     4   17 use Scalar::Util ();
  4         5  
  4         130  
165              
166             use overload
167             '""' => 'to_string',
168 58     58   331 bool => sub { 1 },
169 4     4   866 fallback => 1;
  4         745  
  4         29  
170              
171             =head1 METHODS
172              
173             =cut
174              
175             =head2 new
176              
177             Instantiates a new stream. Expects the following named parameters:
178              
179             =over 4
180              
181             =item * connection - the L<Protocol::SPDY::Base> subclass which is
182             managing this side of the connection
183              
184             =item * stream_id - the ID to use for this stream
185              
186             =item * version - SPDY version, usually 3
187              
188             =back
189              
190             =cut
191              
192             sub new {
193 9     9 1 23 my $class = shift;
194 9         29 my %args = @_;
195 9         16 my $fin = delete $args{fin};
196 9         12 my $uni = delete $args{uni};
197 9         34 my $self = bless {
198             %args,
199             from_us => 1,
200             }, $class;
201 9 50       164 $self->{transfer_window} = $self->initial_window_size unless exists $self->{transfer_window};
202 9         48 Scalar::Util::weaken($self->{connection});
203 9 50       22 $self->finished->done if $fin;
204 9 50       21 $self->remote_finished->done if $uni;
205 9         31 $self;
206             }
207              
208             =head2 new_from_syn
209              
210             Constructs a new instance from a L<Protocol::SPDY::Frame::Control::SYN_STREAM>
211             frame object.
212              
213             =cut
214              
215             sub new_from_syn {
216 0     0 1 0 my $class = shift;
217 0         0 my $frame = shift;
218 0         0 my %args = @_;
219             my $self = bless {
220             id => $frame->stream_id,
221             version => $frame->version,
222             connection => $args{connection},
223 0         0 from_us => 0,
224             }, $class;
225 0         0 Scalar::Util::weaken($self->{connection});
226 0         0 $self->update_received_headers_from($frame);
227              
228             # Check whether we were expecting any more data
229 0 0       0 $self->remote_finished->done if $frame->fin;
230 0 0       0 $self->finished->done if $frame->uni;
231 0 0       0 if(my $parent_id = $frame->associated_stream_id) {
232             # We've received a unidirectional frame from the other
233             # side, this means it's server-push stream.
234 0         0 $self->{associated_stream_id} = $parent_id;
235 0 0       0 die "not unidirectional?" unless $frame->uni;
236 0 0       0 $self->associated_stream->invoke_event(push => $self) if $self->associated_stream;
237 0         0 $self->accepted->done;
238             }
239 0         0 $self;
240             }
241              
242             =head2 update_received_headers_from
243              
244             Updates L</received_headers> from the given frame.
245              
246             =cut
247              
248             sub update_received_headers_from {
249 8     8 1 12 my $self = shift;
250 8         10 my $frame = shift;
251 8         38 my $hdr = $frame->headers_as_simple_hashref;
252 8         30 $self->{received_headers}{$_} = $hdr->{$_} for keys %$hdr;
253 8         16 $self
254             }
255              
256             =head2 from_us
257              
258             Returns true if we initiated this stream.
259              
260             =cut
261              
262 2 50   2 1 11 sub from_us { shift->{from_us} ? 1 : 0 }
263              
264             =head2 id
265              
266             Returns the ID for this stream.
267              
268             =cut
269              
270 60     60 1 2063 sub id { shift->{id} }
271              
272             =head2 seen_reply
273              
274             Returns true if we have seen a reply for this stream yet.
275              
276             =cut
277              
278 14 100   14 1 59 sub seen_reply { shift->{seen_reply} ? 1 : 0 }
279              
280             =head2 connection
281              
282             Returns the L<Protocol::SPDY::Base> instance which owns us.
283              
284             =cut
285              
286 10     10 1 41 sub connection { shift->{connection} }
287              
288             =head2 priority
289              
290             Returns the priority for this stream (0-7).
291              
292             =cut
293              
294 8     8 1 18 sub priority { shift->{version} }
295              
296             =head2 version
297              
298             Returns the SPDY version for this stream (probably 3).
299              
300             =cut
301              
302 8     8 1 47 sub version { shift->{version} }
303              
304             =head2 syn_frame
305              
306             Generates a SYN_STREAM frame for starting this stream.
307              
308             =cut
309              
310             sub syn_frame {
311 8     8 1 10 my $self = shift;
312 8         15 my %args = @_;
313 8   50     53 $args{headers} ||= [];
314 8         33 Protocol::SPDY::Frame::Control::SYN_STREAM->new(
315             %args,
316             associated_stream_id => $self->associated_stream_id,
317             stream_id => $self->id,
318             priority => $self->priority,
319             slot => 0,
320             version => $self->version,
321             );
322             }
323              
324             =head2 sent_header
325              
326             Returns the given header from our recorded list of sent headers
327              
328             =cut
329              
330 0     0 1 0 sub sent_header { $_[0]->{sent_headers}{$_[1]} }
331              
332             =head2 sent_headers
333              
334             Returns the hashref of all sent headers. Please don't change the value, it
335             might break something: changing this will B<not> send any updates to the
336             other side.
337              
338             =cut
339              
340 0     0 1 0 sub sent_headers { $_[0]->{sent_headers} }
341              
342             =head2 received_header
343              
344             Returns the given header from our recorded list of received headers.
345              
346             =cut
347              
348 2     2 1 754 sub received_header { $_[0]->{received_headers}{$_[1]} }
349              
350             =head2 received_headers
351              
352             Returns the hashref of all received headers.
353              
354             =cut
355              
356 0     0 1 0 sub received_headers { $_[0]->{received_headers} }
357              
358             =head2 handle_frame
359              
360             Attempt to handle the given frame.
361              
362             =cut
363              
364             sub handle_frame {
365 10     10 1 12 my $self = shift;
366 10         12 my $frame = shift;
367              
368 10 50       80 if($frame->is_data) {
    50          
    100          
    100          
    50          
    0          
369 0         0 my $len = length($frame->payload);
370 0         0 $self->invoke_event(data => $frame->payload);
371 0         0 $self->queue_window_update($len);
372             } elsif($frame->type_name eq 'WINDOW_UPDATE') {
373 0         0 my $delta = $frame->window_delta;
374 0         0 $self->{transfer_window} += $delta;
375 0         0 $self->invoke_event(transfer_window => $self->transfer_window, $delta);
376             } elsif($frame->type_name eq 'RST_STREAM') {
377 2 50       6 return $self->accepted->fail($frame->status_code_as_text) if $self->from_us;
378 0         0 $self->closed->fail($frame->status_code_as_text);
379             } elsif($frame->type_name eq 'SYN_REPLY') {
380 6 50       15 die "SYN_REPLY on a stream which has already been refused or replied" if $self->accepted->is_ready;
381 6         85 $self->update_received_headers_from($frame);
382 6         12 $self->accepted->done;
383 6         2284 $self->replied->done;
384             } elsif($frame->type_name eq 'HEADERS') {
385 2 50       6 die "HEADERS on a stream which has not yet seen a reply" unless $self->accepted->is_ready;
386 2         25 $self->update_received_headers_from($frame);
387 2         14 $self->invoke_event(headers => $frame);
388             } elsif($frame->type_name eq 'SYN_STREAM') {
389 0         0 die "SYN_STREAM on an existing stream";
390             } else {
391 0         0 die "what is $frame ?";
392             }
393              
394 8 50       1967 if($frame->fin) {
395 0 0       0 die "Duplicate FIN received" if $self->remote_fin;
396 0         0 $self->remote_finished->done;
397             }
398             }
399              
400             =head2 send_window_update
401              
402             Send out any pending window updates.
403              
404             =cut
405              
406             sub send_window_update {
407 0     0 1 0 my $self = shift;
408 0 0       0 return unless my $delta = delete $self->{pending_update};
409 0         0 $self->window_update(window_delta => $delta);
410 0         0 $self
411             }
412              
413             =head2 queue_window_update
414              
415             Request a window update due to data frame processing.
416              
417             =cut
418              
419             sub queue_window_update {
420 0     0 1 0 my $self = shift;
421 0         0 my $len = shift;
422 0 0       0 if(exists $self->{pending_update}) {
423 0         0 $self->{pending_update} += $len;
424             } else {
425 0         0 $self->{pending_update} = $len;
426 0         0 $self->connection->batch->on_done($self->curry::send_window_update);
427             }
428 0         0 $self
429             }
430              
431             =head2 queue_frame
432              
433             Asks our connection object to queue the given frame instance.
434              
435             =cut
436              
437             sub queue_frame {
438 10     10 1 14 my $self = shift;
439 10         11 my $frame = shift;
440 10 50       38 $self->finished->done if $frame->fin;
441 10         23 $self->connection->queue_frame($frame);
442             }
443              
444             =head2 start
445              
446             Start this stream off by sending a SYN_STREAM frame.
447              
448             =cut
449              
450             sub start {
451 8     8 1 14 my $self = shift;
452 8         24 $self->queue_frame($self->syn_frame(@_));
453 8         59 $self
454             }
455              
456             =head2 reply
457              
458             Sends a reply to the stream instantiation request.
459              
460             =cut
461              
462             sub reply {
463 0     0 1 0 my $self = shift;
464 0         0 my %args = @_;
465 0         0 my $flags = 0;
466 0 0       0 $flags |= FLAG_FIN if $args{fin};
467             $self->queue_frame(
468             Protocol::SPDY::Frame::Control::SYN_REPLY->new(
469             stream_id => $self->id,
470             version => $self->version,
471             headers => $args{headers},
472 0 0       0 fin => ($args{fin} ? 1 : 0),
473             )
474             );
475             }
476              
477             =head2 reset
478              
479             Sends a reset request for this stream.
480              
481             =cut
482              
483             sub reset {
484 0     0 1 0 my $self = shift;
485 0         0 my $status = shift;
486 0         0 $self->queue_frame(
487             Protocol::SPDY::Frame::Control::RST_STREAM->new(
488             stream_id => $self->id,
489             status => $status,
490             )
491             );
492             }
493              
494             =head2 push_stream
495              
496             Creates and returns a new C<server push> stream.
497              
498             Note that a pushed stream starts with a B< SYN_STREAM > frame but with
499             headers that are usually found in a B< SYN_REPLY > frame.
500              
501             =cut
502              
503             sub push_stream {
504 0     0 1 0 my $self = shift;
505 0 0       0 die "This stream is in FIN state" if $self->finished->is_ready;
506              
507 0         0 $self->connection->create_stream(
508             uni => 1,
509             fin => 0,
510             associated_stream_id => $self->id,
511             );
512             }
513              
514             =head2 headers
515              
516             Send out headers for this stream.
517              
518             =cut
519              
520             sub headers {
521 0     0 1 0 my $self = shift;
522 0         0 my %args = @_;
523 0         0 $self->queue_frame(
524             Protocol::SPDY::Frame::Control::HEADERS->new(
525             %args,
526             stream_id => $self->id,
527             version => $self->version,
528             )
529             );
530             }
531              
532             =head2 window_update
533              
534             Update information on the current window progress.
535              
536             =cut
537              
538             sub window_update {
539 0     0 1 0 my $self = shift;
540 0         0 my %args = @_;
541 0 0       0 die "No window_delta" unless defined $args{window_delta};
542 0         0 $self->queue_frame(
543             Protocol::SPDY::Frame::Control::WINDOW_UPDATE->new(
544             %args,
545             stream_id => $self->id,
546             version => $self->version,
547             )
548             );
549             }
550              
551             =head2 send_data
552              
553             Sends a data packet.
554              
555             =cut
556              
557             sub send_data {
558 2     2 1 70 my $self = shift;
559 2         3 my $data = shift;
560 2         5 my %args = @_;
561 2         9 $self->queue_frame(
562             Protocol::SPDY::Frame::Data->new(
563             %args,
564             stream_id => $self->id,
565             payload => $data,
566             )
567             );
568 2         26 $self
569             }
570              
571             =head1 METHODS - Accessors
572              
573             These provide read-only access to various pieces of state information.
574              
575             =head2 associated_stream_id
576              
577             Which stream we're associated to. Returns 0 if there isn't one.
578              
579             =cut
580              
581 8 50   8 1 45 sub associated_stream_id { shift->{associated_stream_id} || 0 }
582              
583             =head2 associated_stream
584              
585             The L<Protocol::SPDY::Stream> for the associated stream
586             (the "parent" stream to this one, if it exists). Returns undef
587             if not found.
588              
589             =cut
590              
591             sub associated_stream {
592 0     0 1 0 my $self = shift;
593 0         0 $self->connection->stream_by_id($self->associated_stream_id)
594             }
595              
596             =head2 remote_fin
597              
598             Returns true if the remote has sent us a FIN (half-closed state).
599              
600             =cut
601              
602 0 0   0 1 0 sub remote_fin { shift->{remote_fin} ? 1 : 0 }
603              
604             =head2 local_fin
605              
606             Returns true if we have sent FIN to the remote (half-closed state).
607              
608             =cut
609              
610 0 0   0 1 0 sub local_fin { shift->{local_fin} ? 1 : 0 }
611              
612             =head2 initial_window_size
613              
614             Initial window size. Default is 64KB for a new stream.
615              
616             =cut
617              
618 9   50 9 1 56 sub initial_window_size { shift->{initial_window_size} // 65536 }
619              
620             =head2 transfer_window
621              
622             Remaining bytes in the current transfer window.
623              
624             =cut
625              
626 0     0 1 0 sub transfer_window { shift->{transfer_window} }
627              
628             =head2 to_string
629              
630             String representation of this stream, for debugging.
631              
632             =cut
633              
634             sub to_string {
635 0     0 1 0 my $self = shift;
636 0         0 'SPDY:Stream ID ' . $self->id
637             }
638              
639             =head1 METHODS - Futures
640              
641             The following L<Future>-returning methods are available. Attach events using
642             C<on_done>, C<on_fail> or C<on_cancel> or helpers such as C<then> as usual:
643              
644             $stream->replied->then(sub {
645             # This also returns a Future, allowing chaining
646             $stream->send_data('...')
647             })->on_fail(sub {
648             die 'here';
649             });
650              
651             or from the server side:
652              
653             $stream->closed->then(sub {
654             # cleanup here after the stream goes away
655             })->on_fail(sub {
656             die "Our stream was reset from the other side: " . shift;
657             });
658              
659             =cut
660              
661             =head2 replied
662              
663             We have received a SYN_REPLY from the other side. If the stream is reset before
664             that happens, this will be cancelled with the reason as the first parameter.
665              
666             =cut
667              
668             sub replied {
669 14     14 1 20 my $self = shift;
670             $self->{future_replied} ||= Future->new->on_done(sub {
671 6     6   180 $self->{seen_reply} = 1
672             })
673 14   66     82 }
674              
675             =head2 finished
676              
677             This stream has finished sending everything, i.e. we've set the FIN flag on a packet.
678             The difference between this and L</closed> is that the other side may have more to
679             say. Will be cancelled with the reason on reset.
680              
681             =cut
682              
683             sub finished {
684 0     0 1 0 my $self = shift;
685 0   0     0 $self->{future_finished} ||= Future->new
686             }
687              
688             =head2 remote_finished
689              
690             This frame has had all the data it's going to get from the other side,
691             i.e. we're sending unidirectional data or we have seen the FIN flag on
692             an incoming packet.
693              
694             =cut
695              
696             sub remote_finished {
697 0     0 1 0 my $self = shift;
698             $self->{future_remote_finished} ||= Future->new->on_done(sub {
699 0     0   0 $self->{remote_fin} = 1;
700 0   0     0 });
701             }
702              
703             =head2 closed
704              
705             The stream has been closed on both sides - either through reset or "natural causes".
706             Might still be cancelled if the parent object disappears.
707              
708             =cut
709              
710             sub closed {
711 0     0 1 0 my $self = shift;
712 0   0     0 $self->{future_closed} ||= Future->needs_all($self->finished, $self->remote_finished)
713             }
714              
715             =head2 accepted
716              
717             The remote accepted this stream immediately after our initial SYN_STREAM. If you
718             want notification on rejection, use an ->on_fail handler on this method.
719              
720             =cut
721              
722             sub accepted {
723 24     24 1 528 my $self = shift;
724 24   66     124 $self->{future_accepted} ||= Future->new
725             }
726              
727             1;
728              
729             __END__