File Coverage

blib/lib/Mojo/WebSocketProxy/Dispatcher.pm
Criterion Covered Total %
statement 121 126 96.0
branch 23 32 71.8
condition 10 18 55.5
subroutine 28 28 100.0
pod 7 7 100.0
total 189 211 89.5


line stmt bran cond sub pod time code
1             package Mojo::WebSocketProxy::Dispatcher;
2              
3 17     17   90 use strict;
  17         28  
  17         395  
4 17     17   63 use warnings;
  17         25  
  17         340  
5              
6 17     17   63 use Mojo::Base 'Mojolicious::Controller';
  17         26  
  17         72  
7 17     17   7276 use Mojo::WebSocketProxy::Parser;
  17         38  
  17         467  
8 17     17   85 use Mojo::WebSocketProxy::Config;
  17         29  
  17         70  
9              
10 17     17   6066 use Class::Method::Modifiers;
  17         19324  
  17         871  
11              
12 17     17   94 use JSON::MaybeUTF8 qw(:v1);
  17         30  
  17         1872  
13 17     17   84 use Unicode::Normalize ();
  17         29  
  17         299  
14 17     17   5736 use Future::Mojo 0.004; # ->new_timeout
  17         25693  
  17         459  
15 17     17   6050 use Future::Utils qw(fmap);
  17         27701  
  17         886  
16 17     17   99 use Scalar::Util qw(blessed);
  17         27  
  17         731  
17              
18 17   100 17   83 use constant TIMEOUT => $ENV{MOJO_WEBSOCKETPROXY_TIMEOUT} || 15;
  17         30  
  17         26510  
