File Coverage

blib/lib/DR/Tarantool/LLClient.pm
Criterion Covered Total %
statement 33 305 10.8
branch 0 118 0.0
condition 0 60 0.0
subroutine 11 50 22.0
pod 11 14 78.5
total 55 547 10.0


line stmt bran cond sub pod time code
1 9     9   414392 use utf8;
  9         20  
  9         66  
2 9     9   235 use strict;
  9         18  
  9         278  
3 9     9   47 use warnings;
  9         17  
  9         528  
4              
5             =head1 NAME
6              
7             DR::Tarantool::LLClient - a low level async client
8             for L<Tarantool|http://tarantool.org>
9              
10             =head1 SYNOPSIS
11              
12             DR::Tarantool::LLClient->connect(
13             host => '127.0.0.1',
14             port => '33033',
15             cb => {
16             my ($tnt) = @_;
17             ...
18             }
19             );
20              
21             $tnt->ping( sub { .. } );
22             $tnt->insert(0, [ 1, 2, 3 ], sub { ... });
23             $tnt->select(1, 0, [ [ 1, 2 ], [ 3, 4 ] ], sub { ... });
24             $tnt->update(0, [ 1 ], [ [ 1 => add pack 'L<', 1 ] ], sub { ... });
25             $tnt->call_lua( 'box.select', [ 0, 1, 2 ], sub { ... });
26              
27              
28             =head1 DESCRIPTION
29              
30             This module provides a low-level interface to
31             L<Tarantool|http://tarantool.org>.
32              
33             =head1 METHODS
34              
35             All methods receive B<callback> as the last argument. The callback receives
36             B<HASHREF> value with the following fields:
37              
38             =over
39              
40             =item status
41              
42             Done status:
43              
44             =over
45              
46             =item fatal
47              
48             A fatal error occurred. The server closed the connection or returned a
49             broken package.
50              
51             =item buffer
52              
53             An internal driver error.
54              
55             =item error
56              
57             The request wasn't executed: the server returned an error.
58              
59             =item ok
60              
61             Request was executed OK.
62              
63             =back
64              
65             =item errstr
66              
67             If an error occurred, contains error description.
68              
69             =item code
70              
71             Contains reply code.
72              
73             =item req_id
74              
75             Contains request id.
76             (see
77             L<protocol documentation|https://github.com/mailru/tarantool/blob/master/doc/box-protocol.txt>)
78              
79             =item type
80              
81             Contains request type
82             (see
83             L<protocol documentation|https://github.com/mailru/tarantool/blob/master/doc/box-protocol.txt>)
84              
85             =item count
86              
87             Contains the count of returned tuples.
88              
89             =item tuples
90              
91             Returned tuples (B<ARRAYREF> of B<ARRAYREF>).
92              
93             =back
94              
95             If you use B<NUM> or B<NUM64> field types, values
96             for these fields need to be packed before they are sent to the
97             server, and unpacked when received in a response.
98             This is a low-level driver :)
99              
100             =cut
101              
102              
103             package DR::Tarantool::LLClient;
104 9     9   15213 use AnyEvent;
  9         97132  
  9         327  
105 9     9   11947 use AnyEvent::Socket;
  9         384517  
  9         1425  
106 9     9   129 use Carp;
  9         20  
  9         1199  
107 9     9   8932 use Devel::GlobalDestruction;
  9         24893  
  9         64  
108 9     9   10732 use File::Spec::Functions 'catfile';
  9         7664  
  9         984  
109             $Carp::Internal{ (__PACKAGE__) }++;
110              
111 9     9   57 use Scalar::Util 'weaken';
  9         17  
  9         784  
112             require DR::Tarantool;
113 9     9   10058 use Data::Dumper;
  9         101575  
  9         761  
114 9     9   15401 use Time::HiRes ();
  9         22853  
  9         41284  
115              
116             my $LE = $] > 5.01 ? '<' : '';
117              
118              
119             =head2 connect
120              
121             Creates a connection to L<Tarantool| http://tarantool.org>
122              
123             DR::Tarantool::LLClient->connect(
124             host => '127.0.0.1',
125             port => '33033',
126             cb => {
127             my ($tnt) = @_;
128             ...
129             }
130             );
131              
132             =head3 Arguments
133              
134             =over
135              
136             =item host & port
137              
138             Host and port to connect to.
139              
140             =item reconnect_period
141              
142             An interval to wait before trying to reconnect after a fatal error or
143             unsuccessful connect. If the field is defined and is greater than 0, the
144             driver tries to reconnect to the server after this interval.
145              
146             B<Important>: the driver does not reconnect after B<the first>
147             unsuccessful connection. It calls B<callback> instead.
148              
149             =item reconnect_always
150              
151             Try to reconnect even after the first unsuccessful connection.
152              
153             =item cb
154              
155             Done callback. The callback receives a connection handle
156             connected to the server or an error string.
157              
158             =back
159              
160             =cut
161              
162             sub connect {
163 0     0 1   my $class = shift;
164              
165 0           my (%opts, $cb);
166              
167 0 0         if (@_ % 2) {
168 0           $cb = pop;
169 0           %opts = @_;
170             } else {
171 0           %opts = @_;
172 0           $cb = delete $opts{cb};
173             }
174              
175 0   0 0     $class->_check_cb( $cb || sub { });
  0            
176              
177 0   0       my $host = $opts{host} || 'localhost';
178 0 0         my $port = $opts{port} or croak "port is undefined";
179              
180 0   0       my $reconnect_period = $opts{reconnect_period} || 0;
181 0   0       my $reconnect_always = $opts{reconnect_always} || 0;
182              
183 0   0       my $self = bless {
184             host => $host,
185             port => $port,
186             reconnect_period => $reconnect_period,
187             reconnect_always => $reconnect_always,
188             first_connect => 1,
189             connection_status => 'not_connected',
190             wbuf => '',
191             rbuf => '',
192             } => ref($class) || $class;
193              
194 0           $self->_connect_reconnect( $cb );
195              
196 0           return $self;
197             }
198              
199             sub disconnect {
200 0     0 0   my ($self, $cb) = @_;
201 0   0 0     $cb ||= sub { };
  0            
202 0           $self->_check_cb( $cb );
203              
204 0           delete $self->{reconnect_timer};
205 0           delete $self->{connecting};
206 0 0         if ($self->is_connected) {
207 0           delete $self->{rhandle};
208 0           delete $self->{whandle};
209 0           delete $self->{fh};
210             }
211 0           $cb->( 'ok' );
212             }
213              
214             sub DESTROY {
215 0 0   0     return if in_global_destruction;
216 0           my ($self) = @_;
217 0 0         if ($self->is_connected) {
218 0           delete $self->{rhandle};
219 0           delete $self->{whandle};
220 0           delete $self->{fh};
221             }
222             }
223              
224             =head2 is_connected
225              
226             B<True> if this connection is established.
227              
228             =cut
229              
230             sub is_connected {
231 0     0 1   my ($self) = @_;
232 0 0         return $self->fh ? 1 : 0;
233             }
234              
235             =head2 connection_status
236              
237             Contains a string with the status of connection. Return value can be:
238              
239             =over
240              
241             =item ok
242              
243             Connection is established.
244              
245             =item not_connected
246              
247             Connection isn't established yet, or was lost.
248              
249             =item connecting
250              
251             The driver is connecting to the server.
252              
253             =item fatal
254              
255             An attempt to connect was made, but ended up with an error.
256             If the event loop is running, and B<reconnect_period> option
257             is set, the driver continues to try to reconnect and update its status.
258              
259             =back
260              
261             =cut
262              
263             sub connection_status {
264 0     0 1   my ($self) = @_;
265 0 0         $self->{connection_status} || 'unknown';
266             }
267              
268              
269             =head2 ping
270              
271             Ping the server.
272              
273             $tnt->ping( sub { .. } );
274              
275             =head3 Arguments
276              
277             =over
278              
279             =item a callback
280              
281             =back
282              
283             =cut
284              
285             sub ping :method {
286 0     0 1   my ($self, $cb) = @_;
287 0           my $id = $self->_req_id;
288 0           $self->_check_cb( $cb );
289 0           my $pkt = DR::Tarantool::_pkt_ping( $id );
290              
291 0 0         if ($self->is_connected) {
292 0           $self->_request( $id, $pkt, $cb );
293 0           return;
294             }
295            
296 0 0         unless($self->{reconnect_period}) {
297 0           $cb->({
298             status => 'fatal',
299             req_id => $id,
300             errstr => "Connection isn't established (yet)"
301             }
302             );
303 0           return;
304             }
305              
306 0           my $this = $self;
307 0           weaken $this;
308              
309 0           my $tmr;
310             $tmr = AE::timer $self->{reconnect_period}, 0, sub {
311 0     0     undef $tmr;
312 0 0 0       if ($this and $this->is_connected) {
313 0           $this->_request( $id, $pkt, $cb );
314 0           return;
315             }
316 0           $cb->({
317             status => 'fatal',
318             req_id => $id,
319             errstr => "Connection isn't established (yet)"
320             }
321             );
322 0           };
323             }
324              
325              
326             =head2 insert
327              
328             Insert a tuple.
329              
330             $tnt->insert(0, [ 1, 2, 3 ], sub { ... });
331             $tnt->insert(0, [ 4, 5, 6 ], $flags, sub { .. });
332              
333             =head3 Arguments
334              
335             =over
336              
337             =item space
338              
339             =item tuple
340              
341             =item flags (optional)
342              
343             =item callback
344              
345             =back
346              
347             =cut
348              
349             sub insert :method {
350              
351 0     0 1   my $self = shift;
352 0           $self->_check_number( my $space = shift );
353 0           $self->_check_tuple( my $tuple = shift );
354 0           $self->_check_cb( my $cb = pop );
355 0   0       $self->_check_number( my $flags = pop || 0 );
356 0 0         croak "insert: tuple must be ARRAYREF" unless ref $tuple eq 'ARRAY';
357 0   0       $flags ||= 0;
358              
359 0           my $id = $self->_req_id;
360 0           my $pkt = DR::Tarantool::_pkt_insert( $id, $space, $flags, $tuple );
361 0           $self->_request( $id, $pkt, $cb );
362 0           return;
363             }
364              
365             =head2 select
366              
367             Select a tuple or tuples.
368              
369             $tnt->select(1, 0, [ [ 1, 2 ], [ 3, 4 ] ], sub { ... });
370             $tnt->select(1, 0, [ [ 1, 2 ], [ 3, 4 ] ], 1, sub { ... });
371             $tnt->select(1, 0, [ [ 1, 2 ], [ 3, 4 ] ], 1, 2, sub { ... });
372              
373             =head3 Arguments
374              
375             =over
376              
377             =item space
378              
379             =item index
380              
381             =item tuple_keys
382              
383             =item limit (optional)
384              
385             If the limit isn't set or is zero, select extracts all records without
386             a limit.
387              
388             =item offset (optional)
389              
390             Default value is B<0>.
391              
392             =item callback for results
393              
394             =back
395              
396             =cut
397              
398             sub select :method {
399              
400 0     0 1   my $self = shift;
401 0           $self->_check_number( my $ns = shift );
402 0           $self->_check_number( my $idx = shift );
403 0           $self->_check_tuple_list( my $keys = shift );
404 0           $self->_check_cb( my $cb = pop );
405 0   0       $self->_check_number( my $limit = shift || 0x7FFFFFFF );
406 0   0       $self->_check_number( my $offset = shift || 0 );
407              
408 0           my $id = $self->_req_id;
409 0           my $pkt =
410             DR::Tarantool::_pkt_select($id, $ns, $idx, $offset, $limit, $keys);
411 0           $self->_request( $id, $pkt, $cb );
412 0           return;
413             }
414              
415             =head2 update
416              
417             Update a tuple.
418              
419             $tnt->update(0, [ 1 ], [ [ 1 => add 1 ] ], sub { ... });
420             $tnt->update(
421             0, # space
422             [ 1 ], # key
423             [ [ 1 => add 1 ], [ 2 => add => 1 ], # operations
424             $flags, # flags
425             sub { ... } # callback
426             );
427             $tnt->update(0, [ 1 ], [ [ 1 => add 1 ] ], $flags, sub { ... });
428              
429             =head3 Arguments
430              
431             =over
432              
433             =item space
434              
435             =item tuple_key
436              
437             =item operations list
438              
439             =item flags (optional)
440              
441             =item callback for results
442              
443             =back
444              
445             =cut
446              
447             sub update :method {
448              
449 0     0 1   my $self = shift;
450 0           $self->_check_number( my $ns = shift );
451 0           $self->_check_tuple( my $key = shift );
452 0           $self->_check_operations( my $operations = shift );
453 0           $self->_check_cb( my $cb = pop );
454 0   0       $self->_check_number( my $flags = pop || 0 );
455              
456 0           my $id = $self->_req_id;
457 0           my $pkt = DR::Tarantool::_pkt_update($id, $ns, $flags, $key, $operations);
458 0           $self->_request( $id, $pkt, $cb );
459 0           return;
460              
461             }
462              
463             =head2 delete
464              
465             Delete a tuple.
466              
467             $tnt->delete( 0, [ 1 ], sub { ... });
468             $tnt->delete( 0, [ 1 ], $flags, sub { ... });
469              
470             =head3 Arguments
471              
472             =over
473              
474             =item space
475              
476             =item tuple_key
477              
478             =item flags (optional)
479              
480             =item callback for results
481              
482             =back
483              
484             =cut
485              
486             sub delete :method {
487 0     0 1   my $self = shift;
488 0           my $ns = shift;
489 0           my $key = shift;
490 0           $self->_check_tuple( $key );
491 0           my $cb = pop;
492 0           $self->_check_cb( $cb );
493 0   0       my $flags = pop || 0;
494              
495 0           my $id = $self->_req_id;
496 0           my $pkt = DR::Tarantool::_pkt_delete($id, $ns, $flags, $key);
497 0           $self->_request( $id, $pkt, $cb );
498 0           return;
499             }
500              
501              
502             =head2 call_lua
503              
504             Calls a lua procedure.
505              
506             $tnt->call_lua( 'box.select', [ 0, 1, 2 ], sub { ... });
507             $tnt->call_lua( 'box.select', [ 0, 1, 2 ], $flags, sub { ... });
508              
509             =head3 Arguments
510              
511             =over
512              
513             =item name of the procedure
514              
515             =item tuple_key
516              
517             =item flags (optional)
518              
519             =item callback to call when the request is ready
520              
521             =back
522              
523             =cut
524              
525             sub call_lua :method {
526              
527 0     0 1   my $self = shift;
528 0           my $proc = shift;
529 0           my $tuple = shift;
530 0           $self->_check_tuple( $tuple );
531 0           my $cb = pop;
532 0           $self->_check_cb( $cb );
533 0   0       my $flags = pop || 0;
534              
535 0           my $id = $self->_req_id;
536 0           my $pkt = DR::Tarantool::_pkt_call_lua($id, $flags, $proc, $tuple);
537 0           $self->_request( $id, $pkt, $cb );
538 0           return;
539             }
540              
541              
542             =head2 last_code
543              
544             Return code of the last request or B<undef> if there was no
545             request.
546              
547             =cut
548              
549             sub last_code {
550 0     0 1   my ($self) = @_;
551 0 0         return $self->{last_code} if exists $self->{last_code};
552 0           return undef;
553             }
554              
555              
556             =head2 last_error_string
557              
558             An error string if the last request ended up with an
559             error, or B<undef> otherwise.
560              
561             =cut
562              
563             sub last_error_string {
564 0     0 1   my ($self) = @_;
565 0 0         return $self->{last_error_string} if exists $self->{last_error_string};
566 0           return undef;
567             }
568              
569             =head1 Logging
570              
571             The module can log requests/responses. Logging can be turned ON by
572             setting these environment variables:
573              
574             =over
575              
576             =item TNT_LOG_DIR
577              
578             Instructs LLClient to record all requests/responses into this directory.
579              
580             =item TNT_LOG_ERRDIR
581              
582             Instructs LLClient to record all requests/responses which
583             ended up with an error into this directory.
584              
585             =back
586              
587             =cut
588              
589              
590             sub _log_transaction {
591 0     0     my ($self, $id, $pkt, $response, $res_pkt) = @_;
592              
593 0           my $logdir = $ENV{TNT_LOG_DIR};
594 0 0         goto DOLOG if $logdir;
595 0           $logdir = $ENV{TNT_LOG_ERRDIR};
596 0 0 0       goto DOLOG if $logdir and $response->{status} ne 'ok';
597 0           return;
598              
599             DOLOG:
600 0           eval {
601 0 0         die "Directory $logdir was not found, transaction wasn't logged\n"
602             unless -d $logdir;
603              
604 0           my $now = Time::HiRes::time;
605              
606 0           my $logdirname = catfile $logdir,
607             sprintf '%s-%s', $now, $response->{status};
608              
609 0 0 0       die "Object $logdirname is already exists, transaction wasn't logged\n"
610             if -e $logdirname or -d $logdirname;
611            
612 0 0         die $! unless mkdir $logdirname;
613            
614 0           my $rrname = catfile $logdirname,
615             sprintf 'rawrequest-%04d.bin', $id;
616 0 0         open my $fh, '>:raw', $rrname or die "Can't open $rrname: $!\n";
617 0           print $fh $pkt;
618 0           close $fh;
619              
620 0           my $respname = catfile $logdirname,
621             sprintf 'dumpresponse-%04d.txt', $id;
622              
623 0 0         open $fh, '>:raw', $respname or die "Can't open $respname: $!\n";
624            
625 0           local $Data::Dumper::Indent = 1;
626 0           local $Data::Dumper::Terse = 1;
627 0           local $Data::Dumper::Useqq = 1;
628 0           local $Data::Dumper::Deepcopy = 1;
629 0           local $Data::Dumper::Maxdepth = 0;
630 0           print $fh Dumper($response);
631 0           close $fh;
632              
633 0 0         if (defined $res_pkt) {
634 0           $respname = catfile $logdirname,
635             sprintf 'rawresponse-%04d.bin', $id;
636 0 0         open $fh, '>:raw', $respname or die "Can't open $respname: $!\n";
637 0           print $fh $res_pkt;
638 0           close $fh;
639             }
640             };
641 0 0         warn $@ if $@;
642             }
643              
644              
645             sub _request {
646 0     0     my ($self, $id, $pkt, $cb ) = @_;
647            
648 0           my $cbres = $cb;
649 0     0     $cbres = sub { $self->_log_transaction($id, $pkt, @_); &$cb }
  0            
650 0 0 0       if $ENV{TNT_LOG_ERRDIR} or $ENV{TNT_LOG_DIR};
651              
652 0           $self->{ wait }{ $id } = $cbres;
653             # TODO: use watcher
654 0           $self->{wbuf} .= $pkt;
655              
656 0 0         if ($self->fh) {
657 0 0         return if $self->{whandle};
658 0           $self->{whandle} = AE::io $self->fh, 1, $self->_on_write;
659             }
660             }
661              
662             sub _on_write {
663 0     0     my ($self) = @_;
664             sub {
665 0     0     my $wb = syswrite $self->fh, $self->{wbuf};
666 0 0         unless(defined $wb) {
667 0           $self->_socket_error->($self->fh, 1, $!);
668 0           return;
669             }
670 0 0         return unless $wb;
671 0           substr $self->{wbuf}, 0, $wb, '';
672 0 0         delete $self->{whandle} unless length $self->{wbuf};
673             }
674 0           }
675             sub _req_id {
676 0     0     my ($self) = @_;
677 0   0       for (my $id = $self->{req_id} || 0;; $id++) {
678 0 0         $id = 0 unless $id < 0x7FFF_FFFF;
679 0 0         next if exists $self->{wait}{$id};
680 0           $self->{req_id} = $id + 1;
681 0           return $id;
682             }
683             }
684              
685             sub _fatal_error {
686 0     0     my ($self, $msg, $raw) = @_;
687 0   0       $self->{last_code} ||= -1;
688 0   0       $self->{last_error_string} ||= $msg;
689              
690 0           delete $self->{rhandle};
691 0           delete $self->{whandle};
692 0           delete $self->{fh};
693 0           $self->{wbuf} = '';
694 0           $self->{connection_status} = 'not_connected',
695              
696             my $wait = delete $self->{wait};
697 0           $self->{wait} = {};
698 0           for (keys %$wait) {
699 0           my $cb = delete $wait->{$_};
700 0           $cb->({ status => 'fatal', errstr => $msg, req_id => $_ }, $raw);
701             }
702              
703 0           $self->_connect_reconnect;
704             }
705              
706             sub _connect_reconnect {
707              
708 0     0     my ($self, $cb) = @_;
709              
710 0 0         $self->_check_cb( $cb ) if $cb;
711 0 0         return if $self->{reconnect_timer};
712 0 0         return if $self->{connecting};
713 0 0 0       return unless $self->{first_connect} or $self->{reconnect_period};
714              
715             $self->{reconnect_timer} = AE::timer
716             $self->{first_connect} ? 0 : $self->{reconnect_period},
717             $self->{reconnect_period} || 0,
718             sub {
719 0 0   0     return if $self->{connecting};
720 0           $self->{connecting} = 1;
721              
722 0           $self->{connection_status} = 'connecting';
723              
724             tcp_connect $self->{host}, $self->{port}, sub {
725 0           my ($fh) = @_;
726 0           delete $self->{connecting};
727 0 0         if ($fh) {
728 0           $self->{fh} = $fh;
729 0           $self->{rbuf} = '';
730 0           $self->{rhandle} = AE::io $self->fh, 0, $self->on_read;
731 0 0         $self->{whandle} = AE::io $self->fh, 1, $self->_on_write
732             if length $self->{wbuf};
733              
734              
735 0           delete $self->{reconnect_timer};
736 0           delete $self->{first_connect};
737 0           $self->{connection_status} = 'ok';
738              
739 0 0         $cb->( $self ) if $cb;
740 0           return;
741             }
742              
743 0           my $emsg = $!;
744 0 0 0       if ($self->{first_connect} and not $self->{reconnect_always}) {
745 0 0         $cb->( $! ) if $cb;
746 0           delete $self->{reconnect_timer};
747 0           delete $self->{connecting};
748             }
749 0           $self->{connection_status} = "Couldn't connect to server: $!";
750 0           };
751             }
752 0 0 0       ;
753             }
754              
755             sub _check_rbuf {{
756 0     0     my ($self) = @_;
  0            
757 0 0         return unless length $self->{rbuf} >= 12;
758 0           my (undef, $blen) = unpack "L$LE L$LE", $self->{rbuf};
759 0 0         return unless length $self->{rbuf} >= 12 + $blen;
760            
761              
762 0           my $pkt = substr $self->{rbuf}, 0, 12 + $blen, '';
763              
764 0           my $res = DR::Tarantool::_pkt_parse_response( $pkt );
765              
766 0           $self->{last_code} = $res->{code};
767 0 0         if (exists $res->{errstr}) {
768 0           $self->{last_error_string} = $res->{errstr};
769             } else {
770 0           delete $self->{last_error_string};
771             }
772              
773 0 0         if ($res->{status} =~ /^(fatal|buffer)$/) {
774 0           $self->_fatal_error( $res->{errstr}, $pkt );
775 0           return;
776             }
777              
778 0           my $id = $res->{req_id};
779 0           my $cb = delete $self->{ wait }{ $id };
780 0 0         if ('CODE' eq ref $cb) {
781 0           $cb->( $res, $pkt );
782             } else {
783 0           warn "Unexpected reply from tarantool with id = $id";
784             }
785 0           redo;
786             }}
787              
788              
789             sub on_read {
790 0     0 0   my ($self) = @_;
791             sub {
792 0     0     my $rd = sysread $self->fh, my $buf, 4096;
793 0 0         unless(defined $rd) {
794 0           $self->_socket_error->($self->fh, 1, $!);
795 0           return;
796             }
797              
798 0 0         unless($rd) {
799 0           $self->_socket_eof->($self->fh, 1);
800 0           return;
801             }
802 0           $self->{rbuf} .= $buf;
803 0           $self->_check_rbuf;
804             }
805             # write responses as binfile for tests
806             # {
807             # my ($type, $blen, $id, $code, $body) =
808             # unpack 'L< L< L< L< A*', $hdr . $data;
809              
810             # my $sname = sprintf 't/test-data/%05d-%03d-%s.bin',
811             # $type || 0, $code, $code ? 'fail' : 'ok';
812             # open my $fh, '>:raw', $sname;
813             # print $fh $hdr;
814             # print $fh $data;
815             # warn "$sname saved (body length: $blen)";
816             # }
817 0           }
818              
819              
820             sub fh {
821 0     0 0   my ($self) = @_;
822 0           return $self->{fh};
823             }
824              
825             sub _socket_error {
826 0     0     my ($self) = @_;
827             return sub {
828 0     0     my (undef, $fatal, $msg) = @_;
829 0           $self->_fatal_error("Socket error: $msg");
830             }
831 0           }
832              
833             sub _socket_eof {
834 0     0     my ($self) = @_;
835             return sub {
836 0     0     $self->_fatal_error("Socket error: Server closed connection");
837             }
838 0           }
839              
840              
841             sub _check_cb {
842 0     0     my ($self, $cb) = @_;
843 0 0         croak 'Callback must be CODEREF' unless 'CODE' eq ref $cb;
844             }
845              
846             sub _check_tuple {
847 0     0     my ($self, $tuple) = @_;
848 0 0         croak 'Tuple must be ARRAYREF' unless 'ARRAY' eq ref $tuple;
849             }
850              
851             sub _check_tuple_list {
852 0     0     my ($self, $list) = @_;
853 0 0         croak 'Tuplelist must be ARRAYREF of ARRAYREF' unless 'ARRAY' eq ref $list;
854 0 0         croak 'Tuplelist is empty' unless @$list;
855 0           $self->_check_tuple($_) for @$list;
856             }
857              
858             sub _check_number {
859 0     0     my ($self, $number) = @_;
860 0 0 0       croak "argument must be number"
861             unless defined $number and $number =~ /^\d+$/;
862             }
863              
864              
865             sub _check_operation {
866 0     0     my ($self, $op) = @_;
867 0 0         croak 'Operation must be ARRAYREF' unless 'ARRAY' eq ref $op;
868 0 0         croak 'Wrong update operation: too short arglist' unless @$op >= 2;
869 0 0 0       croak "Wrong operation: $op->[1]"
870             unless $op->[1] and
871             $op->[1] =~ /^(delete|set|insert|add|and|or|xor|substr)$/;
872 0           $self->_check_number($op->[0]);
873             }
874              
875             sub _check_operations {
876 0     0     my ($self, $list) = @_;
877 0 0         croak 'Operations list must be ARRAYREF of ARRAYREF'
878             unless 'ARRAY' eq ref $list;
879 0 0         croak 'Operations list is empty' unless @$list;
880 0           $self->_check_operation( $_ ) for @$list;
881             }
882              
883             =head1 COPYRIGHT AND LICENSE
884              
885             Copyright (C) 2011 Dmitry E. Oboukhov <unera@debian.org>
886             Copyright (C) 2011 Roman V. Nikolaev <rshadow@rambler.ru>
887              
888             This program is free software, you can redistribute it and/or
889             modify it under the terms of the Artistic License.
890              
891             =head1 VCS
892              
893             The project is placed git repo on github:
894             L<https://github.com/dr-co/dr-tarantool/>.
895              
896             =cut
897              
898             1;