File Coverage

blib/lib/Bio/ConnectDots/DB.pm
Criterion Covered Total %
statement 16 18 88.8
branch n/a
condition n/a
subroutine 6 6 100.0
pod n/a
total 22 24 91.6


line stmt bran cond sub pod time code
1             package Bio::ConnectDots::DB;
2 16     16   22654 use vars qw(@ISA @AUTO_ATTRIBUTES @OTHER_ATTRIBUTES %SYNONYMS);
  16         29  
  16         5493  
3 16     16   99 use strict;
  16         28  
  16         1784  
4 16     16   81499 use DBI;
  16         417457  
  16         1239  
5 16     16   190 use File::Path;
  16         63  
  16         1392  
6 16     16   29739 use Class::AutoClass;
  16         618726  
  16         576  
7 16     16   63248 use Class::AutoClass::Args;
  0            
  0            
8             use Bio::ConnectDots::DotSet;
9             use Bio::ConnectDots::ConnectorSet;
10             @ISA = qw(Class::AutoClass);
11              
12             @AUTO_ATTRIBUTES=qw(dsn dbh dbd database host port user password
13             read_only read_only_schema
14             _needs_disconnect _db_cursor _exists
15             load_name load_save load_chunksize load_cid_base
16             _ext_directory _load_fh _load_count _load_chunk sql_log
17             );
18             @OTHER_ATTRIBUTES=qw(ext_directory);
19             %SYNONYMS=(server=>'host');
20             Class::AutoClass::declare(__PACKAGE__);
21              
22             # use 'double quotations to get case-sensitivity in label
23             # use 'not null' wherever possible to help query optimizier use indexes better
24             # denormalized connector to cut down the number of joins in big queries
25             my %SCHEMA=
26             (connectorset=>
27             qq(connectorset_id SERIAL,
28             "name" VARCHAR(255) NOT NULL,
29             "file_name" TEXT,
30             "version" VARCHAR(255) NOT NULL,
31             "source_date" VARCHAR(255),
32             "source_version" VARCHAR(255),
33             "download_date" VARCHAR(255),
34             "ftp" TEXT,
35             "ftp_files" TEXT,
36             "comment" TEXT,
37             PRIMARY KEY("connectorset_id"),UNIQUE("name","version")),
38             dotset=>
39             qq(dotset_id SERIAL,
40             "name" VARCHAR(255) NOT NULL,
41             PRIMARY KEY(dotset_id),UNIQUE("name")),
42             connectdotset=>
43             qq(connectdotset_id SERIAL,
44             connectorset_id INT NOT NULL,
45             dotset_id INT NOT NULL,
46             label_id INT NOT NULL,
47             PRIMARY KEY(connectdotset_id)),
48             label=>
49             qq(label_id SERIAL,
50             "label" VARCHAR(255) NOT NULL,
51             "source_label" VARCHAR(255),
52             "description" TEXT,
53             PRIMARY KEY(label_id),UNIQUE("label")),
54             connectortable=>
55             qq(connectortable_id SERIAL,
56             "name" VARCHAR(255) NOT NULL,
57             PRIMARY KEY(connectortable_id),UNIQUE("name")),
58             connectortableset=>
59             qq(connectortable_id INT NOT NULL,
60             connectorset_id INT NOT NULL,
61             "alias" VARCHAR(255) NOT NULL,
62             UNIQUE(connectortable_id,"alias")),
63             dottable=>
64             qq(dottable_id SERIAL,
65             "name" VARCHAR(255) NOT NULL,
66             PRIMARY KEY(dottable_id),UNIQUE("name")),
67             dottableset=>
68             qq(dottable_id INT NOT NULL,
69             dotset_id INT NOT NULL,
70             label_id INT NOT NULL,
71             cs_id INT NOT NULL,
72             "alias" VARCHAR(255) NOT NULL,
73             UNIQUE(dottable_id,"alias")),
74              
75             connectdot=>
76             qq(connector_id INT NOT NULL,
77             connectorset_id INT NOT NULL,
78             dot_id INT NOT NULL,
79             label_id INT NOT NULL,
80             "id" TEXT NOT NULL),
81             dot=>
82             qq(dot_id SERIAL,
83             dotset_id INT NOT NULL,
84             "id" TEXT NOT NULL,
85             PRIMARY KEY(dot_id),UNIQUE("id",dotset_id)),
86              
87             cdload=>
88             qq(connector_id INT NOT NULL,
89             connectorset_id INT NOT NULL,
90             dotset_id INT NOT NULL,
91             label_id INT NOT NULL,
92             "id" TEXT NOT NULL),
93             );
94              
95             my %INDICIES = (
96             connectdot=>
97             ['connectorset_id,connector_id,label_id',
98             'connectorset_id,dot_id,label_id',
99             'connectorset_id,label_id',
100             '"id"']
101             );
102              
103             my @INDEX_NAMES;
104              
105             my @TABLES=keys %SCHEMA;
106             # maximum number of rows loaded in one 'load infile' operation
107             my $LOAD_CHUNKSIZE=150000;
108              
109             sub _init_self {
110             my($self,$class,$args)=@_;
111             return unless $class eq __PACKAGE__; # to prevent subclasses from re-running this
112             $self->_connect;
113             return unless $self->is_connected;
114             $self->_manage_schema($args);
115             if(!$self->ext_directory) {
116             $self->ext_directory("/usr/tmp/$ENV{USER}") if $ENV{USER};
117             }
118             $self->load_chunksize or $self->load_chunksize($LOAD_CHUNKSIZE);
119             }
120              
121             sub is_connected {
122             $_[0]->dbh;
123             }
124              
125             sub connect {
126             my($self,@args)=@_;
127             my $args=new Bio::ISB::AutoArgs(@args);
128             $self->Class::AutoClass::set_attributes([qw(dbh dsn dbd host server user password)],$args);
129             $self->_connect;
130             }
131             sub _connect {
132             my($self)=@_;
133             return $self->dbh if $self->dbh; # if dbh set, then already connected
134             my $dbd=lc($self->dbd)||'Pg';
135             $self->throw("-dbd must be 'Pg' at present") if $dbd && $dbd ne 'Pg';
136             my $dsn=$self->dsn;
137             if ($dsn) { # parse off the dbd, database, host elements
138             $dsn = "DBI:$dsn" unless $dsn=~ /^dbi/i;
139             } else {
140             my $database=$self->database;
141             my $host=$self->host;
142             my $port=$self->port;
143             return undef unless $database;
144             $dsn="DBI:$dbd:dbname=$database;";
145             $dsn .= "host=$host;" if $host;
146             $dsn .= "port=$port;" if $port;
147             }
148             # Try to establish connection with data source.
149             my $user=$self->user;
150             my $password = $self->password;
151             my $dbh = DBI->connect($dsn,$user,$password,
152             {AutoCommit=>1, ChopBlanks=>1, PrintError=>0, Warn=>0,});
153             $self->dsn($dsn);
154             $self->dbh($dbh);
155             $self->_needs_disconnect(1);
156             $self->throw("DBI::connect failed for dsn=$dsn, username=$user: ".DBI->errstr) unless $dbh;
157             return $dbh;
158             }
159             sub _manage_schema {
160             my($self,$args)=@_;
161             # grab schema modification parameters
162             my $read_only_schema=$self->read_only_schema || $self->read_only;
163             my $drop=$args->drop;
164             my $create=$args->create;
165             $self->throw("Schema changes not allowed by -read_only or -read_only_schema setting") if ($drop||$create) && $read_only_schema;
166             $self->drop if $drop;
167             $self->create if $create || !($self->exists && !defined $create);
168             }
169              
170             # returns 1 if all tables exist, -1 if some exist, 0 if none exist
171             # note that Perl treats -1 as 'true'
172             sub exists {
173             my($self,$doit)=@_;
174             return $self->_exists if !$doit && defined $self->_exists;
175             $self->throw("Cannot check schema: database is not connected") unless $self->is_connected;
176             my $dbh=$self->dbh;
177             my $tables=$dbh->selectall_arrayref(qq(select tablename from pg_tables where schemaname='public'));
178             my $count;
179             for my $table (@TABLES) {
180             $count++ if grep {$table eq $_->[0]} @$tables;
181             }
182             my $exists;
183             $exists=0 if $count==0;
184             $exists=1 if $count==@TABLES;
185             $exists=-1 if $count>0 && $count!=@TABLES;
186             $self->_exists($exists);
187             }
188             sub drop {
189             my $self=shift;
190             $self->throw("Cannot drop database: database is not connected") unless $self->is_connected;
191             my @sql;
192             foreach my $tbl (@TABLES) {
193             push ( @sql, qq(DROP TABLE $tbl) ) if table_exist($tbl);
194             }
195             foreach my $indx (@INDEX_NAMES) {
196             push(@sql, qq(DROP INDEX $indx));
197              
198             }
199             $self->do_sql(@sql);
200             $self->exists('DOIT'); # make sure schema was really dropped
201             }
202              
203             ### Returns true (1) if table exists in database, 0 otherwise
204             sub table_exist {
205             my ($self, $table_name)=@_;
206             $self->throw("Cannot create database: database is not connected") unless $self->is_connected;
207             $table_name = lc($table_name);
208             my $query = "SELECT tablename FROM pg_tables WHERE tablename='$table_name'";
209             my $dbh=$self->dbh;
210             my $rslt = $dbh->selectrow_arrayref($query);
211             return $rslt ? 1 : 0;
212             }
213              
214             sub create {
215             my $self=shift;
216             $self->throw("Cannot create database: database is not connected") unless $self->is_connected;
217             $self->drop if $self->exists;
218             my @sql;
219             while(my($table,$schema)=each %SCHEMA) {
220             push(@sql,qq(CREATE TABLE $table ($schema)));
221             if ($INDICIES{$table}) {
222             my $num=0;
223             foreach my $tbl_index (@{ $INDICIES{$table} }) {
224             my $index_name = $table .'_index_'. ($num+1);
225             push( @INDEX_NAMES, $index_name );
226             $INDICIES{$table}->[$num] eq 'id'?
227             push( @sql, qq(CREATE INDEX $index_name ON $table USING BTREE ($INDICIES{$table}->[$num])) ) :
228             push( @sql, qq(CREATE INDEX $index_name ON $table ($INDICIES{$table}->[$num])) );
229             $num++;
230             }
231             }
232             }
233             $self->do_sql(@sql);
234             $self->exists('DOIT'); # make sure schema was really created
235             }
236             sub analyze {
237             my $self=shift;
238             $self->throw("Cannot analyze database: database is not connected") unless $self->is_connected;
239             my @sql=map {qq(ANALYZE $_)} @TABLES;
240             $self->do_sql(@sql);
241             }
242             # load dots and connectdots
243             sub load_init {
244             my($self,$load_name,$load_save,$load_chunksize)=@_;
245             my $max=$self->dbh->selectrow_array
246             (qq(select max(connector_id) from connectdot)) || 0;
247             $self->set
248             (load_name=>$load_name,
249             load_save=>$load_save,
250             load_chunksize=>$load_chunksize||$LOAD_CHUNKSIZE,
251             load_cid_base=>$max,
252             _load_fh=>undef,_load_count=>0,_load_chunk=>0);
253             }
254             sub load_row {
255             my($self,$connector_id,$connectorset_id,$id,$dotset_id,$label_id)=@_;
256             my($ext_directory,$load_name,$load_fh,$load_count,$load_chunk)=
257             $self->get(qw(ext_directory load_name _load_fh _load_count _load_chunk));
258             my $load_file="$ext_directory/load.$load_name.$load_chunk";
259             if (!defined $load_fh) {
260             open($load_fh, "> $load_file") || $self->throw("Cannot open load file $load_file: $!");
261             $self->_load_fh($load_fh);
262             } elsif ($load_count>=$self->load_chunksize) {
263             close $load_fh;
264             $self->load($load_file);
265             $load_chunk++;
266             $load_count=0;
267             my $load_file="$ext_directory/load.$load_name.$load_chunk"; # bug found by YW 04-01-15
268             open($load_fh, "> $load_file") || $self->throw("Cannot open load file $load_file: $!");
269             $self->set(_load_fh=>$load_fh,_load_chunk=>$load_chunk);
270             }
271             $connector_id+=$self->load_cid_base;
272             $id=$self->escape($id); # escape special chars
273             print $load_fh join("\t",$connector_id,$connectorset_id,$dotset_id,$label_id,$id),"\n";
274             $self->_load_count($load_count+1);
275             }
276             sub load_finish {
277             my($self)=@_;
278             my($ext_directory,$load_name,$load_fh,$load_count,$load_chunk)=
279             $self->get(qw(ext_directory load_name _load_fh _load_count _load_chunk));
280             if (defined $load_fh) {
281             close $load_fh;
282             my $load_file="$ext_directory/load.$load_name.$load_chunk";
283             $self->load($load_file,'last');
284             }
285             }
286             sub load {
287             my($self,$load_file,$last)=@_;
288             my $dbh=$self->dbh;
289             my @sql;
290             push(@sql,
291             qq(set enable_hashjoin to off),
292             qq(set enable_mergejoin to off));
293             push(@sql, # load data
294             qq(COPY cdload (connector_id,connectorset_id,dotset_id,label_id,id) FROM '$load_file'));
295             push(@sql, qq(SELECT cdload.connector_id,cdload.connectorset_id,cdload.dotset_id,dot.dot_id,cdload.label_id,cdload.id
296             INTO TABLE cdload_dot
297             FROM cdload LEFT JOIN dot ON cdload.id=dot.id));
298             push(@sql,qq(INSERT INTO dot (dotset_id,id) SELECT DISTINCT dotset_id,id FROM cdload_dot WHERE dot_id IS NULL));
299             push(@sql,qq(INSERT INTO connectdot (connector_id,connectorset_id,dot_id,label_id,id)
300             SELECT connector_id,connectorset_id,dot_id,label_id,id FROM cdload_dot WHERE dot_id IS NOT NULL));
301             push(@sql,qq(INSERT INTO connectdot (connector_id,connectorset_id,dot_id,label_id,id)
302             SELECT cdload_dot.connector_id,cdload_dot.connectorset_id,dot.dot_id,cdload_dot.label_id,cdload_dot.id
303             FROM cdload_dot,dot
304             WHERE cdload_dot.dot_id IS NULL AND cdload_dot.id=dot.id));
305             push(@sql,qq(DROP TABLE cdload));
306             push(@sql,qq(CREATE TABLE cdload ($SCHEMA{'cdload'})));
307             push(@sql,qq(DROP TABLE cdload_dot));
308             push(@sql,qq(ANALYZE));
309             $self->do_sql(@sql);
310             $self->do_sql(qq(set enable_hashjoin to on));
311             $self->do_sql(qq(set enable_mergejoin to on));
312             unlink($load_file) unless $self->load_save eq 'all' || ($last && $self->load_save eq $last) ;
313             }
314              
315             sub ext_directory {
316             my $self=shift;
317             if (@_) {
318             my $ext_directory=shift;
319             mkpath([$ext_directory]) if $ext_directory;
320             return $self->_ext_directory($ext_directory);
321             }
322             $self->_ext_directory;
323             }
324              
325             sub create_table_sql {
326             my($self,$name,$sql,$indexed_columns,$sql_columns)=@_;
327             $name = lc($name); # Postgres has inconsistent support for capitalization of table names
328             my @sql;
329             push (@sql, "DROP TABLE $name") if $self->table_exist($name);
330             push (@sql, "CREATE TABLE $name AS $sql");
331            
332             my $num=0;
333             foreach (@$indexed_columns) {
334             my $index_name = $name ."_index_".$_ . $num ;
335             push( @INDEX_NAMES, $index_name );
336             push( @sql, qq(CREATE INDEX $index_name ON $name ($_)) );
337             $num++;
338             }
339             push (@sql, "ANALYZE $name");
340             $self->do_sql(@sql);
341             }
342              
343              
344             sub create_file_sql {
345             my($self,$file,$sql)=@_;
346             unlink($file);
347             # print "$sql ",`date`;
348             my $dbh=$self->dbh;
349             $dbh->do($sql) || $self->throw($dbh->errstr);
350             }
351             sub do_sql {
352             my $self=shift;
353             my @sql=_flatten(@_);
354             $self->throw("Cannot run SQL: database is not connected") unless $self->is_connected;
355             my $dbh=$self->dbh;
356             for my $sql (@sql) {
357             if($self->sql_log) {
358             my $file = $self->sql_log;
359             open (LOG, ">>$file") or $self->throw("Can not open SQL log file: $file");
360             print LOG "#", `date`;
361             print LOG "$sql\n\n";
362             close(LOG);
363             }
364             $dbh->do($sql) || do { print "### SQL: $sql\n"; $self->throw($dbh->errstr); }
365             }
366             }
367              
368             sub quote {
369             my($self,$value)=@_;
370             $self->dbh->quote($value);
371             }
372             sub quote_dot {
373             my($self,$value)=@_;
374             $self->dbh->quote($value);
375             }
376              
377             sub escape {
378             my($self,$field)=@_;
379             my $q_field=$self->dbh->quote($field);
380             $q_field=~s/^\'|\'$//g;
381             $q_field;
382             }
383             sub _flatten {map {'ARRAY' eq ref $_? @$_: $_} @_;}
384              
385              
386              
387             1;
388             __END__