File Coverage

lib/PEF/Front/WebSocket/QueueClient.pm
Criterion Covered Total %
statement 64 75 85.3
branch 4 8 50.0
condition 2 4 50.0
subroutine 15 16 93.7
pod 0 7 0.0
total 85 110 77.2


line stmt bran cond sub pod time code
1             package PEF::Front::WebSocket::QueueClient;
2 2     2   23081 use strict;
  2         2  
  2         55  
3 2     2   5 use warnings;
  2         2  
  2         48  
4              
5 2     2   811 use AnyEvent;
  2         5109  
  2         56  
6 2     2   575 use AnyEvent::Socket;
  2         20879  
  2         170  
7 2     2   644 use AnyEvent::Handle;
  2         5275  
  2         44  
8 2     2   481 use CBOR::XS;
  2         4645  
  2         96  
9 2     2   9 use Scalar::Util qw'weaken refaddr';
  2         2  
  2         1146  
10              
11             sub publish {
12 4     4 0 49660 my ($self, $queue, $id_message, $message) = @_;
13             $self->{handle}->push_write(
14 4         29 cbor => {
15             command => 'publish',
16             queue => $queue,
17             id_message => $id_message,
18             message => $message,
19             }
20             );
21             }
22              
23             sub subscribe {
24 12     12 0 671 my ($self, $queue, $client, $last_id) = @_;
25 12         17 my $id_client = refaddr $client;
26 12 50       33 return if exists $self->{queues}{$queue}{$id_client};
27 12         20 $self->{queues}{$queue}{$id_client} = undef;
28 12         11 $self->{clients}{$id_client} = $client;
29             $self->{handle}->push_write(
30 12         43 cbor => {
31             command => 'subscribe',
32             queue => $queue,
33             id_client => $id_client,
34             last_id => $last_id,
35             }
36             );
37             }
38              
39             sub unsubscribe {
40 2     2 0 98564 my ($self, $queue, $client) = @_;
41 2         11 my $id_client = refaddr $client;
42 2 50       16 return if not exists $self->{queues}{$queue}{$id_client};
43 2         11 $self->{queues}{$queue}{$id_client} = undef;
44             $self->{handle}->push_write(
45 2         26 cbor => {
46             command => 'unsubscribe',
47             queue => $queue,
48             id_client => $id_client,
49             }
50             );
51             }
52              
53             sub unregister_client {
54 1     1 0 12713 my ($self, $client) = @_;
55 1         6 my $id_client = refaddr $client;
56 1 50       7 return if not $self->{clients}{$id_client};
57 1         3 delete $self->{clients}{$id_client};
58 1         1 for my $queue (keys %{$self->{queues}}) {
  1         6  
59 3         5 delete $self->{queues}{$queue}{$id_client};
60             }
61             $self->{handle}->push_write(
62 1         10 cbor => {
63             command => 'unregister',
64             id_client => $id_client,
65             }
66             );
67             }
68              
69             sub on_disconnect {
70 0     0 0 0 my ($self, $handle, $fatal, $msg) = @_;
71 0         0 for my $cid (keys %{$self->{clients}}) {
  0         0  
72 0         0 my $client = $self->{clients}{$cid};
73 0         0 $client->on_queue_error('disconnect');
74             }
75 0         0 delete $self->{handle};
76 0         0 delete $self->{tcp_guard};
77 0         0 delete $self->{clients};
78 0         0 delete $self->{queues};
79             }
80              
81             sub on_queue {
82 12     12 0 15 my ($self, $handle, $cmd) = @_;
83             $handle->push_read(
84             cbor => sub {
85 9     9   423 $self->on_queue(@_);
86             }
87 12         43 );
88 12         145 my ($queue, $id_message, $message, $cidref) = @$cmd;
89 12         18 for my $cid (@$cidref) {
90 15         36 my $client = $self->{clients}{$cid};
91 15 50       34 if (exists $self->{queues}{$queue}{$cid}) {
92 15         38 $client->on_queue($queue, $id_message, $message);
93             }
94             }
95             }
96              
97             sub new {
98 3     3 0 34 my ($class, %args) = @_;
99 3   50     16 my $tcp_address = delete $args{address} || '127.0.0.1';
100 3   50     11 my $tcp_port = delete $args{port} || 54321;
101 3         3 my $self;
102 3         73 my $cv = AnyEvent->condvar();
103             $self = {
104             address => $tcp_address,
105             port => $tcp_port,
106             tcp_guard => tcp_connect(
107             $tcp_address,
108             $tcp_port,
109             sub {
110 3     3   158 my ($fh) = @_;
111             $self->{handle} = AnyEvent::Handle->new(
112             fh => $fh,
113 0         0 on_eof => sub {$self->on_disconnect(@_)},
114 0         0 on_error => sub {$self->on_disconnect(@_)},
115 3         26 );
116 3         245 $self->{handle}->push_read(cbor => sub {$self->on_queue(@_)});
  3         213  
117 3         169 $cv->send;
118             }
119 3         32 ),
120             clients => {},
121             queues => {}
122             };
123 3         955 bless $self, $class;
124 3         10 $cv->recv;
125 3         246 $self;
126             }
127              
128             1;