File Coverage

lib/PEF/Front/WebSocket/QueueServer.pm
Criterion Covered Total %
statement 40 106 37.7
branch 0 26 0.0
condition 0 9 0.0
subroutine 14 25 56.0
pod n/a
total 54 166 32.5


line stmt bran cond sub pod time code
1             package PEF::Front::WebSocket::QueueServer::Queue;
2 1     1   22686 use strict;
  1         1  
  1         24  
3 1     1   2 use warnings;
  1         1  
  1         19  
4 1     1   918 use AnyEvent;
  1         4068  
  1         30  
5 1     1   6 use Scalar::Util qw'weaken refaddr';
  1         1  
  1         88  
6 1     1   605 use Data::Dumper;
  1         7517  
  1         574  
7              
8             sub new {
9 0     0     bless {
10             queue => [],
11             clients => {},
12             id => $_[1],
13             server => $_[2],
14             };
15             }
16              
17             sub add_client {
18 0     0     my ($self, $client, $last_id) = @_;
19 0           my $id_client = $client->id;
20 0           my $lcid = refaddr $client;
21 0 0         return if $self->{clients}{$lcid};
22 0           weaken $client;
23 0           $self->{clients}{$lcid} = $client;
24 0 0 0       if (defined $last_id and $last_id != 0) {
25              
26 0 0         if (@{$self->{queue}}) {
  0            
27 0 0         if ($self->{queue}[0][0] <= $last_id) {
28              
29             # если первое сообщение в очереди имеет айди не выше последнего,
30             # то у клиента есть как минимум часть актуальной очереди и никаких сообщений не было потеряно
31             # дошлём клиенту новые сообщения, если они появились
32 0           for my $mt (@{$self->{queue}}) {
  0            
33 0           my $id_message = $mt->[0];
34 0 0         if ($id_message > $last_id) {
35 0           $self->{server}->_transfer($self->{id}, $id_message, $mt->[1], $client->group, [$id_client]);
36             }
37             }
38             } else {
39             # если айди последнего сообщения клиента "безнадёжно устарел", то ему надо сообщить о необходимости
40             # перегрузить модель данных
41 0           $self->{server}->_transfer($self->{id}, 0, $self->{server}->reload_message, $client->group, [$id_client]);
42             }
43             } else {
44             # если в очереди нет сообщений, значит всё давно заэкспайрилось, клиенту надо перегрузить модель данных
45 0           $self->{server}->_transfer($self->{id}, 0, $self->{server}->reload_message, $client->group, [$id_client]);
46             }
47             }
48              
49             # если клиент не показал "последенго айди", то у него только что загруженная модель данных
50             # return true if client was added
51 0           return 1;
52             }
53              
54             sub publish {
55 0     0     my ($self, $id_message, $message) = @_;
56 0 0         if ($id_message != 0) {
57              
58             # упрядочиваем сообщения по $id_message
59 0           my $last_index = @{$self->{queue}} - 1;
  0            
60 0 0 0       if (!@{$self->{queue}}
  0            
61             || $self->{queue}[$last_index][0] < $id_message)
62             {
63 0           push @{$self->{queue}}, [$id_message, $message, time];
  0            
64             } else {
65 0 0         if ($self->{queue}[0][0] > $id_message) {
66 0           unshift @{$self->{queue}}, [$id_message, $message, time];
  0            
67             } else {
68 0           for (my $i = $last_index; $i >= 0; --$i) {
69 0 0         if ($self->{queue}[$i][0] < $id_message) {
70 0           splice @{$self->{queue}}, $i + 1, 0, [$id_message, $message, time];
  0            
71 0           last;
72             }
73             }
74             }
75             }
76             }
77 0           my %g;
78 0           for (keys %{$self->{clients}}) {
  0            
79 0           my $c = $self->{clients}{$_};
80 0           push @{$g{$c->group}}, $c->id;
  0            
81             }
82 0           for my $group (keys %g) {
83 0           $self->{server}->_transfer($self->{id}, $id_message, $message, $group, $g{$group});
84             }
85             }
86              
87             sub remove_client {
88 0     0     my ($self, $client) = @_;
89 0           my $lcid = refaddr $client;
90 0           delete $self->{clients}{$lcid};
91 0 0         if (!%{$self->{clients}}) {
  0            
92 0           weaken $self;
93             $self->{destroy_timer} = AnyEvent->timer(
94             after => $self->{server}->no_client_expiration,
95             cb => sub {
96 0 0 0 0     if ($self && !%{$self->{clients}}) {
  0            
97 0           $self->{server}->_remove_queue($self->{id});
98 0           undef $self;
99             }
100             }
101 0           );
102             }
103             }
104              
105             package PEF::Front::WebSocket::QueueServer::Client;
106 1     1   5 use strict;
  1         1  
  1         21  
