File Coverage

blib/lib/Cache/Elasticache/Memcache.pm
Criterion Covered Total %
statement 110 116 94.8
branch 33 42 78.5
condition 5 12 41.6
subroutine 19 20 95.0
pod 3 4 75.0
total 170 194 87.6


line stmt bran cond sub pod time code
1             package Cache::Elasticache::Memcache;
2              
3 3     3   111412 use strict;
  3         4  
  3         65  
4 3     3   10 use warnings;
  3         4  
  3         69  
5              
6             =pod
7              
8             =for HTML
9              
10             =head1 NAME
11              
12             Cache::Elasticache::Memcache - A wrapper for L with support for AWS's auto reconfiguration mechanism
13              
14             =head1 SYNOPSIS
15              
16             use Cache::Elasticache::Memcache;
17              
18             my $memd = new Cache::Elasticache::Memcache->new({
19             config_endpoint => 'foo.bar',
20             update_period => 180,
21             # All other options are passed on to Cache::Memcached::Fast
22             ...
23             });
24              
25             # Will update the server list from the configuration endpoint
26             $memd->updateServers();
27              
28             # Will update the serverlist from the configuration endpoint if the time since
29             # the last time the server list was checked is greater than the update period
30             # specified when the $memd object was created.
31             $memd->checkServers();
32              
33             # Class method to retrieve a server list from a configuration endpoint.
34             Cache::Elasticache::Memcache->getServersFromEndpoint('foo.bar');
35              
36             # All other supported methods are handled by Cache::Memcached::Fast
37              
38             # N.B. This library is currently under development
39              
40             =head1 DESCRIPTION
41              
42             A wrapper for L with support for AWS's auto reconfiguration mechanism. It makes use of an AWS elasticache memcached cluster's configuration endpoint to discover the memcache servers in the cluster and periodically check the current server list to adapt to a changing cluster.
43              
44             =head1 UNDER DEVELOPMENT DISCALIMER
45              
46             N.B. This module is still under development. It should work, but things may change under the hood. I plan to imporove the resiliance with better timeout handling of communication when updating the server list. I'm toying with the idea of making the server list lookup asyncronus, however that may add a level of complexity not worth the benefits. Also I'm investigating switching to Dist::Milla. I'm open to suggestions, ideas and pull requests.
47              
48             =cut
49              
50 3     3   9 use Carp;
  3         3  
  3         115  
51 3     3   1559 use IO::Socket::IP;
  3         77742  
  3         15  
52 3     3   2947 use IO::Socket::Timeout;
  3         13409  
  3         20  
53 3     3   1698 use Cache::Memcached::Fast;
  3         12127  
  3         81  
54 3     3   1315 use Try::Tiny;
  3         2956  
  3         156  
55 3     3   18 use Scalar::Util qw(blessed);
  3         6  
  3         653  
