File Coverage

blib/lib/Centrifugo/Client.pm
Criterion Covered Total %
statement 83 147 56.4
branch 14 56 25.0
condition 7 29 24.1
subroutine 14 25 56.0
pod 0 12 0.0
total 118 269 43.8


line stmt bran cond sub pod time code
1             package Centrifugo::Client;
2            
3             our $VERSION = "1.04";
4            
5 1     1   70967 use Exporter;
  1         3  
  1         83  
6             our @ISA = qw(Exporter);
7             our @EXPORT = qw(generate_token);
8            
9 1     1   8 use Carp qw( croak );
  1         3  
  1         66  
10 1     1   391 use AnyEvent::WebSocket::Client 0.40; # Version needed for reason when close. See https://github.com/plicease/AnyEvent-WebSocket-Client/issues/30
  1         205576  
  1         36  
11 1     1   449 use AnyEvent::HTTP;
  1         6650  
  1         88  
12 1     1   477 use JSON;
  1         6924  
  1         7  
13            
14             =head1 NAME
15            
16             Centrifugo::Client
17            
18             =head1 SYNOPSIS
19            
20             use Centrifugo::Client;
21             use AnyEvent;
22            
23             my $cclient = Centrifugo::Client->new("$CENTRIFUGO_WS/connection/websocket");
24            
25             $cclient -> on('connect', sub{
26             my ($infoRef)=@_;
27             print "Connected to Centrifugo version ".$infoRef->{version};
28            
29             # When connected, client_id() is defined, so we can subscribe to our private channel
30             $cclient->subscribe( channel => '&'.$cclient->client_id() );
31            
32             }) -> on('message', sub{
33             my ($infoRef)=@_;
34             print "MESSAGE: ".encode_json $infoRef->{data};
35            
36             }) -> connect(
37             user => $USER_ID,
38             timestamp => $TIMESTAMP,
39             token => $TOKEN
40             );
41            
42             # Now start the event loop to keep the program alive
43             AnyEvent->condvar->recv;
44            
45             =head1 DESCRIPTION
46            
47             This library lets you communicate with Centrifugo through a websocket.
48            
49             =cut
50            
51 1     1   173 use strict;
  1         2  
  1         23  
52 1     1   5 use warnings;
  1         2  
  1         1133  