107 1     1   3 use warnings;
  1         1  
  1         170  
108              
109             sub new {
110 0     0     bless {
111             group => $_[1],
112             id => $_[2],
113             queues => []
114             };
115             }
116              
117             sub group {
118 0     0     $_[0]{group};
119             }
120              
121             sub id {
122 0     0     $_[0]{id};
123             }
124              
125             sub subscribe {
126 0     0     my ($self, $queue, $last_id) = @_;
127 0 0         push @{$self->{queues}}, $queue
  0            
128             if $queue->add_client($self, $last_id);
129             }
130              
131             sub unsubscribe {
132 0     0     my ($self, $queue) = @_;
133 0 0         $queue->remove_client($self) if $queue;
134             }
135              
136             sub DESTROY {
137 0     0     $_[0]->unsubscribe($_) for @{$_[0]{queues}};
  0            
138             }
139              
140             package PEF::Front::WebSocket::QueueServer;
141 1     1   4 use strict;
  1         1  
  1         18  
142 1     1   3 use warnings;
  1         1  
  1         28  
143              
144 1     1   567 use EV;
  1         1918  
  1         23  
145 1     1   4 use AnyEvent;
  1         1  
  1         32  
146 1     1   703 use AnyEvent::Handle;
  1         14048  
  1         33  
147 1     1   534 use AnyEvent::Socket;
  1         12054  
  1         172  
148 1     1   1286 use CBOR::XS;
  0            
  0            
