File Coverage

lib/PEF/Front/WebSocket/QueueClient.pm
Criterion Covered Total %
statement 16 18 88.8
branch n/a
condition n/a
subroutine 6 6 100.0
pod n/a
total 22 24 91.6


line stmt bran cond sub pod time code
1             package PEF::Front::WebSocket::QueueClient;
2 1     1   22955 use strict;
  1         1  
  1         24  
3 1     1   4 use warnings;
  1         1  
  1         21  
4              
5 1     1   837 use AnyEvent;
  1         3872  
  1         26  
6 1     1   531 use AnyEvent::Socket;
  1         18942  
  1         95  
7 1     1   638 use AnyEvent::Handle;
  1         5136  
  1         32  
8 1     1   865 use CBOR::XS;
  0            
  0            
9             use Scalar::Util qw'weaken refaddr';
10              
11             sub publish {
12             my ($self, $queue, $id_message, $message) = @_;
13             $self->{handle}->push_write(
14             cbor => {
15             command => 'publish',
16             queue => $queue,
17             id_message => $id_message,
18             message => $message,
19             }
20             );
21             }
22              
23             sub subscribe {
24             my ($self, $queue, $client, $last_id) = @_;
25             my $id_client = refaddr $client;
26             return if exists $self->{queues}{$queue}{$id_client};
27             $self->{queues}{$queue}{$id_client} = undef;
28             $self->{clients}{$id_client} = $client;
29             $self->{handle}->push_write(
30             cbor => {
31             command => 'subscribe',
32             queue => $queue,
33             id_client => $id_client,
34             last_id => $last_id,
35             }
36             );
37             }
38              
39             sub unsubscribe {
40             my ($self, $queue, $client) = @_;
41             my $id_client = refaddr $client;
42             return if not exists $self->{queues}{$queue}{$id_client};
43             $self->{queues}{$queue}{$id_client} = undef;
44             $self->{handle}->push_write(
45             cbor => {
46             command => 'unsubscribe',
47             queue => $queue,
48             id_client => $id_client,
49             }
50             );
51             }
52              
53             sub unregister_client {
54             my ($self, $client) = @_;
55             my $id_client = refaddr $client;
56             return if not $self->{clients}{$id_client};
57             delete $self->{clients}{$id_client};
58             for my $queue (keys %{$self->{queues}}) {
59             delete $self->{queues}{$queue}{$id_client};
60             }
61             $self->{handle}->push_write(
62             cbor => {
63             command => 'unregister',
64             id_client => $id_client,
65             }
66             );
67             }
68              
69             sub on_disconnect {
70             my ($self, $handle, $fatal, $msg) = @_;
71             for my $cid (keys %{$self->{clients}}) {
72             my $client = $self->{clients}{$cid};
73             $client->on_queue_error('disconnect');
74             }
75             delete $self->{handle};
76             delete $self->{tcp_guard};
77             delete $self->{clients};
78             delete $self->{queues};
79             }
80              
81             sub on_queue {
82             my ($self, $handle, $cmd) = @_;
83             $handle->push_read(
84             cbor => sub {
85             $self->on_queue(@_);
86             }
87             );
88             my ($queue, $id_message, $message, $cidref) = @$cmd;
89             for my $cid (@$cidref) {
90             my $client = $self->{clients}{$cid};
91             if (exists $self->{queues}{$queue}{$cid}) {
92             $client->on_queue($queue, $id_message, $message);
93             }
94             }
95             }
96              
97             sub new {
98             my ($class, %args) = @_;
99             my $tcp_address = delete $args{address} || '127.0.0.1';
100             my $tcp_port = delete $args{port} || 54321;
101             my $self;
102             my $cv = AnyEvent->condvar();
103             $self = {
104             address => $tcp_address,
105             port => $tcp_port,
106             tcp_guard => tcp_connect(
107             $tcp_address,
108             $tcp_port,
109             sub {
110             my ($fh) = @_;
111             $self->{handle} = AnyEvent::Handle->new(
112             fh => $fh,
113             on_eof => sub {self->on_disconnect(@_)},
114             on_error => sub {self->on_disconnect(@_)},
115             );
116             $self->{handle}->push_read(cbor => sub {$self->on_queue(@_)});
117             $cv->send;
118             }
119             ),
120             clients => {},
121             queues => {}
122             };
123             bless $self, $class;
124             $cv->recv;
125             $self;
126             }
127              
128             1;