File Coverage

blib/lib/Cache/Elasticache/Memcache.pm
Criterion Covered Total %
statement 109 115 94.7
branch 33 42 78.5
condition 5 12 41.6
subroutine 19 20 95.0
pod 3 4 75.0
total 169 193 87.5


line stmt bran cond sub pod time code
1             package Cache::Elasticache::Memcache;
2              
3 3     3   121092 use strict;
  3         4  
  3         77  
4 3     3   10 use warnings;
  3         3  
  3         71  
5              
6             =pod
7              
8             =for HTML
9              
10             =head1 NAME
11              
12             Cache::Elasticache::Memcache - A wrapper for L with support for AWS's auto reconfiguration mechanism
13              
14             =head1 SYNOPSIS
15              
16             use Cache::Elasticache::Memcache;
17              
18             my $memd = new Cache::Elasticache::Memcache->new({
19             config_endpoint => 'foo.bar',
20             update_period => 180,
21             # All other options are passed on to Cache::Memcached::Fast
22             ...
23             });
24              
25             # Will update the server list from the configuration endpoint
26             $memd->updateServers();
27              
28             # Will update the serverlist from the configuration endpoint if the time since
29             # the last time the server list was checked is greater than the update period
30             # specified when the $memd object was created.
31             $memd->checkServers();
32              
33             # Class method to retrieve a server list from a configuration endpoint.
34             Cache::Elasticache::Memcache->getServersFromEndpoint('foo.bar');
35              
36             # All other supported methods are handled by Cache::Memcached::Fast
37              
38             # N.B. This library is currently under development
39              
40             =head1 DESCRIPTION
41              
42             A wrapper for L with support for AWS's auto reconfiguration mechanism. It makes use of an AWS elasticache memcached cluster's configuration endpoint to discover the memcache servers in the cluster and periodically check the current server list to adapt to a changing cluster.
43              
44             =head1 UNDER DEVELOPMENT DISCALIMER
45              
46             N.B. This module is still under development. It should work, but things may change under the hood. I plan to imporove the resiliance with better timeout handling of communication when updating the server list. I'm toying with the idea of making the server list lookup asyncronus, however that may add a level of complexity not worth the benefits. Also I'm investigating switching to Dist::Milla. I'm open to suggestions, ideas and pull requests.
47              
48             =cut
49              
50 3     3   8 use Carp;
  3         3  
  3         119  
51 3     3   1599 use IO::Socket::IP;
  3         73029  
  3         14  
52 3     3   2711 use IO::Socket::Timeout;
  3         12746  
  3         18  
53 3     3   1587 use Cache::Memcached::Fast;
  3         11708  
  3         77  
54 3     3   1357 use Try::Tiny;
  3         2856  
  3         145  
55 3     3   17 use Scalar::Util qw(blessed);
  3         3  
  3         718  
