File Coverage

blib/lib/Cache/Memcached/AnyEvent/Protocol/Binary.pm
Criterion Covered Total %
statement 27 147 18.3
branch 1 14 7.1
condition 0 10 0.0
subroutine 9 25 36.0
pod 6 6 100.0
total 43 202 21.2


line stmt bran cond sub pod time code
1             package Cache::Memcached::AnyEvent::Protocol::Binary;
2 1     1   1730 use strict;
  1         3  
  1         152  
3 1     1   8 use base 'Cache::Memcached::AnyEvent::Protocol';
  1         2  
  1         899  
4 1     1   8 use bytes;
  1         2  
  1         8  
5 1     1   27 use constant HEADER_SIZE => 24;
  1         4  
  1         72  
6 1         2 use constant HAS_64BIT => do {
7 1     1   6 no strict;
  1         3  
  1         64  
8 1         6 require Config;
9 1 50       193 $Config{use64bitint} || $Config{use64bitall};
10 1     1   6 };
  1         3  
11              
12             # General format of a packet:
13             #
14             # Byte/ 0 | 1 | 2 | 3 |
15             # / | | | |
16             # |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
17             # +---------------+---------------+---------------+---------------+
18             # 0/ HEADER /
19             # / /
20             # / /
21             # / /
22             # +---------------+---------------+---------------+---------------+
23             # 16/ COMMAND-SPECIFIC EXTRAS (as needed) /
24             # +/ (note length in th extras length header field) /
25             # +---------------+---------------+---------------+---------------+
26             # m/ Key (as needed) /
27             # +/ (note length in key length header field) /
28             # +---------------+---------------+---------------+---------------+
29             # n/ Value (as needed) /
30             # +/ (note length is total body length header field, minus /
31             # +/ sum of the extras and key length body fields) /
32             # +---------------+---------------+---------------+---------------+
33             # Total 16 bytes
34             #
35             # Request header:
36             #
37             # Byte/ 0 | 1 | 2 | 3 |
38             # / | | | |
39             # |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
40             # +---------------+---------------+---------------+---------------+
41             # 0| Magic | Opcode | Key length |
42             # +---------------+---------------+---------------+---------------+
43             # 4| Extras length | Data type | Reserved |
44             # +---------------+---------------+---------------+---------------+
45             # 8| Total body length |
46             # +---------------+---------------+---------------+---------------+
47             # 12| Opaque |
48             # +---------------+---------------+---------------+---------------+
49             # 16| CAS |
50             # | |
51             # +---------------+---------------+---------------+---------------+
52             # Total 24 bytes
53             #
54             # Response header:
55             #
56             # Byte/ 0 | 1 | 2 | 3 |
57             # / | | | |
58             # |0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|0 1 2 3 4 5 6 7|
59             # +---------------+---------------+---------------+---------------+
60             # 0| Magic | Opcode | Status |
61             # +---------------+---------------+---------------+---------------+
62             # 4| Extras length | Data type | Reserved |
63             # +---------------+---------------+---------------+---------------+
64             # 8| Total body length |
65             # +---------------+---------------+---------------+---------------+
66             # 12| Opaque |
67             # +---------------+---------------+---------------+---------------+
68             # 16| CAS |
69             # | |
70             # +---------------+---------------+---------------+---------------+
71             # Total 24 bytes
72             #
73             # Header fields:
74             # Magic Magic number.
75             # Opcode Command code.
76             # Key length Length in bytes of the text key that follows the
77             # command extras.
78             # Status Status of the response (non-zero on error).
79             # Extras length Length in bytes of the command extras.
80             # Data type Reserved for future use (Sean is using this
81             # soon).
82             # Reserved Really reserved for future use (up for grabs).
83             # Total body length Length in bytes of extra + key + value.
84             # Opaque Will be copied back to you in the response.
85             # CAS Data version check.
86              
87             # Constants
88             use constant +{
89             # Magic numbers
90 1         443 REQ_MAGIC => 0x80,
91             RES_MAGIC => 0x81,
92              
93             # Status Codes
94             # 0x0000 No error
95             # 0x0001 Key not found
96             # 0x0002 Key exists
97             # 0x0003 Value too large
98             # 0x0004 Invalid arguments
99             # 0x0005 Item not stored
100             # 0x0006 Incr/Decr on non-numeric value.
101             ST_SUCCESS => 0x0000,
102             ST_NOT_FOUND => 0x0001,
103             ST_EXISTS => 0x0002,
104             ST_TOO_LARGE => 0x0003,
105             ST_INVALID => 0x0004,
106             ST_NOT_STORED => 0x0005,
107             ST_NON_NUMERIC => 0x0006,
108              
109             # Opcodes
110             MEMD_GET => 0x00,
111             MEMD_SET => 0x01,
112             MEMD_ADD => 0x02,
113             MEMD_REPLACE => 0x03,
114             MEMD_DELETE => 0x04,
115             MEMD_INCREMENT => 0x05,
116             MEMD_DECREMENT => 0x06,
117             MEMD_QUIT => 0x07,
118             MEMD_FLUSH => 0x08,
119             MEMD_GETQ => 0x09,
120             MEMD_NOOP => 0x0A,
121             MEMD_VERSION => 0x0B,
122             MEMD_GETK => 0x0C,
123             MEMD_GETKQ => 0x0D,
124             MEMD_APPEND => 0x0E,
125             MEMD_PREPEND => 0x0F,
126             MEMD_STAT => 0x10,
127             MEMD_SETQ => 0x11,
128             MEMD_ADDQ => 0x12,
129             MEMD_REPLACEQ => 0x13,
130             MEMD_DELETEQ => 0x14,
131             MEMD_INCREMENTQ => 0x15,
132             MEMD_DECREMENTQ => 0x16,
133             MEMD_QUITQ => 0x17,
134             MEMD_FLUSHQ => 0x18,
135             MEMD_APPENDQ => 0x19,
136             MEMD_PREPENDQ => 0x1A,
137             RAW_BYTES => 0x00,
138 1     1   6 };
  1         27  
