File Coverage

blib/lib/Cache/Memcached/Async.pm
Criterion Covered Total %
statement 19 21 90.4
branch n/a
condition n/a
subroutine 7 7 100.0
pod n/a
total 26 28 92.8


line stmt bran cond sub pod time code
1             package Cache::Memcached::Async;
2              
3             =head1 NAME
4              
5             Cache::Memcached::Async - Asynchronous version of Cache::Memcached
6              
7             =head1 SYNOPSIS
8              
9             # just like Cache::Memcached
10             use Cache::Memcached::Async;
11             my $mc = Cache::Memcached::Async->new(servers => [ '127.0.0.1:11211' ]);
12              
13             =head1 DESCRIPTION
14              
15             This is a stripped-down version of Cache::Memcached that performs gets and sets
16             asynchronously, notifying the caller of completion via the Danga::Socket loop.
17              
18             It's used almost exactly like Cache::Memcached, except that C and C
19             are allowed a C parameter.
20              
21             Multi-gets are not supported.
22              
23             =cut
24              
25 1     1   307308 use 5.008001;
  1         5  
  1         40  
26              
27 1     1   6 use strict;
  1         2  
  1         36  
28 1     1   6 use warnings;
  1         15  
  1         37  
29              
30 1     1   5 no strict 'refs';
  1         2  
  1         43  
31 1     1   1115 use Storable ();
  1         4033  
  1         26  
32 1     1   10 use Time::HiRes ();
  1         2  
  1         17  
33 1     1   449 use String::CRC32;
  0            
  0            
