File Coverage

blib/lib/MsgPack/RPC.pm
Criterion Covered Total %
statement 105 169 62.1
branch 5 12 41.6
condition n/a
subroutine 24 38 63.1
pod 0 8 0.0
total 134 227 59.0


line stmt bran cond sub pod time code
1             package MsgPack::RPC;
2             our $AUTHORITY = 'cpan:YANICK';
3             # ABSTRACT: MessagePack RPC client
4             $MsgPack::RPC::VERSION = '2.0.3';
5              
6 2     2   133513 use strict;
  2         20  
  2         54  
7 2     2   10 use warnings;
  2         3  
  2         47  
8              
9 2     2   950 use Moose;
  2         845078  
  2         12  
10              
11 2     2   15132 use IO::Async::Loop;
  2         96978  
  2         90  
12 2     2   887 use Promises qw/ deferred /, backend => [ 'IO::Async' ];
  2         8868  
  2         14  
13              
14 2     2   38643 use MsgPack::Decoder;
  2         8  
  2         103  
15 2     2   1167 use MsgPack::Encoder;
  2         7  
  2         24  
16 2     2   6273 use MsgPack::RPC::Message;
  2         6  
  2         117  
17 2     2   982 use MsgPack::RPC::Message::Request;
  2         8  
  2         85  
18 2     2   1062 use MsgPack::RPC::Event::Write;
  2         7  
  2         74  
19 2     2   15 use MsgPack::RPC::Message::Response;
  2         4  
  2         43  
20 2     2   10 use MsgPack::RPC::Message::Notification;
  2         4  
  2         48  
21 2     2   846 use MsgPack::RPC::Event::Receive;
  2         8  
  2         79  
22              
23 2     2   15 use Scalar::Util qw/ blessed /;
  2         5  
  2         110  
24              
25 2     2   13 use experimental 'signatures', 'switch';
  2         4  
  2         24  
