File Coverage

blib/lib/SignalWire/Agents/Relay/Client.pm
Criterion Covered Total %
statement 130 307 42.3
branch 42 122 34.4
condition 35 108 32.4
subroutine 21 43 48.8
pod 0 13 0.0
total 228 593 38.4


line stmt bran cond sub pod time code
1             package SignalWire::Agents::Relay::Client;
2 3     3   106411 use strict;
  3         7  
  3         126  
3 3     3   13 use warnings;
  3         5  
  3         191  
4 3     3   385 use Moo;
  3         7186  
  3         14  
5              
6 3     3   2672 use JSON qw(encode_json decode_json);
  3         17850  
  3         22  
7 3     3   3016 use IO::Socket::SSL;
  3         396167  
  3         38  
8 3     3   3015 use Protocol::WebSocket::Client;
  3         137180  
  3         183  
9 3         350 use SignalWire::Agents::Relay::Constants qw(
10             PROTOCOL_VERSION
11             CALL_TERMINAL_STATES
12             DIAL_STATE_ANSWERED DIAL_STATE_FAILED
13             MESSAGE_TERMINAL_STATES
14 3     3   786 );
  3         10  
15 3     3   702 use SignalWire::Agents::Relay::Event;
  3         13  
  3         164  
16 3     3   1394 use SignalWire::Agents::Relay::Call;
  3         10  
  3         198  
17 3     3   2640 use SignalWire::Agents::Relay::Message;
  3         14  
  3         137  
18 3     3   2443 use SignalWire::Agents::Logging;
  3         11  
  3         14840  