34              
35             use Cache::Memcached::Async::Socket;
36              
37             use fields qw{
38             debug no_rehash stats compress_threshold compress_enable
39             readonly namespace namespace_len servers active buckets
40             pref_ip
41             bucketcount _single_sock _stime
42             connect_timeout cb_connect_fail
43             parser_class
44             };
45              
46             # flag definitions
47             use constant F_STORABLE => 1;
48             use constant F_COMPRESS => 2;
49              
50             # size savings required before saving compressed value
51             use constant COMPRESS_SAVINGS => 0.20; # percent
52              
53             use vars qw($VERSION $HAVE_ZLIB $FLAG_NOSIGNAL);
54             $VERSION = "0.10_01";
55              
56             BEGIN {
57             $HAVE_ZLIB = eval "use Compress::Zlib (); 1;";
58             }
59              
60             my %host_dead; # host -> unixtime marked dead until
61             my %cache_sock; # host -> socket
62             my @buck2sock; # bucket number -> $sock
63              
64             =head1 METHODS
65              
66             For all the below methods, C and C are in seconds, and
67             C will be fired upon response from the server. C may be
68             undef.
69              
70             C, C, C, C, and C all inherit semantics from Cache::Memcached.
71              
72             Unlike Cache::Memcached, C and C do not return the new value of the key.
73              
74             =over 4
75              
76             =item Cache::Memcached::Async-Enew()
77              
78             =cut
79              
80             sub new {
81             my Cache::Memcached::Async $self = shift;
82             $self = fields::new( $self ) unless ref $self;
83              
84             my $args = (@_ == 1) ? shift : { @_ }; # hashref-ify args
85              
86             $self->set_servers($args->{'servers'});
87             $self->{'debug'} = $args->{'debug'} || 0;
88             $self->{'no_rehash'} = $args->{'no_rehash'};
89             $self->{'stats'} = {};
90             $self->{'pref_ip'} = $args->{'pref_ip'} || {};
91             $self->{'compress_threshold'} = $args->{'compress_threshold'};
92             $self->{'compress_enable'} = 1;
93             $self->{'readonly'} = $args->{'readonly'};
94              
95             # TODO: undocumented
96             $self->{'connect_timeout'} = $args->{'connect_timeout'} || 0.25;
97             $self->{namespace} = $args->{namespace} || '';
98             $self->{namespace_len} = length $self->{namespace};
99              
100             return $self;
101             }
102              
103             sub set_pref_ip {
104             my Cache::Memcached::Async $self = shift;
105             $self->{'pref_ip'} = shift;
106             }
107              
108             sub set_servers {
109             my Cache::Memcached::Async $self = shift;
110             my ($list) = @_;
111             $self->{'servers'} = $list || [];
112             $self->{'active'} = scalar @{$self->{'servers'}};
113             $self->{'buckets'} = undef;
114             $self->{'bucketcount'} = 0;
115             $self->init_buckets;
116             @buck2sock = ();
117              
118             $self->{'_single_sock'} = undef;
119             if (@{$self->{'servers'}} == 1) {
120             $self->{'_single_sock'} = $self->{'servers'}[0];
121             }
122              
123             return $self;
124             }
125              
126             sub set_cb_connect_fail {
127             my Cache::Memcached::Async $self = shift;
128             $self->{'cb_connect_fail'} = shift;
129             }
130              
131             sub set_connect_timeout {
132             my Cache::Memcached::Async $self = shift;
133             $self->{'connect_timeout'} = shift;
134             }
135              
136             sub set_debug {
137             my Cache::Memcached::Async $self = shift;
138             my ($dbg) = @_;
139             $self->{'debug'} = $dbg || 0;
140             }
141              
142             sub set_readonly {
143             my Cache::Memcached::Async $self = shift;
144             my ($ro) = @_;
145             $self->{'readonly'} = $ro;
146             }
147              
148             sub set_norehash {
149             my Cache::Memcached::Async $self = shift;
150             my ($val) = @_;
151             $self->{'no_rehash'} = $val;
152             }
153              
154             sub set_compress_threshold {
155             my Cache::Memcached::Async $self = shift;
156             my ($thresh) = @_;
157             $self->{'compress_threshold'} = $thresh;
158             }
159              
160             sub enable_compress {
161             my Cache::Memcached::Async $self = shift;
162             my ($enable) = @_;
163             $self->{'compress_enable'} = $enable;
164             }
165              
166             sub forget_dead_hosts {
167             %host_dead = ();
168             @buck2sock = ();
169             }
170              
171             my %sock_map; # stringified-$sock -> "$ip:$port"
172              
173             sub _dead_sock {
174             my ($sock, $ret, $dead_for) = @_;
175             if (my $ipport = $sock_map{$sock}) {
176             my $now = time();
177             $host_dead{$ipport} = $now + $dead_for
178             if $dead_for;
179             delete $cache_sock{$ipport};
180             delete $sock_map{$sock};
181             }
182             @buck2sock = ();
183             return $ret; # 0 or undef, probably, depending on what caller wants
184             }
185              
186             sub _close_sock {
187             my ($sock) = @_;
188             if (my $ipport = $sock_map{$sock}) {
189             close $sock;
190             delete $cache_sock{$ipport};
191             delete $sock_map{$sock};
192             }
193             @buck2sock = ();
194             }
195              
196             sub sock_to_host { # (host)
197             my Cache::Memcached::Async $self = shift;
198             my $host = shift;
199             return $cache_sock{$host} if $cache_sock{$host} && !$cache_sock{$host}{closed};
200              
201             my $sock = Cache::Memcached::Async::Socket->new($host);
202              
203             $cache_sock{$host} = $sock;
204             $sock_map{$sock} = $host;
205              
206             return $sock;
207             }
208              
209             sub get_sock { # (key)
210             my Cache::Memcached::Async $self = $_[0];
211             my $key = $_[1];
212             return $self->sock_to_host($self->{'_single_sock'}) if $self->{'_single_sock'};
213             return undef unless $self->{'active'};
214             my $hv = ref $key ? int($key->[0]) : _hashfunc($key);
215              
216             my $real_key = ref $key ? $key->[1] : $key;
217             my $tries = 0;
218             while ($tries++ < 20) {
219             my $host = $self->{'buckets'}->[$hv % $self->{'bucketcount'}];
220             my $sock = $self->sock_to_host($host);
221             return $sock if $sock;
222             return undef if $self->{'no_rehash'};
223             $hv += _hashfunc($tries . $real_key); # stupid, but works
224             }
225             return undef;
226             }
227              
228             sub init_buckets {
229             my Cache::Memcached::Async $self = shift;
230             return if $self->{'buckets'};
231             my $bu = $self->{'buckets'} = [];
232             foreach my $v (@{$self->{'servers'}}) {
233             if (ref $v eq "ARRAY") {
234             for (1..$v->[1]) { push @$bu, $v->[0]; }
235             } else {
236             push @$bu, $v;
237             }
238             }
239             $self->{'bucketcount'} = scalar @{$self->{'buckets'}};
240             }
241              
242             sub disconnect_all {
243             my $sock;
244             foreach $sock (values %cache_sock) {
245             close $sock;
246             }
247             %cache_sock = ();
248             }
249              
250             =item $mc->delete($key, timeout => $timeout_seconds, callback => \&callback);
251              
252             =cut
253              
254             sub delete {
255             my Cache::Memcached::Async $self = shift;
256             my ($key, %opts) = @_;
257              
258             my $sock = $self->get_sock($key);
259             return 0 unless $sock;
260              
261             $key = ref $key ? $key->[1] : $key;
262             my $time = $opts{time};
263             $time = $time ? " $time" : "";
264             my $cmd = "delete $self->{namespace}$key$time\r\n";
265             $sock->run($cmd, $opts{callback}, undef, $opts{timeout});
266              
267             return 1;
268             }
269             *remove = \&delete;
270              
271             =item $mc->add($key, $value, exptime => $expiration, timeout => $timeout, callback => \&callback);
272              
273             =cut
274              
275             sub add {
276             _set("add", @_);
277             }
278              
279             =item $mc->replace($key, $value, exptime => $expiration_seconds, timeout => $timeout_seconds, callback => \&callback);
280              
281             =cut
282              
283             sub replace {
284             _set("replace", @_);
285             }
286              
287             =item $mc->set($key, $value, exptime => $expiration_seconds, timeout => $timeout_seconds, callback => \&callback);
288              
289             =cut
290              
291             sub set {
292             _set("set", @_);
293             }
294              
295             sub _set {
296             my $cmdname = shift;
297             my Cache::Memcached::Async $self = shift;
298             my ($key, $val, %opts) = @_;
299             my $sock = $self->get_sock($key);
300             return 0 unless $sock;
301              
302             use bytes; # return bytes from length()
303              
304             my $flags = 0;
305             $key = ref $key ? $key->[1] : $key;
306              
307             if (ref $val) {
308             local $Carp::CarpLevel = 2;
309             $val = Storable::nfreeze($val);
310             $flags |= F_STORABLE;
311             }
312              
313             my $len = length($val);
314              
315             if ($self->{'compress_threshold'} && $HAVE_ZLIB && $self->{'compress_enable'} &&
316             $len >= $self->{'compress_threshold'}) {
317              
318             my $c_val = Compress::Zlib::memGzip($val);
319             my $c_len = length($c_val);
320              
321             # do we want to keep it?
322             if ($c_len < $len*(1 - COMPRESS_SAVINGS)) {
323             $val = $c_val;
324             $len = $c_len;
325             $flags |= F_COMPRESS;
326             }
327             }
328              
329             my $exptime = $opts{exptime};
330             $exptime = int($exptime || 0);
331              
332             my $line = "$cmdname $self->{namespace}$key $flags $exptime $len\r\n$val\r\n";
333              
334             my $res = $sock->run($line, $opts{callback}, undef, $opts{timeout});
335              
336             return 1;
337             }
338              
339             =item $mc->incr($key, $step, timeout => $timeout_seconds, callback => \&callback);
340              
341             =cut
342              
343             sub incr {
344             _incrdecr("incr", @_);
345             }
346              
347             =item $mc->decr($key, $step, timeout => $timeout_seconds, callback => \&callback);
348              
349             =cut
350              
351             sub decr {
352             _incrdecr("decr", @_);
353             }
354              
355             sub _incrdecr {
356             my $cmdname = shift;
357             my Cache::Memcached::Async $self = shift;
358             my ($key, $value, %opts) = @_;
359             my $sock = $self->get_sock($key);
360             return undef unless $sock;
361             $key = $key->[1] if ref $key;
362             $value = 1 unless defined $value;
363              
364             my $line = "$cmdname $self->{namespace}$key $value\r\n";
365             $sock->run($line, $opts{callback}, undef, $opts{timeout});
366              
367             return 1;
368             }
369              
370             =item $mc->get($key, timeout => $timeout_seconds, callback => \&callback);
371              
372             For C, C is passed the cached value on hit, or undef on miss.
373              
374             =back
375              
376             =cut
377              
378             sub get {
379             my Cache::Memcached::Async $self = shift;
380             my ($get_key, %opts) = @_;
381             my $sock = $self->get_sock($get_key);
382              
383             my ($key, $flags, $length);
384              
385             my $parser = sub {
386             my ($bufref, $callback) = @_;
387              
388             while (1) {
389             my $called_back = 0;
390             if (defined $length) {
391             # Yes, that's right, we have to read an extra two bytes because memcached is acting like a line server.
392             return unless length($$bufref) >= $length + 2;
393              
394             my $value = substr $$bufref, 0, $length, '';
395             my $crlf = substr $$bufref, 0, 2, '';
396             unless ($crlf eq "\r\n") {
397             $crlf =~ s/(\W)/quotemeta $1/ge;
398             die "$self I expected a CR LF pair here, instead I got crlf=$crlf\n";
399             }
400              
401             undef $length;
402             if ($callback) {
403             $callback->($value);
404             $called_back = 1;
405             }
406             }
407              
408             if ($$bufref =~ s/^VALUE (\S+) (\d+) (\d+)\r\n//) {
409             # State: 'VALUE' line received, loop back and try to read the data block
410             $key = $1;
411             $flags = $2;
412             $length = $3;
413             next;
414             }
415              
416             if ($$bufref =~ s/^END\r\n//) {
417             # State: 'END\r\n' recieved, we can return and say we're done.
418             $callback->() if $callback && !$called_back;
419             return 1;
420             }
421              
422             # State: still waiting for END or another VALUE
423             return;
424             }
425             };
426              
427             my $line = "get $self->{namespace}$get_key\r\n";
428              
429             $sock->run($line, $opts{callback}, $parser, $opts{timeout});
430              
431             return 1;
432             }
433              
434             sub _hashfunc {
435             return (crc32($_[0]) >> 16) & 0x7fff;
436             }
437              
438             1;
439              
440             __END__