File Coverage

blib/lib/Catmandu/Store/DBI.pm
Criterion Covered Total %
statement 24 204 11.7
branch 0 90 0.0
condition 0 8 0.0
subroutine 8 42 19.0
pod 0 7 0.0
total 32 351 9.1


line stmt bran cond sub pod time code
1             package Catmandu::Store::DBI;
2              
3 2     2   30718 use namespace::clean;
  2         39767  
  2         14  
4 2     2   2165 use Catmandu::Sane;
  2         228203  
  2         17  
5 2     2   5822 use DBI;
  2         43400  
  2         166  
6 2     2   25 use Moo;
  2         4  
  2         23  
7              
8             our $VERSION = "0.04";
9              
10             with 'Catmandu::Store';
11              
12             has data_source => (
13             is => 'ro',
14             required => 1,
15             trigger => sub { $_[0] =~ /^DBI:/i ? $_[0] : "DBI:$_[0]" },
16             );
17              
18             has username => ( is => 'ro', default => sub { '' } );
19             has password => ( is => 'ro', default => sub { '' } );
20              
21             has dbh => (
22             is => 'ro',
23             init_arg => undef,
24             lazy => 1,
25             builder => '_build_dbh',
26             );
27              
28             # Only mysql seems to need auto_reconnect for now
29             sub _build_dbh {
30 0     0     my $self = $_[0];
31 0           my $opts = {
32             AutoCommit => 1,
33             RaiseError => 1,
34             mysql_auto_reconnect => 1,
35             };
36 0           DBI->connect($self->data_source, $self->username, $self->password, $opts);
37             }
38              
39             sub transaction {
40 0     0 0   my ($self, $sub) = @_;
41              
42 0 0         if ($self->{_tx}) {
43 0           return $sub->();
44             }
45              
46 0           my $dbh = $self->dbh;
47 0           my @res;
48              
49             eval {
50 0           $self->{_tx} = 1;
51 0           $dbh->begin_work;
52 0           @res = $sub->();
53 0           $dbh->commit;
54 0           $self->{_tx} = 0;
55 0           1;
56 0 0         } or do {
57 0           my $err = $@;
58 0           eval { $dbh->rollback };
  0            
59 0           $self->{_tx} = 0;
60 0           die $err;
61             };
62              
63 0           @res;
64             }
65              
66             sub DEMOLISH {
67 0 0   0 0   $_[0]->dbh->disconnect if $_[0]->dbh;
68             }
69              
70             package Catmandu::Store::DBI::Bag;
71              
72 2     2   2005 use Catmandu::Sane;
  2         4  
  2         18  
73 2     2   477 use Moo;
  2         4  
  2         9  
74 2     2   2615 use Catmandu::Iterator;
  2         131936  
  2         74  
75 2     2   19 use Catmandu::Util qw(require_package);
  2         4  
  2         7308  
