File Coverage

blib/lib/Centrifugo/Client.pm
Criterion Covered Total %
statement 70 118 59.3
branch 14 52 26.9
condition 4 21 19.0
subroutine 11 20 55.0
pod 0 11 0.0
total 99 222 44.5


line stmt bran cond sub pod time code
1             package Centrifugo::Client;
2              
3             our $VERSION = "1.03";
4            
5 1     1   80116 use Exporter;
  1         3  
  1         74  
6             our @ISA = qw(Exporter);
7             our @EXPORT = qw();
8            
9 1     1   7 use Carp qw( croak );
  1         3  
  1         58  
10 1     1   369 use AnyEvent::WebSocket::Client 0.12;
  1         244155  
  1         66  
11 1     1   556 use JSON;
  1         8328  
  1         6  
12            
13             =head1 NAME
14            
15             Centrifugo::Client
16            
17             =head1 SYNOPSIS
18            
19             use Centrifugo::Client;
20             use AnyEvent;
21            
22             my $cclient = Centrifugo::Client->new("$CENTRIFUGO_WS/connection/websocket");
23            
24             $cclient -> on('connect', sub{
25             my ($infoRef)=@_;
26             print "Connected to Centrifugo version ".$infoRef->{version};
27            
28             # When connected, client_id() is defined, so we can subscribe to our private channel
29             $cclient->subscribe( '&'.$cclient->client_id() );
30            
31             }) -> on('message', sub{
32             my ($infoRef)=@_;
33             print "MESSAGE: ".encode_json $infoRef->{data};
34            
35             }) -> connect(
36             user => $USER_ID,
37             timestamp => $TIMESTAMP,
38             token => $TOKEN
39             );
40            
41             # Now start the event loop to keep the program alive
42             AnyEvent->condvar->recv;
43            
44             =head1 DESCRIPTION
45            
46             This library allows to communicate with Centrifugo through a websocket.
47            
48             =cut
49            
50 1     1   131 use strict;
  1         3  
  1         20  
51 1     1   4 use warnings;
  1         3  
  1         1278  
