File Coverage

blib/lib/PGObject/Util/BulkLoad.pm
Criterion Covered Total %
statement 90 150 60.0
branch 8 28 28.5
condition n/a
subroutine 18 30 60.0
pod 7 7 100.0
total 123 215 57.2


line stmt bran cond sub pod time code
1             package PGObject::Util::BulkLoad;
2              
3 3     3   53321 use 5.006;
  3         12  
  3         130  
4 3     3   17 use strict;
  3         5  
  3         115  
5 3     3   17 use warnings FATAL => 'all';
  3         18  
  3         136  
6              
7 3     3   23 use Carp;
  3         4  
  3         271  
8 3     3   4726 use Memoize;
  3         8473  
  3         167  
9 3     3   21082 use Text::CSV;
  3         60862  
  3         22  
10 3     3   17405 use Try::Tiny;
  3         4844  
  3         4586  
11              
12             =head1 NAME
13              
14             PGObject::Util::BulkLoad - Bulk load records into PostgreSQL
15              
16             =head1 VERSION
17              
18             Version 0.04
19              
20             =cut
21              
# Distribution version; keep in step with the VERSION section of the POD.
our $VERSION = '0.04';
23              
24              
25             =head1 SYNOPSIS
26              
27             To insert all rows into a table using COPY:
28              
29             PGObject::Util::BulkLoad->copy(
30             {table => 'mytable', insert_cols => ['col1', 'col2'], dbh => $dbh},
31             @objects
32             );
33              
34             To copy to a temp table and then upsert:
35              
36             PGObject::Util::BulkLoad->upsert(
37             {table => 'mytable',
38             insert_cols => ['col1', 'col2'],
39             update_cols => ['col1'],
40             key_cols => ['col2'],
41             dbh => $dbh},
42             @objects
43             );
44              
45             Or if you prefer to run the statements yourself:
46              
47             PGObject::Util::BulkLoad->statement(
48             table => 'mytable', type => 'temp', tempname => 'foo_123'
49             );
50             PGObject::Util::BulkLoad->statement(
51             table => 'mytable', type => 'copy', insert_cols => ['col1', 'col2']
52             );
53             PGObject::Util::BulkLoad->statement(
54             type => 'upsert',
55             tempname => 'foo_123',
56             table => 'mytable',
57             insert_cols => ['col1', 'col2'],
58             update_cols => ['col1'],
59             key_cols => ['col2']
60             );
61              
62             If you are running repetitive calls, you may be able to trade time for memory
63             using Memoize by unning the following:
64              
65             PGObject::Util::BulkLoad->memoize_statements;
66              
67             To unmemoize:
68              
69             PGObject::Util::BulkLoad->unmemoize;
70              
71             To flush the cache:
72              
73             PGObject::Util::BulkLoad->flush_memoization;
74              
75             =head1 DESCRIPTION
76              
77             =head1 SUBROUTINES/METHODS
78              
79             =head2 memoize_statements
80              
81             This function exists to memoize statement calls, i.e. generate the exact same
82             statements on the same argument calls. This isn't too likely to be useful in
83             most cases but it may be if you have repeated bulk loader calls in a persistent
84             script (for example real-time importing of csv data from a frequent source).
85              
86             =cut
87              
sub memoize_statements {
    # Cache statement() results keyed on the *contents* of its arguments
    # (see _statement_cache_key) instead of Memoize's default key.
    return memoize('statement', NORMALIZER => \&_statement_cache_key);
}

# Build the Memoize cache key for a statement(@args) call.
#
# Memoize's default normalizer joins the raw argument list. That key depends
# on the order in which the caller's hash was flattened, and it depends on
# the stringified address of array refs such as insert_cols. A freed array's
# address can be reused by a new array holding different columns, and that
# would return SQL generated for the old columns.
#
# This key instead sorts the argument names and encodes ARRAY refs by
# content. Each scalar is length-prefixed ("3:abc"), undef is "-", and arrays
# are wrapped in "[...]", so distinct contents give distinct keys. Non-ARRAY
# references (e.g. a dbh) are still compared by their stringified address.
sub _statement_cache_key {
    my %args = @_;
    my $encode = sub { defined $_[0] ? length($_[0]) . ':' . $_[0] : '-' };
    return join '', map {
        my $value = $args{$_};
        $encode->($_)
            . (ref $value eq 'ARRAY'
                ? '[' . join('', map { $encode->($_) } @$value) . ']'
                : $encode->($value));
    } sort keys %args;
}
91              
92             =head2 unmemoize
93              
94             Unmemoizes the statement calls.
95              
96             =cut
97              
sub unmemoize {
    # Put back the plain, uncached statement() that memoize_statements()
    # wrapped, via Memoize's own unmemoize.
    return Memoize::unmemoize('statement');
}
101              
102             =head2 flush_memoization
103              
104             Flushes the cache for statement memoization. Does *not* flush the cache for
105             escaping memoization since that is a bigger win and a pure function accepting
106             simple strings.
107              
108             =cut
109              
sub flush_memoization {
    # Discard every cached statement() result. The cache is owned by
    # Memoize, so its flush_cache must be called (there is no "Memoization"
    # package).
    return Memoize::flush_cache('statement');
}
113            
114             =head2 statement
115              
116             This takes the following arguments and returns a suitable SQL statement:
117              
118             =over
119              
120             =item type
121              
122             Type of statement. Options are:
123              
124             =over
125              
126             =item temp
127              
128             Create a temporary table
129              
130             =item copy
131              
132             sql COPY statement
133              
134             =item upsert
135              
136             Update/Insert CTE pulling temp table
137              
138             =item stats
139              
140             Get stats on pending upsert, grouped by an arbitrary column.
141              
142             =back
143              
144             =item table
145              
146             Name of table
147              
148             =item tempname
149              
150             Name of temp table
151              
152             =item insert_cols
153              
154             Column names for insert
155              
156             =item update_cols
157              
158             Column names for update
159              
160             =item key_cols
161              
162             Names of columns in primary key.
163              
164             =item group_stats_by
165              
166             Names of columns to group stats by
167              
168             =back
169              
170             =cut
171              
sub _sanitize_ident {
    my $string = shift;
    # Make the name safe to use as a quoted SQL identifier: every embedded
    # double quote is doubled, then the whole name is wrapped in quotes.
    $string =~ s/"/""/g;
    return '"' . $string . '"';
}
177              
# Build the SQL that previews what an upsert from the temp table would do.
#
# $args is a hashref. key_cols (a non-empty array ref), table and tempname
# are required; croaks otherwise. group_stats_by is an optional array ref of
# column names; the key columns are used for grouping when it is not set.
#
# The generated SELECT LEFT JOINs the temp table to the target table USING
# the key columns. Per group it counts rows whose target-side key ROW(...)
# IS NULL (no matching target row) as pgobject_bulkload_inserts, and the
# remaining rows as pgobject_bulkload_updates. The result columns are the
# grouping columns, then the inserts count, then the updates count.
# All identifiers are quoted via _sanitize_ident. Returns the SQL string.
sub _statement_stats {
    my ($args) = @_;
    croak 'Key columns must array ref' unless (ref $args->{key_cols}) =~ /ARRAY/;
    croak 'Must supply key columns' unless @{$args->{key_cols}};
    croak 'Must supply table name' unless $args->{table};
    croak 'Must supply temp table' unless $args->{tempname};

    # Group by the requested columns, falling back to the key columns.
    my @groupcols;
    @groupcols = $args->{group_stats_by}
        ? @{$args->{group_stats_by}}
        : @{$args->{key_cols}};
    my $table = _sanitize_ident($args->{table});
    my $temp = _sanitize_ident($args->{tempname});
    "SELECT " . join(', ', map {"$temp." . _sanitize_ident($_)} @groupcols) . ",
SUM(CASE WHEN ROW(" . join(', ', map {"$table." . _sanitize_ident($_)
    } @{$args->{key_cols}}) . ") IS NULL
THEN 1
ELSE 0
END) AS pgobject_bulkload_inserts,
SUM(CASE WHEN ROW(" . join(', ', map {"$table." . _sanitize_ident($_)
    } @{$args->{key_cols}}) . ") IS NULL
THEN 0
ELSE 1
END) AS pgobject_bulkload_updates
FROM $temp
LEFT JOIN $table USING (" . join(', ', map {_sanitize_ident($_)
    } @{$args->{key_cols}}) . ")
GROUP BY " . join(', ', map {"$temp." . _sanitize_ident($_)} @groupcols);
}
207              
# Build "CREATE TEMPORARY TABLE <tempname> ( LIKE <table> )".
#
# $args is a hashref with table and tempname, both required. This croaks with
# the same messages as the other statement builders rather than dying on an
# undefined identifier. Both names are quoted via _sanitize_ident.
# Returns the SQL string.
sub _statement_temp {
    my ($args) = @_;
    croak 'Must supply table name' unless $args->{table};
    croak 'Must supply temp table' unless $args->{tempname};

    return "CREATE TEMPORARY TABLE " . _sanitize_ident($args->{tempname})
         . " ( LIKE " . _sanitize_ident($args->{table}) . " )";
}
214              
# Build "COPY <table>(<insert_cols>) FROM STDIN WITH CSV".
#
# $args is a hashref. insert_cols (a non-empty array ref of column names) and
# table are required. An empty column list would otherwise yield the invalid
# SQL "COPY t() ...". Identifiers are quoted via _sanitize_ident.
# Returns the SQL string.
sub _statement_copy {
    my ($args) = @_;
    my $cols = $args->{insert_cols};
    croak 'No insert cols' unless $cols && @{$cols};
    croak 'Must supply table name' unless $args->{table};

    return "COPY " . _sanitize_ident($args->{table}) . "("
         . join(', ', map { _sanitize_ident($_) } @{$cols}) . ') '
         . "FROM STDIN WITH CSV";
}
223              
# Build the SQL that merges rows from the temp table into the target table.
#
# $args is a hashref. insert_cols, update_cols, key_cols, table and tempname
# must all be true values; otherwise this croaks "Missing argument <name>".
# The three column arguments are dereferenced as array refs.
#
# The statement is a single data-modifying CTE:
#   UP:     UPDATE the target FROM the temp table, SETting each update column
#           to the temp table's value where all key columns are equal, and
#           RETURNING the key columns of the updated rows;
#   INSERT: insert insert_cols from temp rows whose key ROW(...) is NOT IN
#           the keys returned by UP.
# NOTE(review): temp rows with a NULL key column may be skipped by NOT IN
# when UP returns rows -- confirm this is intended.
# Identifiers are quoted via _sanitize_ident. Returns the SQL string.
sub _statement_upsert {
    my ($args) = @_;
    for (qw(insert_cols update_cols key_cols table tempname)){
        croak "Missing argument $_" unless $args->{$_};
    }
    my $table = _sanitize_ident($args->{table});
    my $temp = _sanitize_ident($args->{tempname});

    "WITH UP AS (
UPDATE $table
SET " . join(",
", map { _sanitize_ident($_) . ' = ' .
        "$temp." . _sanitize_ident($_)} @{$args->{update_cols}}) . "
FROM $temp
WHERE " . join("
AND ", map {"$table." . _sanitize_ident($_) . ' = ' .
        "$temp." . _sanitize_ident($_)} @{$args->{key_cols}}) . "
RETURNING " . join(", ", map {"$table." . _sanitize_ident($_)} @{$args->{key_cols}}) ."
)
INSERT INTO $table (" . join(", ",
        map {_sanitize_ident($_)} @{$args->{insert_cols}}) . ")
SELECT " . join(", ", map {_sanitize_ident($_)} @{$args->{insert_cols}}) . "
FROM $temp
WHERE ROW(". join(", ", map { "$temp." . _sanitize_ident($_)} @{$args->{key_cols}}) .")
NOT IN (SELECT ".join(", ", map { "UP." . _sanitize_ident($_)} @{$args->{key_cols}}) ." FROM UP)";

}
251              
# Return the SQL for the statement named by the 'type' argument.
#
# Takes a flat list of named arguments (see the POD above). 'type' is
# required. The matching _statement_<type> builder in this package receives
# a hashref of all arguments. Croaks if 'type' is missing or if no builder
# exists for it.
sub statement {
    my %args = @_;
    croak "Missing argument 'type'" unless $args{type};

    # Resolve the builder with can() instead of a symbolic-reference call, so
    # an unknown type gives a clear error rather than "Undefined subroutine"
    # and no 'no strict refs' is needed.
    my $builder = __PACKAGE__->can("_statement_$args{type}")
        or croak "Unknown statement type '$args{type}'";
    return $builder->(\%args);
}
258              
259             =head2 upsert
260              
261             Copies the data into a temporary table named pg_temp.pgobject_bulkloader, then updates matching rows in the target table and inserts the rest.
262              
263             If the first record is an object with a method named after one of the
264             arguments below, that method's defined return value overrides the argument hashref.
265              
266             =over
267              
268             =item table
269              
270             Table to upsert into
271              
272             =item insert_cols
273              
274             Columns to insert (by name)
275              
276             =item update_cols
277              
278             Columns to update (by name)
279              
280             =item key_cols
281              
282             Key columns (by name)
283              
284             =item group_stats_by
285              
286             This is an array of column names for optional stats retrieval and grouping.
287             If it is set then we will grab the stats and return them. Note this has a
288             performance penalty because it means an extra scan of the temp table and an
289             extra join against the parent table. See get_stats for the return value
290             information if this is set.
291              
292             =back
293              
294             =cut
295              
# Merge an argument hashref with values supplied by an object.
#
# For each recognised argument name, the candidates are $init_args->{name}
# and $obj->name. The method call runs inside try, so a missing method or a
# non-object $obj contributes nothing. The last defined candidate wins, so a
# defined value from the object overrides the hashref. Returns a new hashref
# holding every recognised name (undef when neither source had a value).
sub _build_args {
    my ($init_args, $obj) = @_;
    my %merged;
    for my $name (qw(table insert_cols update_cols key_cols dbh
                     tempname group_stats_by)) {
        my $chosen;
        for my $candidate ($init_args->{$name}, try { $obj->$name }) {
            next unless defined $candidate;
            $chosen = $candidate;
        }
        $merged{$name} = $chosen;
    }
    return \%merged;
}
309              
# Bulk upsert the records into $args->{table}.
#
# Call as upsert(\%args, @records) or upsert($object, @records), optionally
# as a class method. Arguments are merged by _build_args.
#
# Steps: COPY the records into a fresh temp table pg_temp.pgobject_bulkloader,
# collect stats if group_stats_by is set, run the UPDATE/INSERT statement
# against the target table, then drop the temp table.
#
# Returns the get_stats() result when group_stats_by is set; otherwise it
# returns the value of the final DROP TABLE.
sub upsert {
    my ($args) = shift;
    $args = shift if $args eq __PACKAGE__;

    # When ->can succeeds, the first argument is an object, and it is also a
    # record to load. The unshift has to happen out here: Try::Tiny runs the
    # block as a separate sub, so @_ inside it is not this sub's @_.
    my $args_is_object = try {
        $args->can('foo');
        1;
    };
    unshift @_, $args if $args_is_object;

    $args = _build_args($args, $_[0]);
    my $dbh = $args->{dbh};

    # pg_temp is the schema of temporary tables. If someone wants to create
    # a permanent table there, they are inviting disaster. At any rate this is
    # safe but a plain drop without schema qualification risks losing user data.

    my $return_value;

    $dbh->do("DROP TABLE IF EXISTS pg_temp.pgobject_bulkloader");
    $dbh->do(statement(%$args, (type     => 'temp',
                                tempname => 'pgobject_bulkloader')));
    copy({(%$args, (table => 'pgobject_bulkloader'))}, @_);

    # Stats must be gathered before the upsert changes the target table.
    if ($args->{group_stats_by}) {
        $return_value = get_stats(
            {(%$args, (tempname => 'pgobject_bulkloader'))}
        );
    }

    $dbh->do(statement(%$args, (type     => 'upsert',
                                tempname => 'pgobject_bulkloader')));
    my $dropstatus = $dbh->do("DROP TABLE pg_temp.pgobject_bulkloader");
    return $return_value if $args->{group_stats_by};
    return $dropstatus;
}
344              
345             =head2 copy
346              
347             Copies data into the specified table. The following arguments are used:
348              
349             =over
350              
351             =item table
352              
353             Table to copy into
354              
355             =item insert_cols
356              
357             Columns to insert (by name)
358              
359             =back
360              
361             =cut
362              
# Serialise records as CSV text for "COPY ... FROM STDIN WITH CSV".
#
# $args->{cols} is an array ref of column names. Each remaining argument is a
# hashref, and its values for those columns form one CSV line. Lines are
# joined with "\n" (no trailing newline). Croaks if a record cannot be
# encoded.
#
# NOTE(review): with Text::CSV's defaults, empty strings are written
# unquoted, the same as undef, and COPY CSV reads unquoted empty fields as
# NULL. Consider quote_empty if '' must be preserved.
sub _to_csv {
    my ($args) = shift;

    # binary => 1 lets combine() accept embedded newlines, carriage returns
    # and non-ASCII characters (such fields are quoted). With the default
    # binary => 0, those values make combine() fail.
    my $csv = Text::CSV->new({ binary => 1 })
        or croak 'Cannot construct Text::CSV: ' . Text::CSV->error_diag;

    return join "\n", map {
        my $record = $_;
        $csv->combine(map { $record->{$_} } @{ $args->{cols} })
            or croak 'Cannot encode record as CSV: ' . $csv->error_diag;
        $csv->string;
    } @_;
}
373              
# COPY the records into $args->{table} through the database handle in
# $args->{dbh}, using its pg_putcopydata / pg_putcopyend methods.
#
# Call as copy(\%args, @records) or copy($object, @records), optionally as a
# class method. Arguments are merged by _build_args. Each record is read as
# a hashref keyed by the insert_cols names (see _to_csv).
# Returns the value of $dbh->pg_putcopyend.
sub copy {
    my ($args) = shift;
    $args = shift if $args eq __PACKAGE__;

    # When ->can succeeds, the first argument is an object, and it is also a
    # record to load. The unshift has to happen out here: Try::Tiny runs the
    # block as a separate sub, so @_ inside it is not this sub's @_.
    my $args_is_object = try {
        no warnings;
        no strict;
        $args->can('foo');
        1;
    };
    unshift @_, $args if $args_is_object;

    $args = _build_args($args, $_[0]);
    my $dbh = $args->{dbh};
    $dbh->do(statement(%$args, (type => 'copy')));
    $dbh->pg_putcopydata(_to_csv({cols => $args->{insert_cols}}, @_));
    return $dbh->pg_putcopyend();
}
389              
390             =head2 get_stats
391              
392             Takes the same arguments as upsert plus group_stats_by
393              
394             Returns a reference to an array of hashrefs representing the number of inserts and updates
395             that an upsert will perform. It must be performed before the upsert statement
396             actually runs. Typically this is run via the upsert command (which
397             automatically runs this if group_stats_by is set in the arguments hash).
398              
399             There is a performance penalty here since an unindexed left join is required
400             between the temp and the normal table.
401              
402             This function requires tempname, table, key_cols and group_stats_by to be set in
403             the argument hashref. The return value is a reference to an array of hashrefs
404             with the following keys:
405              
406             =over
407              
408             =item stats
409              
410             Hashref with keys inserts and updates, holding the number of rows that will be inserted and updated.
411              
412             =item keys
413              
414             Hashref of the grouping (group_stats_by) columns and their values, by name
415              
416             =back
417              
418             =cut
419              
# Preview what an upsert from the temp table would do: run the 'stats'
# statement and return a reference to an array with one hashref per group:
#   { stats => { inserts => N, updates => M }, keys => { col => value, ... } }
#
# Call as get_stats(\%args) or get_stats($object, ...), optionally as a class
# method. Arguments are merged by _build_args; the temp table must already
# be loaded. It must run before the upsert statement changes the target.
sub get_stats {
    my ($args) = shift;
    $args = shift if $args eq __PACKAGE__;

    # When ->can succeeds, the first argument is an object and belongs back
    # at the front of @_. The unshift has to happen out here: Try::Tiny runs
    # the block as a separate sub, so @_ inside it is not this sub's @_.
    my $args_is_object = try {
        no warnings;
        no strict;
        $args->can('foo');
        1;
    };
    unshift @_, $args if $args_is_object;

    $args = _build_args($args, $_[0]);
    my $dbh = $args->{dbh};

    my $rows = $dbh->selectall_arrayref(statement(%$args, (type => 'stats')));

    # The stats SQL groups by key_cols when group_stats_by is not set, so the
    # leading columns are labelled the same way (instead of dereferencing
    # an undefined group_stats_by).
    my @group_cols = @{ $args->{group_stats_by} || $args->{key_cols} };

    my $returnval = [
        map {
            my @row = @$_;
            # Row layout: grouping columns..., inserts, updates.
            my $updates = pop @row;
            my $inserts = pop @row;
            my %keys;
            @keys{@group_cols} = @row;
            +{
                stats => { updates => $updates, inserts => $inserts },
                keys  => \%keys,
            };
        } @$rows
    ];
    return $returnval;
}
446              
447             =head1 AUTHOR
448              
449             Chris Travers, C<< >>
450              
451             =head1 CO-MAINTAINERS
452              
453             =over
454              
455             =item Binary.com, C<< >>
456              
457             =back
458              
459             =head1 BUGS
460              
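461             Please report any bugs or feature requests to C<bug-pgobject-util-bulkload at rt.cpan.org>, or through
462             the web interface at L<http://rt.cpan.org/NoAuth/ReportBug.html?Queue=PGObject-Util-BulkLoad>. I will be notified, and then you'll
463             automatically be notified of progress on your bug as I make changes.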
464              
465              
466              
467              
468             =head1 SUPPORT
469              
470             You can find documentation for this module with the perldoc command.
471              
472             perldoc PGObject::Util::BulkLoad
473              
474              
475             You can also look for information at:
476              
477             =over 4
478              
479             =item * RT: CPAN's request tracker (report bugs here)
480              
481             L
482              
483             =item * AnnoCPAN: Annotated CPAN documentation
484              
485             L
486              
487             =item * CPAN Ratings
488              
489             L
490              
491             =item * Search CPAN
492              
493             L
494              
495             =back
496              
497              
498             =head1 ACKNOWLEDGEMENTS
499              
500              
501             =head1 LICENSE AND COPYRIGHT
502              
503             Copyright 2014 Chris Travers.
504              
505             This program is distributed under the (Revised) BSD License:
506             L
507              
508             Redistribution and use in source and binary forms, with or without
509             modification, are permitted provided that the following conditions
510             are met:
511              
512             * Redistributions of source code must retain the above copyright
513             notice, this list of conditions and the following disclaimer.
514              
515             * Redistributions in binary form must reproduce the above copyright
516             notice, this list of conditions and the following disclaimer in the
517             documentation and/or other materials provided with the distribution.
518              
519             * Neither the name of Chris Travers's Organization
520             nor the names of its contributors may be used to endorse or promote
521             products derived from this software without specific prior written
522             permission.
523              
524             THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
525             "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
526             LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
527             A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
528             OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
529             SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
530             LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
531             DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
532             THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
533             (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
534             OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
535              
536              
537             =cut
538              
1; # End of PGObject::Util::BulkLoad (a module must return a true value)