File Coverage

blib/lib/Protocol/Memcached.pm
Criterion Covered Total %
statement 70 123 56.9
branch 10 42 23.8
condition 3 7 42.8
subroutine 14 25 56.0
pod 13 13 100.0
total 110 210 52.3


line stmt bran cond sub pod time code
1             package Protocol::Memcached;
2             # ABSTRACT: Support for the memcached binary protocol
3 1     1   30917 use strict;
  1         2  
  1         42  
4 1     1   6 use warnings FATAL => 'all';
  1         1  
  1         72  
5              
6             our $VERSION = '0.004';
7              
8             =head1 NAME
9              
10             Protocol::Memcached - memcached binary protocol implementation
11              
12             =head1 VERSION
13              
14             version 0.004
15              
16             =head1 SYNOPSIS
17              
18             package Subclass::Of::Protocol::Memcached;
19             use parent qw(Protocol::Memcached);
20              
21             sub write { $_[0]->{socket}->write($_[1]) }
22              
23             package main;
24             my $mc = Subclass::Of::Protocol::Memcached->new;
25             my ($k, $v) = ('hello' => 'world');
26             $mc->set(
27             $k => $v,
28             on_complete => sub {
29             $mc->get(
30             'key',
31             on_complete => sub { my $v = shift; print "Had $v\n" },
32             on_error => sub { die "Failed because of @_\n" }
33             );
34             }
35             );
36              
37             =head1 DESCRIPTION
38              
39             Bare minimum protocol support for memcached. This class is transport-agnostic and as
40             such is not a working implementation - you need to subclass and provide your own ->write
41             method.
42              
43             If you're using this class, you're most likely doing it wrong - head over to the
44             L section to rectify this.
45              
46             L is probably the module you want if you are going to subclass
47             this.
48              
49             =head1 SUBCLASSING
50              
51             Provide the following method:
52              
53             =head2 write
54              
55             This will be called with the data to be written, and zero or more named parameters:
56              
57             =over 4
58              
59             =item * on_flush - coderef to execute when the data has left the building, if this is
60             not supported by the transport layer then the subclass should call the coderef
61             before returning
62              
63             =back
64              
65             and when you have data, call L.
66              
67             =cut
68              
69             # Modules
70              
71 1     1   5 use Scalar::Util ();
  1         2  
  1         61  
72 1     1   5 use Digest::MD5 ();
  1         3  
  1         26  
73 1     1   7 use List::Util qw(sum);
  1         2  
  1         148  
74 1     1   903 use List::UtilsBy qw(nsort_by);
  1         1510  
  1         77  
75 1     1   954 use POSIX qw(floor);
  1         7685  
  1         9  
76              
77             # Constants
78              
79             use constant {
80 1         2293 MAGIC_REQUEST => 0x80,
81             MAGIC_RESPONSE => 0x81,
82 1     1   1266 };
  1         3  
83              
84             # Mapping from numeric opcode value in packet header to method
85             my %OPCODE_BY_ID = (
86             0x00 => 'Get',
87             0x01 => 'Set',
88             0x02 => 'Add',
89             0x03 => 'Replace',
90             0x04 => 'Delete',
91             0x05 => 'Increment',
92             0x06 => 'Decrement',
93             0x07 => 'Quit',
94             0x08 => 'Flush',
95             0x09 => 'GetQ',
96             0x0A => 'No-op',
97             0x0B => 'Version',
98             0x0C => 'GetK',
99             0x0D => 'GetKQ',
100             0x0E => 'Append',
101             0x0F => 'Prepend',
102             0x10 => 'Stat',
103             0x11 => 'SetQ',
104             0x12 => 'AddQ',
105             0x13 => 'ReplaceQ',
106             0x14 => 'DeleteQ',
107             0x15 => 'IncrementQ',
108             0x16 => 'DecrementQ',
109             0x17 => 'QuitQ',
110             0x18 => 'FlushQ',
111             0x19 => 'AppendQ',
112             0x1A => 'PrependQ',
113             );
114             # Map from method name to opcode byte
115             my %OPCODE_BY_NAME = reverse %OPCODE_BY_ID;
116              
117             # Status values from response
118             my %RESPONSE_STATUS = (
119             0x0000 => 'No error',
120             0x0001 => 'Key not found',
121             0x0002 => 'Key exists',
122             0x0003 => 'Value too large',
123             0x0004 => 'Invalid arguments',
124             0x0005 => 'Item not stored',
125             0x0006 => 'Incr/Decr on non-numeric value',
126             0x0081 => 'Unknown command',
127             0x0082 => 'Out of memory',
128             );
129              
130             =head1 METHODS
131              
132             =cut
133              
134             =head2 new
135              
136             Bare minimum constructor - subclass may need to inherit from something with a
137             non-trivial constructor, so we put all our init code in L.
138              
139             =cut
140              
141             sub new {
142 1     1 1 101 my $class = shift;
143 1         4 my $self = bless { }, $class;
144 1         2 return $self;
145             }
146              
147             =head2 sap
148              
149             Helper method for weak callbacks.
150              
151             =cut
152              
153 0     0 1 0 sub sap { my ($self, $sub) = @_; Scalar::Util::weaken $self; return sub { $self->$sub(@_); }; }
  0     0   0  
  0         0  
  0         0  