26              
27             with 'Beam::Emitter';
28              
29             has io => (
30             is => 'ro',
31             trigger => \&_set_io_accessors,
32             );
33              
34             has stream => (
35             is => 'rw',
36             handles => [ 'write' ],
37             );
38              
39             has loop => (
40             is => 'ro',
41             lazy => 1,
42             default => sub { IO::Async::Loop->new },
43             handles => [ 'run' ],
44             );
45              
46             sub bin_2_hex {
47 0     0 0 0 join '', map { sprintf "%#x", ord } split '', shift;
  0         0  
48             }
49              
50             has log => (
51             is => 'ro',
52             lazy => 1,
53             default => sub {
54             require Log::Any;
55             Log::Any->get_logger;
56             },
57             );
58              
59 0     0   0 sub _buffer_read ( $rpc, $stream, $buffref, $eof ) {
  0         0  
  0         0  
  0         0  
  0         0  
  0         0  
60 0         0 $rpc->log->tracef( 'reading %s', bin_2_hex($$buffref) );
61              
62 0         0 $rpc->read( $$buffref );
63              
64 0         0 $$buffref = '';
65              
66 0         0 return 0;
67             }
68              
69 0     0   0 sub _set_inet_io ( $self, $host, $port ) {
  0         0  
  0         0  
  0         0  
  0         0  
70              
71             $self->loop->connect(
72             host => $host,
73             port => $port,
74             socktype => 'stream',
75             on_stream => sub {
76 0     0   0 my $stream = shift;
77             $stream->configure(
78 0         0 on_read => sub { $self->_buffer_read(@_) },
79 0         0 );
80 0         0 $self->loop->add($stream);
81             $self->on( 'write', sub {
82 0         0 my $event = shift;
83 0         0 $stream->write( $event->encoded );
84 0         0 });
85             },
86 0     0   0 on_resolve_error => sub { die "Cannot resolve - $_[-1]\n"; },
87 0     0   0 on_connect_error => sub { die "Cannot connect - $_[0] failed $_[-1]\n"; },
88 0         0 );
89             }
90              
91 0     0   0 sub _set_socket_io ( $self, $socket ) {
  0         0  
  0         0  
  0         0  
92              
93             $self->loop->connect(
94             addr => {
95             family => 'unix',
96             socktype => 'stream',
97             path => $socket,
98             },
99             on_stream => sub {
100 0     0   0 my $stream = shift;
101 0         0 $self->stream($stream);
102             $stream->configure(
103 0         0 on_read => sub { $self->_buffer_read(@_) },
104 0         0 );
105 0         0 $self->loop->add($stream);
106             $self->on( 'write', sub {
107 0         0 my $event = shift;
108 0         0 $stream->write( $event->encoded );
109 0         0 });
110             },
111 0     0   0 on_resolve_error => sub { die "Cannot resolve - $_[-1]\n"; },
112 0     0   0 on_connect_error => sub { die "Cannot connect - $_[0] failed $_[-1]\n"; },
113 0         0 );
114             }
115              
116 0     0   0 sub _set_fh_io ( $self, $in_fh, $out_fh ) {
  0         0  
  0         0  
  0         0  
  0         0  
117              
118 0         0 $out_fh->autoflush(1);
119              
120 0         0 require IO::Async::Stream;
121             my $stream = IO::Async::Stream->new(
122             read_handle => $in_fh,
123 0     0   0 on_read => sub { $self->_buffer_read(@_) },
124 0         0 );
125              
126 0         0 $self->loop->add($stream);
127             $self->on( 'write', sub {
128 0     0   0 $self->log->debugf( 'uh? %s', $_[0] );
129 0         0 my $event = shift;
130 0         0 $out_fh->syswrite( $event->encoded );
131 0         0 });
132             }
133              
134 0     0   0 sub _set_io_accessors($self,$io,@) {
  0         0  
  0         0  
  0         0  
135              
136 0 0       0 return $self->_set_fh_io( @$io ) if ref $io eq 'ARRAY';
137              
138 0 0       0 return $self->_set_inet_io(split ':', $io)
139             if $io =~ /:/;
140              
141 0         0 return $self->_set_socket_io($io);
142              
143             }
144              
145             has decoder => (
146             isa => 'MsgPack::Decoder',
147             is => 'ro',
148             lazy => 1,
149             default => sub {
150             my $self = shift;
151              
152             my $decoder = MsgPack::Decoder->new( emitter => 1 );
153              
154             $decoder->on( decoded => sub {
155             my $event = shift;
156             $self->receive($_) for $event->payload_list;
157             });
158              
159             return $decoder;
160             },
161             handles => [ 'read' ],
162             );
163              
164 6     6 0 12 sub receive ( $self, $message ) {
  6         11  
  6         15  
  6         8  
165 6         19 my @message = @$message;
166              
167 6         10 my $m;
168 6         14 given ( $message[0] ) {
169 6         47 $m = MsgPack::RPC::Message::Request->new( id => $message[1], method => $message[2], params => $message[3] ) when 0;
170 2         17 $m = MsgPack::RPC::Message::Response->new( id => $message[1], error => $message[2], result => $message[3] ) when 1;
171 1         12 $m = MsgPack::RPC::Message::Notification->new( method => $message[1], params => $message[2] ) when 2;
172             }
173              
174 6         5643 $self->emit( 'receive', class => 'MsgPack::RPC::Event::Receive', message => $m );
175              
176             # if a response, trigger the callback
177 6 100       12694 if( $m->is_response ) {
178 1 50       31 if ( my $callback = delete $self->response_callbacks->{$m->id} ) {
179 1 50       15 if( $m->is_error ) {
180 0         0 $callback->{deferred}->reject($m->error);
181             }
182             else {
183 1         39 $callback->{deferred}->resolve($m->result);
184             }
185             }
186             }
187             else {
188 5         152 $self->emit( $m->method, class => 'MsgPack::RPC::Event::Receive', message => $m );
189             }
190             }
191              
192              
193             has "response_callbacks" => (
194             is => 'ro',
195             lazy => 1,
196             default => sub {
197             {};
198             },
199             );
200              
201             has timeout => (
202             is => 'ro',
203             default => 300,
204             );
205              
206             sub add_response_callback {
207 4     4 0 88 my( $self, $id ) = @_;
208 4         24 my $deferred = deferred;
209 4         231 $self->response_callbacks->{$id} = {
210             timestamp => time,
211             deferred => $deferred,
212             };
213              
214 4         27 require IO::Async::Timer::Countdown;
215             my $timeout = IO::Async::Timer::Countdown->new(
216             delay => $self->timeout,
217             on_expire => sub {
218 1     1   994678 delete $self->response_callbacks->{$id};
219 1         14 $deferred->reject('timeout');
220             }
221 4         251 );
222              
223 4         523 $self->loop->add($timeout->start);
224              
225 4     2   10686 $deferred->finally(sub{ $self->loop->remove($timeout) });
  2         359  
226             }
227              
228 4     4 0 71986 sub send_request($self,$method,$args=[],$id=++$MsgPack::RPC::MSG_ID) {
  4         12  
  4         9  
  4         10  
  4         12  
  4         6  
229 4         53 my $request = MsgPack::RPC::Message::Request->new(
230             method => $method,
231             params => $args,
232             );
233              
234 4         3678 my $callback = $self->add_response_callback($request->id);
235              
236 4         347 $self->send($request);
237              
238 4         6463 return $callback->{deferred};
239             }
240              
241 2     2 0 103 sub send_response($self,$id,$args) {
  2         4  
  2         6  
  2         3  
  2         5  
242 2         21 $self->send(
243             MsgPack::RPC::Message::Response->new(
244             id => $id,
245             result => $args,
246             )
247             );
248             }
249              
250 1     1 0 45 sub send_response_error($self,$id,$args) {
  1         2  
  1         2  
  1         2  
  1         3  
251 1         6 $self->send(
252             MsgPack::RPC::Message::Response->new(
253             id => $id,
254             error => $args,
255             )
256             );
257             }
258              
259 2     2 0 2586 sub send_notification ($self,$method,$args=[]) {
  2         5  
  2         4  
  2         7  
  2         4  
260 2         22 $self->send( MsgPack::RPC::Message::Notification->new(
261             method => $method, params => $args,
262             ));
263             }
264              
265              
266 9     9 0 3329 sub send($self,$struct) {
  9         18  
  9         18  
  9         12  
267 9 50       49 my $type = blessed $struct ? 'message' : 'payload';
268 9         44 $self->emit( 'write', class => 'MsgPack::RPC::Event::Write', $type => $struct );
269             }
270              
271             1;
272              
273             __END__
274              
275             =pod
276              
277             =encoding UTF-8
278              
279             =head1 NAME
280              
281             MsgPack::RPC - MessagePack RPC client
282              
283             =head1 VERSION
284              
285             version 2.0.3
286              
287             =head1 SYNOPSIS
288              
289             use MsgPack::RPC;
290              
291             my $rpc = MsgPack::RPC->new( io => '127.0.0.1:6666' );
292              
293             $rpc->notify( 'something' => [ 'with', 'args' ] );
294              
295             $rpc->request(
296             request_method => [ 'some', 'args' ]
297             )->on_done(sub{
298             print "replied with: ", @_;
299             });
300              
301             $rpc->loop;
302              
303             =head1 DESCRIPTION
304              
305             C<MsgPack::RPC> implements a MessagePack RPC client following
306             the protocol described at L<https://github.com/msgpack-rpc/msgpack-rpc/blob/master/spec.md>.
307              
308             =head1 METHODS
309              
310             =head2 new( %args )
311              
312             =over
313              
314             =item io( $socket )
315              
316             =item io( [ $in_fh, $out_fh ] )
317              
318             Required. Defines which IO on which the MessagePack messages will be received and sent.
319              
320             The IO can be a local socket (e.g., C</tmp/rpc.socket> ), a network socket (e.g., C<127.0.0.1:6543>),
321             or a pair of filehandles.
322              
323             =back
324              
325             =head2 io()
326              
327             Returns the IO descriptor(s) used by the object.
328              
329             =head2 request( $method, $args, $id )
330              
331             Sends the request. The C<$id> is optional, and will be automatically
332             assigned from an internal self-incrementing list if not given.
333              
334             Returns a promise that will be fulfilled once a response is received. The response can be either a success
335             or a failure, and in both case the fulfilled promise will be given whatever values are passed in the response.
336              
337             $rpc->request( 'ls', [ '/home', '/tmp' ] )
338             ->on_done(sub{ say for @_ })
339             ->on_fail(sub{ die "couldn't read directories: ", @_ });
340              
341             =head2 notify( $method, $args )
342              
343             Sends a notification.
344              
345             =head2 subscribe( $event_name, \&callback )
346              
347             # 'ping' is a request
348             $rpc->subscribe( ping => sub($msg) {
349             $msg->response->done('pong');
350             });
351              
352             # 'log' is a notification
353             $rpc->subscribe( log => sub($msg) {
354             print {$fh} @{$msg->args};
355             });
356              
357             Register a callback for the given event. If a notification or a request matching the
358             event
359             is received, the callback will be called. The callback will be passed either a L<MsgPack::RPC::Message> (if triggered by
360             a notification) or
361             L<MsgPack::RPC::Message::Request> object.
362              
363             Events can have any number of callbacks assigned to them.
364              
365             The subscription system is implemented using the L<Beam::Emitter> role.
366              
367             =head2 loop( $end_condition )
368              
369             Reads and process messages from the incoming stream, endlessly if not be given an optional C<$end_condition>.
370             The end condition can be given a number of messages to read, or a promise that will end the loop once
371             fulfilled.
372              
373             # loop until we get a response from our request
374              
375             my $response = $rpc->request('add', [1,2] );
376              
377             $response->on_done(sub{ print "sum is ", @_ });
378              
379             $rpc->loop($response);
380              
381              
382             # loop 100 times
383             $rpc->loop(100);
384              
385             =head1 SEE ALSO
386              
387             =over
388              
389             =item L<MsgPack::RPC::Message>
390              
391             =item L<MsgPack::RPC::Message::Request>
392              
393             =item L<MsgPack::Encoder>
394              
395             =item L<MsgPack::Decoder>
396              
397             =item L<Data::MessagePack> (alternative to C<MsgPack::Encoder> and C<MsgPack::Decoder>.
398              
399             =back
400              
401             =head1 AUTHOR
402              
403             Yanick Champoux <yanick@cpan.org>
404              
405             =head1 COPYRIGHT AND LICENSE
406              
407             This software is copyright (c) 2019, 2017, 2016, 2015 by Yanick Champoux.
408              
409             This is free software; you can redistribute it and/or modify it under
410             the same terms as the Perl 5 programming language system itself.
411              
412             =cut