File Coverage

blib/lib/SockJS/Transport/WebSocket.pm
Criterion Covered Total %
statement 66 106 62.2
branch 11 34 32.3
condition 0 6 0.0
subroutine 13 15 86.6
pod 0 2 0.0
total 90 163 55.2


line stmt bran cond sub pod time code
1             package SockJS::Transport::WebSocket;
2              
3 1     1   429 use strict;
  1         2  
  1         25  
4 1     1   4 use warnings;
  1         2  
  1         26  
5              
6 1     1   4 use base 'SockJS::Transport::Base';
  1         2  
  1         363  
7              
8 1     1   471 use Encode ();
  1         9041  
  1         27  
9 1     1   1427 use IO::Compress::Deflate;
  1         72328  
  1         53  
10 1     1   975 use JSON ();
  1         18269  
  1         31  
11 1     1   726 use AnyEvent::Handle;
  1         93162  
  1         178  
12 1     1   1077 use Protocol::WebSocket::Handshake::Server;
  1         31672  
  1         3533  
13              
14             sub new {
15 3     3 0 7598 my $self = shift->SUPER::new(@_);
16              
17 3         8 push @{$self->{allowed_methods}}, 'GET';
  3         9  
18              
19 3         7 return $self;
20             }
21              
22             sub dispatch_GET {
23 3     3 0 5 my $self = shift;
24 3         7 my ($env, $conn) = @_;
25              
26             my $hs = $self->{hs} =
27 3         25 Protocol::WebSocket::Handshake::Server->new_from_psgi($env);
28              
29 3         607 my $fh = $env->{'psgix.io'};
30              
31 3         10 my $handle = $self->{handle} = $self->_build_handle(fh => $fh);
32              
33 3 100       157 $hs->parse($fh)
34             or return $self->_return_error('Can "Upgrade" only to "WebSocket".',
35             status => 400);
36              
37             return sub {
38             my $on_close_cb = sub {
39 0         0 my $handle = shift;
40              
41 0         0 $conn->aborted;
42              
43 0 0       0 if ($handle) {
44 0         0 $handle->push_shutdown;
45 0         0 $handle->destroy;
46 0         0 delete $self->{handle};
47 0         0 undef $handle;
48             }
49 2     2   18 };
50 2         24 $handle->on_eof($on_close_cb);
51 2         100 $handle->on_error($on_close_cb);
52              
53             $handle->on_read(
54             sub {
55             $handle->push_read(
56             sub {
57 2 50       107 $self->_parse($conn, $_[0]->rbuf) or do {
58 0         0 $conn->aborted;
59              
60 0         0 $handle->push_shutdown;
61 0         0 $handle->destroy;
62 0         0 delete $self->{handle};
63 0         0 undef $handle;
64             };
65             }
66 2         78 );
67             }
68 2         80 );
69              
70 2         29 $self->_parse($conn, '');
71 2         879 };
72             }
73              
74             sub _parse {
75 4     4   103 my $self = shift;
76 4         9 my ($conn) = @_;
77              
78 4         6 my $hs = $self->{hs};
79 4         7 my $handle = $self->{handle};
80              
81 4 100       10 if (!$self->{handshake_done}) {
82 2         8 my $ok = $hs->parse($_[1]);
83 2 50       39 return unless $ok;
84              
85             #$hs->res->push_header('Sec-WebSocket-Extensions' => 'permessage-deflate');
86              
87             # Partial request (HAProxy?)
88 2 50       8 if ($hs->is_body) {
89 0         0 $handle->push_write($hs->to_string);
90 0         0 return 1;
91             }
92              
93 2 50       23 if ($hs->is_done) {
94              
95             # Connected!
96 2         28 $handle->push_write($hs->to_string);
97 2         909 $self->{handshake_done}++;
98              
99             $conn->write_cb(
100             sub {
101 2     2   4 my $conn = shift;
102 2         4 my ($message) = @_;
103              
104 2         9 my $bytes = $hs->build_frame(buffer => $message)->to_bytes;
105              
106 2 50       231 $handle->push_write($bytes) if $handle;
107             }
108 2         16 );
109             $conn->close_cb(
110             sub {
111 0     0   0 my $conn = shift;
112              
113 0         0 my $close_frame = $hs->build_frame(type => 'close')->to_bytes;
114 0         0 $conn->write($close_frame);
115              
116 0 0       0 if ($handle) {
117 0         0 $handle->push_shutdown;
118 0         0 $handle->destroy;
119 0         0 delete $self->{handle};
120 0         0 undef $handle;
121             }
122             }
123 2         14 );
124              
125 2 50       11 $conn->write('o') unless $self->name eq 'raw_websocket';
126 2         6 $conn->connected;
127             }
128             else {
129              
130             # Wait for more data
131 0         0 return 1;
132             }
133             }
134              
135 4         10 my $frame = $hs->build_frame;
136 4         132 $frame->append($_[1]);
137              
138 4         38 while (defined(my $message = $frame->next_bytes)) {
139 0 0       0 next unless length $message;
140              
141 0 0 0     0 if ($frame->rsv && $frame->rsv->[0]) {
142 0         0 my $uncompressed;
143              
144 0         0 $message .= "\x00\x00\xff\xff";
145              
146 0 0       0 IO::Compress::Deflate::deflate(\$message => \$uncompressed)
147             or return;
148 0         0 $message = $uncompressed;
149             }
150              
151 0 0       0 if ($self->name eq 'websocket') {
152 0         0 my $json = JSON->new->utf8->allow_nonref(0);
153              
154 0 0       0 eval { $message = $json->decode($message) } || do {
  0         0  
155             #warn "JSON error: $@\n";
156 0         0 return;
157             };
158              
159 0 0 0     0 return unless $message && ref $message eq 'ARRAY';
160             }
161             else {
162              
163             # We want to pass message AS IS
164 0         0 $message = [\Encode::decode('UTF-8', $message)];
165             }
166              
167 0         0 $conn->fire_event('data', @$message);
168             }
169              
170 4 50       55 if ($frame->is_close) {
171 0         0 $conn->close;
172             }
173              
174 4         34 return 1;
175             }
176              
177             sub _build_handle {
178 0     0     my $self = shift;
179              
180 0           return AnyEvent::Handle->new(@_);
181             }
182              
183             1;