154              
155             =head2 get
156              
157             Retrieves a value from memcached.
158              
159             Takes a key and zero or more optional named parameters:
160              
161             =over 4
162              
163             =item * on_write - called when we've sent the request to the server
164              
165             =back
166              
167             =cut
168              
169             sub get {
170 0     0 1 0 my $self = shift;
171 0         0 my $k = shift; # FIXME should we do anything about encoding or length checks here?
172 0         0 my %args = @_;
173              
174             # Pull out any callbacks that we handle directly
175 0         0 my $on_write = delete $args{on_write};
176              
177 0         0 my $len = length $k; # TODO benchmark - 2xlength calls or lexical var?
178              
179             $self->write(
180             pack(
181             'C1 C1 n1 C1 C1 n1 N1 N1 N1 N1 a*',
182             MAGIC_REQUEST, # What type this packet is
183             $OPCODE_BY_NAME{'Get'}, # Opcode
184             $len, # Key length
185             0x00, # Extras length
186             0x00, # Data type binary
187             0x0000, # Reserved
188             $len, # Total body
189             0x00000000, # Opaque
190             0x00, # CAS
191             0x00, # more CAS - 8byte value but don't want to rely on pack 'Q'
192             $k,
193             ),
194             on_flush => $self->sap(sub {
195 0     0   0 my $self = shift;
196 0         0 push @{ $self->{pending} }, {
  0         0  
197             %args,
198             type => 'Get',
199             key => $k,
200             };
201 0 0       0 $on_write->($self, key => $k) if $on_write;
202             })
203 0         0 );
204 0         0 $self
205             }
206              
207             =head2 set
208              
209             Retrieves a value from memcached.
210              
211             Takes a key and zero or more optional named parameters:
212              
213             =over 4
214              
215             =item * on_write - called when we've sent the request to the server
216              
217             =back
218              
219             =cut
220              
221             sub set {
222 0     0 1 0 my $self = shift;
223 0         0 my $k = shift; # FIXME should we do anything about encoding or length checks here?
224 0         0 my $v = shift;
225 0         0 my %args = @_;
226              
227             # Pull out any callbacks that we handle directly
228 0         0 my $on_write = delete $args{on_write};
229              
230             $self->write(
231             pack(
232             'C1 C1 n1 C1 C1 n1 N1 N1 N1 N1 N1 N1 a* a*',
233             MAGIC_REQUEST, # What type this packet is
234             $OPCODE_BY_NAME{'Set'}, # Opcode
235             length($k), # Key length
236             0x08, # Extras length
237             0x00, # Data type binary
238             0x0000, # Reserved
239             8 + length($k) + length($v), # Total body
240             0x00000000, # Opaque
241             0x00, # CAS
242             0x00, # more CAS - 8byte value but don't want to rely on pack 'Q'
243             $args{flags} || 0,
244             $args{ttl} || 60,
245             $k,
246             $v,
247             ),
248             on_flush => $self->sap(sub {
249 0     0   0 my $self = shift;
250 0         0 push @{ $self->{pending} }, {
  0         0  
251             %args,
252             type => 'Set',
253             key => $k,
254             value => $v,
255             };
256 0 0       0 $on_write->($self, key => $k, value => $v) if $on_write;
257             })
258 0   0     0 );
      0        
259 0         0 $self
260             }
261              
262             =head2 init
263              
264             Sets things up.
265              
266             Currently just does some internal housekeeping, takes no parameters, and returns $self.
267              
268             =cut
269              
270             sub init {
271 0     0 1 0 my $self = shift;
272 0         0 $self->{pending} = [];
273 0         0 return $self;
274             }
275              
276             =head2 on_read
277              
278             This should be called when there is data to be processed. It takes a single parameter:
279             a reference to a buffer containing the incoming data. If a packet is processed
280             successfully then it will be removed from this buffer (via C< substr > or C< s// >).
281              
282             Returns true if a packet was found, false if not. It is recommended (but not required)
283             that this method be called repeatedly until it returns false.
284              
285             =cut
286              
287             sub on_read {
288 0     0 1 0 my ($self, $buffref) = @_;
289              
290             # Bail out if we don't have a full header
291 0 0       0 return 0 unless length $$buffref >= 24;
292              
293             # Extract the basic header data first - specifically we want the length
294             # Not using most of these. At least, not yet
295             # my ($magic, $opcode, $kl, $el, $dt, $status, $blen, $opaque, $cas1, $cas2) = unpack('C1 C1 n1 C1 C1 n1 N1 N1 N1 N1', $$buffref);
296 0         0 my ($magic, $opcode, undef, undef, undef, $status, $blen) = unpack('C1 C1 n1 C1 C1 n1 N1 N1 N1 N1', $$buffref);
297 0 0       0 die "Not a response" unless $magic == MAGIC_RESPONSE;
298              
299             # If we don't have the full body as well, bail out here
300 0 0       0 return 0 unless length $$buffref >= ($blen + 24);
301              
302             # Strip the header
303 0         0 substr $$buffref, 0, 24, '';
304              
305 0         0 my $body = substr $$buffref, 0, $blen, '';
306 0 0       0 if($opcode == 0x00) {
307             # unused
308             # my $flags = substr $body, 0, 4, '';
309 0         0 substr $body, 0, 4, '';
310             }
311             # printf "=> %-9.9s %-40.40s %08x%08x %s\n", $OPCODE_BY_ID{$opcode}, $body, $cas1, $cas2, $RESPONSE_STATUS{$status} // 'unknown status';
312 0 0       0 my $item = shift @{$self->{pending}} or die "Had response with no queued item\n";
  0         0  
313 0 0       0 $item->{value} = $body if length $body;
314 0 0       0 if($status) {
315 0 0       0 return $item->{on_error}->(%$item, status => $status) if exists $item->{on_error};
316 0         0 die "Failed with " . $RESPONSE_STATUS{$status} . " on item " . join ',', %$item . "\n";
317             } else {
318 0 0       0 $item->{on_complete}->(%$item) if exists $item->{on_complete};
319             }
320 0         0 return 1;
321             }
322              
323             =head2 status_text
324              
325             Returns the status message corresponding to the given code.
326              
327             =cut
328              
329             sub status_text {
330 0     0 1 0 my $self = shift;
331 0         0 $RESPONSE_STATUS{+shift}
332             }
333              
334             =head2 build_packet
335              
336             Generic packet construction.
337              
338             =cut
339              
340             sub build_packet {
341 0     0 1 0 my $self = shift;
342 0         0 my %args = @_;
343 0 0       0 my $pkt = pack(
    0          
    0          
    0          
344             'C1 C1 S1 C1 C1 S1 N1 N1 N1',
345             $args{request} ? MAGIC_REQUEST : MAGIC_RESPONSE,
346             $args{opcode},
347             defined($args{key}) ? length($args{key}) : 0,
348             defined($args{extras}) ? length($args{extras}) : 0,
349             0x00,
350             defined($args{body}) ? length($args{body}) : 0,
351             0x00,
352             0x00
353             );
354 0         0 return $pkt;
355             }
356              
357             =head2 hash_key
358              
359             Returns a hashed version of the given key using md5.
360              
361             =cut
362              
363             sub hash_key {
364 163813     163813 1 167452 my $self = shift;
365 163813         1734230 return Digest::MD5::md5(shift);
366             }
367              
368             =head2 ketama
369              
370             Provided for backward compatibility only. See L.
371              
372             =cut
373              
374 0     0 1 0 sub ketama { shift->hash_key(@_) }
375              
376             =head2 build_ketama_map
377              
378             Generates a Ketama hash map from the given list of servers.
379              
380             Returns an arrayref of points.
381              
382             =cut
383              
384             sub build_ketama_map {
385 5     5 1 31814 my $self = shift;
386 5         432 my @servers = @_;
387 5         12 my $total = 0 + sum values %{ +{ @servers } };
  5         1472  
388 5         207 my @points;
389 5         16 my $server_count = @servers / 2;
390 5         21 while(@servers) {
391 1615         9531 my ($srv, $weight) = splice @servers, 0, 2;
392 1615         2791 my $pct = $weight / $total;
393 1615         4531 my $ks = floor($pct * 40.0 * $server_count);
394 1615         3056 foreach my $k (0..$ks-1) {
395 63808         135115 my $hash = sprintf '%s-%d', $srv, $k;
396 63808         123477 my @digest = map ord, split //, $self->hash_key($hash);
397 63808         202156 foreach my $h (0..3) {
398 255232         982888 push @points, {
399             point => ( $digest[3+$h*4] << 24 )
400             | ( $digest[2+$h*4] << 16 )
401             | ( $digest[1+$h*4] << 8 )
402             | $digest[$h*4],
403             ip => $srv
404             };
405             }
406             }
407             }
408 5     255232   13122 @points = nsort_by { $_->{point} } @points;
  255232         2238423  
409 5         2936303 $self->{points} = \@points;
410 5         255 return \@points;
411             }
412              
413             =head2 ketama_hashi
414              
415             Calculates an integer hash value from the given key.
416              
417             =cut
418              
419             sub ketama_hashi {
420 100005     100005 1 104959 my $self = shift;
421 100005         98291 my $key = shift;
422 100005         173649 my @digest = map ord, split //, $self->hash_key($key);
423 100005         449776 return ( $digest[3] << 24 )
424             | ( $digest[2] << 16 )
425             | ( $digest[1] << 8 )
426             | $digest[0];
427             }
428              
429             =head2 ketama_find_point
430              
431             Given a key value, calculates the closest point on the Ketama map.
432              
433             =cut
434              
435             sub ketama_find_point {
436 100005     100005 1 993162 my ($self, $key) = @_;
437              
438             # Convert this key into a suitably-hashed integer
439 100005         178561 my $h = $self->ketama_hashi($key);
440              
441             # Find the array bounds...
442 100005         111894 my $highp = my $maxp = scalar @{$self->{points}};
  100005         169571  
443 100005         109105 my $lowp = 0;
444              
445             # then kick off our divide and conquer array search,
446             # which will end when we've found the server with next
447             # biggest point after what this key hashes to
448 100005         96440 while(1) {
449 1075938         2110511 my $midp = floor(($lowp + $highp ) / 2);
450 1075938 100       1959968 if ( $midp == $maxp ) {
451             # if at the end, roll back to zeroth
452             # off-by-one? you'd think, but note the oh-so-helpful $midp-1 later on.
453 289 50       405 $midp = 1 if $midp == @{$self->{points}};
  289         666  
454 289         870 return $self->{points}->[$midp - 1];
455             }
456 1075649         2047361 my $midval = $self->{points}->[$midp]->{point};
457 1075649 100       2371286 my $midval1 = $midp == 0 ? 0 : $self->{points}->[$midp-1]->{point};
458              
459 1075649 100 100     3239059 return $self->{points}->[$midp] if $h <= $midval && $h > $midval1;
460              
461 975933 100       1419411 if ($midval < $h) {
462 506734         537538 $lowp = $midp + 1;
463             } else {
464 469199         487464 $highp = $midp - 1;
465             }
466              
467 975933 50       1691381 return $self->{points}->[0] if $lowp > $highp;
468             }
469             }
470              
471             1;
472              
473             __END__