File Coverage

blib/lib/SockJS/Transport/WebSocket.pm
Criterion Covered Total %
statement 66 109 60.5
branch 11 36 30.5
condition 0 6 0.0
subroutine 13 15 86.6
pod 0 2 0.0
total 90 168 53.5


line stmt bran cond sub pod time code
1             package SockJS::Transport::WebSocket;
2              
3 1     1   520 use strict;
  1         2  
  1         78  
4 1     1   9 use warnings;
  1         2  
  1         32  
5              
6 1     1   6 use base 'SockJS::Transport::Base';
  1         2  
  1         443  
7              
8 1     1   567 use Encode ();
  1         10755  
  1         29  
9 1     1   552 use IO::Compress::Deflate;
  1         41255  
  1         54  
10 1     1   620 use JSON ();
  1         12376  
  1         30  
11 1     1   797 use AnyEvent::Handle;
  1         26379  
  1         48  
12 1     1   540 use Protocol::WebSocket::Handshake::Server;
  1         20663  
  1         897  
13              
14             sub new {
15 3     3 0 7981 my $self = shift->SUPER::new(@_);
16              
17 3         6 push @{$self->{allowed_methods}}, 'GET';
  3         9  
18              
19 3         7 return $self;
20             }
21              
22             sub dispatch_GET {
23 3     3 0 6 my $self = shift;
24 3         7 my ($env, $conn) = @_;
25              
26             my $hs = $self->{hs} =
27 3         24 Protocol::WebSocket::Handshake::Server->new_from_psgi($env);
28              
29 3         567 my $fh = $env->{'psgix.io'};
30              
31 3         11 my $handle = $self->{handle} = $self->_build_handle(fh => $fh);
32              
33 3 100       161 $hs->parse($fh)
34             or return $self->_return_error('Can "Upgrade" only to "WebSocket".',
35             status => 400);
36              
37             return sub {
38             my $on_close_cb = sub {
39 0         0 my $handle = shift;
40              
41 0         0 $conn->aborted;
42              
43 0 0       0 if ($handle) {
44 0         0 eval {
45 0         0 $handle->push_shutdown;
46 0         0 $handle->destroy;
47             };
48 0         0 undef $handle;
49             }
50              
51 0         0 delete $self->{handle};
52              
53 0 0       0 if ($fh) {
54 0         0 close $fh;
55             }
56 2     2   20 };
57 2         26 $handle->on_eof($on_close_cb);
58 2         107 $handle->on_error($on_close_cb);
59              
60             $handle->on_read(
61             sub {
62             $handle->push_read(
63             sub {
64 2 50       92 $self->_parse($conn, $_[0]->rbuf) or do {
65 0         0 $conn->aborted;
66              
67 0         0 $handle->push_shutdown;
68 0         0 $handle->destroy;
69 0         0 delete $self->{handle};
70 0         0 undef $handle;
71             };
72             }
73 2         110 );
74             }
75 2         89 );
76              
77 2         32 $self->_parse($conn, '');
78 2         961 };
79             }
80              
81             sub _parse {
82 4     4   128 my $self = shift;
83 4         9 my ($conn) = @_;
84              
85 4         7 my $hs = $self->{hs};
86 4         7 my $handle = $self->{handle};
87              
88 4 100       11 if (!$self->{handshake_done}) {
89 2         8 my $ok = $hs->parse($_[1]);
90 2 50       44 return unless $ok;
91              
92             #$hs->res->push_header('Sec-WebSocket-Extensions' => 'permessage-deflate');
93              
94             # Partial request (HAProxy?)
95 2 50       5 if ($hs->is_body) {
96 0         0 $handle->push_write($hs->to_string);
97 0         0 return 1;
98             }
99              
100 2 50       37 if ($hs->is_done) {
101              
102             # Connected!
103 2         29 $handle->push_write($hs->to_string);
104 2         1003 $self->{handshake_done}++;
105              
106             $conn->write_cb(
107             sub {
108 2     2   5 my $conn = shift;
109 2         4 my ($message) = @_;
110              
111 2         9 my $bytes = $hs->build_frame(buffer => $message)->to_bytes;
112              
113 2 50       279 $handle->push_write($bytes) if $handle;
114             }
115 2         17 );
116             $conn->close_cb(
117             sub {
118 0     0   0 my $conn = shift;
119              
120 0         0 my $close_frame = $hs->build_frame(type => 'close')->to_bytes;
121 0         0 $conn->write($close_frame);
122              
123 0 0       0 if ($handle) {
124 0         0 $handle->push_shutdown;
125 0         0 $handle->destroy;
126 0         0 undef $handle;
127             }
128              
129 0         0 delete $self->{handle};
130             }
131 2         12 );
132              
133 2 50       13 $conn->write('o') unless $self->name eq 'raw_websocket';
134 2         6 $conn->connected;
135             }
136             else {
137              
138             # Wait for more data
139 0         0 return 1;
140             }
141             }
142              
143 4         12 my $frame = $hs->build_frame;
144 4         213 $frame->append($_[1]);
145              
146 4         44 while (defined(my $message = $frame->next_bytes)) {
147 0 0       0 next unless length $message;
148              
149 0 0 0     0 if ($frame->rsv && $frame->rsv->[0]) {
150 0         0 my $uncompressed;
151              
152 0         0 $message .= "\x00\x00\xff\xff";
153              
154 0 0       0 IO::Compress::Deflate::deflate(\$message => \$uncompressed)
155             or return;
156 0         0 $message = $uncompressed;
157             }
158              
159 0 0       0 if ($self->name eq 'websocket') {
160 0         0 my $json = JSON->new->utf8->allow_nonref(0);
161              
162 0 0       0 eval { $message = $json->decode($message) } || do {
  0         0  
163             #warn "JSON error: $@\n";
164 0         0 return;
165             };
166              
167 0 0 0     0 return unless $message && ref $message eq 'ARRAY';
168             }
169             else {
170              
171             # We want to pass message AS IS
172 0         0 $message = [\Encode::decode('UTF-8', $message)];
173             }
174              
175 0         0 $conn->fire_event('data', @$message);
176             }
177              
178 4 50       58 if ($frame->is_close) {
179 0         0 $conn->close;
180             }
181              
182 4         36 return 1;
183             }
184              
185             sub _build_handle {
186 0     0     my $self = shift;
187              
188 0           return AnyEvent::Handle->new(@_);
189             }
190              
191             1;