52            
53            
54             =head1 FUNCTION new
55            
56             my $client = Centrifugo::Client->new( $URL );
57            
58             or
59            
60             my $client = Centrifugo::Client->new( $URL,
61             debug => 'true', # if true, some informations are written on STDERR
62             ws_params => { # These parameters are passed to AnyEvent::WebSocket::Client->new(...)
63             ssl_no_verify => 'true',
64             timeout => 600
65             },
66             );
67            
68             =cut
69            
70             sub new {
71 1     1 0 3494 my ($class, $ws_url, %params)=@_;
72 1         3 my $this = {};
73 1         2 bless($this, $class);
74 1         8 $this->{WS_URL} = $ws_url;
75 1   33     6 $this->{DEBUG} = $params{debug} && uc($params{debug})ne'FALSE';
76 1         2 $this->{WEBSOCKET} = AnyEvent::WebSocket::Client -> new( %{$params{ws_params}} );
  1         12  
77 1         2087 return $this;
78             }
79            
80             =head1 FUNCTION connect - send authorization parameters to Centrifugo so your connection could start subscribing on channels.
81            
82             $client->connect(
83             user => $USER_ID,
84             timestamp => $TIMESTAMP,
85             token => $TOKEN,
86             [info => $info,]
87             [uid => $uid,]
88             );
89            
90             (this function retuns $self to allow chains of multiple function calls)
91            
92             It is possible to provide a UID for this command, but if you don't, a random one will be generated for you, but cannot be retrieved afterward.
93            
94             =cut
95            
96             sub connect {
97 1     1 0 8 my ($this,%PARAMS) = @_;
98 1 50       9 croak("Missing user in Centrifugo::Client->connect(...)") if ! $PARAMS{user};
99 1 50       5 croak("Missing timestamp in Centrifugo::Client->connect(...)") if ! $PARAMS{timestamp};
100 1 50       6 croak("Missing token in Centrifugo::Client->connect(...)") if ! $PARAMS{token};
101             $this->{WEBSOCKET}->connect( $this->{WS_URL} )->cb(sub {
102             # Connects to Websocket
103 1     1   153178 $this->{WSHANDLE} = eval { shift->recv };
  1         5  
104 1 50       24 if($@) {
105             # handle error...
106 0         0 warn "Error in Centrifugo::Client : $@";
107 0 0       0 $this->{ON}->{'error'}->($@) if $this->{ON}->{'error'};
108 0         0 return;
109             }
110            
111             # Fix parameters sent to Centrifugo
112 1 50       8 $PARAMS{timestamp}="$PARAMS{timestamp}" if $PARAMS{timestamp}; # This MUST be a string
113            
114 1   33     12 my $uid=$PARAMS{uid} || _generate_random_id();
115 1         5 delete $PARAMS{uid};
116             # Sends a CONNECT message to Centrifugo
117 1         20 my $CONNECT=encode_json {
118             UID => $uid,
119             method => 'connect',
120             params => \%PARAMS
121             };
122            
123 1 50       7 print STDERR "Centrifugo::Client : WS > $CONNECT\n" if $this->{DEBUG};
124             $this->{WSHANDLE}->on(each_message => sub {
125 1         62828 my($loop, $message) = @_;
126 1 50       11 print STDERR "Centrifugo::Client : WS < $message->{body}\n" if $this->{DEBUG};
127 1         21 my $body = decode_json($message->{body});
128             # Handle a body containing {response}
129 1 50       7 if (ref($body) eq 'HASH') {
130 1         5 $body = [ $body ];
131             }
132             # Handle a body containing [{response},{response}...]
133 1         4 foreach my $info (@$body) {
134 1         4 my $uid = $info->{uid};
135 1         3 my $method = $info->{method};
136 1         4 my $error = $info->{error};
137 1         3 my $body = $info->{body}; # Not the same 'body' as above
138 1 50       5 if ($method eq 'connect') {
139             # on Connect, the client_id must be read (if available)
140 1 50 33     16 if ($body && ref($body) eq 'HASH' && $body->{client}) {
      33        
141 1         6 $this->{CLIENT_ID} = $body->{client};
142 1 50       6 print STDERR "Centrifugo::Client : CLIENT_ID=".$this->{CLIENT_ID}."\n" if $this->{DEBUG};
143             }
144             }
145             # Call the callback of the method
146 1         4 my $sub = $this->{ON}->{$method};
147 1 50       6 if ($sub) {
148             # Add UID into body if available
149 1 50       27 if ($uid) {
150 1         4 $body->{uid}=$uid;
151             }
152 1         13 $sub->( $body );
153             }
154             }
155 1         15 });
156            
157 1 50       25 unless ($^O=~/Win/i) {
158             # This event seems to be unrecognized on Windows (?)
159             $this->{WSHANDLE}->on(parse_error => sub {
160 0         0 my($loop, $error) = @_;
161 0         0 warn "Error in Centrifugo::Client : $error";
162 0 0       0 $this->{ON}->{'error'}->($error) if $this->{ON}->{'error'};
163 1         9 });
164             }
165              
166             # handle a closed connection...
167             $this->{WSHANDLE}->on(finish => sub {
168 0         0 my($loop) = @_;
169 0 0       0 print STDERR "Centrifugo::Client : Connection closed\n" if $this->{DEBUG};
170 0 0       0 $this->{ON}->{'ws_closed'}->() if $this->{ON}->{'ws_closed'};
171 0         0 undef $this->{WSHANDLE};
172 0         0 undef $this->{CLIENT_ID};
173 1         24 });
174            
175 1         19 $this->{WSHANDLE}->send($CONNECT);
176            
177 1         8 });
178 1         13945 $this;
179             }
180            
181             =head1 FUNCTION publish - allows clients directly publish messages into channel (use with caution. Client->Server communication is NOT the aim of Centrifugo)
182            
183             $client->publish( channel=>$channel, data=>$data, [uid => $uid] );
184            
185             $data must be a HASHREF to a structure (which will be encoded to JSON), for example :
186            
187             $client->public ( channel => "public",
188             data => {
189             nick => "Anonymous",
190             text => "My message",
191             } );
192            
193             or even :
194            
195             $client->public ( channel => "public", data => { } ); # Sends an empty message to the "public" channel
196            
197             This function returns the UID used to send the command to the server. (a random string if none is provided)
198             =cut
199            
200             sub publish {
201 0     0 0 0 my ($this, %PARAMS) = @_;
202 0 0       0 croak("Missing channel in Centrifugo::Client->publish(...)") unless $PARAMS{channel};
203 0 0       0 croak("Missing data in Centrifugo::Client->publish(...)") unless $PARAMS{data};
204 0   0     0 my $uid = $PARAMS{'uid'} || _generate_random_id();
205 0         0 delete $PARAMS{'uid'};
206 0         0 my $PUBLISH = encode_json {
207             UID => $uid,
208             method => 'publish',
209             params => \%PARAMS
210             };
211 0 0       0 print STDERR "Centrifugo::Client : WS > $PUBLISH\n" if $this->{DEBUG};
212 0         0 $this->{WSHANDLE}->send( $PUBLISH );
213 0         0 return $uid;
214             }
215            
216             =head1 FUNCTION disconnect
217            
218             $client->disconnect();
219            
220             =cut
221            
222             sub disconnect {
223 0     0 0 0 my ($this) = @_;
224 0 0       0 $this->{WSHANDLE}->close() if $this->{WSHANDLE};
225 0         0 my $sub = $this->{ON}->{'disconnect'};
226 0 0       0 $sub->() if $sub;
227             }
228            
229             =head1 FUNCTION subscribe - allows to subscribe on channel after client successfully connected.
230            
231             $client->subscribe( channel => $channel, [ uid => $uid ] );
232            
233             This function returns the UID used to send the command to the server. (a random string if none is provided)
234            
235             =cut
236            
237             sub subscribe {
238 0     0 0 0 my ($this, %PARAMS) = @_;
239 0         0 return _channel_command($this,'subscribe',%PARAMS);
240             }
241            
242             sub _channel_command {
243 0     0   0 my ($this,$command,%PARAMS) = @_;
244 0         0 my $channel = $PARAMS{'channel'};
245 0 0       0 croak("Missing channel in Centrifugo::Client->$command(...)") unless $channel;
246 0   0     0 my $uid = $PARAMS{'uid'} || _generate_random_id();
247 0         0 my $MSG = encode_json {
248             UID => $uid ,
249             method => $command,
250             params => { channel => $channel }
251             };
252 0 0       0 print STDERR "Centrifugo::Client : WS > $MSG\n" if $this->{DEBUG};
253 0         0 $this->{WSHANDLE}->send($MSG);
254 0         0 return $uid;
255             }
256            
257             =head1 FUNCTION unsubscribe - allows to unsubscribe from channel.
258            
259             $client->unsubscribe( channel => $channel, [ uid => $uid ] );
260            
261             This function returns the UID used to send the command to the server. (a random string if none is provided)
262            
263             =cut
264            
265             sub unsubscribe {
266 0     0 0 0 my ($this, %PARAMS) = @_;
267 0         0 return _channel_command($this,'unsubscribe',%PARAMS);
268             }
269            
270             =head1 FUNCTION presence – allows to ask server for channel presence information.
271            
272             $client->presence( channel => $channel, [ uid => $uid ] );
273            
274             This function returns the UID used to send the command to the server. (a random string if none is provided)
275            
276             =cut
277            
278             sub presence {
279 0     0 0 0 my ($this, %PARAMS) = @_;
280 0         0 return _channel_command($this,'presence',%PARAMS);
281             }
282            
283             =head1 FUNCTION history – allows to ask server for channel presence information.
284            
285             $client->history( channel => $channel, [ uid => $uid ] );
286            
287             This function returns the UID used to send the command to the server. (a random string if none is provided)
288            
289             =cut
290            
291             sub history {
292 0     0 0 0 my ($this, %PARAMS) = @_;
293 0         0 return _channel_command($this,'history',%PARAMS);
294             }
295            
296             =head1 FUNCTION ping – allows to send ping command to server, server will answer this command with ping response.
297            
298             $client->ping( [ uid => $uid ] );
299            
300             This function returns the UID used to send the command to the server. (a random string if none is provided)
301            
302             =cut
303            
304             sub ping {
305 0     0 0 0 my ($this,%PARAMS) = @_;
306 0   0     0 my $uid = $PARAMS{'uid'} || _generate_random_id();
307 0         0 my $MSG = encode_json {
308             UID => $uid ,
309             method => 'ping'
310             };
311 0 0       0 print STDERR "Centrifugo::Client : WS > $MSG\n" if $this->{DEBUG};
312 0         0 $this->{WSHANDLE}->send($MSG);
313 0         0 return $uid;
314             }
315            
316             =head1 FUNCTION on
317            
318             Register a callback for the given event.
319            
320             Known events are 'message', 'connect', 'disconnect', 'subscribe', 'unsubscribe', 'publish', 'presence', 'history', 'join', 'leave',
321             'refresh', 'ping', 'ws_closed', 'ws_error'
322            
323             $client->on( 'connect', sub {
324             my( $dataRef ) = @_;
325             ...
326             });
327            
328             (this function retuns $self to allow chains of multiple function calls)
329            
330             Note : Events that are an answer to the client requests (ie 'connect', 'publish', ...) have an 'uid' which is added into the %data structure.
331            
332             =cut
333            
334             sub on {
335 4     4 0 28 my ($this, $method, $sub)=@_;
336 4         15 $this->{ON}->{$method} = $sub;
337 4         29 $this;
338             }
339            
340             =head1 FUNCTION client_id
341            
342             $client->client_id() return the client_id if it is connected to Centrifugo and the server returned this ID (which is not the case on the demo server).
343            
344             =cut
345            
346             sub client_id {
347 0     0 0 0 my ($this)=@_;
348 0         0 return $this->{CLIENT_ID};
349             }
350            
351            
352             ##### (kinda)-private functions
353            
354             # Generates a random Id for commands
355             sub _generate_random_id {
356 1     1   18 my @c = ('a'..'z','A'..'Z',0..9);
357 1         4 return join '', @c[ map{ rand @c } 1 .. 12 ];
  12         42  
358             }
359            
360             1;