19              
20             my $logger = SignalWire::Agents::Logging->get_logger('relay_client');
21              
22             has 'project' => ( is => 'ro', required => 1 );
23             has 'token' => ( is => 'ro', required => 1 );
24             has 'host' => ( is => 'ro', required => 1 );
25             has 'contexts' => ( is => 'rw', default => sub { [] } );
26             has 'agent' => ( is => 'ro', default => sub { 'signalwire-agents-perl/1.0' } );
27              
28             # Connection state
29             has 'protocol' => ( is => 'rw', default => sub { '' } );
30             has 'authorization_state' => ( is => 'rw', default => sub { '' } );
31             has 'connected' => ( is => 'rw', default => sub { 0 } );
32             has 'session_id' => ( is => 'rw', default => sub { '' } );
33              
34             # Correlation maps
35             has '_pending' => ( is => 'rw', default => sub { {} } ); # rpc_id => { resolve => sub, reject => sub }
36             has '_calls' => ( is => 'rw', default => sub { {} } ); # call_id => Call
37             has '_pending_dials' => ( is => 'rw', default => sub { {} } ); # tag => { resolve => sub, reject => sub }
38             has '_messages' => ( is => 'rw', default => sub { {} } ); # message_id => Message
39              
40             # WebSocket internals
41             has '_socket' => ( is => 'rw', default => sub { undef } );
42             has '_ws' => ( is => 'rw', default => sub { undef } );
43              
44             # Reconnect state
45             has '_reconnect_attempts' => ( is => 'rw', default => sub { 0 } );
46             has '_max_backoff' => ( is => 'ro', default => sub { 30 } );
47              
48             # Callbacks
49             has '_on_call' => ( is => 'rw', default => sub { undef } );
50             has '_on_message' => ( is => 'rw', default => sub { undef } );
51             has '_on_event' => ( is => 'rw', default => sub { undef } );
52              
53             # --- UUID generation ---
54             sub _generate_uuid {
55 0     0   0 my @hex = map { sprintf('%02x', int(rand(256))) } 1..16;
  0         0  
56 0         0 $hex[6] = sprintf('%02x', (hex($hex[6]) & 0x0f) | 0x40);
57 0         0 $hex[8] = sprintf('%02x', (hex($hex[8]) & 0x3f) | 0x80);
58 0         0 return join('-',
59             join('', @hex[0..3]),
60             join('', @hex[4..5]),
61             join('', @hex[6..7]),
62             join('', @hex[8..9]),
63             join('', @hex[10..15]),
64             );
65             }
66              
67             # --- Public API: register handlers ---
68              
69             sub on_call {
70 2     2 0 27 my ($self, $cb) = @_;
71 2         7 $self->_on_call($cb);
72 2         3 return $self;
73             }
74              
75             sub on_message {
76 3     3 0 36 my ($self, $cb) = @_;
77 3         10 $self->_on_message($cb);
78 3         6 return $self;
79             }
80              
81             sub on_event {
82 1     1 0 4 my ($self, $cb) = @_;
83 1         3 $self->_on_event($cb);
84 1         1 return $self;
85             }
86              
87             # --- Connection ---
88              
89             sub connect_ws {
90 0     0 0 0 my ($self) = @_;
91              
92 0         0 my $host = $self->host;
93 0         0 my $port = 443;
94              
95 0         0 $logger->debug("Connecting to wss://$host");
96              
97 0         0 my $socket = IO::Socket::SSL->new(
98             PeerHost => $host,
99             PeerPort => $port,
100             SSL_verify_mode => SSL_VERIFY_PEER,
101             Timeout => 10,
102             );
103 0 0       0 unless ($socket) {
104 0         0 $logger->error("SSL connection failed: $! $IO::Socket::SSL::SSL_ERROR");
105 0         0 return 0;
106             }
107              
108 0         0 my $ws = Protocol::WebSocket::Client->new(url => "wss://$host");
109              
110             $ws->on(write => sub {
111 0     0   0 my ($ws_client, $buf) = @_;
112 0         0 syswrite($socket, $buf);
113 0         0 });
114              
115             $ws->on(connect => sub {
116 0     0   0 $logger->debug("WebSocket connected");
117 0         0 });
118              
119             $ws->on(error => sub {
120 0     0   0 my ($ws_client, $error) = @_;
121 0         0 $logger->error("WebSocket error: $error");
122 0         0 });
123              
124             $ws->on(read => sub {
125 0     0   0 my ($ws_client, $message) = @_;
126 0         0 $self->_handle_message($message);
127 0         0 });
128              
129 0         0 $self->_socket($socket);
130 0         0 $self->_ws($ws);
131              
132             # Initiate the WebSocket handshake
133 0         0 $ws->connect;
134              
135             # Read the handshake response
136 0         0 my $buf = '';
137 0         0 while (my $bytes = sysread($socket, $buf, 4096, length($buf))) {
138 0         0 $ws->read($buf);
139 0         0 $buf = '';
140 0 0       0 last if $ws->{hs}->is_done;
141             }
142              
143 0         0 $self->connected(1);
144 0         0 $self->_reconnect_attempts(0);
145 0         0 return 1;
146             }
147              
148             sub authenticate {
149 0     0 0 0 my ($self) = @_;
150              
151 0         0 my %params = (
152             version => PROTOCOL_VERSION,
153             agent => $self->agent,
154             event_acks => JSON::true,
155             authentication => {
156             project => $self->project,
157             token => $self->token,
158             },
159             );
160              
161             # Add contexts if any
162 0 0       0 if (@{$self->contexts}) {
  0         0  
163 0         0 $params{contexts} = $self->contexts;
164             }
165              
166             # Add protocol for session resume
167 0 0       0 if ($self->protocol) {
168 0         0 $params{protocol} = $self->protocol;
169             }
170              
171             # Add authorization_state for fast re-auth
172 0 0       0 if ($self->authorization_state) {
173 0         0 $params{authorization_state} = $self->authorization_state;
174             }
175              
176 0         0 my $result = $self->execute('signalwire.connect', \%params);
177              
178 0 0       0 if (ref $result eq 'HASH') {
179 0 0       0 $self->protocol($result->{protocol}) if $result->{protocol};
180 0 0       0 $self->session_id($result->{session_id}) if $result->{session_id};
181             }
182              
183 0         0 return $result;
184             }
185              
186             # --- JSON-RPC execute ---
187              
188             sub execute {
189 0     0 0 0 my ($self, $method, $params) = @_;
190 0   0     0 $params //= {};
191              
192 0         0 my $id = _generate_uuid();
193              
194             # Add protocol to params (except for signalwire.connect itself)
195 0 0 0     0 if ($method ne 'signalwire.connect' && $self->protocol) {
196 0         0 $params->{protocol} = $self->protocol;
197             }
198              
199 0         0 my $request = {
200             jsonrpc => '2.0',
201             id => $id,
202             method => $method,
203             params => $params,
204             };
205              
206             # Register pending
207 0         0 my $result;
208 0         0 my $done = 0;
209 0         0 my $error;
210             $self->_pending->{$id} = {
211 0     0   0 resolve => sub { $result = $_[0]; $done = 1 },
  0         0  
212 0     0   0 reject => sub { $error = $_[0]; $done = 1 },
  0         0  
213 0         0 };
214              
215 0         0 $self->_send($request);
216              
217             # Poll for response (synchronous in this implementation)
218 0         0 my $timeout = 30;
219 0         0 my $start = time();
220 0   0     0 while (!$done && (time() - $start) < $timeout) {
221 0         0 $self->_read_once();
222             }
223              
224 0         0 delete $self->_pending->{$id};
225              
226 0 0       0 if ($error) {
227 0         0 die "RELAY error: $error";
228             }
229              
230 0         0 return $result;
231             }
232              
233             # --- Messaging ---
234              
235             sub send_message {
236 0     0 0 0 my ($self, %opts) = @_;
237              
238 0         0 my %params;
239 0         0 for my $key (qw(context to_number from_number body media tags region)) {
240 0 0       0 $params{$key} = $opts{$key} if exists $opts{$key};
241             }
242              
243 0         0 my $result = $self->execute('messaging.send', \%params);
244              
245 0 0 0     0 if (ref $result eq 'HASH' && $result->{message_id}) {
246             my $msg = SignalWire::Agents::Relay::Message->new(
247             message_id => $result->{message_id},
248             context => $opts{context} // '',
249             direction => 'outbound',
250             from_number => $opts{from_number} // '',
251             to_number => $opts{to_number} // '',
252             body => $opts{body} // '',
253             media => $opts{media} // [],
254 0   0     0 tags => $opts{tags} // [],
      0        
      0        
      0        
      0        
      0        
255             state => 'queued',
256             );
257 0         0 $self->_messages->{$result->{message_id}} = $msg;
258              
259 0 0       0 if ($opts{on_completed}) {
260 0         0 $msg->on_completed($opts{on_completed});
261             }
262              
263 0         0 return $msg;
264             }
265              
266 0         0 return $result;
267             }
268              
269             # --- Context management ---
270              
271             sub receive {
272 0     0 0 0 my ($self, @contexts) = @_;
273 0         0 return $self->execute('signalwire.receive', { contexts => \@contexts });
274             }
275              
276             sub unreceive {
277 0     0 0 0 my ($self, @contexts) = @_;
278 0         0 return $self->execute('signalwire.unreceive', { contexts => \@contexts });
279             }
280              
281             # --- Dial ---
282              
283             sub dial {
284 0     0 0 0 my ($self, %opts) = @_;
285 0   0     0 my $tag = $opts{tag} || _generate_uuid();
286 0   0     0 my $timeout = delete $opts{timeout} || 120;
287 0         0 my $on_completed = delete $opts{on_completed};
288              
289 0         0 my %params = ( tag => $tag );
290 0 0       0 $params{devices} = $opts{devices} if $opts{devices};
291 0 0       0 $params{region} = $opts{region} if $opts{region};
292 0 0       0 $params{max_price_per_minute} = $opts{max_price_per_minute} if exists $opts{max_price_per_minute};
293              
294             # Register pending dial BEFORE sending RPC
295 0         0 my $call;
296 0         0 my $done = 0;
297 0         0 my $dial_error;
298             $self->_pending_dials->{$tag} = {
299 0     0   0 resolve => sub { $call = $_[0]; $done = 1 },
  0         0  
300 0     0   0 reject => sub { $dial_error = $_[0]; $done = 1 },
  0         0  
301 0         0 };
302              
303             # Send the RPC -- response is just {code:200, message:"Dialing"}
304 0         0 eval { $self->execute('calling.dial', \%params) };
  0         0  
305 0 0       0 if ($@) {
306 0         0 delete $self->_pending_dials->{$tag};
307 0         0 die $@;
308             }
309              
310             # Wait for calling.call.dial event to resolve
311 0         0 my $start = time();
312 0   0     0 while (!$done && (time() - $start) < $timeout) {
313 0         0 $self->_read_once();
314             }
315              
316 0         0 delete $self->_pending_dials->{$tag};
317              
318 0 0       0 if ($dial_error) {
319 0         0 die "Dial failed: $dial_error";
320             }
321              
322 0 0 0     0 if ($call && $on_completed) {
323             $call->on(sub {
324 0     0   0 my ($c, $event) = @_;
325 0 0 0     0 if ($event->event_type eq 'calling.call.state' && $event->call_state eq 'ended') {
326 0         0 eval { $on_completed->($c) };
  0         0  
327 0 0       0 warn "dial on_completed error: $@" if $@;
328             }
329 0         0 });
330             }
331              
332 0         0 return $call;
333             }
334              
335             # --- Internal: send a JSON-RPC message ---
336              
337             sub _send {
338 0     0     my ($self, $msg) = @_;
339 0           my $json = encode_json($msg);
340 0           $logger->debug("SEND: $json");
341 0           my $ws = $self->_ws;
342 0 0         if ($ws) {
343 0           $ws->write($json);
344             }
345             }
346              
347             # --- Internal: read one frame from WebSocket ---
348              
349             sub _read_once {
350 0     0   0 my ($self) = @_;
351 0         0 my $socket = $self->_socket;
352 0 0       0 return unless $socket;
353              
354 0         0 my $buf = '';
355 0         0 my $ready = '';
356 0         0 vec($ready, fileno($socket), 1) = 1;
357 0 0       0 if (select($ready, undef, undef, 0.1)) {
358 0         0 my $bytes = sysread($socket, $buf, 4096);
359 0 0 0     0 if ($bytes && $bytes > 0) {
    0 0        
360 0         0 $self->_ws->read($buf);
361             } elsif (!defined $bytes || $bytes == 0) {
362             # Connection lost
363 0         0 $self->connected(0);
364             }
365             }
366             }
367              
368             # --- Internal: handle an incoming WebSocket message ---
369              
370             sub _handle_message {
371 4     4   231 my ($self, $raw) = @_;
372 4         25 $logger->debug("RECV: $raw");
373              
374 4         8 my $msg;
375 4         6 eval { $msg = decode_json($raw) };
  4         29  
376 4 50       8 if ($@) {
377 0         0 $logger->error("JSON parse error: $@");
378 0         0 return;
379             }
380              
381             # JSON-RPC response (has "result" or "error", matched by "id")
382 4 100 100     24 if (exists $msg->{result} || exists $msg->{error}) {
383 2   50     6 my $id = $msg->{id} // '';
384 2 50       7 if (my $pending = delete $self->_pending->{$id}) {
385 2 100       6 if (exists $msg->{error}) {
386 1         4 $pending->{reject}->($msg->{error});
387             } else {
388 1         3 $pending->{resolve}->($msg->{result});
389             }
390             }
391 2         17 return;
392             }
393              
394             # Server-initiated method call
395 2   50     4 my $method = $msg->{method} // '';
396              
397 2 100       8 if ($method eq 'signalwire.event') {
    50          
    0          
398             # ACK the event immediately
399 1         3 $self->_send_ack($msg->{id});
400 1   50     7 $self->_handle_event($msg->{params} // {});
401             }
402             elsif ($method eq 'signalwire.ping') {
403 1         3 $self->_send_ack($msg->{id});
404             }
405             elsif ($method eq 'signalwire.disconnect') {
406 0         0 $self->_send_ack($msg->{id});
407 0   0     0 $self->_handle_disconnect($msg->{params} // {});
408             }
409             }
410              
411             # --- Internal: send an ACK ---
412              
413             sub _send_ack {
414 2     2   4 my ($self, $id) = @_;
415 2         9 $self->_send({
416             jsonrpc => '2.0',
417             id => $id,
418             result => {},
419             });
420             }
421              
422             # --- Internal: handle events ---
423              
424             sub _handle_event {
425 18     18   3895 my ($self, $outer_params) = @_;
426 18   50     74 my $event_type = $outer_params->{event_type} // '';
427 18   50     45 my $inner_params = $outer_params->{params} // {};
428              
429             # Parse into typed event object
430 18         110 my $event = SignalWire::Agents::Relay::Event->parse_event($event_type, $inner_params);
431              
432             # Fire global event callback
433 18 50       114 if (my $cb = $self->_on_event) {
434 0         0 eval { $cb->($event) };
  0         0  
435 0 0       0 warn "on_event callback error: $@" if $@;
436             }
437              
438             # --- Authorization state ---
439 18 100       55 if ($event_type eq 'signalwire.authorization.state') {
440 1   50     8 $self->authorization_state($inner_params->{authorization_state} // '');
441 1         5 return;
442             }
443              
444             # --- Inbound call ---
445 17 100       41 if ($event_type eq 'calling.call.receive') {
446 1         3 $self->_handle_inbound_call($event, $inner_params);
447 1         4 return;
448             }
449              
450             # --- Inbound message ---
451 16 100       42 if ($event_type eq 'messaging.receive') {
452 2         13 $self->_handle_inbound_message($event);
453 2         5 return;
454             }
455              
456             # --- Message state ---
457 14 100       31 if ($event_type eq 'messaging.state') {
458 4   50     13 my $message_id = $inner_params->{message_id} // '';
459 4 50       22 if (my $msg = $self->_messages->{$message_id}) {
460 4         18 $msg->dispatch_event($event);
461 4 100       15 if ($msg->completed) {
462 2         10 delete $self->_messages->{$message_id};
463             }
464             }
465 4         18 return;
466             }
467              
468             # --- Dial completion ---
469 10 100       26 if ($event_type eq 'calling.call.dial') {
470 5         22 $self->_handle_dial_event($event, $inner_params);
471 5         33 return;
472             }
473              
474             # --- State events during dial (call not registered yet) ---
475 5   50     15 my $call_id = $inner_params->{call_id} // '';
476 5   100     19 my $tag = $inner_params->{tag} // '';
477              
478 5 50 66     32 if ($event_type eq 'calling.call.state' && $tag && exists $self->_pending_dials->{$tag}) {
      66        
479 2 50 33     14 if (!exists $self->_calls->{$call_id} && $call_id) {
480             # Create the Call object so events route correctly
481             my $call = SignalWire::Agents::Relay::Call->new(
482             call_id => $call_id,
483             node_id => $inner_params->{node_id} // '',
484             tag => $tag,
485             device => $inner_params->{device} // {},
486 2   50     37 _client => $self,
      50        
487             );
488 2         17 $self->_calls->{$call_id} = $call;
489             }
490             }
491              
492             # --- Normal routing by call_id ---
493 5 100 66     37 if ($call_id && (my $call = $self->_calls->{$call_id})) {
494 4         21 $call->dispatch_event($event);
495              
496             # Clean up ended calls
497 4 100       34 if ($call->state eq 'ended') {
498 1         10 delete $self->_calls->{$call_id};
499             }
500             }
501             }
502              
503             # --- Internal: handle inbound call ---
504              
505             sub _handle_inbound_call {
506 1     1   3 my ($self, $event, $params) = @_;
507 1   50     3 my $call_id = $params->{call_id} // '';
508 1 50       3 return unless $call_id;
509              
510             my $call = SignalWire::Agents::Relay::Call->new(
511             call_id => $call_id,
512             node_id => $params->{node_id} // '',
513             tag => $params->{tag} // '',
514             device => $params->{device} // {},
515             context => $params->{context} // '',
516 1   50     35 state => $params->{call_state} // 'ringing',
      50        
      50        
      50        
      50        
517             _client => $self,
518             );
519 1         9 $self->_calls->{$call_id} = $call;
520              
521 1 50       3 if (my $cb = $self->_on_call) {
522 1         2 eval { $cb->($call) };
  1         3  
523 1 50       19 warn "on_call callback error: $@" if $@;
524             }
525             }
526              
527             # --- Internal: handle inbound message ---
528              
529             sub _handle_inbound_message {
530 2     2   6 my ($self, $event) = @_;
531              
532 2 50       12 if (my $cb = $self->_on_message) {
533 2         5 eval { $cb->($event) };
  2         7  
534 2 50       12 warn "on_message callback error: $@" if $@;
535             }
536             }
537              
538             # --- Internal: handle dial completion event ---
539              
540             sub _handle_dial_event {
541 5     5   13 my ($self, $event, $params) = @_;
542 5   50     16 my $tag = $params->{tag} // '';
543 5   50     14 my $dial_state = $params->{dial_state} // '';
544 5   100     18 my $call_info = $params->{call} // {};
545              
546 5         15 my $pending = $self->_pending_dials->{$tag};
547 5 100       14 return unless $pending;
548              
549 4 100       14 if ($dial_state eq DIAL_STATE_ANSWERED) {
    50          
550 2   50     8 my $call_id = $call_info->{call_id} // '';
551 2         7 my $call = $self->_calls->{$call_id};
552 2 50       6 unless ($call) {
553             $call = SignalWire::Agents::Relay::Call->new(
554             call_id => $call_id,
555             node_id => $call_info->{node_id} // '',
556             tag => $tag,
557             device => $call_info->{device} // {},
558 0   0     0 dial_winner => 1,
      0        
559             state => 'answered',
560             _client => $self,
561             );
562 0         0 $self->_calls->{$call_id} = $call;
563             }
564 2         9 $call->state('answered');
565 2         7 $call->dial_winner(1);
566 2         7 $pending->{resolve}->($call);
567             }
568             elsif ($dial_state eq DIAL_STATE_FAILED) {
569 2         8 $pending->{reject}->("Dial failed");
570             }
571             }
572              
573             # --- Internal: handle server disconnect ---
574              
575             sub _handle_disconnect {
576 2     2   1423 my ($self, $params) = @_;
577 2   100     10 my $restart = $params->{restart} || 0;
578              
579 2 100       7 if ($restart) {
580             # Clear session state, connect fresh
581 1         8 $self->protocol('');
582 1         4 $self->authorization_state('');
583             }
584              
585 2         7 $self->connected(0);
586             # The client should reconnect (handled by the event loop)
587             }
588              
589             # --- Reconnection ---
590              
591             sub reconnect {
592 0     0 0   my ($self) = @_;
593              
594             # Reject all pending requests
595 0           for my $id (keys %{$self->_pending}) {
  0            
596 0           my $p = delete $self->_pending->{$id};
597 0 0         $p->{reject}->("Disconnected") if $p;
598             }
599              
600             # Reject all pending dials
601 0           for my $tag (keys %{$self->_pending_dials}) {
  0            
602 0           my $p = delete $self->_pending_dials->{$tag};
603 0 0         $p->{reject}->("Disconnected") if $p;
604             }
605              
606             # Exponential backoff: 1s, 2s, 4s, ... up to max_backoff
607 0           my $attempts = $self->_reconnect_attempts;
608 0           my $delay = 2 ** $attempts;
609 0 0         $delay = $self->_max_backoff if $delay > $self->_max_backoff;
610              
611 0           $logger->info("Reconnecting in ${delay}s (attempt " . ($attempts + 1) . ")");
612 0           select(undef, undef, undef, $delay);
613              
614 0           $self->_reconnect_attempts($attempts + 1);
615              
616 0 0         if ($self->connect_ws) {
617 0           return $self->authenticate;
618             }
619              
620 0           return undef;
621             }
622              
623             # --- Disconnect ---
624              
625             sub disconnect_ws {
626 0     0 0   my ($self) = @_;
627 0           $self->connected(0);
628 0 0         if ($self->_socket) {
629 0           close($self->_socket);
630 0           $self->_socket(undef);
631             }
632 0           $self->_ws(undef);
633             }
634              
635             # --- Run event loop ---
636              
637             sub run {
638 0     0 0   my ($self) = @_;
639 0           while ($self->connected) {
640 0           $self->_read_once();
641             }
642             }
643              
644             1;