File Coverage

blib/lib/MR/IProto.pm
Criterion Covered Total %
statement 24 217 11.0
branch 0 114 0.0
condition 0 35 0.0
subroutine 8 36 22.2
pod 3 6 50.0
total 35 408 8.5


line stmt bran cond sub pod time code
1             package MR::IProto;
2              
3             =head1 NAME
4              
5             MR::IProto - iproto network protocol client
6              
7             =head1 SYNOPSIS
8              
9             IProto client can be created with full control of
10             its behaviour:
11              
12             my $client = MR::IProto->new(
13             cluster => MR::IProto::Cluster->new(
14             servers => [
15             MR::IProto::Cluster::Server->new(
16             host => 'xxx.xxx.xxx.xxx',
17             port => xxxx,
18             ),
19             ...
20             ],
21             ),
22             );
23              
24             Or without it:
25              
26             my $client = MR::IProto->new(
27             servers => 'xxx.xxx.xxx.xxx:xxxx,xxx.xxx.xxx.xxx:xxxx',
28             );
29              
30             Messages can be prepared and processed using objects (requires some more CPU):
31              
32             my $request = MyProject::Message::MyOperation::Request->new(
33             arg1 => 1,
34             arg2 => 2,
35             );
36             my $response = $client->send($request);
37             # $response isa My::Project::Message::MyOperation::Response.
38             # Of course, both message classes (request and reply) must
39             # be implemented by user.
40              
41             Or without them:
42              
43             my $response = $client->send({
44             msg => x,
45             data => [...],
46             pack => 'xxx',
47             unpack => sub {
48             my ($data) = @_;
49             return (...);
50             },
51             });
52              
53             Messages can be sent synchronously:
54              
55             my $response = $client->send($response);
56             # exception is raised if error is occured
57             # besides $@ you can check $! to identify reason of error
58              
59             Or asynchronously:
60              
61             use AnyEvent;
62             my $callback = sub {
63             my ($reply, $error) = @_;
64             # on error $error is defined and $! can be set
65             return;
66             };
67             $client->send($request, $callback);
68             # callback is called when reply is received or error is occured
69              
70             It is recommended to disconnect all connections in child after fork() to
71             prevent possible conflicts:
72              
73             my $pid = fork();
74             if ($pid == 0) {
75             MR::IProto->disconnect_all();
76             }
77              
78             =head1 DESCRIPTION
79              
80             This client is used to communicate with cluster of balanced servers using
81             iproto network protocol.
82              
83             To use it nicely you should to implement two subclasses of
84             L for each message type, one for request message
85             and another for reply.
86             This classes must be named as C, where I
87             must be passed to constructor of L as value of L
88             attribute and I is either C or C.
89             This classes must be loaded before first message through client object
90             will be sent.
91              
92             To send messages asyncronously you should to implement event loop by self.
93             L is recomended.
94              
95             =cut
96              
97 1     1   6474 use Mouse;
  1         89410  
  1         6  
98 1     1   410 use Errno;
  1         3  
  1         65  
99 1     1   10146 use MRO::Compat;
  1         4254  
  1         38  
100 1     1   11 use Scalar::Util qw(weaken);
  1         2  
  1         115  
101 1     1   16730 use Time::HiRes;
  1         2165  
  1         8  
102 1     1   872 use MR::IProto::Cluster;
  1         4  
  1         80  
103 1     1   1084 use MR::IProto::Error;
  1         3  
  1         11146  
