File Coverage

lib/PEF/Front/WebSocket/QueueServer.pm
Criterion Covered Total %
statement 148 189 78.3
branch 15 36 41.6
condition 14 31 45.1
subroutine 38 52 73.0
pod 0 15 0.0
total 215 323 66.5


line stmt bran cond sub pod time code
1             package PEF::Front::WebSocket::QueueServer::Queue;
2 1     1   21696 use strict;
  1         1  
  1         25  
3 1     1   2 use warnings;
  1         2  
  1         17  
4 1     1   795 use AnyEvent;
  1         4431  
  1         29  
5 1     1   5 use Scalar::Util qw'weaken refaddr';
  1         1  
  1         72  
6 1     1   478 use Data::Dumper;
  1         6367  
  1         491  
7              
8             sub new {
9 3     3   11 bless {
10             queue => [],
11             clients => {},
12             id => $_[1],
13             server => $_[2],
14             };
15             }
16              
17             sub add_client {
18 12     12   8 my ($self, $client, $last_id) = @_;
19 12         14 my $id_client = $client->id;
20 12         25 my $lcid = refaddr $client;
21 12 50       22 return if $self->{clients}{$lcid};
22 12         17 weaken $client;
23 12         18 $self->{clients}{$lcid} = $client;
24 12 100 66     21 if (defined $last_id and $last_id != 0) {
25              
26 1 50       2 if (@{$self->{queue}}) {
  1         5  
27 1 50       4 if ($self->{queue}[0][0] <= $last_id) {
28              
29             # если первое сообщение в очереди имеет айди не выше последнего,
30             # то у клиента есть как минимум часть актуальной очереди и никаких сообщений не было потеряно
31             # дошлём клиенту новые сообщения, если они появились
32 1         3 for my $mt (@{$self->{queue}}) {
  1         4  
33 2         3 my $id_message = $mt->[0];
34 2 100       4 if ($id_message > $last_id) {
35 1         6 $self->{server}->_transfer($self->{id}, $id_message, $mt->[1], $client->group, [$id_client]);
36             }
37             }
38             } else {
39             # если айди последнего сообщения клиента "безнадёжно устарел", то ему надо сообщить о необходимости
40             # перегрузить модель данных
41 0         0 $self->{server}->_transfer($self->{id}, 0, $self->{server}->reload_message, $client->group, [$id_client]);
42             }
43             } else {
44             # если в очереди нет сообщений, значит всё давно заэкспайрилось, клиенту надо перегрузить модель данных
45 0         0 $self->{server}->_transfer($self->{id}, 0, $self->{server}->reload_message, $client->group, [$id_client]);
46             }
47             }
48              
49             # если клиент не показал "последенго айди", то у него только что загруженная модель данных
50             # return true if client was added
51 12         129 return 1;
52             }
53              
54             sub publish {
55 4     4   5 my ($self, $id_message, $message) = @_;
56 4 50       15 if ($id_message != 0) {
57              
58             # упрядочиваем сообщения по $id_message
59 4         3 my $last_index = @{$self->{queue}} - 1;
  4         11  
60 4 50 66     4 if (!@{$self->{queue}}
  4         19  
61             || $self->{queue}[$last_index][0] < $id_message)
62             {
63 4         6 push @{$self->{queue}}, [$id_message, $message, time];
  4         17  
64             } else {
65 0 0       0 if ($self->{queue}[0][0] > $id_message) {
66 0         0 unshift @{$self->{queue}}, [$id_message, $message, time];
  0         0  
67             } else {
68 0         0 for (my $i = $last_index; $i >= 0; --$i) {
69 0 0       0 if ($self->{queue}[$i][0] < $id_message) {
70 0         0 splice @{$self->{queue}}, $i + 1, 0, [$id_message, $message, time];
  0         0  
71 0         0 last;
72             }
73             }
74             }
75             }
76             }
77 4         5 my %g;
78 4         4 for (keys %{$self->{clients}}) {
  4         21  
79 14         14 my $c = $self->{clients}{$_};
80 14         9 push @{$g{$c->group}}, $c->id;
  14         17  
81             }
82 4         11 for my $group (keys %g) {
83 11         394 $self->{server}->_transfer($self->{id}, $id_message, $message, $group, $g{$group});
84             }
85             }
86              
87             sub remove_client {
88 2     2   2 my ($self, $client) = @_;
89 2         7 my $lcid = refaddr $client;
90 2         5 delete $self->{clients}{$lcid};
91 2 50       3 if (!%{$self->{clients}}) {
  2         12  
92 0         0 weaken $self;
93             $self->{destroy_timer} = AnyEvent->timer(
94             after => $self->{server}->no_client_expiration,
95             cb => sub {
96 0 0 0 0   0 if ($self && !%{$self->{clients}}) {
  0         0  
97 0         0 $self->{server}->_remove_queue($self->{id});
98 0         0 undef $self;
99             }
100             }
101 0         0 );
102             }
103             }
104              
105             package PEF::Front::WebSocket::QueueServer::Client;
106 1     1   5 use strict;
  1         1  
  1         19  