19              
20             our $VERSION = '0.11'; ## VERSION
21             around 'send' => sub {
22             my ($orig, $c, $api_response, $req_storage) = @_;
23              
24             my $config = $c->wsp_config->{config};
25              
26             my $max_response_size = $config->{max_response_size};
27             if ($max_response_size && length(encode_json_utf8($api_response)) > $max_response_size) {
28             $api_response->{json} = $c->wsp_error('error', 'ResponseTooLarge', 'Response too large.');
29             }
30              
31             my $before_send_api_response = $config->{before_send_api_response};
32             $_->($c, $req_storage, $api_response->{json})
33             for grep { $_ } (ref $before_send_api_response eq 'ARRAY' ? @{$before_send_api_response} : $before_send_api_response);
34              
35             my $ret = $orig->($c, $api_response);
36              
37             my $after_sent_api_response = $config->{after_sent_api_response};
38             $_->($c, $req_storage) for grep { $_ } (ref $after_sent_api_response eq 'ARRAY' ? @{$after_sent_api_response} : $after_sent_api_response);
39              
40             return $ret;
41             };
42              
43             sub ok {
44 23     23 1 271816 return 1;
45             }
46              
47             sub open_connection {
48 23     23 1 3292 my ($c) = @_;
49              
50 23         113 my $log = $c->app->log;
51 23         269 $log->debug("accepting a websocket connection from " . $c->tx->remote_address);
52              
53             # Enable permessage-deflate
54 23         898 $c->tx->with_compression;
55              
56 23         826 my $config = $c->wsp_config->{config};
57              
58 23 50       88 Mojo::IOLoop->singleton->stream($c->tx->connection)->timeout($config->{stream_timeout}) if $config->{stream_timeout};
59 23 50       81 Mojo::IOLoop->singleton->max_connections($config->{max_connections}) if $config->{max_connections};
60              
61 23 50       80 $config->{opened_connection}->($c) if $config->{opened_connection};
62              
63             $c->on(
64             text => sub {
65 34     34   108681 my ($c, $msg) = @_;
66             # Incoming data will be JSON-formatted text, as a Unicode string.
67             # We normalize the entire string before decoding.
68 34         431 my $normalized_msg = Unicode::Normalize::NFC($msg);
69 34 100       87 if (my $args = eval { decode_json_utf8($normalized_msg) }) {
  34         131  
70 33         428 on_message($c, $args);
71             } else {
72 1         31 $c->finish(1007 => 'Malformed JSON');
73 1         145 $log->debug(qq{JSON decoding failed for "$normalized_msg": $@});
74             }
75 23         241 });
76              
77             $c->on(
78             binary => sub {
79 1     1   3534 my ($d, $bytes) = @_;
80 1 50 33     10 $config->{binary_frame}(@_) if $bytes and exists($config->{binary_frame});
81 23         6065 });
82              
83 23 50       1358 $c->on(finish => $config->{finish_connection}) if $config->{finish_connection};
84              
85 23         112 return;
86             }
87              
88             sub on_message {
89 33     33 1 70 my ($c, $args) = @_;
90              
91 33         174 my $config = $c->wsp_config->{config};
92              
93 33         63 my $req_storage = {};
94 33         92 $req_storage->{args} = $args;
95              
96             # We still want to run any hooks even for invalid requests.
97 33 50       120 if (my $err = Mojo::WebSocketProxy::Parser::parse_req($c, $req_storage)) {
98 0         0 $c->send({json => $err}, $req_storage);
99 0   0     0 return $c->_run_hooks($config->{after_dispatch} || [])->retain;
100             }
101              
102 33 100       152 my $action = $c->dispatch($args) or do {
103 2         14 my $err = $c->wsp_error('error', UnrecognisedRequest => 'Unrecognised request');
104 2         41 $c->send({json => $err}, $req_storage);
105 2   50     14 return $c->_run_hooks($config->{after_dispatch} || [])->retain;
106             };
107              
108 31         112 @{$req_storage}{keys %$action} = (values %$action);
  31         86  
109 31         87 $req_storage->{method} = $req_storage->{name};
110              
111             # main processing pipeline
112             my $f = $c->before_forward($req_storage)->transform(
113             done => sub {
114             # Note that we completely ignore the return value of ->before_forward here.
115 23 100   23   1336 return $req_storage->{instead_of_forward}->($c, $req_storage) if $req_storage->{instead_of_forward};
116 21         79 return $c->forward($req_storage);
117             }
118             )->then(
119             sub {
120 23     23   2864 my $result = shift;
121 23         72 return $c->after_forward($result, $req_storage)->transform(done => sub { $result });
  23         1306  
122             },
123             sub {
124 7     7   102234 my $result = shift;
125 7         29 Future->done($result);
126 31         98 });
127              
128             return Future->wait_any(
129             Future::Mojo->new_timeout(TIMEOUT)->else(
130             sub {
131 1     1   1001906 return Future->done($c->wsp_error('error', Timeout => 'Timeout'));
132             }
133             ),
134             $f
135             )->then(
136             sub {
137 31     31   11685 my ($result) = @_;
138 31 100       334 $c->send({json => $result}, $req_storage) if $result;
139 31   50     172 return $c->_run_hooks($config->{after_dispatch} || []);
140 31         1760 })->retain;
141             }
142              
143             sub before_forward {
144 31     31 1 58 my ($c, $req_storage) = @_;
145              
146 31         141 my $config = $c->wsp_config->{config};
147              
148 31         61 my $before_forward_hooks = [];
149              
150             # Global hooks are always first
151 31         85 for ($config, $req_storage) {
152 62 100       175 push @$before_forward_hooks, ref($_->{before_forward}) eq 'ARRAY' ? @{$_->{before_forward}} : $_->{before_forward};
  7         21  
153             }
154              
155             # We always want to clear these after every request.
156 31         69 delete $req_storage->{before_forward};
157              
158 31         92 return $c->_run_hooks($before_forward_hooks, $req_storage);
159             }
160              
161             sub after_forward {
162 23     23 1 48 my ($c, $result, $req_storage) = @_;
163              
164 23         114 my $config = $c->wsp_config->{config};
165 23   50     159 return $c->_run_hooks($config->{after_forward} || [], $result, $req_storage);
166             }
167              
168             sub _run_hooks {
169 87     87   188 my @hook_params = @_;
170 87         125 my $c = shift @hook_params;
171 87         120 my $hooks = shift @hook_params;
172              
173             my $result_f = fmap {
174 19     19   871 my $hook = shift;
175 19 100       66 my $result = $hook->($c, @hook_params) or return Future->done;
176 9 100 66     212 return $result if blessed($result) && $result->isa('Future');
177 5         30 return Future->fail($result);
178             }
179 87         463 foreach => [grep { defined } @$hooks],
  62         202  
180             concurrent => 1;
181 87         16837 return $result_f;
182             }
183              
184             sub dispatch {
185 33     33 1 88 my ($c, $args) = @_;
186              
187 33         90 my $log = $c->app->log;
188 33         399 $log->debug("websocket got json " . $c->dumper($args));
189              
190             my ($action) =
191 0         0 sort { $a->{order} <=> $b->{order} }
192 33         102 grep { defined }
193 33         4572 map { $c->wsp_config->{actions}->{$_} } keys %$args;
  33         150  
194              
195 33         126 return $action;
196             }
197              
198             sub forward {
199 23     23 1 56 my ($c, $req_storage) = @_;
200              
201 23         124 my $config = $c->wsp_config->{config};
202              
203 23         57 for my $hook (qw/ before_call before_get_rpc_response after_got_rpc_response /) {
204             $req_storage->{$hook} = [
205 71         122 grep { $_ } (ref $config->{$hook} eq 'ARRAY' ? @{$config->{$hook}} : $config->{$hook}),
  0         0  
206 69 50       243 grep { $_ } (ref $req_storage->{$hook} eq 'ARRAY' ? @{$req_storage->{$hook}} : $req_storage->{$hook}),
  69 50       127  
  0         0  
207             ];
208             }
209              
210 23   100     115 my $backend_name = $req_storage->{backend} // "default";
211 23 50       92 my $backend = $c->wsp_config->{backends}{$backend_name}
212             or die "Cannot dispatch request - no backend named '$backend_name'";
213              
214 23         104 $backend->call_rpc($c, $req_storage);
215              
216 23         62 return;
217             }
218              
219             1;
220              
221             __END__