File Coverage

blib/lib/DR/Tarantool/LLClient.pm
Criterion Covered Total %
statement 36 280 12.8
branch 0 94 0.0
condition 0 49 0.0
subroutine 12 49 24.4
pod 11 14 78.5
total 59 486 12.1


line stmt bran cond sub pod time code
1 5     5   111963 use utf8;
  5         10  
  5         22  
2 5     5   126 use strict;
  5         6  
  5         119  
3 5     5   15 use warnings;
  5         5  
  5         231  
4              
5             =head1 NAME
6              
7             DR::Tarantool::LLClient - a low level async client
8             for L
9              
10             =head1 SYNOPSIS
11              
12             DR::Tarantool::LLClient->connect(
13             host => '127.0.0.1',
14             port => '33033',
15             cb => {
16             my ($tnt) = @_;
17             ...
18             }
19             );
20              
21             $tnt->ping( sub { .. } );
22             $tnt->insert(0, [ 1, 2, 3 ], sub { ... });
23             $tnt->select(1, 0, [ [ 1, 2 ], [ 3, 4 ] ], sub { ... });
24             $tnt->update(0, [ 1 ], [ [ 1 => add pack 'L<', 1 ] ], sub { ... });
25             $tnt->call_lua( 'box.select', [ 0, 1, 2 ], sub { ... });
26              
27              
28             =head1 DESCRIPTION
29              
30             This module provides a low-level interface to
31             L.
32              
33             =head1 METHODS
34              
35             All methods receive B as the last argument. The callback receives
36             B value with the following fields:
37              
38             =over
39              
40             =item status
41              
42             Done status:
43              
44             =over
45              
46             =item fatal
47              
48             A fatal error occurred. The server closed the connection or returned a
49             broken package.
50              
51             =item buffer
52              
53             An internal driver error.
54              
55             =item error
56              
57             The request wasn't executed: the server returned an error.
58              
59             =item ok
60              
61             Request was executed OK.
62              
63             =back
64              
65             =item errstr
66              
67             If an error occurred, contains error description.
68              
69             =item code
70              
71             Contains reply code.
72              
73             =item req_id
74              
75             Contains request id.
76             (see
77             L)
78              
79             =item type
80              
81             Contains request type
82             (see
83             L)
84              
85             =item count
86              
87             Contains the count of returned tuples.
88              
89             =item tuples
90              
91             Returned tuples (B of B).
92              
93             =back
94              
95             If you use B or B field types, values
96             for these fields need to be packed before they are sent to the
97             server, and unpacked when received in a response.
98             This is a low-level driver :)
99              
100             =cut
101              
102              
103             package DR::Tarantool::LLClient;
104 5     5   15 use base qw(DR::Tarantool::AEConnection);
  5         6  
  5         1522  
105 5     5   22 use AnyEvent;
  5         6  
  5         77  
106 5     5   15 use AnyEvent::Socket;
  5         5  
  5         398  
107 5     5   19 use Carp;
  5         5  
  5         229  
108 5     5   2088 use Devel::GlobalDestruction;
  5         8224  
  5         22  
109 5     5   2511 use File::Spec::Functions 'catfile';
  5         3052  
  5         357  
110             $Carp::Internal{ (__PACKAGE__) }++;
111              
112 5     5   26 use Scalar::Util 'weaken';
  5         5  
  5         349  
113             require DR::Tarantool;
114 5     5   2945 use Data::Dumper;
  5         31364  
  5         295  
115 5     5   2488 use Time::HiRes ();
  5         5979  
  5         12695  