56              
57             our $VERSION = '0.0.5';
58              
59             =pod
60              
61             =head1 CONSTRUCTOR
62              
63             Cache::Elasticache::Memcache->new({
64             config_endpoint => 'foo.bar',
65             update_period => 180,
66             ...
67             })
68              
69             =head2 Constructor parameters
70              
71             =over
72              
73             =item config_endpoint
74              
75             AWS elasticache memcached cluster config endpoint location
76              
77             =item update_period
78              
79             The minimum period (in seconds) to wait between updating the server list. Defaults to 180 seconds
80              
81             =back
82              
83             =cut
84              
85             sub new {
86 10     10 0 72119 my $class = shift;
87 10         19 my ($conf) = @_;
88 10         23 my $self = bless {}, $class;
89              
90 10 50       35 my $args = (@_ == 1) ? shift : { @_ }; # hashref-ify args
91              
92 10 100       52 croak "config_endpoint must be speccified" if (!defined $args->{'config_endpoint'});
93 9 100       33 croak "servers is not a valid constructors parameter" if (defined $args->{'servers'});
94              
95 8         15 $self->{'config_endpoint'} = delete @{$args}{'config_endpoint'};
  8         29  
96              
97 8 50       47 $args->{servers} = $self->getServersFromEndpoint($self->{'config_endpoint'}) if(defined $self->{'config_endpoint'});
98 8         29 $self->{_last_update} = time;
99              
100 8 100       24 $self->{update_period} = exists $args->{update_period} ? $args->{update_period} : 180;
101              
102 8         11 $self->{'_args'} = $args;
103 8         30 $self->{_memd} = Cache::Memcached::Fast->new($args);
104 8         123 $self->{servers} = $args->{servers};
105              
106 8         35 return $self;
107             }
108              
109             =pod
110              
111             =head1 METHODS
112              
113             =over
114              
115             =item Supported Cache::Memcached::Fast methods
116              
117             These methods can be called on a Cache::Elasticache::Memcache object. The object will call checkServers, then the call will be passed on to the appropriate L code. Please see the L documentation for further details regarding these methods.
118              
119             $memd->enable_compress($enable)
120             $memd->namespace($string)
121             $memd->set($key, $value)
122             $memd->set_multi([$key, $value],[$key, $value, $expiration_time])
123             $memd->cas($key, $cas, $value)
124             $memd->cas_multi([$key, $cas, $value],[$key, $cas, $value])
125             $memd->add($key, $value)
126             $memd->add_multi([$key, $value],[$key, $value])
127             $memd->replace($key, $value)
128             $memd->replace_multi([$key, $value],[$key, $value])
129             $memd->append($key, $value)
130             $memd->append_multi([$key, $value],[$key, $value])
131             $memd->prepend($key, $value)
132             $memd->prepend_multi([$key, $value],[$key, $value])
133             $memd->get($key)
134             $memd->get_multi(@keys)
135             $memd->gets($key)
136             $memd->gets_multi(@keys)
137             $memd->incr($key)
138             $memd->incr_multi(@keys)
139             $memd->decr($key)
140             $memd->decr_multi(@keys)
141             $memd->delete($key)
142             $memd->delete_multi(@keys)
143             $memd->touch($key, $expiration_time)
144             $memd->touch_multi([$key],[$key, $expiration_time])
145             $memd->flush_all($delay)
146             $memd->nowait_push()
147             $memd->server_versions()
148             $memd->disconnect_all()
149              
150             =cut
151              
152             my @methods = qw(
153             enable_compress
154             namespace
155             set
156             set_multi
157             cas
158             cas_multi
159             add
160             add_multi
161             replace
162             replace_multi
163             append
164             append_multi
165             prepend
166             prepend_multi
167             get
168             get_multi
169             gets
170             gets_multi
171             incr
172             incr_multi
173             decr
174             decr_multi
175             delete
176             delete_multi
177             touch
178             touch_multi
179             flush_all
180             nowait_push
181             server_versions
182             disconnect_all
183             );
184              
185             foreach my $method (@methods) {
186             my $method_name = "Cache::Elasticache::Memcache::$method";
187 3     3   17 no strict 'refs';
  3         7  
  3         1305  
188             *$method_name = sub {
189 30     30   58038 my $self = shift;
190 30         55 $self->checkServers;
191 30         295 return $self->{'_memd'}->$method(@_);
192             };
193             }
194              
195             =pod
196              
197             =item checkServers
198              
199             my $memd = Cache::Elasticache::Memcache->new({
200             config_endpoint => 'foo.bar'
201             })
202              
203             ...
204              
205             $memd->checkServers();
206              
207             Trigger the the server list to be updated if the time passed since the server list was last updated is greater than the update period (default 180 seconds).
208              
209             =cut
210              
211             sub checkServers {
212 0     0 1 0 my $self = shift;
213 0 0       0 $self->{_current_update_period} = (defined $self->{_current_update_period}) ? $self->{_current_update_period}: $self->{update_period} - rand(10);
214 0 0 0     0 if ( defined $self->{'config_endpoint'} && (time - $self->{_last_update}) > $self->{_current_update_period} ) {
215 0         0 $self->updateServers();
216 0         0 $self->{_current_update_period} = undef;
217             }
218             }
219              
220             =pod
221              
222             =item updateServers
223              
224             my $memd = Cache::Elasticache::Memcache->new({
225             config_endpoint => 'foo.bar'
226             })
227              
228             ...
229              
230             $memd->updateServers();
231              
232             This method will update the server list regardles of how much time has passed since the server list was last checked.
233              
234             =cut
235              
236             sub updateServers {
237 4     4 1 6004459 my $self = shift;
238              
239 4         19 my $servers = $self->getServersFromEndpoint($self->{'config_endpoint'});
240              
241             ## Cache::Memcached::Fast does not support updating the server list after creation
242             ## Therefore we must create a new object.
243              
244 4 100       13 if ( $self->_hasServerListChanged($servers) ) {
245 2         4 $self->{_args}->{servers} = $servers;
246 2         5 $self->{_memd} = Cache::Memcached::Fast->new($self->{'_args'});
247             }
248              
249 4         35 $self->{servers} = $servers;
250 4         14 $self->{_last_update} = time;
251             }
252              
253             sub _hasServerListChanged {
254 4     4   7 my $self = shift;
255 4         6 my $servers = shift;
256              
257 4 100       7 return 1 unless (scalar(@$servers) == scalar(@{$self->{'servers'}}));
  4         15  
258              
259 2 50       18 return 1 unless (join('|',sort(@$servers)) eq join('|',sort(@{$self->{'servers'}})));
  2         11  
260              
261 2         6 return 0;
262             }
263              
264             =pod
265              
266             =back
267              
268             =head1 CLASS METHODS
269              
270             =over
271              
272             =item getServersFromEndpoint
273              
274             Cache::Elasticache::Memcache->getServersFromEndpoint('foo.bar');
275              
276             This class method will retrieve the server list for a given configuration endpoint.
277              
278             =cut
279              
280             sub getServersFromEndpoint {
281 16     16 1 6690 my $invoker = shift;
282 16         23 my $config_endpoint = shift;
283 16         15 my $data = "";
284             # TODO: make use of "connect_timeout" (default 0.5s) and "io_timeout" (default 0.2s) constructor parameters
285             # my $args = shift;
286             # $connect_timeout = exists $args->{connect_timeout} ? $args->{connect_timeout} : $class::default_connect_timeout;
287             # $io_timeout = exists $args->{io_timeout} ? $args->{io_timeout} : $class::default_io_timeout;
288 16 100       61 my $socket = (blessed($invoker)) ? $invoker->{_sockets}->{$config_endpoint} : undef;
289              
290 16         46 for my $i (0..2) {
291 18 100 66     60 unless (defined $socket && $socket->connected()) {
292 17         57 $socket = IO::Socket::IP->new(PeerAddr => $config_endpoint, Timeout => 10, Proto => 'tcp');
293 17 50       6327 croak "Unable to connect to server: ".$config_endpoint." - $!" unless $socket;
294 17         102 $socket->sockopt(SO_KEEPALIVE,1);
295 17         856 $socket->autoflush(1);
296 17         612 IO::Socket::Timeout->enable_timeouts_on($socket);
297 17         2987 $socket->read_timeout(0.5);
298             # This is currently commented out as it was breaking under perl 5.24 for me. Need to investigate!
299             # $socket->write_Timeout(0.5);
300             }
301              
302             try {
303 18     18   868 $socket->send("config get cluster\r\n");
304 15         578 my $count = 0;
305 15         44 until ($data =~ m/END/) {
306 225         663 my $line = $socket->getline();
307 225 100       7663 if (defined $line) {
308 144         184 $data .= $line;
309             }
310 225         144 $count++;
311 225 100       613 last if ( $count == 30 );
312             }
313             } catch {
314 3     3   145 $socket = undef;
315 3     3   36 no warnings 'exiting';
  3         5  
  3         865  
316 3         4 next;
317 18         2020 };
318 18 100       234 if ($data ne "") {
319 15         22 last;
320             } else {
321 3         5 $socket = undef;
322             }
323             }
324 16 100       41 if (blessed $invoker) {
325 12         29 $invoker->{_sockets}->{$config_endpoint} = $socket;
326             } else {
327 4 100       29 $socket->close() if (blessed $socket);
328             }
329 16         157 return $invoker->_parseConfigResponse($data);
330             }
331              
332             sub _parseConfigResponse {
333 16     16   20 my $class = shift;
334 16         15 my $data = shift;
335 16 100 66     82 return [] unless (defined $data && $data ne '');
336 15         93 my @response_lines = split(/[\r\n]+/,$data);
337 15         24 my @servers = ();
338 15         19 my $node_regex = '([-.a-zA-Z0-9]+)\|(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\|(\d+)';
339 15         24 foreach my $line (@response_lines) {
340 62 100       204 if ($line =~ m/$node_regex/) {
341 15         42 foreach my $node (split(' ', $line)) {
342 37         89 my ($host, $ip, $port) = split('\|',$node);
343 37         97 push(@servers,$ip.':'.$port);
344             }
345             }
346             }
347 15         44 return \@servers;
348             }
349              
350             sub DESTROY {
351 10     10   10598 my $self = shift;
352 10         16 foreach my $config_endpoint (keys %{$self->{_sockets}}) {
  10         50  
353 7         13 my $socket = $self->{_sockets}->{$config_endpoint};
354 7 50 33     58 if (defined $self->{_socket} && $socket->connected()) {
355 0           $self->{_socket}->close();
356             }
357             }
358             };
359              
360             1;
361             __END__