File Coverage

blib/lib/Data/Record/Serialize/Encode/dbi.pm
Criterion Covered Total %
statement 109 125 87.2
branch 31 56 55.3
condition 6 12 50.0
subroutine 22 24 91.6
pod 5 6 83.3
total 173 223 77.5


line stmt bran cond sub pod time code
1              
2             # ABSTRACT: store a record in a database
3              
4             use Moo::Role;
5 1     1   484  
  1         3  
  1         5  
6             use Data::Record::Serialize::Error { errors =>
7 1         20 [ qw( param
8             connect
9             schema
10             create
11             insert
12             sqlite_backend
13             )] }, -all;
14 1     1   400  
  1         22  
15             our $VERSION = '1.03'; # TRIAL
16              
17             use Data::Record::Serialize::Types -types;
18 1     1   182  
  1         4  
  1         9  
19             use SQL::Translator;
20 1     1   993 use SQL::Translator::Schema;
  1         207071  
  1         31  
21 1     1   7 use Types::Standard -types;
  1         2  
  1         23  
22 1     1   4 use Types::Common::String qw( NonEmptySimpleStr );
  1         2  
  1         12  
23 1     1   5448  
  1         24954  
  1         10  
24             use List::Util qw[ pairmap ];
25 1     1   525  
  1         2  
  1         70  
26             use DBI;
27 1     1   6  
  1         2  
  1         33  
28             use namespace::clean;
29 1     1   5  
  1         3  
  1         8  
