File Coverage

blib/lib/PGObject/Util/BulkLoad.pm
Criterion Covered Total %
statement 89 149 59.7
branch 8 28 28.5
condition n/a
subroutine 18 30 60.0
pod 7 7 100.0
total 122 214 57.0


line stmt bran cond sub pod time code
1             package PGObject::Util::BulkLoad;
2              
3 2     2   14455 use 5.006;
  2         4  
4 2     2   6 use strict;
  2         2  
  2         35  
5 2     2   6 use warnings FATAL => 'all';
  2         2  
  2         70  
6              
7 2     2   8 use Carp;
  2         2  
  2         138  
8 2     2   1065 use Memoize;
  2         4004  
  2         105  
9 2     2   1516 use Text::CSV;
  2         23776  
  2         104  
10 2     2   1113 use Try::Tiny;
  2         2441  
  2         1830  
11              
12             =head1 NAME
13              
14             PGObject::Util::BulkLoad - Bulk load records into PostgreSQL
15              
16             =cut
17              
18             our $VERSION = '0.05';
19              
20             =head1 SYNOPSIS
21              
22             To insert all rows into a table using COPY:
23             my ($dbh, @objects);
24             PGObject::Util::BulkLoad->copy(
25             {table => 'mytable', insert_cols => ['col1', 'col2'], dbh => $dbh},
26             @objects
27             );
28              
29             To copy to a temp table and then upsert:
30             my ($dbh, @objects);
31             PGObject::Util::BulkLoad->upsert(
32             {table => 'mytable',
33             insert_cols => ['col1', 'col2'],
34             update_cols => ['col1'],
35             key_cols => ['col2'],
36             dbh => $dbh},
37             @objects
38             );
39              
40             Or if you prefer to run the statements yourself:
41              
42             PGObject::Util::BulkLoad->statement(
43             table => 'mytable', type => 'temp', tempname => 'foo_123'
44             );
45             PGObject::Util::BulkLoad->statement(
46             table => 'mytable', type => 'copy', insert_cols => ['col1', 'col2']
47             );
48             PGObject::Util::BulkLoad->statement(
49             type => 'upsert',
50             tempname => 'foo_123',
51             table => 'mytable',
52             insert_cols => ['col1', 'col2'],
53             update_cols => ['col1'],
54             key_cols => ['col2']
55             );
56              
57             If you are running repetitive calls, you may be able to trade time for memory
58             using Memoize by unning the following:
59              
60             PGObject::Util::BulkLoad->memoize_statements;
61              
62             To unmemoize:
63              
64             PGObject::Util::BulkLoad->unmemoize;
65              
66             To flush cache
67              
68             PGObject::Util::BulkLoad->flush_memoization;
69              
70             =head1 DESCRIPTION
71              
72             =head1 SUBROUTINES/METHODS
73              
74             =head2 memoize_statements
75              
76             This function exists to memoize statement calls, i.e. generate the exact same
77             statements on the same argument calls. This isn't too likely to be useful in
78             most cases but it may be if you have repeated bulk loader calls in a persistent
79             script (for example real-time importing of csv data from a frequent source).
80              
81             =cut
82              
83             sub memoize_statements {
84 0     0 1 0 return memoize 'statement';
85             }
86              
87             =head2 unmemoize
88              
89             Unmemoizes the statement calls.
90              
91             =cut
92              
93             sub unmemoize {
94 0     0 1 0 return Memoize::unmemoize 'statement';
95             }
96              
97             =head2 flush_memoization
98              
99             Flushes the cache for statement memoization. Does *not* flush the cache for
100             escaping memoization since that is a bigger win and a pure function accepting
101             simple strings.
102              
103             =cut
104              
105             sub flush_memoization {
106 0     0 1 0 return Memoization::flush_cache('statement');
107             }
108              
109             =head2 statement
110              
111             This takes the following arguments and returns a suitable SQL statement
112              
113             =over
114              
115             =item type
116              
117             Type of statement. Options are:
118              
119             =over
120              
121             =item temp
122              
123             Create a temporary table
124              
125             =item copy
126              
127             sql COPY statement
128              
129             =item upsert
130              
131             Update/Insert CTE pulling temp table
132              
133             =item stats
134              
135             Get stats on pending upsert, grouped by an arbitrary column.
136              
137             =back
138              
139             =item table
140              
141             Name of table
142              
143             =item tempname
144              
145             Name of temp table
146              
147             =item insert_cols
148              
149             Column names for insert
150              
151             =item update_cols
152              
153             Column names for update
154              
155             =item key_cols
156              
157             Names of columns in primary key.
158              
159             =item group_stats_by
160              
161             Names of columns to group stats by
162              
163             =back
164              
165             =cut
166              
167             sub _sanitize_ident {
168 98     98   64 my ($string) = @_;
169 98         87 $string =~ s/"/""/g;
170 98         205 return qq("$string");
171             }
172              
173             sub _statement_stats {
174 3     3   4 my ($args) = @_;
175 3 50       12 croak 'Key columns must array ref' unless (ref $args->{key_cols}) =~ /ARRAY/;
176 3 50       1 croak 'Must supply key columns' unless @{$args->{key_cols}};
  3         8  
177 3 50       4 croak 'Must supply table name' unless $args->{table};
178 3 50       7 croak 'Must supply temp table' unless $args->{tempname};
179              
180 3         2 my @groupcols;
181             @groupcols =
182             $args->{group_stats_by}
183 3         6 ? @{$args->{group_stats_by}}
184 3 50       6 : @{$args->{key_cols}};
  0         0  
185 3         5 my $table = _sanitize_ident($args->{table});
186 3         5 my $temp = _sanitize_ident($args->{tempname});
187 4         5 return "SELECT " . join(', ', map { "$temp." . _sanitize_ident($_) } @groupcols) . ",
188 4         7 SUM(CASE WHEN ROW(" . join(', ', map { "$table." . _sanitize_ident($_) } @{$args->{key_cols}}) . ") IS NULL
  3         5  
189             THEN 1
190             ELSE 0
191             END) AS pgobject_bulkload_inserts,
192 4         5 SUM(CASE WHEN ROW(" . join(', ', map { "$table." . _sanitize_ident($_) } @{$args->{key_cols}}) . ") IS NULL
  3         4  
193             THEN 0
194             ELSE 1
195             END) AS pgobject_bulkload_updates
196             FROM $temp
197 4         5 LEFT JOIN $table USING (" . join(', ', map { _sanitize_ident($_) } @{$args->{key_cols}}) . ")
  3         4  
198 3         5 GROUP BY " . join(', ', map { "$temp." . _sanitize_ident($_) } @groupcols);
  4         7  
199             }
200              
201             sub _statement_temp {
202 3     3   3 my ($args) = @_;
203              
204 3         8 return "CREATE TEMPORARY TABLE " . _sanitize_ident($args->{tempname}) . " ( LIKE " . _sanitize_ident($args->{table}) . " )";
205             }
206              
207             sub _statement_copy {
208 3     3   3 my ($args) = @_;
209 3 50       7 croak 'No insert cols' unless $args->{insert_cols};
210              
211             return
212             "COPY "
213             . _sanitize_ident($args->{table}) . "("
214 3         6 . join(', ', map { _sanitize_ident($_) } @{$args->{insert_cols}}) . ') '
  9         8  
  3         5  
215             . "FROM STDIN WITH CSV";
216             }
217              
218             sub _statement_upsert {
219 3     3   3 my ($args) = @_;
220 3         5 for (qw(insert_cols update_cols key_cols table tempname)) {
221 15 50       25 croak "Missing argument $_" unless $args->{$_};
222             }
223 3         6 my $table = _sanitize_ident($args->{table});
224 3         6 my $temp = _sanitize_ident($args->{tempname});
225              
226             return "WITH UP AS (
227             UPDATE $table
228             SET " . join(
229             ",
230 5         6 ", map { _sanitize_ident($_) . ' = ' . "$temp." . _sanitize_ident($_) } @{$args->{update_cols}})
  3         20  
231             . "
232             FROM $temp
233             WHERE " . join("
234 4         6 AND ", map { "$table." . _sanitize_ident($_) . ' = ' . "$temp." . _sanitize_ident($_) } @{$args->{key_cols}}) . "
  3         5  
235 4         6 RETURNING " . join(", ", map { "$table." . _sanitize_ident($_) } @{$args->{key_cols}}) . "
  3         3  
236             )
237 9         10 INSERT INTO $table (" . join(", ", map { _sanitize_ident($_) } @{$args->{insert_cols}}) . ")
  3         5  
