File Coverage

blib/lib/DBD/PgLite/MirrorPgToSQLite.pm
Criterion Covered Total %
statement 12 218 5.5
branch 0 138 0.0
condition 0 41 0.0
subroutine 4 17 23.5
pod 0 12 0.0
total 16 426 3.7


line stmt bran cond sub pod time code
1             package DBD::PgLite::MirrorPgToSQLite;
2 1     1   27155 use strict;
  1         3  
  1         58  
3 1     1   2897 use DBI;
  1         27181  
  1         101  
4 1     1   3157 use Storable;
  1         5359  
  1         102  
5 1     1   1115 use File::Copy;
  1         6292  
  1         4055  
6             require Exporter;
7             our @ISA = qw(Exporter);
8             our @EXPORT_OK = qw(pg_to_sqlite);
9             our $VERSION = '0.05';
10              
11             #### MAIN SUBROUTINE ####
12              
# Mirror PostgreSQL tables (and optionally views and SQL functions) into a SQLite file.
#
# Options (defaults come from defaults()):
#   pg_dbh, or pg_dsn/pg_user/pg_pass - source database; a handle passed in by
#                                       the caller is not disconnected here
#   sqlite_file - target file; built as "$sqlite_file.tmp", then moved into
#                 place, keeping any previous file as "$sqlite_file.bak"
#   tables, views - comma-separated strings or arrayrefs; "/regex/" entries are
#                   expanded against the PostgreSQL catalog
#   schema, where, append, snapshot, indexes, functions, page_limit,
#   cachedir, verbose
#
# "$sqlite_file.lock" guards against concurrent runs and is removed on success
# and on failure. Dies on invalid options or on any mirroring error.
sub pg_to_sqlite {
    my %opt = @_;

    my %defaults = defaults();
    for my $key (keys %defaults) {
        $opt{$key} = $defaults{$key} unless exists $opt{$key};
    }
    $opt{tables} = _commasplit($opt{tables});
    $opt{views}  = _commasplit($opt{views});

    # A bare condition gets "where" prepended; explicit where/join clauses are kept.
    if ($opt{where}
        && $opt{where} !~ /^\s*where\s/i
        && $opt{where} !~ /^\s*(?:natural\s+)?join\s/i) {
        $opt{where} = 'where ' . $opt{where};
    }

    # Validate all options before connecting anywhere or taking the lock.
    die "Incompatible options: 'append' and 'snapshot'" if $opt{append} && $opt{snapshot};
    die "Need either database handle (pg_dbh) or DSN (pg_dsn)" unless $opt{pg_dbh} || $opt{pg_dsn};
    my $fn = $opt{sqlite_file};
    die "Need both list of source tables and a SQLite file" unless @{ $opt{tables} } && $fn;

    my $disconnect = 0;
    unless ($opt{pg_dbh}) {
        $opt{pg_dbh} = DBI->connect(@opt{qw(pg_dsn pg_user pg_pass)}, { RaiseError => 1 })
            or die "Could not connect to PostgreSQL: $DBI::errstr";
        $disconnect = 1;
    }

    $|++ if $opt{verbose};
    my $lockfile = "$fn.lock";
    lockfile('create', $lockfile);

    # Everything after taking the lock runs inside eval, so that any failure
    # releases the lock and the database handles before the error is rethrown.
    my $sl_dbh;
    my $ok = eval {
        if (-f "$fn.tmp") {
            warn "WARNING: Removing temp file $fn.tmp - apparently left over from a previous run\n";
            unlink "$fn.tmp" or die "ERROR: Could not remove $fn.tmp: $!\n";
        }
        if ($opt{append} && !copy($fn, "$fn.tmp")) {
            warn "WARNING: Could not copy $fn to $fn.tmp for appending: $! - turning --append off\n";
            $opt{append} = 0;
        }
        $sl_dbh = DBI->connect("dbi:SQLite:dbname=$fn.tmp", undef, undef, { RaiseError => 1 });
        $opt{sl_dbh} = $sl_dbh;

        # Both lists accept plain names as well as /regex/ entries.
        my @tables = tablelist($opt{pg_dbh}, $opt{schema}, @{ $opt{tables} });
        print "MIRRORING " . scalar(@tables) . " table(s):\n" if $opt{verbose};
        my @views = viewlist($opt{pg_dbh}, $opt{schema}, @{ $opt{views} });

        if ($opt{snapshot}) {
            # Read every table inside one serializable transaction.
            $opt{pg_dbh}->do("set session characteristics as transaction isolation level serializable");
            $opt{pg_dbh}->begin_work;
        }
        mirror_table($_, %opt) for @tables;
        mirror_functions(%opt) if $opt{functions};
        $opt{pg_dbh}->commit if $opt{snapshot};

        if (@views) {
            print "CREATING " . scalar(@views) . " view(s):\n" if $opt{verbose};
            create_view($_, %opt) for @views;
        }
        print "done!\n" if $opt{verbose};
        1;
    };
    unless ($ok) {
        my $err = $@ || "ERROR: Mirroring failed for an unknown reason\n";
        # Clean-up failures must not mask the original error.
        eval { $opt{pg_dbh}->rollback if $opt{snapshot} && !$opt{pg_dbh}->{AutoCommit} };
        eval { $sl_dbh->disconnect if $sl_dbh };
        eval { $opt{pg_dbh}->disconnect if $disconnect };
        lockfile('clear', $lockfile);
        die $err;
    }

    $opt{pg_dbh}->disconnect if $disconnect;
    $sl_dbh->disconnect;

    if (-f $fn) {
        copy($fn, "$fn.bak") or warn "WARNING: Could not make backup copy of $fn: $!\n";
    }
    unless (move("$fn.tmp", $fn)) {
        # Capture $! before lockfile() can clobber it.
        my $err = "ERROR: Could not move temporary SQLite file $fn.tmp to $fn: $!";
        lockfile('clear', $lockfile);
        die $err;
    }
    lockfile('clear', $lockfile);
}
89              
90              
91              
92             ########## OTHER SUBROUTINES ###########
93              
# Normalise a list option into an arrayref of names.
#
# Accepts a comma-separated string, an arrayref of such strings, or a false
# value (which yields []). Each name is trimmed of surrounding whitespace, and
# empty entries (",,", a leading comma, whitespace-only items) are dropped.
# Trimming matters because callers recognise "/regex/" entries by anchoring on
# the slashes.
sub _commasplit {
    my $list = shift;
    return [] unless $list;
    my @items = ref $list ? @$list : ($list);
    my @new;
    for my $item (split /,/, join(',', grep { defined } @items)) {
        $item =~ s/^\s+//;
        $item =~ s/\s+$//;
        push @new, $item if length $item;
    }
    return \@new;
}
101              
# Default value for every option pg_to_sqlite() understands, as a flat
# key/value list.
#
# Connection defaults come from the environment:
#   pg_user - $PGUSER, else $USER
#   pg_pass - $PGPASSWORD
#   pg_dsn  - "dbi:Pg:dbname=$PGDATABASE"; if PGDATABASE is unset or false,
#             a database named after pg_user; undef if neither is available
sub defaults {
    my $user = $ENV{PGUSER} || $ENV{USER};

    my $dsn;
    if ($ENV{PGDATABASE}) {
        $dsn = "dbi:Pg:dbname=$ENV{PGDATABASE}";
    }
    elsif ($user) {
        $dsn = "dbi:Pg:dbname=$user";
    }

    my %defaults = (
        verbose     => 0,
        pg_dsn      => $dsn,
        pg_user     => $user,
        pg_pass     => $ENV{PGPASSWORD},
        schema      => 'public',
        tables      => [],
        sqlite_file => '',
        where       => '',
        cachedir    => '/tmp/sqlite_mirror_cache',
        append      => 0,
        snapshot    => 0,
        indexes     => 0,
        views       => [],
        functions   => 0,
        page_limit  => 5000,    # compared against pg_class.relpages (8K pages)
        pg_dbh      => undef,
    );
    return %defaults;
}
124              
# Copy one table, $opt{schema}.$tn, from $opt{pg_dbh} into $opt{sl_dbh}.
#
# The SQLite table is created from get_schema(). In append mode an existing
# SQLite table with the same name is dropped first, and the file is vacuumed
# afterwards.
#
# How rows are copied depends on size:
#   - If pg_class.relpages exceeds $opt{page_limit}, rows are fetched one at a
#     time by primary key. A table without a primary key is left empty and
#     skipped with a warning.
#   - Otherwise the whole table is fetched in one query, with $opt{where}
#     appended. This path dies if the column count differs from the schema.
#
# Indexes are copied when $opt{indexes} is set.
sub mirror_table {
    my ($tn, %opt) = @_;
    my $sn = $opt{schema};
    my ($pg, $sl) = @opt{qw(pg_dbh sl_dbh)};
    print " - $sn.$tn\n" if $opt{verbose};

    my ($create, $colcnt) = get_schema($sn, $tn, %opt);
    my $drop = '';
    if ($opt{append}) {
        $drop = $sl->selectrow_array("select name from sqlite_master where type = 'table' and name = ?", {}, $tn);
        $sl->do("drop table $tn") if $drop;
    }
    $sl->do($create);

    # relpages is NULL/undef when pg_class has no row for the table; treat that as small.
    my $pages = $pg->selectrow_array("select relpages from pg_class where relnamespace = (select oid from pg_namespace where nspname = ?) and relname = ?", {}, $sn, $tn) || 0;
    my $ins = $sl->prepare("insert into $tn values (" . join(',', ('?') x $colcnt) . ")");

    if ($pages > $opt{page_limit}) {
        warn " pagelimit ($opt{page_limit}) kicks in for $sn.$tn ($pages)\n" if $opt{verbose};
        my @pkey = $pg->primary_key(undef, $sn, $tn);
        warn " (pkey is " . join(":", @pkey) . ")\n" if $opt{verbose};
        unless (@pkey) {
            warn "*** CANNOT READ $sn.$tn ROW-BY-ROW - NO PRIMARY KEY\n*** SKIPPING TABLE!\n";
            $ins->finish;
            return;
        }
        my $pkey_vals = $pg->selectall_arrayref("select " . join(', ', @pkey) . " from $sn.$tn");
        my $selh = $pg->prepare("select * from $sn.$tn where " . join(" and ", map { "$_ = ?" } @pkey));
        $sl->begin_work;
        for my $key (@$pkey_vals) {
            $selh->execute(@$key);
            my $row = $selh->fetchrow_arrayref;
            # The row may have been deleted after the key list was read.
            next unless $row;
            $ins->execute(@$row);
        }
        $sl->commit;
        $selh->finish;
    }
    else {
        my $res = $pg->selectall_arrayref("select * from $sn.$tn $opt{where}");
        if (@$res && scalar(@{ $res->[0] }) != $colcnt) {
            $ins->finish;
            die "ERROR: Bad schema for table $tn: number of columns does not match\n";
        }
        $sl->begin_work;
        $ins->execute(@$_) for @$res;
        $sl->commit;
    }
    $ins->finish;
    create_indexes($tn, %opt) if $opt{indexes};
    $sl->do("vacuum") if $drop;
}
175              
# Copy the SQL-language functions of $opt{schema} from PostgreSQL into the
# SQLite table pglite_functions (name, argnum, type, sql). The table is created
# if it does not exist.
#
# A function is inserted only if no row with the same (name, argnum) exists
# yet, so existing entries are never overwritten.
sub mirror_functions {
    my %opt = @_;
    my ($sl, $pg) = @opt{qw(sl_dbh pg_dbh)};

    my $have_table = $sl->selectrow_array("select name from sqlite_master where name = 'pglite_functions' and type = 'table'");
    $sl->do("CREATE TABLE pglite_functions (name text, argnum int, type text, sql text, primary key (name,argnum))")
        unless $have_table;

    my $sql_lang  = $pg->selectrow_array("select oid from pg_language where lanname = 'sql'");
    my $namespace = $pg->selectrow_array("select oid from pg_namespace where nspname = ?", {}, $opt{schema});
    my $functions = $pg->selectall_arrayref("select proname, pronargs, prosrc from pg_proc where prolang = ? and pronamespace = ?", {}, $sql_lang, $namespace);

    print "FUNCTIONS:\n" if $opt{verbose};
    return if ref $functions ne 'ARRAY' || !@$functions;

    for my $function (@$functions) {
        my ($name, $argnum, $body) = @$function;
        next if $sl->selectrow_array("select name from pglite_functions where name = ? and argnum = ?", {}, $name, $argnum);
        print " - $name ($argnum)\n" if $opt{verbose};
        $sl->do("insert into pglite_functions (name,argnum,type,sql) values (?,?,'sql',?)", {}, $name, $argnum, $body);
    }
}
194              
# Recreate the PostgreSQL indexes of $opt{schema}.$tn in $opt{sl_dbh}.
#
# Indexes on oid and those whose name ends in "_pkey" are skipped; get_schema()
# already declares the primary key in CREATE TABLE. Each index definition from
# pg_indexes is rewritten for SQLite:
#   - the btree/hash access method is dropped;
#   - the schema prefix is removed from the table name.
# Creation is best-effort. A failing UNIQUE index is retried once as a plain
# index; any other failure is left to the handle's own error reporting.
sub create_indexes {
    my ($tn, %opt) = @_;
    my $sn = $opt{schema};
    my $ixn = $opt{pg_dbh}->selectcol_arrayref("select indexdef from pg_indexes where schemaname = ? and tablename = ? ", {}, $sn, $tn);
    for my $ddl (@$ixn) {
        next if $ddl =~ /\(oid\)/;
        next if $ddl =~ /_pkey\b/;
        $ddl =~ s/USING (?:btree|hash) //;
        # Match a quoted table name as one unit, so that a quoted first column
        # (ON public.t ("Col")) keeps its opening quote.
        $ddl =~ s/ ON \w+\.(?:"([^"]+)"|(\w+))/' ON ' . (defined $1 ? $1 : $2)/e;
        print " + $ddl\n" if $opt{verbose};
        next if eval { $opt{sl_dbh}->do($ddl); 1 };
        # SQLite refused the index; if it was UNIQUE, fall back to a plain index.
        # NOTE(review): the original rationale was "Pg supports multiple NULLs in
        # unique columns - SQLite doesn't", but SQLite documents NULLs as distinct
        # for UNIQUE too - confirm why unique indexes fail here.
        if ($@ =~ /unique/i && $ddl =~ s/ UNIQUE / /) {
            print " + retry: $ddl\n" if $opt{verbose};
            eval { $opt{sl_dbh}->do($ddl); };
        }
    }
}
213              
# Recreate the PostgreSQL view $opt{schema}.$vn in $opt{sl_dbh}.
#
# The view definition is read from pg_views and its "::type" casts are removed,
# because SQLite has no :: cast syntax. An existing SQLite view of the same
# name is dropped first.
#
# Failures are reported with warn rather than die. This covers a view missing
# from PostgreSQL and a definition that SQLite rejects.
sub create_view {
    my ($vn, %opt) = @_;
    my $sn = $opt{schema};
    my $def = $opt{pg_dbh}->selectrow_array("select definition from pg_views where schemaname = ? and viewname = ?", {}, $sn, $vn);
    print " - $sn.$vn\n" if $opt{verbose};
    unless (defined $def) {
        warn " *** COULD NOT CREATE VIEW $sn.$vn - NOT FOUND IN POSTGRESQL *** \n";
        return;
    }

    # The cast target can be:
    #   - a multi-word type name (character varying, timestamp(3) without time zone);
    #   - a quoted or schema-qualified name;
    #   - a name followed by a type modifier such as (10,2), and/or [] suffixes.
    # A bare ::\w+ would remove only part of these and leave invalid SQL behind.
    my $pg_type = qr{
        (?: character \s+ varying
          | bit \s+ varying
          | double \s+ precision
          | time(?:stamp)? (?: \( \d+ \) )? \s+ with(?:out)? \s+ time \s+ zone
          | "? \w+ "? (?: \. "? \w+ "? )?
        )
        (?: \( \s* \d+ (?: \s* , \s* \d+ )? \s* \) )?
        (?: \[\] )*
    }x;
    $def =~ s/::$pg_type//g;

    if ($opt{sl_dbh}->selectrow_array("select name from sqlite_master where name = ? and type = 'view'", {}, $vn)) {
        eval { $opt{sl_dbh}->do("drop view $vn") };
    }
    eval { $opt{sl_dbh}->do("create view $vn as $def"); };
    warn " *** COULD NOT CREATE VIEW $sn.$vn *** \n" if $@;
}
226              
# Shared implementation of tablelist() and viewlist().
#
# Expands @pats into a de-duplicated list of relation names in schema $sn:
#   - entries written as /regex/ are matched case-insensitively (PostgreSQL ~*)
#     against $column of the catalog view $catalog, with the schema also
#     compared case-insensitively;
#   - every other entry is taken literally.
# The order of the returned names is unspecified.
sub _relation_list {
    my ($pg, $sn, $catalog, $column, @pats) = @_;
    my %seen;
    for my $pat (@pats) {
        if ($pat =~ m{^/(.+)/$}) {
            my $re = $1;
            # Bind the schema and the pattern instead of splicing them into the
            # SQL, so quotes or backslashes in either cannot break the query.
            my $res = $pg->selectcol_arrayref(
                "select $column from $catalog where lower(schemaname) = lower(?) and $column ~* ?",
                {}, $sn, $re);
            $seen{$_}++ for @$res;
        }
        else {
            $seen{$pat}++;
        }
    }
    return keys %seen;
}

