File Coverage

blib/lib/Cache/Memcached.pm
Criterion Covered Total %
statement 226 575 39.3
branch 57 272 20.9
condition 25 123 20.3
subroutine 37 68 54.4
pod 19 30 63.3
total 364 1068 34.0


line stmt bran cond sub pod time code
1             # $Id$
2             #
3             # Copyright (c) 2003, 2004 Brad Fitzpatrick
4             #
5             # See COPYRIGHT section in pod text below for usage and distribution rights.
6             #
7              
8             package Cache::Memcached;
9              
10 9     9   320439 use strict;
  9         21  
  9         385  
11 9     9   77 use warnings;
  9         22  
  9         332  
12              
13 9     9   58 no strict 'refs';
  9         18  
  9         230  
14 9     9   21212 use Storable ();
  9         52534  
  9         296  
15 9     9   10495 use Socket qw( MSG_NOSIGNAL PF_INET PF_UNIX IPPROTO_TCP SOCK_STREAM );
  9         43865  
  9         2200  
16 9     9   9485 use IO::Handle ();
  9         76513  
  9         210  
17 9     9   9644 use Time::HiRes ();
  9         19375  
  9         264  
18 9     9   8064 use String::CRC32;
  9         5198  
  9         665  
19 9     9   7816 use Errno qw( EINPROGRESS EWOULDBLOCK EISCONN );
  9         10976  
  9         1386  
20 9     9   8279 use Cache::Memcached::GetParser;
  9         27  
  9         247  
21 9     9   9028 use Encode ();
  9         156273  
  9         366  
22 9         114 use fields qw{
23             debug no_rehash stats compress_threshold compress_enable stat_callback
24             readonly select_timeout namespace namespace_len servers active buckets
25             pref_ip
26             bucketcount _single_sock _stime
27             connect_timeout cb_connect_fail
28             parser_class
29             buck2sock buck2sock_generation
30 9     9   8822 };
  9         14483  
31              
32             # flag definitions
33 9     9   1499 use constant F_STORABLE => 1;
  9         18  
  9         605  
34 9     9   47 use constant F_COMPRESS => 2;
  9         18  
  9         351  
35              
36             # size savings required before saving compressed value
37 9     9   43 use constant COMPRESS_SAVINGS => 0.20; # percent
  9         13  
  9         347  
38              
39 9     9   51 use vars qw($VERSION $HAVE_ZLIB $FLAG_NOSIGNAL $HAVE_SOCKET6);
  9         14  
  9         845  
40             $VERSION = "1.30";
41              
42             BEGIN {
43 9     9   594 $HAVE_ZLIB = eval "use Compress::Zlib (); 1;";
  9     9   12084  
  9         731863  
  9         96  
44 9     9   583 $HAVE_SOCKET6 = eval "use Socket6 qw(AF_INET6 PF_INET6); 1;";
  9         23501  
  0         0  
  0         0  
45             }
46              
47 9     9   4242 my $HAVE_XS = eval "use Cache::Memcached::GetParserXS; 1;";
  0         0  
  0         0  