149             use Scalar::Util 'weaken';
150              
151             sub subscribe_client_to_queue {
152             my ($self, $handle, $cmd) = @_;
153             my $queue = $cmd->{queue};
154             my $group = $handle->fh->fileno;
155             my $cid = $cmd->{id_client};
156             my $last_id = $cmd->{last_id};
157             my $client = ($self->{groups}{$group}{clients}{$cid} ||= PEF::Front::WebSocket::QueueServer::Client->new($group, $cid));
158             my $qo = $self->{queues}{$queue} || $self->register_queue($queue);
159             $client->subscribe($qo, $last_id);
160             }
161              
162             sub unsubscribe_client_from_queue {
163             my ($self, $handle, $cmd) = @_;
164             my $queue = $cmd->{queue};
165             my $group = $handle->fh->fileno;
166             my $cid = $cmd->{id_client};
167             my $client = $self->{groups}{$group}{clients}{$cid};
168             my $qo = $self->{queues}{$queue};
169             $client->unsubscribe($qo) if $client && $qo;
170             }
171              
172             sub publish_to_queue {
173             my ($self, $handle, $cmd) = @_;
174             my $queue = $cmd->{queue};
175             my $id_message = $cmd->{id_message};
176             my $message = $cmd->{message};
177             my $qo = $self->{queues}{$queue} || $self->register_queue($queue);
178             $qo->publish($id_message, $message);
179             }
180              
181             sub unregister_client {
182             my ($self, $handle, $cmd) = @_;
183             my $group = $handle->fh->fileno;
184             my $cid = $cmd->{id_client};
185             delete $self->{groups}{$group}{clients}{$cid};
186             }
187              
188             my %cmd_switch = (
189             subscribe => \&subscribe_client_to_queue,
190             unsubscribe => \&unsubscribe_client_from_queue,
191             publish => \&publish_to_queue,
192             unregister => \&unregister_client,
193             );
194              
195             sub on_cmd {
196             my ($self, $handle, $cmd) = @_;
197             my $group = $handle->fh->fileno;
198             $handle->push_read(
199             cbor => sub {
200             $self->on_cmd(@_);
201             }
202             );
203             if (my $cmd_sub = $cmd_switch{$cmd->{command}}) {
204             $cmd_sub->($self, $handle, $cmd);
205             }
206             }
207              
208             sub register_queue {
209             my ($self, $queue) = @_;
210             $self->{queues}{$queue} = PEF::Front::WebSocket::QueueServer::Queue->new($queue, $self);
211             $self->{queues}{$queue};
212             }
213              
214             sub create_group {
215             my ($self, $group, $handle) = @_;
216             $self->{groups}{$group} = {
217             handle => $handle,
218             clients => {}
219             };
220             }
221              
222             sub destroy_group {
223             my ($self, $group) = @_;
224             delete $self->{groups}{$group};
225             }
226              
227             sub _remove_queue {
228             my ($self, $queue) = @_;
229             delete $self->{queues}{$queue};
230             }
231              
232             sub _transfer {
233             my ($self, $queue, $id_message, $message, $group, $cidref) = @_;
234             my $handle = $self->{groups}{$group}{handle};
235             $handle->push_write(cbor => [$queue, $id_message, $message, $cidref]);
236             }
237              
238             sub on_disconnect {
239             my ($self, $handle, $fatal, $msg) = @_;
240             $self->destroy_group($handle->fh->fileno);
241             $handle->destroy;
242              
243             }
244              
245             sub on_accept {
246             my ($self, $fh, $host, $port) = @_;
247             my $handle = AnyEvent::Handle->new(
248             on_error => sub {$self->on_disconnect(@_)},
249             on_eof => sub {$self->on_disconnect(@_)},
250             fh => $fh,
251             );
252             $self->create_group($fh->fileno, $handle);
253             $handle->push_read(
254             cbor => sub {
255             $self->on_cmd(@_) if $self;
256             }
257             );
258             }
259              
260             sub new {
261             my ($class, %args) = @_;
262             my $self;
263             my $tcp_address = delete $args{address} || '127.0.0.1';
264             my $tcp_port = delete $args{port} || 54321;
265             my $no_client_expiration = delete $args{no_client_expiration} || 900;
266             my $message_expiration = delete $args{message_expiration} || 3600;
267             my $reload_message = delete $args{reload_message} || {result => 'RELOAD'};
268             $self = {
269             server => tcp_server($tcp_address, $tcp_port, sub {$self->on_accept(@_)}),
270             no_client_expiration => $no_client_expiration,
271             message_expiration => $message_expiration,
272             reload_message => $reload_message,
273             groups => {},
274             queues => {}
275             };
276             bless $self, $class;
277             }
278              
279             sub no_client_expiration {
280             $_[0]{no_client_expiration};
281             }
282              
283             sub message_expiration {
284             $_[0]{message_expiration};
285             }
286              
287             sub reload_message {
288             $_[0]{reload_message};
289             }
290              
291             use Data::Dumper;
292              
293             sub run {
294             my ($slave, $tcp_address, $tcp_port, $no_client_expiration, $message_expiration, $reload_message) = @_;
295             if ($reload_message) {
296             $reload_message = decode_cbor $reload_message;
297             if (!%$reload_message) {
298             $reload_message = undef;
299             }
300             }
301             my $queue_server = new PEF::Front::WebSocket::QueueServer(
302             address => $tcp_address,
303             port => $tcp_port,
304             no_client_expiration => $no_client_expiration,
305             message_expiration => $message_expiration,
306             reload_message => $reload_message
307             );
308             my $handle = AnyEvent::Handle->new(
309             on_error => sub {
310             exit;
311             },
312             on_eof => sub {
313             exit;
314             },
315             on_read => sub {
316             },
317             fh => $slave,
318             );
319             $handle->push_write("1");
320             EV::run();
321             }
322              
323             1;