File Coverage

blib/lib/KiokuDB/Backend/DBI.pm
Criterion Covered Total %
statement 405 444 91.2
branch 83 124 66.9
condition 20 40 50.0
subroutine 87 96 90.6
pod 2 36 5.5
total 597 740 80.6


line stmt bran cond sub pod time code
1             package KiokuDB::Backend::DBI;
2             BEGIN {
3 5     5   3103281 $KiokuDB::Backend::DBI::AUTHORITY = 'cpan:NUFFIN';
4             }
5             $KiokuDB::Backend::DBI::VERSION = '1.23';
6 5     5   348 use Moose;
  5         503916  
  5         40  
7             # ABSTRACT: DBI backend for KiokuDB
8              
9 5     5   36519 use Moose::Util::TypeConstraints;
  5         36  
  5         47  
10              
11 5     5   14054 use MooseX::Types 0.08 -declare => [qw(ValidColumnName SchemaProto)];
  5         120503  
  5         36  
12              
13 5     5   25546 use MooseX::Types::Moose qw(ArrayRef HashRef Str Defined);
  5         62085  
  5         49  
14              
15 5     5   25375 use Moose::Util::TypeConstraints qw(enum);
  5         11  
  5         29  
16              
17 5     5   2512 use Try::Tiny;
  5         34  
  5         369  
18 5     5   1789 use Data::Stream::Bulk::DBI 0.07;
  5         322687  
  5         263  
19 5     5   2352 use SQL::Abstract;
  5         35077  
  5         344  
20 5     5   2386 use JSON;
  5         35864  
  5         64  
21 5     5   816 use Scalar::Util qw(weaken refaddr);
  5         10  
  5         281  
22 5     5   349 use List::MoreUtils qw(any);
  5         9399  
  5         61  
23 5     5   3594 use Class::Load qw(load_class);
  5         13  
  5         238  
24 5     5   1322 use Search::GIN 0.07 ();
  5         503  
  5         113  
25              
26 5     5   363 use KiokuDB 0.46 ();
  5         1741989  
  5         103  
27 5     5   1584 use KiokuDB::Backend::DBI::Schema;
  5         17  
  5         170  
28 5     5   33 use KiokuDB::TypeMap;
  5         9  
  5         129  
29 5     5   2102 use KiokuDB::TypeMap::Entry::DBIC::Row;
  5         24  
  5         225  
30 5     5   2747 use KiokuDB::TypeMap::Entry::DBIC::ResultSource;
  5         21  
  5         177  
31 5     5   2135 use KiokuDB::TypeMap::Entry::DBIC::ResultSet;
  5         23  
  5         211  
32 5     5   2371 use KiokuDB::TypeMap::Entry::DBIC::Schema;
  5         17  
  5         207  
33              
34 5     5   36 use namespace::clean -except => 'meta';
  5         10  
  5         29  