48             $HAVE_XS = 0 if $ENV{NO_XS};
49              
50             my $parser_class = $HAVE_XS ? "Cache::Memcached::GetParserXS" : "Cache::Memcached::GetParser";
51             if ($ENV{XS_DEBUG}) {
52             print "using parser: $parser_class\n";
53             }
54              
55             $FLAG_NOSIGNAL = 0;
56             eval { $FLAG_NOSIGNAL = MSG_NOSIGNAL; };
57              
58             my %host_dead; # host -> unixtime marked dead until
59             my %cache_sock; # host -> socket
60             my $socket_cache_generation = 1; # Set to 1 here because below the buck2sock_generation is set to 0, keep them in order.
61              
62             my $PROTO_TCP;
63              
64             our $SOCK_TIMEOUT = 2.6; # default timeout in seconds
65              
66             sub new {
67 2     2 1 3007403 my Cache::Memcached $self = shift;
68 2 50       152 $self = fields::new( $self ) unless ref $self;
69              
70 2 50       10365 my $args = (@_ == 1) ? shift : { @_ }; # hashref-ify args
71              
72 2         8 $self->{'buck2sock'}= [];
73 2         6 $self->{'buck2sock_generation'} = 0;
74 2         13 $self->set_servers($args->{'servers'});
75 2   50     31 $self->{'debug'} = $args->{'debug'} || 0;
76 2         6 $self->{'no_rehash'} = $args->{'no_rehash'};
77 2         6 $self->{'stats'} = {};
78 2   50     20 $self->{'pref_ip'} = $args->{'pref_ip'} || {};
79 2         5 $self->{'compress_threshold'} = $args->{'compress_threshold'};
80 2         5 $self->{'compress_enable'} = 1;
81 2   50     22 $self->{'stat_callback'} = $args->{'stat_callback'} || undef;
82 2         6 $self->{'readonly'} = $args->{'readonly'};
83 2   33     21 $self->{'parser_class'} = $args->{'parser_class'} || $parser_class;
84              
85             # TODO: undocumented
86 2   50     17 $self->{'connect_timeout'} = $args->{'connect_timeout'} || 0.25;
87 2   50     20 $self->{'select_timeout'} = $args->{'select_timeout'} || 1.0;
88 2   100     21 $self->{namespace} = $args->{namespace} || '';
89 2         11 $self->{namespace_len} = length $self->{namespace};
90              
91 2         10 return $self;
92             }
93              
94             sub set_pref_ip {
95 0     0 0 0 my Cache::Memcached $self = shift;
96 0         0 $self->{'pref_ip'} = shift;
97             }
98              
99             sub set_servers {
100 2     2 1 5 my Cache::Memcached $self = shift;
101 2         6 my ($list) = @_;
102 2   50     12 $self->{'servers'} = $list || [];
103 2         4 $self->{'active'} = scalar @{$self->{'servers'}};
  2         8  
104 2         5 $self->{'buckets'} = undef;
105 2         13 $self->{'bucketcount'} = 0;
106 2         33 $self->init_buckets;
107              
108             # We didn't close any sockets, so we reset the buck2sock generation, not increment the global socket cache generation.
109 2         4 $self->{'buck2sock_generation'} = 0;
110              
111 2         4 $self->{'_single_sock'} = undef;
112 2 50       4 if (@{$self->{'servers'}} == 1) {
  2         11  
113 2         5 $self->{'_single_sock'} = $self->{'servers'}[0];
114             }
115              
116 2         4 return $self;
117             }
118              
119             sub set_cb_connect_fail {
120 0     0 0 0 my Cache::Memcached $self = shift;
121 0         0 $self->{'cb_connect_fail'} = shift;
122             }
123              
124             sub set_connect_timeout {
125 0     0 1 0 my Cache::Memcached $self = shift;
126 0         0 $self->{'connect_timeout'} = shift;
127             }
128              
129             sub set_debug {
130 0     0 1 0 my Cache::Memcached $self = shift;
131 0         0 my ($dbg) = @_;
132 0   0     0 $self->{'debug'} = $dbg || 0;
133             }
134              
135             sub set_readonly {
136 0     0 1 0 my Cache::Memcached $self = shift;
137 0         0 my ($ro) = @_;
138 0         0 $self->{'readonly'} = $ro;
139             }
140              
141             sub set_norehash {
142 0     0 1 0 my Cache::Memcached $self = shift;
143 0         0 my ($val) = @_;
144 0         0 $self->{'no_rehash'} = $val;
145             }
146              
147             sub set_compress_threshold {
148 0     0 1 0 my Cache::Memcached $self = shift;
149 0         0 my ($thresh) = @_;
150 0         0 $self->{'compress_threshold'} = $thresh;
151             }
152              
153             sub enable_compress {
154 0     0 1 0 my Cache::Memcached $self = shift;
155 0         0 my ($enable) = @_;
156 0         0 $self->{'compress_enable'} = $enable;
157             }
158              
159             sub forget_dead_hosts {
160 0     0 0 0 my Cache::Memcached $self = shift;
161 0         0 %host_dead = ();
162              
163             # We need to globally recalculate our buck2sock in all objects, so we increment the global generation.
164 0         0 $socket_cache_generation++;
165              
166 0         0 return 1;
167             }
168              
169             sub set_stat_callback {
170 0     0 0 0 my Cache::Memcached $self = shift;
171 0         0 my ($stat_callback) = @_;
172 0         0 $self->{'stat_callback'} = $stat_callback;
173             }
174              
175             my %sock_map; # stringified-$sock -> "$ip:$port"
176              
177             sub _dead_sock {
178 1     1   4 my ($self, $sock, $ret, $dead_for) = @_;
179 1 50       9 if (my $ipport = $sock_map{$sock}) {
180 1         16 my $now = time();
181 1 50       9 $host_dead{$ipport} = $now + $dead_for
182             if $dead_for;
183 1         4 delete $cache_sock{$ipport};
184 1         4 delete $sock_map{$sock};
185             }
186             # We need to globally recalculate our buck2sock in all objects, so we increment the global generation.
187 1         3 $socket_cache_generation++;
188              
189 1         95 return $ret; # 0 or undef, probably, depending on what caller wants
190             }
191              
192             sub _close_sock {
193 0     0   0 my ($self, $sock) = @_;
194 0 0       0 if (my $ipport = $sock_map{$sock}) {
195 0         0 close $sock;
196 0         0 delete $cache_sock{$ipport};
197 0         0 delete $sock_map{$sock};
198             }
199              
200             # We need to globally recalculate our buck2sock in all objects, so we increment the global generation.
201 0         0 $socket_cache_generation++;
202              
203 0         0 return 1;
204             }
205              
206             sub _connect_sock { # sock, sin, timeout
207 2     2   6 my ($sock, $sin, $timeout) = @_;
208 2 50       7 $timeout = 0.25 if not defined $timeout;
209              
210             # make the socket non-blocking from now on,
211             # except if someone wants 0 timeout, meaning
212             # a blocking connect, but even then turn it
213             # non-blocking at the end of this function
214              
215 2 50       6 if ($timeout) {
216 2         19 IO::Handle::blocking($sock, 0);
217             } else {
218 0         0 IO::Handle::blocking($sock, 1);
219             }
220              
221 2         294 my $ret = connect($sock, $sin);
222              
223 2 50 33     82 if (!$ret && $timeout && $!==EINPROGRESS) {
      33        
224              
225 2         10 my $win='';
226 2         9 vec($win, fileno($sock), 1) = 1;
227              
228 2 100       250355 if (select(undef, $win, undef, $timeout) > 0) {
229 1         5 $ret = connect($sock, $sin);
230             # EISCONN means connected & won't re-connect, so success
231 1 50 33     5 $ret = 1 if !$ret && $!==EISCONN;
232             }
233             }
234              
235 2 50       20 unless ($timeout) { # socket was temporarily blocking, now revert
236 0         0 IO::Handle::blocking($sock, 0);
237             }
238              
239             # from here on, we use non-blocking (async) IO for the duration
240             # of the socket's life
241              
242 2         13 return $ret;
243             }
244              
245             sub sock_to_host { # (host) #why is this public? I wouldn't have to worry about undef $self if it weren't.
246 9 50   9 0 28 my Cache::Memcached $self = ref $_[0] ? shift : undef;
247 9         13 my $host = $_[0];
248 9 100       34 return $cache_sock{$host} if $cache_sock{$host};
249              
250 3         11 my $now = time();
251 3         34 my ($ip, $port) = $host =~ /(.*):(\d+)$/;
252 3 50       18 if (defined($ip)) {
253 3         10 $ip =~ s/[\[\]]//g; # get rid of optional IPv6 brackets
254             }
255              
256             return undef if
257 3 100 66     59 $host_dead{$host} && $host_dead{$host} > $now;
258 2         4 my $sock;
259              
260 2         9 my $connected = 0;
261 2         12 my $sin;
262 2   33     1688 my $proto = $PROTO_TCP ||= getprotobyname('tcp');
263              
264 2 50       15 if ( index($host, '/') != 0 )
265             {
266             # if a preferred IP is known, try that first.
267 2 50 33     68 if ($self && $self->{pref_ip}{$ip}) {
268 0         0 my $prefip = $self->{pref_ip}{$ip};
269 0 0 0     0 if ($HAVE_SOCKET6 && index($prefip, ':') != -1) {
270 9     9   58 no strict 'subs'; # for PF_INET6 and AF_INET6, weirdly imported
  9         17  
  9         2094  
271 0         0 socket($sock, PF_INET6, SOCK_STREAM, $proto);
272 0         0 $sock_map{$sock} = $host;
273 0         0 $sin = Socket6::pack_sockaddr_in6($port,
274             Socket6::inet_pton(AF_INET6, $prefip));
275             } else {
276 0         0 socket($sock, PF_INET, SOCK_STREAM, $proto);
277 0         0 $sock_map{$sock} = $host;
278 0         0 $sin = Socket::sockaddr_in($port, Socket::inet_aton($prefip));
279             }
280              
281 0 0       0 if (_connect_sock($sock,$sin,$self->{connect_timeout})) {
282 0         0 $connected = 1;
283             } else {
284 0 0       0 if (my $cb = $self->{cb_connect_fail}) {
285 0         0 $cb->($prefip);
286             }
287 0         0 close $sock;
288             }
289             }
290              
291             # normal path, or fallback path if preferred IP failed
292 2 50       9 unless ($connected) {
293 2 50 33     16 if ($HAVE_SOCKET6 && index($ip, ':') != -1) {
294 9     9   49 no strict 'subs'; # for PF_INET6 and AF_INET6, weirdly imported
  9         15  
  9         17000  
295 0         0 socket($sock, PF_INET6, SOCK_STREAM, $proto);
296 0         0 $sock_map{$sock} = $host;
297 0         0 $sin = Socket6::pack_sockaddr_in6($port,
298             Socket6::inet_pton(AF_INET6, $ip));
299             } else {
300 2         83 socket($sock, PF_INET, SOCK_STREAM, $proto);
301 2         95 $sock_map{$sock} = $host;
302 2         49 $sin = Socket::sockaddr_in($port, Socket::inet_aton($ip));
303             }
304              
305 2 50       34 my $timeout = $self ? $self->{connect_timeout} : 0.25;
306 2 100       8 unless (_connect_sock($sock, $sin, $timeout)) {
307 1 50       10 my $cb = $self ? $self->{cb_connect_fail} : undef;
308 1 50       4 $cb->($ip) if $cb;
309 1         83 return _dead_sock($self, $sock, undef, 20 + int(rand(10)));
310             }
311             }
312             } else { # it's a unix domain/local socket
313 0         0 socket($sock, PF_UNIX, SOCK_STREAM, 0);
314 0         0 $sock_map{$sock} = $host;
315 0         0 $sin = Socket::sockaddr_un($host);
316 0 0       0 my $timeout = $self ? $self->{connect_timeout} : 0.25;
317 0 0       0 unless (_connect_sock($sock,$sin,$timeout)) {
318 0 0       0 my $cb = $self ? $self->{cb_connect_fail} : undef;
319 0 0       0 $cb->($host) if $cb;
320 0         0 return _dead_sock($self, $sock, undef, 20 + int(rand(10)));
321             }
322             }
323              
324             # make the new socket not buffer writes.
325 1         6 my $old = select($sock);
326 1         7 $| = 1;
327 1         3 select($old);
328              
329 1         4 $cache_sock{$host} = $sock;
330              
331 1         2 return $sock;
332             }
333              
334             sub get_sock { # (key)
335 2     2 0 5 my Cache::Memcached $self = $_[0];
336 2         4 my $key = $_[1];
337 2 50       13 return $self->sock_to_host($self->{'_single_sock'}) if $self->{'_single_sock'};
338 0 0       0 return undef unless $self->{'active'};
339 0 0       0 my $hv = ref $key ? int($key->[0]) : _hashfunc($key);
340              
341 0 0       0 my $real_key = ref $key ? $key->[1] : $key;
342 0         0 my $tries = 0;
343 0         0 while ($tries++ < 20) {
344 0         0 my $host = $self->{'buckets'}->[$hv % $self->{'bucketcount'}];
345 0         0 my $sock = $self->sock_to_host($host);
346 0 0       0 return $sock if $sock;
347 0 0       0 return undef if $self->{'no_rehash'};
348 0         0 $hv += _hashfunc($tries . $real_key); # stupid, but works
349             }
350 0         0 return undef;
351             }
352              
353             sub init_buckets {
354 2     2 0 16 my Cache::Memcached $self = shift;
355 2 50       11 return if $self->{'buckets'};
356 2         5 my $bu = $self->{'buckets'} = [];
357 2         5 foreach my $v (@{$self->{'servers'}}) {
  2         13  
358 2 50       9 if (ref $v eq "ARRAY") {
359 0         0 for (1..$v->[1]) { push @$bu, $v->[0]; }
  0         0  
360             } else {
361 2         7 push @$bu, $v;
362             }
363             }
364 2         4 $self->{'bucketcount'} = scalar @{$self->{'buckets'}};
  2         7  
365             }
366              
367             sub disconnect_all {
368 0     0 1 0 my Cache::Memcached $self = shift;
369 0         0 my $sock;
370 0         0 foreach $sock (values %cache_sock) {
371 0         0 close $sock;
372             }
373 0         0 %cache_sock = ();
374              
375             # We need to globally recalculate our buck2sock in all objects, so we increment the global generation.
376 0         0 $socket_cache_generation++;
377             }
378              
379             # writes a line, then reads result. by default stops reading after a
380             # single line, but caller can override the $check_complete subref,
381             # which gets passed a scalarref of buffer read thus far.
382             sub _write_and_read {
383 7     7   10 my Cache::Memcached $self = shift;
384 7         13 my ($sock, $line, $check_complete) = @_;
385 7         8 my $res;
386 7         12 my ($ret, $offset) = (undef, 0);
387              
388             $check_complete ||= sub {
389 7     7   31 return (rindex($ret, "\r\n") + 2 == length($ret));
390 7   50     64 };
391              
392             # state: 0 - writing, 1 - reading, 2 - done
393 7         9 my $state = 0;
394              
395             # the bitsets for select
396 7         9 my ($rin, $rout, $win, $wout);
397 0         0 my $nfound;
398              
399 7         9 my $copy_state = -1;
400 7 50       18 local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;
401              
402             # the select loop
403 7         10 while(1) {
404 21 50       52 if ($copy_state!=$state) {
405 21 100       47 last if $state==2;
406 14         26 ($rin, $win) = ('', '');
407 14 100       40 vec($rin, fileno($sock), 1) = 1 if $state==1;
408 14 100       48 vec($win, fileno($sock), 1) = 1 if $state==0;
409 14         22 $copy_state = $state;
410             }
411 14         371 $nfound = select($rout=$rin, $wout=$win, undef,
412             $self->{'select_timeout'});
413 14 50       38 last unless $nfound;
414              
415 14 100       36 if (vec($wout, fileno($sock), 1)) {
416 7         443 $res = send($sock, $line, $FLAG_NOSIGNAL);
417             next
418 7 50 33     26 if not defined $res and $!==EWOULDBLOCK;
419 7 50       16 unless ($res > 0) {
420 0         0 $self->_close_sock($sock);
421 0         0 return undef;
422             }
423 7 50       16 if ($res == length($line)) { # all sent
424 7         12 $state = 1;
425             } else { # we only succeeded in sending some of it
426 0         0 substr($line, 0, $res, ''); # delete the part we sent
427             }
428             }
429              
430 14 100       40 if (vec($rout, fileno($sock), 1)) {
431 7         52 $res = sysread($sock, $ret, 255, $offset);
432             next
433 7 50 33     24 if !defined($res) and $!==EWOULDBLOCK;
434 7 50       18 if ($res == 0) { # catches 0=conn closed or undef=error
435 0         0 $self->_close_sock($sock);
436 0         0 return undef;
437             }
438 7         8 $offset += $res;
439 7 50       19 $state = 2 if $check_complete->(\$ret);
440             }
441             }
442              
443 7 50       15 unless ($state == 2) {
444 0         0 $self->_dead_sock($sock); # improperly finished
445 0         0 return undef;
446             }
447              
448 7         53 return $ret;
449             }
450              
451             sub delete {
452 0     0 1 0 my Cache::Memcached $self = shift;
453 0         0 my ($key, $time) = @_;
454 0 0 0     0 return 0 if ! $self->{'active'} || $self->{'readonly'};
455 0 0       0 my $stime = Time::HiRes::time() if $self->{'stat_callback'};
456 0         0 my $sock = $self->get_sock($key);
457 0 0       0 return 0 unless $sock;
458              
459 0         0 $self->{'stats'}->{"delete"}++;
460 0 0       0 $key = ref $key ? $key->[1] : $key;
461 0 0       0 $time = $time ? " $time" : "";
462 0         0 my $cmd = "delete $self->{namespace}$key$time\r\n";
463 0         0 my $res = _write_and_read($self, $sock, $cmd);
464              
465 0 0       0 if ($self->{'stat_callback'}) {
466 0         0 my $etime = Time::HiRes::time();
467 0         0 $self->{'stat_callback'}->($stime, $etime, $sock, 'delete');
468             }
469              
470 0   0     0 return defined $res && $res eq "DELETED\r\n";
471             }
472             *remove = \&delete;
473              
474             sub add {
475 0     0 1 0 _set("add", @_);
476             }
477              
478             sub replace {
479 0     0 1 0 _set("replace", @_);
480             }
481              
482             sub set {
483 2     2 1 930 _set("set", @_);
484             }
485              
486             sub append {
487 0     0 0 0 _set("append", @_);
488             }
489              
490             sub prepend {
491 0     0 0 0 _set("prepend", @_);
492             }
493              
494             sub _set {
495 2     2   4 my $cmdname = shift;
496 2         5 my Cache::Memcached $self = shift;
497 2         5 my ($key, $val, $exptime) = @_;
498 2 50 33     19 return 0 if ! $self->{'active'} || $self->{'readonly'};
499 2 50       8 my $stime = Time::HiRes::time() if $self->{'stat_callback'};
500 2         9 my $sock = $self->get_sock($key);
501 2 50       14 return 0 unless $sock;
502              
503 9     9   71 use bytes; # return bytes from length()
  9         17  
  9         83  
504              
505 0 0 0     0 my $app_or_prep = $cmdname eq 'append' || $cmdname eq 'prepend' ? 1 : 0;
506 0         0 $self->{'stats'}->{$cmdname}++;
507 0         0 my $flags = 0;
508 0 0       0 $key = ref $key ? $key->[1] : $key;
509              
510 0 0       0 if (ref $val) {
511 0 0       0 die "append or prepend cannot take a reference" if $app_or_prep;
512 0         0 local $Carp::CarpLevel = 2;
513 0         0 $val = Storable::nfreeze($val);
514 0         0 $flags |= F_STORABLE;
515             }
516 0 0       0 warn "value for memkey:$key is not defined" unless defined $val;
517              
518 0         0 my $len = length($val);
519              
520 0 0 0     0 if ($self->{'compress_threshold'} && $HAVE_ZLIB && $self->{'compress_enable'} &&
      0        
      0        
      0        
521             $len >= $self->{'compress_threshold'} && !$app_or_prep) {
522              
523 0         0 my $c_val = Compress::Zlib::memGzip($val);
524 0         0 my $c_len = length($c_val);
525              
526             # do we want to keep it?
527 0 0       0 if ($c_len < $len*(1 - COMPRESS_SAVINGS)) {
528 0         0 $val = $c_val;
529 0         0 $len = $c_len;
530 0         0 $flags |= F_COMPRESS;
531             }
532             }
533              
534 0   0     0 $exptime = int($exptime || 0);
535              
536 0 0       0 local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;
537 0         0 my $line = "$cmdname $self->{namespace}$key $flags $exptime $len\r\n$val\r\n";
538              
539 0         0 my $res = _write_and_read($self, $sock, $line);
540              
541 0 0 0     0 if ($self->{'debug'} && $line) {
542 0         0 chop $line; chop $line;
  0         0  
543 0         0 print STDERR "Cache::Memcache: $cmdname $self->{namespace}$key = $val ($line)\n";
544             }
545              
546 0 0       0 if ($self->{'stat_callback'}) {
547 0         0 my $etime = Time::HiRes::time();
548 0         0 $self->{'stat_callback'}->($stime, $etime, $sock, $cmdname);
549             }
550              
551 0   0     0 return defined $res && $res eq "STORED\r\n";
552             }
553              
554             sub incr {
555 0     0 1 0 _incrdecr("incr", @_);
556             }
557              
558             sub decr {
559 0     0 1 0 _incrdecr("decr", @_);
560             }
561              
562             sub _incrdecr {
563 0     0   0 my $cmdname = shift;
564 0         0 my Cache::Memcached $self = shift;
565 0         0 my ($key, $value) = @_;
566 0 0 0     0 return undef if ! $self->{'active'} || $self->{'readonly'};
567 0 0       0 my $stime = Time::HiRes::time() if $self->{'stat_callback'};
568 0         0 my $sock = $self->get_sock($key);
569 0 0       0 return undef unless $sock;
570 0 0       0 $key = $key->[1] if ref $key;
571 0         0 $self->{'stats'}->{$cmdname}++;
572 0 0       0 $value = 1 unless defined $value;
573              
574 0         0 my $line = "$cmdname $self->{namespace}$key $value\r\n";
575 0         0 my $res = _write_and_read($self, $sock, $line);
576              
577 0 0       0 if ($self->{'stat_callback'}) {
578 0         0 my $etime = Time::HiRes::time();
579 0         0 $self->{'stat_callback'}->($stime, $etime, $sock, $cmdname);
580             }
581              
582 0 0 0     0 return undef unless defined $res && $res =~ /^(\d+)/;
583 0         0 return $1;
584             }
585              
586             sub get {
587 0     0 1 0 my Cache::Memcached $self = $_[0];
588 0         0 my $key = $_[1];
589              
590             # TODO: make a fast path for this? or just keep using get_multi?
591 0         0 my $r = $self->get_multi($key);
592 0 0       0 my $kval = ref $key ? $key->[1] : $key;
593              
594             # key reconstituted from server won't have utf8 on, so turn it off on input
595             # scalar to allow hash lookup to succeed
596 0 0       0 Encode::_utf8_off($kval) if Encode::is_utf8($kval);
597              
598 0         0 return $r->{$kval};
599             }
600              
601             sub get_multi {
602 0     0 1 0 my Cache::Memcached $self = shift;
603 0 0       0 return {} unless $self->{'active'};
604 0 0       0 $self->{'_stime'} = Time::HiRes::time() if $self->{'stat_callback'};
605 0         0 $self->{'stats'}->{"get_multi"}++;
606              
607 0         0 my %val; # what we'll be returning a reference to (realkey -> value)
608             my %sock_keys; # sockref_as_scalar -> [ realkeys ]
609 0         0 my $sock;
610              
611 0 0       0 if ($self->{'_single_sock'}) {
612 0         0 $sock = $self->sock_to_host($self->{'_single_sock'});
613 0 0       0 unless ($sock) {
614 0         0 return {};
615             }
616 0         0 foreach my $key (@_) {
617 0 0       0 my $kval = ref $key ? $key->[1] : $key;
618 0         0 push @{$sock_keys{$sock}}, $kval;
  0         0  
619             }
620             } else {
621 0         0 my $bcount = $self->{'bucketcount'};
622 0         0 my $sock;
623              
624 0 0       0 if ($self->{'buck2sock_generation'} != $socket_cache_generation) {
625 0         0 $self->{'buck2sock_generation'} = $socket_cache_generation;
626 0         0 $self->{'buck2sock'} = [];
627             }
628              
629             KEY:
630 0         0 foreach my $key (@_) {
631 0 0       0 my ($hv, $real_key) = ref $key ?
632             (int($key->[0]), $key->[1]) :
633             ((crc32($key) >> 16) & 0x7fff, $key);
634              
635 0         0 my $tries;
636 0         0 while (1) {
637 0         0 my $bucket = $hv % $bcount;
638              
639             # this segfaults perl 5.8.4 (and others?) if sock_to_host returns undef... wtf?
640             #$sock = $buck2sock[$bucket] ||= $self->sock_to_host($self->{buckets}[ $bucket ])
641             # and last;
642              
643             # but this variant doesn't crash:
644 0   0     0 $sock = $self->{'buck2sock'}->[$bucket] || $self->sock_to_host($self->{buckets}[ $bucket ]);
645 0 0       0 if ($sock) {
646 0         0 $self->{'buck2sock'}->[$bucket] = $sock;
647 0         0 last;
648             }
649              
650 0 0       0 next KEY if $tries++ >= 20;
651 0         0 $hv += _hashfunc($tries . $real_key);
652             }
653              
654 0         0 push @{$sock_keys{$sock}}, $real_key;
  0         0  
655             }
656             }
657              
658 0         0 $self->{'stats'}->{"get_keys"} += @_;
659 0         0 $self->{'stats'}->{"get_socks"} += keys %sock_keys;
660              
661 0 0       0 local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;
662              
663 0         0 _load_multi($self, \%sock_keys, \%val);
664              
665 0 0       0 if ($self->{'debug'}) {
666 0         0 while (my ($k, $v) = each %val) {
667 0         0 print STDERR "MemCache: got $k = $v\n";
668             }
669             }
670 0         0 return \%val;
671             }
672              
673             sub _load_multi {
674 9     9   15524 use bytes; # return bytes from length()
  9         18  
  9         43  
675 0     0   0 my Cache::Memcached $self;
676 0         0 my ($sock_keys, $ret);
677              
678 0         0 ($self, $sock_keys, $ret) = @_;
679              
680             # all keyed by $sockstr:
681 0         0 my %reading; # $sockstr -> $sock. bool, whether we're reading from this socket
682             my %writing; # $sockstr -> $sock. bool, whether we're writing to this socket
683 0         0 my %buf; # buffers, for writing
684              
685 0         0 my %parser; # $sockstr -> Cache::Memcached::GetParser
686              
687 0         0 my $active_changed = 1; # force rebuilding of select sets
688              
689             my $dead = sub {
690 0     0   0 my $sock = shift;
691 0 0       0 print STDERR "killing socket $sock\n" if $self->{'debug'} >= 2;
692 0         0 delete $reading{$sock};
693 0         0 delete $writing{$sock};
694              
695 0 0       0 if (my $p = $parser{$sock}) {
696 0         0 my $key = $p->current_key;
697 0 0       0 delete $ret->{$key} if $key;
698             }
699              
700 0 0       0 if ($self->{'stat_callback'}) {
701 0         0 my $etime = Time::HiRes::time();
702 0         0 $self->{'stat_callback'}->($self->{'_stime'}, $etime, $sock, 'get_multi');
703             }
704              
705 0         0 close $sock;
706 0         0 $self->_dead_sock($sock);
707 0         0 };
708              
709             # $finalize->($key, $flags)
710             # $finalize->({ $key => $flags, $key => $flags });
711             my $finalize = sub {
712 0     0   0 my $map = $_[0];
713 0 0       0 $map = {@_} unless ref $map;
714              
715 0         0 while (my ($k, $flags) = each %$map) {
716              
717             # remove trailing \r\n
718 0         0 chop $ret->{$k}; chop $ret->{$k};
  0         0  
719              
720 0 0 0     0 $ret->{$k} = Compress::Zlib::memGunzip($ret->{$k})
721             if $HAVE_ZLIB && $flags & F_COMPRESS;
722 0 0       0 if ($flags & F_STORABLE) {
723             # wrapped in eval in case a perl 5.6 Storable tries to
724             # unthaw data from a perl 5.8 Storable. (5.6 is stupid
725             # and dies if the version number changes at all. in 5.8
726             # they made it only die if it unencounters a new feature)
727 0         0 eval {
728 0         0 $ret->{$k} = Storable::thaw($ret->{$k});
729             };
730             # so if there was a problem, just treat it as a cache miss.
731 0 0       0 if ($@) {
732 0         0 delete $ret->{$k};
733             }
734             }
735             }
736 0         0 };
737              
738 0         0 foreach (keys %$sock_keys) {
739 0 0       0 my $ipport = $sock_map{$_} or die "No map found matching for $_";
740 0 0       0 my $sock = $cache_sock{$ipport} or die "No sock found for $ipport";
741 0 0       0 print STDERR "processing socket $_\n" if $self->{'debug'} >= 2;
742 0         0 $writing{$_} = $sock;
743 0 0       0 if ($self->{namespace}) {
744 0         0 $buf{$_} = join(" ", 'get', (map { "$self->{namespace}$_" } @{$sock_keys->{$_}}), "\r\n");
  0         0  
  0         0  
745             } else {
746 0         0 $buf{$_} = join(" ", 'get', @{$sock_keys->{$_}}, "\r\n");
  0         0  
747             }
748              
749 0         0 $parser{$_} = $self->{parser_class}->new($ret, $self->{namespace_len}, $finalize);
750             }
751              
752             my $read = sub {
753 0     0   0 my $sockstr = "$_[0]"; # $sock is $_[0];
754 0 0       0 my $p = $parser{$sockstr} or die;
755 0         0 my $rv = $p->parse_from_sock($_[0]);
756 0 0       0 if ($rv > 0) {
    0          
757             # okay, finished with this socket
758 0         0 delete $reading{$sockstr};
759             } elsif ($rv < 0) {
760 0         0 $dead->($_[0]);
761             }
762 0         0 return $rv;
763 0         0 };
764              
765             # returns 1 when it's done, for success or error. 0 if still working.
766             my $write = sub {
767 0     0   0 my ($sock, $sockstr) = ($_[0], "$_[0]");
768 0         0 my $res;
769              
770 0         0 $res = send($sock, $buf{$sockstr}, $FLAG_NOSIGNAL);
771              
772 0 0 0     0 return 0
773             if not defined $res and $!==EWOULDBLOCK;
774 0 0       0 unless ($res > 0) {
775 0         0 $dead->($sock);
776 0         0 return 1;
777             }
778 0 0       0 if ($res == length($buf{$sockstr})) { # all sent
779 0         0 $buf{$sockstr} = "";
780              
781             # switch the socket from writing to reading
782 0         0 delete $writing{$sockstr};
783 0         0 $reading{$sockstr} = $sock;
784 0         0 return 1;
785             } else { # we only succeeded in sending some of it
786 0         0 substr($buf{$sockstr}, 0, $res, ''); # delete the part we sent
787             }
788 0         0 return 0;
789 0         0 };
790              
791             # the bitsets for select
792 0         0 my ($rin, $rout, $win, $wout);
793 0         0 my $nfound;
794              
795             # the big select loop
796 0         0 while(1) {
797 0 0       0 if ($active_changed) {
798 0 0 0     0 last unless %reading or %writing; # no sockets left?
799 0         0 ($rin, $win) = ('', '');
800 0         0 foreach (values %reading) {
801 0         0 vec($rin, fileno($_), 1) = 1;
802             }
803 0         0 foreach (values %writing) {
804 0         0 vec($win, fileno($_), 1) = 1;
805             }
806 0         0 $active_changed = 0;
807             }
808             # TODO: more intelligent cumulative timeout?
809             # TODO: select is interruptible w/ ptrace attach, signal, etc. should note that.
810 0         0 $nfound = select($rout=$rin, $wout=$win, undef,
811             $self->{'select_timeout'});
812 0 0       0 last unless $nfound;
813              
814             # TODO: possible robustness improvement: we could select
815             # writing sockets for reading also, and raise hell if they're
816             # ready (input unread from last time, etc.)
817             # maybe do that on the first loop only?
818 0         0 foreach (values %writing) {
819 0 0       0 if (vec($wout, fileno($_), 1)) {
820 0 0       0 $active_changed = 1 if $write->($_);
821             }
822             }
823 0         0 foreach (values %reading) {
824 0 0       0 if (vec($rout, fileno($_), 1)) {
825 0 0       0 $active_changed = 1 if $read->($_);
826             }
827             }
828             }
829              
830             # if there're active sockets left, they need to die
831 0         0 foreach (values %writing) {
832 0         0 $dead->($_);
833             }
834 0         0 foreach (values %reading) {
835 0         0 $dead->($_);
836             }
837              
838 0         0 return;
839             }
840              
841             sub _hashfunc {
842 0     0   0 return (crc32($_[0]) >> 16) & 0x7fff;
843             }
844              
845             sub flush_all {
846 7     7 1 4117 my Cache::Memcached $self = shift;
847              
848 7         12 my $success = 1;
849              
850 7         9 my @hosts = @{$self->{'buckets'}};
  7         20  
851 7         20 foreach my $host (@hosts) {
852 7         25 my $sock = $self->sock_to_host($host);
853 7         26 my @res = $self->run_command($sock, "flush_all\r\n");
854 7 100 50     79 $success = 0 unless (scalar @res == 1 && (($res[0] || "") eq "OK\r\n"));
      100        
855             }
856              
857 7         65 return $success;
858             }
859              
860             # returns array of lines, or () on failure.
861             sub run_command {
862 7     7 0 11 my Cache::Memcached $self = shift;
863 7         11 my ($sock, $cmd) = @_;
864 7 50       23 return () unless $sock;
865 7         7 my $ret;
866 7         10 my $line = $cmd;
867 7         16 while (my $res = _write_and_read($self, $sock, $line)) {
868 7         10 undef $line;
869 7         10 $ret .= $res;
870 7 50       66 last if $ret =~ /(?:OK|END|ERROR)\r\n$/;
871             }
872 7         12 chop $ret; chop $ret;
  7         8  
873 7         21 return map { "$_\r\n" } split(/\r\n/, $ret);
  13         41  
874             }
875              
876             sub stats {
877 0     0 1   my Cache::Memcached $self = shift;
878 0           my ($types) = @_;
879 0 0         return 0 unless $self->{'active'};
880 0 0 0       return 0 unless !ref($types) || ref($types) eq 'ARRAY';
881 0 0         if (!ref($types)) {
882 0 0         if (!$types) {
883             # I don't much care what the default is, it should just
884             # be something reasonable. Obviously "reset" should not
885             # be on the list :) but other types that might go in here
886             # include maps, cachedump, slabs, or items. Note that
887             # this does NOT include 'sizes' anymore, as that can freeze
888             # bug servers for a couple seconds.
889 0           $types = [ qw( misc malloc self ) ];
890             } else {
891 0           $types = [ $types ];
892             }
893             }
894              
895 0           my $stats_hr = { };
896              
897             # The "self" stat type is special, it only applies to this very
898             # object.
899 0 0         if (grep /^self$/, @$types) {
900 0           $stats_hr->{'self'} = \%{ $self->{'stats'} };
  0            
901             }
902              
903 0           my %misc_keys = map { $_ => 1 }
  0            
904             qw/ bytes bytes_read bytes_written
905             cmd_get cmd_set connection_structures curr_items
906             get_hits get_misses
907             total_connections total_items
908             /;
909              
910             # Now handle the other types, passing each type to each host server.
911 0           my @hosts = @{$self->{'buckets'}};
  0            
912 0           HOST: foreach my $host (@hosts) {
913 0           my $sock = $self->sock_to_host($host);
914 0 0         next HOST unless $sock;
915 0           TYPE: foreach my $typename (grep !/^self$/, @$types) {
916 0 0         my $type = $typename eq 'misc' ? "" : " $typename";
917             my $lines = _write_and_read($self, $sock, "stats$type\r\n", sub {
918 0     0     my $bref = shift;
919 0           return $$bref =~ /^(?:END|ERROR)\r?\n/m;
920 0           });
921 0 0         unless ($lines) {
922 0           $self->_dead_sock($sock);
923 0           next HOST;
924             }
925              
926 0           $lines =~ s/\0//g; # 'stats sizes' starts with NULL?
927              
928             # And, most lines end in \r\n but 'stats maps' (as of
929             # July 2003 at least) ends in \n. ??
930 0           my @lines = split(/\r?\n/, $lines);
931              
932             # Some stats are key-value, some are not. malloc,
933             # sizes, and the empty string are key-value.
934             # ("self" was handled separately above.)
935 0 0         if ($typename =~ /^(malloc|sizes|misc)$/) {
936             # This stat is key-value.
937 0           foreach my $line (@lines) {
938 0           my ($key, $value) = $line =~ /^(?:STAT )?(\w+)\s(.*)/;
939 0 0         if ($key) {
940 0           $stats_hr->{'hosts'}{$host}{$typename}{$key} = $value;
941             }
942 0 0 0       $stats_hr->{'total'}{$key} += $value
      0        
943             if $typename eq 'misc' && $key && $misc_keys{$key};
944 0 0 0       $stats_hr->{'total'}{"malloc_$key"} += $value
945             if $typename eq 'malloc' && $key;
946             }
947             } else {
948             # This stat is not key-value so just pull it
949             # all out in one blob.
950 0           $lines =~ s/^END\r?\n//m;
951 0   0       $stats_hr->{'hosts'}{$host}{$typename} ||= "";
952 0           $stats_hr->{'hosts'}{$host}{$typename} .= "$lines";
953             }
954             }
955             }
956              
957 0           return $stats_hr;
958             }
959              
960             sub stats_reset {
961 0     0 0   my Cache::Memcached $self = shift;
962 0           my ($types) = @_;
963 0 0         return 0 unless $self->{'active'};
964              
965 0           HOST: foreach my $host (@{$self->{'buckets'}}) {
  0            
966 0           my $sock = $self->sock_to_host($host);
967 0 0         next HOST unless $sock;
968 0           my $ok = _write_and_read($self, $sock, "stats reset");
969 0 0 0       unless (defined $ok && $ok eq "RESET\r\n") {
970 0           $self->_dead_sock($sock);
971             }
972             }
973 0           return 1;
974             }
975              
976             1;
977             __END__