116              
117             my $LE = $] > 5.01 ? '<' : '';
118              
119              
120             =head2 connect
121              
122             Creates a connection to L
123              
124             DR::Tarantool::LLClient->connect(
125             host => '127.0.0.1',
126             port => '33033',
127             cb => {
128             my ($tnt) = @_;
129             ...
130             }
131             );
132              
133             =head3 Arguments
134              
135             =over
136              
137             =item host & port
138              
139             Host and port to connect to.
140              
141             =item reconnect_period
142              
143             An interval to wait before trying to reconnect after a fatal error or
144             unsuccessful connect. If the field is defined and is greater than 0, the
145             driver tries to reconnect to the server after this interval.
146              
147             B: the driver does not reconnect after B
148             unsuccessful connection. It calls B instead.
149              
150             =item reconnect_always
151              
152             Try to reconnect even after the first unsuccessful connection.
153              
154             =item cb
155              
156             Done callback. The callback receives a connection handle
157             connected to the server or an error string.
158              
159             =back
160              
161             =cut
162              
163             sub connect {
164 0     0 1   my $class = shift;
165              
166 0           my (%opts, $cb);
167              
168 0 0         if (@_ % 2) {
169 0           $cb = pop;
170 0           %opts = @_;
171             } else {
172 0           %opts = @_;
173 0           $cb = delete $opts{cb};
174             }
175              
176 0   0 0     $cb ||= sub { };
  0            
177              
178 0           $class->_check_cb( $cb );
179              
180 0 0         return $class->SUPER::connect if ref $class;
181              
182              
183 0   0       my $host = $opts{host} || 'localhost';
184 0 0         my $port = $opts{port} or croak "port is undefined";
185              
186 0   0       my $reconnect_period = $opts{reconnect_period} || 0;
187 0   0       my $reconnect_always = $opts{reconnect_always} || 0;
188              
189 0           my $self = $class->SUPER::new(
190             host => $host,
191             port => $port,
192             reconnect_period => $reconnect_period,
193             reconnect_always => $reconnect_always,
194             );
195              
196             $self->on(connected => sub {
197 0     0     my ($self) = @_;
198 0           $self->on(connected => $self->on_connected);
199 0           $self->on_connected->($self);
200 0           $cb->($self);
201 0           });
202              
203             $self->on(connfail => sub {
204 0     0     my ($self) = @_;
205 0           $self->on(connfail => undef);
206 0 0         unless($self->reconnect_always) {
207 0           $self->on(connected => undef);
208 0           $cb->($self->error);
209             }
210 0           });
211              
212             $self->on(error => sub {
213 0     0     my ($self) = @_;
214 0           $self->_fatal_error($self->error);
215 0           });
216              
217 0           $self->SUPER::connect;
218              
219 0 0         unless (defined wantarray) {
220 0           my $cbb = $cb;
221             $cb = sub {
222 0     0     &$cbb;
223 0           undef $self;
224 0           };
225 0           return;
226             }
227              
228 0           return $self;
229             }
230              
231 0     0     sub _reconnected {
232             }
233              
234              
235             sub on_connected {
236             sub {
237 0     0     my ($self) = @_;
238 0           $self->_reconnected;
239 0           $self->{guard}{read} = AE::io $self->fh, 0, $self->on_read;
240             }
241 0     0 0   }
242              
243              
244             sub disconnect {
245 0     0 0   my ($self, $cb) = @_;
246 0   0 0     $cb ||= sub { };
  0            
247 0           $self->_check_cb( $cb );
248              
249 0           $self->SUPER::disconnect;
250 0           $cb->( 'ok' );
251             }
252              
253             sub DESTROY {
254 0 0   0     return if in_global_destruction;
255 0           my ($self) = @_;
256 0           $self->disconnect;
257             }
258              
259             =head2 is_connected
260              
261             B if this connection is established.
262              
263             =cut
264              
265             sub is_connected {
266 0     0 1   my ($self) = @_;
267 0           $self->state eq 'connected';
268             }
269              
270             =head2 connection_status
271              
272             Contains a string with the status of connection. Return value can be:
273              
274             =over
275              
276             =item ok
277              
278             Connection is established.
279              
280             =item not_connected
281              
282             Connection isn't established yet, or was lost.
283              
284             =item connecting
285              
286             The driver is connecting to the server.
287              
288             =item fatal
289              
290             An attempt to connect was made, but ended up with an error.
291             If the event loop is running, and B option
292             is set, the driver continues to try to reconnect and update its status.
293              
294             =back
295              
296             =cut
297              
298             sub connection_status {
299 0     0 1   my ($self) = @_;
300 0 0         return 'ok' if $self->state eq 'connected';
301 0 0         return 'connecting' if $self->state eq 'connecting';
302 0 0         return 'fatal' if $self->state eq 'error';
303 0           return 'not_connected';
304             }
305              
306              
307             =head2 ping
308              
309             Ping the server.
310              
311             $tnt->ping( sub { .. } );
312              
313             =head3 Arguments
314              
315             =over
316              
317             =item a callback
318              
319             =back
320              
321             =cut
322              
323             sub ping :method {
324 0     0 1   my ($self, $cb) = @_;
325 0           my $id = $self->_req_id;
326 0           $self->_check_cb( $cb );
327 0           my $pkt = DR::Tarantool::_pkt_ping( $id );
328              
329 0 0         if ($self->is_connected) {
330 0           $self->_request( $id, $pkt, $cb );
331 0           return;
332             }
333            
334 0 0         unless($self->reconnect_period) {
335 0           $cb->({
336             status => 'fatal',
337             req_id => $id,
338             errstr => "Connection isn't established (yet)"
339             }
340             );
341 0           return;
342             }
343              
344 0           my $this = $self;
345 0           weaken $this;
346              
347 0           my $tmr;
348             $tmr = AE::timer $self->reconnect_period, 0, sub {
349 0     0     undef $tmr;
350 0 0 0       if ($this and $this->is_connected) {
351 0           $this->_request( $id, $pkt, $cb );
352 0           return;
353             }
354 0           $cb->({
355             status => 'fatal',
356             req_id => $id,
357             errstr => "Connection isn't established (yet)"
358             }
359             );
360 0           };
361             }
362              
363              
364             =head2 insert
365              
366             Insert a tuple.
367              
368             $tnt->insert(0, [ 1, 2, 3 ], sub { ... });
369             $tnt->insert(0, [ 4, 5, 6 ], $flags, sub { .. });
370              
371             =head3 Arguments
372              
373             =over
374              
375             =item space
376              
377             =item tuple
378              
379             =item flags (optional)
380              
381             =item callback
382              
383             =back
384              
385             =cut
386              
387             sub insert :method {
388              
389 0     0 1   my $self = shift;
390 0           $self->_check_number( my $space = shift );
391 0           $self->_check_tuple( my $tuple = shift );
392 0           $self->_check_cb( my $cb = pop );
393 0   0       $self->_check_number( my $flags = pop || 0 );
394 0 0         croak "insert: tuple must be ARRAYREF" unless ref $tuple eq 'ARRAY';
395 0   0       $flags ||= 0;
396              
397 0           my $id = $self->_req_id;
398 0           my $pkt = DR::Tarantool::_pkt_insert( $id, $space, $flags, $tuple );
399 0           $self->_request( $id, $pkt, $cb );
400 0           return;
401             }
402              
403             =head2 select
404              
405             Select a tuple or tuples.
406              
407             $tnt->select(1, 0, [ [ 1, 2 ], [ 3, 4 ] ], sub { ... });
408             $tnt->select(1, 0, [ [ 1, 2 ], [ 3, 4 ] ], 1, sub { ... });
409             $tnt->select(1, 0, [ [ 1, 2 ], [ 3, 4 ] ], 1, 2, sub { ... });
410              
411             =head3 Arguments
412              
413             =over
414              
415             =item space
416              
417             =item index
418              
419             =item tuple_keys
420              
421             =item limit (optional)
422              
423             If the limit isn't set or is zero, select extracts all records without
424             a limit.
425              
426             =item offset (optional)
427              
428             Default value is B<0>.
429              
430             =item callback for results
431              
432             =back
433              
434             =cut
435              
436             sub select :method {
437              
438 0     0 1   my $self = shift;
439 0           $self->_check_number( my $ns = shift );
440 0           $self->_check_number( my $idx = shift );
441 0           $self->_check_tuple_list( my $keys = shift );
442 0           $self->_check_cb( my $cb = pop );
443 0   0       $self->_check_number( my $limit = shift || 0x7FFFFFFF );
444 0   0       $self->_check_number( my $offset = shift || 0 );
445              
446 0           my $id = $self->_req_id;
447 0           my $pkt =
448             DR::Tarantool::_pkt_select($id, $ns, $idx, $offset, $limit, $keys);
449 0           $self->_request( $id, $pkt, $cb );
450 0           return;
451             }
452              
453             =head2 update
454              
455             Update a tuple.
456              
457             $tnt->update(0, [ 1 ], [ [ 1 => add 1 ] ], sub { ... });
458             $tnt->update(
459             0, # space
460             [ 1 ], # key
461             [ [ 1 => add 1 ], [ 2 => add => 1 ], # operations
462             $flags, # flags
463             sub { ... } # callback
464             );
465             $tnt->update(0, [ 1 ], [ [ 1 => add 1 ] ], $flags, sub { ... });
466              
467             =head3 Arguments
468              
469             =over
470              
471             =item space
472              
473             =item tuple_key
474              
475             =item operations list
476              
477             =item flags (optional)
478              
479             =item callback for results
480              
481             =back
482              
483             =cut
484              
485             sub update :method {
486              
487 0     0 1   my $self = shift;
488 0           $self->_check_number( my $ns = shift );
489 0           $self->_check_tuple( my $key = shift );
490 0           $self->_check_operations( my $operations = shift );
491 0           $self->_check_cb( my $cb = pop );
492 0   0       $self->_check_number( my $flags = pop || 0 );
493              
494 0           my $id = $self->_req_id;
495 0           my $pkt = DR::Tarantool::_pkt_update($id, $ns, $flags, $key, $operations);
496 0           $self->_request( $id, $pkt, $cb );
497 0           return;
498              
499             }
500              
501             =head2 delete
502              
503             Delete a tuple.
504              
505             $tnt->delete( 0, [ 1 ], sub { ... });
506             $tnt->delete( 0, [ 1 ], $flags, sub { ... });
507              
508             =head3 Arguments
509              
510             =over
511              
512             =item space
513              
514             =item tuple_key
515              
516             =item flags (optional)
517              
518             =item callback for results
519              
520             =back
521              
522             =cut
523              
524             sub delete :method {
525 0     0 1   my $self = shift;
526 0           my $ns = shift;
527 0           my $key = shift;
528 0           $self->_check_tuple( $key );
529 0           my $cb = pop;
530 0           $self->_check_cb( $cb );
531 0   0       my $flags = pop || 0;
532              
533 0           my $id = $self->_req_id;
534 0           my $pkt = DR::Tarantool::_pkt_delete($id, $ns, $flags, $key);
535 0           $self->_request( $id, $pkt, $cb );
536 0           return;
537             }
538              
539              
540             =head2 call_lua
541              
542             Calls a lua procedure.
543              
544             $tnt->call_lua( 'box.select', [ 0, 1, 2 ], sub { ... });
545             $tnt->call_lua( 'box.select', [ 0, 1, 2 ], $flags, sub { ... });
546              
547             =head3 Arguments
548              
549             =over
550              
551             =item name of the procedure
552              
553             =item tuple_key
554              
555             =item flags (optional)
556              
557             =item callback to call when the request is ready
558              
559             =back
560              
561             =cut
562              
563             sub call_lua :method {
564              
565 0     0 1   my $self = shift;
566 0           my $proc = shift;
567 0           my $tuple = shift;
568 0           $self->_check_tuple( $tuple );
569 0           my $cb = pop;
570 0           $self->_check_cb( $cb );
571 0   0       my $flags = pop || 0;
572              
573 0           my $id = $self->_req_id;
574 0           my $pkt = DR::Tarantool::_pkt_call_lua($id, $flags, $proc, $tuple);
575 0           $self->_request( $id, $pkt, $cb );
576 0           return;
577             }
578              
579              
580             =head2 last_code
581              
582             Return code of the last request or B if there was no
583             request.
584              
585             =cut
586              
587             sub last_code {
588 0     0 1   my ($self) = @_;
589 0 0         return $self->{last_code} if exists $self->{last_code};
590 0           return undef;
591             }
592              
593              
594             =head2 last_error_string
595              
596             An error string if the last request ended up with an
597             error, or B otherwise.
598              
599             =cut
600              
601             sub last_error_string {
602 0     0 1   my ($self) = @_;
603 0 0         return $self->{last_error_string} if exists $self->{last_error_string};
604 0           return undef;
605             }
606              
607             =head1 Logging
608              
609             The module can log requests/responses. Logging can be turned ON by
610             setting these environment variables:
611              
612             =over
613              
614             =item TNT_LOG_DIR
615              
616             Instructs LLClient to record all requests/responses into this directory.
617              
618             =item TNT_LOG_ERRDIR
619              
620             Instructs LLClient to record all requests/responses which
621             ended up with an error into this directory.
622              
623             =back
624              
625             =cut
626              
627              
628             sub _log_transaction {
629 0     0     my ($self, $id, $pkt, $response, $res_pkt) = @_;
630              
631 0           my $logdir = $ENV{TNT_LOG_DIR};
632 0 0         goto DOLOG if $logdir;
633 0           $logdir = $ENV{TNT_LOG_ERRDIR};
634 0 0 0       goto DOLOG if $logdir and $response->{status} ne 'ok';
635 0           return;
636              
637             DOLOG:
638 0           eval {
639 0 0         die "Directory $logdir was not found, transaction wasn't logged\n"
640             unless -d $logdir;
641              
642 0           my $now = Time::HiRes::time;
643              
644 0           my $logdirname = catfile $logdir,
645             sprintf '%s-%s', $now, $response->{status};
646              
647 0 0 0       die "Object $logdirname is already exists, transaction wasn't logged\n"
648             if -e $logdirname or -d $logdirname;
649            
650 0 0         die $! unless mkdir $logdirname;
651            
652 0           my $rrname = catfile $logdirname,
653             sprintf 'rawrequest-%04d.bin', $id;
654 0 0         open my $fh, '>:raw', $rrname or die "Can't open $rrname: $!\n";
655 0           print $fh $pkt;
656 0           close $fh;
657              
658 0           my $respname = catfile $logdirname,
659             sprintf 'dumpresponse-%04d.txt', $id;
660              
661 0 0         open $fh, '>:raw', $respname or die "Can't open $respname: $!\n";
662            
663 0           local $Data::Dumper::Indent = 1;
664 0           local $Data::Dumper::Terse = 1;
665 0           local $Data::Dumper::Useqq = 1;
666 0           local $Data::Dumper::Deepcopy = 1;
667 0           local $Data::Dumper::Maxdepth = 0;
668 0           print $fh Dumper($response);
669 0           close $fh;
670              
671 0 0         if (defined $res_pkt) {
672 0           $respname = catfile $logdirname,
673             sprintf 'rawresponse-%04d.bin', $id;
674 0 0         open $fh, '>:raw', $respname or die "Can't open $respname: $!\n";
675 0           print $fh $res_pkt;
676 0           close $fh;
677             }
678             };
679 0 0         warn $@ if $@;
680             }
681              
682              
683             sub _request {
684 0     0     my ($self, $id, $pkt, $cb ) = @_;
685             # Scalar::Util::weaken $self;
686            
687 0           my $cbres = $cb;
688 0     0     $cbres = sub { $self->_log_transaction($id, $pkt, @_); &$cb }
  0            
689 0 0 0       if $ENV{TNT_LOG_ERRDIR} or $ENV{TNT_LOG_DIR};
690              
691 0           $self->{ wait }{ $id } = $cbres;
692              
693 0           $self->push_write($pkt);
694             }
695              
696             sub _req_id {
697 0     0     my ($self) = @_;
698 0   0       for (my $id = $self->{req_id} || 0;; $id++) {
699 0 0         $id = 0 unless $id < 0x7FFF_FFFF;
700 0 0         next if exists $self->{wait}{$id};
701 0           $self->{req_id} = $id + 1;
702 0           return $id;
703             }
704             }
705              
706             sub _fatal_error {
707 0     0     my ($self, $msg, $raw) = @_;
708              
709 0   0       $self->{last_code} ||= -1;
710 0   0       $self->{last_error_string} ||= $msg;
711              
712 0           delete $self->{fh};
713 0           $self->{wbuf} = '';
714              
715 0           my $wait = delete $self->{wait};
716 0           $self->{wait} = {};
717 0           for (keys %$wait) {
718 0           my $cb = delete $wait->{$_};
719 0           $cb->({ status => 'fatal', errstr => $msg, req_id => $_ }, $raw);
720             }
721              
722 0 0         $self->set_error($msg) if $self->state ne 'error';
723             }
724              
725              
726             sub _check_rbuf {{
727 0     0     my ($self) = @_;
  0            
728 0 0         return unless length $self->{rbuf} >= 12;
729 0           my (undef, $blen) = unpack "L$LE L$LE", $self->{rbuf};
730 0 0         return unless length $self->{rbuf} >= 12 + $blen;
731            
732              
733 0           my $pkt = substr $self->{rbuf}, 0, 12 + $blen, '';
734              
735 0           my $res = DR::Tarantool::_pkt_parse_response( $pkt );
736              
737 0           $self->{last_code} = $res->{code};
738 0 0         if (exists $res->{errstr}) {
739 0           $self->{last_error_string} = $res->{errstr};
740             } else {
741 0           delete $self->{last_error_string};
742             }
743              
744 0 0         if ($res->{status} =~ /^(fatal|buffer)$/) {
745 0           $self->_fatal_error( $res->{errstr}, $pkt );
746 0           return;
747             }
748              
749 0           my $id = $res->{req_id};
750 0           my $cb = delete $self->{ wait }{ $id };
751 0 0         if ('CODE' eq ref $cb) {
752 0           $cb->( $res, $pkt );
753             } else {
754 0           warn "Unexpected reply from tarantool with id = $id";
755             }
756 0           redo;
757             }}
758              
759              
760             sub on_read {
761 0     0 0   my $self = shift;
762             sub {
763 0     0     my $rd = sysread $self->fh, my $buf, 4096;
764 0 0         unless(defined $rd) {
765 0 0         return if $!{EINTR};
766 0           $self->_fatal_error("Socket error: $!");
767 0           return;
768             }
769              
770 0 0         unless($rd) {
771 0           $self->_fatal_error("Socket error: Server closed connection");
772 0           return;
773             }
774 0           $self->{rbuf} .= $buf;
775 0           $self->_check_rbuf;
776             }
777             # write responses as binfile for tests
778             # {
779             # my ($type, $blen, $id, $code, $body) =
780             # unpack 'L< L< L< L< A*', $hdr . $data;
781              
782             # my $sname = sprintf 't/test-data/%05d-%03d-%s.bin',
783             # $type || 0, $code, $code ? 'fail' : 'ok';
784             # open my $fh, '>:raw', $sname;
785             # print $fh $hdr;
786             # print $fh $data;
787             # warn "$sname saved (body length: $blen)";
788             # }
789 0           }
790              
791             sub _check_cb {
792 0     0     my ($self, $cb) = @_;
793 0 0         croak 'Callback must be CODEREF' unless 'CODE' eq ref $cb;
794             }
795              
796             sub _check_tuple {
797 0     0     my ($self, $tuple) = @_;
798 0 0         croak 'Tuple must be ARRAYREF' unless 'ARRAY' eq ref $tuple;
799             }
800              
801             sub _check_tuple_list {
802 0     0     my ($self, $list) = @_;
803 0 0         croak 'Tuplelist must be ARRAYREF of ARRAYREF' unless 'ARRAY' eq ref $list;
804 0 0         croak 'Tuplelist is empty' unless @$list;
805 0           $self->_check_tuple($_) for @$list;
806             }
807              
808             sub _check_number {
809 0     0     my ($self, $number) = @_;
810 0 0 0       croak "argument must be number"
811             unless defined $number and $number =~ /^\d+$/;
812             }
813              
814              
815             sub _check_operation {
816 0     0     my ($self, $op) = @_;
817 0 0         croak 'Operation must be ARRAYREF' unless 'ARRAY' eq ref $op;
818 0 0         croak 'Wrong update operation: too short arglist' unless @$op >= 2;
819 0 0 0       croak "Wrong operation: $op->[1]"
820             unless $op->[1] and
821             $op->[1] =~ /^(delete|set|insert|add|and|or|xor|substr)$/;
822 0           $self->_check_number($op->[0]);
823             }
824              
825             sub _check_operations {
826 0     0     my ($self, $list) = @_;
827 0 0         croak 'Operations list must be ARRAYREF of ARRAYREF'
828             unless 'ARRAY' eq ref $list;
829 0 0         croak 'Operations list is empty' unless @$list;
830 0           $self->_check_operation( $_ ) for @$list;
831             }
832              
833             =head1 COPYRIGHT AND LICENSE
834              
835             Copyright (C) 2011 Dmitry E. Oboukhov
836             Copyright (C) 2011 Roman V. Nikolaev
837              
838             This program is free software, you can redistribute it and/or
839             modify it under the terms of the Artistic License.
840              
841             =head1 VCS
842              
843             The project is placed git repo on github:
844             L.
845              
846             =cut
847              
848             1;