# List the tables in schema $sn named by @pats (plain names or /regex/ entries
# matched against pg_tables). Duplicates are removed.
sub tablelist {
    my ($pg, $sn, @pats) = @_;
    return _relation_list($pg, $sn, 'pg_tables', 'tablename', @pats);
}

# List the views in schema $sn named by @pats (plain names or /regex/ entries
# matched against pg_views). Duplicates are removed.
sub viewlist {
    my ($pg, $sn, @pats) = @_;
    return _relation_list($pg, $sn, 'pg_views', 'viewname', @pats);
}
254              
255              
# Return ($create_sql, $column_count) for table $sn.$tn, suitable for SQLite.
#
# The CREATE TABLE statement is built from col_def() plus a "primary key (...)"
# clause when the table has one. Building it from the catalog can be slow, so
# the result is stored with cached_schema(), which also supplies a cached copy
# when it has one (entries are kept for up to a week).
#
# Dies if no columns are found, since an empty CREATE TABLE is invalid SQL.
sub get_schema {
    my ($sn, $tn, %opt) = @_;
    my @cached = cached_schema($sn, $tn, undef, undef, %opt);
    return @cached if @cached;

    my @cdef = col_def($sn, $tn, %opt);
    my $colcnt = scalar @cdef;
    die "ERROR: No columns found for table $sn.$tn - does it exist?\n" unless $colcnt;

    my @pknames = $opt{pg_dbh}->primary_key(undef, $sn, $tn);
    push @cdef, "primary key (" . join(',', @pknames) . ")" if @pknames && $pknames[0] ne '';
    my $create = "create table $tn (\n " . join(",\n ", @cdef) . "\n)\n";
    cached_schema($sn, $tn, $create, $colcnt, %opt);
    return ($create, $colcnt);
}
270              
# Read or write the on-disk cache of CREATE TABLE statements.
#
# Two modes, chosen by $cnt:
#   - $cnt true: store [$creat, $cnt] for $sn.$tn and return nothing.
#   - $cnt false: return ($create, $colcnt) when a cache entry younger than a
#     week exists, otherwise the empty list.
#
# Files live in $opt{cachedir}/<user>/<database>.<schema>.<table>. <database>
# is the Name attribute of the PostgreSQL handle, with '/' replaced by '_'.
#
# Caching is best-effort. An unusable directory or a corrupt file is treated
# as a cache miss (or a store that only warns) instead of aborting the mirror.
sub cached_schema {
    my ($sn, $tn, $creat, $cnt, %opt) = @_;
    my $database = $opt{pg_dbh}->{mbl_dbh}
        ? $opt{pg_dbh}->{mbl_dbh}->[0]->{Name}
        : $opt{pg_dbh}->{Name};
    $database = '' unless defined $database;

    # The top-level directory is shared between users. The sticky bit (as on
    # /tmp) stops one user from removing or renaming another user's subdirectory.
    unless (-d $opt{cachedir}) {
        mkdir $opt{cachedir};
        chmod 01777, $opt{cachedir};
    }
    my $uid = (getpwuid($>))[0] || $>;
    my $dir = "$opt{cachedir}/$uid";
    mkdir $dir, 0700 unless -d $dir;
    # Cache files are deserialised with Storable. Refuse a per-user directory
    # that somebody else created in the shared location.
    return () unless -d $dir && -o $dir;

    # The handle name may contain '/' (e.g. host=/var/run/postgresql); keep the
    # file inside $dir.
    (my $base = "$database.$sn.$tn") =~ s{/}{_}g;
    my $fn = "$dir/$base";

    if ($cnt) {
        eval { Storable::store([$creat, $cnt], $fn); 1 }
            or warn "WARNING: Could not cache schema of $sn.$tn in $fn: $@";
        return;
    }
    return () unless -f $fn && time - (stat $fn)[9] < 7 * 24 * 60 * 60;
    # An unreadable or corrupt cache file counts as a miss.
    my $ret = eval { Storable::retrieve($fn) };
    return ref $ret eq 'ARRAY' ? @$ret : ();
}
292              
# Return one SQLite column definition per column of $sn.$tn, in the order
# column_info() delivers them. Each definition is "name type", followed by
# " not null" when the NULLABLE field is false.
#
# The fields read are DBI column_info's COLUMN_NAME (index 3), TYPE_NAME (5)
# and NULLABLE (10).
# NOTE(review): DBI treats catalog arguments as search patterns, so an '_' in
# $tn may also match other tables - confirm this cannot produce extra columns.
sub col_def {
    my ($sn, $tn, %opt) = @_;
    my $sth = $opt{pg_dbh}->column_info(undef, $sn, $tn, undef);
    $sth->execute;
    my @defs = map {
        my ($name, $type, $nullable) = @{$_}[3, 5, 10];
        "$name $type" . ($nullable ? '' : ' not null');
    } @{ $sth->fetchall_arrayref };
    $sth->finish;
    return @defs;
}
307              
# Manage the lock file that stops two mirror runs writing the same SQLite file.
#
#   lockfile('create', $path) - create $path holding this process's PID; dies if
#                               it already exists or cannot be written.
#   lockfile('clear',  $path) - remove $path; warns if it does not exist and
#                               dies if it cannot be removed.
sub lockfile {
    my ($action, $lockfile) = @_;
    if ($action eq 'create') {
        die "ERROR: Lockfile $lockfile exists - cannot continue" if -f $lockfile;
        require Fcntl;
        # O_EXCL makes checking for the lock and taking it one atomic step, so
        # two concurrent runs cannot both pass the -f check above.
        my $fh;
        unless (sysopen $fh, $lockfile, Fcntl::O_WRONLY() | Fcntl::O_CREAT() | Fcntl::O_EXCL()) {
            die "ERROR: Lockfile $lockfile exists - cannot continue" if $!{EEXIST};
            die "ERROR: Could not open lockfile $lockfile: $!";
        }
        print {$fh} $$ or die "ERROR: Could not write lockfile $lockfile: $!";
        close $fh or die "ERROR: Could not write lockfile $lockfile: $!";
    }
    elsif ($action eq 'clear') {
        if (-f $lockfile) {
            unlink $lockfile or die "ERROR: Could not remove lockfile $lockfile: $!";
        }
        else {
            warn "WARNING: Lockfile $lockfile does not exist - cannot clear";
        }
    }
}
323              
324             1;
325              
326             __END__