File Coverage

blib/lib/Shardcache/Client.pm
Criterion Covered Total %
statement 12 178 6.7
branch 0 70 0.0
condition 0 18 0.0
subroutine 4 16 25.0
pod 0 10 0.0
total 16 292 5.4


line stmt bran cond sub pod time code
1             package Shardcache::Client;
2              
3 1     1   55857 use strict;
  1         3  
  1         81  
4 1     1   1298 use IO::Socket::INET;
  1         31504  
  1         12  
5 1     1   1881 use Digest::SipHash qw/siphash64/;
  1         1544  
  1         71  
6 1     1   967 use Algorithm::ConsistentHash::CHash;
  1         583  
  1         2526  
7              
8             our $VERSION = "0.02";
9              
10             sub new {
11 0     0 0   my ($class, $host, $secret) = @_;
12              
13 0 0         croak("The host parameter is mandatory!")
14             unless($host);
15              
16 0 0         $secret = '' unless($secret);
17              
18 0           my $self = {
19             _secret => $secret,
20             _nodes => [],
21             };
22              
23              
24 0 0 0       if (ref($host) && ref($host) eq "ARRAY") {
25 0           $self->{_port} = [];
26 0           foreach my $h (@$host) {
27 0 0         if ($h !~ /[a-zA-Z0-9_\.]+:[a-zA-Z0-9_\.]+(:[0-9]+)?/) {
28 0           die "Invalid host string $h";
29             }
30 0           my ($label, $addr, $port) = split(':', $h);
31 0           push(@{$self->{_nodes}}, {
  0            
32             label => $label,
33             addr => $addr,
34             port => $port });
35             }
36 0           $self->{_chash} = Algorithm::ConsistentHash::CHash->new(
37 0           ids => [map { $_->{label} } @{$self->{_nodes}} ],
  0            
38             replicas => 200);
39             } else {
40 0 0         if ($host !~ /[a-zA-Z0-9_\.]+:[a-zA-Z0-9_\.]+(:[0-9]+)?/) {
41 0           die "Invalid host string $host";
42             }
43 0           my ($addr, $port) = split(':', $host);
44 0           push(@{$self->{_nodes}}, {
  0            
45             addr => $addr,
46             port => $port
47             });
48             }
49              
50 0           bless $self, $class;
51            
52 0           return $self;
53             }
54              
55             sub _chunkize_var {
56 0     0     my ($var) = @_;
57 0           my $templ;
58             my @vars;
59 0           my $vlen = length($var);
60 0           while ($vlen > 0) {
61 0 0         if ($vlen <= 65535) {
62 0           $templ .= "na$vlen";
63 0           push(@vars, $vlen, $var);
64 0           $vlen = 0;
65             } else {
66 0           $templ .= "na65535";
67 0           my $substr = substr($var, 0, 65535, "");
68 0           $vlen = length($var);
69 0           push(@vars, 65535, $substr);
70             }
71             }
72 0           return pack $templ, @vars;
73             }
74              
75             sub send_msg {
76 0     0 0   my ($self, $hdr, $key, $value, $expire, $sock) = @_;
77              
78 0           my $templ = "C";
79 0           my @vars = ($hdr);
80              
81 0 0         if ($key) {
82 0           my $kbuf = _chunkize_var($key);
83 0           $templ .= sprintf "a%dCC", length($kbuf);
84 0           push @vars, $kbuf, 0x00, 0x00;
85             } else {
86 0           $templ .= "CC";
87 0           push @vars, 0x00, 0x00;
88             }
89              
90 0 0         if ($value) {
91 0           $templ .= "C";
92 0           push @vars, 0x80;
93              
94 0           my $vbuf = _chunkize_var($value);
95              
96 0           $templ .= sprintf "a%dCC", length($vbuf);
97 0           push @vars, $vbuf, 0x00, 0x00;
98 0 0         if ($expire) {
99 0           $templ .= "CCCNCC";
100 0           push @vars, 0x80, 0x00, 0x04, $expire, 0x00, 0x00;
101             }
102             }
103              
104 0           $templ .= "C";
105 0           push @vars, 0x00;
106              
107 0           my $msg = pack $templ, @vars;
108              
109 0 0         if ($self->{_secret}) {
110 0           my $sig = siphash64($msg, pack("a16", $self->{_secret}));
111 0           $msg .= pack("Q", $sig);
112             }
113              
114 0           my $addr;
115             my $port;
116              
117 0 0         if (!$sock) {
118 0 0         if (@{$self->{_nodes}} == 1) {
  0            
119 0           $addr = $self->{_nodes}->[0]->{addr};
120 0           $port = $self->{_nodes}->[0]->{port};
121             } else {
122 0           my ($node) = grep { $_->{label} eq $self->{_chash}->lookup($key) } @{$self->{_nodes}};
  0            
  0            
123 0 0         if ($node) {
124 0           $addr = $node->{addr};
125 0           $port = $node->{port};
126             } else {
127 0           my $index = rand() % @{$self->{_nodes}};
  0            
128 0           $addr = $self->{_nodes}->[$index]->{addr};
129 0           $port = $self->{_nodes}->[$index]->{port};
130             }
131              
132             }
133            
134 0 0 0       if (!$self->{_sock}->{"$addr:$port"} || !$self->{_sock}->{"$addr:$port"}->connected ||
      0        
135             $self->{_sock}->{"$addr:$port"}->write(pack("C4", 0x90, 0x00, 0x00, 0x00)) != 4)
136             {
137 0           $self->{_sock}->{"$addr:$port"} = IO::Socket::INET->new(PeerAddr => $addr,
138             PeerPort => $port,
139             Proto => 'tcp');
140             }
141              
142 0           $sock = $self->{_sock}->{"$addr:$port"};
143             }
144            
145 0           my $wb = $sock->write($msg);
146            
147             # read the response
148 0           my $in;
149             my $data;
150              
151              
152             # read the header
153 0 0         if (read($sock, $data, 1) != 1) {
154 0           delete $self->{_sock}->{"$addr:$port"};
155 0           return undef;
156             }
157              
158 0           $in .= $data;
159              
160 0           my $stop = 0;
161 0           my $out;
162              
163             # read the records
164 0           while (!$stop) {
165 0 0         if (read($sock, $data, 2) != 2) {
166 0           return undef;
167             }
168 0           my ($len) = unpack("n", $data);
169 0           $in .= $data;
170 0           while ($len) {
171 0           my $rb = read($sock, $data, $len);
172 0 0         if ($rb <= 0) {
173 0           return undef;
174             }
175 0           $in .= $data;
176 0           $out .= $data;
177 0           $len -= $rb;
178 0 0         if ($len == 0) {
179 0 0         if (read($sock, $data, 2) != 2) {
180 0           return undef;
181             }
182 0           ($len) = unpack("n", $data);
183 0           $in .= $data;
184             }
185             }
186            
187 0 0         if (read($sock, $data, 1) != 1) {
188 0           return undef;
189             }
190 0           $in .= $data;
191 0           my ($sep) = unpack("C", $data);
192 0 0         $stop = 1 if ($sep == 0);
193             # TODO - should check if it's a correct rsep (0x80)
194             }
195              
196 0 0         if ($self->{_secret}) {
197             # now that we have the whole message, let's compute the signature
198             # (we know it's 8 bytes long and is the trailer of the message
199 0           my $signature = siphash64(substr($in, 0, length($in)), pack("a16", $self->{_secret}));
200              
201 0           my $csig = pack("Q", $signature);
202              
203 0           my $rb = read($sock, $data, 8);
204 0 0         if ($rb != 8) {
205 0           return undef;
206             }
207              
208             # $chunk now points at the signature
209 0 0         if ($csig ne $data) {
210 0           return undef;
211             }
212             }
213              
214 0           return $out;
215             }
216              
217             sub get {
218 0     0 0   my ($self, $key) = @_;
219 0 0         return unless $key;
220 0           return $self->send_msg(0x01, $key);
221             }
222              
223             sub set {
224 0     0 0   my ($self, $key, $value, $expire) = @_;
225 0 0 0       return unless $key && defined $value;
226 0           my $resp = $self->send_msg(0x02, $key, $value, $expire);
227 0   0       return (defined $resp && $resp eq "OK")
228             }
229              
230             sub del {
231 0     0 0   my ($self, $key) = @_;
232 0 0         return unless $key;
233 0           my $resp = $self->send_msg(0x03, $key);
234 0           return ($resp eq "OK")
235             }
236              
237             sub evi {
238 0     0 0   my ($self, $key) = @_;
239 0 0         return unless $key;
240 0           my $resp = $self->send_msg(0x04, $key);
241 0           return ($resp eq "OK")
242             }
243              
244             sub mgb {
245 0     0 0   my ($self, $key) = @_;
246 0 0         return unless $key;
247 0           my $resp = $self->send_msg(0x04, $key);
248 0           return ($resp eq "OK")
249             }
250              
251             sub mga {
252 0     0 0   my ($self, $key) = @_;
253 0 0         return unless $key;
254 0           my $resp = $self->send_msg(0x04, $key);
255 0           return ($resp eq "OK")
256             }
257              
258             sub _get_sock_for_peer {
259 0     0     my ($self, $peer) = @_;
260 0           my $addr;
261             my $port;
262              
263 0 0         if (@{$self->{_nodes}} == 1) {
  0            
264 0           $addr = $self->{_nodes}->[0]->{addr};
265 0           $port = $self->{_nodes}->[0]->{port};
266             } else {
267 0           my ($node) = grep { $_->{label} eq $peer } @{$self->{_nodes}};
  0            
  0            
268 0 0         if ($node) {
269 0           $addr = $node->{addr};
270 0           $port = $node->{port};
271             } else {
272 0           return undef;
273             }
274             }
275 0 0 0       if (!$self->{_sock}->{"$addr:$port"} || !$self->{_sock}->{"$addr:$port"}->connected) {
276 0           $self->{_sock}->{"$addr:$port"} = IO::Socket::INET->new(PeerAddr => $addr,
277             PeerPort => $port,
278             Proto => 'tcp');
279             }
280 0           return $self->{_sock}->{"$addr:$port"};
281             }
282              
283             sub chk {
284 0     0 0   my ($self, $peer) = @_;
285 0           my $sock = $self->_get_sock_for_peer($peer);
286              
287 0 0         return unless $sock;
288              
289 0           my $resp = $self->send_msg(0x31, undef, undef, undef, $sock);
290 0           return ($resp eq "OK");
291             }
292              
293             sub sts {
294 0     0 0   my ($self, $peer) = @_;
295              
296 0           my $sock = $self->_get_sock_for_peer($peer);
297              
298 0 0         return unless $sock;
299              
300 0           my $resp = $self->send_msg(0x32, undef, undef, undef, $sock);
301 0           return $resp;
302             }
303              
304             1;
305             __END__