30              
31              
32              
33              
34              
35              
36             has dsn => (
37             is => 'ro',
38             required => 1,
39             coerce => sub {
40             my $arg = 'ARRAY' eq ref $_[0] ? $_[0] : [ $_[0] ];
41             my @dsn;
42             for my $el ( @{$arg} ) {
43              
44             my $ref = ref $el;
45             push( @dsn, $el ), next
46             unless $ref eq 'ARRAY' || $ref eq 'HASH';
47              
48             my @arr = $ref eq 'ARRAY' ? @{$el} : %{$el};
49              
50             push @dsn, pairmap { join( '=', $a, $b ) } @arr;
51             }
52              
53             unshift @dsn, 'dbi' unless $dsn[0] =~ /^dbi/;
54              
55             return join( ':', @dsn );
56             },
57             );
58              
59             has _cached => (
60             is => 'ro',
61             default => 0,
62             init_arg => 'cached',
63             );
64              
65              
66              
67              
68              
69              
70              
71              
72             has table => (
73             is => 'ro',
74             isa => Str,
75             required => 1,
76             );
77              
78              
79              
80              
81              
82              
83              
84              
85              
86              
87              
88             has schema => (
89             is => 'ro',
90             isa => Maybe[NonEmptySimpleStr],
91             );
92              
93              
94              
95              
96              
97              
98              
99             has drop_table => (
100             is => 'ro',
101             isa => Bool,
102             default => 0,
103             );
104              
105              
106              
107              
108              
109              
110              
111             has create_table => (
112             is => 'ro',
113             isa => Bool,
114             default => 1,
115             );
116              
117              
118              
119              
120              
121              
122              
123             has primary => (
124             is => 'ro',
125             isa => ArrayOfStr,
126             coerce => 1,
127             default => sub { [] },
128             );
129              
130              
131              
132              
133              
134              
135              
136             has db_user => (
137             is => 'ro',
138             isa => Str,
139             default => '',
140             );
141              
142              
143              
144              
145              
146              
147              
148             has db_pass => (
149             is => 'ro',
150             isa => Str,
151             default => '',
152             );
153              
154             has _sth => (
155             is => 'rwp',
156             init_arg => undef,
157             );
158              
159             has _dbh => (
160             is => 'rwp',
161             init_arg => undef,
162             clearer => 1,
163             predicate => 1,
164             );
165              
166              
167              
168              
169              
170              
171              
172             has column_defs => (
173             is => 'rwp',
174             lazy => 1,
175             clearer => 1,
176             init_arg => undef,
177             builder => sub {
178             my $self = shift;
179 0     0   0  
180             my @column_defs;
181 0         0 for my $field ( @{ $self->output_fields } ) {
182 0         0 push @column_defs,
  0         0  
183             join( ' ',
184             $field,
185             $self->output_types->{$field},
186 0         0 ( 'primary key' )x!!( $self->primary eq $field ) );
187             }
188              
189             return join ', ', @column_defs;
190 0         0 },
191             );
192              
193              
194              
195              
196              
197              
198              
199             has batch => (
200             is => 'ro',
201             isa => Int,
202             default => 100,
203             coerce => sub { $_[0] > 1 ? $_[0] : 0 },
204             );
205              
206              
207              
208              
209              
210              
211              
212             has dbitrace => ( is => 'ro', );
213              
214              
215              
216              
217              
218              
219              
220              
221              
222              
223              
224              
225              
226             has queue => (
227             is => 'ro',
228             init_arg => undef,
229             default => sub { [] },
230             );
231              
232             around '_build__nullified' => sub {
233             my $orig = shift;
234             my $self = $_[0];
235              
236             my $nullified = $self->$orig( @_ );
237              
238             # defer to the caller
239             return $nullified if $self->has_nullify;
240              
241             # add all of the numeric fields
242             [ @{ $self->numeric_fields } ];
243              
244             };
245              
246             my %MapTypes = (
247             Pg => { S => 'text', N => 'real', I => 'integer', B => 'boolean' },
248             SQLite => { S => 'text', N => 'real', I => 'integer', B => 'integer' },
249             Default => { S => 'text', N => 'real', I => 'integer', B => 'integer' },
250             );
251              
252             $MapTypes{ $_[0]->_dbi_driver } // $MapTypes{ Default }
253             }
254              
255 15   33 15   43  
256              
257              
258              
259              
260              
261              
262              
263              
264             my $self = shift;
265 0 0   0 1 0  
266             return
267             defined $self->_dbh->table_info( '%', $self->schema, $self->table,
268 5     5   12 'TABLE' )->fetch;
269             }
270              
271 5         49 my $self = shift;
272             defined $self->schema ? $self->schema . '.' . $self->table : $self->table;
273             }
274              
275             has _dsn_components => (
276 5     5   13 is => 'lazy',
277 5 50       48 init_arg => undef,
278             builder => sub {
279             my @dsn = DBI->parse_dsn( $_[0]->dsn )
280             or error( 'param', "unable to parse DSN: ", $_[0]->dsn );
281             \@dsn;
282             },
283             );
284 5 50   5   81  
285             $_[0]->_dsn_components->[1];
286 5         203 }
287              
288             my %producer = (
289             DB2 => 'DB2',
290             MySQL => 'mysql',
291 25     25   366 Oracle => 'Oracle',
292             Pg => 'PostgreSQL',
293             SQLServer => 'SQLServer',
294             SQLite => 'SQLite',
295             Sybase => 'Sybase',
296             );
297              
298             has _producer => (
299             is => 'lazy',
300             init_arg => undef,
301             builder => sub {
302             my $dbi_driver = $_[0]->_dbi_driver;
303             $producer{$dbi_driver} || $dbi_driver;
304             },
305             );
306              
307              
308 5     5   57  
309 5 50       155  
310              
311              
312              
313             my $self = shift;
314              
315             return if $self->_has_dbh;
316              
317             my %attr = (
318             AutoCommit => !$self->batch,
319             RaiseError => 1,
320 5     5 0 11 PrintError => 0,
321             'private_' . __PACKAGE__ => __FILE__ . __LINE__,
322 5 50       21 );
323              
324 5         35  
325             if ( $self->_dbi_driver eq 'SQLite' ) {
326             my $DBD_SQLite_VERSION = 1.31;
327              
328             error( 'sqlite_backend',
329             "need DBD::SQLite >= $DBD_SQLite_VERSION; have @{[ DBD::SQLite->VERSION ]}"
330             )
331             unless eval {
332 5 50       14 require DBD::SQLite;
333 5         40 DBD::SQLite->VERSION( $DBD_SQLite_VERSION );
334             1;
335             };
336 0         0  
337             $attr{sqlite_allow_multiple_statements} = 1;
338 5 50       10 }
339 5         31  
340 5         101 my $connect = $self->_cached ? 'connect_cached' : 'connect';
341 5         22  
342             $self->_set__dbh(
343             DBI->$connect( $self->dsn, $self->db_user, $self->db_pass, \%attr ) )
344 5         16 or error( 'connect', 'error connecting to ', $self->dsn, "\n" );
345              
346             $self->_dbh->trace( $self->dbitrace )
347 5 50       24 if $self->dbitrace;
348              
349 5 50       45 if ( $self->drop_table || ( $self->create_table && !$self->_table_exists ) ) {
350             my $tr = SQL::Translator->new(
351             from => sub {
352             my $schema = $_[0]->schema;
353 5 50       4217 my $table = $schema->add_table( name => $self->_fq_table_name )
354             or error( 'schema', $schema->error );
355              
356 5 50 33     43 for my $field_name ( @{ $self->output_fields } ) {
      66        
357             $table->add_field(
358             name => $field_name,
359 5     5   2414 data_type => $self->output_types->{$field_name}
360 5 50       3729 ) or error( 'schema', $table->error );
361             }
362              
363 5         4196 if ( @{ $self->primary } ) {
  5         82  
364             $table->primary_key( @{ $self->primary } )
365             or error( 'schema', $table->error );
366 15 50       10694 }
367              
368             1;
369             },
370 5 50       6061 to => $self->_producer,
  5         30  
371 0 0       0 producer_args => { no_transaction => 1 },
  0         0  
372             add_drop_table => $self->drop_table && $self->_table_exists,
373             );
374              
375 5         19 my $sql = $tr->translate
376             or error( 'schema', $tr->error );
377 5   66     3375  
378             eval { $self->_dbh->do( $sql ); };
379              
380             error( 'create', { msg => "error in table creation: $@", payload => $sql } )
381             if $@;
382 5 50       22868  
383             $self->_dbh->commit if $self->batch;
384             }
385 5         21105  
  5         51  
