File Coverage

blib/lib/SockJS/Transport/WebSocket.pm
Criterion Covered Total %
statement 66 106 62.2
branch 11 34 32.3
condition 0 6 0.0
subroutine 13 15 86.6
pod 0 2 0.0
total 90 163 55.2


line stmt bran cond sub pod time code
1             package SockJS::Transport::WebSocket;
2              
3 1     1   441 use strict;
  1         2  
  1         28  
4 1     1   7 use warnings;
  1         2  
  1         78  
5              
6 1     1   7 use base 'SockJS::Transport::Base';
  1         2  
  1         412  
7              
8 1     1   548 use Encode ();
  1         10120  
  1         24  
9 1     1   556 use IO::Compress::Deflate;
  1         38413  
  1         51  
10 1     1   600 use JSON ();
  1         12133  
  1         26  
11 1     1   758 use AnyEvent::Handle;
  1         25805  
  1         41  
12 1     1   487 use Protocol::WebSocket::Handshake::Server;
  1         20161  
  1         818  
13              
14             sub new {
15 3     3 0 7798 my $self = shift->SUPER::new(@_);
16              
17 3         6 push @{$self->{allowed_methods}}, 'GET';
  3         7  
18              
19 3         6 return $self;
20             }
21              
22             sub dispatch_GET {
23 3     3 0 5 my $self = shift;
24 3         7 my ($env, $conn) = @_;
25              
26             my $hs = $self->{hs} =
27 3         20 Protocol::WebSocket::Handshake::Server->new_from_psgi($env);
28              
29 3         496 my $fh = $env->{'psgix.io'};
30              
31 3         9 my $handle = $self->{handle} = $self->_build_handle(fh => $fh);
32              
33 3 100       135 $hs->parse($fh)
34             or return $self->_return_error('Can "Upgrade" only to "WebSocket".',
35             status => 400);
36              
37             return sub {
38             my $on_close_cb = sub {
39 0         0 my $handle = shift;
40              
41 0         0 $conn->aborted;
42              
43 0 0       0 if ($handle) {
44 0         0 $handle->push_shutdown;
45 0         0 $handle->destroy;
46 0         0 delete $self->{handle};
47 0         0 undef $handle;
48             }
49 2     2   18 };
50 2         27 $handle->on_eof($on_close_cb);
51 2         104 $handle->on_error($on_close_cb);
52              
53             $handle->on_read(
54             sub {
55             $handle->push_read(
56             sub {
57 2 50       114 $self->_parse($conn, $_[0]->rbuf) or do {
58 0         0 $conn->aborted;
59              
60 0         0 $handle->push_shutdown;
61 0         0 $handle->destroy;
62 0         0 delete $self->{handle};
63 0         0 undef $handle;
64             };
65             }
66 2         101 );
67             }
68 2         103 );
69              
70 2         32 $self->_parse($conn, '');
71 2         861 };
72             }
73              
74             sub _parse {
75 4     4   114 my $self = shift;
76 4         8 my ($conn) = @_;
77              
78 4         7 my $hs = $self->{hs};
79 4         7 my $handle = $self->{handle};
80              
81 4 100       11 if (!$self->{handshake_done}) {
82 2         7 my $ok = $hs->parse($_[1]);
83 2 50       48 return unless $ok;
84              
85             #$hs->res->push_header('Sec-WebSocket-Extensions' => 'permessage-deflate');
86              
87             # Partial request (HAProxy?)
88 2 50       9 if ($hs->is_body) {
89 0         0 $handle->push_write($hs->to_string);
90 0         0 return 1;
91             }
92              
93 2 50       27 if ($hs->is_done) {
94              
95             # Connected!
96 2         24 $handle->push_write($hs->to_string);
97 2         990 $self->{handshake_done}++;
98              
99             $conn->write_cb(
100             sub {
101 2     2   3 my $conn = shift;
102 2         5 my ($message) = @_;
103              
104 2         6 my $bytes = $hs->build_frame(buffer => $message)->to_bytes;
105              
106 2 50       223 $handle->push_write($bytes) if $handle;
107             }
108 2         15 );
109             $conn->close_cb(
110             sub {
111 0     0   0 my $conn = shift;
112              
113 0         0 my $close_frame = $hs->build_frame(type => 'close')->to_bytes;
114 0         0 $conn->write($close_frame);
115              
116 0 0       0 if ($handle) {
117 0         0 $handle->push_shutdown;
118 0         0 $handle->destroy;
119 0         0 delete $self->{handle};
120 0         0 undef $handle;
121             }
122             }
123 2         11 );
124              
125 2 50       14 $conn->write('o') unless $self->name eq 'raw_websocket';
126 2         5 $conn->connected;
127             }
128             else {
129              
130             # Wait for more data
131 0         0 return 1;
132             }
133             }
134              
135 4         11 my $frame = $hs->build_frame;
136 4         208 $frame->append($_[1]);
137              
138 4         42 while (defined(my $message = $frame->next_bytes)) {
139 0 0       0 next unless length $message;
140              
141 0 0 0     0 if ($frame->rsv && $frame->rsv->[0]) {
142 0         0 my $uncompressed;
143              
144 0         0 $message .= "\x00\x00\xff\xff";
145              
146 0 0       0 IO::Compress::Deflate::deflate(\$message => \$uncompressed)
147             or return;
148 0         0 $message = $uncompressed;
149             }
150              
151 0 0       0 if ($self->name eq 'websocket') {
152 0         0 my $json = JSON->new->utf8->allow_nonref(0);
153              
154 0 0       0 eval { $message = $json->decode($message) } || do {
  0         0  
155             #warn "JSON error: $@\n";
156 0         0 return;
157             };
158              
159 0 0 0     0 return unless $message && ref $message eq 'ARRAY';
160             }
161             else {
162              
163             # We want to pass message AS IS
164 0         0 $message = [\Encode::decode('UTF-8', $message)];
165             }
166              
167 0         0 $conn->fire_event('data', @$message);
168             }
169              
170 4 50       56 if ($frame->is_close) {
171 0         0 $conn->close;
172             }
173              
174 4         40 return 1;
175             }
176              
177             sub _build_handle {
178 0     0     my $self = shift;
179              
180 0           return AnyEvent::Handle->new(@_);
181             }
182              
183             1;