File Coverage

blib/lib/Metabase/Index/SQLite/Sharded.pm
Criterion Covered Total %
statement 114 119 95.8
branch 27 42 64.2
condition 8 15 53.3
subroutine 20 32 62.5
pod 0 18 0.0
total 169 226 74.7


line stmt bran cond sub pod time code
1 1     1   678858 use 5.006;
  1         4  
2 1     1   7 use strict;
  1         1  
  1         24  
3 1     1   5 use warnings;
  1         2  
  1         78  
4              
5             package Metabase::Index::SQLite::Sharded;
6             # ABSTRACT: Metabase index using multiple SQLite databases
7              
8             our $VERSION = '1.001';
9              
10 1     1   6 use Moose;
  1         1  
  1         9  
11 1     1   6597 use Data::Stream::Bulk::Callback;
  1         120402  
  1         40  
12 1     1   7 use List::Util qw/sum/;
  1         1  
  1         69  
13 1     1   747 use Storable qw/dclone/;
  1         3740  
  1         90  
14 1     1   9 use Moose::Util::TypeConstraints;
  1         1  
  1         12  
15 1     1   2501 use Metabase::Index::SQLite;
  1         4  
  1         1154  
16              
17             with 'Metabase::Backend::SQLite';
18             with 'Metabase::Index' => { -excludes => 'exists' };
19              
20             subtype 'IndexShardSize', # XXX should refactor with Archive one
21             as 'Int',
22             where { $_ > 0 && $_ < 8 }, # can't trust last byte of timestamp
23             message { "The number you provided, $_, was not between 1 and 8" };
24              
25             has shard_digits => (
26             is => 'ro',
27             isa => 'IndexShardSize',
28             default => 2,
29             );
30              
31             has _shards => (
32             is => 'ro',
33             traits => ['Hash'],
34             isa => 'HashRef[Object]',
35             default => sub { return {} },
36             handles => {
37             '_get_shard' => 'get',
38             '_set_shard' => 'set',
39             '_all_shards' => 'values',
40             },
41             );
42              
43             sub initialize {
44 4     4 0 7216 my ($self, $classes, $resources) = @_;
45 4         161 my $filename = $self->filename;
46 4         130 my ($basename, $ext) = $self->filename =~ m{^(.*)\.([^.]+)$};
47 4 50       243 $ext = '' unless defined $ext;
48 4 50       14 $basename = $filename unless defined $basename;
49 4         182 my $digits = $self->shard_digits;
50 4 50       16 my $template = $digits == 1 ? "H2" : "H$digits";
51             # warn "*** TEMPLATE: $template";
52 4         23 for my $n ( 0 .. 16**$digits - 1) {
53 64         634 my $key = unpack($template,pack("I",$n));
54 64         120 my $index_file;
55 64 50 33     428 if ( $basename && $ext ) {
56 64         241 $index_file = "$basename\_$key.$ext";
57             }
58             else {
59 0         0 $index_file = "$basename\_$key";
60             }
61 64 50       828 my $index = Metabase::Index::SQLite->new(
62             filename => $index_file,
63             ) or die "Couldn't not build shard '$index_file' \n";
64             # warn "*** Initializing $key\n";
65 64         152738 $index->initialize($classes, $resources);
66 64         3355 $self->_set_shard($key, $index);
67             }
68 4         22 return;
69             }
70              
71             sub _shard_key {
72 13     13   67 my ($self, $guid) = @_;
73 13         441 my $digits = $self->shard_digits;
74 13         44 my $key = substr $guid, (7-$digits), $digits;
75 13 50 33     185 if ( defined $key && length $key > 1 ) {
    50 33        
76 0         0 return $key;
77             }
78             elsif ( defined $key && length $key == 1 ) {
79 13         51 return "0$key";
80             }
81             else {
82 0 0       0 return scalar "0"x($digits==1 ? 2 : $digits);
83             }
84             }
85              
86             # override from role to target query at right shard
87             sub exists {
88 3     3 0 2536 my ($self, $guid) = @_;
89 3         9 my $key = $self->_shard_key($guid);
90 3         146 my $shard = $self->_get_shard($key);
91             # if desired guid in upper case, fix it
92 3         8 return scalar @{ $shard->search(-where => [-eq =>'core.guid'=>lc $guid])};
  3         36  
93             }
94              
95             sub add {
96 8     8 0 6768 my ( $self, $fact ) = @_;
97 8         90 my $key = $self->_shard_key($fact->guid);
98             # warn "***Adding to shard '$key'\n";
99 8 50       303 my $shard = $self->_get_shard($key)
100             or die "Couldn't find shard for '$key' from " . $fact->guid. "\n";
101 8         43 return $shard->add($fact);
102             }
103              
104             sub delete {
105 2     2 0 2188 my ( $self, $guid ) = @_;
106 2         10 my $key = $self->_shard_key($guid);
107 2         72 return $self->_get_shard($key)->delete($guid);
108             }
109              
110             sub count {
111 15     15 0 10942 my ( $self, %spec ) = @_;
112 15         700 return sum map { $_->count(%{ dclone(\%spec) }) } $self->_all_shards;
  240         15781  
  240         4636  
113             }
114              
115             sub query {
116 13     13 0 17181 my ( $self, %spec) = @_;
117 13         579 my @shards = $self->_all_shards;
118 13         543 my @iters = map { $_->_shard_query(%{ dclone(\%spec) }) } $self->_all_shards;
  208         27797  
  208         6541  
119              
120             # XXX this does not preserve order or limit
121 13         2023 my $limit = $spec{-limit};
122 13         32 my $count = 0;
123             return Data::Stream::Bulk::Callback->new(
124             callback => sub {
125 24 100 100 24   2989 return if $limit && $count == $limit; # shortcut
126             # Need to merge results
127 23         36 my @results;
128             my @not_done;
129 23         62 for my $s ( @iters ) {
130 223 100       11339 if ( my @items = $s->items ) {
131 17         1179 push @not_done, $s; # round-robin
132 17         48 push @results, @items;
133             }
134             }
135 23 100       1587 return unless @results;
136 11         327 @iters = @not_done; # for next invocation
137             # Need to order results
138 11 100       530 if ( my @clauses = $self->_order_clauses(\%spec) ) {
139 3         17 @results = $self->_sort_results(\@results, \@clauses);
140             }
141             # Need to limit results
142 11 100       49 if ( $limit ) {
143 1 50       6 if ( $count + @results <= $limit ) {
144 0         0 $count += @results;
145             }
146             else {
147 1         3 my $need = $limit - $count;
148 1         2 $count += $need;
149 1         3 splice @results, $need;
150             }
151             }
152             # Need to extract just guid
153             # warn "*** RESULTS: @results\n";
154 11         35 return [ map { $_->[0] } @results ]; # just the GUID
  16         92  
155             },
156 13         667 );
157             }
158              
159             sub _order_clauses {
160 11     11   28 my ($self, $spec) = @_;
161 11 100 66     114 if ( defined $spec->{-order} and ref $spec->{-order} eq 'ARRAY') {
162 3         5 my @clauses;
163 3         10 my @order = @{$spec->{-order}};
  3         17  
164 3         15 while ( @order ) {
165 3         14 my ($dir, $field) = splice( @order, 0, 2);
166 3         17 $dir =~ s/^-//;
167 3         10 $dir = uc $dir;
168 3         18 push @clauses, [$field, $dir];
169             }
170 3         16 return @clauses;
171             }
172             else {
173 8         57 return ();
174             }
175             }
176              
177             sub _sort_results {
178 3     3   7 my ($self, $results, $clauses) = @_;
179             my $sorter = sub {
180 3     3   7 my ($left_data, $right_data) = @_;
181 3         15 for my $i ( 0 .. $#$clauses ) {
182 3         8 my $dir = $clauses->[$i][1];
183 3         10 my $left = $left_data->[$i+1];
184 3         8 my $right = $right_data->[$i+1];
185 3 100       12 if ( $dir eq 'ASC' ) {
186 1 50       6 return 1 if $left gt $right;
187 1 50       19 return -1 if $left lt $right;
188             }
189             else {
190 2 50       9 return -1 if $left gt $right;
191 2 50       28 return 1 if $left lt $right;
192             }
193             }
194 0         0 return 0; # everything was equal
195 3         20 };
196 3         23 return sort { $sorter->($a,$b) } @$results; ## no critic
  3         8  
197             }
198              
199             # Fake these to satisfy the role -- we actually delegate everything out
200             # to shards
201       0 0   sub op_and { }
202       0 0   sub op_between { }
203       0 0   sub op_eq { }
204       0 0   sub op_ge { }
205       0 0   sub op_gt { }
206       0 0   sub op_le { }
207       0 0   sub op_like { }
208       0 0   sub op_lt { }
209       0 0   sub op_ne { }
210       0 0   sub op_not { }
211       0 0   sub op_or { }
212       0 0   sub translate_query { }
213              
214             1;
215              
216             __END__
217              
218             =pod
219              
220             =encoding UTF-8
221              
222             =head1 NAME
223              
224             Metabase::Index::SQLite::Sharded - Metabase index using multiple SQLite databases
225              
226             =head1 VERSION
227              
228             version 1.001
229              
230             =head1 SYNOPSIS
231              
232             use Metabase::Index::SQLite::Sharded;
233              
234             my $index = Metabase::Index::SQLite::Sharded->new(
235             filename => $sqlite_file,
236             shard_digits => 2,
237             );
238              
239             =head1 DESCRIPTION
240              
241             This is an implementation of the L<Metabase::Index::SQL> role using SQLite
242             shards.
243              
244             SQLite stores a database entirely in a single file. That starts to become
245             slow as the size of the file gets large. This Metabase::Index shards
246             the index across multiple SQLite files.
247              
248             It takes the same options as L<Metabase::Index::SQLite>, with one additional
249             option, C<shard_digits>. The C<shard_digits> attribute defines how many digits
250             of the GUID to use as a shard key. Each digit is hexadecimal, so the number
251             of shards is 16 raised to the number of digits. E.g., "1" means 16
252             shards, "2" means 256 shards and so on.
253              
254             The shard key is inserted into the database C<filename> parameter either before
255             the final period or at the end. E.g. for C<shard_digits> of "2" and
256             C<filename> "db.sqlite3", the shards would be "db_00.sqlite3", "db_01.sqlite3",
257             and so on.
258              
259             =for Pod::Coverage::TrustPod add query delete count exists initialize
260             translate_query op_eq op_ne op_gt op_lt op_ge op_le op_between op_like
261             op_not op_or op_and
262              
263             =head1 AUTHORS
264              
265             =over 4
266              
267             =item *
268              
269             David Golden <dagolden@cpan.org>
270              
271             =item *
272              
273             Leon Brocard <acme@astray.org>
274              
275             =back
276              
277             =head1 COPYRIGHT AND LICENSE
278              
279             This software is Copyright (c) 2011 by David Golden.
280              
281             This is free software, licensed under:
282              
283             The Apache License, Version 2.0, January 2004
284              
285             =cut