35              
36             with qw(
37             KiokuDB::Backend
38             KiokuDB::Backend::Serialize::Delegate
39             KiokuDB::Backend::Role::Clear
40             KiokuDB::Backend::Role::TXN
41             KiokuDB::Backend::Role::Scan
42             KiokuDB::Backend::Role::Query::Simple
43             KiokuDB::Backend::Role::Query::GIN
44             KiokuDB::Backend::Role::Concurrency::POSIX
45             KiokuDB::Backend::Role::GC
46             Search::GIN::Extract::Delegate
47             );
48             # KiokuDB::Backend::Role::TXN::Nested is not supported by many DBs
49             # we don't really care though
50              
51             my @std_cols = qw(id class root tied);
52             my @reserved_cols = ( @std_cols, 'data' );
53             my %reserved_cols = ( map { $_ => 1 } @reserved_cols );
54              
55             subtype ValidColumnName, as Str, where { not exists $reserved_cols{$_} };
56             subtype SchemaProto, as Defined, where {
57             load_class($_) unless ref;
58             !ref($_) || blessed($_) and $_->isa("DBIx::Class::Schema::KiokuDB");
59             };
60              
61             sub new_from_dsn {
62 62     62 0 335992 my ( $self, $dsn, @args ) = @_;
63 62 50 33     463 @args = %{ $args[0] } if @args == 1 and ref $args[0] eq 'HASH';
  0         0  
64 62         2155 $self->new( dsn => "dbi:$dsn", @args );
65             }
66              
67             sub BUILD {
68 64     64 0 177 my $self = shift;
69              
70 64         1946 $self->schema; # connect early
71              
72 64 100       1923 if ( $self->create ) {
73 62         286 $self->create_tables;
74             }
75             }
76              
77             has '+serializer' => ( default => "json" ); # to make dumps readable
78              
79             has json => (
80             isa => "Object",
81             is => "ro",
82             default => sub { JSON->new },
83             );
84              
85             has create => (
86             isa => "Bool",
87             is => "ro",
88             default => 0,
89             );
90              
91             has 'dsn' => (
92             isa => "Str|CodeRef",
93             is => "ro",
94             );
95              
96             has [qw(user password)] => (
97             isa => "Str",
98             is => "ro",
99             );
100              
101             has dbi_attrs => (
102             isa => HashRef,
103             is => "ro",
104             );
105              
106             has mysql_strict => (
107             isa => "Bool",
108             is => "ro",
109             default => 1,
110             );
111              
112             has sqlite_sync_mode => (
113             isa => enum([qw(0 1 2 OFF NORMAL FULL off normal full)]),
114             is => "ro",
115             predicate => "has_sqlite_fsync_mode",
116             );
117              
118             has on_connect_call => (
119             isa => "ArrayRef",
120             is => "ro",
121             lazy_build => 1,
122             );
123              
124             sub _build_on_connect_call {
125 63     63   133 my $self = shift;
126              
127 63         117 my @call;
128              
129 63 50       2037 if ( $self->mysql_strict ) {
130             push @call, sub {
131 62     62   104032 my $storage = shift;
132              
133 62 50       723 if ( $storage->can("connect_call_set_strict_mode") ) {
134 0         0 $storage->connect_call_set_strict_mode;
135             }
136 63         351 };
137             };
138              
139 63 100       2099 if ( $self->has_sqlite_fsync_mode ) {
140             push @call, sub {
141 60     60   4044 my $storage = shift;
142              
143 60 50       1668 if ( $storage->sqlt_type eq 'SQLite' ) {
144 60         3214 $storage->dbh_do(sub { $_[1]->do("PRAGMA synchronous=" . $self->sqlite_sync_mode) });
  60         2885  
145             }
146 60         354 };
147             }
148              
149 63         1666 return \@call;
150             }
151              
152             has dbic_attrs => (
153             isa => "HashRef",
154             is => "ro",
155             lazy_build => 1,
156             );
157              
158             sub _build_dbic_attrs {
159 63     63   136 my $self = shift;
160              
161             return {
162 63         1847 on_connect_call => $self->on_connect_call,
163             };
164             }
165              
166             has connect_info => (
167             isa => ArrayRef,
168             is => "ro",
169             lazy_build => 1,
170             );
171              
172             sub _build_connect_info {
173 63     63   137 my $self = shift;
174              
175 63         1696 return [ $self->dsn, $self->user, $self->password, $self->dbi_attrs, $self->dbic_attrs ];
176             }
177              
178             has schema => (
179             isa => "DBIx::Class::Schema",
180             is => "ro",
181             lazy_build => 1,
182             init_arg => "connected_schema",
183             handles => [qw(deploy kiokudb_entries_source_name)],
184             );
185              
186             has _schema_proto => (
187             isa => SchemaProto,
188             is => "ro",
189             init_arg => "schema",
190             default => "KiokuDB::Backend::DBI::Schema",
191             );
192              
193             has schema_hook => (
194             isa => "CodeRef|Str",
195             is => "ro",
196             predicate => "has_schema_hook",
197             );
198              
199             sub _build_schema {
200 63     63   149 my $self = shift;
201              
202 63         1958 my $schema = $self->_schema_proto->clone;
203              
204 63 50       29717 unless ( $schema->kiokudb_entries_source_name ) {
205 63         5984 $schema->define_kiokudb_schema( extra_entries_columns => $self->columns );
206             }
207              
208 63 50       9888 if ( $self->has_schema_hook ) {
209 0         0 my $h = $self->schema_hook;
210 0         0 $self->$h($schema);
211             }
212              
213 63         164 $schema->connect(@{ $self->connect_info });
  63         1777  
214             }
215              
216             has storage => (
217             isa => "DBIx::Class::Storage::DBI",
218             is => "rw",
219             lazy_build => 1,
220             handles => [qw(dbh_do)],
221             );
222              
223 63     63   1690 sub _build_storage { shift->schema->storage }
224              
225             has for_update => (
226             isa => "Bool",
227             is => "ro",
228             default => 1,
229             );
230              
231             has _for_update => (
232             isa => "Bool",
233             is => "ro",
234             lazy_build => 1,
235             );
236              
237             sub _build__for_update {
238 52     52   177 my $self = shift;
239              
240             return (
241 52   33     1652 $self->for_update
242             and
243             $self->storage->sqlt_type =~ /^(?:MySQL|Oracle|PostgreSQL)$/
244             );
245             }
246              
247             has columns => (
248             isa => ArrayRef[ValidColumnName|HashRef],
249             is => "ro",
250             default => sub { [] },
251             );
252              
253             has _columns => (
254             isa => HashRef,
255             is => "ro",
256             lazy_build => 1,
257             );
258              
259             sub _build__columns {
260 53     53   105 my $self = shift;
261              
262 53         1438 my $rs = $self->schema->source( $self->kiokudb_entries_source_name );
263              
264 53         5215 my @user_cols = grep { not exists $reserved_cols{$_} } $rs->columns;
  364         1572  
265              
266 53   50     269 return { map { $_ => $rs->column_info($_)->{extract} || undef } @user_cols };
  99         1503  
267             }
268              
269             has _ordered_columns => (
270             isa => "ArrayRef",
271             is => "ro",
272             lazy_build => 1,
273             );
274              
275             sub _build__ordered_columns {
276 52     52   174 my $self = shift;
277 52         141 return [ @reserved_cols, sort keys %{ $self->_columns } ];
  52         1483  
278             }
279              
280             has _column_order => (
281             isa => "HashRef",
282             is => "ro",
283             lazy_build => 1,
284             );
285              
286             sub _build__column_order {
287 0     0   0 my $self = shift;
288              
289 0         0 my $cols = $self->_ordered_columns;
290 0         0 return { map { $cols->[$_] => $_ } 0 .. $#$cols }
  0         0  
291             }
292              
293             has '+extract' => (
294             required => 0,
295             );
296              
297             has sql_abstract => (
298             isa => "SQL::Abstract",
299             is => "ro",
300             lazy_build => 1,
301             );
302              
303             sub _build_sql_abstract {
304 3     3   9 my $self = shift;
305              
306 3         29 SQL::Abstract->new;
307             }
308              
309             # use a Maybe so we can force undef in the builder
310             has batch_size => (
311             isa => "Maybe[Int]",
312             is => "ro",
313             lazy => 1,
314             builder => '_build_batch_size',
315             );
316              
317             sub _build_batch_size {
318 52     52   157 my $self = shift;
319              
320 52 50       1476 if ($self->storage->sqlt_type eq 'SQLite') {
321 52         4345 return 999;
322             } else {
323 0         0 return undef;
324             }
325             }
326              
327 0     0 0 0 sub has_batch_size { defined shift->batch_size }
328              
329             sub register_handle {
330 63     63 0 37676 my ( $self, $kiokudb ) = @_;
331              
332 63         1835 $self->schema->_kiokudb_handle($kiokudb);
333             }
334              
335             sub default_typemap {
336 52     52 0 1639399 KiokuDB::TypeMap->new(
337             isa_entries => {
338             # redirect to schema row
339             'DBIx::Class::Row' => KiokuDB::TypeMap::Entry::DBIC::Row->new,
340              
341             # actual serialization
342             'DBIx::Class::ResultSet' => KiokuDB::TypeMap::Entry::DBIC::ResultSet->new,
343              
344             # fake, the entries never get written to the db
345             'DBIx::Class::ResultSource' => KiokuDB::TypeMap::Entry::DBIC::ResultSource->new,
346             'DBIx::Class::Schema' => KiokuDB::TypeMap::Entry::DBIC::Schema->new,
347             },
348             );
349             }
350              
351             sub insert {
352 148     148 0 18871224 my ( $self, @entries ) = @_;
353              
354 148 100       910 return unless @entries;
355              
356 133         4127 my $g = $self->schema->txn_scope_guard;
357              
358 133         40071 $self->insert_rows( $self->entries_to_rows(@entries) );
359              
360             # hopefully we're in a transaction, otherwise this totally sucks
361 130 100       19679 if ( $self->extract ) {
362 122         306 my %gin_index;
363              
364 122         408 foreach my $entry ( @entries ) {
365 3394         621708 my $id = $entry->id;
366              
367 3394 100 66     97426 if ( $entry->deleted || !$entry->has_object ) {
368 3         133 $gin_index{$id} = [];
369             } else {
370 3391   33     200217 my $d = $entry->backend_data || $entry->backend_data({});
371 3391         190425 $gin_index{$id} = [ $self->extract_values( $entry->object, entry => $entry ) ];
372             }
373             }
374              
375 122         30072 $self->update_index(\%gin_index);
376             }
377              
378 130         6363 $g->commit;
379             }
380              
381             sub entries_to_rows {
382 133     133 0 978 my ( $self, @entries ) = @_;
383              
384 133         482 my ( %insert, %update, @dbic );
385              
386 133         529 foreach my $t ( \%insert, \%update ) {
387 266         488 foreach my $col ( @{ $self->_ordered_columns } ) {
  266         8605  
388 1826         4338 $t->{$col} = [];
389             }
390             }
391              
392 133         368 foreach my $entry ( @entries ) {
393 3411         82550 my $id = $entry->id;
394              
395 3411 100       30109 if ( $id =~ /^dbic:schema/ ) {
    100          
396 2         10 next;
397             } elsif ( $id =~ /^dbic:row:/ ) {
398 2         44 push @dbic, $entry->data;
399             } else {
400 3407 100       78135 my $targ = $entry->prev ? \%update : \%insert;
401              
402 3407         29989 my $row = $self->entry_to_row($entry, $targ);
403             }
404             }
405              
406 133         1605 return \( %insert, %update, @dbic );
407             }
408              
409             sub entry_to_row {
410 3408     3408 0 7282 my ( $self, $entry, $collector ) = @_;
411              
412 3408         6880 for (qw(id class tied)) {
413 10224         55916 push @{ $collector->{$_} }, $entry->$_;
  10224         242861  
414             }
415              
416 3408 100       24417 push @{ $collector->{root} }, $entry->root ? 1 : 0;
  3408         79235  
417              
418 3408         52463 push @{ $collector->{data} }, $self->serialize($entry);
  3408         14396  
419              
420 3408         5238588 my $cols = $self->_columns;
421              
422 3408         12981 foreach my $column ( keys %$cols ) {
423 6785         13421 my $c = $collector->{$column};
424 6785 50       167304 if ( my $extract = $cols->{$column} ) {
    100          
425 0 0       0 if ( my $obj = $entry->object ) {
426 0         0 push @$c, $obj->$extract($column);
427 0         0 next;
428             }
429             } elsif ( ref( my $data = $entry->data ) eq 'HASH' ) {
430 6749 100 66     65885 if ( exists $data->{$column} and not ref( my $value = $data->{$column} ) ) {
431 3424         8010 push @$c, $value;
432 3424         10973 next;
433             }
434             }
435              
436 3361         14338 push @$c, undef;
437             }
438             }
439              
440             sub insert_rows {
441 133     133 0 531 my ( $self, $insert, $update, $dbic ) = @_;
442              
443 133         4118 my $g = $self->schema->txn_scope_guard;
444              
445             $self->dbh_do(sub {
446 133     133   4623 my ( $storage, $dbh ) = @_;
447              
448 133 100       4206 if ( $self->extract ) {
449 125 50       446 if ( my @ids = map { @{ $_->{id} || [] } } $insert, $update ) {
  250 50       393  
  250         2782  
450              
451 125   50     3914 my $batch_size = $self->batch_size || scalar(@ids);
452              
453 125         1333 my @ids_copy = @ids;
454 125         729 while ( my @batch_ids = splice @ids_copy, 0, $batch_size ) {
455 128         2659 my $del_gin_sth = $dbh->prepare_cached("DELETE FROM gin_index WHERE id IN (" . join(", ", ('?') x @batch_ids) . ")");
456              
457 128         57837 $del_gin_sth->execute(@batch_ids);
458              
459 128         2150 $del_gin_sth->finish;
460             }
461             }
462             }
463              
464 133         5478 my $colinfo = $self->schema->source('entries')->columns_info;
465              
466 133         14928 my %rows = ( insert => $insert, update => $update );
467              
468 133         554 foreach my $op (qw(insert update)) {
469 263         864 my $prepare = "prepare_$op";
470 263         1290 my ( $sth, @cols ) = $self->$prepare($dbh);
471              
472 263         656 my $i = 1;
473              
474 263         627 foreach my $column_name (@cols) {
475 1805         3136 my $attributes = {};
476              
477 1805 50       4293 if ( exists $colinfo->{$column_name} ) {
478 1805         3928 my $dt = $colinfo->{$column_name}{data_type};
479 1805         51859 $attributes = $self->storage->bind_attribute_by_data_type($dt);
480             }
481              
482 1805         16503 $sth->bind_param_array( $i, $rows{$op}->{$column_name}, $attributes );
483              
484 1805         32999 $i++;
485             }
486              
487 263 50       2237 $sth->execute_array({ArrayTupleStatus => []}) or die;
488              
489 260         186743 $sth->finish;
490             }
491              
492 130         1038 $_->insert_or_update for @$dbic;
493 133         26612 });
494              
495 130         2420 $g->commit;
496             }
497              
498             sub prepare_select {
499 456     456 0 1573 my ( $self, $dbh, $stmt ) = @_;
500              
501 456 50       14337 $dbh->prepare_cached($stmt . ( $self->_for_update ? " FOR UPDATE" : "" ), {}, 3); # 3 = don't use if still Active
502             }
503              
504             sub prepare_insert {
505 133     133 0 432 my ( $self, $dbh ) = @_;
506              
507 133         273 my @cols = @{ $self->_ordered_columns };
  133         4569  
508              
509 133         1781 my $ins = $dbh->prepare_cached("INSERT INTO entries (" . join(", ", @cols) . ") VALUES (" . join(", ", ('?') x @cols) . ")");
510              
511 133         8486 return ( $ins, @cols );
512             }
513              
514             sub prepare_update {
515 130     130 0 410 my ( $self, $dbh ) = @_;
516              
517 130         248 my ( $id, @cols ) = @{ $self->_ordered_columns };
  130         4875  
518              
519 130         517 my $upd = $dbh->prepare_cached("UPDATE entries SET " . join(", ", map { "$_ = ?" } @cols) . " WHERE $id = ?");
  762         2638  
520              
521 130         8022 return ( $upd, @cols, $id );
522             }
523              
524             sub update_index {
525 122     122 0 447 my ( $self, $entries ) = @_;
526              
527             $self->dbh_do(sub {
528 122     122   4036 my ( $storage, $dbh ) = @_;
529              
530 122         908 my $i_sth = $dbh->prepare_cached("INSERT INTO gin_index (id, value) VALUES (?, ?)");
531              
532 122         13529 foreach my $id ( keys %$entries ) {
533             my $rv = $i_sth->execute_array(
534             {ArrayTupleStatus => []},
535             $id,
536 3394         713746 $entries->{$id},
537             );
538             }
539              
540 122         31420 $i_sth->finish;
541 122         1112 });
542             }
543              
544             sub _parse_dbic_key {
545 6     6   18 my ( $self, $key ) = @_;
546              
547 6         9 @{ $self->json->decode(substr($key,length('dbic:row:'))) };
  6         178  
548             }
549              
550             sub _part_rows_and_ids {
551 356     356   938 my ( $self, $rows_and_ids ) = @_;
552              
553 356         792 my ( @rows, @ids, @special );
554              
555 356         1171 for ( @$rows_and_ids ) {
556 3579 100       6588 if ( /^dbic:schema/ ) {
    100          
557 2         6 push @special, $_;
558             } elsif ( /^dbic:row:/ ) {
559 6         18 push @rows, $_;
560             } else {
561 3571         6164 push @ids, $_;
562             }
563             }
564              
565 356         1486 return \( @rows, @ids, @special );
566             }
567              
568             sub _group_dbic_keys {
569 5     5   12 my ( $self, $keys, $mkey_handler ) = @_;
570              
571 5         12 my ( %keys, %ids );
572              
573 5         14 foreach my $id ( @$keys ) {
574 6         17 my ( $rs_name, @key ) = $self->_parse_dbic_key($id);
575              
576 6 50       19 if ( @key > 1 ) {
577 0         0 $mkey_handler->($id, $rs_name, @key);
578             } else {
579             # for other objects we queue up IDs for a single SELECT
580 6   100     12 push @{ $keys{$rs_name} ||= [] }, $key[0];
  6         37  
581 6   100     10 push @{ $ids{$rs_name} ||= [] }, $id;
  6         32  
582             }
583             }
584              
585 5         19 return \( %keys, %ids );
586             }
587              
588             sub get {
589 301     301 1 1405105 my ( $self, @rows_and_ids ) = @_;
590              
591 301 50       1360 return unless @rows_and_ids;
592              
593 301         618 my %entries;
594              
595 301         1181 my ( $rows, $ids, $special ) = $self->_part_rows_and_ids(\@rows_and_ids);
596              
597 301 100       1032 if ( @$ids ) {
598             $self->dbh_do(sub {
599 296     296   43840 my ( $storage, $dbh ) = @_;
600              
601 296         1529 my @ids_copy = @$ids;
602              
603 296   50     9799 my $batch_size = $self->batch_size || scalar(@$ids);
604              
605 296         1714 while ( my @batch_ids = splice(@ids_copy, 0, $batch_size) ) {
606 299         2692 my $sth = $self->prepare_select($dbh, "SELECT id, data FROM entries WHERE id IN (" . join(", ", ('?') x @batch_ids) . ")");
607 299         53292 $sth->execute(@batch_ids);
608              
609 299         3494 $sth->bind_columns( \my ( $id, $data ) );
610              
611             # not actually necessary but i'm keeping it around for reference:
612             #my ( $id, $data );
613             #use DBD::Pg qw(PG_BYTEA);
614             #$sth->bind_col(1, \$id);
615             #$sth->bind_col(2, \$data, { pg_type => PG_BYTEA });
616              
617 299         13433 while ( $sth->fetch ) {
618 3508         25054 $entries{$id} = $data;
619             }
620             }
621 296         3348 });
622             }
623              
624 301 100       13535 if ( @$rows ) {
625 4         122 my $schema = $self->schema;
626              
627 4         10 my $err = \"foo";
628             my ( $rs_keys, $rs_ids ) = try {
629             $self->_group_dbic_keys( $rows, sub {
630 0         0 my ( $id, $rs_name, @key ) = @_;
631              
632             # multi column primary keys need 'find'
633 0 0       0 my $obj = $schema->resultset($rs_name)->find(@key) or die $err; # die to stop search
634              
635 0         0 $entries{$id} = KiokuDB::Entry->new(
636             id => $id,
637             class => ref($obj),
638             data => $obj,
639             );
640 4     4   357 });
641             } catch {
642 0 0 0 0   0 die $_ if ref $_ and refaddr($_) == refaddr($err);
643 4 50       41 } or return;
644              
645 4         93 foreach my $rs_name ( keys %$rs_keys ) {
646 4         20 my $rs = $schema->resultset($rs_name);
647              
648 4         1557 my $ids = $rs_ids->{$rs_name};
649              
650 4         8 my @objs;
651              
652 4 100       17 if ( @$ids == 1 ) {
653 3         8 my $id = $ids->[0];
654              
655 3 50       19 my $obj = $rs->find($rs_keys->{$rs_name}[0]) or return;
656              
657 3         7091 $entries{$id} = KiokuDB::Entry->new(
658             id => $id,
659             class => ref($obj),
660             data => $obj,
661             );
662             } else {
663 1         11 my ($pk) = $rs->result_source->primary_columns;
664              
665 1         15 my $keys = $rs_keys->{$rs_name};
666              
667 1         9 my @objs = $rs->search({ $pk => $keys })->all;
668              
669 1 50       2603 return if @objs != @$ids;
670              
671             # this key lookup is because it's not returned in the same order
672 1         42 my %pk_to_id;
673 1         5 @pk_to_id{@$keys} = @$ids;
674              
675 1         2 foreach my $obj ( @objs ) {
676 2         105 my $id = $pk_to_id{$obj->id};
677 2         66 $entries{$id} = KiokuDB::Entry->new(
678             id => $id,
679             class => ref($obj),
680             data => $obj,
681             );
682             }
683             }
684             }
685             }
686              
687 301         1442 for ( @$special ) {
688 2 100       59 $entries{$_} = KiokuDB::Entry->new(
689             id => $_,
690             $_ eq 'dbic:schema'
691             ? ( data => $self->schema,
692             class => "DBIx::Class::Schema" )
693             : ( data => undef,
694             class => "DBIx::Class::ResultSource" )
695             );
696             }
697              
698             # ->rows only works after we're done
699 301 100       1342 return if @rows_and_ids != keys %entries;
700             # case sensitivity differences, possibly?
701 295 50   3515   3367 return if any { !defined } @entries{@rows_and_ids};
  3515         4473  
702              
703 295 100       1339 map { ref($_) ? $_ : $self->deserialize($_) } @entries{@rows_and_ids};
  3515         674618  
704             }
705              
706             sub delete {
707 15     15 0 26369 my ( $self, @ids_or_entries ) = @_;
708              
709             # FIXME special DBIC rows
710              
711 15 100       63 my @ids = map { ref($_) ? $_->id : $_ } @ids_or_entries;
  15         281  
712              
713             $self->dbh_do(sub {
714 15     15   576 my ( $storage, $dbh ) = @_;
715              
716 15         538 my $g = $self->schema->txn_scope_guard;
717              
718 15   50     4643 my $batch_size = $self->batch_size || scalar(@ids);
719              
720 15         64 my @ids_copy = @ids;
721 15         113 while ( my @batch_ids = splice @ids_copy, 0, $batch_size ) {
722 15 50       529 if ( $self->extract ) {
723             # FIXME rely on cascade delete?
724 15         267 my $sth = $dbh->prepare_cached("DELETE FROM gin_index WHERE id IN (" . join(", ", ('?') x @batch_ids) . ")");
725 15         5613 $sth->execute(@batch_ids);
726 15         170 $sth->finish;
727             }
728              
729 15         186 my $sth = $dbh->prepare_cached("DELETE FROM entries WHERE id IN (" . join(", ", ('?') x @batch_ids) . ")");
730 15         2560 $sth->execute(@batch_ids);
731 15         174 $sth->finish;
732             }
733              
734 15         95 $g->commit;
735 15         275 });
736              
737 15         1969 return;
738             }
739              
740             sub exists {
741 55     55 0 248934 my ( $self, @rows_and_ids ) = @_;
742              
743 55 50       244 return unless @rows_and_ids;
744              
745 55         1569 my $schema = $self->schema;
746              
747 55         111 my %entries;
748              
749 55         240 my ( $rows, $ids, $special ) = $self->_part_rows_and_ids(\@rows_and_ids);
750              
751 55 100       204 if ( @$ids ) {
752             $self->dbh_do(sub {
753 54     54   8811 my ( $storage, $dbh ) = @_;
754              
755 54   50     1837 my $batch_size = $self->batch_size || scalar(@$ids);
756              
757 54         228 my @ids_copy = @$ids;
758 54         304 while ( my @batch_ids = splice @ids_copy, 0, $batch_size ) {
759 54         471 my $sth = $self-> prepare_select ( $dbh, "SELECT id FROM entries WHERE id IN (" . join(", ", ('?') x @batch_ids) . ")");
760 54         7497 $sth->execute(@batch_ids);
761              
762 54         612 $sth->bind_columns( \( my $id ) );
763              
764 54         2543 $entries{$id} = 1 while $sth->fetch;
765             }
766 54         597 });
767             }
768              
769 55 100       2791 if ( @$rows ) {
770             my ( $rs_keys, $rs_ids ) = $self->_group_dbic_keys( $rows, sub {
771 0     0   0 my ( $id, $rs_name, @key ) = @_;
772 0         0 $entries{$id} = defined $schema->resultset($rs_name)->find(@key); # FIXME slow
773 1         6 });
774              
775 1         6 foreach my $rs_name ( keys %$rs_keys ) {
776 1         7 my $rs = $schema->resultset($rs_name);
777              
778 1         390 my $ids = $rs_ids->{$rs_name};
779 1         2 my $keys = $rs_keys->{$rs_name};
780              
781 1         9 my ( $pk ) = $rs->result_source->primary_columns;
782              
783 1         10 my @exists = $rs->search({ $pk => $keys })->get_column($pk)->all;
784              
785 1         8066 my %pk_to_id;
786 1         136 @pk_to_id{@$keys} = @$ids;
787              
788 1         13 @entries{@pk_to_id{@exists}} = ( (1) x @exists );
789             }
790             }
791              
792 55         232 for ( @$special ) {
793 0 0       0 if ( $_ eq 'dbic:schema' ) {
    0          
794 0         0 $entries{$_} = 1;
795             } elsif ( /^dbic:schema:(.*)/ ) {
796 0     0   0 $entries{$_} = defined try { $schema->source($1) };
  0         0  
797             }
798             }
799              
800 55         377 return @entries{@rows_and_ids};
801             }
802              
803 232     232 0 1188948 sub txn_begin { shift->storage->txn_begin(@_) }
804 205     205 0 8574111 sub txn_commit { shift->storage->txn_commit(@_) }
805 27     27 0 49231 sub txn_rollback { shift->storage->txn_rollback(@_) }
806              
807             sub clear {
808 27     27 0 19524 my $self = shift;
809              
810             $self->dbh_do(sub {
811 27     27   755 my ( $storage, $dbh ) = @_;
812              
813 27         277 $dbh->do("DELETE FROM gin_index");
814 27         8262 $dbh->do("DELETE FROM entries");
815 27         296 });
816             }
817              
# Prepare and execute a SELECT, returning a lazy Data::Stream::Bulk::DBI
# stream over the live statement handle. prepare_select may decorate the
# SQL (e.g. with FOR UPDATE) depending on backend configuration.
sub _sth_stream {
    my ( $self, $sql, @bind_values ) = @_;

    $self->dbh_do(sub {
        my ( undef, $dbh ) = @_;

        my $sth = $self->prepare_select( $dbh, $sql );
        $sth->execute(@bind_values);

        return Data::Stream::Bulk::DBI->new( sth => $sth );
    });
}
830              
# Run a SELECT whose first column holds serialized entry data, and
# return a stream of deserialized KiokuDB::Entry objects.
sub _select_entry_stream {
    my ( $self, @sql_and_bind ) = @_;

    return $self->_sth_stream(@sql_and_bind)->filter(sub {
        my $rows = shift;
        return [ map { $self->deserialize( $_->[0] ) } @$rows ];
    });
}
838              
# Stream every entry stored in the backend.
sub all_entries {
    my $self = shift;
    $self->_select_entry_stream("SELECT data FROM entries");
}

# Stream only the entries flagged as members of the root set.
sub root_entries {
    my $self = shift;
    $self->_select_entry_stream("SELECT data FROM entries WHERE root");
}

# Stream only the entries that are not in the root set.
sub child_entries {
    my $self = shift;
    $self->_select_entry_stream("SELECT data FROM entries WHERE not root");
}
853              
# Run a SELECT over single-column id rows and return a stream of the
# plain id values (each row flattened to its first column).
sub _select_id_stream {
    my ( $self, @sql_and_bind ) = @_;

    return $self->_sth_stream(@sql_and_bind)->filter(sub {
        my $rows = shift;
        return [ map { $_->[0] } @$rows ];
    });
}
861              
# Stream the ids of every entry in the backend.
sub all_entry_ids {
    my $self = shift;
    $self->_select_id_stream("SELECT id FROM entries");
}

# Stream the ids of root-set entries only.
sub root_entry_ids {
    my $self = shift;
    $self->_select_id_stream("SELECT id FROM entries WHERE root");
}

# Stream the ids of entries not in the root set.
sub child_entry_ids {
    my $self = shift;
    $self->_select_id_stream("SELECT id FROM entries WHERE not root");
}
876              
# Search using an SQL::Abstract prototype over the user-defined columns,
# returning a stream of matching entries.
sub simple_search {
    my ( $self, $proto ) = @_;

    # Build a parameterized WHERE clause from the prototype.
    my ( $where, @bind ) = $self->sql_abstract->where($proto);

    return $self->_select_entry_stream( "SELECT data FROM entries $where", @bind );
}
884              
# Search using a Search::GIN query object. The query is compiled into a
# subquery against the gin_index table, and matching entries are streamed
# back from the entries table.
sub search {
    my ( $self, $query, @extra ) = @_;

    my %args = (
        distinct => $self->distinct,
        @extra,
    );

    my %spec = $query->extract_values($self);

    my @bind_values;
    my $subquery = $self->_search_gin_subquery( \%spec, \@bind_values );

    return $self->_select_entry_stream(
        "SELECT data FROM entries WHERE id IN ($subquery)",
        @bind_values,
    );
}
899              
# Compile one node of a Search::GIN query spec into an SQL subquery over
# the gin_index table, pushing placeholder values onto $binds.
#
#   $spec  - hashref with 'method' (set | all | anything-else), 'values',
#            and for 'set' also 'operation' and 'subqueries'
#   $binds - arrayref accumulating bind values, in placeholder order
#
# Returns the SQL text of the subquery (no trailing semicolon).
sub _search_gin_subquery {
    my ( $self, $spec, $binds ) = @_;

    # Values to match against gin_index.value; may legitimately be empty.
    my @v = ref $spec->{values} eq 'ARRAY' ? @{ $spec->{values} } : ();

    if ( $spec->{method} eq 'set' ) {
        my $op = $spec->{operation};

        die 'gin set query received bad operation'
            unless $op =~ /^(UNION|INTERSECT|EXCEPT)$/i;

        die 'gin set query missing subqueries'
            unless ref $spec->{subqueries} eq 'ARRAY'
                && scalar @{ $spec->{subqueries} };

        # Combine recursively compiled subqueries with the set operation.
        return "(" .
            ( join ' ' . $op . ' ',
                map { $self->_search_gin_subquery( $_, $binds ) }
                    @{ $spec->{subqueries} }
            ) . ")";

    } elsif ( $spec->{method} eq 'all' and @v > 1 ) {
        # for some reason count(id) = ? doesn't work
        push @$binds, @v;
        # FIX: a space is required before GROUP BY; the original
        # concatenation produced "...?)GROUP BY", relying on the SQL
        # tokenizer to split on ')'.
        return "SELECT id FROM gin_index WHERE value IN " .
            "(" . join( ", ", ('?') x @v ) . ") " .
            "GROUP BY id HAVING COUNT(id) = " . scalar(@v);
    } else {
        push @$binds, @v;
        return "SELECT DISTINCT id FROM gin_index WHERE value IN " .
            "(" . join( ", ", ('?') x @v ) . ")";
    }
}
933              
934 0     0 0 0 sub fetch_entry { die "TODO" }
935              
# These two methods belong to the Search::GIN backend interface, but this
# backend maintains the gin_index table itself as part of entry
# insertion/deletion, so calling them directly is an error.
sub remove_ids {
    my ( $self, @ids ) = @_;

    # FIX: grammar of the error message ("Deletion the" -> "Deletion from the").
    die "Deletion from the GIN index is handled implicitly";
}

sub insert_entry {
    my ( $self, $id, @keys ) = @_;

    die "Insertion to the GIN index is handled implicitly";
}
947              
# Query the driver for table metadata via DBI's table_info.
# Undefined catalog/schema/table arguments are replaced with a
# driver-appropriate wildcard ('%' on SQLite, '' elsewhere).
# Returns an arrayref of table_info rows.
sub _table_info {
    my ( $self, $catalog, $schema, $table ) = @_;

    $self->dbh_do(sub {
        my ( undef, $dbh ) = @_;

        my $wildcard = $self->storage->sqlt_type eq 'SQLite' ? '%' : '';

        my @criteria =
            map { defined $_ ? $_ : $wildcard } ( $catalog, $schema, $table );

        $dbh->table_info( @criteria, 'TABLE' )->fetchall_arrayref;
    });
}
963              
# True if the 'entries' table already exists in the connected database.
sub tables_exist {
    my $self = shift;

    my $tables = $self->_table_info( undef, undef, 'entries' );

    return scalar(@$tables) > 0;
}
969              
# Deploy the schema unless the tables are already present.
# mysql_version 4.1 is passed so SQL::Translator emits DDL with
# boolean support (required by the queries this backend issues).
sub create_tables {
    my $self = shift;

    $self->dbh_do(sub {
        $self->deploy( { producer_args => { mysql_version => 4.1 } } )
            unless $self->tables_exist;
    });
}
981              
# Drop both backend tables; gin_index goes first, before the entries
# table it indexes.
sub drop_tables {
    my $self = shift;

    $self->dbh_do(sub {
        my ( undef, $dbh ) = @_;

        $dbh->do("DROP TABLE $_") for qw(gin_index entries);
    });
}
992              
# Moose destructor hook: disconnect cleanly, but do nothing during
# global destruction (when the storage object may already be gone).
sub DEMOLISH {
    my ( $self, $in_global_destruction ) = @_;

    return if $in_global_destruction;

    $self->storage->disconnect if $self->has_storage;
}
1001              
# Construct a garbage collector for this backend.
#
#   %args may contain:
#     command - an optional command object supplying class/verbose
#     class   - explicit GC class name (overrides the command's)
#
# Dies if the database contains tables other than entries/gin_index,
# since foreign rows may reference entries and make the root set
# ambiguous.
sub new_garbage_collector {
    my ( $self, %args ) = @_;

    if ( grep { $_ !~ /^(?:entries|gin_index)/ } map { $_->[2] } @{ $self->_table_info } ) {
        # FIX: typo "ecause" -> "because" in the error text.
        die "\nRefusing to GC a database with additional tables.\n\nThis is because the root set and referencing scheme might be ambiguous (it's not yet clear what garbage collection should actually do on a mixed schema).\n";
    } else {
        my $cmd = $args{command};

        # FIX: the original '$args{class} || $cmd ? $cmd->class : ...'
        # parsed as '($args{class} || $cmd) ? ...', calling ->class on an
        # undefined $cmd whenever only 'class' was supplied.
        my $class = $args{class} || ( $cmd ? $cmd->class : "KiokuDB::GC::Naive" );

        load_class($class);

        return $class->new(
            %args,
            backend => $self,
            # FIX: contribute nothing (not a lone undef) when there is no
            # command object, keeping the constructor args an even list.
            ( $cmd ? ( verbose => $cmd->verbose ) : () ),
        );
    }
}
1020              
# Close the class to further meta changes and inline accessors for speed.
__PACKAGE__->meta->make_immutable;

# Module return value: the package name (a true value).
__PACKAGE__
1024              
1025             __END__
1026              
1027             =pod
1028              
1029             =encoding UTF-8
1030              
1031             =head1 NAME
1032              
1033             KiokuDB::Backend::DBI - DBI backend for KiokuDB
1034              
1035             =head1 VERSION
1036              
1037             version 1.23
1038              
1039             =head1 SYNOPSIS
1040              
1041             my $dir = KiokuDB->connect(
1042             "dbi:mysql:foo",
1043             user => "blah",
             password => "moo",
1045             columns => [
1046             # specify extra columns for the 'entries' table
1047             # in the same format you pass to DBIC's add_columns
1048              
1049             name => {
1050             data_type => "varchar",
1051             is_nullable => 1, # probably important
1052             },
1053             ],
1054             );
1055              
1056             $dir->search({ name => "foo" }); # SQL::Abstract
1057              
1058             =head1 DESCRIPTION
1059              
1060             This backend for L<KiokuDB> leverages existing L<DBI> accessible databases.
1061              
1062             The schema is based on two tables, C<entries> and C<gin_index> (the latter is
1063             only used if a L<Search::GIN> extractor is specified).
1064              
1065             The C<entries> table has two main columns, C<id> and C<data> (currently in
1066             JSPON format, in the future the format will be pluggable), and additional user
1067             specified columns.
1068              
1069             The user specified columns are extracted from inserted objects using a callback
1070             (or just copied for simple scalars), allowing SQL where clauses to be used for
1071             searching.
1072              
1073             =head1 COLUMN EXTRACTIONS
1074              
1075             The columns are specified using a L<DBIx::Class::ResultSource> instance.
1076              
1077             One additional column info parameter is used, C<extract>, which is called as a
1078             method on the inserted object with the column name as the only argument. The
1079             return value from this callback will be used to populate the column.
1080              
1081             If the column extractor is omitted then the column will contain a copy of the
1082             entry data key by the same name, if it is a plain scalar. Otherwise the column
1083             will be C<NULL>.
1084              
1085             These columns are only used for lookup purposes, only C<data> is consulted when
1086             loading entries.
1087              
1088             =head1 DBIC INTEGRATION
1089              
1090             This backend is layered on top of L<DBIx::Class::Storage::DBI> and reused
1091             L<DBIx::Class::Schema> for DDL.
1092              
1093             Because of this objects from a L<DBIx::Class::Schema> can refer to objects in
1094             the KiokuDB entries table, and vice versa.
1095              
1096             For more details see L<DBIx::Class::Schema::KiokuDB>.
1097              
1098             =head1 SUPPORTED DATABASES
1099              
1100             This driver has been tested with MySQL 5 (4.1 should be the minimal supported
1101             version), SQLite 3, and PostgreSQL 8.3.
1102              
1103             The SQL code is reasonably portable and should work with most databases. Binary
1104             column support is required when using the L<Storable> serializer.
1105              
1106             =head2 Transactions
1107              
1108             For reasons of performance and ease of use database vendors ship with read
1109             committed transaction isolation by default.
1110              
1111             This means that read locks are B<not> acquired when data is fetched from the
1112             database, allowing it to be updated by another writer. If the current
1113             transaction then updates the value it will be silently overwritten.
1114              
1115             IMHO this is a much bigger problem when the data is unstructured. This is
1116             because data is loaded and fetched in potentially smaller chunks, increasing
1117             the risk of phantom reads.
1118              
1119             Unfortunately enabling truly isolated transaction semantics means that
1120             C<txn_commit> may fail due to a lock contention, forcing you to repeat your
             transaction. Arguably this is more correct than "read committed", which can
             lead to race conditions.
1123              
1124             Enabling repeatable read or serializable transaction isolation prevents
             transactions from interfering with each other, by ensuring all data reads are
1126             performed with a shared lock.
1127              
1128             For more information on isolation see
1129             L<http://en.wikipedia.org/wiki/Isolation_(computer_science)>
1130              
1131             =head3 SQLite
1132              
1133             SQLite provides serializable isolation by default.
1134              
1135             L<http://www.sqlite.org/pragma.html#pragma_read_uncommitted>
1136              
1137             =head3 MySQL
1138              
1139             MySQL provides read committed isolation by default.
1140              
             Serializable level isolation can be enabled by default by changing the
1142             C<transaction-isolation> global variable,
1143              
1144             L<http://dev.mysql.com/doc/refman/5.1/en/set-transaction.html#isolevel_serializable>
1145              
1146             =head3 PostgreSQL
1147              
1148             PostgreSQL provides read committed isolation by default.
1149              
1150             Repeatable read or serializable isolation can be enabled by setting the default
1151             transaction isolation level, or using the C<SET TRANSACTION> SQL statement.
1152              
1153             L<http://www.postgresql.org/docs/8.3/interactive/transaction-iso.html>,
1154             L<http://www.postgresql.org/docs/8.3/interactive/runtime-config-client.html#GUC-DEFAULT-TRANSACTION-ISOLATION>
1155              
1156             =head1 ATTRIBUTES
1157              
1158             =over 4
1159              
1160             =item schema
1161              
1162             Created automatically.
1163              
1164             This is L<DBIx::Class::Schema> object that is used for schema deployment,
1165             connectivity and transaction handling.
1166              
1167             =item connect_info
1168              
1169             An array reference whose contents are passed to L<DBIx::Class::Schema/connect>.
1170              
1171             If omitted will be created from the attrs C<dsn>, C<user>, C<password> and
1172             C<dbi_attrs>.
1173              
1174             =item dsn
1175              
1176             =item user
1177              
1178             =item password
1179              
1180             =item dbi_attrs
1181              
1182             Convenience attrs for connecting using L<KiokuDB/connect>.
1183              
             Used in C<connect_info>'s builder.
1185              
1186             =item columns
1187              
1188             Additional columns, see L</"COLUMN EXTRACTIONS">.
1189              
1190             =item serializer
1191              
1192             L<KiokuDB::Serializer>. Coerces from a string, too:
1193              
1194             KiokuDB->connect("dbi:...", serializer => "storable");
1195              
1196             Defaults to L<KiokuDB::Serializer::JSON>.
1197              
1198             =item create
1199              
1200             If true the existence of the tables will be checked for and the DB will be
1201             deployed if not.
1202              
1203             Defaults to false.
1204              
1205             =item extract
1206              
1207             An optional L<Search::GIN::Extract> used to create the C<gin_index> entries.
1208              
1209             Usually L<Search::GIN::Extract::Callback>.
1210              
1211             =item schema_hook
1212              
1213             A hook that is called on the backend object as a method with the schema as the
1214             argument just before connecting.
1215              
1216             If you need to modify the schema in some way (adding indexes or constraints)
1217             this is where it should be done.
1218              
1219             =item for_update
1220              
             If true (the default), will cause all select statements to be issued with a
             C<FOR UPDATE> modifier on MySQL, Postgres and Oracle.
1223              
             This is highly recommended because these databases provide low isolation
             guarantees as configured out of the box, and highly interlinked graph databases
             are much more susceptible to corruption because of lack of transactional
             isolation than normalized relational databases.
1228              
1229             =item sqlite_sync_mode
1230              
1231             If this attribute is set and the underlying database is SQLite, then
             C<PRAGMA synchronous=...> will be issued with this value.
1233              
1234             Can be C<OFF>, C<NORMAL> or C<FULL> (SQLite's default), or 0, 1, or 2.
1235              
1236             See L<http://www.sqlite.org/pragma.html#pragma_synchronous>.
1237              
1238             =item mysql_strict
1239              
1240             If true (the default), sets MySQL's strict mode.
1241              
             This is B<HIGHLY> recommended, or you may enjoy some of MySQL's more
1243             interesting features, like automatic data loss when the columns are too narrow.
1244              
1245             See L<http://dev.mysql.com/doc/refman/5.0/en/server-sql-mode.html> and
1246             L<DBIx::Class::Storage::DBI::mysql> for more details.
1247              
1248             =item on_connect_call
1249              
1250             See L<DBIx::Class::Storage::DBI>.
1251              
1252             This attribute is constructed based on the values of C<mysql_version> and
1253             C<sqlite_sync_mode>, but may be overridden if you need more control.
1254              
1255             =item dbic_attrs
1256              
1257             See L<DBIx::Class::Storage::DBI>.
1258              
1259             Defaults to
1260              
1261             { on_connect_call => $self->on_connect_call }
1262              
1263             =item batch_size
1264              
1265             SQL that deals with entries run in batches of the amount provided in
1266             C<batch_size>. If it is not provided, the statements will run in a single
1267             batch.
1268              
1269             This solves the issue with SQLite where lists can only handle 999
1270             elements at a time. C<batch_size> will be set to 999 by default if the
1271             driver in use is SQLite.
1272              
1273             =back
1274              
1275             =head1 METHODS
1276              
1277             See L<KiokuDB::Backend> and the various roles for more info.
1278              
1279             =over 4
1280              
1281             =item deploy
1282              
1283             Calls L<DBIx::Class::Schema/deploy>.
1284              
1285             Deployment to MySQL requires that you specify something like:
1286              
1287             $dir->backend->deploy({ producer_args => { mysql_version => 4 } });
1288              
1289             because MySQL versions before 4 did not have support for boolean types, and the
1290             schema emitted by L<SQL::Translator> will not work with the queries used.
1291              
1292             =item drop_tables
1293              
1294             Drops the C<entries> and C<gin_index> tables.
1295              
1296             =back
1297              
1298             =head1 TROUBLESHOOTING
1299              
1300             =head2 I get C<unexpected end of string while parsing JSON string>
1301              
             You are probably using MySQL, which comes with a helpful data compression
1303             feature: when your serialized objects are larger than the maximum size of a
1304             C<BLOB> column MySQL will simply shorten it for you.
1305              
1306             Why C<BLOB> defaults to 64k, and how on earth someone would consider silent
1307             data truncation a sane default I could never fathom, but nevertheless MySQL
1308             does allow you to disable this by setting the "strict" SQL mode in the
1309             configuration.
1310              
1311             To resolve the actual problem (though this obviously won't repair your lost
1312             data), alter the entries table so that the C<data> column uses the nonstandard
1313             C<LONGBLOB> datatype.
1314              
1315             =head1 VERSION CONTROL
1316              
1317             KiokuDB-Backend-DBI is maintained using Git. Information about the repository
1318             is available on L<http://www.iinteractive.com/kiokudb/>
1319              
1320             =for Pod::Coverage BUILD
1321             DEMOLISH
1322             all_entries
1323             all_entry_ids
1324             child_entries
1325             child_entry_ids
1326             clear
1327             create_tables
1328             default_typemap
1329             delete
1330             entries_to_rows
1331             entry_to_row
1332             exists
1333             fetch_entry
1334             has_batch_size
1335             insert
1336             insert_entry
1337             insert_rows
1338             new_from_dsn
1339             new_garbage_collector
1340             prepare_insert
1341             prepare_select
1342             prepare_update
1343             register_handle
1344             remove_ids
1345             root_entries
1346             root_entry_ids
1347             search
1348             simple_search
1349             tables_exist
1350             txn_begin
1351             txn_commit
1352             txn_rollback
1353             update_index
1354              
1355             =head1 AUTHOR
1356              
1357             Yuval Kogman <nothingmuch@woobling.org>
1358              
1359             =head1 COPYRIGHT AND LICENSE
1360              
1361             This software is copyright (c) 2014 by Yuval Kogman, Infinity Interactive.
1362              
1363             This is free software; you can redistribute it and/or modify it under
1364             the same terms as the Perl 5 programming language system itself.
1365              
1366             =cut