File Coverage

blib/lib/Xtract.pm
Criterion Covered Total %
statement 210 253 83.0
branch 31 70 44.2
condition 1 10 10.0
subroutine 42 44 95.4
pod 0 20 0.0
total 284 397 71.5


line stmt bran cond sub pod time code
1             package Xtract;
2              
3             =pod
4              
5             =head1 NAME
6              
7             Xtract - Take any data source and deliver it to the world
8              
9             =head1 DESCRIPTION
10              
11             B
12              
13             Xtract is an command line application for extracting data out of
14             many different types of databases (or other things that are able
15             to look like a database via L).
16              
17             More information to follow...
18              
19             =cut
20              
21 4     4   141751 use 5.008005;
  4         15  
  4         162  
22 4     4   46 use strict;
  4         7  
  4         124  
23 4     4   24 use warnings;
  4         7  
  4         179  
24 4     4   3635 use bytes ();
  4         39  
  4         92  
25 4     4   22 use Carp 'croak';
  4         8  
  4         244  
26 4     4   4819 use File::Which 0.05 ();
  4         3851  
  4         98  
27 4     4   25 use File::Remove 1.42 ();
  4         70  
  4         85  
28 4     4   6107 use Getopt::Long 2.37 ();
  4         79912  
  4         262  
29 4     4   2878 use Params::Util 0.35 ();
  4         9249  
  4         110  
30 4     4   3588 use IPC::Run3 0.042 ();
  4         269269  
  4         123  
31 4     4   7597 use Time::HiRes 1.9709 ();
  4         8645  
  4         135  
32 4     4   5919 use Time::Elapsed 0.24 ();
  4         33410  
  4         191  
33 4     4   29055 use DBI 1.57 ':sql_types';
  4         115901  
  4         6368  
34 4     4   6384 use DBD::SQLite 1.25 ();
  4         92686  
  4         143  
35 4     4   2897 use Xtract::Scan ();
  4         11  
  4         86  
36 4     4   2512 use Xtract::Scan::SQLite ();
  4         12  
  4         83  
37 4     4   2300 use Xtract::Scan::mysql ();
  4         15  
  4         84  
38 4     4   1748 use Xtract::Publish ();
  4         10  
  4         90  
39 4     4   11321 use Xtract::Column ();
  4         10  
  4         286  
40 4     4   2243 use Xtract::Table ();
  4         10  
  4         163  
41              
42             our $VERSION = '0.16';
43              
44 4     4   24 use Mouse 0.93;
  4         77  
  4         35  
45              
46             has from => ( is => 'ro', isa => 'Str' );
47             has user => ( is => 'ro', isa => 'Str' );
48             has pass => ( is => 'ro', isa => 'Str' );
49             has to => ( is => 'ro', isa => 'Str' );
50             has index => ( is => 'ro', isa => 'Bool' );
51             has trace => ( is => 'ro', isa => 'Bool' );
52             has sqlite_cache => ( is => 'ro', isa => 'Int' );
53             has argv => ( is => 'ro', isa => 'ArrayRef[Str]' );
54             has publish => ( is => 'rw', isa => 'Xtract::Publish' );
55              
56 4     4   1889 no Mouse;
  4         9  
  4         18  