76              
77             with 'Catmandu::Bag';
78             with 'Catmandu::Serializer';
79              
80             has _sql_get => (is => 'ro', lazy => 1, builder => '_build_sql_get');
81             has _sql_delete => (is => 'ro', lazy => 1, builder => '_build_sql_delete');
82             has _sql_delete_all =>
83             (is => 'ro', lazy => 1, builder => '_build_sql_delete_all');
84             has _sql_generator =>
85             (is => 'ro', lazy => 1, builder => '_build_sql_generator');
86             has _sql_count => (is => 'ro', lazy => 1, builder => '_build_sql_count');
87             has _add => (is => 'ro', lazy => 1, builder => '_build_add');
88              
89             sub BUILD {
90 0     0 0   $_[0]->_build_create;
91             }
92              
93             sub _build_sql_get {
94 0     0     my $name = $_[0]->name;
95 0           "select data from $name where id=?";
96             }
97              
98             sub _build_sql_delete {
99 0     0     my $name = $_[0]->name;
100 0           "delete from $name where id=?";
101             }
102              
103             sub _build_sql_delete_all {
104 0     0     my $name = $_[0]->name;
105 0           "delete from $name";
106             }
107              
108             sub _build_sql_generator {
109 0     0     my $name = $_[0]->name;
110 0           "select data from $name";
111             }
112              
113             sub _build_sql_count {
114 0     0     my $name = $_[0]->name;
115 0           "select count(*) from $name";
116             }
117              
118             sub _build_add_sqlite {
119 0     0     my $self = $_[0];
120 0           my $name = $self->name;
121 0           my $sql = "insert or replace into $name(id,data) values(?,?)";
122             sub {
123 0     0     my $dbh = $self->store->dbh;
124 0 0         my $sth = $dbh->prepare_cached($sql)
125             or Catmandu::Error->throw($dbh->errstr);
126 0 0         $sth->execute($_[0], $_[1]) or Catmandu::Error->throw($sth->errstr);
127 0           $sth->finish;
128 0           };
129             }
130              
131             sub _build_add_mysql {
132 0     0     my $self = $_[0];
133 0           my $name = $self->name;
134 0           my $sql = "insert into $name(id,data) values(?,?) on duplicate key update data=values(data)";
135             sub {
136 0     0     my $dbh = $self->store->dbh;
137 0 0         my $sth = $dbh->prepare_cached($sql)
138             or Catmandu::Error->throw($dbh->errstr);
139 0 0         $sth->execute($_[0], $_[1])
140             or Catmandu::Error->throw($sth->errstr);
141 0           $sth->finish;
142 0           };
143             }
144              
145             sub _build_add_postgres {
146 0     0     my ($self) = @_;
147 0           my $pg = require_package('DBD::Pg');
148 0           my $name = $self->name;
149 0           my $sql_update = "update $name set data=? where id=?";
150             # see http://stackoverflow.com/questions/15840922/where-not-exists-in-postgresql-gives-syntax-error
151 0           my $sql_insert = "insert into $name select ?,? where not exists (select 1 from $name where id=?)";
152              
153             sub {
154 0     0     my $dbh = $self->store->dbh;
155 0 0         my $sth = $dbh->prepare_cached($sql_update)
156             or Catmandu::Error->throw($dbh->errstr);
157              
158             # special quoting for bytea in postgres:
159             # https://rt.cpan.org/Public/Bug/Display.html?id=13180
160             # http://www.nntp.perl.org/group/perl.dbi.users/2005/01/msg25370.html
161 0           $sth->bind_param(1,$_[1], {pg_type => $pg->PG_BYTEA});
162 0           $sth->bind_param(2,$_[0]);
163              
164 0 0         $sth->execute
165             or Catmandu::Error->throw($sth->errstr);
166              
167 0 0         unless ($sth->rows) {
168 0           $sth->finish;
169 0 0         $sth = $dbh->prepare_cached($sql_insert)
170             or Catmandu::Error->throw($dbh->errstr);
171 0 0         $sth->execute( $_[0], $_[1], $_[0] )
172             or Catmandu::Error->throw($sth->errstr);
173 0           $sth->finish;
174             }
175 0           };
176             }
177              
178             sub _build_add_generic {
179 0     0     my $self = $_[0];
180 0           my $name = $self->name;
181 0           my $sql_update = "update $name set data=? where id=?";
182 0           my $sql_insert = "insert into $name values(?,?) where not exists (select 1 from $name where id=?)";
183             sub {
184 0     0     my $dbh = $self->store->dbh;
185 0 0         my $sth = $dbh->prepare_cached($sql_update)
186             or Catmandu::Error->throw( $dbh->errstr );
187 0 0         $sth->execute($_[1], $_[0]) or Catmandu::Error->throw($sth->errstr);
188 0 0         unless ($sth->rows) {
189 0           $sth->finish;
190 0 0         $sth = $dbh->prepare_cached($sql_insert)
191             or Catmandu::Error->throw($dbh->errstr);
192 0 0         $sth->execute( $_[0], $_[1], $_[0] )
193             or Catmandu::Error->throw($sth->errstr);
194 0           $sth->finish;
195             }
196 0           };
197             }
198              
199             sub _build_create {
200 0     0     my $self = $_[0];
201 0   0       my $driver_name = $self->store->dbh->{Driver}{Name} // "";
202 0 0         if ($driver_name =~ /pg/i) { return $self->_build_create_postgres }
  0            
203 0           $self->_build_create_generic;
204             }
205              
206             sub _build_create_postgres {
207 0     0     my $self = $_[0];
208 0           my $name = $self->name;
209 0           my $dbh = $self->store->dbh;
210             # requires al least Postgres 9.1
211             # TODO get rid of this annoying warning:
212             # 'NOTICE: relation "$name" already exists, skipping'
213 0           my $sql = "create table if not exists $name(id varchar(255) not null primary key, data bytea not null)";
214 0 0         $dbh->do($sql) or Catmandu::Error->throw($dbh->errstr);
215             }
216              
217             sub _build_create_generic {
218 0     0     my $self = $_[0];
219 0           my $name = $self->name;
220 0           my $dbh = $self->store->dbh;
221 0           my $sql = "create table if not exists $name(id varchar(255) not null primary key, data longblob not null)";
222 0 0         $dbh->do($sql) or Catmandu::Error->throw($dbh->errstr);
223             }
224              
225             sub _build_add {
226 0     0     my $self = $_[0];
227 0   0       my $driver_name = $self->store->dbh->{Driver}{Name} // "";
228 0 0         if ($driver_name =~ /sqlite/i) { return $self->_build_add_sqlite; }
  0            
229 0 0         if ($driver_name =~ /mysql/i) { return $self->_build_add_mysql; }
  0            
230 0 0         if ($driver_name =~ /pg/i) { return $self->_build_add_postgres; }
  0            
231 0           return $self->_build_add_generic;
232             }
233              
234             sub get {
235             my ($self, $id) = @_;
236             my $dbh = $self->store->dbh;
237             my $sth = $dbh->prepare_cached($self->_sql_get)
238             or Catmandu::Error->throw($dbh->errstr);
239             $sth->execute($id) or Catmandu::Error->throw($sth->errstr);
240             my $data;
241             if (my $row = $sth->fetchrow_arrayref) {
242             $data = $self->deserialize($row->[0]);
243             }
244             $sth->finish;
245             $data;
246             }
247              
248             sub add {
249             my ($self, $data) = @_;
250             $self->_add->($data->{_id}, $self->serialize($data));
251             }
252              
253             sub delete_all {
254 0     0 0   my ($self) = @_;
255 0           my $dbh = $self->store->dbh;
256 0 0         my $sth = $dbh->prepare_cached($self->_sql_delete_all)
257             or Catmandu::Error->throw($dbh->errstr);
258 0 0         $sth->execute or Catmandu::Error->throw($sth->errstr);
259 0           $sth->finish;
260             }
261              
262             sub delete {
263             my ($self, $id) = @_;
264             my $dbh = $self->store->dbh;
265             my $sth = $dbh->prepare_cached($self->_sql_delete)
266             or Catmandu::Error->throw($dbh->errstr);
267             $sth->execute($id) or Catmandu::Error->throw($sth->errstr);
268             $sth->finish;
269             }
270              
271             sub generator {
272 0     0 0   my ($self) = @_;
273 0           my $dbh = $self->store->dbh;
274             sub {
275 0     0     state $sth;
276 0           state $row;
277 0 0         unless ($sth) {
278 0 0         $sth = $dbh->prepare($self->_sql_generator)
279             or Catmandu::Error->throw($dbh->errstr);
280 0           $sth->execute;
281             }
282 0 0         if ( $row = $sth->fetchrow_arrayref ) {
283 0           return $self->deserialize($row->[0]);
284             }
285 0           $sth->finish;
286 0           return;
287 0           };
288             }
289              
290             sub count {
291 0     0 0   my ($self) = @_;
292 0           my $dbh = $self->store->dbh;
293 0 0         my $sth = $dbh->prepare_cached($self->_sql_count)
294             or Catmandu::Error->throw($dbh->errstr);
295 0 0         $sth->execute or Catmandu::Error->throw($sth->errstr);
296 0           my ($n) = $sth->fetchrow_array;
297 0           $sth->finish;
298 0           $n;
299             }
300              
301             # mysql: select * from limit ,
302             # postgres: select * from limit offset
303             # sqlite: select * from limit ,
304             # select * from limit offset
305              
306             has _sql_slice => (is => 'ro', lazy => 1, builder => '_build_sql_slice');
307              
308             sub _build_sql_slice {
309 0     0     my $self = $_[0];
310 0   0       my $driver_name = $self->store->dbh->{Driver}{Name} // "";
311 0 0         if ($driver_name =~ /sqlite/i) { return $self->_build_slice_sqlite; }
  0            
312 0 0         if ($driver_name =~ /mysql/i) { return $self->_build_slice_mysql; }
  0            
313 0 0         if ($driver_name =~ /pg/i) { return $self->_build_slice_postgres; }
  0            
314             # TODO fall back on default slice implementation
315 0           Catmandu::NotImplemented->throw("slice is only supported for mysql, postgres or sqlite");
316             }
317              
318             sub _build_slice_sqlite {
319 0     0     my $self = $_[0];
320 0           my $name = $self->name;
321 0           my $dbh = $self->store->dbh;
322 0           my $sql = "SELECT data FROM $name LIMIT ?,?";
323              
324             sub {
325 0     0     my ($start, $limit) = @_;
326 0 0         my $sth = $dbh->prepare_cached($sql)
327             or Catmandu::Error->throw($dbh->errstr);
328 0 0         $sth->execute( $start, $limit )
329             or Catmandu::Error->throw($sth->errstr);
330 0           $sth;
331 0           };
332             }
333              
334             sub _build_slice_mysql {
335 0     0     my $self = $_[0];
336 0           my $name = $self->name;
337 0           my $dbh = $self->store->dbh;
338 0           my $sql = "SELECT data FROM $name LIMIT ?,?";
339              
340             sub {
341 0     0     my ($start, $limit) = @_;
342 0 0         my $sth = $dbh->prepare_cached($sql)
343             or Catmandu::Error->throw($dbh->errstr);
344 0 0         $sth->execute( $start, $limit )
345             or Catmandu::Error->throw($sth->errstr);
346 0           $sth;
347 0           };
348             }
349              
350             sub _build_slice_postgres {
351 0     0     my $self = $_[0];
352 0           my $name = $self->name;
353 0           my $dbh = $self->store->dbh;
354 0           my $sql = "SELECT data FROM $name LIMIT ? OFFSET ?";
355              
356             sub {
357 0     0     my ($start, $limit) = @_;
358 0 0         my $sth = $dbh->prepare_cached($sql)
359             or Catmandu::Error->throw($dbh->errstr);
360 0 0         $sth->execute($limit, $start)
361             or Catmandu::Error->throw($sth->errstr);
362 0           $sth;
363 0           };
364             }
365              
366             sub slice {
367 0     0 0   my ($self, $start, $total) = @_;
368 0   0       $start //= 0;
369 0           my $dbh = $self->store->dbh;
370              
371             Catmandu::Iterator->new(
372             sub {
373             sub {
374 0 0         if (defined $total) {
375 0 0         $total || return;
376             }
377              
378 0           state $sth;
379 0           state $row;
380 0 0         unless ($sth) {
381 0 0         if (defined($total)) {
382 0           $sth = $self->_sql_slice->($start, $total);
383             }
384             else {
385 0 0         $sth = $dbh->prepare($self->_sql_generator)
386             or Catmandu::Error->throw($dbh->errstr);
387 0           $sth->execute;
388             }
389             }
390 0 0         if ($row = $sth->fetchrow_arrayref) {
391 0           return $self->deserialize($row->[0]);
392             }
393 0           $sth->finish;
394 0           return;
395              
396             }
397 0     0     }
398 0           );
399              
400             }
401              
402             1;
403              
404             =head1 NAME
405              
406             Catmandu::Store::DBI - A Catmandu::Store plugin for DBI based interfaces
407              
408             =head1 VERSION
409              
410             Version 0.04
411              
412             =head1 SYNOPSIS
413              
414             use Catmandu::Store::DBI;
415              
416             my $store = Catmandu::Store::DBI->new(
417             data_source => 'DBI:mysql:database=test', # prefix "DBI:" optionl
418             username => '', # optional
419             password => '', # optional
420             );
421              
422             my $obj1 = $store->bag->add({ name => 'Patrick' });
423              
424             printf "obj1 stored as %s\n" , $obj1->{_id};
425              
426             # Force an id in the store
427             my $obj2 = $store->bag->add({ _id => 'test123' , name => 'Nicolas' });
428              
429             my $obj3 = $store->bag->get('test123');
430              
431             $store->bag->delete('test123');
432              
433             $store->bag->delete_all;
434              
435             # All bags are iterators
436             $store->bag->each(sub { ... });
437             $store->bag->take(10)->each(sub { ... });
438              
439             The L command line client can be used like this:
440              
441             catmandu import JSON to DBI --data_source SQLite:mydb.sqlite < data.json
442              
443             =head1 DESCRIPTION
444              
445             A Catmandu::Store::DBI is a Perl package that can store data into
446             DBI backed databases. The database as a whole is called a 'store'
447             (L. Databases also have compartments (e.g. tables)
448             called 'bags' (L).
449              
450             =head1 METHODS
451              
452             =head2 new(data_source => $data_source)
453              
454             Create a new Catmandu::Store::DBI store using a DBI $data_source. The
455             prefix "DBI:" is added automatically if needed.
456              
457             =head2 bag($name)
458              
459             Create or retieve a bag with name $name. Returns a Catmandu::Bag.
460              
461             =head1 AUTHOR
462              
463             Nicolas Steenlant, C<< >>
464              
465             =head1 CONTRIBUTOR
466              
467             Vitali Peil C<< >>
468              
469             =head1 CONTRIBUTOR
470              
471             Nicolas Franck C<< >>
472              
473             =head1 SEE ALSO
474              
475             L, L, L
476              
477             =cut