File Coverage

blib/lib/Mojo/Transaction/WebSocket.pm
Criterion Covered Total %
statement 99 103 96.1
branch 55 58 94.8
condition 20 28 71.4
subroutine 25 29 86.2
pod 22 22 100.0
total 221 240 92.0


line stmt bran cond sub pod time code
1             package Mojo::Transaction::WebSocket;
2 56     56   68669 use Mojo::Base 'Mojo::Transaction';
  56         155  
  56         392  
3              
4 56     56   450 use Compress::Raw::Zlib qw(Z_SYNC_FLUSH);
  56         182  
  56         3578  
5 56     56   462 use List::Util qw(first);
  56         170  
  56         3874  
6 56     56   423 use Mojo::JSON qw(encode_json j);
  56         208  
  56         3154  
7 56     56   409 use Mojo::Util qw(decode encode trim);
  56         195  
  56         3316  
8 56     56   27364 use Mojo::WebSocket qw(WS_BINARY WS_CLOSE WS_CONTINUATION WS_PING WS_PONG WS_TEXT);
  56         214  
  56         119610  
9              
10             has [qw(compressed established handshake masked)];
11             has max_websocket_size => sub { $ENV{MOJO_MAX_WEBSOCKET_SIZE} || 262144 };
12              
13             sub build_message {
14 161     161 1 6934 my ($self, $frame) = @_;
15              
16             # Text
17 161 100       677 $frame = {text => encode('UTF-8', $frame)} if ref $frame ne 'HASH';
18              
19             # JSON
20 161 100       524 $frame->{text} = encode_json($frame->{json}) if exists $frame->{json};
21              
22             # Raw text or binary
23 161 100       459 if (exists $frame->{text}) { $frame = [1, 0, 0, 0, WS_TEXT, $frame->{text}] }
  132         499  
24 29         102 else { $frame = [1, 0, 0, 0, WS_BINARY, $frame->{binary}] }
25              
26             # "permessage-deflate" extension
27 161 100       544 return $frame unless $self->compressed;
28             my $deflate = $self->{deflate}
29 8   66     149 ||= Compress::Raw::Zlib::Deflate->new(AppendOutput => 1, MemLevel => 8, WindowBits => -15);
30 8         34130 $deflate->deflate($frame->[5], my $out);
31 8         73 $deflate->flush($out, Z_SYNC_FLUSH);
32 8         542 @$frame[1, 5] = (1, substr($out, 0, length($out) - 4));
33              
34 8         33 return $frame;
35             }
36              
37 178     178 1 557 sub client_read { shift->server_read(@_) }
38 446     446 1 1168 sub client_write { shift->server_write(@_) }
39              
40             sub closed {
41 133     133 1 378 my $self = shift->completed;
42 133 100       383 my @args = $self->{close} ? (@{$self->{close}}) : (1006);
  119         295  
43 133 100       543 return $self->emit(finish => @args > 1 ? @args : (@args, undef));
44             }
45              
46 9     9 1 48 sub connection { shift->handshake->connection }
47              
48             sub finish {
49 154     154 1 381 my $self = shift;
50              
51 154         504 my $close = $self->{close} = [@_];
52 154 100       433 my $payload = $close->[0] ? pack('n', $close->[0]) : '';
53 154 100       451 $payload .= encode 'UTF-8', $close->[1] if defined $close->[1];
54 154   100     741 $close->[0] //= 1005;
55 154         557 $self->send([1, 0, 0, 0, WS_CLOSE, $payload])->{closing} = 1;
56              
57 154         770 return $self;
58             }
59              
60 530     530 1 2116 sub is_websocket {1}
61              
62 1     1 1 11 sub kept_alive { shift->handshake->kept_alive }
63 0     0 1 0 sub local_address { shift->handshake->local_address }
64 0     0 1 0 sub local_port { shift->handshake->local_port }
65              
66             sub parse_message {
67 259     259 1 2071 my ($self, $frame) = @_;
68              
69 259         869 $self->emit(frame => $frame);
70              
71             # Ping/Pong
72 259         459 my $op = $frame->[4];
73 259 100       681 return $self->send([1, 0, 0, 0, WS_PONG, $frame->[5]]) if $op == WS_PING;
74 258 100       729 return undef if $op == WS_PONG;
75              
76             # Close
77 257 100       651 if ($op == WS_CLOSE) {
78 93 100       446 return $self->finish unless length $frame->[5] >= 2;
79 13         85 return $self->finish(unpack('n', substr($frame->[5], 0, 2, '')), decode('UTF-8', $frame->[5]));
80             }
81              
82             # Append chunk and check message size
83 164 100 66     639 @{$self}{qw(op pmc)} = ($op, $self->compressed && $frame->[1]) unless exists $self->{op};
  157         635  
84 164         709 $self->{message} .= $frame->[5];
85 164         398 my $max = $self->max_websocket_size;
86 164 100       489 return $self->finish(1009) if length $self->{message} > $max;
87              
88             # No FIN bit (Continuation)
89 163 100       439 return undef unless $frame->[0];
90              
91             # "permessage-deflate" extension (handshake and RSV1)
92 156         363 my $msg = delete $self->{message};
93 156 50 66     358 if ($self->compressed && $self->{pmc}) {
94             my $inflate = $self->{inflate}
95 6   33     81 ||= Compress::Raw::Zlib::Inflate->new(Bufsize => $max, LimitOutput => 1, WindowBits => -15);
96 6         5412 $inflate->inflate(($msg .= "\x00\x00\xff\xff"), my $out);
97 6 100       41 return $self->finish(1009) if length $msg;
98 5         142 $msg = $out;
99             }
100              
101 155 100       485 $self->emit(json => j($msg)) if $self->has_subscribers('json');
102 155         366 $op = delete $self->{op};
103 155 100       610 $self->emit($op == WS_TEXT ? 'text' : 'binary' => $msg);
104 155 50       412 $self->emit(message => $op == WS_TEXT ? decode 'UTF-8', $msg : $msg) if $self->has_subscribers('message');
    100          
105             }
106              
107 12     12 1 30 sub protocol { shift->res->headers->sec_websocket_protocol }
108              
109 0     0 1 0 sub remote_address { shift->handshake->remote_address }
110 0     0 1 0 sub remote_port { shift->handshake->remote_port }
111 442     442 1 1160 sub req { shift->handshake->req }
112 564     564 1 1462 sub res { shift->handshake->res }
113              
114 106 50   106 1 341 sub resume { $_[0]->handshake->resume and return $_[0] }
115              
116             sub send {
117 317     317 1 808 my ($self, $msg, $cb) = @_;
118 317 100       845 $self->once(drain => $cb) if $cb;
119 317 100       1067 $msg = $self->build_message($msg) unless ref $msg eq 'ARRAY';
120 317         1131 $self->{write} .= Mojo::WebSocket::build_frame($self->masked, @$msg);
121 317         1288 return $self->emit('resume');
122             }
123              
124             sub server_read {
125 291     291 1 650 my ($self, $chunk) = @_;
126              
127 291         807 $self->{read} .= $chunk;
128 291         936 my $max = $self->max_websocket_size;
129 291         1191 while (my $frame = Mojo::WebSocket::parse_frame(\$self->{read}, $max)) {
130 255 100 50     713 $self->finish(1009) and last unless ref $frame;
131 253         665 $self->parse_message($frame);
132             }
133              
134 291         842 $self->emit('resume');
135             }
136              
137             sub server_write {
138 883     883 1 1614 my $self = shift;
139 883 100 100     4606 $self->emit('drain') unless length($self->{write} //= '');
140 883 100 100     3459 $self->completed if !length $self->{write} && $self->{closing};
141 883         2582 return delete $self->{write};
142             }
143              
144             sub with_compression {
145 20     20 1 38 my $self = shift;
146              
147             # "permessage-deflate" extension
148 20 100 33     51 $self->compressed(1) and $self->res->headers->sec_websocket_extensions('permessage-deflate')
      100        
149             if ($self->req->headers->sec_websocket_extensions // '') =~ /permessage-deflate/;
150             }
151              
152             sub with_protocols {
153 6     6 1 11 my $self = shift;
154              
155 6   100     15 my %protos = map { trim($_) => 1 } split /,/, $self->req->headers->sec_websocket_protocol // '';
  8         22  
156 6 100   19   45 return undef unless defined(my $proto = first { $protos{$_} } @_);
  19         61  
157              
158 3         16 $self->res->headers->sec_websocket_protocol($proto);
159 3         10 return $proto;
160             }
161              
162             1;
163              
164             =encoding utf8
165              
166             =head1 NAME
167              
168             Mojo::Transaction::WebSocket - WebSocket transaction
169              
170             =head1 SYNOPSIS
171              
172             use Mojo::Transaction::WebSocket;
173              
174             # Send and receive WebSocket messages
175             my $ws = Mojo::Transaction::WebSocket->new;
176             $ws->send('Hello World!');
177             $ws->on(message => sub ($ws, $msg) { say "Message: $msg" });
178             $ws->on(finish => sub ($ws, $code, $reason) { say "WebSocket closed with status $code." });
179              
180             =head1 DESCRIPTION
181              
182             L<Mojo::Transaction::WebSocket> is a container for WebSocket transactions, based on L<RFC
183             6455|https://tools.ietf.org/html/rfc6455> and L<RFC 7692|https://tools.ietf.org/html/rfc7692>.
184              
185             =head1 EVENTS
186              
187             L<Mojo::Transaction::WebSocket> inherits all events from L<Mojo::Transaction> and can emit the following new ones.
188              
189             =head2 binary
190              
191             $ws->on(binary => sub ($ws, $bytes) {...});
192              
193             Emitted when a complete WebSocket binary message has been received.
194              
195             $ws->on(binary => sub ($ws, $bytes) { say "Binary: $bytes" });
196              
197             =head2 drain
198              
199             $ws->on(drain => sub ($ws) {...});
200              
201             Emitted once all data has been sent.
202              
203             $ws->on(drain => sub ($ws) { $ws->send(time) });
204              
205             =head2 finish
206              
207             $ws->on(finish => sub ($ws, $code, $reason) {...});
208              
209             Emitted when the WebSocket connection has been closed.
210              
211             =head2 frame
212              
213             $ws->on(frame => sub ($ws, $frame) {...});
214              
215             Emitted when a WebSocket frame has been received.
216              
217             $ws->on(frame => sub ($ws, $frame) {
218             say "FIN: $frame->[0]";
219             say "RSV1: $frame->[1]";
220             say "RSV2: $frame->[2]";
221             say "RSV3: $frame->[3]";
222             say "Opcode: $frame->[4]";
223             say "Payload: $frame->[5]";
224             });
225              
226             =head2 json
227              
228             $ws->on(json => sub ($ws, $json) {...});
229              
230             Emitted when a complete WebSocket message has been received, all text and binary messages will be automatically JSON
231             decoded. Note that this event only gets emitted when it has at least one subscriber.
232              
233             $ws->on(json => sub ($ws, $hash) { say "Message: $hash->{msg}" });
234              
235             =head2 message
236              
237             $ws->on(message => sub ($ws, $msg) {...});
238              
239             Emitted when a complete WebSocket message has been received, text messages will be automatically decoded. Note that
240             this event only gets emitted when it has at least one subscriber.
241              
242             $ws->on(message => sub ($ws, $msg) { say "Message: $msg" });
243              
244             =head2 resume
245              
246             $tx->on(resume => sub ($tx) {...});
247              
248             Emitted when transaction is resumed.
249              
250             =head2 text
251              
252             $ws->on(text => sub ($ws, $bytes) {...});
253              
254             Emitted when a complete WebSocket text message has been received.
255              
256             $ws->on(text => sub ($ws, $bytes) { say "Text: $bytes" });
257              
258             =head1 ATTRIBUTES
259              
260             L<Mojo::Transaction::WebSocket> inherits all attributes from L<Mojo::Transaction> and implements the following new
261             ones.
262              
263             =head2 compressed
264              
265             my $bool = $ws->compressed;
266             $ws = $ws->compressed($bool);
267              
268             Compress messages with C<permessage-deflate> extension.
269              
270             =head2 established
271              
272             my $bool = $ws->established;
273             $ws = $ws->established($bool);
274              
275             WebSocket connection established.
276              
277             =head2 handshake
278              
279             my $handshake = $ws->handshake;
280             $ws = $ws->handshake(Mojo::Transaction::HTTP->new);
281              
282             The original handshake transaction, usually a L<Mojo::Transaction::HTTP> object.
283              
284             =head2 masked
285              
286             my $bool = $ws->masked;
287             $ws = $ws->masked($bool);
288              
289             Mask outgoing frames with XOR cipher and a random 32-bit key.
290              
291             =head2 max_websocket_size
292              
293             my $size = $ws->max_websocket_size;
294             $ws = $ws->max_websocket_size(1024);
295              
296             Maximum WebSocket message size in bytes, defaults to the value of the C<MOJO_MAX_WEBSOCKET_SIZE> environment variable
297             or C<262144> (256KiB).
298              
299             =head1 METHODS
300              
301             L<Mojo::Transaction::WebSocket> inherits all methods from L<Mojo::Transaction> and implements the following new ones.
302              
303             =head2 build_message
304              
305             my $frame = $ws->build_message({binary => $bytes});
306             my $frame = $ws->build_message({text => $bytes});
307             my $frame = $ws->build_message({json => {test => [1, 2, 3]}});
308             my $frame = $ws->build_message($chars);
309              
310             Build WebSocket message.
311              
312             =head2 client_read
313              
314             $ws->client_read($data);
315              
316             Read data client-side, used to implement user agents such as L<Mojo::UserAgent>.
317              
318             =head2 client_write
319              
320             my $bytes = $ws->client_write;
321              
322             Write data client-side, used to implement user agents such as L<Mojo::UserAgent>.
323              
324             =head2 closed
325              
326             $tx = $tx->closed;
327              
328             Same as L<Mojo::Transaction/"completed">, but also indicates that all transaction data has been sent.
329              
330             =head2 connection
331              
332             my $id = $ws->connection;
333              
334             Connection identifier.
335              
336             =head2 finish
337              
338             $ws = $ws->finish;
339             $ws = $ws->finish(1000);
340             $ws = $ws->finish(1003 => 'Cannot accept data!');
341              
342             Close WebSocket connection gracefully.
343              
344             =head2 is_websocket
345              
346             my $bool = $ws->is_websocket;
347              
348             True, this is a L<Mojo::Transaction::WebSocket> object.
349              
350             =head2 kept_alive
351              
352             my $bool = $ws->kept_alive;
353              
354             Connection has been kept alive.
355              
356             =head2 local_address
357              
358             my $address = $ws->local_address;
359              
360             Local interface address.
361              
362             =head2 local_port
363              
364             my $port = $ws->local_port;
365              
366             Local interface port.
367              
368             =head2 parse_message
369              
370             $ws->parse_message([$fin, $rsv1, $rsv2, $rsv3, $op, $payload]);
371              
372             Parse WebSocket message.
373              
374             =head2 protocol
375              
376             my $proto = $ws->protocol;
377              
378             Return negotiated subprotocol or C<undef>.
379              
380             =head2 remote_address
381              
382             my $address = $ws->remote_address;
383              
384             Remote interface address.
385              
386             =head2 remote_port
387              
388             my $port = $ws->remote_port;
389              
390             Remote interface port.
391              
392             =head2 req
393              
394             my $req = $ws->req;
395              
396             Handshake request, usually a L<Mojo::Message::Request> object.
397              
398             =head2 res
399              
400             my $res = $ws->res;
401              
402             Handshake response, usually a L<Mojo::Message::Response> object.
403              
404             =head2 resume
405              
406             $ws = $ws->resume;
407              
408             Resume L</"handshake"> transaction.
409              
410             =head2 send
411              
412             $ws = $ws->send({binary => $bytes});
413             $ws = $ws->send({text => $bytes});
414             $ws = $ws->send({json => {test => [1, 2, 3]}});
415             $ws = $ws->send([$fin, $rsv1, $rsv2, $rsv3, $op, $payload]);
416             $ws = $ws->send($chars);
417             $ws = $ws->send($chars => sub {...});
418              
419             Send message or frame non-blocking via WebSocket, the optional drain callback will be executed once all data has been
420             written.
421              
422             # Send "Ping" frame
423             use Mojo::WebSocket qw(WS_PING);
424             $ws->send([1, 0, 0, 0, WS_PING, 'Hello World!']);
425              
426             =head2 server_read
427              
428             $ws->server_read($data);
429              
430             Read data server-side, used to implement web servers such as L<Mojo::Server::Daemon>.
431              
432             =head2 server_write
433              
434             my $bytes = $ws->server_write;
435              
436             Write data server-side, used to implement web servers such as L<Mojo::Server::Daemon>.
437              
438             =head2 with_compression
439              
440             $ws->with_compression;
441              
442             Negotiate C<permessage-deflate> extension for this WebSocket connection.
443              
444             =head2 with_protocols
445              
446             my $proto = $ws->with_protocols('v2.proto', 'v1.proto');
447              
448             Negotiate subprotocol for this WebSocket connection.
449              
450             =head1 SEE ALSO
451              
452             L<Mojolicious>, L<Mojolicious::Guides>, L<https://mojolicious.org>.
453              
454             =cut