File Coverage

blib/lib/Shardcache/Client.pm
Criterion Covered Total %
statement 12 268 4.4
branch 0 136 0.0
condition 0 33 0.0
subroutine 4 29 13.7
pod 0 20 0.0
total 16 486 3.2


line stmt bran cond sub pod time code
1             package Shardcache::Client;
2              
3 1     1   21798 use strict;
  1         2  
  1         33  
4 1     1   486231 use IO::Socket::INET;
  1         21571  
  1         8  
5 1     1   964 use Digest::SipHash qw/siphash64/;
  1         1122  
  1         59  
6 1     1   430 use Algorithm::ConsistentHash::CHash;
  1         344  
  1         2651  
7              
8             our $VERSION = "0.04";
9              
10             sub _read_bytes {
11 0     0     my ($sock, $len) = @_;
12 0           my $to_read = $len;
13 0           my $read;
14             my $data;
15 0           do {
16 0           my $buffer;
17 0           my $rb = read($sock, $buffer, $to_read);
18 0 0         if ($rb > 0) {
19 0           $data .= $buffer;
20 0           $read += $rb;
21 0           $to_read -= $rb;
22             } else {
23 0           last;
24             }
25             } while ($read != $len);
26              
27 0 0         return ($read == $len) ? $data : undef;
28             }
29              
30             sub new {
31 0     0 0   my ($class, $host, $secret) = @_;
32              
33 0 0         croak("The host parameter is mandatory!")
34             unless($host);
35              
36 0 0         $secret = '' unless($secret);
37              
38 0           my $self = {
39             _secret => $secret,
40             _nodes => [],
41             };
42              
43              
44 0 0 0       if (ref($host) && ref($host) eq "ARRAY") {
45 0           foreach my $h (@$host) {
46 0 0         if ($h !~ /^[a-zA-Z0-9_\.\-]+:[a-zA-Z0-9_\.\-]+(?:[:][0-9]+)?$/) {
47 0           die "Invalid host string $h";
48             }
49 0           my ($label, $addr, $port) = split(':', $h);
50 0           push(@{$self->{_nodes}}, {
  0            
51             label => $label,
52             addr => $addr,
53             port => $port
54             });
55             }
56 0           $self->{_chash} = Algorithm::ConsistentHash::CHash->new(
57 0           ids => [map { $_->{label} } @{$self->{_nodes}} ],
  0            
58             replicas => 200);
59             } else {
60 0 0         if ($host !~ /^[a-zA-Z0-9_\.\-]+(?:[:][0-9]+)?$/) {
61 0           die "Invalid host string $host";
62             }
63 0           my ($addr, $port) = split(':', $host);
64 0           push(@{$self->{_nodes}}, {
  0            
65             label => "$addr:$port",
66             addr => $addr,
67             port => $port
68             });
69             }
70              
71 0           bless $self, $class;
72            
73 0           return $self;
74             }
75              
76             sub _chunkize_var {
77 0     0     my ($var) = @_;
78 0           my $templ;
79             my @vars;
80 0           my $vlen = length($var);
81 0           while ($vlen > 0) {
82 0 0         if ($vlen <= 65535) {
83 0           $templ .= "na$vlen";
84 0           push(@vars, $vlen, $var);
85 0           $vlen = 0;
86             } else {
87 0           $templ .= "na65535";
88 0           my $substr = substr($var, 0, 65535, "");
89 0           $vlen = length($var);
90 0           push(@vars, 65535, $substr);
91             }
92             }
93 0           return pack $templ, @vars;
94             }
95              
96             sub send_msg {
97 0     0 0   my ($self, $hdr, $records, $sock) = @_;
98              
99 0           my $templ = "C";
100 0           my @vars = ($hdr);
101              
102 0           my $key;
103              
104 0           my $cnt = 0;
105 0 0 0       $records = [ $records ] unless (ref($records) && ref($records) eq "ARRAY");
106 0           foreach my $record (@$records) {
107 0 0         if ($cnt++) {
108 0           $templ .= "C";
109 0           push @vars, 0x80;
110             } else {
111 0           $key = $record;
112             }
113              
114 0 0         if ($record) {
115 0           my $buf = _chunkize_var($record);
116 0           $templ .= sprintf "a%dCC", length($buf);
117 0           push @vars, $buf, 0x00, 0x00;
118             } else {
119 0           $templ .= "CC";
120 0           push(@vars, 0x00, 0x00);
121             }
122             }
123              
124 0 0         if (!$cnt) {
125 0           $templ .= "CC";
126 0           push @vars, 0x00, 0x00;
127             }
128              
129 0           $templ .= "C";
130 0           push @vars, 0x00;
131              
132 0           my $msg = pack $templ, @vars;
133              
134 0 0         if ($self->{_secret}) {
135 0           my $sig = siphash64($msg, pack("a16", $self->{_secret}));
136 0           $msg .= pack("Q", $sig);
137             }
138              
139 0           my $addr;
140             my $port;
141              
142 0 0         if (!$sock) {
143 0 0         if (@{$self->{_nodes}} == 1) {
  0            
144 0           $addr = $self->{_nodes}->[0]->{addr};
145 0           $port = $self->{_nodes}->[0]->{port};
146             } else {
147 0           my ($node) = grep { $_->{label} eq $self->{_chash}->lookup($key) } @{$self->{_nodes}};
  0            
  0            
148 0 0         if ($node) {
149 0           $addr = $node->{addr};
150 0           $port = $node->{port};
151             } else {
152 0           my $index = rand() % @{$self->{_nodes}};
  0            
153 0           $addr = $self->{_nodes}->[$index]->{addr};
154 0           $port = $self->{_nodes}->[$index]->{port};
155             }
156              
157             }
158            
159 0 0 0       if (!$self->{_sock}->{"$addr:$port"} || !$self->{_sock}->{"$addr:$port"}->connected ||
      0        
160             $self->{_sock}->{"$addr:$port"}->write(pack("C1", 0x90)) != 1)
161             {
162 0           $self->{_sock}->{"$addr:$port"} = IO::Socket::INET->new(PeerAddr => $addr,
163             PeerPort => $port,
164             Proto => 'tcp');
165             }
166              
167 0           $sock = $self->{_sock}->{"$addr:$port"};
168             }
169            
170 0 0         return unless $sock->write(pack("C4", 0x73, 0x68, 0x63, 0x01));
171              
172 0 0         if ($self->{_secret}) {
173 0 0         return undef unless $sock->write(pack("C", 0xF0));
174             }
175              
176 0           my $wb = $sock->write($msg);
177            
178             # read the response
179 0           my $in;
180              
181             # read the magic
182 0           my $magic = _read_bytes($sock, 4);
183 0 0         if (!$magic) {
184 0 0         delete $self->{_sock}->{"$addr:$port"} if ($addr);
185 0           return undef;
186             }
187              
188             # read the signature if necessary
189 0 0         if ($self->{_secret}) {
190 0           my $byte = _read_bytes($sock, 1);
191 0 0         if (!$byte) {
192 0 0         delete $self->{_sock}->{"$addr:$port"} if ($addr);
193 0           return undef;
194             }
195              
196 0 0         if (unpack("C", $byte) != 0xF0) {
197 0           return undef;
198             }
199             }
200            
201             # read the header
202 0           my $data = _read_bytes($sock, 1);
203 0 0         if (!$data) {
204 0 0         delete $self->{_sock}->{"$addr:$port"} if ($addr);
205 0           return undef;
206             }
207              
208 0 0         if (unpack("C", $data) == 0xF0) {
209 0 0         delete $self->{_sock}->{"$addr:$port"} if ($addr);
210 0           return undef;
211             }
212              
213 0           $in .= $data;
214              
215 0           my $stop = 0;
216 0           my $out;
217              
218             # read the records
219             my @output_records;
220 0           while (!$stop) {
221 0 0         unless ($data = _read_bytes($sock, 2)) {
222 0 0         delete $self->{_sock}->{"$addr:$port"} if ($addr);
223 0           return undef;
224             }
225 0           my ($len) = unpack("n", $data);
226 0           while ($len) {
227 0           $in .= $data;
228 0 0         unless ($data = _read_bytes($sock, $len)) {
229 0 0         delete $self->{_sock}->{"$addr:$port"} if ($addr);
230 0           return undef;
231             }
232 0           $in .= $data;
233 0           $out .= $data;
234 0           $len = 0;
235 0 0         unless ($data = _read_bytes($sock, 2)) {
236 0 0         delete $self->{_sock}->{"$addr:$port"} if ($addr);
237 0           return undef;
238             }
239 0           ($len) = unpack("n", $data);
240 0           $in .= $data;
241             }
242            
243 0 0         unless ($data = _read_bytes($sock, 1)) {
244 0 0         delete $self->{_sock}->{"$addr:$port"} if ($addr);
245 0           return undef;
246             }
247 0           $in .= $data;
248 0           my ($sep) = unpack("C", $data);
249             # TODO - should check if it's a correct rsep (0x80)
250 0 0         $stop = 1 if ($sep == 0);
251 0           push(@output_records, $out);
252 0           undef($out);
253             }
254              
255 0 0         if ($self->{_secret}) {
256             # now that we have the whole message, let's compute the signature
257             # (we know it's 8 bytes long and is the trailer of the message
258 0           my $signature = siphash64(substr($in, 0, length($in)), pack("a16", $self->{_secret}));
259              
260 0           my $csig = pack("Q", $signature);
261              
262 0 0         unless ($data = _read_bytes($sock, 8)) {
263 0 0         delete $self->{_sock}->{"$addr:$port"} if ($addr);
264 0           return undef;
265             }
266              
267 0 0         if ($csig ne $data) {
268 0 0         delete $self->{_sock}->{"$addr:$port"} if ($addr);
269 0           return undef;
270             }
271             }
272              
273 0 0         return wantarray ? @output_records : $output_records[0];
274             }
275              
276             sub get {
277 0     0 0   my ($self, $key) = @_;
278 0 0         return unless $key;
279 0           return $self->send_msg(0x01, $key);
280             }
281              
282             sub offset {
283 0     0 0   my ($self, $key, $offset, $len) = @_;
284 0 0         return unless $key;
285 0           return $self->send_msg(0x06, [$key, pack("N", $offset), pack("N", $len)]);
286             }
287              
288             sub set {
289 0     0 0   my ($self, $key, $value, $expire) = @_;
290 0 0 0       return unless $key && defined $value;
291 0           my @records = ($key, $value);
292              
293 0 0         push(@records, pack("N", $expire))
294             if ($expire);
295              
296 0           my $resp = $self->send_msg(0x02, \@records);
297 0   0       return (defined $resp && unpack("C", $resp) == 0x00);
298             }
299              
300             sub add {
301 0     0 0   my ($self, $key, $value, $expire) = @_;
302 0 0 0       return unless $key && defined $value;
303 0           my @records = ($key, $value);
304              
305 0 0         push(@records, pack("N", $expire))
306             if ($expire);
307              
308 0           my $resp = $self->send_msg(0x07, \@records);
309 0   0       return (defined $resp && unpack("C", $resp) == 0x00);
310             }
311              
312             sub del {
313 0     0 0   my ($self, $key) = @_;
314 0 0         return unless $key;
315 0           my $resp = $self->send_msg(0x03, $key);
316 0           return (unpack("C", $resp) == 0x00);
317             }
318              
319             sub evi {
320 0     0 0   my ($self, $key) = @_;
321 0 0         return unless $key;
322 0           my $resp = $self->send_msg(0x04, $key);
323 0           return (unpack("C", $resp) == 0x00);
324             }
325              
326             sub evict {
327 0     0 0   my $self = shift;
328 0           return $self->evi(@_);
329             }
330              
331             sub mgb {
332 0     0 0   my ($self, $key) = @_;
333 0 0         return unless $key;
334             # TODO - nodes-list must be sent together with the MGB command
335 0           my $resp = $self->send_msg(0x22, $key);
336 0           return (unpack("C", $resp) == 0x00);
337             }
338              
339             sub migration_begin {
340 0     0 0   my $self = shift;
341 0           return $self->mgb(@_);
342             }
343              
344             sub mge {
345 0     0 0   my ($self, $key) = @_;
346 0 0         return unless $key;
347 0           my $resp = $self->send_msg(0x23, $key);
348 0           return (unpack("C", $resp) == 0x00);
349             }
350              
351             sub migration_end {
352 0     0 0   my $self = shift;
353 0           return $self->mge(@_);
354             }
355              
356             sub mga {
357 0     0 0   my ($self, $key) = @_;
358 0 0         return unless $key;
359 0           my $resp = $self->send_msg(0x21, $key);
360 0           return (unpack("C", $resp) == 0x00);
361             }
362              
363             sub migration_abort {
364 0     0 0   my $self = shift;
365 0           return $self->mga(@_);
366             }
367              
368             sub _get_sock_key_for_node {
369 0     0     my ($self, $node) = @_;
370 0           my $addr;
371             my $port;
372              
373 0 0         if (@{$self->{_nodes}} == 1) {
  0            
374 0           $addr = $self->{_nodes}->[0]->{addr};
375 0           $port = $self->{_nodes}->[0]->{port};
376             } else {
377 0           my ($node) = grep { $_->{label} eq $node } @{$self->{_nodes}};
  0            
  0            
378 0 0         if ($node) {
379 0           $addr = $node->{addr};
380 0           $port = $node->{port};
381             } else {
382 0           return undef;
383             }
384             }
385 0           return "$addr:$port";
386             }
387              
388             sub _get_sock_for_node {
389 0     0     my ($self, $node) = @_;
390              
391 0           my $sock_key = $self->_get_sock_key_for_node($node);
392              
393 0 0 0       if (!$self->{_sock}->{$sock_key} || !$self->{_sock}->{$sock_key}->connected ||
      0        
394             $self->{_sock}->{$sock_key}->write(pack("C1", 0x90)) != 1)
395             {
396 0           my ($addr, $port) = split(':', $sock_key);
397 0           $self->{_sock}->{$sock_key} = IO::Socket::INET->new(
398             PeerAddr => $addr,
399             PeerPort => $port,
400             Proto => 'tcp');
401             }
402              
403 0           return $self->{_sock}->{$sock_key};
404             }
405              
406             sub _delete_sock_for_node {
407 0     0     my ($self, $node) = @_;
408              
409 0           my $sock_key = $self->_get_sock_key_for_node($node);
410              
411 0           delete $self->{_sock}->{$sock_key};
412             }
413              
414             sub index {
415 0     0 0   my ($self, $node) = @_;
416              
417 0           my %index;
418              
419 0           my @nodes = $node
420             ? ($node)
421 0 0         : map { $_->{label} } @{$self->{_nodes}};
  0            
422              
423 0           foreach my $n (@nodes) {
424 0           my $sock = $self->_get_sock_for_node($n);
425              
426 0 0         return unless $sock;
427              
428              
429 0           my $resp = $self->send_msg(0x41, undef, $sock);
430 0           while (length($resp)) {
431 0           my $ksize = unpack("N", $resp);
432 0           $resp = substr($resp, 4);
433 0           my $kdata = unpack("a$ksize", $resp);
434 0           $resp = substr($resp, $ksize);
435 0           my $vlen = unpack("N", $resp);
436 0           $resp = substr($resp, 4);
437 0 0 0       $index{$kdata} = $vlen
438             if ($ksize && $kdata);
439             }
440             }
441 0 0         return wantarray ? %index : \%index;
442             }
443              
444             sub check {
445 0     0 0   my ($self, $node) = @_;
446 0           my $sock = $self->_get_sock_for_node($node);
447              
448 0 0         return unless $sock;
449              
450 0           my $resp = $self->send_msg(0x31, undef, $sock);
451 0 0         $self->_delete_sock_for_node($node) unless $resp;
452              
453 0 0         return $resp ? (unpack("C", $resp) == 0x00) : undef;
454             }
455              
456             sub chk {
457 0     0 0   my $self = shift;
458             # XXX - deprecated
459 0           return $self->check(@_);
460             }
461              
462             sub status {
463 0     0 0   my ($self, $node) = @_;
464              
465 0           my $sock = $self->_get_sock_for_node($node);
466              
467 0 0         return unless $sock;
468              
469 0           my $resp = $self->send_msg(0x32, undef, $sock);
470 0 0         $self->_delete_sock_for_node($node) unless $resp;
471              
472 0           return $resp;
473             }
474              
475             sub sts {
476 0     0 0   my $self = shift;
477             # XXX - deprecated
478 0           return $self->status(@_);
479             }
480              
481             1;
482             __END__