238 9         7 SELECT " . join(", ", map { _sanitize_ident($_) } @{$args->{insert_cols}}) . "
  3         5  
239             FROM $temp
240 4         5 WHERE ROW(" . join(", ", map { "$temp." . _sanitize_ident($_) } @{$args->{key_cols}}) . ")
  3         4  
241 3         7 NOT IN (SELECT " . join(", ", map { "UP." . _sanitize_ident($_) } @{$args->{key_cols}}) . " FROM UP)";
  4         3  
  3         4  
242              
243             }
244              
245             sub statement {
246 12     12 1 4587 my %args = @_;
247 12 50       24 croak "Missing argument 'type'" unless $args{type};
248 2     2   11 no strict 'refs';
  2         3  
  2         922  
249 12         13 return &{"_statement_$args{type}"}(\%args);
  12         38  
250             }
251              
252             =head2 upsert
253              
254             Creates a temporary table named "pg_object.bulkload" and copies the data there
255              
256             If the first argument is an object, then if there is a function by the name
257             of the object, it will provide the value.
258              
259             =over
260              
261             =item table
262              
263             Table to upsert into
264              
265             =item insert_cols
266              
267             Columns to insert (by name)
268              
269             =item update_cols
270              
271             Columns to update (by name)
272              
273             =item key_cols
274              
275             Key columns (by name)
276              
277             =item group_stats_by
278              
279             This is an array of column names for optional stats retrieval and grouping.
280             If it is set then we will grab the stats and return them. Note this has a
281             performance penalty because it means an extra scan of the temp table and an
282             extra join against the parent table. See get_stats for the return value
283             information if this is set.
284              
285             =back
286              
287             =cut
288              
289             sub _build_args {
290 0     0     my ($init_args, $obj) = @_;
291 0           my @arglist = qw(table insert_cols update_cols key_cols dbh
292             tempname group_stats_by);
293             return {
294             map {
295 0           my $val;
  0            
296 0     0     for my $v ($init_args->{$_}, try { $obj->$_ }) {
  0            
297 0 0         $val = $v if defined $v;
298             }
299 0           $_ => $val;
300             } @arglist
301             };
302             }
303              
304             sub upsert { ## no critic (ArgUnpacking)
305 0     0 1   my ($args) = shift;
306 0 0         $args = shift if $args eq __PACKAGE__;
307             try {
308 0     0     $args->can('foo');
309 0           unshift @_, $args; # args is an object
310 0           };
311 0           $args = _build_args($args, $_[0]);
312 0           my $dbh = $args->{dbh};
313              
314             # pg_temp is the schema of temporary tables. If someone wants to create
315             # a permanent table there, they are inviting disaster. At any rate this is
316             # safe but a plain drop without schema qualification risks losing user data.
317              
318 0           my $return_value;
319              
320 0           $dbh->do("DROP TABLE IF EXISTS pg_temp.pgobject_bulkloader");
321 0           $dbh->do(
322             statement(
323             %$args,
324             (
325             type => 'temp',
326             tempname => 'pgobject_bulkloader'
327             )));
328 0           copy({(%$args, (table => 'pgobject_bulkloader'))}, @_);
329              
330 0 0         if ($args->{group_stats_by}) {
331 0           $return_value = get_stats({(%$args, (tempname => 'pgobject_bulkloader'))});
332             }
333              
334             $dbh->do(
335 0           statement(
336             %$args,
337             (
338             type => 'upsert',
339             tempname => 'pgobject_bulkloader'
340             )));
341 0           my $dropstatus = $dbh->do("DROP TABLE pg_temp.pgobject_bulkloader");
342 0 0         return $return_value if $args->{group_stats_by};
343 0           return $dropstatus;
344             }
345              
346             =head2 copy
347              
348             Copies data into the specified table. The following arguments are used:
349              
350             =over
351              
352             =item table
353              
354             Table to upsert into
355              
356             =item insert_cols
357              
358             Columns to insert (by name)
359              
360             =back
361              
362             =cut
363              
364             sub _to_csv {
365 0     0     my ($args) = shift;
366              
367 0           my $csv = Text::CSV->new();
368             return join(
369             "\n",
370             map {
371 0           my $obj = $_;
  0            
372 0           $csv->combine(map { $obj->{$_} } @{$args->{cols}});
  0            
  0            
373 0           $csv->string();
374             } @_
375             );
376             }
377              
378             sub copy { ## no critic (ArgUnpacking)
379 0     0 1   my ($args) = shift;
380 0 0         $args = shift if $args eq __PACKAGE__;
381             try {
382 2     2   10 no warnings; ## no critic (ProhibitNoWarnings)
  2         4  
  2         75  
383 2     2   24 no strict; ## no critic (ProhibitNoStrict)
  2         3  
  2         283  
384 0     0     $args->can('foo');
385 0           unshift @_, $args; # args is an object
386 0           };
387 0           $args = _build_args($args, $_[0]);
388 0           my $dbh = $args->{dbh};
389 0           $dbh->do(statement(%$args, (type => 'copy')));
390 0           $dbh->pg_putcopydata(_to_csv({cols => $args->{insert_cols}}, @_));
391 0           return $dbh->pg_putcopyend();
392             }
393              
394             =head2 get_stats
395              
396             Takes the same arguments as upsert plus group_stats_by
397              
398             Returns an array of hashrefs representing the number of inserts and updates
399             that an upsert will perform. It must be performed before the upsert statement
400             actually runs. Typically this is run via the upsert command (which
401             automatically runs this if group_stats_by is set in the argumements hash).
402              
403             There is a performance penalty here since an unindexed left join is required
404             between the temp and the normal table.
405              
406             This function requires tempname, table, and group_stats_by to be set in the
407             argument hashref. The return value is a list of hashrefs with the following
408             keys:
409              
410             =over
411              
412             =item stats
413              
414             Hashref with keys inserts and updates including numbers of rows.
415              
416             =item keys
417              
418             Hashref for key columns and their values, by name
419              
420             =back
421              
422             =cut
423              
424             sub get_stats { ## no critic (ArgUnpacking)
425 0     0 1   my ($args) = shift;
426 0 0         $args = shift if $args eq __PACKAGE__;
427             try {
428 2     2   9 no warnings; ## no critic (ProhibitNoWarnings)
  2         3  
  2         60  
429 2     2   8 no strict; ## no critic (ProhibitNoStrict)
  2         3  
  2         295  
430 0     0     $args->can('foo');
431 0           unshift @_, $args; # args is an object
432 0           };
433 0           $args = _build_args($args, $_[0]);
434 0           my $dbh = $args->{dbh};
435              
436             return [
437             map {
438 0           my @row = @$_;
439             {
440             stats => {
441             updates => pop @row,
442             inserts => pop @row,
443             },
444 0           keys => {map { $_ => shift @row } @{$args->{group_stats_by}}},
  0            
  0            
445             }
446 0           } @{$dbh->selectall_arrayref(statement(%$args, (type => 'stats')))}];
  0            
447             }
448              
449             =head1 AUTHOR
450              
451             Chris Travers, C<< >>
452              
453             =head1 CO-MAINTAINERS
454              
455             =over
456              
457             =item Binary.com, C<< >>
458              
459             =back
460              
461             =head1 BUGS
462              
463             Please report any bugs or feature requests to C, or through
464             the web interface at L. I will be notified, and then you'll
465             automatically be notified of progress on your bug as I make changes.
466              
467              
468              
469              
470             =head1 SUPPORT
471              
472             You can find documentation for this module with the perldoc command.
473              
474             perldoc PGObject::Util::BulkLoad
475              
476              
477             You can also look for information at:
478              
479             =over 4
480              
481             =item * RT: CPAN's request tracker (report bugs here)
482              
483             L
484              
485             =item * AnnoCPAN: Annotated CPAN documentation
486              
487             L
488              
489             =item * CPAN Ratings
490              
491             L
492              
493             =item * Search CPAN
494              
495             L
496              
497             =back
498              
499              
500             =head1 ACKNOWLEDGEMENTS
501              
502              
503              
504             =cut
505              
506             1; # End of PGObject::Util::BulkUpload