File Coverage

blib/lib/Cache/Memcached/AnyEvent/Protocol/Text.pm
Criterion Covered Total %
statement 6 161 3.7
branch 0 46 0.0
condition 0 4 0.0
subroutine 2 21 9.5
pod 6 6 100.0
total 14 238 5.8


line stmt bran cond sub pod time code
1             package Cache::Memcached::AnyEvent::Protocol::Text;
2 1     1   1168 use strict;
  1         2  
  1         48  
3 1     1   8 use base 'Cache::Memcached::AnyEvent::Protocol';
  1         2  
  1         2714  
4              
5 0     0     sub _NOOP() {}
6              
7             {
8             my $generator = sub {
9             my $cmd = shift;
10             return sub {
11 0     0     my ($self, $memcached, $key, $value, $initial, $cb) = @_;
12              
13             return sub {
14 0     0     my $guard = shift;
15 0           my $fq_key = $memcached->_prepare_key( $key );
16 0           my $handle = $memcached->_get_handle_for( $key );
17            
18 0   0       $value ||= 1;
19 0           my @command = ($cmd => $fq_key => $value);
20 0           my $noreply = 0; # XXX - FIXME
21 0 0         if ($noreply) {
22 0           push @command, "noreply";
23             }
24 0           $handle->push_write(join(' ', @command) . "\r\n");
25              
26 0 0         if ($noreply) {
27 0           undef $guard;
28             } else {
29             $handle->push_read(line => sub {
30 0           undef $guard;
31 0           $_[1] =~ /^(NOT_FOUND|\w+)\r\n/;
32 0 0         $cb->($1 eq 'NOT_FOUND' ? undef : $1) if $cb;
    0          
33 0           });
34             }
35             }
36 0           }
37             };
38              
39             *decr = $generator->("decr");
40             *incr = $generator->("incr");
41             }
42              
43             sub delete {
44 0     0 1   my ($self, $memcached, $key, $noreply, $cb) = @_;
45             return sub {
46 0     0     my $guard = shift;
47 0           my $fq_key = $memcached->_prepare_key( $key );
48 0           my $handle = $memcached->_get_handle_for( $key );
49              
50 0           my @command = (delete => $fq_key);
51 0           $noreply = 0; # XXX - FIXME
52 0 0         if ($noreply) {
53 0           push @command, "noreply";
54             }
55 0           $handle->push_write(join(' ', @command) . "\r\n");
56 0 0         if (! $noreply) {
57             $handle->push_read(line => sub {
58 0           undef $guard;
59 0           my $data = $_[1];
60 0           my $success = $data =~ /^DELETED\r\n/;
61 0 0         $cb->($success) if $cb;
62 0           });
63             }
64 0           };
65             }
66              
67             sub get {
68 0     0 1   my ($self, $memcached, $key, $cb) = @_;
69              
70             return sub {
71 0     0     my $guard = shift;
72 0           my $fq_key = $memcached->_prepare_key( $key );
73 0           my $handle = $memcached->_get_handle_for( $key );
74              
75 0           $handle->push_write( "get $fq_key\r\n" );
76             $handle->push_read( line => sub {
77 0           my @bits = split /\s+/, $_[1];
78 0 0         if ($bits[0] eq 'VALUE') {
    0          
79 0           my ($rkey, $rflags, $rsize, $rcas) = @bits[1..4];
80             $_[0]->push_read(chunk => $rsize, sub {
81 0           my $value = $_[1];
82 0           $memcached->_decode_key_value(\$rkey, \$rflags, \$value);
83             $handle->push_read(regex => qr{END\r\n}, cb => sub {
84 0           $cb->( $value );
85 0           undef $guard;
86 0           } );
87 0           });
88             } elsif ($bits[0] eq 'END') {
89 0           $cb->( undef );
90 0           undef $guard;
91             } else {
92 0           Carp::confess("Unexpected line $_[1]");
93             }
94 0           });
95 0           };
96             }
97              
98             sub get_multi {
99 0     0 1   my ($self, $memcached, $keys, $cb) = @_;
100 0 0         if (scalar @$keys == 0) {
101             return sub {
102 0     0     my $guard = shift;
103 0           undef $guard;
104 0           $cb->({}, "no keys speficied");
105             }
106 0           }
107              
108             return sub {
109 0     0     my $guard = shift;
110              
111 0           my %keysinserver;
112 0           foreach my $key (@$keys) {
113 0           my $fq_key = $memcached->_prepare_key( $key );
114 0           my $handle = $memcached->_get_handle_for( $key );
115 0           my $list = $keysinserver{ $handle };
116 0 0         if (! $list) {
117 0           $keysinserver{ $handle } = $list = [ $handle ];
118             }
119 0           push @$list, $fq_key;
120             }
121              
122 0           my %rv;
123             my $cv = AE::cv {
124 0           undef $guard;
125 0           $cb->( \%rv );
126 0           };
127              
128 0           foreach my $data (values %keysinserver) {
129 0           my ($handle, @keylist) = @$data;
130 0           $handle->push_write( "get @keylist\r\n" );
131 0           my $code; $code = sub {
132 0           my @bits = split /\s+/, $_[1];
133 0 0         if ($bits[0] eq 'END') {
    0          
134 0           undef $code;
135 0           $cv->end
136             } elsif ($bits[0] eq 'VALUE') {
137 0           my ($rkey, $rflags, $rsize, $rcas) = @bits[1..4];
138             $handle->push_read(chunk => $rsize, sub {
139 0           my $value = $_[1];
140 0           $memcached->_decode_key_value(\$rkey, \$rflags, \$value);
141 0           $rv{ $rkey } = $value; # XXX whatabout CAS?
142 0           $handle->push_read(line => \&_NOOP );
143 0           $handle->push_read(line => $code);
144 0           } );
145             } else {
146 0           Carp::confess("Unexpected line $_[1]");
147             }
148 0           };
149 0           $cv->begin;
150 0           $handle->push_read(line => $code);
151             }
152             }
153 0           }
154              
155             {
156             my $generator = sub {
157             my $cmd = shift;
158             sub {
159 0     0     my ($self, $memcached, $key, $value, $expires, $noreply, $cb) = @_;
        0      
160             return sub {
161 0     0     my $guard = shift;
162 0           my $fq_key = $memcached->_prepare_key( $key );
163 0           my $handle = $memcached->_get_handle_for( $key );
164 0           my ($len, $flags);
165              
166 0           $memcached->_prepare_value( $cmd, \$value, \$len, \$expires, \$flags );
167 0           $handle->push_write("$cmd $fq_key $flags $expires $len\r\n$value\r\n");
168 0 0         if ($noreply) {
169 0           undef $guard;
170             } else {
171             $handle->push_read(regex => qr{^(NOT_)?STORED\r\n}, sub {
172 0           undef $guard;
173 0 0         $cb->($1 ? 0 : 1) if $cb;
    0          
174 0           });
175             }
176 0           };
177             };
178             };
179              
180             *add = $generator->("add");
181             *replace = $generator->("replace");
182             *set = $generator->("set");
183             *append = $generator->("append");
184             *prepend = $generator->("prepend");
185             }
186              
187             sub stats {
188 0     0 1   my ($self, $memcached, $name, $cb) = @_;
189              
190             return sub {
191 0     0     my $guard = shift;
192 0           my %rv;
193             my $cv = AE::cv {
194 0           undef $guard;
195 0           $cb->( \%rv );
196 0           };
197              
198 0           foreach my $server (@{ $memcached->{_active_servers} }) {
  0            
199 0           my $handle = $memcached->get_handle( $server );
200 0 0         $handle->push_write( $name ? "stats $name\r\n" : "stats\r\n" );
201 0           my $code; $code = sub {
202 0           my @bits = split /\s+/, $_[1];
203 0 0         if ($bits[0] eq 'END') {
    0          
204 0           $cv->end;
205             } elsif ( $bits[0] eq 'STAT' ) {
206 0           $rv{ $server }->{ $bits[1] } = $bits[2];
207 0           $handle->push_read( line => $code );
208             } else {
209 0           Carp::confess("Unexpected line $_[1]");
210             }
211 0           };
212 0           $cv->begin;
213 0           $handle->push_read( line => $code );
214             }
215 0           };
216             }
217              
218             sub flush_all {
219 0     0 1   my ($self, $memcached, $delay, $noreply, $cb) = @_;
220              
221             return sub {
222 0     0     my $guard = shift;
223             my $cv = AE::cv {
224 0           undef $guard;
225 0 0         $cb->(1) if $cb;
226 0           };
227              
228 0   0       $delay ||= 0;
229 0           my @command = ('flush_all');
230 0 0         push @command, $delay if ($delay);
231 0 0         push @command, 'noreply' if ($noreply);
232 0           my $command = join(' ', @command) . "\r\n";
233              
234 0           $cv->begin;
235 0           foreach my $server (@{ $memcached->{_active_servers} }) {
  0            
236 0           my $handle = $memcached->get_handle( $server );
237 0           $handle->push_write( $command );
238 0 0         if (! $noreply) {
239 0           $cv->begin;
240 0           $handle->push_read(regex => qr{^OK\r\n}, sub { $cv->end });
  0            
241             }
242             }
243 0           $cv->end;
244 0           };
245             }
246              
247             sub version {
248 0     0 1   my ($self, $memcached, $cb) = @_;
249              
250             return sub {
251 0     0     my $guard = shift;
252             # don't store guard, as we're issuing a new guarded command
253             $memcached->stats( "", sub {
254 0           my $rv = shift;
255 0           my %version = map {
256 0           ($_ => $rv->{$_}->{version})
257             } keys %$rv;
258 0           $cb->(\%version);
259 0           } );
260             }
261 0           }
262              
263             1;
264              
265             __END__