File Coverage

blib/lib/ShardedKV.pm
Criterion Covered Total %
statement 1 3 33.3
branch n/a
condition n/a
subroutine 1 1 100.0
pod n/a
total 2 4 50.0


line stmt bran cond sub pod time code
1             package ShardedKV;
2             $ShardedKV::VERSION = '0.20';
3 3     3   76569 use Moose;
  0            
  0            
4             # ABSTRACT: An interface to sharded key-value stores
5              
6             require ShardedKV::Storage;
7             require ShardedKV::Storage::Memory;
8             require ShardedKV::Continuum;
9              
10              
11              
12             has 'continuum' => ( # main continuum: its choose($key) result is used as the shard name by the methods below
13             is => 'rw',
14             does => 'ShardedKV::Continuum',
15             required => 1,
16             );
17              
18              
19             has 'migration_continuum' => ( # optional second continuum: when defined, get/delete/reset_connection consult it first and set uses it instead of the main one
20             is => 'rw',
21             does => 'ShardedKV::Continuum',
22             );
23              
24              
25             has 'storages' => ( # hashref keyed by the shard name a continuum returns from choose(); values are the storage objects
26             is => 'ro',
27             isa => 'HashRef', # of ShardedKV::Storage doing-things
28             default => sub { +{} }, # the sub returns a new empty hashref each time it is called
29             );
30              
31              
32              
33             has 'logger' => ( # optional logger object: the methods below call is_debug/debug/info/fatal on it when it is set
34             is => 'rw',
35             );
36              
37              
38             # get($key): fetch the value for $key. Tries the shard chosen by the migration continuum first (when one is set), then the shard chosen by the main continuum. Returns what the storage get() returned, or undef if neither lookup yields a defined value; dies if a chosen shard name has no entry in storages. Bypassing accessors since this is a hot path.
39             sub get {
40             my ($self, $key) = @_;
41             my ($mig_cont, $cont) = @{$self}{qw(migration_continuum continuum)}; # hash slice on the object itself, no accessor calls
42              
43             # dumb code for efficiency (otherwise, this would be a loop or in methods)
44              
45             my $logger = $self->{logger};
46             my $do_debug = ($logger and $logger->is_debug) ? 1 : 0; # ask the logger once, reuse the flag below
47              
48             my $storages = $self->{storages};
49             my $chosen_shard;
50             my $value_ref;
51             if (defined $mig_cont) { # a migration continuum is set: consult it first
52             $chosen_shard = $mig_cont->choose($key);
53             $logger->debug("get()using migration continuum, got storage '$chosen_shard'") if $do_debug;
54             my $storage = $storages->{ $chosen_shard };
55             die "Failed to find chosen storage (server) for id '$chosen_shard' via key '$key'"
56             if not $storage;
57             $value_ref = $storage->get($key);
58             }
59              
60             if (not defined $value_ref) { # nothing found above, or no migration continuum: fall back to the main continuum
61             my $where = $cont->choose($key);
62             $logger->debug("get()using regular continuum, got storage '$where'") if $do_debug;
63             if (!$chosen_shard or $where ne $chosen_shard) { # skip the lookup when this same shard was already queried above
64             my $storage = $storages->{ $where };
65             die "Failed to find chosen storage (server) for id '$where' via key '$key'"
66             if not $storage;
67             $value_ref = $storage->get($key);
68             }
69             }
70              
71             return $value_ref; # still undef when no lookup produced a defined value
72             }
73              
74              
75             # set($key, $value_ref): store $value_ref under $key on exactly one shard, chosen by the migration continuum when one is set and by the main continuum otherwise. Dies if the chosen shard name has no entry in storages. Bypassing accessors since this is a hot path.
76             sub set {
77             my ($self, $key, $value_ref) = @_;
78             my $continuum = $self->{migration_continuum}; # prefer the migration continuum
79             $continuum = $self->{continuum} if not defined $continuum; # otherwise use the main continuum
80              
81             my $where = $continuum->choose($key);
82             my $storage = $self->{storages}{$where};
83             if (not $storage) {
84             die "Failed to find chosen storage (server) for id '$where' via key '$key'";
85             }
86              
87             $storage->set($key, $value_ref); # last statement, so its result is what the sub returns
88             }
89              
90              
91             sub delete { # delete($key): remove $key from the shard chosen by the migration continuum (when one is set) and from the shard chosen by the main continuum; dies if a chosen shard name has no entry in storages
92             my ($self, $key) = @_;
93              
94             my ($mig_cont, $cont) = @{$self}{qw(migration_continuum continuum)}; # hash slice on the object itself, no accessor calls
95              
96             # dumb code for efficiency (otherwise, this would be a loop or in methods)
97              
98             my $logger = $self->{logger};
99             my $do_debug = ($logger and $logger->is_debug) ? 1 : 0; # ask the logger once, reuse the flag below
100              
101             my $storages = $self->{storages};
102             my $chosen_shard;
103             # Try deleting from shard pointed at by migr. cont. first
104             if (defined $mig_cont) {
105             $chosen_shard = $mig_cont->choose($key);
106             $logger->debug("Deleting from migration continuum, got storage '$chosen_shard'") if $do_debug;
107             my $storage = $storages->{ $chosen_shard };
108             die "Failed to find chosen storage (server) for id '$chosen_shard' via key '$key'"
109             if not $storage;
110             $storage->delete($key);
111             }
112              
113             # ALWAYS also delete from the shard pointed at by the main continuum
114             my $where = $cont->choose($key);
115             $logger->debug("Deleting from continuum, got storage '$where'") if $do_debug;
116             if (!$chosen_shard or $where ne $chosen_shard) { # skip the second delete when both continuums chose the same shard
117             my $storage = $storages->{ $where };
118             die "Failed to find chosen storage (server) for id '$where' via key '$key'"
119             if not $storage;
120             $storage->delete($key);
121             }
122             }
123              
124              
125             sub reset_connection { # reset_connection($key): call reset_connection() on the storage chosen for $key by the migration continuum (when one is set) and on the storage chosen by the main continuum; dies if a chosen shard name has no entry in storages
126             my ($self, $key) = @_;
127              
128             my ($mig_cont, $cont) = @{$self}{qw(migration_continuum continuum)}; # hash slice on the object itself, no accessor calls
129              
130             # dumb code for efficiency (otherwise, this would be a loop or in methods)
131              
132             my $logger = $self->{logger};
133             my $do_debug = ($logger and $logger->is_debug) ? 1 : 0; # ask the logger once, reuse the flag below
134              
135             my $storages = $self->{storages};
136             my $chosen_shard;
137             # Reset the shard pointed at by migr. cont. first
138             if (defined $mig_cont) {
139             $chosen_shard = $mig_cont->choose($key);
140             $logger->debug("Resetting the connection to the shard from migration continuum, got storage '$chosen_shard'") if $do_debug;
141             my $storage = $storages->{ $chosen_shard };
142             die "Failed to find chosen storage (server) for id '$chosen_shard' via key '$key'"
143             if not $storage;
144             $storage->reset_connection();
145             }
146              
147             # Reset the shard from the main continuum
148             my $where = $cont->choose($key);
149             $logger->debug("Resetting the connection to the shard from the main continuum, got storage '$where'") if $do_debug;
150             if (!$chosen_shard or $where ne $chosen_shard) { # skip the second reset when both continuums chose the same shard
151             my $storage = $storages->{ $where };
152             die "Failed to find chosen storage (server) for id '$where' via key '$key'"
153             if not $storage;
154             $storage->reset_connection();
155             }
156             }
157              
158              
159             sub begin_migration { # begin_migration($migration_continuum): install the given continuum as migration_continuum; croaks (after logging at fatal level when a logger is set) if one is already set
160             my ($self, $migration_continuum) = @_;
161              
162             my $logger = $self->{logger};
163             if ($self->migration_continuum) { # a migration continuum is already set
164             my $err = "Cannot start a continuum migration in the middle of another migration";
165             $logger->fatal($err) if $logger;
166             Carp::croak($err); # NOTE(review): no explicit load of Carp is visible in this listing; presumably it is pulled in via Moose -- confirm
167             }
168             $logger->info("Starting continuum migration") if $logger;
169              
170             $self->migration_continuum($migration_continuum); # set through the accessor rather than by direct hash access
171             }
172              
173              
174             sub end_migration { # end_migration(): make the current migration continuum the main continuum, then remove the migration continuum from the object
175             my ($self) = @_;
176             my $logger = $self->{logger};
177             $logger->info("Ending continuum migration") if $logger;
178              
179             $self->continuum($self->migration_continuum); # NOTE(review): nothing here checks that a migration continuum is set -- confirm the behaviour when it is not
180             delete $self->{migration_continuum}; # removes the hash slot directly instead of going through the accessor
181             }
182              
183             no Moose;
184             __PACKAGE__->meta->make_immutable;
185              
186             =pod
187              
188             =head1 NAME
189              
190             ShardedKV - An interface to sharded key-value stores
191              
192             =head1 VERSION
193              
194             version 0.20
195              
196             =head1 SYNOPSIS
197              
198             use ShardedKV;
199             use ShardedKV::Continuum::Ketama;
200             use ShardedKV::Storage::Redis;
201            
202             my $continuum_spec = [
203             ["shard1", 100], # shard name, weight
204             ["shard2", 150],
205             ];
206             my $continuum = ShardedKV::Continuum::Ketama->new(from => $continuum_spec);
207            
208             # Redis storage chosen here, but can also be "Memory" or "MySQL".
209             # "Memory" is for testing. Mixing storages likely has weird side effects.
210             my %storages = (
211             shard1 => ShardedKV::Storage::Redis->new(
212             redis_connect_str => 'redisserver:6379',
213             ),
214             shard2 => ShardedKV::Storage::Redis->new(
215             redis_connect_str => 'redisserver:6380',
216             ),
217             );
218            
219             my $skv = ShardedKV->new(
220             storages => \%storages,
221             continuum => $continuum,
222             );
223            
224             my $value = $skv->get($key);
225             $skv->set($key, $value);
226             $skv->delete($key);
227              
228             =head1 DESCRIPTION
229              
230             This module implements an abstract interface to a sharded key-value store.
231             The storage backends as well as the "continuum" are pluggable. "Continuum"
232             is to mean "the logic that decides in which shard a particular key lives".
233             Typically, people use consistent hashing for this purpose and very commonly
234             the choice is to use ketama specifically. See below for references.
235              
236             Besides the abstract querying interface, this module also implements logic
237             to add one or more servers to the continuum and use passive key migration
238             to extend capacity without downtime. Do make it a point to understand the
239             logic before using it. More on that below.
240              
241             =head2 LOGGING
242              
243             ShardedKV allows instrumentation for logging and debugging by setting
244             the C<logger> attribute of the main ShardedKV object, and/or its
245             continuum and/or any or all storage sub-objects. If set, the
246             C<logger> attribute must be an object implementing the following methods:
247              
248             =over 4
249              
250             =item *
251              
252             trace
253              
254             =item *
255              
256             debug
257              
258             =item *
259              
260             info
261              
262             =item *
263              
264             warn
265              
266             =item *
267              
268             error
269              
270             =item *
271              
272             fatal
273              
274             =back
275              
276             which take a string parameter that is to be logged.
277             These logging levels might be familiar since they are taken from L<Log::Log4perl>,
278             which means that you can use a C<Log::Log4perl::Logger> object here.
279              
280             Additionally, the following methods must return whether or not the given log
281             level is enabled, to potentially avoid costly construction of log messages:
282              
283             =over 4
284              
285             =item *
286              
287             is_trace
288              
289             =item *
290              
291             is_debug
292              
293             =item *
294              
295             is_info
296              
297             =item *
298              
299             is_warn
300              
301             =item *
302              
303             is_error
304              
305             =item *
306              
307             is_fatal
308              
309             =back
310              
311             =head1 PUBLIC ATTRIBUTES
312              
313             =head2 continuum
314              
315             The continuum object decides on which shard a given key lives.
316             This is required for a C<ShardedKV> object and must be an object
317             that implements the C<ShardedKV::Continuum> role.
318              
319             =head2 migration_continuum
320              
321             This is a second continuum object that has additional shards configured.
322             If this is set, a passive key migration is in effect. See C<begin_migration>
323             below!
324              
325             =head2 storages
326              
327             A hashref of storage objects, each of which represents one shard.
328             Keys in the hash must be the same labels/shard names that are used
329             in the continuum. Each storage object must implement the
330             C<ShardedKV::Storage> role.
331              
332             =head2 logger
333              
334             If set, this must be a user-supplied object that implements
335             a certain number of methods which are called throughout ShardedKV
336             for logging/debugging purposes. See L</LOGGING> for details.
337              
338             =head1 PUBLIC METHODS
339              
340             =head2 get
341              
342             Given a key, fetches the value for that key from the correct shard
343             and returns that value or undef on failure.
344              
345             Different storage backends may return a reference to the value instead.
346             For example, the Redis and Memory backends return scalar references,
347             whereas the mysql backend returns an array reference. This might still
348             change; most likely, all backends will be required to return scalar
349             references in the future.
350              
351             =head2 set
352              
353             Given a key and a value, saves the value into the key within the
354             correct shard.
355              
356             The value needs to be a reference of the same type that would be
357             returned by the storage backend when calling C<get()>. See the
358             discussion above.
359              
360             =head2 delete
361              
362             Given a key, deletes the key's entry from the correct shard.
363              
364             In a migration situation, this might attempt to delete the key from
365             multiple shards, see below.
366              
367             =head2 reset_connection
368              
369             Given a key, determines which shard it would have communicated with and calls
370             reset_connection() upon it. This allows doing a reconnect only for the shards
371             that have problems. If there is a migration_continuum, it will also reset the
372             connection to that shard in an abundance of caution.
373              
374             =head2 begin_migration
375              
376             Given a C<ShardedKV::Continuum> object, this sets the
377             C<migration_continuum> property of the C<ShardedKV>, thus
378             beginning a I<passive> key migration. Right now, the only
379             kind of migration that is supported is I<adding> shards!
380             Only one migration may be in effect at a time. The
381             I<passive> qualification there is very significant. If you are,
382             for example, using the Redis storage backend with a key
383             expiration of one hour, then you B<know> that after letting
384             the passive migration run for one hour, all keys that are
385             still relevant will have been migrated (or expired if they
386             were not relevant).
387              
388             Full migration example:
389              
390             use ShardedKV;
391             use ShardedKV::Continuum::Ketama;
392             use ShardedKV::Storage::Redis;
393            
394             my $continuum_spec = [
395             ["shard1", 100], # shard name, weight
396             ["shard2", 150],
397             ];
398             my $continuum = ShardedKV::Continuum::Ketama->new(from => $continuum_spec);
399            
400             # Redis storage chosen here, but can also be "Memory" or "MySQL".
401             # "Memory" is for testing. Mixing storages likely has weird side effects.
402             my %storages = (
403             shard1 => ShardedKV::Storage::Redis->new(
404             redis_connect_str => 'redisserver:6379',
405             expiration_time => 60*60,
406             ),
407             shard2 => ShardedKV::Storage::Redis->new(
408             redis_connect_str => 'redisserver:6380',
409             expiration_time => 60*60,
410             ),
411             );
412            
413             my $skv = ShardedKV->new(
414             storages => \%storages,
415             continuum => $continuum,
416             );
417             # ... use the skv ...
418            
419             # Oh, we need to extend it!
420             # Add storages:
421             $skv->storages->{shard3} = ShardedKV::Storage::Redis->new(
422             redis_connect_str => 'NEWredisserver:6379',
423             expiration_time => 60*60,
424             );
425             # ... could add more at the same time...
426             my $old_continuum = $skv->continuum;
427             my $extended_continuum = $old_continuum->clone;
428             $extended_continuum->extend([shard3 => 120]);
429             $skv->begin_migration($extended_continuum);
430             # ... use the skv normally...
431             # ... after one hour (60*60 seconds), we can stop the migration:
432             $skv->end_migration();
433              
434             The logic for the migration is fairly simple:
435              
436             If there is a migration continuum, then for get requests, that continuum
437             is used to find the right shard for the given key. If that shard does not
438             have the key, we check the original continuum and if that points the key
439             at a different shard, we query that.
440              
441             For delete requests, we also attempt to delete from the shard pointed to
442             by the migration continuum AND the shard pointed to by the main continuum.
443              
444             For set requests, we always only use the shard deduced from the migration
445             continuum.
446              
447             C<end_migration()> will promote the migration continuum to the regular
448             continuum and set the C<migration_continuum> property to undef.
449              
450             =head2 end_migration
451              
452             See the C<begin_migration> docs above.
453              
454             =head1 SEE ALSO
455              
456             =over 4
457              
458             =item *
459              
460             L<ShardedKV::Storage>
461              
462             =item *
463              
464             L<ShardedKV::Storage::Redis>
465              
466             =item *
467              
468             L<Redis>
469              
470             =item *
471              
472             L<ShardedKV::Storage::Memory>
473              
474             =item *
475              
476             L<ShardedKV::Storage::MySQL>
477              
478             =item *
479              
480             L<DBI>
481              
482             =item *
483              
484             L<DBD::mysql>
485              
486             =back
487              
488             =over 4
489              
490             =item *
491              
492             L<ShardedKV::Continuum>
493              
494             =item *
495              
496             L<ShardedKV::Continuum::Ketama>
497              
498             =item *
499              
500             L<Algorithm::ConsistentHash::Ketama>
501              
502             =item *
503              
504             L<https://github.com/RJ/ketama>
505              
506             =item *
507              
508             L<ShardedKV::Continuum::StaticMapping>
509              
510             =back
511              
512             =head1 ACKNOWLEDGMENT
513              
514             This module was originally developed for Booking.com.
515             With approval from Booking.com, this module was generalized
516             and put on CPAN, for which the authors would like to express
517             their gratitude.
518              
519             =head1 AUTHORS
520              
521             =over 4
522              
523             =item *
524              
525             Steffen Mueller <smueller@cpan.org>
526              
527             =item *
528              
529             Nick Perez <nperez@cpan.org>
530              
531             =item *
532              
533             Damian Gryski <dgryski@cpan.org>
534              
535             =back
536              
537             =head1 COPYRIGHT AND LICENSE
538              
539             This software is copyright (c) 2013 by Steffen Mueller.
540              
541             This is free software; you can redistribute it and/or modify it under
542             the same terms as the Perl 5 programming language system itself.
543              
544             =cut
545              
546             __END__
547              
548             # vim: ts=2 sw=2 et