File Coverage

blib/lib/DR/Tarantool/LLSyncClient.pm
Criterion Covered Total %
statement 27 162 16.6
branch 0 66 0.0
condition 0 37 0.0
subroutine 9 27 33.3
pod 0 10 0.0
total 36 302 11.9


line stmt bran cond sub pod time code
1 2     2   48849 use utf8;
  2         5  
  2         14  
2 2     2   52 use strict;
  2         5  
  2         55  
3 2     2   9 use warnings;
  2         3  
  2         72  
4              
5             package DR::Tarantool::LLSyncClient;
6 2     2   9 use Carp;
  2         3  
  2         150  
7 2     2   829 use IO::Socket::UNIX;
  2         51451  
  2         27  
8 2     2   1828 use IO::Socket::INET;
  2         5  
  2         18  
9             require DR::Tarantool;
10              
11             my $LE = $] > 5.01 ? '<' : '';
12              
13             $Carp::Internal{ (__PACKAGE__) }++;
14              
15             sub connect {
16 0     0 0   my ($class, %opts) = @_;
17              
18 0   0       my $host = $opts{host} || 'localhost';
19 0 0         my $port = $opts{port} or croak 'port is undefined';
20              
21 0   0       my $reconnect_period = $opts{reconnect_period} || 0;
22 0   0       my $reconnect_always = $opts{reconnect_always} || 0;
23              
24              
25 0           my $raise_error = 1;
26 0 0         if (exists $opts{raise_error}) {
27 0 0         $raise_error = $opts{raise_error} ? 1 : 0;
28             }
29              
30 0   0       my $self = bless {
31             host => $host,
32             port => $port,
33             raise_error => $raise_error,
34             reconnect_period => $reconnect_period,
35             id => 0,
36             } => ref ($class) || $class;
37              
38 0 0         unless ($self->_connect()) {
39 0 0         unless ($reconnect_always) {
40 0 0         return undef unless $self->{raise_error};
41 0           croak "Can't connect to $self->{host}:$self->{port}: $@";
42             }
43 0 0         unless ($reconnect_period) {
44 0 0         return undef unless $self->{raise_error};
45 0           croak "Can't connect to $self->{host}:$self->{port}: $@";
46             }
47             }
48 0           return $self;
49             }
50              
51              
52             sub _connect {
53 0     0     my ($self) = @_;
54              
55 0 0 0       if ($self->{host} eq 'unix/' or $self->{port} =~ /\D/) {
56 0           return $self->{fh} = IO::Socket::UNIX->new(Peer => $self->{port});
57             } else {
58 0           return $self->{fh} = IO::Socket::INET->new(
59             PeerHost => $self->{host},
60             PeerPort => $self->{port},
61             Proto => 'tcp',
62             );
63             }
64             }
65              
66             sub _req_id {
67 0     0     my ($self) = @_;
68 0 0         return $self->{id}++ if $self->{id} < 0x7FFF_FFFE;
69 0           return $self->{id} = 0;
70             }
71              
72             sub _request {
73 0     0     my ($self, $id, $pkt ) = @_;
74 0           until($self->{fh}) {
75 0 0         unless ($self->{reconnect_period}) {
76 0           $self->{last_error_string} = "Connection isn't established";
77 0 0         croak $self->{last_error_string} if $self->{raise_error};
78 0           return undef;
79             }
80 0 0         next if $self->_connect;
81 0           sleep $self->{reconnect_period};
82             }
83              
84 0           my $len = length $pkt;
85              
86             # send request
87 0           while($len > 0) {
88 2     2   4170 no warnings; # closed socket
  2         12  
  2         244  
89 0           my $slen = syswrite $self->{fh}, $pkt;
90 0 0         goto SOCKET_ERROR unless defined $slen;
91 0           $len -= $slen;
92 0           substr $pkt, 0, $slen, '';
93             }
94              
95 0           $pkt = '';
96 0           while(12 > length $pkt) {
97 2     2   62 no warnings; # closed socket
  2         4  
  2         211  
98 0           my $rl = sysread $self->{fh}, $pkt, 12 - length $pkt, length $pkt;
99 0 0         goto SOCKET_ERROR unless defined $rl;
100             }
101              
102 0           my (undef, $blen) = unpack "L$LE L$LE", $pkt;
103              
104 0           while(12 + $blen > length $pkt) {
105 2     2   11 no warnings; # closed socket
  2         3  
  2         4387  
106 0           my $rl = sysread $self->{fh},
107             $pkt, 12 + $blen - length $pkt, length $pkt;
108 0 0         goto SOCKET_ERROR unless defined $rl;
109             }
110              
111 0           my $res = DR::Tarantool::_pkt_parse_response( $pkt );
112 0 0         if ($res->{status} ne 'ok') {
113 0           $self->{last_error_string} = $res->{errstr};
114 0           $self->{last_code} = $res->{code};
115             # disconnect
116 0 0         delete $self->{fh} if $res->{status} =~ /^(fatal|buffer)$/;
117 0 0         croak $self->{last_error_string} if $self->{raise_error};
118 0           return undef;
119             }
120              
121 0   0       $self->{last_error_string} = $res->{errstr} || '';
122 0           $self->{last_code} = $res->{code};
123 0           return $res;
124              
125              
126 0           SOCKET_ERROR:
127             delete $self->{fh};
128 0           $self->{last_error_string} = $!;
129 0           $self->{last_code} = undef;
130 0 0         croak $self->{last_error_string} if $self->{raise_error};
131 0           return undef;
132             }
133              
134             sub ping :method {
135 0     0 0   my ($self) = @_;
136 0 0         unless ($self->{fh}) {
137 0           $self->_connect;
138 0           $self->{last_code} = -1;
139 0           $self->{last_error_string} = "Connection isn't established";
140 0 0         return 0 unless $self->{fh};
141             }
142 0           my $id = $self->_req_id;
143 0           my $pkt = DR::Tarantool::_pkt_ping( $id );
144 0           my $res = eval { $self->_request( $id, $pkt ); };
  0            
145 0 0 0       return 0 unless $res and $res->{status} eq 'ok';
146 0           return 1;
147             }
148              
149              
150             sub call_lua :method {
151              
152 0     0 0   my $self = shift;
153 0           my $proc = shift;
154 0           my $tuple = shift;
155 0           $self->_check_tuple( $tuple );
156 0   0       my $flags = pop || 0;
157              
158 0           my $id = $self->_req_id;
159 0           my $pkt = DR::Tarantool::_pkt_call_lua($id, $flags, $proc, $tuple);
160 0           return $self->_request( $id, $pkt );
161             }
162              
163             sub select :method {
164 0     0 0   my $self = shift;
165 0           $self->_check_number( my $ns = shift );
166 0           $self->_check_number( my $idx = shift );
167 0           $self->_check_tuple_list( my $keys = shift );
168 0   0       $self->_check_number( my $limit = shift || 0x7FFFFFFF );
169 0   0       $self->_check_number( my $offset = shift || 0 );
170              
171 0           my $id = $self->_req_id;
172 0           my $pkt =
173             DR::Tarantool::_pkt_select($id, $ns, $idx, $offset, $limit, $keys);
174 0           return $self->_request( $id, $pkt );
175             }
176              
177             sub insert :method {
178              
179 0     0 0   my $self = shift;
180 0           $self->_check_number( my $space = shift );
181 0           $self->_check_tuple( my $tuple = shift );
182 0   0       $self->_check_number( my $flags = pop || 0 );
183 0 0         croak "insert: tuple must be ARRAYREF" unless ref $tuple eq 'ARRAY';
184 0   0       $flags ||= 0;
185            
186 0           my $id = $self->_req_id;
187 0           my $pkt = DR::Tarantool::_pkt_insert( $id, $space, $flags, $tuple );
188 0           return $self->_request( $id, $pkt );
189             }
190              
191             sub update :method {
192              
193 0     0 0   my $self = shift;
194 0           $self->_check_number( my $ns = shift );
195 0           $self->_check_tuple( my $key = shift );
196 0           $self->_check_operations( my $operations = shift );
197 0   0       $self->_check_number( my $flags = pop || 0 );
198              
199 0           my $id = $self->_req_id;
200 0           my $pkt = DR::Tarantool::_pkt_update($id, $ns, $flags, $key, $operations);
201 0           return $self->_request( $id, $pkt );
202             }
203              
204              
205             sub delete :method {
206 0     0 0   my $self = shift;
207 0           my $ns = shift;
208 0           my $key = shift;
209 0           $self->_check_tuple( $key );
210 0   0       my $flags = pop || 0;
211              
212 0           my $id = $self->_req_id;
213 0           my $pkt = DR::Tarantool::_pkt_delete($id, $ns, $flags, $key);
214 0           return $self->_request( $id, $pkt );
215             }
216              
217              
218              
219             sub _check_tuple {
220 0     0     my ($self, $tuple) = @_;
221 0 0         croak 'Tuple must be ARRAYREF' unless 'ARRAY' eq ref $tuple;
222             }
223              
224             sub _check_tuple_list {
225 0     0     my ($self, $list) = @_;
226 0 0         croak 'Tuplelist must be ARRAYREF of ARRAYREF' unless 'ARRAY' eq ref $list;
227 0 0         croak 'Tuplelist is empty' unless @$list;
228 0           $self->_check_tuple($_) for @$list;
229             }
230              
231             sub _check_number {
232 0     0     my ($self, $number) = @_;
233 0 0 0       croak "argument must be number"
234             unless defined $number and $number =~ /^\d+$/;
235             }
236              
237             sub _check_operation {
238 0     0     my ($self, $op) = @_;
239 0 0         croak 'Operation must be ARRAYREF' unless 'ARRAY' eq ref $op;
240 0 0         croak 'Wrong update operation: too short arglist' unless @$op >= 2;
241 0 0 0       croak "Wrong operation: $op->[1]"
242             unless $op->[1] and
243             $op->[1] =~ /^(delete|set|insert|add|and|or|xor|substr)$/;
244 0           $self->_check_number($op->[0]);
245             }
246              
247             sub _check_operations {
248 0     0     my ($self, $list) = @_;
249 0 0         croak 'Operations list must be ARRAYREF of ARRAYREF'
250             unless 'ARRAY' eq ref $list;
251 0 0         croak 'Operations list is empty' unless @$list;
252 0           $self->_check_operation( $_ ) for @$list;
253             }
254              
255              
256             sub last_error_string {
257 0     0 0   return $_[0]->{last_error_string};
258             }
259              
260             sub last_code {
261 0     0 0   return $_[0]->{last_code};
262             }
263              
264             sub raise_error {
265 0     0 0   return $_[0]->{raise_error};
266             }
267              
268             1;