57              
58              
59              
60              
61              
62             #####################################################################
63             # Main Function
64              
65             sub main {
66 0   0 0 0 0 my $class = shift || __PACKAGE__;
67              
68             # Parse the command line options
69 0         0 my $FROM = '';
70 0         0 my $USER = '';
71 0         0 my $PASS = '';
72 0         0 my $TO = '';
73 0         0 my $INDEX = '';
74 0         0 my $QUIET = '';
75 0         0 my $CACHE = '';
76 0 0       0 Getopt::Long::GetOptions(
77             "from=s" => \$FROM,
78             "user=s" => \$USER,
79             "pass=s" => \$PASS,
80             "to=s" => \$TO,
81             "index" => \$INDEX,
82             "quiet" => \$QUIET,
83             "sqlite_cache=i" => \$CACHE,
84             ) or die("Failed to parse options");
85              
86             # Prepend DBI: to the --from as a convenience if needed
87 0 0 0     0 if ( defined $FROM and $FROM !~ /^DBI:/ ) {
88 0         0 $FROM = "DBI:$FROM";
89             }
90              
91             # Create the program instance
92 0 0       0 my $self = $class->new(
93             from => $FROM,
94             user => $USER,
95             pass => $PASS,
96             to => $TO,
97             index => $INDEX,
98             trace => ! $QUIET,
99             $CACHE ? ( sqlite_cache => $CACHE ) : (),
100             argv => [ @ARGV ],
101             );
102              
103             # Clear the existing output sqlite file
104 0 0 0     0 if ( defined $self->to and -e $self->to ) {
105 0         0 $self->say("Deleting '" . $self->to . "'");
106 0         0 File::Remove::remove($self->to);
107             }
108              
109             # Run the object
110 0         0 $self->run;
111             }
112              
113              
114              
115              
116              
117              
118             #####################################################################
119             # Main Execution
120              
121             sub run {
122 2     2 0 5 my $self = shift;
123 2         14 my $start = Time::HiRes::time();
124              
125             # Create the target database
126 2         20 $self->say("Creating SQLite database " . $self->to);
127 2         16 $self->to_prepare;
128              
129             # Fill the database
130 2         15 $self->add;
131              
132             # Generate any required indexes
133 2 50       18 if ( $self->index ) {
134 2         16 foreach my $table ( $self->to_tables ) {
135 2         9 my $name = $table->name;
136 2         12 $self->say("Indexing table $name");
137 2         14 $self->index_table($table);
138             }
139             }
140              
141             # Finish up the population phase
142 2         47 $self->say("Cleaning up");
143 2         17 $self->to_finish;
144 2         47 $self->disconnect;
145              
146             # Pause, briefly, to allow any disk caching stuff to cach up.
147             # This is a just a speculative attempt to fix a compression problem.
148              
149             # Spawn the publisher to prepare the files for the public
150 2         59 $self->publish(
151             Xtract::Publish->new(
152             sqlite => $self->to,
153             trace => $self->trace,
154             gz => 1,
155             bz2 => 1,
156             lz => Xtract::LZMA->available,
157             )
158             );
159 2         17 $self->publish->run;
160              
161             # Summarise the run
162 2         43 my $elapsed = int(Time::HiRes::time() - $start);
163 2         44 my $human = Time::Elapsed::elapsed($elapsed);
164 2         3853 $self->say( "Extraction completed in $elapsed" );
165 2 50       64 if ( -f $self->publish->sqlite ) {
166 2         30 $self->say( "Created " . $self->publish->sqlite );
167             }
168 2 50       28 if ( -f $self->publish->sqlite_gz ) {
169 2         12 $self->say( "Created " . $self->publish->sqlite_gz );
170             }
171 2 50       20 if ( -f $self->publish->sqlite_bz2 ) {
172 2         15 $self->say( "Created " . $self->publish->sqlite_bz2 );
173             }
174 2 50       21 if ( -f $self->publish->sqlite_lz ) {
175 2         22 $self->say( "Created " . $self->publish->sqlite_lz );
176             }
177              
178 2         50 return 1;
179             }
180              
181             sub add {
182 1     1 0 2 my $self = shift;
183              
184             # Check the command
185 1   50     4 my $command = shift(@{$self->{argv}}) || 'all';
186 1 50       5 if ( $command eq 'all' ) {
    0          
187             # Shortcut if there's no tables
188 1 50       5 unless ( $self->from_tables ) {
189 0         0 print "No tables to export\n";
190 0         0 exit(255);
191             }
192             } elsif ( $command eq 'null' ) {
193             # Do nothing else special
194             } else {
195 0         0 die("Unsupported command '$command'");
196             }
197              
198             # Push all source tables into the target database
199 1         21 foreach my $table ( $self->from_tables ) {
200 1         6 my $name = $table->name;
201 1         7 $self->say("Publishing table $name");
202 1         5 my $tstart = Time::HiRes::time();
203 1         10 my $rows = $self->add_table($table);
204 1         12 my $rate = int($rows / (Time::HiRes::time() - $tstart));
205 1         9 $self->say("Completed table $name ($rows rows @ $rate/sec)");
206             }
207              
208 1         6 return 1;
209             }
210              
211             sub add_table {
212 1     1 0 3 my $self = shift;
213 1         6 $self->create_table(
214             $self->from_scan->add_table(@_)
215             );
216             }
217              
218             sub add_select {
219 1     1 0 16 my $self = shift;
220 1         5 $self->create_table(
221             $self->from_scan->add_select(@_)
222             );
223             }
224              
225             sub create_table {
226 2     2 0 6 my $self = shift;
227 2         12 my %params = @_;
228 2         7 my $create = $params{create};
229 2         5 my $select = $params{select};
230 2         5 my $insert = $params{insert};
231 2         6 my $bind = $params{bind};
232              
233             # Create the table
234 2         11 $self->to_dbh->do(@$create);
235              
236             # Launch the select query
237 2         792 my $from = $self->from_dbh->prepare(shift(@$select));
238 2 50       128 unless ( $from ) {
239 0         0 croak($DBI::errstr);
240             }
241 2         200 $from->execute(@$select);
242              
243             # Stream the data into the target table
244 2         9 my $dbh = $self->to_dbh;
245 2         28 $dbh->begin_work;
246 2         42 $dbh->{AutoCommit} = 0;
247 2         8 my $rows = 0;
248 2 50       14 my $to = $dbh->prepare($insert) or croak($DBI::errstr);
249 2         137 while ( my $row = $from->fetchrow_arrayref ) {
250 6 50       15 if ( $bind ) {
251             # When inserting blobs, we need to use the bind_param method
252 0         0 foreach ( 0 .. $#$row ) {
253 0 0       0 if ( defined $bind->[$_] ) {
254 0         0 $to->bind_param( $_ + 1, $row->[$_], $bind->[$_] );
255             } else {
256 0         0 $to->bind_param( $_ + 1, $row->[$_] );
257             }
258             }
259 0         0 $to->execute;
260             } else {
261 6         111 $to->execute( @$row );
262             }
263 6 50       73 next if ++$rows % 10000;
264 0         0 $dbh->commit;
265             }
266 2         58 $dbh->commit;
267 2         15 $dbh->{AutoCommit} = 1;
268              
269             # Clean up
270 2         11 $to->finish;
271 2         7 $from->finish;
272              
273 2         48 return $rows;
274             }
275              
276             sub index_table {
277 2     2 0 6 my $self = shift;
278 2         5 my $table = shift;
279 2         19 my $tname = $table->name;
280 2         9 my $info = $self->to_dbh->selectall_arrayref("PRAGMA table_info($tname)");
281 2         170 foreach my $column ( map { $_->[1] } @$info ) {
  4         15  
282 4         19 $self->index_column($tname, $column);
283             }
284 2         13 return 1;
285             }
286              
287             sub index_column {
288 4     4 0 8 my $self = shift;
289 4         14 my ($t, $c) = _COLUMN(@_);
290 4 100       13 my $unique = _UNIQUE($self->to_dbh, $t, $c) ? 'UNIQUE' : '';
291 4         13 $self->to_dbh->do("CREATE $unique INDEX IF NOT EXISTS idx__${t}__${c} ON ${t} ( ${c} )");
292 4         873 return 1;
293             }
294              
295              
296              
297              
298              
299             #####################################################################
300             # Source Methods
301              
302             sub from_dbh {
303 4     4 0 11 my $self = shift;
304 4 100       18 unless ( $self->{from_dbh} ) {
305 2         19 $self->say("Connecting to " . $self->from);
306 2         35 $self->{from_dbh} = DBI->connect(
307             $self->from,
308             $self->user,
309             $self->pass,
310             {
311             PrintError => 1,
312             RaiseError => 1,
313             }
314             );
315 2 50       3724 unless ( $self->{from_dbh} ) {
316 0         0 die("Failed to connect to " . $self->from);
317             }
318             }
319 4         48 return $self->{from_dbh};
320             }
321              
322             sub from_scan {
323 4     4 0 10 my $self = shift;
324 4 100       21 unless ( $self->{from_scan} ) {
325 2         13 $self->{from_scan} = Xtract::Scan->create( $self->from_dbh );
326             }
327 4         28 return $self->{from_scan};
328             }
329              
330             sub from_tables {
331 4     4 0 22051 my $self = shift;
332 4 100       35 unless ( $self->{from_tables} ) {
333 2         12 my $scan = $self->from_scan;
334 2         90 $self->{from_tables} = {
335             map {
336 2         11 $_ => Xtract::Table->new(
337             name => $_,
338             scan => $scan,
339             )
340             } $scan->tables
341             };
342             }
343 4         27 return map {
344 4         20 $self->{from_tables}->{$_}
345 4         11 } sort keys %{$self->{from_tables}};
346             }
347              
348             sub from_table {
349 0     0 0 0 my $self = shift;
350 0         0 my $name = shift;
351 0 0       0 unless ( $self->{from_tables} ) {
352 0         0 $self->from_tables;
353             }
354 0 0       0 unless ( exists $self->{from_tables}->{$name} ) {
355 0         0 die "No such table '$name'";
356             }
357 0         0 return $self->{from_tables}->{$name}
358             }
359              
360              
361              
362              
363              
364             #####################################################################
365             # Destination Methods
366              
367             sub to_dsn {
368 2     2 0 27 "DBI:SQLite:" . $_[0]->to
369             }
370              
371             sub to_dbh {
372 20     20 0 32 my $self = shift;
373 20 100       62 unless ( $self->{to_dbh} ) {
374 2         14 $self->{to_dbh} = DBI->connect( $self->to_dsn, '', '', {
375             PrintError => 1,
376             RaiseError => 1,
377             } );
378 2 50       1177 unless ( $self->{to_dbh} ) {
379 0         0 die("Failed to connect to " . $self->to_dsn);
380             }
381             }
382 20         185 return $self->{to_dbh};
383             }
384              
385             sub to_scan {
386 2     2 0 6 Xtract::Scan->create( shift->to_dbh );
387             }
388              
389             sub to_tables {
390 2     2 0 5 my $self = shift;
391 2         12 my $scan = $self->to_scan;
392 2         22 my $tables = {
393             map {
394 2         13 $_ => Xtract::Table->new(
395             name => $_,
396             scan => $scan,
397             )
398             } $scan->tables
399             };
400 2         11 return map {
401 2         13 $tables->{$_}
402             } sort keys %$tables;
403             }
404              
405             # Prepare the target database
406             sub to_prepare {
407 2     2 0 6 my $self = shift;
408 2         22 my $dbh = $self->to_dbh;
409              
410             # Maximise compatibility
411 2         16 $dbh->do('PRAGMA legacy_file_format = 1');
412              
413             # Turn on all the go-faster pragmas
414 2         222 $dbh->do('PRAGMA synchronous = 0');
415 2         460 $dbh->do('PRAGMA temp_store = 2');
416 2         146 $dbh->do('PRAGMA journal_mode = OFF');
417 2         288 $dbh->do('PRAGMA locking_mode = EXCLUSIVE');
418              
419             # Disable auto-vacuuming because we'll only fill this once.
420             # Do a one-time vacuum so we start with a clean empty database.
421 2         142 $dbh->do('PRAGMA auto_vacuum = 0');
422 2         135 $dbh->do('VACUUM');
423              
424             # Set the page cache if needed
425 2 50       1343 if ( $self->sqlite_cache ) {
426 0         0 my $page_size = $dbh->selectrow_arrayref('PRAGMA page_size')->[0];
427 0 0       0 if ( $page_size ) {
428 0         0 my $cache_size = $self->sqlite_cache * 1024 * 1024 / $page_size;
429 0         0 $dbh->do("PRAGMA cache_size = $cache_size");
430             }
431             }
432              
433 2         7 return 1;
434             }
435              
436             # Finalise the target database
437             sub to_finish {
438 2     2 0 6 my $self = shift;
439 2         9 my $dbh = $self->to_dbh;
440              
441             # Tidy up the database settings
442 2         13 $dbh->do('PRAGMA synchronous = NORMAL');
443 2         162 $dbh->do('PRAGMA temp_store = 0');
444 2         171 $dbh->do('PRAGMA locking_mode = NORMAL');
445              
446             # Precache index optimisation hints
447 2 50       249 if ( $self->index ) {
448 2         15 $dbh->do('ANALYZE');
449             }
450              
451 2         11559659 return 1;
452             }
453              
454              
455              
456              
457              
458             #####################################################################
459             # Support Methods
460              
461             sub disconnect {
462 2     2 0 9 my $self = shift;
463 2 50       21 if ( $self->{from_scan} ) {
464 2         10 delete($self->{from_scan});
465             }
466 2 50       14 if ( $self->{from_dbh} ) {
467 2         228 delete($self->{from_dbh})->disconnect;
468             }
469 2 50       11 if ( $self->{to_dbh} ) {
470 2         129 delete($self->{to_dbh})->disconnect;
471             }
472 2         133 return 1;
473             }
474              
475             sub say {
476 20 50   20 0 189 if ( Params::Util::_CODE($_[0]->trace) ) {
    50          
477 0         0 $_[0]->say( @_[1..$#_] );
478             } elsif ( $_[0]->trace ) {
479 0         0 my $t = scalar localtime time;
480 0         0 print map { "[$t] $_\n" } @_[1..$#_];
  0         0  
481             }
482             }
483              
484             sub _UNIQUE {
485 4     4   7 my $dbh = shift;
486 4         11 my ($t, $c) = _COLUMN(@_);
487 4         39 my $count = $dbh->selectrow_arrayref(
488             "SELECT COUNT(*), COUNT(DISTINCT $c) FROM $t"
489             );
490 4         418 return !! ( $count->[0] eq $count->[1] );
491             }
492              
493             sub _COLUMN {
494 8 50   8   32 (@_ == 1) ? (split /\./, $_[0]) : @_;
495             }
496              
497             1;
498              
499             =pod
500              
501             =head1 SUPPORT
502              
503             Bugs should be reported via the CPAN bug tracker at
504              
505             L
506              
507             For other issues, contact the author.
508              
509             =head1 AUTHOR
510              
511             Adam Kennedy Eadamk@cpan.orgE
512              
513             =head1 SEE ALSO
514              
515             L
516              
517             =head1 COPYRIGHT
518              
519             Copyright 2009 - 2011 Adam Kennedy.
520              
521             This program is free software; you can redistribute
522             it and/or modify it under the same terms as Perl itself.
523              
524             The full text of the license can be found in the
525             LICENSE file included with this module.
526              
527             =cut