56              
57             our $VERSION = '0.0.4';
58              
59             =pod
60              
61             =head1 CONSTRUCTOR
62              
63             Cache::Elasticache::Memcache->new({
64             config_endpoint => 'foo.bar',
65             update_period => 180,
66             ...
67             })
68              
69             =head2 Constructor parameters
70              
71             =over
72              
73             =item config_endpoint
74              
75             AWS elasticache memcached cluster config endpoint location
76              
77             =item update_period
78              
79             The minimum period (in seconds) to wait between updating the server list. Defaults to 180 seconds
80              
81             =back
82              
83             =cut
84              
85             sub new {
86 10     10 0 55174 my $class = shift;
87 10         18 my ($conf) = @_;
88 10         24 my $self = bless {}, $class;
89              
90 10 50       43 my $args = (@_ == 1) ? shift : { @_ }; # hashref-ify args
91              
92 10 100       45 croak "config_endpoint must be speccified" if (!defined $args->{'config_endpoint'});
93 9 100       46 croak "servers is not a valid constructors parameter" if (defined $args->{'servers'});
94              
95 8         16 $self->{'config_endpoint'} = delete @{$args}{'config_endpoint'};
  8         39  
96              
97 8 50       47 $args->{servers} = $self->getServersFromEndpoint($self->{'config_endpoint'}) if(defined $self->{'config_endpoint'});
98 8         33 $self->{_last_update} = time;
99              
100 8 100       21 $self->{update_period} = exists $args->{update_period} ? $args->{update_period} : 180;
101              
102 8         15 $self->{'_args'} = $args;
103 8         34 $self->{_memd} = Cache::Memcached::Fast->new($args);
104 8         128 $self->{servers} = $args->{servers};
105              
106 8         33 return $self;
107             }
108              
109             =pod
110              
111             =head1 METHODS
112              
113             =over
114              
115             =item Supported Cache::Memcached::Fast methods
116              
117             These methods can be called on a Cache::Elasticache::Memcache object. The object will call checkServers, then the call will be passed on to the appropriate L code. Please see the L documentation for further details regarding these methods.
118              
119             $memd->enable_compress($enable)
120             $memd->namespace($string)
121             $memd->set($key, $value)
122             $memd->set_multi([$key, $value],[$key, $value, $expiration_time])
123             $memd->cas($key, $cas, $value)
124             $memd->cas_multi([$key, $cas, $value],[$key, $cas, $value])
125             $memd->add($key, $value)
126             $memd->add_multi([$key, $value],[$key, $value])
127             $memd->replace($key, $value)
128             $memd->replace_multi([$key, $value],[$key, $value])
129             $memd->append($key, $value)
130             $memd->append_multi([$key, $value],[$key, $value])
131             $memd->prepend($key, $value)
132             $memd->prepend_multi([$key, $value],[$key, $value])
133             $memd->get($key)
134             $memd->get_multi(@keys)
135             $memd->gets($key)
136             $memd->gets_multi(@keys)
137             $memd->incr($key)
138             $memd->incr_multi(@keys)
139             $memd->decr($key)
140             $memd->decr_multi(@keys)
141             $memd->delete($key)
142             $memd->delete_multi(@keys)
143             $memd->touch($key, $expiration_time)
144             $memd->touch_multi([$key],[$key, $expiration_time])
145             $memd->flush_all($delay)
146             $memd->nowait_push()
147             $memd->server_versions()
148             $memd->disconnect_all()
149              
150             =cut
151              
152             my @methods = qw(
153             enable_compress
154             namespace
155             set
156             set_multi
157             cas
158             cas_multi
159             add
160             add_multi
161             replace
162             replace_multi
163             append
164             append_multi
165             prepend
166             prepend_multi
167             get
168             get_multi
169             gets
170             gets_multi
171             incr
172             incr_multi
173             decr
174             decr_multi
175             delete
176             delete_multi
177             touch
178             touch_multi
179             flush_all
180             nowait_push
181             server_versions
182             disconnect_all
183             );
184              
185             foreach my $method (@methods) {
186             my $method_name = "Cache::Elasticache::Memcache::$method";
187 3     3   21 no strict 'refs';
  3         4  
  3         1236  
188             *$method_name = sub {
189 30     30   41592 my $self = shift;
190 30         57 $self->checkServers;
191 30         263 return $self->{'_memd'}->$method(@_);
192             };
193             }
194              
195             =pod
196              
197             =item checkServers
198              
199             my $memd = Cache::Elasticache::Memcache->new({
200             config_endpoint => 'foo.bar'
201             })
202              
203             ...
204              
205             $memd->checkServers();
206              
207             Trigger the the server list to be updated if the time passed since the server list was last updated is greater than the update period (default 180 seconds).
208              
209             =cut
210              
211             sub checkServers {
212 0     0 1 0 my $self = shift;
213 0 0       0 $self->{_current_update_period} = (defined $self->{_current_update_period}) ? $self->{_current_update_period}: $self->{update_period} - rand(10);
214 0 0 0     0 if ( defined $self->{'config_endpoint'} && (time - $self->{_last_update}) > $self->{_current_update_period} ) {
215 0         0 $self->updateServers();
216 0         0 $self->{_current_update_period} = undef;
217             }
218             }
219              
220             =pod
221              
222             =item updateServers
223              
224             my $memd = Cache::Elasticache::Memcache->new({
225             config_endpoint => 'foo.bar'
226             })
227              
228             ...
229              
230             $memd->updateServers();
231              
232             This method will update the server list regardles of how much time has passed since the server list was last checked.
233              
234             =cut
235              
236             sub updateServers {
237 4     4 1 6005471 my $self = shift;
238              
239 4         25 my $servers = $self->getServersFromEndpoint($self->{'config_endpoint'});
240              
241             ## Cache::Memcached::Fast does not support updating the server list after creation
242             ## Therefore we must create a new object.
243              
244 4 100       17 if ( $self->_hasServerListChanged($servers) ) {
245 2         9 $self->{_args}->{servers} = $servers;
246 2         10 $self->{_memd} = Cache::Memcached::Fast->new($self->{'_args'});
247             }
248              
249 4         48 $self->{servers} = $servers;
250 4         17 $self->{_last_update} = time;
251             }
252              
253             sub _hasServerListChanged {
254 4     4   8 my $self = shift;
255 4         7 my $servers = shift;
256              
257 4 100       9 return 1 unless (scalar(@$servers) == scalar(@{$self->{'servers'}}));
  4         17  
258              
259 2 50       17 return 1 unless (join('|',sort(@$servers)) eq join('|',sort(@{$self->{'servers'}})));
  2         13  
260              
261 2         6 return 0;
262             }
263              
264             =pod
265              
266             =back
267              
268             =head1 CLASS METHODS
269              
270             =over
271              
272             =item getServersFromEndpoint
273              
274             Cache::Elasticache::Memcache->getServersFromEndpoint('foo.bar');
275              
276             This class method will retrieve the server list for a given configuration endpoint.
277              
278             =cut
279              
280             sub getServersFromEndpoint {
281 16     16 1 25218 my $invoker = shift;
282 16         25 my $config_endpoint = shift;
283 16         22 my $data = "";
284             # TODO: make use of "connect_timeout" (default 0.5s) and "io_timeout" (default 0.2s) constructor parameters
285             # my $args = shift;
286             # $connect_timeout = exists $args->{connect_timeout} ? $args->{connect_timeout} : $class::default_connect_timeout;
287             # $io_timeout = exists $args->{io_timeout} ? $args->{io_timeout} : $class::default_io_timeout;
288 16 100       81 my $socket = (blessed($invoker)) ? $invoker->{_sockets}->{$config_endpoint} : undef;
289              
290 16         56 for my $i (0..2) {
291 18 100 66     64 unless (defined $socket && $socket->connected()) {
292 17         68 $socket = IO::Socket::IP->new(PeerAddr => $config_endpoint, Timeout => 10, Proto => 'tcp');
293 17 50       6594 croak "Unable to connect to server: ".$config_endpoint." - $!" unless $socket;
294 17         120 $socket->sockopt(SO_KEEPALIVE,1);
295 17         962 $socket->autoflush(1);
296 17         693 IO::Socket::Timeout->enable_timeouts_on($socket);
297 17         3327 $socket->read_timeout(0.5);
298 17         2290 $socket->write_Timeout(0.5);
299             }
300              
301             try {
302 18     18   942 $socket->send("config get cluster\r\n");
303 15         676 my $count = 0;
304 15         51 until ($data =~ m/END/) {
305 225         705 my $line = $socket->getline();
306 225 100       7546 if (defined $line) {
307 144         191 $data .= $line;
308             }
309 225         160 $count++;
310 225 100       628 last if ( $count == 30 );
311             }
312             } catch {
313 3     3   125 $socket = undef;
314 3     3   36 no warnings 'exiting';
  3         3  
  3         844  
315 3         6 next;
316 18         787 };
317 18 100       265 if ($data ne "") {
318 15         30 last;
319             } else {
320 3         4 $socket = undef;
321             }
322             }
323 16 100       42 if (blessed $invoker) {
324 12         33 $invoker->{_sockets}->{$config_endpoint} = $socket;
325             } else {
326 4 100       32 $socket->close() if (blessed $socket);
327             }
328 16         167 return $invoker->_parseConfigResponse($data);
329             }
330              
331             sub _parseConfigResponse {
332 16     16   21 my $class = shift;
333 16         20 my $data = shift;
334 16 100 66     97 return [] unless (defined $data && $data ne '');
335 15         123 my @response_lines = split(/[\r\n]+/,$data);
336 15         27 my @servers = ();
337 15         26 my $node_regex = '([-.a-zA-Z0-9]+)\|(\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3})\|(\d+)';
338 15         25 foreach my $line (@response_lines) {
339 62 100       225 if ($line =~ m/$node_regex/) {
340 15         44 foreach my $node (split(' ', $line)) {
341 37         97 my ($host, $ip, $port) = split('\|',$node);
342 37         120 push(@servers,$ip.':'.$port);
343             }
344             }
345             }
346 15         52 return \@servers;
347             }
348              
349             sub DESTROY {
350 10     10   13490 my $self = shift;
351 10         18 foreach my $config_endpoint (keys %{$self->{_sockets}}) {
  10         58  
352 7         16 my $socket = $self->{_sockets}->{$config_endpoint};
353 7 50 33     68 if (defined $self->{_socket} && $socket->connected()) {
354 0           $self->{_socket}->close();
355             }
356             }
357             };
358              
359             1;
360             __END__