File Coverage

blib/lib/DR/Tarantool/LLSyncClient.pm
Criterion Covered Total %
statement 27 168 16.0
branch 0 72 0.0
condition 0 37 0.0
subroutine 9 27 33.3
pod 0 10 0.0
total 36 314 11.4


line stmt bran cond sub pod time code
1 1     1   3 use utf8;
  1         1  
  1         4  
2 1     1   23 use strict;
  1         1  
  1         22  
3 1     1   4 use warnings;
  1         1  
  1         28  
4              
5             package DR::Tarantool::LLSyncClient;
6 1     1   3 use Carp;
  1         1  
  1         53  
7 1     1   7 use IO::Socket::UNIX;
  1         1  
  1         13  
8 1     1   469 use IO::Socket::INET;
  1         1  
  1         6  
9             require DR::Tarantool;
10              
11             my $LE = $] > 5.01 ? '<' : '';
12              
13             $Carp::Internal{ (__PACKAGE__) }++;
14              
15             sub connect {
16 0     0 0   my ($class, %opts) = @_;
17              
18 0   0       my $host = $opts{host} || 'localhost';
19 0 0         my $port = $opts{port} or croak 'port is undefined';
20              
21 0   0       my $reconnect_period = $opts{reconnect_period} || 0;
22 0   0       my $reconnect_always = $opts{reconnect_always} || 0;
23              
24              
25 0           my $raise_error = 1;
26 0 0         if (exists $opts{raise_error}) {
27 0 0         $raise_error = $opts{raise_error} ? 1 : 0;
28             }
29              
30 0   0       my $self = bless {
31             host => $host,
32             port => $port,
33             raise_error => $raise_error,
34             reconnect_period => $reconnect_period,
35             id => 0,
36             } => ref ($class) || $class;
37              
38 0 0         unless ($self->_connect()) {
39 0 0         unless ($reconnect_always) {
40 0 0         return undef unless $self->{raise_error};
41 0           croak "Can't connect to $self->{host}:$self->{port}: $@";
42             }
43 0 0         unless ($reconnect_period) {
44 0 0         return undef unless $self->{raise_error};
45 0           croak "Can't connect to $self->{host}:$self->{port}: $@";
46             }
47             }
48 0           return $self;
49             }
50              
51              
52             sub _connect {
53 0     0     my ($self) = @_;
54              
55 0 0 0       if ($self->{host} eq 'unix/' or $self->{port} =~ /\D/) {
56 0           return $self->{fh} = IO::Socket::UNIX->new(Peer => $self->{port});
57             } else {
58 0           return $self->{fh} = IO::Socket::INET->new(
59             PeerHost => $self->{host},
60             PeerPort => $self->{port},
61             Proto => 'tcp',
62             );
63             }
64             }
65              
66             sub _req_id {
67 0     0     my ($self) = @_;
68 0 0         return $self->{id}++ if $self->{id} < 0x7FFF_FFFE;
69 0           return $self->{id} = 0;
70             }
71              
72             sub _request {
73 0     0     my ($self, $id, $pkt ) = @_;
74 0           until($self->{fh}) {
75 0 0         unless ($self->{reconnect_period}) {
76 0           $self->{last_error_string} = "Connection isn't established";
77 0 0         croak $self->{last_error_string} if $self->{raise_error};
78 0           return undef;
79             }
80 0 0         next if $self->_connect;
81 0           sleep $self->{reconnect_period};
82             }
83              
84 0           my $len = length $pkt;
85              
86             # send request
87 0           while($len > 0) {
88 1     1   1141 no warnings; # closed socket
  1         1  
  1         94  
89 0           my $slen = syswrite $self->{fh}, $pkt;
90 0 0         unless(defined $slen) {
91 0 0         next if $!{EINTR};
92 0           goto SOCKET_ERROR;
93             }
94 0           $len -= $slen;
95 0           substr $pkt, 0, $slen, '';
96             }
97              
98 0           $pkt = '';
99 0           while(12 > length $pkt) {
100 1     1   4 no warnings; # closed socket
  1         1  
  1         143  
101 0           my $rl = sysread $self->{fh}, $pkt, 12 - length $pkt, length $pkt;
102 0 0         unless (defined $rl) {
103 0 0         next if $!{EINTR};
104 0           goto SOCKET_ERROR;
105             }
106             }
107              
108 0           my (undef, $blen) = unpack "L$LE L$LE", $pkt;
109              
110 0           while(12 + $blen > length $pkt) {
111 1     1   5 no warnings; # closed socket
  1         4  
  1         1050  
112 0           my $rl = sysread $self->{fh},
113             $pkt, 12 + $blen - length $pkt, length $pkt;
114 0 0         unless (defined $rl) {
115 0 0         next if $!{EINTR};
116 0           goto SOCKET_ERROR;
117             }
118             }
119              
120 0           my $res = DR::Tarantool::_pkt_parse_response( $pkt );
121 0 0         if ($res->{status} ne 'ok') {
122 0           $self->{last_error_string} = $res->{errstr};
123 0           $self->{last_code} = $res->{code};
124             # disconnect
125 0 0         delete $self->{fh} if $res->{status} =~ /^(fatal|buffer)$/;
126 0 0         croak $self->{last_error_string} if $self->{raise_error};
127 0           return undef;
128             }
129              
130 0   0       $self->{last_error_string} = $res->{errstr} || '';
131 0           $self->{last_code} = $res->{code};
132 0           return $res;
133              
134              
135 0           SOCKET_ERROR:
136             delete $self->{fh};
137 0           $self->{last_error_string} = $!;
138 0           $self->{last_code} = undef;
139 0 0         croak $self->{last_error_string} if $self->{raise_error};
140 0           return undef;
141             }
142              
143             sub ping :method {
144 0     0 0   my ($self) = @_;
145 0 0         unless ($self->{fh}) {
146 0           $self->_connect;
147 0           $self->{last_code} = -1;
148 0           $self->{last_error_string} = "Connection isn't established";
149 0 0         return 0 unless $self->{fh};
150             }
151 0           my $id = $self->_req_id;
152 0           my $pkt = DR::Tarantool::_pkt_ping( $id );
153 0           my $res = eval { $self->_request( $id, $pkt ); };
  0            
154 0 0 0       return 0 unless $res and $res->{status} eq 'ok';
155 0           return 1;
156             }
157              
158              
159             sub call_lua :method {
160              
161 0     0 0   my $self = shift;
162 0           my $proc = shift;
163 0           my $tuple = shift;
164 0           $self->_check_tuple( $tuple );
165 0   0       my $flags = pop || 0;
166              
167 0           my $id = $self->_req_id;
168 0           my $pkt = DR::Tarantool::_pkt_call_lua($id, $flags, $proc, $tuple);
169 0           return $self->_request( $id, $pkt );
170             }
171              
172             sub select :method {
173 0     0 0   my $self = shift;
174 0           $self->_check_number( my $ns = shift );
175 0           $self->_check_number( my $idx = shift );
176 0           $self->_check_tuple_list( my $keys = shift );
177 0   0       $self->_check_number( my $limit = shift || 0x7FFFFFFF );
178 0   0       $self->_check_number( my $offset = shift || 0 );
179              
180 0           my $id = $self->_req_id;
181 0           my $pkt =
182             DR::Tarantool::_pkt_select($id, $ns, $idx, $offset, $limit, $keys);
183 0           return $self->_request( $id, $pkt );
184             }
185              
186             sub insert :method {
187              
188 0     0 0   my $self = shift;
189 0           $self->_check_number( my $space = shift );
190 0           $self->_check_tuple( my $tuple = shift );
191 0   0       $self->_check_number( my $flags = pop || 0 );
192 0 0         croak "insert: tuple must be ARRAYREF" unless ref $tuple eq 'ARRAY';
193 0   0       $flags ||= 0;
194            
195 0           my $id = $self->_req_id;
196 0           my $pkt = DR::Tarantool::_pkt_insert( $id, $space, $flags, $tuple );
197 0           return $self->_request( $id, $pkt );
198             }
199              
200             sub update :method {
201              
202 0     0 0   my $self = shift;
203 0           $self->_check_number( my $ns = shift );
204 0           $self->_check_tuple( my $key = shift );
205 0           $self->_check_operations( my $operations = shift );
206 0   0       $self->_check_number( my $flags = pop || 0 );
207              
208 0           my $id = $self->_req_id;
209 0           my $pkt = DR::Tarantool::_pkt_update($id, $ns, $flags, $key, $operations);
210 0           return $self->_request( $id, $pkt );
211             }
212              
213              
214             sub delete :method {
215 0     0 0   my $self = shift;
216 0           my $ns = shift;
217 0           my $key = shift;
218 0           $self->_check_tuple( $key );
219 0   0       my $flags = pop || 0;
220              
221 0           my $id = $self->_req_id;
222 0           my $pkt = DR::Tarantool::_pkt_delete($id, $ns, $flags, $key);
223 0           return $self->_request( $id, $pkt );
224             }
225              
226              
227              
228             sub _check_tuple {
229 0     0     my ($self, $tuple) = @_;
230 0 0         croak 'Tuple must be ARRAYREF' unless 'ARRAY' eq ref $tuple;
231             }
232              
233             sub _check_tuple_list {
234 0     0     my ($self, $list) = @_;
235 0 0         croak 'Tuplelist must be ARRAYREF of ARRAYREF' unless 'ARRAY' eq ref $list;
236 0 0         croak 'Tuplelist is empty' unless @$list;
237 0           $self->_check_tuple($_) for @$list;
238             }
239              
240             sub _check_number {
241 0     0     my ($self, $number) = @_;
242 0 0 0       croak "argument must be number"
243             unless defined $number and $number =~ /^\d+$/;
244             }
245              
246             sub _check_operation {
247 0     0     my ($self, $op) = @_;
248 0 0         croak 'Operation must be ARRAYREF' unless 'ARRAY' eq ref $op;
249 0 0         croak 'Wrong update operation: too short arglist' unless @$op >= 2;
250 0 0 0       croak "Wrong operation: $op->[1]"
251             unless $op->[1] and
252             $op->[1] =~ /^(delete|set|insert|add|and|or|xor|substr)$/;
253 0           $self->_check_number($op->[0]);
254             }
255              
256             sub _check_operations {
257 0     0     my ($self, $list) = @_;
258 0 0         croak 'Operations list must be ARRAYREF of ARRAYREF'
259             unless 'ARRAY' eq ref $list;
260 0 0         croak 'Operations list is empty' unless @$list;
261 0           $self->_check_operation( $_ ) for @$list;
262             }
263              
264              
265             sub last_error_string {
266 0     0 0   return $_[0]->{last_error_string};
267             }
268              
269             sub last_code {
270 0     0 0   return $_[0]->{last_code};
271             }
272              
273             sub raise_error {
274 0     0 0   return $_[0]->{raise_error};
275             }
276              
277             1;