107 1     1   3 use warnings;
  1         1  
  1         157  
108              
109             sub new {
110 12     12   51 bless {
111             group => $_[1],
112             id => $_[2],
113             queues => []
114             };
115             }
116              
117             sub group {
118 15     15   76 $_[0]{group};
119             }
120              
121             sub id {
122 26     26   32 $_[0]{id};
123             }
124              
125             sub subscribe {
126 12     12   11 my ($self, $queue, $last_id) = @_;
127 12 50       20 push @{$self->{queues}}, $queue
  12         27  
128             if $queue->add_client($self, $last_id);
129             }
130              
131             sub unsubscribe {
132 2     2   3 my ($self, $queue) = @_;
133 2 50       10 $queue->remove_client($self) if $queue;
134             }
135              
136             sub DESTROY {
137 0     0   0 $_[0]->unsubscribe($_) for @{$_[0]{queues}};
  0         0  
138             }
139              
140             package PEF::Front::WebSocket::QueueServer;
141 1     1   5 use strict;
  1         1  
  1         15  
142 1     1   3 use warnings;
  1         0  
  1         18  
143              
144 1     1   463 use EV;
  1         1517  
  1         22  
145 1     1   4 use AnyEvent;
  1         1  
  1         13  
146 1     1   535 use AnyEvent::Handle;
  1         13754  
  1         29  
147 1     1   435 use AnyEvent::Socket;
  1         9929  
  1         87  
148 1     1   575 use CBOR::XS;
  1         2637  
  1         49  
149 1     1   5 use Scalar::Util 'weaken';
  1         1  
  1         731  