386             my $sql = sprintf(
387 5 50       14188 "insert into %s (%s) values (%s)",
388             $self->_dbh->quote_identifier( undef, $self->schema, $self->table ),
389             join( ',', @{ $self->output_fields } ),
390 5 100       50205 join( ',', ( '?' ) x @{ $self->output_fields } ),
391             );
392              
393             $self->_set__sth( $self->_dbh->prepare( $sql ) );
394              
395             return;
396 5         2040 }
397 5         95  
  5         118  
398              
399              
400 5         89  
401              
402 5         380  
403              
404              
405              
406              
407              
408              
409              
410              
411              
412              
413              
414              
415              
416              
417              
418              
419              
420              
421              
422              
423              
424              
425              
426              
427              
428              
429              
430              
431              
432              
433              
434              
435              
436              
437              
438              
439              
440              
441              
442              
443              
444              
445              
446              
447              
448              
449             my $self = shift;
450              
451             return 1 unless $self->_has_dbh;
452              
453             my $queue = $self->queue;
454              
455             if ( @{ $queue } ) {
456             my $last;
457 7     7 1 14 eval {
458             $self->_sth->execute( @$last )
459 7 50       22 while $last = shift @{ $queue };
460             };
461 7         15  
462             my $error = $@;
463 7 100       9 $self->_dbh->commit;
  7         23  
464 6         9  
465 6         11 if ( $error ) {
466             unshift @{ $queue }, $last;
467 6         10  
  30         1380  
468             my %query;
469             @query{ @{ $self->output_fields } } = @$last;
470 6         16 error( "insert", { msg => "Transaction aborted: $error", payload => \%query } );
471 6         66555 }
472             }
473 6 50       48  
474 0         0 1;
  0         0  
