File Coverage

blib/lib/Kelp/Module/WebSocket/AnyEvent.pm
Criterion Covered Total %
statement 26 60 43.3
branch 2 14 14.2
condition 2 5 40.0
subroutine 8 15 53.3
pod 4 5 80.0
total 42 99 42.4


line stmt bran cond sub pod time code
1             package Kelp::Module::WebSocket::AnyEvent;
2              
3             our $VERSION = '1.04';
4              
5 2     2   98967 use Kelp::Base qw(Kelp::Module::Symbiosis::Base);
  2         6  
  2         14  
6 2     2   4214 use Plack::App::WebSocket;
  2         32925  
  2         86  
7 2     2   1101 use Kelp::Module::WebSocket::AnyEvent::Connection;
  2         6  
  2         11  
8 2     2   79 use Carp qw(croak carp cluck);
  2         6  
  2         106  
9 2     2   14 use Try::Tiny;
  2         6  
  2         2208  
10              
11             attr "-serializer";
12             attr "-connections" => sub { {} };
13              
14             attr "on_open" => sub {
15             sub { }
16             };
17             attr "on_close" => sub {
18             sub { }
19             };
20             attr "on_message" => sub {
21             sub { }
22             };
23             attr "on_malformed_message" => sub {
24             sub { die pop }
25             };
26             attr "on_error";
27              
28             # This function is here to work around Twiggy bug that is silencing errors
29             # Warn them instead, so they can be logged and spotted
30             sub _trap(&)
31             {
32 0     0   0 my ($block) = @_;
33             try {
34 0     0   0 $block->();
35             }
36             catch {
37 0     0   0 cluck $_;
38 0         0 die $_;
39 0         0 };
40             }
41              
42 2     2 1 90 sub name { 'websocket' }
43              
44             sub psgi
45             {
46 0     0 1 0 my ($self) = @_;
47              
48 0         0 my $conn_max_id = 0;
49             my $websocket = Plack::App::WebSocket->new(
50              
51             # on_error - optional
52 0     0   0 (defined $self->on_error ? (on_error => sub { $self->on_error->(@_) }) : ()),
53              
54             # on_establish - mandatory
55             on_establish => sub {
56 0     0   0 my ($orig_conn, $env) = @_;
57              
58 0         0 my $conn = Kelp::Module::WebSocket::AnyEvent::Connection->new(
59             connection => $orig_conn,
60             id => ++$conn_max_id,
61             manager => $self
62             );
63 0         0 _trap { $self->on_open->($conn, $env) };
  0         0  
64              
65             $conn->connection->on(
66             message => sub {
67 0         0 my ($orig_conn, $message) = @_;
68 0         0 my $err;
69              
70 0 0       0 if (my $s = $self->get_serializer) {
71             try {
72 0         0 $message = $s->decode($message);
73             }
74             catch {
75 0   0     0 $err = $_ || 'unknown error';
76 0         0 };
77             }
78              
79             _trap {
80 0 0       0 if ($err) {
81 0         0 $self->on_malformed_message->($conn, $message, $err);
82             }
83             else {
84 0         0 $self->on_message->($conn, $message);
85             }
86 0         0 };
87             },
88             finish => sub {
89 0         0 _trap { $self->on_close->($conn) };
  0         0  
90 0         0 $conn->close;
91 0         0 undef $orig_conn;
92             },
93 0         0 );
94             }
95 0 0       0 );
96              
97 0         0 return $websocket->to_app;
98             }
99              
100             sub add
101             {
102 7     7 1 895 my ($self, $type, $sub) = @_;
103              
104 7 50       45 croak "websocket handler for $type is not a code reference"
105             unless ref $sub eq 'CODE';
106              
107 7         18 $type = "on_$type";
108 7         26 my $setter = $self->can($type);
109 7 50       41 croak "unknown websocket event `$type`"
110             unless defined $setter;
111              
112 7         18 return $setter->($self, $sub);
113             }
114              
115             sub get_serializer
116             {
117 0     0 0 0 my ($self) = @_;
118 0 0       0 return undef unless defined $self->serializer;
119              
120 0         0 my $real_serializer_method = $self->app->can($self->serializer);
121 0 0       0 croak "Kelp doesn't have $self->serializer serializer"
122             unless defined $real_serializer_method;
123              
124 0         0 return $real_serializer_method->($self->app);
125             }
126              
127             sub build
128             {
129 2     2 1 212 my ($self, %args) = @_;
130 2         16 $self->SUPER::build(%args);
131 2   66     11 $self->{serializer} = $args{serializer} // $self->serializer;
132              
133 2         20 $self->register(websocket => $self);
134             }
135              
136             1;
137             __END__
138              
139             =pod
140              
141             =head1 NAME
142              
143             Kelp::Module::WebSocket::AnyEvent - AnyEvent websocket server integration with Kelp
144              
145             =head1 SYNOPSIS
146              
147             # in config
148              
149             modules => [qw(Symbiosis WebSocket::AnyEvent)],
150             modules_init => {
151             "WebSocket::AnyEvent" => {
152             mount => '/ws',
153             serializer => "json",
154             },
155             },
156              
157              
158             # in application's build method
159              
160             my $ws = $self->websocket;
161             $ws->add(message => sub {
162             my ($conn, $msg) = @_;
163             $conn->send({received => $msg});
164             });
165              
166             # can also be mounted like this, if not specified in config
167             $self->symbiosis->mount("/ws" => $ws); # by module object
168             $self->symbiosis->mount("/ws" => 'websocket'); # by name
169              
170              
171             # in psgi script
172              
173             $app = MyApp->new;
174             $app->run_all;
175              
176              
177             =head1 DESCRIPTION
178              
179             This is a module that integrates a websocket instance into Kelp using L<Kelp::Module::Symbiosis>. To run it, a non-blocking Plack server based on AnyEvent is required, like L<Twiggy>. All this module does is wrap L<Plack::App::WebSocket> instance in Kelp's module, introduce a method to get this instance in Kelp and integrate it into running alongside Kelp using Symbiosis. An instance of this class will be available in Kelp under the I<websocket> method.
180              
181             =head1 METHODS INTRODUCED TO KELP
182              
183             =head2 websocket
184              
185             Returns the instance of this class (Kelp::Module::WebSocket::AnyEvent).
186              
187             =head1 METHODS
188              
189             =head2 name
190              
191             sig: name($self)
192              
193             Reimplemented from L<Kelp::Module::Symbiosis::Base>. Returns a name of a module that can be used in C<< $symbiosis->loaded >> hash or when mounting by name. The return value is constant string I<'websocket'>.
194              
195             Requires Symbiosis version I<1.10> for name mounting to function.
196              
197             =head2 connections
198              
199             sig: connections($self)
200              
201             Returns a hashref containing all available L<Kelp::Module::WebSocket::AnyEvent::Connection> instances (open connections) keyed by their unique id. An id is autoincremented from 1 and guaranteed not to change and not to be replaced by a different connection unless the server restarts.
202              
203             A connection holds some additional data that can be used to hold custom data associated with that connection:
204              
205             # set / get data fields (it's an empty hash ref by default)
206             $connection->data->{internal_id} = $internal_id;
207              
208             # get the entire hash reference
209             $hash_ref = $connection->data;
210              
211             # replace the hash reference with something else
212             $connection->data($something_else);
213              
214             =head2 middleware
215              
216             sig: middleware($self)
217              
218             Returns an arrayref of all middlewares in format: C<[ middleware_class, [ middleware_config ] ]>.
219              
220             =head2 psgi
221              
222             sig: psgi($self)
223              
224             Returns a ran instance of L<Plack::App::WebSocket>.
225              
226             =head2 run
227              
228             sig: run($self)
229              
230             Same as psgi, but wraps the instance in all wanted middlewares.
231              
232             =head2 add
233              
234             sig: add($self, $event, $handler)
235              
236             Registers a $handler (coderef) for websocket $event (string). Handler will be passed an instance of L<Kelp::Module::WebSocket::AnyEvent::Connection> and an incoming message. $event can be either one of: I<open close message error>. You can only specify one handler for each event type.
237              
238             =head1 EVENTS
239              
240             All event handlers must be code references.
241              
242             =head2 open
243              
244             open => sub ($new_connection, $env) { ... }
245              
246             B<Optional>. Called when a new connection opens. A good place to set up its variables.
247              
248             =head2 message
249              
250             message => sub ($connection, $message) { ... }
251              
252             B<Optional>. This is where you handle all the incoming websocket messages. If a serializer is specified, C<$message> will already be unserialized.
253              
254             =head2 malformed_message
255              
256             message => sub ($connection, $message, $error) { ... }
257              
258             B<Optional>. This is where you handle the incoming websocket messages which could not be unserialized by a serializer. By default, an exception will be re-thrown, and effectively the connection will be closed.
259              
260             If Kelp JSON module is initialized with I<'allow_nonref'> flag then this event will never occur.
261              
262             C<$error> will not be likely be fit for end user message, as it will contain file names and line numbers.
263              
264             =head2 close
265              
266             close => sub ($connection) { ... }
267              
268             B<Optional>. Called when the connection is closing.
269              
270             =head2 error
271              
272             error => $psgi_app
273              
274             B<Optional>. The code reference should be a psgi application. It will be called if an error occurs and the websocket connection have to be closed.
275              
276             =head1 CONFIGURATION
277              
278             =head2 middleware, middleware_init
279              
280             See L<Kelp::Module::Symbiosis::Base/middleware, middleware_init>.
281              
282             =head2 mount
283              
284             See L<Kelp::Module::Symbiosis::Base/mount>.
285              
286             =head2 serializer
287              
288             Contains the name of the method that will be called to obtain an instance of serializer. Kelp instance must have that method registered. It must be able to C<< ->encode >> and C<< ->decode >>. Should also throw exception on error.
289              
290             =head1 SEE ALSO
291              
292             =over 2
293              
294             =item * L<Dancer2::Plugin::WebSocket>, same integration for Dancer2 framework this module was inspired by
295              
296             =item * L<Kelp>, the framework
297              
298             =item * L<Twiggy>, a server capable of running this websocket
299              
300             =back
301              
302             =head1 AUTHOR
303              
304             Bartosz Jarzyna, E<lt>brtastic.dev@gmail.comE<gt>
305              
306             =head1 COPYRIGHT AND LICENSE
307              
308             Copyright (C) 2020 by Bartosz Jarzyna
309              
310             This library is free software; you can redistribute it and/or modify
311             it under the same terms as Perl itself, either Perl version 5.10.0 or,
312             at your option, any later version of Perl 5 you may have available.
313              
314              
315             =cut