File Coverage

blib/lib/Shardcache/Client.pm
Criterion Covered Total %
statement 12 267 4.4
branch 0 136 0.0
condition 0 33 0.0
subroutine 4 29 13.7
pod 0 20 0.0
total 16 485 3.3


line stmt bran cond sub pod time code
1             package Shardcache::Client;
2              
3 1     1   23460 use strict;
  1         2  
  1         38  
4 1     1   488 use IO::Socket::INET;
  1         18295  
  1         7  
5 1     1   1039 use Digest::SipHash qw/siphash/;
  1         1076  
  1         57  
6 1     1   448 use Algorithm::ConsistentHash::CHash;
  1         360  
  1         2395  
7              
8             our $VERSION = "0.05";
9              
10             sub _read_bytes {
11 0     0     my ($sock, $len) = @_;
12 0           my $to_read = $len;
13 0           my $read;
14             my $data;
15 0           while ($read != $len) {
16 0           my $buffer;
17 0           my $rb = read($sock, $buffer, $to_read);
18 0 0         last if ($rb <= 0);
19 0           $data .= $buffer;
20 0           $read += $rb;
21 0           $to_read -= $rb;
22             }
23              
24 0 0         return ($read == $len) ? $data : undef;
25             }
26              
27             sub new {
28 0     0 0   my ($class, $host, $secret) = @_;
29              
30 0 0         croak("The host parameter is mandatory!")
31             unless($host);
32              
33 0 0         $secret = '' unless($secret);
34              
35 0           my $self = {
36             _secret => $secret,
37             _nodes => [],
38             };
39              
40              
41 0 0 0       if (ref($host) && ref($host) eq "ARRAY") {
42 0           foreach my $h (@$host) {
43 0 0         if ($h !~ /^[a-zA-Z0-9_\.\-]+:[a-zA-Z0-9_\.\-]+(?:[:][0-9]+)?$/) {
44 0           die "Invalid host string $h";
45             }
46 0           my ($label, $addr, $port) = split(':', $h);
47 0           push(@{$self->{_nodes}}, {
  0            
48             label => $label,
49             addr => $addr,
50             port => $port
51             });
52             }
53 0           $self->{_chash} = Algorithm::ConsistentHash::CHash->new(
54 0           ids => [map { $_->{label} } @{$self->{_nodes}} ],
  0            
55             replicas => 200);
56             } else {
57 0 0         if ($host !~ /^[a-zA-Z0-9_\.\-]+(?:[:][0-9]+)?$/) {
58 0           die "Invalid host string $host";
59             }
60 0           my ($addr, $port) = split(':', $host);
61 0           push(@{$self->{_nodes}}, {
  0            
62             label => "$addr:$port",
63             addr => $addr,
64             port => $port
65             });
66             }
67              
68 0           bless $self, $class;
69            
70 0           return $self;
71             }
72              
73             sub _chunkize_var {
74 0     0     my ($var) = @_;
75 0           my $templ;
76             my @vars;
77 0           my $vlen = length($var);
78 0           while ($vlen > 0) {
79 0 0         if ($vlen <= 65535) {
80 0           $templ .= "na$vlen";
81 0           push(@vars, $vlen, $var);
82 0           $vlen = 0;
83             } else {
84 0           $templ .= "na65535";
85 0           my $substr = substr($var, 0, 65535, "");
86 0           $vlen = length($var);
87 0           push(@vars, 65535, $substr);
88             }
89             }
90 0           return pack $templ, @vars;
91             }
92              
93             sub send_msg {
94 0     0 0   my ($self, $hdr, $records, $sock) = @_;
95              
96 0           my $templ = "C";
97 0           my @vars = ($hdr);
98              
99 0           my $key;
100              
101 0           my $cnt = 0;
102 0 0 0       $records = [ $records ] unless (ref($records) && ref($records) eq "ARRAY");
103 0           foreach my $record (@$records) {
104 0 0         if ($cnt++) {
105 0           $templ .= "C";
106 0           push @vars, 0x80;
107             } else {
108 0           $key = $record;
109             }
110              
111 0 0         if ($record) {
112 0           my $buf = _chunkize_var($record);
113 0           $templ .= sprintf "a%dCC", length($buf);
114 0           push @vars, $buf, 0x00, 0x00;
115             } else {
116 0           $templ .= "CC";
117 0           push(@vars, 0x00, 0x00);
118             }
119             }
120              
121 0 0         if (!$cnt) {
122 0           $templ .= "CC";
123 0           push @vars, 0x00, 0x00;
124             }
125              
126 0           $templ .= "C";
127 0           push @vars, 0x00;
128              
129 0           my $msg = pack $templ, @vars;
130              
131 0 0         if ($self->{_secret}) {
132 0           my ($hi, $lo) = siphash($msg, pack("a16", $self->{_secret}));
133 0           $msg .= pack("LL", $hi, $lo);
134             }
135              
136 0           my $addr;
137             my $port;
138              
139 0 0         if (!$sock) {
140 0 0         if (@{$self->{_nodes}} == 1) {
  0            
141 0           $addr = $self->{_nodes}->[0]->{addr};
142 0           $port = $self->{_nodes}->[0]->{port};
143             } else {
144 0           my ($node) = grep { $_->{label} eq $self->{_chash}->lookup($key) } @{$self->{_nodes}};
  0            
  0            
145 0 0         if ($node) {
146 0           $addr = $node->{addr};
147 0           $port = $node->{port};
148             } else {
149 0           my $index = rand() % @{$self->{_nodes}};
  0            
150 0           $addr = $self->{_nodes}->[$index]->{addr};
151 0           $port = $self->{_nodes}->[$index]->{port};
152             }
153              
154             }
155            
156 0 0 0       if (!$self->{_sock}->{"$addr:$port"} || !$self->{_sock}->{"$addr:$port"}->connected ||
      0        
157             $self->{_sock}->{"$addr:$port"}->write(pack("C1", 0x90)) != 1)
158             {
159 0           $self->{_sock}->{"$addr:$port"} = IO::Socket::INET->new(PeerAddr => $addr,
160             PeerPort => $port,
161             Proto => 'tcp');
162             }
163              
164 0           $sock = $self->{_sock}->{"$addr:$port"};
165             }
166            
167 0 0         return unless $sock->write(pack("C4", 0x73, 0x68, 0x63, 0x01));
168              
169 0 0         if ($self->{_secret}) {
170 0 0         return undef unless $sock->write(pack("C", 0xF0));
171             }
172              
173 0           my $wb = $sock->write($msg);
174            
175             # read the response
176 0           my $in;
177              
178             # read the magic
179 0           my $magic = _read_bytes($sock, 4);
180 0 0         if (!$magic) {
181 0 0         delete $self->{_sock}->{"$addr:$port"} if ($addr);
182 0           return undef;
183             }
184              
185             # read the signature if necessary
186 0 0         if ($self->{_secret}) {
187 0           my $byte = _read_bytes($sock, 1);
188 0 0         if (!$byte) {
189 0 0         delete $self->{_sock}->{"$addr:$port"} if ($addr);
190 0           return undef;
191             }
192              
193 0 0         if (unpack("C", $byte) != 0xF0) {
194 0           return undef;
195             }
196             }
197            
198             # read the header
199 0           my $data = _read_bytes($sock, 1);
200 0 0         if (!$data) {
201 0 0         delete $self->{_sock}->{"$addr:$port"} if ($addr);
202 0           return undef;
203             }
204              
205 0 0         if (unpack("C", $data) == 0xF0) {
206 0 0         delete $self->{_sock}->{"$addr:$port"} if ($addr);
207 0           return undef;
208             }
209              
210 0           $in .= $data;
211              
212 0           my $stop = 0;
213 0           my $out;
214              
215             # read the records
216             my @output_records;
217 0           while (!$stop) {
218 0 0         unless ($data = _read_bytes($sock, 2)) {
219 0 0         delete $self->{_sock}->{"$addr:$port"} if ($addr);
220 0           return undef;
221             }
222 0           my ($len) = unpack("n", $data);
223 0           while ($len) {
224 0           $in .= $data;
225 0 0         unless ($data = _read_bytes($sock, $len)) {
226 0 0         delete $self->{_sock}->{"$addr:$port"} if ($addr);
227 0           return undef;
228             }
229 0           $in .= $data;
230 0           $out .= $data;
231 0           $len = 0;
232 0 0         unless ($data = _read_bytes($sock, 2)) {
233 0 0         delete $self->{_sock}->{"$addr:$port"} if ($addr);
234 0           return undef;
235             }
236 0           ($len) = unpack("n", $data);
237 0           $in .= $data;
238             }
239            
240 0 0         unless ($data = _read_bytes($sock, 1)) {
241 0 0         delete $self->{_sock}->{"$addr:$port"} if ($addr);
242 0           return undef;
243             }
244 0           $in .= $data;
245 0           my ($sep) = unpack("C", $data);
246             # TODO - should check if it's a correct rsep (0x80)
247 0 0         $stop = 1 if ($sep == 0);
248 0           push(@output_records, $out);
249 0           undef($out);
250             }
251              
252 0 0         if ($self->{_secret}) {
253             # now that we have the whole message, let's compute the signature
254             # (we know it's 8 bytes long and is the trailer of the message
255 0           my ($hi, $lo) = siphash(substr($in, 0, length($in)), pack("a16", $self->{_secret}));
256 0           my $csig = pack("LL", $hi, $lo);
257              
258 0 0         unless ($data = _read_bytes($sock, 8)) {
259 0 0         delete $self->{_sock}->{"$addr:$port"} if ($addr);
260 0           return undef;
261             }
262              
263 0 0         if ($csig ne $data) {
264 0 0         delete $self->{_sock}->{"$addr:$port"} if ($addr);
265 0           return undef;
266             }
267             }
268              
269 0 0         return wantarray ? @output_records : $output_records[0];
270             }
271              
272             sub get {
273 0     0 0   my ($self, $key) = @_;
274 0 0         return unless $key;
275 0           return $self->send_msg(0x01, $key);
276             }
277              
278             sub offset {
279 0     0 0   my ($self, $key, $offset, $len) = @_;
280 0 0         return unless $key;
281 0           return $self->send_msg(0x06, [$key, pack("N", $offset), pack("N", $len)]);
282             }
283              
284             sub set {
285 0     0 0   my ($self, $key, $value, $expire) = @_;
286 0 0 0       return unless $key && defined $value;
287 0           my @records = ($key, $value);
288              
289 0 0         push(@records, pack("N", $expire))
290             if ($expire);
291              
292 0           my $resp = $self->send_msg(0x02, \@records);
293 0   0       return (defined $resp && unpack("C", $resp) == 0x00);
294             }
295              
296             sub add {
297 0     0 0   my ($self, $key, $value, $expire) = @_;
298 0 0 0       return unless $key && defined $value;
299 0           my @records = ($key, $value);
300              
301 0 0         push(@records, pack("N", $expire))
302             if ($expire);
303              
304 0           my $resp = $self->send_msg(0x07, \@records);
305 0   0       return (defined $resp && unpack("C", $resp) == 0x00);
306             }
307              
308             sub del {
309 0     0 0   my ($self, $key) = @_;
310 0 0         return unless $key;
311 0           my $resp = $self->send_msg(0x03, $key);
312 0           return (unpack("C", $resp) == 0x00);
313             }
314              
315             sub evi {
316 0     0 0   my ($self, $key) = @_;
317 0 0         return unless $key;
318 0           my $resp = $self->send_msg(0x04, $key);
319 0           return (unpack("C", $resp) == 0x00);
320             }
321              
322             sub evict {
323 0     0 0   my $self = shift;
324 0           return $self->evi(@_);
325             }
326              
327             sub mgb {
328 0     0 0   my ($self, $key) = @_;
329 0 0         return unless $key;
330             # TODO - nodes-list must be sent together with the MGB command
331 0           my $resp = $self->send_msg(0x22, $key);
332 0           return (unpack("C", $resp) == 0x00);
333             }
334              
335             sub migration_begin {
336 0     0 0   my $self = shift;
337 0           return $self->mgb(@_);
338             }
339              
340             sub mge {
341 0     0 0   my ($self, $key) = @_;
342 0 0         return unless $key;
343 0           my $resp = $self->send_msg(0x23, $key);
344 0           return (unpack("C", $resp) == 0x00);
345             }
346              
347             sub migration_end {
348 0     0 0   my $self = shift;
349 0           return $self->mge(@_);
350             }
351              
352             sub mga {
353 0     0 0   my ($self, $key) = @_;
354 0 0         return unless $key;
355 0           my $resp = $self->send_msg(0x21, $key);
356 0           return (unpack("C", $resp) == 0x00);
357             }
358              
359             sub migration_abort {
360 0     0 0   my $self = shift;
361 0           return $self->mga(@_);
362             }
363              
364             sub _get_sock_key_for_node {
365 0     0     my ($self, $node) = @_;
366 0           my $addr;
367             my $port;
368              
369 0 0         if (@{$self->{_nodes}} == 1) {
  0            
370 0           $addr = $self->{_nodes}->[0]->{addr};
371 0           $port = $self->{_nodes}->[0]->{port};
372             } else {
373 0           my ($node) = grep { $_->{label} eq $node } @{$self->{_nodes}};
  0            
  0            
374 0 0         if ($node) {
375 0           $addr = $node->{addr};
376 0           $port = $node->{port};
377             } else {
378 0           return undef;
379             }
380             }
381 0           return "$addr:$port";
382             }
383              
384             sub _get_sock_for_node {
385 0     0     my ($self, $node) = @_;
386              
387 0           my $sock_key = $self->_get_sock_key_for_node($node);
388              
389 0 0 0       if (!$self->{_sock}->{$sock_key} || !$self->{_sock}->{$sock_key}->connected ||
      0        
390             $self->{_sock}->{$sock_key}->write(pack("C1", 0x90)) != 1)
391             {
392 0           my ($addr, $port) = split(':', $sock_key);
393 0           $self->{_sock}->{$sock_key} = IO::Socket::INET->new(
394             PeerAddr => $addr,
395             PeerPort => $port,
396             Proto => 'tcp');
397             }
398              
399 0           return $self->{_sock}->{$sock_key};
400             }
401              
402             sub _delete_sock_for_node {
403 0     0     my ($self, $node) = @_;
404              
405 0           my $sock_key = $self->_get_sock_key_for_node($node);
406              
407 0           delete $self->{_sock}->{$sock_key};
408             }
409              
410             sub index {
411 0     0 0   my ($self, $node) = @_;
412              
413 0           my %index;
414              
415 0           my @nodes = $node
416             ? ($node)
417 0 0         : map { $_->{label} } @{$self->{_nodes}};
  0            
418              
419 0           foreach my $n (@nodes) {
420 0           my $sock = $self->_get_sock_for_node($n);
421              
422 0 0         return unless $sock;
423              
424              
425 0           my $resp = $self->send_msg(0x41, undef, $sock);
426 0           while (length($resp)) {
427 0           my $ksize = unpack("N", $resp);
428 0           $resp = substr($resp, 4);
429 0           my $kdata = unpack("a$ksize", $resp);
430 0           $resp = substr($resp, $ksize);
431 0           my $vlen = unpack("N", $resp);
432 0           $resp = substr($resp, 4);
433 0 0 0       $index{$kdata} = $vlen
434             if ($ksize && $kdata);
435             }
436             }
437 0 0         return wantarray ? %index : \%index;
438             }
439              
440             sub check {
441 0     0 0   my ($self, $node) = @_;
442 0           my $sock = $self->_get_sock_for_node($node);
443              
444 0 0         return unless $sock;
445              
446 0           my $resp = $self->send_msg(0x31, undef, $sock);
447 0 0         $self->_delete_sock_for_node($node) unless $resp;
448              
449 0 0         return $resp ? (unpack("C", $resp) == 0x00) : undef;
450             }
451              
452             sub chk {
453 0     0 0   my $self = shift;
454             # XXX - deprecated
455 0           return $self->check(@_);
456             }
457              
458             sub status {
459 0     0 0   my ($self, $node) = @_;
460              
461 0           my $sock = $self->_get_sock_for_node($node);
462              
463 0 0         return unless $sock;
464              
465 0           my $resp = $self->send_msg(0x32, undef, $sock);
466 0 0         $self->_delete_sock_for_node($node) unless $resp;
467              
468 0           return $resp;
469             }
470              
471             sub sts {
472 0     0 0   my $self = shift;
473             # XXX - deprecated
474 0           return $self->status(@_);
475             }
476              
477             1;
478             __END__