File Coverage

blib/lib/Mojo/Transaction/WebSocket.pm
Criterion Covered Total %
statement 99 103 96.1
branch 55 58 94.8
condition 20 28 71.4
subroutine 25 29 86.2
pod 22 22 100.0
total 221 240 92.0


line stmt bran cond sub pod time code
1             package Mojo::Transaction::WebSocket;
2 55     55   62607 use Mojo::Base 'Mojo::Transaction';
  55         123  
  55         358  
3              
4 55     55   420 use Compress::Raw::Zlib qw(Z_SYNC_FLUSH);
  55         148  
  55         3214  
5 55     55   430 use List::Util qw(first);
  55         148  
  55         3386  
6 55     55   376 use Mojo::JSON qw(encode_json j);
  55         210  
  55         3025  
7 55     55   362 use Mojo::Util qw(decode encode trim);
  55         173  
  55         3169  
8 55     55   25257 use Mojo::WebSocket qw(WS_BINARY WS_CLOSE WS_CONTINUATION WS_PING WS_PONG WS_TEXT);
  55         180  
  55         109641  
9              
10             has [qw(compressed established handshake masked)];
11             has max_websocket_size => sub { $ENV{MOJO_MAX_WEBSOCKET_SIZE} || 262144 };
12              
13             sub build_message {
14 161     161 1 7000 my ($self, $frame) = @_;
15              
16             # Text
17 161 100       647 $frame = {text => encode('UTF-8', $frame)} if ref $frame ne 'HASH';
18              
19             # JSON
20 161 100       488 $frame->{text} = encode_json($frame->{json}) if exists $frame->{json};
21              
22             # Raw text or binary
23 161 100       382 if (exists $frame->{text}) { $frame = [1, 0, 0, 0, WS_TEXT, $frame->{text}] }
  132         518  
24 29         94 else { $frame = [1, 0, 0, 0, WS_BINARY, $frame->{binary}] }
25              
26             # "permessage-deflate" extension
27 161 100       541 return $frame unless $self->compressed;
28             my $deflate = $self->{deflate}
29 8   66     98 ||= Compress::Raw::Zlib::Deflate->new(AppendOutput => 1, MemLevel => 8, WindowBits => -15);
30 8         34120 $deflate->deflate($frame->[5], my $out);
31 8         70 $deflate->flush($out, Z_SYNC_FLUSH);
32 8         481 @$frame[1, 5] = (1, substr($out, 0, length($out) - 4));
33              
34 8         35 return $frame;
35             }
36              
37 178     178 1 514 sub client_read { shift->server_read(@_) }
38 446     446 1 1015 sub client_write { shift->server_write(@_) }
39              
40             sub closed {
41 133     133 1 335 my $self = shift->completed;
42 133 100       382 my @args = $self->{close} ? (@{$self->{close}}) : (1006);
  119         302  
43 133 100       555 return $self->emit(finish => @args > 1 ? @args : (@args, undef));
44             }
45              
46 9     9 1 38 sub connection { shift->handshake->connection }
47              
48             sub finish {
49 154     154 1 354 my $self = shift;
50              
51 154         437 my $close = $self->{close} = [@_];
52 154 100       739 my $payload = $close->[0] ? pack('n', $close->[0]) : '';
53 154 100       414 $payload .= encode 'UTF-8', $close->[1] if defined $close->[1];
54 154   100     732 $close->[0] //= 1005;
55 154         571 $self->send([1, 0, 0, 0, WS_CLOSE, $payload])->{closing} = 1;
56              
57 154         710 return $self;
58             }
59              
60 530     530 1 2037 sub is_websocket {1}
61              
62 1     1 1 11 sub kept_alive { shift->handshake->kept_alive }
63 0     0 1 0 sub local_address { shift->handshake->local_address }
64 0     0 1 0 sub local_port { shift->handshake->local_port }
65              
66             sub parse_message {
67 259     259 1 2190 my ($self, $frame) = @_;
68              
69 259         832 $self->emit(frame => $frame);
70              
71             # Ping/Pong
72 259         489 my $op = $frame->[4];
73 259 100       644 return $self->send([1, 0, 0, 0, WS_PONG, $frame->[5]]) if $op == WS_PING;
74 258 100       518 return undef if $op == WS_PONG;
75              
76             # Close
77 257 100       530 if ($op == WS_CLOSE) {
78 93 100       354 return $self->finish unless length $frame->[5] >= 2;
79 13         85 return $self->finish(unpack('n', substr($frame->[5], 0, 2, '')), decode('UTF-8', $frame->[5]));
80             }
81              
82             # Append chunk and check message size
83 164 100 66     604 @{$self}{qw(op pmc)} = ($op, $self->compressed && $frame->[1]) unless exists $self->{op};
  157         618  
84 164         678 $self->{message} .= $frame->[5];
85 164         424 my $max = $self->max_websocket_size;
86 164 100       490 return $self->finish(1009) if length $self->{message} > $max;
87              
88             # No FIN bit (Continuation)
89 163 100       476 return undef unless $frame->[0];
90              
91             # "permessage-deflate" extension (handshake and RSV1)
92 156         372 my $msg = delete $self->{message};
93 156 50 66     374 if ($self->compressed && $self->{pmc}) {
94             my $inflate = $self->{inflate}
95 6   33     76 ||= Compress::Raw::Zlib::Inflate->new(Bufsize => $max, LimitOutput => 1, WindowBits => -15);
96 6         5106 $inflate->inflate(($msg .= "\x00\x00\xff\xff"), my $out);
97 6 100       43 return $self->finish(1009) if length $msg;
98 5         115 $msg = $out;
99             }
100              
101 155 100       451 $self->emit(json => j($msg)) if $self->has_subscribers('json');
102 155         349 $op = delete $self->{op};
103 155 100       562 $self->emit($op == WS_TEXT ? 'text' : 'binary' => $msg);
104 155 50       370 $self->emit(message => $op == WS_TEXT ? decode 'UTF-8', $msg : $msg) if $self->has_subscribers('message');
    100          
105             }
106              
107 12     12 1 31 sub protocol { shift->res->headers->sec_websocket_protocol }
108              
109 0     0 1 0 sub remote_address { shift->handshake->remote_address }
110 0     0 1 0 sub remote_port { shift->handshake->remote_port }
111 441     441 1 972 sub req { shift->handshake->req }
112 564     564 1 1392 sub res { shift->handshake->res }
113              
114 106 50   106 1 269 sub resume { $_[0]->handshake->resume and return $_[0] }
115              
116             sub send {
117 317     317 1 760 my ($self, $msg, $cb) = @_;
118 317 100       820 $self->once(drain => $cb) if $cb;
119 317 100       1048 $msg = $self->build_message($msg) unless ref $msg eq 'ARRAY';
120 317         1110 $self->{write} .= Mojo::WebSocket::build_frame($self->masked, @$msg);
121 317         1192 return $self->emit('resume');
122             }
123              
124             sub server_read {
125 290     290 1 622 my ($self, $chunk) = @_;
126              
127 290         838 $self->{read} .= $chunk;
128 290         889 my $max = $self->max_websocket_size;
129 290         1121 while (my $frame = Mojo::WebSocket::parse_frame(\$self->{read}, $max)) {
130 255 100 50     662 $self->finish(1009) and last unless ref $frame;
131 253         641 $self->parse_message($frame);
132             }
133              
134 290         1152 $self->emit('resume');
135             }
136              
137             sub server_write {
138 882     882 1 1440 my $self = shift;
139 882 100 100     4290 $self->emit('drain') unless length($self->{write} //= '');
140 882 100 100     3303 $self->completed if !length $self->{write} && $self->{closing};
141 882         2390 return delete $self->{write};
142             }
143              
144             sub with_compression {
145 20     20 1 44 my $self = shift;
146              
147             # "permessage-deflate" extension
148 20 100 33     69 $self->compressed(1) and $self->res->headers->sec_websocket_extensions('permessage-deflate')
      100        
149             if ($self->req->headers->sec_websocket_extensions // '') =~ /permessage-deflate/;
150             }
151              
152             sub with_protocols {
153 6     6 1 16 my $self = shift;
154              
155 6   100     13 my %protos = map { trim($_) => 1 } split /,/, $self->req->headers->sec_websocket_protocol // '';
  8         24  
156 6 100   19   40 return undef unless defined(my $proto = first { $protos{$_} } @_);
  19         48  
157              
158 3         16 $self->res->headers->sec_websocket_protocol($proto);
159 3         11 return $proto;
160             }
161              
162             1;
163              
164             =encoding utf8
165              
166             =head1 NAME
167              
168             Mojo::Transaction::WebSocket - WebSocket transaction
169              
170             =head1 SYNOPSIS
171              
172             use Mojo::Transaction::WebSocket;
173              
174             # Send and receive WebSocket messages
175             my $ws = Mojo::Transaction::WebSocket->new;
176             $ws->send('Hello World!');
177             $ws->on(message => sub ($ws, $msg) { say "Message: $msg" });
178             $ws->on(finish => sub ($ws, $code, $reason) { say "WebSocket closed with status $code." });
179              
180             =head1 DESCRIPTION
181              
182             L is a container for WebSocket transactions, based on L
183             6455|https://tools.ietf.org/html/rfc6455> and L.
184              
185             =head1 EVENTS
186              
187             L inherits all events from L and can emit the following new ones.
188              
189             =head2 binary
190              
191             $ws->on(binary => sub ($ws, $bytes) {...});
192              
193             Emitted when a complete WebSocket binary message has been received.
194              
195             $ws->on(binary => sub ($ws, $bytes) { say "Binary: $bytes" });
196              
197             =head2 drain
198              
199             $ws->on(drain => sub ($ws) {...});
200              
201             Emitted once all data has been sent.
202              
203             $ws->on(drain => sub ($ws) { $ws->send(time) });
204              
205             =head2 finish
206              
207             $ws->on(finish => sub ($ws, $code, $reason) {...});
208              
209             Emitted when the WebSocket connection has been closed.
210              
211             =head2 frame
212              
213             $ws->on(frame => sub ($ws, $frame) {...});
214              
215             Emitted when a WebSocket frame has been received.
216              
217             $ws->on(frame => sub ($ws, $frame) {
218             say "FIN: $frame->[0]";
219             say "RSV1: $frame->[1]";
220             say "RSV2: $frame->[2]";
221             say "RSV3: $frame->[3]";
222             say "Opcode: $frame->[4]";
223             say "Payload: $frame->[5]";
224             });
225              
226             =head2 json
227              
228             $ws->on(json => sub ($ws, $json) {...});
229              
230             Emitted when a complete WebSocket message has been received, all text and binary messages will be automatically JSON
231             decoded. Note that this event only gets emitted when it has at least one subscriber.
232              
233             $ws->on(json => sub ($ws, $hash) { say "Message: $hash->{msg}" });
234              
235             =head2 message
236              
237             $ws->on(message => sub ($ws, $msg) {...});
238              
239             Emitted when a complete WebSocket message has been received, text messages will be automatically decoded. Note that
240             this event only gets emitted when it has at least one subscriber.
241              
242             $ws->on(message => sub ($ws, $msg) { say "Message: $msg" });
243              
244             =head2 resume
245              
246             $tx->on(resume => sub ($tx) {...});
247              
248             Emitted when transaction is resumed.
249              
250             =head2 text
251              
252             $ws->on(text => sub ($ws, $bytes) {...});
253              
254             Emitted when a complete WebSocket text message has been received.
255              
256             $ws->on(text => sub ($ws, $bytes) { say "Text: $bytes" });
257              
258             =head1 ATTRIBUTES
259              
260             L inherits all attributes from L and implements the following new
261             ones.
262              
263             =head2 compressed
264              
265             my $bool = $ws->compressed;
266             $ws = $ws->compressed($bool);
267              
268             Compress messages with C extension.
269              
270             =head2 established
271              
272             my $bool = $ws->established;
273             $ws = $ws->established($bool);
274              
275             WebSocket connection established.
276              
277             =head2 handshake
278              
279             my $handshake = $ws->handshake;
280             $ws = $ws->handshake(Mojo::Transaction::HTTP->new);
281              
282             The original handshake transaction, usually a L object.
283              
284             =head2 masked
285              
286             my $bool = $ws->masked;
287             $ws = $ws->masked($bool);
288              
289             Mask outgoing frames with XOR cipher and a random 32-bit key.
290              
291             =head2 max_websocket_size
292              
293             my $size = $ws->max_websocket_size;
294             $ws = $ws->max_websocket_size(1024);
295              
296             Maximum WebSocket message size in bytes, defaults to the value of the C environment variable
297             or C<262144> (256KiB).
298              
299             =head1 METHODS
300              
301             L inherits all methods from L and implements the following new ones.
302              
303             =head2 build_message
304              
305             my $frame = $ws->build_message({binary => $bytes});
306             my $frame = $ws->build_message({text => $bytes});
307             my $frame = $ws->build_message({json => {test => [1, 2, 3]}});
308             my $frame = $ws->build_message($chars);
309              
310             Build WebSocket message.
311              
312             =head2 client_read
313              
314             $ws->client_read($data);
315              
316             Read data client-side, used to implement user agents such as L.
317              
318             =head2 client_write
319              
320             my $bytes = $ws->client_write;
321              
322             Write data client-side, used to implement user agents such as L.
323              
324             =head2 closed
325              
326             $tx = $tx->closed;
327              
328             Same as L, but also indicates that all transaction data has been sent.
329              
330             =head2 connection
331              
332             my $id = $ws->connection;
333              
334             Connection identifier.
335              
336             =head2 finish
337              
338             $ws = $ws->finish;
339             $ws = $ws->finish(1000);
340             $ws = $ws->finish(1003 => 'Cannot accept data!');
341              
342             Close WebSocket connection gracefully.
343              
344             =head2 is_websocket
345              
346             my $bool = $ws->is_websocket;
347              
348             True, this is a L object.
349              
350             =head2 kept_alive
351              
352             my $bool = $ws->kept_alive;
353              
354             Connection has been kept alive.
355              
356             =head2 local_address
357              
358             my $address = $ws->local_address;
359              
360             Local interface address.
361              
362             =head2 local_port
363              
364             my $port = $ws->local_port;
365              
366             Local interface port.
367              
368             =head2 parse_message
369              
370             $ws->parse_message([$fin, $rsv1, $rsv2, $rsv3, $op, $payload]);
371              
372             Parse WebSocket message.
373              
374             =head2 protocol
375              
376             my $proto = $ws->protocol;
377              
378             Return negotiated subprotocol or C.
379              
380             =head2 remote_address
381              
382             my $address = $ws->remote_address;
383              
384             Remote interface address.
385              
386             =head2 remote_port
387              
388             my $port = $ws->remote_port;
389              
390             Remote interface port.
391              
392             =head2 req
393              
394             my $req = $ws->req;
395              
396             Handshake request, usually a L object.
397              
398             =head2 res
399              
400             my $res = $ws->res;
401              
402             Handshake response, usually a L object.
403              
404             =head2 resume
405              
406             $ws = $ws->resume;
407              
408             Resume L transaction.
409              
410             =head2 send
411              
412             $ws = $ws->send({binary => $bytes});
413             $ws = $ws->send({text => $bytes});
414             $ws = $ws->send({json => {test => [1, 2, 3]}});
415             $ws = $ws->send([$fin, $rsv1, $rsv2, $rsv3, $op, $payload]);
416             $ws = $ws->send($chars);
417             $ws = $ws->send($chars => sub {...});
418              
419             Send message or frame non-blocking via WebSocket, the optional drain callback will be executed once all data has been
420             written.
421              
422             # Send "Ping" frame
423             use Mojo::WebSocket qw(WS_PING);
424             $ws->send([1, 0, 0, 0, WS_PING, 'Hello World!']);
425              
426             =head2 server_read
427              
428             $ws->server_read($data);
429              
430             Read data server-side, used to implement web servers such as L.
431              
432             =head2 server_write
433              
434             my $bytes = $ws->server_write;
435              
436             Write data server-side, used to implement web servers such as L.
437              
438             =head2 with_compression
439              
440             $ws->with_compression;
441              
442             Negotiate C extension for this WebSocket connection.
443              
444             =head2 with_protocols
445              
446             my $proto = $ws->with_protocols('v2.proto', 'v1.proto');
447              
448             Negotiate subprotocol for this WebSocket connection.
449              
450             =head1 SEE ALSO
451              
452             L, L, L.
453              
454             =cut