53            
54            
55             =head1 FUNCTION new
56            
57             my $client = Centrifugo::Client->new( $URL );
58            
59             or
60            
61             my $client = Centrifugo::Client->new( $URL,
62             debug => 'true', # If true, some information is written to STDERR
63             authEndpoint => "...", # The full URL used to ask for a key to subscribe to private channels
64             max_alive_period => 30, # If max_alive_period has passed since the last communication with the server, a PING is sent (default 0)
65             refresh_period => 5, # Check frequency for max_alive_period (default 10s)
66             ws_params => { # These parameters are passed to AnyEvent::WebSocket::Client->new(...)
67             ssl_no_verify => 'true',
68             timeout => 600
69             },
70             );
71            
72             =cut
73            
74             sub new {
75 1     1 0 20 my ($class, $ws_url, %params)=@_;
76 1         4 my $this = {};
77 1         4 bless($this, $class);
78 1         12 $this->{WS_URL} = $ws_url;
79 1   33     8 $this->{DEBUG} = $params{debug} && uc($params{debug})ne'FALSE';
80 1   50     12 $this->{AUTH_URL} = $params{authEndpoint} || "/centrifuge/auth/";
81 1         3 $this->{WEBSOCKET} = AnyEvent::WebSocket::Client -> new( %{$params{ws_params}} );
  1         18  
82 1   50     1901 $this->{MAX_ALIVE} = $params{max_alive_period} || 0;
83 1   50     7 $this->{REFRESH} = $params{refresh_period} || 10;
84 1         3 return $this;
85             }
86            
87             =head1 FUNCTION connect - sends authorization parameters to Centrifugo so that your connection can start subscribing to channels.
88            
89             $client->connect(
90             user => $USER_ID,
91             timestamp => $TIMESTAMP,
92             token => $TOKEN,
93             [info => $info,]
94             [uid => $uid,]
95             );
96            
97             (this function returns $self to allow chains of multiple function calls)
98            
99             It is possible to provide a UID for this command. If you don't, a random one is generated for you, but it cannot be retrieved afterward.
100            
101             =cut
102            
103             sub connect {
104 1     1 0 4 my ($this,%PARAMS) = @_;
105 1 50       5 croak("Missing user in Centrifugo::Client->connect(...)") if ! $PARAMS{user};
106 1 50       5 croak("Missing timestamp in Centrifugo::Client->connect(...)") if ! $PARAMS{timestamp};
107 1 50       9 croak("Missing token in Centrifugo::Client->connect(...)") if ! $PARAMS{token};
108             $this->{WEBSOCKET}->connect( $this->{WS_URL} )->cb(sub {
109             # Connects to Websocket
110 1     1   55226 $this->{WSHANDLE} = eval { shift->recv };
  1         6  
111 1 50       15 if($@) {
112             # handle error...
113 0         0 warn "Error in Centrifugo::Client : $@";
114 0 0       0 $this->{ON}->{'error'}->($@) if $this->{ON}->{'error'};
115 0         0 return;
116             }
117            
118             # Fix parameters sent to Centrifugo
119 1 50       6 $PARAMS{timestamp}="$PARAMS{timestamp}" if $PARAMS{timestamp}; # This MUST be a string
120            
121 1   33     10 my $uid=$PARAMS{uid} || _generate_random_id();
122 1         3 delete $PARAMS{uid};
123             # Sends a CONNECT message to Centrifugo
124 1         18 my $CONNECT=encode_json {
125             UID => $uid,
126             method => 'connect',
127             params => \%PARAMS
128             };
129            
130 1 50       5 print STDERR "Centrifugo::Client : WebSocket > $CONNECT\n" if $this->{DEBUG};
131             $this->{WSHANDLE}->on(each_message => sub {
132 1         54514 my($loop, $message) = @_;
133 1 50       12 print STDERR "Centrifugo::Client : R< WS : $message->{body}\n" if $this->{DEBUG};
134 1         5 $this->{last_alive_message} = time();
135 1         36 my $fullbody = decode_json($message->{body});
136             # Handle a body containing {response}
137 1 50       9 if (ref($fullbody) eq 'HASH') {
138 1         26 $fullbody = [ $fullbody ];
139             }
140             # Handle a body containing [{response},{response}...]
141 1         6 foreach my $info (@$fullbody) {
142 1         12 my $uid = $info->{uid};
143 1         6 my $method = $info->{method};
144 1         4 my $error = $info->{error};
145 1         3 my $body = $info->{body}; # Not the same 'body' as above
146 1 50       6 if ($method eq 'connect') {
147             # on Connect, the client_id must be read (if available)
148 1 50 33     16 if ($body && ref($body) eq 'HASH' && $body->{client}) {
      33        
149 1         5 $this->{CLIENT_ID} = $body->{client};
150 1 50       5 print STDERR "Centrifugo::Client : CLIENT_ID=".$this->{CLIENT_ID}."\n" if $this->{DEBUG};
151             }
152 1 50       13 if ($this->{MAX_ALIVE}) {
153             # Creates the timer to send periodic ping
154             $this->{alive_handler} = AnyEvent->timer(
155             after => $this->{REFRESH},
156             interval => $this->{REFRESH},
157             cb => sub {
158 0         0 my $late = time() - $this->{last_alive_message};
159 0 0       0 if ($late > $this->{MAX_ALIVE}) {
160 0 0       0 print STDERR "Sending ping (${late}s without message)\n" if $this->{DEBUG};
161 0         0 $this->ping();
162             }
163             }
164 0         0 );
165             }
166             }
167             # Call the callback of the method
168 1         6 my $sub = $this->{ON}->{$method};
169 1 50       9 if ($sub) {
170             # Add UID into body if available
171 1 50       7 if ($uid) {
172 1         4 $body->{uid}=$uid;
173             }
174 1         8 $sub->( $body );
175             }
176             }
177 1         13 });
178            
179             $this->{WSHANDLE}->on(parse_error => sub {
180 0         0 my($cnx, $error) = @_;
181 0         0 warn "Error in Centrifugo::Client : $error";
182 0 0       0 $this->{ON}->{'error'}->($error) if $this->{ON}->{'error'};
183 1         19 });
184            
185             # handle a closed connection...
186             $this->{WSHANDLE}->on(finish => sub {
187 0         0 my($cnx) = @_;
188 0         0 my $reason = $cnx->close_reason();
189            
190 0 0       0 print STDERR "Centrifugo::Client : Connection closed (reason=$reason)\n" if $this->{DEBUG};
191 0 0       0 $this->{ON}->{'ws_closed'}->($reason) if $this->{ON}->{'ws_closed'};
192 0         0 undef $this->{alive_handler};
193 0         0 undef $this->{WSHANDLE};
194 0         0 undef $this->{CLIENT_ID};
195 1         15 });
196            
197 1         12 $this->{WSHANDLE}->send($CONNECT);
198            
199 1         7 });
200 1         13415 $this;
201             }
202            
203             =head1 FUNCTION publish - allows clients to publish messages directly into a channel (use with caution. Client->Server communication is NOT the aim of Centrifugo)
204            
205             $client->publish( channel=>$channel, data=>$data, [uid => $uid] );
206            
207             $data must be a HASHREF to a structure (which will be encoded to JSON), for example :
208            
209             $client->publish ( channel => "public",
210             data => {
211             nick => "Anonymous",
212             text => "My message",
213             } );
214            
215             or even :
216            
217             $client->publish ( channel => "public", data => { } ); # Sends an empty message to the "public" channel
218            
219             This function returns the UID used to send the command to the server. (a random string if none is provided)
220             =cut
221            
222             sub publish {
223 0     0 0 0 my ($this, %PARAMS) = @_;
224 0 0       0 croak("Missing channel in Centrifugo::Client->publish(...)") unless $PARAMS{channel};
225 0 0       0 croak("Missing data in Centrifugo::Client->publish(...)") unless $PARAMS{data};
226 0   0     0 my $uid = $PARAMS{'uid'} || _generate_random_id();
227 0         0 delete $PARAMS{'uid'};
228 0         0 my $PUBLISH = encode_json {
229             UID => $uid,
230             method => 'publish',
231             params => \%PARAMS
232             };
233 0         0 $this->_send_message($PUBLISH);
234 0         0 return $uid;
235             }
236            
237             =head1 FUNCTION disconnect
238            
239             $client->disconnect();
240            
241             =cut
242            
243             sub disconnect {
244 0     0 0 0 my ($this) = @_;
245 0 0       0 $this->{WSHANDLE}->close() if $this->{WSHANDLE};
246 0         0 my $sub = $this->{ON}->{'disconnect'};
247 0 0       0 $sub->() if $sub;
248             }
249            
250             =head1 FUNCTION subscribe - subscribes to a channel after the client has successfully connected.
251            
252             $client->subscribe( channel => $channel, [ client => $clientId ,] [ uid => $uid ,] );
253            
254             If the channel is private (starts with a '$'), then a request to $this->{AUTH_URL} is done automatically to get the channel key. In that case, the 'client' parameter is mandatory.
255            
256             This function returns the UID used to send the command to the server. (a random string if none is provided)
257            
258             =cut
259            
260             sub subscribe {
261 0     0 0 0 my ($this, %PARAMS) = @_;
262 0         0 my $channel = $PARAMS{channel};
263 0 0       0 return _channel_command($this,'subscribe',%PARAMS) unless $channel=~/^\$/;
264             # If the channel is private, then an API-call to /centrifuge/auth/ must be done
265 0 0       0 croak "'client' parameter is mandatory to subscribe to private channels in Centrifugo::Client->subscribe(...)" unless $PARAMS{client};
266            
267             # Request a channel key
268             my $data = encode_json {
269             client => $PARAMS{client},
270 0         0 channels => [ $channel ]
271             };
272 0         0 my $URL = $this->{AUTH_URL};
273             http_post $URL, $data,
274             headers => {
275             contentType => "application/json"
276             },
277             sub {
278 0     0   0 my ($data, $headers) = @_;
279 0 0 0     0 warn "Couldn't connect to $URL : Status=".$headers->{Status} and return unless $headers->{Status}==200;
280 0         0 my $result = decode_json $data;
281 0         0 my $key = $result->{$channel}->{sign};
282 0         0 $PARAMS{sign} = $key;
283             # The request is now complete : {channel:"...", client:"...", sign:"..."}
284 0         0 return _channel_command($this,'subscribe',%PARAMS);
285 0         0 };
286             }
287            
288             sub _channel_command {
289 0     0   0 my ($this,$command,%PARAMS) = @_;
290             # my $channel = $PARAMS{'channel'};
291             # croak("Missing channel in Centrifugo::Client->$command(...)") unless $PARAMS{'channel'};
292 0   0     0 my $uid = $PARAMS{'uid'} || _generate_random_id();
293 0         0 my $MSG = encode_json {
294             UID => $uid ,
295             method => $command,
296             params => \%PARAMS
297             };
298 0         0 $this->_send_message($MSG);
299 0         0 return $uid;
300             }
301            
302             =head1 FUNCTION unsubscribe - allows to unsubscribe from channel.
303            
304             $client->unsubscribe( channel => $channel, [ uid => $uid ] );
305            
306             This function returns the UID used to send the command to the server. (a random string if none is provided)
307            
308             =cut
309            
310             sub unsubscribe {
311 0     0 0 0 my ($this, %PARAMS) = @_;
312 0         0 return _channel_command($this,'unsubscribe',%PARAMS);
313             }
314            
315             =head1 FUNCTION presence - allows to ask server for channel presence information.
316            
317             $client->presence( channel => $channel, [ uid => $uid ] );
318            
319             This function returns the UID used to send the command to the server. (a random string if none is provided)
320            
321             =cut
322            
323             sub presence {
324 0     0 0 0 my ($this, %PARAMS) = @_;
325 0         0 return _channel_command($this,'presence',%PARAMS);
326             }
327            
328             =head1 FUNCTION history - allows to ask server for channel history information.
329            
330             $client->history( channel => $channel, [ uid => $uid ] );
331            
332             This function returns the UID used to send the command to the server. (a random string if none is provided)
333            
334             =cut
335            
336             sub history {
337 0     0 0 0 my ($this, %PARAMS) = @_;
338 0         0 return _channel_command($this,'history',%PARAMS);
339             }
340            
341             =head1 FUNCTION ping - allows to send ping command to server, server will answer this command with ping response.
342            
343             $client->ping( [ uid => $uid ] );
344            
345             This function returns the UID used to send the command to the server. (a random string if none is provided)
346            
347             =cut
348            
349             sub ping {
350 0     0 0 0 my ($this,%PARAMS) = @_;
351 0   0     0 my $uid = $PARAMS{'uid'} || _generate_random_id();
352 0         0 my $MSG = encode_json {
353             UID => $uid ,
354             method => 'ping'
355             };
356 0         0 $this->_send_message($MSG);
357 0         0 return $uid;
358             }
359            
360             =head1 FUNCTION on - Register a callback for the given event.
361            
362             Known events are 'message', 'connect', 'disconnect', 'subscribe', 'unsubscribe', 'publish', 'presence', 'history', 'join', 'leave',
363             'refresh', 'ping', 'ws_closed', 'error'
364            
365             $client->on( 'connect', sub {
366             my( $dataRef ) = @_;
367             ...
368             });
369            
370             (this function returns $self to allow chains of multiple function calls)
371            
372             Note : Events that are an answer to the client requests (ie 'connect', 'publish', ...) have an 'uid' which is added into the %data structure.
373            
374             =cut
375            
376             sub on {
377 4     4 0 17 my ($this, $method, $sub)=@_;
378 4         11 $this->{ON}->{$method} = $sub;
379 4         18 $this;
380             }
381            
382             =head1 FUNCTION client_id - return the client_id if it is connected to Centrifugo and the server returned this ID (which is not the case on the demo server).
383            
384             $client->client_id()
385            
386             =cut
387            
388             sub client_id {
389 0     0 0 0 my ($this)=@_;
390 0         0 return $this->{CLIENT_ID};
391             }
392            
393            
394             =head1 FUNCTION generate_token - return the private token that must be used to connect a client to Centrifugo.
395            
396             $key = Centrifugo::Client::generate_token($secret, $user, $timestamp [,$info])
397            
398             INPUT : $secret is the private secret key, only known by the server.
399            
400             $user is the user name.
401            
402             $timestamp is the current timestamp.
403            
404             $info is a JSON encoded string.
405            
406             The same function may be used to generate a private channel key :
407            
408             $key = generate_token($secret, $client, $channel [,$info])
409            
410             INPUT : $client is the client_id given when connected to Centrifugo.
411            
412             $channel is the name of the channel (should start with a '$' as it is private).
413            
414             And to sign each request to access to the HTTP API :
415            
416             $sign = generate_token($secret, $data)
417            
418             INPUT : $data is a JSON string with your API commands
419            
420             =cut
421            
422             sub generate_token {
423 1     1 0 3165 my ($secret, @infos)=@_;
424 1         4 my $info = join'', @infos;
425 1     1   10 use Digest::SHA qw( hmac_sha256_hex );
  1         2  
  1         223  
426 1         16 return hmac_sha256_hex( $info, $secret );
427             }
428            
429             ##### (kinda)-private functions
430            
431             sub _send_message {
432 0     0   0 my ($this,$MSG)=@_;
433 0 0       0 print STDERR "Centrifugo::Client : S> WebSocket : $MSG\n" if $this->{DEBUG};
434 0         0 $this->{WSHANDLE}->send($MSG);
435            
436             }
437            
438            
439             # Generates a random Id for commands
440             sub _generate_random_id {
441 1     1   37 my @c = ('a'..'z','A'..'Z',0..9);
442 1         4 return join '', @c[ map{ rand @c } 1 .. 12 ];
  12         33  
443             }
444             1;