475             }
476 0         0  
477 0         0  
  0         0  
478 0         0  
479              
480              
481              
482 7         33  
483              
484              
485              
486              
487              
488              
489              
490              
491              
492              
493              
494             my $self = shift;
495              
496             if ( $self->batch ) {
497             push @{ $self->queue }, [ @{ $_[0] }{ @{ $self->output_fields } } ];
498             $self->flush
499             if @{ $self->queue } == $self->batch;
500              
501             }
502             else {
503 30     30 1 137 eval {
504             $self->_sth->execute( @{ $_[0] }{ @{ $self->output_fields } } );
505 30 100       73 };
506 24         30 error( "insert", { msg => "record insertion failed: $@", payload => $_[0] } )
  24         43  
  24         182  
  24         300  
507             if $@;
508 24 100       32 }
  24         124  
509             }
510              
511              
512 6         7 after '_trigger_output_fields' => sub {
513 6         14 $_[0]->clear_column_defs;
  6         62376  
  6         135  
514             };
515 6 50       82  
516             after '_trigger_output_types' => sub {
517             $_[0]->clear_column_defs;
518             };
519              
520              
521              
522              
523              
524              
525              
526              
527              
528              
529              
530              
531              
532              
533              
534              
535              
536              
537              
538              
539              
540             my $self = shift;
541              
542             return 1 unless $self->_has_dbh;
543              
544             $self->flush if $self->batch;
545             $self->_dbh->disconnect;
546             $self->_clear_dbh;
547              
548             1;
549             }
550 5     5 1 488  
551              
552 5 50       23  
553              
554 5 100       28  
555 5         487  
556 5         168  
557              
558 5         72  
559              
560              
561              
562             my $self = shift;
563              
564             warnings::warnif( 'Data::Record::Serialize::Encode::dbi::queue', __PACKAGE__.": record queue is not empty in object destruction" )
565             if @{ $self->queue };
566              
567             $self->_dbh->disconnect
568             if $self->_has_dbh;
569              
570             }
571              
572              
573 5     5 1 47043 # these are required by the Sink/Encode interfaces but should never be
574             # called in the ordinary run of things.
575              
576 5 50       13  
  5         32  