139              
140             my $OPAQUE;
141             BEGIN {
142 1     1   733 $OPAQUE = 0xffffffff;
143             }
144              
145             # binary protocol read type
146             AnyEvent::Handle::register_read_type memcached_bin => sub {
147             my ($self, $cb) = @_;
148              
149             my %state = ( waiting_header => 1 );
150             sub {
151             return unless $_[0]{rbuf};
152              
153             my $rbuf_ref = \$_[0]{rbuf};
154             if ($state{waiting_header}) {
155             return if length $$rbuf_ref < HEADER_SIZE;
156              
157             my $header = substr $$rbuf_ref, 0, HEADER_SIZE, '';
158             my ($i1, $i2, $i3, $i4, $i5, $i6) = unpack('N6', $header);
159             $state{magic} = $i1 >> 24;
160             $state{opcode} = ($i1 & 0x00ff0000) >> 16;
161             $state{key_length} = ($i1 & 0x0000ffff);
162             $state{extra_length} = ($i2 & 0xff000000) >> 24;
163             $state{data_type} = ($i2 & 0x00ff0000) >> 8;
164             $state{status} = ($i2 & 0x0000ffff);
165             $state{total_body_length} = $i3;
166             $state{opaque} = $i4;
167              
168             if (HAS_64BIT) {
169             $state{cas} = $i5 << 32 + $i6;
170             } else {
171             warn "overflow on CAS" if ($i5 || 0) != 0;
172             $state{cas} = $i6;
173             }
174              
175             delete $state{waiting_header};
176             }
177              
178             if ($state{total_body_length}) {
179             return if length $$rbuf_ref < $state{total_body_length};
180              
181             $state{extra} = substr $$rbuf_ref, 0, $state{extra_length}, '';
182             $state{key} = substr $$rbuf_ref, 0, $state{key_length}, '';
183              
184              
185             my $value_len = $state{total_body_length} - ($state{key_length} + $state{extra_length});
186             $state{value} = substr $$rbuf_ref, 0, $value_len, '';
187             }
188              
189             $cb->( \%state );
190             undef %state;
191             1;
192             }
193             };
194              
195             sub prepare_handle {
196 0     0 1   my ($self, $fh) = @_;
197 0           binmode($fh);
198             }
199              
200             AnyEvent::Handle::register_write_type memcached_bin => sub {
201             my ($self, $opcode, $key, $extras, $body, $cas, $data_type, $reserved ) = @_;
202             my $key_length = defined $key ? length($key) : 0;
203             # first 4 bytes (long)
204             my $i1 = 0;
205             $i1 ^= REQ_MAGIC << 24;
206             $i1 ^= $opcode << 16;
207             $i1 ^= $key_length;
208              
209             # second 4 bytes
210             my $i2 = 0;
211             my $extra_length =
212             ($opcode != MEMD_PREPEND && $opcode != MEMD_APPEND && defined $extras) ?
213             length($extras) :
214             0
215             ;
216             if ($extra_length) {
217             $i2 ^= $extra_length << 24;
218             }
219             # $data_type and $reserved are not used currently
220              
221             # third 4 bytes
222             my $body_length = defined $body ? length($body) : 0;
223             my $i3 = $body_length + $key_length + $extra_length;
224              
225             # this is the opaque value, which will be returned with the response
226             my $i4 = $OPAQUE;
227             if ($OPAQUE == 0xffffffff) {
228             $OPAQUE = 0;
229             } else {
230             $OPAQUE++;
231             }
232              
233             # CAS is 64 bit, which is troublesome on 32 bit architectures.
234             # we will NOT allow 64 bit CAS on 32 bit machines for now.
235             # better handling by binary-adept people are welcome
236             $cas ||= 0;
237             my ($i5, $i6);
238             if (HAS_64BIT) {
239 1     1   8 no warnings;
  1         2  
  1         2138  
240             $i5 = 0xffffffff00000000 & $cas;
241             $i6 = 0x00000000ffffffff & $cas;
242             } else {
243             $i5 = 0x00000000;
244             $i6 = $cas;
245             }
246              
247             my $message = pack( 'N6', $i1, $i2, $i3, $i4, $i5, $i6 );
248             if (length($message) > HEADER_SIZE) {
249             Carp::confess "header size assertion failed";
250             }
251              
252             if ($extra_length) {
253             $message .= $extras;
254             }
255             if ($key_length) {
256             $message .= pack('a*', $key);
257             }
258             if ($body_length) {
259             $message .= pack('a*', $body);
260             }
261              
262             return $message;
263             };
264              
265             sub _status_str {
266 0     0     my $status = shift;
267 0           my %strings = (
268             ST_SUCCESS() => "Success",
269             ST_NOT_FOUND() => "Not found",
270             ST_EXISTS() => "Exists",
271             ST_TOO_LARGE() => "Too Large",
272             ST_INVALID() => "Invalid Arguments",
273             ST_NOT_STORED() => "Not Stored",
274             ST_NON_NUMERIC() => "Incr/Decr on non-numeric variables"
275             );
276 0           return $strings{$status};
277             }
278              
279             # Generate setters
280             {
281             my $generator = sub {
282             my ($cmd, $opcode) = @_;
283              
284             sub {
285 0     0     my ($self, $memcached, $key, $value, $expires, $noreply, $cb) = @_;
286             return sub {
287 0     0     my $guard = shift;
288 0           my $fq_key = $memcached->_prepare_key( $key );
289 0           my $handle = $memcached->_get_handle_for( $key );
290 0           my ($len, $flags);
291              
292 0           $memcached->_prepare_value( $cmd, \$value, \$len, \$expires, \$flags);
293              
294 0           my $extras = pack('N2', $flags, $expires);
295              
296 0           $handle->push_write( memcached_bin => $opcode, $fq_key, $extras, $value );
297             $handle->push_read( memcached_bin => sub {
298 0           undef $guard;
299 0           $cb->($_[0]->{status} == 0, $_[0]->{value}, $_[0]);
300 0           });
301             }
302 0           };
303             };
304              
305             *add = $generator->("add", MEMD_ADD);
306             *replace = $generator->("replace", MEMD_REPLACE);
307             *set = $generator->("set", MEMD_SET);
308             *append = $generator->("append", MEMD_APPEND);
309             *prepend = $generator->("prepend", MEMD_PREPEND);
310             }
311              
312             sub delete {
313 0     0 1   my ($self, $memcached, $key, $noreply, $cb) = @_;
314              
315             return sub {
316 0     0     my $guard = shift;
317 0           my $fq_key = $memcached->_prepare_key($key);
318 0           my $handle = $memcached->_get_handle_for($key);
319              
320 0           $handle->push_write( memcached_bin => MEMD_DELETE, $fq_key );
321             $handle->push_read( memcached_bin => sub {
322 0           undef $guard;
323 0           $cb->(@_);
324 0           } );
325             }
326 0           }
327              
328             sub get {
329 0     0 1   my ($self, $memcached, $key, $cb) = @_;
330              
331             return sub {
332 0     0     my $guard = shift;
333 0           my $fq_key = $memcached->_prepare_key( $key );
334 0           my $handle = $memcached->_get_handle_for( $key );
335 0           $handle->push_write(memcached_bin => MEMD_GETK, $fq_key);
336             $handle->push_read(memcached_bin => sub {
337 0           my $msg = shift;
338 0           my ($flags, $exptime) = unpack('N2', $msg->{extra});
339 0 0 0       if (exists $msg->{key} && exists $msg->{value}) {
340 0           my $value = $msg->{value};
341 0           $memcached->_decode_key_value(\$key, \$flags, \$value );
342 0           $cb->($value);
343             } else {
344 0           $cb->();
345             }
346            
347 0           undef $guard;
348 0           });
349             }
350 0           }
351              
352             sub get_multi {
353 0     0 1   my ($self, $memcached, $keys, $cb) = @_;
354              
355             return sub {
356 0     0     my $guard = shift;
357             # organize the keys by handle
358 0           my %handle2keys;
359              
360 0 0         if (! @$keys) {
361 0           undef $guard;
362 0           $cb->({});
363 0           return;
364             }
365              
366 0           foreach my $key (@$keys) {
367 0           my $fq_key = $memcached->_prepare_key( $key );
368 0           my $handle = $memcached->_get_handle_for( $key );
369 0           my $list = $handle2keys{ $handle };
370 0 0         if (! $list) {
371 0           $list = $handle2keys{$handle} = [ $handle ];
372             }
373 0           push @$list, $fq_key;
374             }
375              
376 0           my %rv;
377             my $cv = AE::cv {
378 0           undef $guard;
379 0           $cb->( \%rv );
380 0           };
381              
382 0           foreach my $list (values %handle2keys) {
383 0           my ($handle, @keys) = @$list;
384 0           foreach my $key ( @keys ) {
385 0           $handle->push_write(memcached_bin => MEMD_GETK, $key);
386 0           $cv->begin;
387             $handle->push_read(memcached_bin => sub {
388 0           my $msg = shift;
389            
390 0           my ($flags, $exptime) = unpack('N2', $msg->{extra});
391 0 0 0       if (exists $msg->{key} && exists $msg->{value}) {
392 0           my $value = $msg->{value};
393 0           $memcached->_decode_key_value(\$key, \$flags, \$value );
394 0           $rv{ $key } = $value;
395             }
396 0           $cv->end;
397 0           });
398             }
399             }
400             }
401 0           }
402            
403             {
404             my $generator = sub {
405             my ($opcode) = @_;
406             return sub {
407 0     0     my ($self, $memcached, $key, $value, $initial, $cb) = @_;
408              
409             return sub {
410 0     0     my $guard = shift;
411 0   0       $value ||= 1;
412 0 0         my $expires = defined $initial ? 0 : 0xffffffff;
413 0   0       $initial ||= 0;
414 0           my $fq_key = $memcached->_prepare_key( $key );
415 0           my $handle = $memcached->_get_handle_for($key);
416 0           my $extras;
417 0           if (HAS_64BIT) {
418             $extras = pack('Q2L', $value, $initial, $expires );
419             } else {
420 0           $extras = pack('N5', 0, $value, 0, $initial, $expires );
421             }
422              
423 0           $handle->push_write(memcached_bin =>
424             $opcode, $fq_key, $extras, undef, undef, undef, undef);
425             $handle->push_read(memcached_bin => sub {
426 0           undef $guard;
427 0           my $value;
428 0           if (HAS_64BIT) {
429             $value = unpack('Q', $_[0]->{value});
430             } else {
431 0           (undef, $value) = unpack('N2', $_[0]->{value});
432             }
433            
434 0 0         $cb->($_[0]->{status} == 0 ? $value : undef, $_[0]);
435 0           });
436             }
437 0           }
438             };
439              
440             *incr = $generator->(MEMD_INCREMENT);
441             *decr = $generator->(MEMD_DECREMENT);
442             };
443              
444             sub version {
445 0     0 1   my ($self, $memcached, $cb) = @_;
446              
447             return sub {
448 0     0     my $guard = shift;
449 0           my %ret;
450 0           my $cv = AE::cv { $cb->( \%ret ); undef %ret };
  0            
  0            
451 0           while (my ($host_port, $handle) = each %{ $memcached->{_server_handles} }) {
  0            
452 0           $handle->push_write(memcached_bin => MEMD_VERSION);
453 0           $cv->begin;
454             $handle->push_read(memcached_bin => sub {
455 0           my $msg = shift;
456 0           undef $guard;
457 0           my $value = unpack('a*', $msg->{value});
458              
459 0           $ret{ $host_port } = $value;
460 0           $cv->end;
461 0           });
462             }
463             }
464 0           }
465            
466             sub flush_all {
467 0     0 1   my ($self, $memcached, $delay, $noreply, $cb) = @_;
468              
469             return sub {
470 0     0     my $guard = shift;
471             my $cv = AE::cv {
472 0           undef $guard;
473 0           $cb->(1);
474 0           };
475              
476 0           while (my ($host_port, $handle) = each %{ $memcached->{_server_handles} }) {
  0            
477 0           $cv->begin;
478 0           $handle->push_write(memcached_bin => MEMD_FLUSH);
479 0           $handle->push_read(memcached_bin => sub { $cv->send });
  0            
480             }
481             }
482 0           }
483              
484             1;
485              
486             __END__