150              
151             sub subscribe_client_to_queue {
152 12     12 0 9 my ($self, $handle, $cmd) = @_;
153 12         13 my $queue = $cmd->{queue};
154 12         19 my $group = $handle->fh->fileno;
155 12         42 my $cid = $cmd->{id_client};
156 12         11 my $last_id = $cmd->{last_id};
157 12   33     48 my $client = ($self->{groups}{$group}{clients}{$cid} ||= PEF::Front::WebSocket::QueueServer::Client->new($group, $cid));
158 12   66     31 my $qo = $self->{queues}{$queue} || $self->register_queue($queue);
159 12         16 $client->subscribe($qo, $last_id);
160             }
161              
162             sub unsubscribe_client_from_queue {
163 2     2 0 4 my ($self, $handle, $cmd) = @_;
164 2         5 my $queue = $cmd->{queue};
165 2         6 my $group = $handle->fh->fileno;
166 2         10 my $cid = $cmd->{id_client};
167 2         8 my $client = $self->{groups}{$group}{clients}{$cid};
168 2         5 my $qo = $self->{queues}{$queue};
169 2 50 33     16 $client->unsubscribe($qo) if $client && $qo;
170             }
171              
172             sub publish_to_queue {
173 4     4 0 5 my ($self, $handle, $cmd) = @_;
174 4         8 my $queue = $cmd->{queue};
175 4         3 my $id_message = $cmd->{id_message};
176 4         7 my $message = $cmd->{message};
177 4   33     12 my $qo = $self->{queues}{$queue} || $self->register_queue($queue);
178 4         12 $qo->publish($id_message, $message);
179             }
180              
181             sub unregister_client {
182 1     1 0 1 my ($self, $handle, $cmd) = @_;
183 1         4 my $group = $handle->fh->fileno;
184 1         4 my $cid = $cmd->{id_client};
185 1         5 delete $self->{groups}{$group}{clients}{$cid};
186             }
187              
188             my %cmd_switch = (
189             subscribe => \&subscribe_client_to_queue,
190             unsubscribe => \&unsubscribe_client_from_queue,
191             publish => \&publish_to_queue,
192             unregister => \&unregister_client,
193             );
194              
195             sub on_cmd {
196 19     19 0 28 my ($self, $handle, $cmd) = @_;
197 19         39 my $group = $handle->fh->fileno;
198             $handle->push_read(
199             cbor => sub {
200 16     16   78668 $self->on_cmd(@_);
201             }
202 19         184 );
203 19 50       322 if (my $cmd_sub = $cmd_switch{$cmd->{command}}) {
204 19         34 $cmd_sub->($self, $handle, $cmd);
205             }
206             }
207              
208             sub register_queue {
209 3     3 0 4 my ($self, $queue) = @_;
210 3         9 $self->{queues}{$queue} = PEF::Front::WebSocket::QueueServer::Queue->new($queue, $self);
211 3         9 $self->{queues}{$queue};
212             }
213              
214             sub create_group {
215 3     3 0 6235 my ($self, $group, $handle) = @_;
216 3         13 $self->{groups}{$group} = {
217             handle => $handle,
218             clients => {}
219             };
220             }
221              
222             sub destroy_group {
223 0     0 0 0 my ($self, $group) = @_;
224 0         0 delete $self->{groups}{$group};
225             }
226              
227             sub _remove_queue {
228 0     0   0 my ($self, $queue) = @_;
229 0         0 delete $self->{queues}{$queue};
230             }
231              
232             sub _transfer {
233 12     12   26 my ($self, $queue, $id_message, $message, $group, $cidref) = @_;
234 12         20 my $handle = $self->{groups}{$group}{handle};
235 12         38 $handle->push_write(cbor => [$queue, $id_message, $message, $cidref]);
236             }
237              
238             sub on_disconnect {
239 0     0 0 0 my ($self, $handle, $fatal, $msg) = @_;
240 0         0 $self->destroy_group($handle->fh->fileno);
241 0         0 $handle->destroy;
242              
243             }
244              
245             sub on_accept {
246 3     3 0 6 my ($self, $fh, $host, $port) = @_;
247             my $handle = AnyEvent::Handle->new(
248 0     0   0 on_error => sub {$self->on_disconnect(@_)},
249 0     0   0 on_eof => sub {$self->on_disconnect(@_)},
250 3         22 fh => $fh,
251             );
252 3         162 $self->create_group($fh->fileno, $handle);
253             $handle->push_read(
254             cbor => sub {
255 3 50   3   232 $self->on_cmd(@_) if $self;
256             }
257 3         14 );
258             }
259              
260             sub new {
261 1     1 0 1121 my ($class, %args) = @_;
262 1         2 my $self;
263 1   50     6 my $tcp_address = delete $args{address} || '127.0.0.1';
264 1   50     5 my $tcp_port = delete $args{port} || 54321;
265 1   50     4 my $no_client_expiration = delete $args{no_client_expiration} || 900;
266 1   50     4 my $message_expiration = delete $args{message_expiration} || 3600;
267 1   50     5 my $reload_message = delete $args{reload_message} || {result => 'RELOAD'};
268             $self = {
269 1     3   8 server => tcp_server($tcp_address, $tcp_port, sub {$self->on_accept(@_)}),
  3         184  
270             no_client_expiration => $no_client_expiration,
271             message_expiration => $message_expiration,
272             reload_message => $reload_message,
273             groups => {},
274             queues => {}
275             };
276 1         177 bless $self, $class;
277             }
278              
279             sub no_client_expiration {
280 0     0 0   $_[0]{no_client_expiration};
281             }
282              
283             sub message_expiration {
284 0     0 0   $_[0]{message_expiration};
285             }
286              
287             sub reload_message {
288 0     0 0   $_[0]{reload_message};
289             }
290              
291 1     1   5 use Data::Dumper;
  1         1  
  1         157  
292              
293             sub run {
294 0     0 0   my ($slave, $tcp_address, $tcp_port, $no_client_expiration, $message_expiration, $reload_message) = @_;
295 0 0         if ($reload_message) {
296 0           $reload_message = decode_cbor $reload_message;
297 0 0         if (!%$reload_message) {
298 0           $reload_message = undef;
299             }
300             }
301 0           my $queue_server = new PEF::Front::WebSocket::QueueServer(
302             address => $tcp_address,
303             port => $tcp_port,
304             no_client_expiration => $no_client_expiration,
305             message_expiration => $message_expiration,
306             reload_message => $reload_message
307             );
308             my $handle = AnyEvent::Handle->new(
309             on_error => sub {
310 0     0     exit;
311             },
312             on_eof => sub {
313 0     0     exit;
314             },
315       0     on_read => sub {
316             },
317 0           fh => $slave,
318             );
319 0           $handle->push_write("1");
320 0           EV::run();
321             }
322              
323             1;