577              
578 5 50       85  
579              
580              
581              
582              
583             with 'Data::Record::Serialize::Role::EncodeAndSink';
584              
585             1;
586              
587             #
588             # This file is part of Data-Record-Serialize
589             #
590             # This software is Copyright (c) 2017 by Smithsonian Astrophysical Observatory.
591             #
592             # This is free software, licensed under:
593             #
594             # The GNU General Public License, Version 3, June 2007
595             #
596              
597              
598             =pod
599              
600             =for :stopwords Diab Jerius Smithsonian Astrophysical Observatory Postgres
601              
602             =head1 NAME
603              
604             Data::Record::Serialize::Encode::dbi - store a record in a database
605              
606             =head1 VERSION
607              
608             version 1.03
609              
610             =head1 SYNOPSIS
611              
612             use Data::Record::Serialize;
613              
614             my $s = Data::Record::Serialize->new( encode => 'sqlite', ... );
615              
616             $s->send( \%record );
617              
618             =head1 DESCRIPTION
619              
620             B<Data::Record::Serialize::Encode::dbi> writes a record to a database using
621             L<DBI>.
622              
623             It performs both the L<Data::Record::Serialize::Role::Encode> and
624             L<Data::Record::Serialize::Role::Sink> roles.
625              
626             B<You cannot construct this directly>. You must use
627             L<Data::Record::Serialize/new>.
628              
629             =head2 Types
630              
631             Field types are recognized and converted to SQL types via the following map:
632              
633             S => 'text'
634             N => 'real'
635             I => 'integer'
636              
637             For Postgres, C<< B => 'boolean' >>. For other databases, C<< B => 'integer' >>.
638             This encoder handles transformation of the input "truthy" Boolean value into
639             a form appropriate for the database to ingest.
640              
641             =head2 NULL values
642              
643             By default numeric fields are set to C<NULL> if they are empty. This
644             can be changed by setting the C<nullify> attribute.
645              
646             =head2 Performance
647              
648             Records are by default written to the database in batches (see the
649             C<batch> attribute) to improve performance. Each batch is performed
650             as a single transaction. If there is an error during the transaction,
651             record insertions during the transaction are I<not> rolled back.
652              
653             =head2 Errors
654              
655             Transaction errors result in an exception in the
656             C<Data::Record::Serialize::Error::Encode::dbi::insert> class. See
657             L<Data::Record::Serialize::Error> for more information on exception
658             objects.
659              
660             =head1 ATTRIBUTES
661              
662             These attributes are available in addition to the standard attributes
663             defined for L<< Data::Record::Serialize::new|Data::Record::Serialize/new >>.
664              
665             =head2 C<dsn>
666              
667             The value passed to the constructor.
668              
669             =head2 C<table>
670              
671             The value passed to the constructor.
672              
673             =head2 C<schema>
674              
675             The value passed to the constructor.
676              
677             =head2 C<drop_table>
678              
679             The value passed to the constructor.
680              
681             =head2 C<create_table>
682              
683             The value passed to the constructor.
684              
685             =head2 C<primary>
686              
687             The value passed to the constructor.
688              
689             =head2 C<db_user>
690              
691             The value passed to the constructor.
692              
693             =head2 C<db_pass>
694              
695             The value passed to the constructor.
696              
697             =head2 C<batch>
698              
699             The value passed to the constructor.
700              
701             =head2 C<dbitrace>
702              
703             The value passed to the constructor.
704              
705             =head1 METHODS
706              
707             =head2 C<queue>
708              
709             $queue = $obj->queue;
710              
711             The queue containing records not yet successfully transmitted
712             to the database. This is only of interest if L</batch> is not C<0>.
713              
714             Each element is an array containing values to be inserted into the database,
715             in the same order as the fields in L<Data::Serialize/output_fields>.
716              
717             =head2 to_bool
718              
719             $bool = $self->to_bool( $truthy );
720              
721             Convert a truthy value to something that the JSON encoders will recognize as a boolean.
722              
723             =head2 flush
724              
725             $s->flush;
726              
727             Flush the queue of records to the database. It returns true if
728             all of the records have been successfully written.
729              
730             If writing fails:
731              
732             =over
733              
734             =item *
735              
736             Writing of records ceases.
737              
738             =item *
739              
740             The failing record is left at the head of the queue. This ensures
741             that it is possible to retry writing the record.
742              
743             =item *
744              
745             an exception object (in the
746             C<Data::Record::Serialize::Error::Encode::dbi::insert> class) will be
747             thrown. The failing record (in its final form after formatting, etc)
748             is available via the object's C<payload> method.
749              
750             =back
751              
752             If a record fails to be written, it will still be queued for the next
753             attempt at writing to the database. If this behavior is undesired,
754             make sure to remove it from the queue:
755              
756             use Data::Dumper;
757              
758             if ( ! eval { $output->flush } ) {
759             warn "$@", Dumper( $@->payload );
760             shift $output->queue->@*;
761             }
762              
763             As an example of completely flushing the queue while notifying of errors:
764              
765             use Data::Dumper;
766              
767             until ( eval { $output->flush } ) {
768             warn "$@", Dumper( $@->payload );
769             shift $output->queue->@*;
770             }
771              
772             =head2 send
773              
774             $s->send( \%record );
775              
776             Send a record to the database.
777             If there is an error, an exception object (with class
778             C<Data::Record::Serialize::Error::Encode::dbi::insert>) will be
779             thrown, and the record which failed to be written will be available
780             via the object's C<payload> method.
781              
782             If in L</batch> mode, the record is queued for later transmission.
783             When the number of records queued reaches that specified by the
784             L</batch> attribute, the C<flush> method is called. See L</flush> for
785             more information on how errors are handled.
786              
787             =head2 close
788              
789             $s->close;
790              
791             Close the database handle. If writing is batched, records in the queue
792             are written to the database via L</flush>. An exception will be thrown
793             if a record cannot be written. See L</flush> for more details.
794              
795             As an example of draining the queue while notifying of errors:
796              
797             use Data::Dumper;
798              
799             until ( eval { $output->close } ) {
800             warn "$@", Dumper( $@->payload );
801             shift $output->queue->@*;
802             }
803              
804             =head2 DEMOLISH
805              
806             This method is called when the object is destroyed. It closes the
807             database handle B<but does not flush the record queue>.
808              
809             A warning is emitted if the record queue is not empty; turn off the
810             C<Data::Record::Serialize::Encode::dbi::queue> warning to silence it.
811              
812             =for Pod::Coverage has_schema
813              
814             =for Pod::Coverage _has_dbh
815             _clear_dbh
816              
817             =for Pod::Coverage setup
818              
819             =for Pod::Coverage say
820             print
821             encode
822              
823             =head1 CONSTRUCTOR OPTIONS
824              
825             =over
826              
827             =item C<dsn>
828              
829             I<Required> The DBI Data Source Name (DSN) passed to B<L<DBI>>. It
830             may either be a string or an arrayref containing strings or arrayrefs,
831             which should contain key-value pairs. Elements in the sub-arrays are
832             joined with C<=>, elements in the top array are joined with C<:>. For
833             example,
834              
835             [ 'SQLite', { dbname => $db } ]
836              
837             is transformed to
838              
839             SQLite:dbname=$db
840              
841             The standard prefix of C<dbi:> will be added if not present.
842              
843             =item C<cached>
844              
845             If true, the database connection is made with L<DBI::connect_cached|DBI/connect_cached> rather than
846             L<DBI::connect|DBI/connect>
847              
848             =item C<table>
849              
850             I<Required> The name of the table in the database which will contain the records.
851             It will be created if it does not exist.
852              
853             =item C<schema>
854              
855             The schema to which the table belongs. Optional.
856              
857             =item C<drop_table>
858              
859             If true, the table is dropped and a new one is created.
860              
861             =item C<create_table>
862              
863             If true, a table will be created if it does not exist.
864              
865             =item C<primary>
866              
867             A single output column name or an array of output column names which
868             should be the primary key(s). If not specified, no primary keys are
869             defined.
870              
871             =item C<db_user>
872              
873             The name of the database user
874              
875             =item C<db_pass>
876              
877             The database password
878              
879             =item C<batch>
880              
881             The number of rows to write to the database at once. This defaults to 100.
882              
883             If greater than 1, C<batch> rows are cached and then sent out in a
884             single transaction. See L</Performance> for more information.
885              
886             =item C<dbitrace>
887              
888             A trace setting passed to L<DBI>.
889              
890             =back
891              
892             =head1 SUPPORT
893              
894             =head2 Bugs
895              
896             Please report any bugs or feature requests to bug-data-record-serialize@rt.cpan.org or through the web interface at: https://rt.cpan.org/Public/Dist/Display.html?Name=Data-Record-Serialize
897              
898             =head2 Source
899              
900             Source is available at
901              
902             https://gitlab.com/djerius/data-record-serialize
903              
904             and may be cloned from
905              
906             https://gitlab.com/djerius/data-record-serialize.git
907              
908             =head1 SEE ALSO
909              
910             Please see those modules/websites for more information related to this module.
911              
912             =over 4
913              
914             =item *
915              
916             L<Data::Record::Serialize|Data::Record::Serialize>
917              
918             =back
919              
920             =head1 AUTHOR
921              
922             Diab Jerius <djerius@cpan.org>
923              
924             =head1 COPYRIGHT AND LICENSE
925              
926             This software is Copyright (c) 2017 by Smithsonian Astrophysical Observatory.
927              
928             This is free software, licensed under:
929              
930             The GNU General Public License, Version 3, June 2007
931              
932             =cut