104              
105             with 'MR::IProto::Role::Debuggable';
106              
107             =head1 ATTRIBUTES
108              
109             =over
110              
111             =item prefix
112              
113             Prefix of the class name in which hierarchy subclasses of L
114             are located. Used to find reply message classes.
115              
116             =cut
117              
118             has prefix => (
119             is => 'ro',
120             isa => 'Str',
121             default => sub { ref shift },
122             );
123              
124             =item cluster
125              
126             Instance of L. Contains all servers between which
127             requests can be balanced.
128             Also can be specified in I parameter of constructor as a list of
129             C pairs separated by comma.
130              
131             =cut
132              
133             has cluster => (
134             is => 'ro',
135             isa => 'MR::IProto::Cluster',
136             required => 1,
137             coerce => 1,
138             handles => [qw( timeout )],
139             );
140              
141             =item max_parallel
142              
143             Max amount of simultaneous request to all servers.
144              
145             =cut
146              
147             has max_parallel => (
148             is => 'ro',
149             isa => 'Int',
150             default => 1000,
151             );
152              
153             =item max_request_retries
154              
155             Max amount of request retries which must be sent to different servers
156             before error is returned.
157              
158             =cut
159              
160             has max_request_retries => (
161             is => 'ro',
162             isa => 'Int',
163             default => 2,
164             );
165              
166             =item retry_delay
167              
168             Delay between request retries.
169              
170             =cut
171              
172             has retry_delay => (
173             is => 'ro',
174             isa => 'Num',
175             default => 0,
176             );
177              
178             =back
179              
180             =cut
181              
182             has _reply_class => (
183             is => 'ro',
184             isa => 'HashRef[ClassName]',
185             lazy_build => 1,
186             );
187              
188             has _queue => (
189             is => 'ro',
190             isa => 'ArrayRef',
191             lazy_build => 1,
192             );
193              
194             has _in_progress => (
195             is => 'rw',
196             isa => 'Int',
197             default => 0,
198             );
199              
200             =head1 PUBLIC METHODS
201              
202             =over
203              
204             =item new( [ %args | \%args ] )
205              
206             Constructor.
207             See L and L for more information about allowed arguments.
208              
209             =item send( [ $message | \%args ], $callback? )
210              
211             Send C<$message> to server and receive reply.
212              
213             If C<$callback> is passed then request is done asyncronously and reply is passed
214             to callback as first argument.
215             Method B be called in void context to prevent possible errors.
216             Only client errors can be raised in async mode. All communication errors are
217             passed to callback as second argument. Additional information can be extracted
218             from C<$!> variable.
219              
220             In sync mode (when C<$callback> argument is skipped) all errors are raised
221             and C<$!> is also set. Response is returned from method, so method B
222             be called in scalar context.
223              
224             Request C<$message> can be instance of L subclass.
225             In this case reply will be also subclass of L.
226             Or it can be passed as C<\%args> hash reference with keys described
227             in L.
228              
229             =cut
230              
231             sub send {
232 0     0 1   my ($self, $message, $callback) = @_;
233 0 0         if($callback) {
234 0 0         die "Method must be called in void context if you want to use async" if defined wantarray;
235 0           $self->_send($message, $callback);
236 0           return;
237             } else {
238 0 0         die "Method must be called in scalar context if you want to use sync" unless defined wantarray;
239 0 0         my $olddie = ref $SIG{__DIE__} eq 'CODE' ? $SIG{__DIE__} : ref $SIG{__DIE__} eq 'GLOB' ? *{$SIG{__DIE__}}{CODE} : undef;
  0 0          
240 0 0   0     local $SIG{__DIE__} = sub { local $! = 0; $olddie->(@_); } if $olddie;
  0            
  0            
241 0           my %servers;
242 0           my ($data, $error, $errno);
243             my $conn = $self->_send_now($message, sub {
244 0     0     ($data, $error) = @_;
245 0           $errno = $!;
246 0           return;
247 0           }, \%servers);
248              
249 0 0 0       return if $message->{continue} && !$conn;
250              
251             my $cont = sub {
252 0 0   0     $self->_recv_now(\%servers, max => $message->{continue}?1:0);
253 0           $! = $errno;
254 0 0         return $message->{continue}->($data, $error, $errno) if ref $message->{continue} eq 'CODE';
255 0 0         die $error if $error;
256 0           return $data;
257 0           };
258              
259             return {
260 0 0         fh => $conn->fh,
261             connection => $conn,
262             continue => $cont,
263             } if $message->{continue};
264              
265 0           return &$cont();
266             }
267             }
268              
269             =item send_bulk( \@messages, $callback? )
270              
271             Send all of messages in C<\@messages> and return result (sync-mode) or
272             call callback (async-mode) after all replies was received.
273             Result is returned as array reference, which values can be instances of
274             L or L if request was passed
275             as object, or hash with keys C and C if message was passed
276             as C<\%args>.
277             Replies in result can be returned in order different then order of requests.
278              
279             See L for more information about message data. Either
280             C<$message> or C<\%args> allowed as content of C<\@messages>.
281              
282             =cut
283              
284             sub send_bulk {
285 0     0 1   my ($self, $messages, $callback) = @_;
286 0           my @result;
287 0 0         if($callback) {
288 0 0         die "Method must be called in void context if you want to use async" if defined wantarray;
289 0           my $cv = AnyEvent->condvar();
290 0     0     $cv->begin( sub { $callback->(\@result) } );
  0            
291 0           foreach my $message ( @$messages ) {
292 0           $cv->begin();
293             $self->_send($message, sub {
294 0     0     my ($data, $error) = @_;
295 0 0         push @result, blessed($data) ? $data
296             : { data => $data, error => $error };
297 0           $cv->end();
298 0           return;
299 0           });
300             }
301 0           $cv->end();
302 0           return;
303             }
304             else {
305 0 0         die "Method must be called in scalar context if you want to use sync" unless defined wantarray;
306 0 0         my $olddie = ref $SIG{__DIE__} eq 'CODE' ? $SIG{__DIE__} : ref $SIG{__DIE__} eq 'GLOB' ? *{$SIG{__DIE__}}{CODE} : undef;
  0 0          
307 0 0   0     local $SIG{__DIE__} = sub { local $! = 0; $olddie->(@_); } if $olddie;
  0            
  0            
308 0           my %servers;
309 0           foreach my $message ( @$messages ) {
310             $self->_send_now($message, sub {
311 0     0     my ($data, $error) = @_;
312 0 0         push @result, blessed($data) ? $data
313             : { data => $data, error => $error };
314 0           return;
315 0           }, \%servers);
316             }
317 0           $self->_recv_now(\%servers);
318 0           return \@result;
319             }
320             }
321              
322             sub Chat {
323 0     0 0   my $self = shift;
324 0 0         my $message = @_ == 1 ? shift : { @_ };
325 0 0         $message->{retry} = 1 if ref $message eq 'HASH';
326 0           my $data;
327 0 0         eval { $data = $self->send($message); 1 } or return;
  0            
  0            
328 0 0         return wantarray ? @$data : $data->[0];
329             }
330              
331             sub Chat1 {
332 0     0 0   my $self = shift;
333 0 0         my $message = @_ == 1 ? shift : { @_ };
334 0           my $data;
335 0 0         return eval { $data = $self->send($message); 1 } ? { ok => $data }
  0 0          
  0            
336             : { fail => $@ =~ /^(.*?) at \S+ line \d+/s ? $1 : $@, timeout => $! == Errno::ETIMEDOUT };
337             }
338              
339             sub SetTimeout {
340 0     0 0   my ($self, $timeout) = @_;
341 0           $self->timeout($timeout);
342 0           return;
343             }
344              
345             =item disconnect_all
346              
347             Class method used to disconnect all iproto-connections. Very useful in case of fork().
348              
349             =cut
350              
351             sub disconnect_all {
352 0     0 1   my ($class) = @_;
353 0           MR::IProto::Cluster::Server->disconnect_all();
354 0           return;
355             }
356              
357             =back
358              
359             =head1 PROTECTED METHODS
360              
361             =over
362              
363             =item BUILDARGS( [ %args | \%args ] )
364              
365             For compatibility with previous version of client and simplicity
366             some additional arguments to constructor is allowed:
367              
368             =over
369              
370             =item servers
371              
372             C pairs separated by comma used to create
373             L objects.
374              
375             =item timeout, tcp_nodelay, tcp_keepalive, dump_no_ints
376              
377             Are passed directly to constructor of L.
378              
379             =item balance
380              
381             Is passed directly to constructor of L.
382              
383             =back
384              
385             See L for more information.
386              
387             =cut
388              
389             my %servers;
390             around BUILDARGS => sub {
391             my $orig = shift;
392             my $class = shift;
393             my %args = @_ == 1 ? %{shift()} : @_;
394             $args{prefix} = $args{name} if exists $args{name};
395             if( $args{servers} ) {
396             my $cluster_class = $args{cluster_class} || 'MR::IProto::Cluster';
397             my $server_class = $args{server_class} || 'MR::IProto::Cluster::Server';
398             my %srvargs;
399             $srvargs{debug} = $args{debug} if exists $args{debug};
400             $srvargs{timeout} = delete $args{timeout} if exists $args{timeout};
401             $srvargs{tcp_nodelay} = delete $args{tcp_nodelay} if exists $args{tcp_nodelay};
402             $srvargs{tcp_keepalive} = delete $args{tcp_keepalive} if exists $args{tcp_keepalive};
403             $srvargs{dump_no_ints} = delete $args{dump_no_ints} if exists $args{dump_no_ints};
404             $srvargs{prefix} = $args{name} if exists $args{name} and defined $args{name};
405             my %clusterargs;
406             $clusterargs{balance} = delete $args{balance} if exists $args{balance};
407             $clusterargs{servers} = [
408             map {
409             my ($host, $port, $weight) = split /:/, $_;
410             $args{no_pool} ? my $server : $servers{"$host:$port"} ||= $server_class->new(
411             %srvargs,
412             host => $host,
413             port => $port,
414             defined $weight ? ( weight => $weight ) : (),
415             );
416             } split /,/, delete $args{servers}
417             ];
418             $args{cluster} = $cluster_class->new(%clusterargs);
419             }
420             return $class->$orig(%args);
421             };
422              
423             sub _build_debug_cb {
424 0     0     my ($self) = @_;
425 0           my $prefix = $self->prefix;
426             return sub {
427 0     0     my ($msg) = @_;
428 0           chomp $msg;
429 0           warn sprintf "%s: %s\n", $prefix, $msg;
430 0           return;
431 0           };
432             }
433              
434             sub _build__callbacks {
435 0     0     my ($self) = @_;
436 0           return {};
437             }
438              
439             sub _build__reply_class {
440 0     0     my ($self) = @_;
441 0           my $re = sprintf '^%s::', $self->prefix;
442 0           my %reply = map { $_->msg => $_ }
  0            
443             grep $_->can('msg'),
444             grep /$re/,
445             # MR::IProto::Response->meta->subclasses();
446 0           @{ mro::get_isarev('MR::IProto::Response') };
447 0           return \%reply;
448             }
449              
450             sub _build__queue {
451 0     0     my ($self) = @_;
452 0           return [];
453             }
454              
455             =item _send( [ $message | \%args ], $callback? )
456              
457             Pure asyncronious internal implementation of send.
458              
459             C<$message> is an instance of L.
460             If C<\%args> hash reference is passed instead of C<$message> then it can
461             contain following keys:
462              
463             =over
464              
465             =item msg
466              
467             Message code.
468              
469             =item key
470              
471             Depending on this value balancing between servers is implemented.
472              
473             =item data
474              
475             Message data. Already packed or unpacked. Unpacked data must be passed as
476             array reference and additional parameter I must be passed.
477              
478             =item pack
479              
480             First argument of L function.
481              
482             =item unpack
483              
484             Code reference which is used to unpack reply.
485              
486             =item no_reply
487              
488             Message have no reply.
489              
490             =item retry
491              
492             Is retry is allowed. Values of attributes L and
493             L is used if retry is allowed.
494              
495             =item is_retry
496              
497             Callback used to determine if server asks for retry. Unpacked data is passed
498             to it as a first argument.
499              
500             =back
501              
502             =cut
503              
504             sub _send {
505 0     0     my ($self, $message, $callback) = @_;
506 0 0         if( $self->_in_progress < $self->max_parallel ) {
507 0           $self->_in_progress( $self->_in_progress + 1 );
508 0 0         eval { $self->_send_now($message, $callback); 1 }
  0            
  0            
509             or $self->_report_error($message, $callback, $@);
510             }
511             else {
512 0           push @{$self->_queue}, [ $message, $callback ];
  0            
513             }
514 0           return;
515             }
516              
517             sub _finish_and_start {
518 0     0     my ($self) = @_;
519 0 0         if( my $task = shift @{$self->_queue} ) {
  0            
520 0 0         eval { $self->_send_now(@$task); 1 }
  0            
  0            
521             or $self->_report_error(@$task, $@);
522             }
523             else {
524 0           $self->_in_progress( $self->_in_progress - 1 );
525             }
526 0           return;
527             }
528              
529             sub _send_now {
530 0     0     my ($self, $message, $callback, $sync) = @_;
531 0           my $args;
532             # MR::IProto::Message OO-API
533 0 0         if( ref $message ne 'HASH' ) {
534 0           my $msg = $message->msg;
535 0           my $response_class = $self->_reply_class->{$msg};
536 0 0         die sprintf "Cannot find response class for message code %d\n", $msg unless $response_class;
537 0           $args = {
538             request => $message,
539             msg => $msg,
540             key => $message->key,
541             body => $message->data,
542             response_class => $response_class,
543             no_reply => $response_class->isa('MR::IProto::NoResponse'),
544             };
545             }
546             # Old-style compatible API
547             else {
548 0 0 0       die "unpack or no_reply must be specified" unless $message->{unpack} || $message->{no_reply};
549 0           $args = $message;
550 0           $args->{body} = exists $args->{payload} ? delete $args->{payload}
551 0 0 0       : ref $message->{data} ? pack delete $message->{pack} || 'L*', @{ delete $message->{data} }
    0          
552             : delete $message->{data};
553             }
554              
555 0           my $try = 1;
556 0           weaken($self);
557 0           my $handler;
558             $handler = sub {
559 0     0     $self->_server_callback(
560             [\$handler, $args, $callback, $sync, \$try],
561             [@_],
562             );
563 0           return;
564 0           };
565 0           return $self->_send_try($sync, $args, $handler, $try);
566             }
567              
568             sub _send_try {
569 0     0     my ($self, $sync, $args, $handler, $try) = @_;
570 0 0         my $xsync = $sync ? 'sync' : 'async';
571 0   0       $args->{max_request_retries} ||= $self->max_request_retries;
572 0 0         $self->_debug(sprintf "send msg=%d try %d of %d total", $args->{msg}, $try, $args->{max_request_retries} ) if $self->debug >= 2;
573 0           my $server = $self->cluster->server( $args->{key} );
574 0           my $connection = $server->$xsync();
575 0 0         return unless $connection->send($args->{msg}, $args->{body}, $handler, $args->{no_reply}, $args->{sync});
576 0 0 0       $sync->{$connection} ||= $connection if $sync;
577 0           return $connection;
578             }
579              
580             sub _send_retry {
581 0     0     my ($self, @in) = @_;
582 0           my ($sync) = @in;
583 0 0         if( $sync ) {
584 0           Time::HiRes::sleep($self->retry_delay);
585 0           $self->_send_try(@in);
586             }
587             else {
588 0           my $timer;
589             $timer = AnyEvent->timer(
590             after => $self->retry_delay,
591             cb => sub {
592 0     0     undef $timer;
593 0           $self->_send_try(@in);
594 0           return;
595             },
596 0           );
597             }
598 0           return;
599             }
600              
601             sub _server_callback {
602 0     0     my ($self, $req_args, $resp_args) = @_;
603 0           my ($handler, $args, $callback, $sync, $try) = @$req_args;
604 0           my ($resp_msg, $data, $error, $errno) = @$resp_args;
605             eval {
606 0 0         if ($error) {
607 0           $! = $errno;
608 0           $@ = $error;
609 0 0         my $retry = defined $args->{request} ? $args->{request}->retry()
    0          
610             : ref $args->{retry} eq 'CODE' ? $args->{retry}->()
611             : $args->{retry};
612 0 0         $self->_debug("send: failed[@{[$retry, $$try+1, $args->{max_request_retries}]}]") if $self->debug >= 2;
  0            
613 0 0 0       if( $retry && $$try++ < $args->{max_request_retries} ) {
614 0           $self->_send_retry($sync, $args, $$handler, $$try);
615             }
616             else {
617 0           undef $$handler;
618 0           $self->_report_error($args->{request}, $callback, $error, $sync, $errno);
619             }
620             }
621             else {
622 0           my $ok = eval {
623 0 0 0       die "Request and reply message code is different: $resp_msg != $args->{msg}\n"
624             unless $args->{no_reply} || $resp_msg == $args->{msg};
625 0 0         if( defined $args->{request} ) {
626 0           $data = $args->{response_class}->new( data => $data, request => $args->{request} );
627             }
628             else {
629 0 0         $data = $args->{no_reply} ? [ 0 ] : [ ref $args->{unpack} eq 'CODE' ? $args->{unpack}->($data) : unpack $args->{unpack}, $data ];
    0          
630             }
631 0           1;
632             };
633 0 0         if($ok) {
634 0 0 0       if( defined $args->{request} && $data->retry && $$try++ < $args->{max_request_retries} ) {
    0 0        
      0        
      0        
635 0           $self->_send_retry($sync, $args, $$handler, $$try);
636             }
637             elsif( defined $args->{is_retry} && $args->{is_retry}->($data) && $$try++ < $args->{max_request_retries} ) {
638 0           $self->_send_retry($sync, $args, $$handler, $$try);
639             }
640             else {
641 0           undef $$handler;
642 0 0         $self->_finish_and_start() unless $sync;
643 0           $callback->($data);
644             }
645             }
646             else {
647 0           undef $$handler;
648 0           $self->_report_error($args->{request}, $callback, $@, $sync);
649             }
650             }
651 0           1;
652 0 0         } or do {
653 0           undef $$handler;
654 0           $self->_debug("unhandled fatal error: $@");
655             };
656 0           return;
657             }
658              
659             sub _recv_now {
660 0     0     my ($self, $servers, %opts) = @_;
661 0           while(my @servers = values %$servers) {
662 0           %$servers = ();
663 0           $_->recv_all(%opts) foreach @servers;
664             }
665 0           return;
666             }
667              
668             sub _report_error {
669 0     0     my ($self, $request, $callback, $error, $sync, $errno) = @_;
670 0 0 0       my $errobj = defined $request && ref $request ne 'HASH'
    0          
671             ? MR::IProto::Error->new(
672             request => $request,
673             error => $error,
674             errno => defined $errno ? 0 + $errno : 0,
675             )
676             : undef;
677 0 0         $self->_finish_and_start() unless $sync;
678 0           $! = $errno;
679 0           $@ = $error;
680 0           $callback->($errobj, $error, $errno);
681 0           return;
682             }
683              
684             =back
685              
686             =head1 SEE ALSO
687              
688             L, L, L.
689              
690             =cut
691              
692 1     1   143 no Mouse;
  1         2  
  1         20  
693             __PACKAGE__->meta->make_immutable();
694              
695             1;