File Coverage

blib/lib/DBIx/DBStag.pm
Criterion Covered Total %
statement 10 12 83.3
branch n/a
condition n/a
subroutine 4 4 100.0
pod n/a
total 14 16 87.5


line stmt bran cond sub pod time code
1             # $Id: DBStag.pm,v 1.59 2008/02/06 00:50:55 cmungall Exp $
2             # -------------------------------------------------------
3             #
4             # Copyright (C) 2002 Chris Mungall
5             #
6             # This module is free software.
7             # You may distribute this module under the same terms as perl itself
8              
9             #---
10             # POD docs at end of file
11             #---
12              
13             package DBIx::DBStag;
14              
15 15     15   146355 use strict;
  15         35  
  15         778  
16 15     15   81 use vars qw($VERSION @ISA @EXPORT_OK %EXPORT_TAGS $DEBUG $AUTOLOAD);
  15         30  
  15         1402  
17 15     15   80 use Carp;
  15         32  
  15         968  
18 15     15   25838 use DBI;
  0            
  0            
19             use Data::Stag qw(:all);
20             use DBIx::DBSchema;
21             use Text::Balanced qw(extract_bracketed);
22             #use SQL::Statement;
23             use Parse::RecDescent;
24             $VERSION='0.12';
25              
26              
27             our $DEBUG;
28             our $TRACE = $ENV{DBSTAG_TRACE};
29              
30             sub DEBUG {
31             $DEBUG = shift if @_;
32             return $DEBUG;
33             }
34              
35             sub trace {
36             my ($priority, @msg) = @_;
37             return unless $ENV{DBSTAG_TRACE};
38             print STDERR "@msg\n";
39             }
40              
41             sub dmp {
42             use Data::Dumper;
43             print Dumper shift;
44             }
45              
46             sub force {
47             my $self = shift;
48             $self->{_force} = shift if @_;
49             return $self->{_force};
50             }
51              
52              
53             sub new {
54             my $proto = shift;
55             my $class = ref($proto) || $proto;
56             my ($dbh) =
57             rearrange([qw(dbh)], @_);
58              
59             my $self = {};
60             bless $self, $class;
61             if ($dbh) {
62             $self->dbh($dbh);
63             }
64             $self;
65             }
66              
67              
68             sub connect {
69             my $class = shift;
70             my $dbi = shift;
71             my $self;
72             if (ref($class)) {
73             $self = $class;
74             }
75             else {
76             $self = {};
77             bless $self, $class;
78             }
79             $dbi = $self->resolve_dbi($dbi);
80             eval {
81             $self->dbh(DBI->connect($dbi, @_));
82             };
83             if ($@ || !$self->dbh) {
84             my $mapf = $ENV{DBSTAG_DBIMAP_FILE};
85             if ($dbi =~ /^dbi:(\w+)/) {
86             print STDERR <
87              
88             Could not connect to database: "$dbi"
89              
90             EITHER The required DBD driver "$1" is not installed
91             OR There is no such database as "$dbi"
92              
93             EOM
94             ;
95             }
96             else {
97             print STDERR <
98              
99             Could not connect to database: "$dbi"
100              
101             To connect to a database, you need to set the environment variable
102             DBSTAG_DBIMAP_FILE to the location of your DBI Stag resources file, OR
103             you need to specify the full dbi string of the database
104              
105             A dbi string looks like this:
106              
107             dbi:Pg:dbname=foo;host=mypgserver.foo.com
108              
109             A resources file provides mappings from logical names like "foo" to
110             full DBI locators suchas the one above
111              
112             Please type "man DBI" for more information on DBI strings
113              
114             If you are specifying a valid DBI locator or valid logical name and
115             still connect, check the database server is responding
116              
117             EOM
118             ;
119             }
120             die;
121             }
122             # HACK
123             $self->dbh->{RaiseError} = 1;
124             $self->dbh->{ShowErrorStatement} = 1;
125             if ($dbi =~ /dbi:([\w\d]+)/) {
126             $self->{_driver} = $1;
127             }
128             $self->setup;
129             return $self;
130             }
131              
132             sub resolve_dbi {
133             my $self = shift;
134             my $dbi = shift;
135             if (!$dbi) {
136             $self->throw("database name not provided!");
137             }
138             if ($dbi !~ /^dbi:/) {
139             my $rh = $self->resources_hash;
140             my $res =
141             $rh->{$dbi};
142             if (!$res) {
143             if ($dbi =~ /:/) {
144             $res =
145             {loc=>"$dbi"}
146             }
147             else {
148             $res =
149             {loc=>"Pg:$dbi"};
150             }
151             }
152             if ($res) {
153             my $loc = $res->{loc};
154             if ($loc =~ /(\S+?):(\S+)\@(\S+)/) {
155             my $dbms = $1;
156             my $dbn = $2;
157             my $host = $3;
158             my $extra = '';
159             if ($host =~ /(\S+?):(.*)/) {
160             $host = $1;
161             $extra = ":$2";
162             }
163             if ($dbms =~ /pg/i) {
164             $dbi = "dbi:Pg:dbname=$dbn;host=$host$extra";
165             }
166             elsif ($dbms =~ /db2/i) {
167             $dbi = "dbi:Pg:$dbn;host=$host$extra";
168             }
169             else {
170             # default - tested on MySQL
171             $dbi = "dbi:$dbms:database=$dbn:host=$host$extra";
172             }
173             }
174             elsif ($loc =~ /(\S+):(\S+)$/) {
175             my $dbms = $1;
176             my $dbn = $2;
177             $dbi = "dbi:$dbms:database=$dbn";
178             if ($dbms =~ /pg/i) {
179             $dbi = "dbi:Pg:dbname=$dbn";
180             }
181             }
182             else {
183             $self->throw("$dbi -> $loc does not conform to standard.\n".
184             ":\@");
185             }
186             }
187             else {
188             $self->throw("$dbi is not a valid DBI locator.\n");
189             }
190             }
191             return $dbi;
192             }
193              
194             sub resources_hash {
195             my $self = shift;
196             my $mapf = $ENV{DBSTAG_DBIMAP_FILE};
197             my $rh;
198             if ($mapf) {
199             if (-f $mapf) {
200             $rh = {};
201             open(F, $mapf) || $self->throw("Cannot open $mapf");
202             while () {
203             chomp;
204             next if /^\#/;
205             s/^\!//;
206             my @parts =split(' ', $_);
207             next unless (@parts >= 3);
208             my ($name, $type, $loc, $tagstr) =@parts;
209             my %tagh = ();
210             if ($tagstr) {
211             my @parts = split(/;\s*/, $tagstr);
212             foreach (@parts) {
213             my ($t, $v) = split(/\s*=\s*/, $_);
214             $tagh{$t} = $v;
215             }
216             }
217             $rh->{$name} =
218             {
219             %tagh,
220             name=>$name,
221             type=>$type,
222             loc=>$loc,
223             tagstr=>$tagstr,
224             };
225             }
226             close(F) || $self->throw("Cannot close $mapf");
227             } else {
228             $self->throw("$mapf does not exist");
229             }
230             }
231             return $rh;
232             }
233              
234              
235             sub resources_list {
236             my $self = shift;
237             my $rh =
238             $self->resources_hash;
239             my $rl;
240             if ($rh) {
241             $rl =
242             [map {$_} values %$rh];
243             }
244             return $rl;
245             }
246              
247             sub find_template {
248             my $self = shift;
249             my $tname = shift;
250             my $path = $ENV{DBSTAG_TEMPLATE_DIRS} || '';
251             my $tl = $self->template_list;
252             my ($template, @rest) = grep {$tname eq $_->name} @$tl;
253              
254             if (!$template) {
255             print STDERR "\n\nI could not find the Stag SQL template called \"$tname\".\n";
256             if (!$path) {
257             print STDERR <
258              
259             In order to do use this or any other template, you need to set the environment
260             variable DBSTAG_TEMPLATE_DIRS to the directory or a set of directories
261             containing SQL templates. For example
262              
263             setenv DBSTAG_TEMPLATE_DIRS=".:\$HOME/my-sql-templates:/usr/share/system-sql-templates"
264              
265             EOM1
266             ;
267             }
268             else {
269             print STDERR <
270              
271             I am looking in the following directories:
272              
273             $path
274              
275             Check the contents of the directory to see if the stag sql template
276             you require is there, and is readable by you. Stag SQL templates
277             should end with the suffix ".stg"
278              
279             If you wish to search other directories, set the environment variable
280             DBSTAG_TEMPLATE_DIRS, like this:
281              
282             setenv DBSTAG_TEMPLATE_DIRS=".:\$HOME/my-sql-templates:$path"
283              
284             EOM2
285             ;
286             }
287             $self->throw("Could not find template \"$tname\" in: $path");
288             }
289             return $template;
290             }
291              
292             sub find_templates_by_schema {
293             my $self = shift;
294             my $schema = shift;
295             my $tl = $self->template_list;
296              
297             my @templates = grep {$_->stag_props->tmatch('schema', $schema)} @$tl;
298            
299             return \@templates;
300             }
301              
302             sub find_templates_by_dbname {
303             my $self = shift;
304             my $dbname = shift;
305             my $res = $self->resources_hash->{$dbname};
306             my $templates;
307             if ($res) {
308             my $schema = $res->{schema} || '';
309             if ($schema) {
310             $templates = $self->find_templates_by_schema($schema);
311             }
312             else {
313             # unknown schema - show all templates
314             # $templates = $self->template_list;
315             }
316             }
317             else {
318             $self->throw("unknown db: $dbname");
319             }
320             return $templates;
321             }
322              
323             sub template_list {
324             my $self = shift;
325             my %already_got = ();
326             if (!$self->{_template_list}) {
327             my $path = $ENV{DBSTAG_TEMPLATE_DIRS} || '.';
328             my @dirs = split(/:/, $path);
329             my @templates = ();
330              
331             foreach my $dir (@dirs) {
332             foreach my $fn (glob("$dir/*.stg")) {
333             if (-f $fn) {
334             require "DBIx/DBStag/SQLTemplate.pm";
335             my $template = DBIx::DBStag::SQLTemplate->new;
336             $template->parse($fn);
337             push(@templates, $template) unless $already_got{$template->name};
338             $already_got{$template->name} = 1;
339             }
340             }
341             }
342             $self->{_template_list} = \@templates;
343             }
344             return $self->{_template_list};
345             }
346              
347             sub find_schema {
348             my $self = shift;
349             my $dbname = shift;
350             my $rl = $self->resouces_list || [];
351             my ($r) = grep {$_->{name} eq $_ ||
352             $_->{loc} eq $_} @$rl;
353             if ($r) {
354             return $r->{schema};
355             }
356             return;
357             }
358              
359             sub setup {
360             my $self = shift;
361             return;
362             }
363              
364             # counter
365             sub next_id {
366             my $self = shift;
367             $self->{_next_id} = shift if @_;
368             $self->{_next_id} = 0 unless $self->{_next_id};
369             return ++$self->{_next_id};
370             }
371              
372              
373             sub dbh {
374             my $self = shift;
375             $self->{_dbh} = shift if @_;
376             return $self->{_dbh};
377             }
378              
379             sub dbschema {
380             my $self = shift;
381             $self->{_dbschema} = shift if @_;
382             if (!$self->{_dbschema}) {
383             if (!$self->dbh) {
384             confess("you must establish connection using connect() first");
385             }
386             $self->dbschema(DBIx::DBSchema->new_native($self->dbh));
387             # my $sth = $self->dbh->table_info(undef, undef, undef, 'VIEW') or die $self->dbh->errstr;
388             # use Data::Dumper;
389             # print Dumper $sth->fetchall_arrayref([2,3]);
390             }
391             return $self->{_dbschema};
392             }
393              
394             sub parser {
395             my $self = shift;
396             $self->{_parser} = shift if @_;
397             if (!$self->{_parser}) {
398             $self->{_parser} = Parse::RecDescent->new($self->selectgrammar());
399             }
400             return $self->{_parser};
401             }
402              
403             sub warn {
404             my $self = shift;
405             my $fmt = shift;
406              
407             print STDERR "\nWARNING:\n";
408             printf STDERR $fmt, @_;
409             print STDERR "\n";
410             }
411              
412             sub throw {
413             my $self = shift;
414             my $fmt = shift;
415              
416             print STDERR "\nERROR:\n";
417             printf STDERR $fmt, @_;
418             print STDERR "\n";
419             confess;
420             }
421              
422             sub get_pk_col {
423             my $self = shift;
424             my $table = shift;
425            
426             my $tableobj = $self->dbschema->table(lc($table));
427             if (!$tableobj) {
428             confess("Can't get table $table from db.\n".
429             "Maybe DBIx::DBSchema does not work with your database?");
430             }
431             return $tableobj->primary_key;
432             }
433              
434             sub is_table {
435             my $self = shift;
436             my $tbl = shift;
437             return 1 if $self->dbschema->table($tbl);
438             }
439              
440             sub is_col {
441             my $self = shift;
442             my $col = shift;
443             if ($self->{_is_col_h}) {
444             return $self->{_is_col_h}->{$col}
445             }
446             my @tablenames = $self->dbschema->tables;
447             my @allcols =
448             map {
449             $self->get_all_cols($_);
450             } @tablenames;
451             my %h = map {$_=>1} @allcols;
452             $self->{_is_col_h} = \%h;
453             return $self->{_is_col_h}->{$col};
454             }
455              
456             # ASSUMPTION: pk names same as fk names
457             sub is_fk_col {
458             my $self = shift;
459             my $col = shift;
460             # HACK!!!
461             # currently dbschema does not know about FKs
462             return 1 if $col =~ /_id$/;
463             if ($self->{_is_fk_col_h}) {
464             return $self->{_is_fk_col_h}->{$col}
465             }
466             my @tablenames = $self->dbschema->tables;
467             my %h = ();
468             foreach (@tablenames) {
469             my $pk = $self->dbschema->table($_)->primary_key;
470             $h{$pk} =1 if $pk;
471             }
472             $self->{_is_fk_col_h} = \%h;
473             return $self->{_is_fk_col_h}->{$col};
474             }
475              
476             sub is_pk_col {
477             my $self = shift;
478             my $col = shift;
479             if ($self->{_is_pk_col_h}) {
480             return $self->{_is_pk_col_h}->{$col}
481             }
482             my @tablenames = $self->dbschema->tables;
483             my %h = ();
484             foreach (@tablenames) {
485             my $pk = $self->dbschema->table($_)->primary_key;
486             $h{$pk} =1 if $pk;
487             }
488             $self->{_is_pk_col_h} = \%h;
489             return $self->{_is_pk_col_h}->{$col};
490             }
491              
492             sub get_all_cols {
493             my $self = shift;
494             my $table = shift;
495            
496             my $tableobj = $self->dbschema->table(lc($table));
497             if (!$tableobj) {
498             confess("Can't get table $table from db.\n".
499             "Maybe DBIx::DBSchema does not work with your database?");
500             }
501             return $tableobj->columns;
502             }
503              
504             sub get_unique_sets {
505             my $self = shift;
506             my $table = shift;
507              
508             my $tableobj = $self->dbschema->table(lc($table));
509             if (!$tableobj) {
510             confess("Can't get table $table from db.\n".
511             "Maybe DBIx::DBSchema does not work with your database?");
512             }
513             if ($ENV{OLD_DBIX_DBSCHEMA}) {
514             return @{$tableobj->unique->lol_ref || []};
515             }
516             else {
517             my %indices = $tableobj->indices;
518             my @unique_indices = grep {$_->unique} values %indices;
519             return map {$_->columns} @unique_indices;
520             }
521             }
522              
523             sub mapconf {
524             my $self = shift;
525             my $fn = shift;
526             my $fh = FileHandle->new($fn) || confess("cannot open $fn");
527             my @mappings = <$fh>;
528             $fh->close;
529             $self->mapping(\@mappings);
530             }
531              
532             sub mapping {
533             my $self = shift;
534             if (@_) {
535             my $ml = shift;
536             my @nu =
537             map {
538             if (ref($_)) {
539             Data::Stag->nodify($_);
540             }
541             else {
542             if (/^(\w+)\/(\w+)\.(\w+)=(\w+)\.(\w+)/) {
543             Data::Stag->new(map=>[
544             [fktable_alias=>$1],
545             [table=>$2],
546             [col=>$3],
547             [fktable=>$4],
548             [fkcol=>$5]
549             ]);
550             }
551             elsif (/^(\w+)\.(\w+)=(\w+)\.(\w+)/) {
552             Data::Stag->new(map=>[
553             [table=>$1],
554             [col=>$2],
555             [fktable=>$3],
556             [fkcol=>$4]
557             ]);
558             }
559             elsif (/^parentfk:(\w+)\.(\w+)/) {
560             Data::Stag->new(parentfk=>[
561             [table=>$1],
562             [col=>$2],
563             ]);
564             }
565             else {
566             confess("incorrectly specified mapping: $_".
567             "(must be alias/tbl.col=ftbl.fcol)");
568             ();
569             }
570             }
571             } @$ml;
572             $self->{_mapping} = \@nu;
573             }
574             return $self->{_mapping};
575             }
576              
577             sub guess_mapping {
578             my $self = shift;
579             my $dbschema = $self->dbschema;
580              
581             $self->mapping([]);
582             my %th =
583             map { $_ => $dbschema->table($_) } $dbschema->tables;
584             foreach my $tn (keys %th) {
585             my @cns = $th{$tn}->columns;
586             foreach my $cn (@cns) {
587             my $ftn = $cn;
588             $ftn =~ s/_id$//;
589             if ($th{$ftn}) {
590             push(@{$self->mapping},
591             Data::Stag->new(map=>[
592             [table=>$tn],
593             [col=>$cn],
594             [fktable=>$ftn],
595             [fkcol=>$cn]
596             ]));
597             }
598             }
599             }
600             }
601              
602             sub linking_tables {
603             my $self = shift;
604             $self->{_linking_tables} = {@_} if @_;
605             return %{$self->{_linking_tables} || {}};
606             }
607              
608             sub add_linking_tables {
609             my $self = shift;
610             my %linkh = $self->linking_tables;
611             return unless %linkh;
612             my $struct = shift;
613             foreach my $ltname (keys %linkh) {
614             my ($t1, $t2) = @{$linkh{$ltname}};
615             $struct->where($t1,
616             sub {
617             my $n=shift;
618             my @v = $n->getnode($t2);
619             return unless @v;
620             $n->unset($t2);
621             my @nv =
622             map {
623             $n->new($ltname=>[$_]);
624             } @v;
625             # $n->setnode($ltname,
626             # $n->new($ltname=>[@v]));
627             foreach (@nv) {
628             $n->addkid($_);
629             }
630             0;
631             });
632             }
633             return;
634             }
635              
636             # ----------------------------------------
637              
638             sub elt_card {
639             my $e = shift;
640             my $c = '';
641             if ($e =~ /(.*)([\+\?\*])/) {
642             ($e, $c) = ($1, $2);
643             }
644             # make the element RDB-safe
645             $e =~ s/\-//g;
646             return ($e, $c);
647             }
648              
649             sub source_transforms {
650             my $self = shift;
651             $self->{_source_transforms} = shift if @_;
652             return $self->{_source_transforms};
653             }
654              
655             sub autotemplate {
656             my $self = shift;
657             my $schema = shift;
658             return () unless grep {!stag_isterminal($_)} $schema->subnodes;
659             my @J = ();
660             my @W = ();
661             my @EXAMPLE = ();
662             my ($tname) = elt_card($schema->element);
663             my %joinpaths = ();
664            
665             $schema->iterate(sub {
666             my $n = shift;
667             my $parent = shift;
668             my ($tbl, $card) = elt_card($n->element);
669             if (!$parent) {
670             push(@J, $tbl);
671             # $joinpaths{$tbl} = $tbl;
672             return;
673             }
674             my ($ptbl) = elt_card($parent->element);
675             if (stag_isterminal($n)) {
676             my $v = $ptbl.'_'.$tbl;
677             my $w = "$ptbl.$tbl => \&$v\&";
678             if ($ptbl eq $tname) {
679             push(@W,
680             "[ $w ]");
681             }
682             else {
683             my $pk = $tname.'_id';
684             my $subselect =
685             "SELECT $pk FROM $joinpaths{$ptbl}".
686             " WHERE $w";
687             push(@W,
688             "[ $pk IN ($subselect) ]");
689             }
690             # produce example formula for non-ints
691             if ($n->data eq 's') {
692             push(@EXAMPLE,
693             "$v => SELECT DISTINCT $tbl FROM $ptbl");
694             }
695             }
696             else {
697             my $jtype = 'INNER JOIN';
698             if ($card eq '*' || $card eq '?') {
699             $jtype = 'LEFT OUTER JOIN';
700             }
701             my $jcol = $ptbl.'_id';
702             push(@J,
703             "$jtype $tbl USING ($jcol)");
704             if ($joinpaths{$ptbl}) {
705             $joinpaths{$tbl} =
706             "$joinpaths{$ptbl} INNER JOIN $tbl USING ($jcol)";
707             }
708             else {
709             $joinpaths{$tbl} = $tbl;
710             }
711             }
712             return;
713             });
714             my $from = join("\n ", @J);
715             my $where = join("\n ", @W);
716             my $nesting = $schema->duplicate;
717             $nesting->iterate(sub {
718             my $n = shift;
719             if (stag_isterminal($n)) {
720             return;
721             }
722             my ($tbl, $card) = elt_card($n->element);
723             $n->element($tbl);
724             my @sn = $n->kids;
725             @sn =
726             grep {
727             my ($tbl, $card) = elt_card($_->element);
728             $_->element($tbl);
729             !stag_isterminal($_)
730             } @sn;
731             if (@sn) {
732             $n->kids(@sn);
733             }
734             else {
735             $n->data([]);
736             }
737             });
738             $nesting = Data::Stag->new(set=>[$nesting]);
739             my $nstr = $nesting->sxpr;
740             $nstr =~ s/^\'//;
741             my $tt =
742             join("\n",
743             ":SELECT *",
744             ":FROM $from",
745             ":WHERE $where",
746             ":USE NESTING",
747             "$nstr",
748             "",
749             "// ---- METADATA ----",
750             "schema:",
751             "desc: Fetches $tname objects",
752             " This is an AUTOGENERATED template",
753             "",
754             (map {
755             "example_input: $_"
756             } @EXAMPLE),
757             );
758              
759             # my $template = DBIx::DBStag::SQLTemplate->new;
760             my @sn = $schema->subnodes;
761             my @tts = ();
762             push(@tts, $self->autotemplate($_)) foreach @sn;
763             return ([$tname=>$tt], @tts);
764             }
765              
766             sub autoddl {
767             my $self = shift;
768             my $stag = shift;
769             my $link = shift;
770             $stag->makeattrsnodes;
771             my $schema = $stag->autoschema;
772             $self->source_transforms([]);;
773             $self->_autoddl($schema, undef, $link);
774             }
775              
776             sub _autoddl {
777             my $self = shift;
778             my $schema = shift;
779             my $parent = shift;
780             my $link = shift || []; # link tables
781             my $tbls = shift || [];
782             my @sn = $schema->subnodes;
783             my ($tbl, $card) = elt_card($schema->element);
784             my @cols = (sprintf("%s_id serial PRIMARY KEY NOT NULL", $tbl));
785             my $casc = " ON DELETE CASCADE";
786             foreach (grep {stag_isterminal($_)} @sn) {
787             my ($col, $card) = elt_card($_->element);
788             my $pk = '';
789             if ($col eq $tbl.'_id') {
790             shift @cols;
791             $pk = ' PRIMARY KEY';
792             }
793             if ($card =~ /[\+\*]/) {
794             my $new_name = sprintf("%s_%s", $tbl, $col);
795             my $tf = ["$tbl/$col", "$new_name/$col"];
796             push(@{$self->source_transforms}, $tf);
797             $_->name($new_name);
798             $_->data([[$col => $_->data]]);
799             # $self->throw("In the source data, '$col' is a multivalued\n".
800             # "terminal (data) node. This is difficult to transform");
801             }
802             else {
803             # my $isnull = $card eq '?' ? '' : ' NOT NULL';
804             my $isnull = '';
805             push(@cols,
806             sprintf("%s %s$isnull$pk",
807             $col, $_->data));
808             }
809             }
810             if ($parent) {
811             my ($pn) = elt_card($parent->element);
812             push(@cols,
813             sprintf("%s_id INT", $pn));
814             push(@cols,
815             sprintf("FOREIGN KEY (%s_id) REFERENCES $pn(%s_id)$casc", $pn, $pn));
816             }
817              
818             my $mapping = $self->mapping || [];
819              
820             if (grep {$_ eq $tbl} @$tbls) {
821             # $self->throw("$tbl has >1 parent - you need to\n".
822             # "transform input data");
823             return "";
824             }
825             push(@$tbls, $tbl);
826              
827             my $post_ddl = '';
828             my $pre_ddl = '';
829             foreach (grep {!stag_isterminal($_)} @sn) {
830             # check for cases where we want to include FK to subnode
831             my ($map) =
832             grep {
833             $_->name eq 'map' &&
834             ($_->get_table eq $tbl &&
835             ($_->get_fktable_alias eq $_->element ||
836             $_->get_fktable eq $_->element))
837             } @$mapping;
838             # linking tables
839             if ($map ||
840             grep {$_ eq $tbl} @$link) {
841             my $ftbl = $_->element;
842             push(@cols,
843             sprintf("%s_id INT", $ftbl));
844             push(@cols,
845             sprintf("FOREIGN KEY (%s_id) REFERENCES $ftbl(%s_id)$casc", $ftbl, $ftbl));
846             $pre_ddl .= $self->_autoddl($_, undef, $link, $tbls);
847            
848             }
849             else {
850             $post_ddl .= $self->_autoddl($_, $schema, $link, $tbls);
851             }
852              
853             }
854             my $ddl =
855             sprintf("CREATE TABLE $tbl (\n%s\n);\n\n",
856             join(",\n", map {" $_"} @cols));
857              
858             return $pre_ddl . $ddl . $post_ddl;;
859             }
860              
861             # ----------------------------------------
862             # CACHE METHODS
863             #
864             # we keep a cache of what is stored in
865             # each table
866             #
867             # cache->{$element}->{$key}->{$val}
868             # ----------------------------------------
869              
870             # list of table names that should be cached
871             sub cached_tables {
872             my $self = shift;
873             $self->{_cached_tables} = shift if @_;
874             return $self->{_cached_tables};
875             }
876              
877             sub is_caching_on {
878             my $self = shift;
879             my $element = shift;
880             $self->{_is_caching_on} = {}
881             unless $self->{_is_caching_on};
882             if (@_) {
883             $self->{_is_caching_on}->{$element} = shift;
884             }
885             return $self->{_is_caching_on}->{$element};
886             }
887              
888             sub query_cache {
889             my $self = shift;
890             my $element = shift;
891             my $constr = shift;
892             my $update_h = shift;
893             my @keycols = sort keys %$constr;
894             my $cache = $self->get_tuple_idx($element, \@keycols);
895             my $valstr = join("\t", map {$constr->{$_}} @keycols);
896             # use Data::Dumper;
897             # print Dumper $cache;
898             if ($update_h) {
899             my $current_h = $cache->{$valstr} || {};
900             $current_h->{$_} = $update_h->{$_} foreach keys %$update_h;
901             $cache->{$valstr} = $current_h;
902             }
903             return $cache->{$valstr};
904             }
905              
906             sub insert_into_cache {
907             my $self = shift;
908             my $element = shift;
909             my $insert_h = shift;
910             my $usets = shift;
911             foreach my $uset (@$usets) {
912             my @undef = grep {!defined $insert_h->{$_}} @$uset;
913             if (@undef) {
914             my @defined = grep {defined $insert_h->{$_}} @$uset;
915             trace(1,
916             "undefined column in unique key: @$uset IN $element/[@$uset] ".
917             join('; ',
918             map {"$_=$insert_h->{$_}"} @defined,
919             )
920             ) if $TRACE;
921             # cannot cache undefined values
922             next;
923             }
924             my $cache = $self->get_tuple_idx($element, $uset);
925             my $valstr = join("\t", map {$insert_h->{$_}} sort @$uset);
926             $cache->{$valstr} = $insert_h;
927             }
928             return 1;
929             }
930              
931             sub update_cache {
932             my $self = shift;
933             my $element = shift;
934             my $store_hash = shift;
935             my $unique_constr = shift;
936              
937             my $tuple = $self->query_cache($element,
938             $unique_constr,
939             $store_hash);
940             return;
941             }
942              
943             sub get_tuple_idx {
944             my $self = shift;
945             my $element = shift;
946             my $ukey = shift;
947             my @keycols = @$ukey;
948             @keycols = sort @keycols;
949             @keycols || die;
950              
951             my $cache = $self->cache;
952             if (!$cache->{$element}) {
953             $cache->{$element} = {};
954             }
955             my $eltcache = $cache->{$element};
956             # we just use a flat perl hash - flatten the list of unique cols
957             # to a string with spaces between
958             my $k = "@keycols";
959             if (!$eltcache->{$k}) {
960             $eltcache->{$k} = {};
961             }
962            
963             return $eltcache->{$k};
964             }
965              
966             sub cache_summary {
967             my $self = shift;
968             my $s = Data::Stag->new(cache_summary=>[]);
969             my $cache = $self->cache || {};
970             my @elts = keys %$cache;
971             foreach my $elt (@elts) {
972             my $cnode = $cache->{$elt} || {};
973             my @keys = keys %$cnode;
974             $s->add($elt=>[map {[$_=>scalar(keys %{$cnode->{$_}})]} @keys]);
975             }
976             return $s;
977             }
978              
979             sub cache {
980             my $self = shift;
981             $self->{_cache} = shift if @_;
982             $self->{_cache} = {} unless $self->{_cache};
983             return $self->{_cache};
984             }
985              
986             sub clear_cache {
987             my $self = shift;
988             $self->cache({});
989             }
990              
991             # ---- END OF CACHE METHODS ----
992              
993             # set this if we are loading a fresh/blank slate DB
994             # (will assume database is empty and not check for
995             # existing tuples)
996             sub policy_freshbulkload {
997             my $self = shift;
998             $self->{_policy_freshbulkload} = shift if @_;
999             return $self->{_policy_freshbulkload};
1000             }
1001             sub noupdate_h {
1002             my $self = shift;
1003             $self->{_noupdate_h} = shift if @_;
1004             return $self->{_noupdate_h} || {};
1005             }
1006             sub tracenode {
1007             my $self = shift;
1008             $self->{_tracenode} = shift if @_;
1009             return $self->{_tracenode};
1010             }
1011              
1012             sub mapgroups {
1013             my $self = shift;
1014             if (@_) {
1015             $self->{_mapgroups} = [@_];
1016             $self->{_colvalmap} = {}
1017             unless $self->{_colvalmap};
1018             foreach my $cols (@_) {
1019             my $h = {};
1020             foreach (@$cols) {
1021             $self->{_colvalmap}->{$_} = $h;
1022             }
1023             }
1024             }
1025             return @{$self->{_mapgroups} || []};
1026             }
1027              
1028             # DEPRECATED
1029             sub get_mapping_for_col {
1030             my $self = shift;
1031             my $col = shift;
1032             $self->{_colvalmap}->{$col} = {}
1033             unless $self->{_colvalmap}->{$col};
1034             return $self->{_colvalmap}->{$col};
1035             }
1036              
1037             # mapping of Old ID => New ID
1038             # IDs are assumed to be global across ALL tables
1039             sub id_remap_idx {
1040             my $self = shift;
1041             if (@_) {
1042             $self->{_id_remap_idx} = shift;
1043             }
1044             else {
1045             $self->{_id_remap_idx} = {}
1046             unless $self->{_id_remap_idx};
1047             }
1048             return $self->{_id_remap_idx};
1049             }
1050              
1051             # do the PK values in the XML represent the actual
1052             # internal db ids, or are they local to the document?
1053             # if the latter then we will create a id_remap_idx
1054             sub trust_primary_key_values {
1055             my $self = shift;
1056             $self->{_trust_primary_key_values} = shift if @_;
1057             return $self->{_trust_primary_key_values};
1058             }
1059              
1060              
1061             sub make_stag_node_dbsafe {
1062             my $self = shift;
1063             my $node = shift;
1064             my $parent = shift;
1065             my $name = $node->name;
1066             # CJM 2007-03-05
1067             #return if $name eq '@'; # leave attrs alone
1068             if ($name eq '@') {
1069             # descend into attrs
1070             $parent->data([grep {$_->name ne '@'} @{$parent->data},@{$node->data}]);
1071             return;
1072             }
1073             my $safename = $self->dbsafe($name,$parent);
1074             if ($name ne $safename) {
1075             $node->name($safename);
1076             }
1077             my @kids = $node->kids;
1078             foreach (@kids) {
1079             $self->make_stag_node_dbsafe($_,$node) if ref $_;
1080             }
1081             return;
1082             }
1083             sub dbsafe {
1084             my $self = shift;
1085             my $name = shift;
1086             my $parent = shift;
1087             $name = lc($name);
1088             # dbstag is designed for stag-like xml; no mixed attributes
1089             # however, we do have basic checks for mixed attributes
1090             if ($name eq '.') {
1091             $name = $parent->name.'_data'; # TODO - allow custom column
1092             }
1093             $name =~ tr/a-z0-9_//cd;
1094             return $name;
1095             }
1096              
1097             # cache the attribute nodes as they are parsed
1098             #sub current_attribute_node {
1099             # my $self = shift;
1100             # $self->{_current_attribute_node} = shift if @_;
1101             # return $self->{_current_attribute_node};
1102             #}
1103              
1104             # lookup table; macro ID => internal database ID
1105             sub macro_id_h {
1106             my $self = shift;
1107             $self->{_macro_id_h} = shift if @_;
1108             $self->{_macro_id_h} = {}
1109             unless $self->{_macro_id_h};
1110             return $self->{_macro_id_h};
1111             }
1112              
1113             # xort-style XML; set if an attribute is encountered
1114             sub xort_mode {
1115             my $self = shift;
1116             $self->{_xort_mode} = shift if @_;
1117             return $self->{_xort_mode};
1118             }
1119              
1120              
1121             #'(t1
1122             # (foo x)
1123             # (t2
1124             # (bar y)))
1125             #
1126             # '(fk
1127             # (table t2)
1128             # (ftable t1))
1129             #
1130             # alg: store t1, then t2
1131              
1132             # '(t1
1133             # (foo x)
1134             # (t1_t2
1135             # (t2
1136             # (bar y))))
1137             #
1138             # '(fk
1139             # (table t1_t2)
1140             # (ftable t1))
1141             # '(fk
1142             # (table t1_t2)
1143             # (ftable t2))
1144             #
1145             #
1146             # alg: store t1, hold on t1_t2, store t2
1147              
1148             # '(t1
1149             # (foo x)
1150             # (blah
1151             # (t2
1152             # (bar y))))
1153             #
1154             # '(fk
1155             # (table t1)
1156             # (fktable t2)
1157             # (fktable_alias "blah")
1158             # (fk "blah_id"))
1159              
1160             # alg: store t2, store t1
1161              
1162             # if set, will ensure that tbl/col names are transformed to be safe
1163             sub force_safe_node_names {
1164             my $self = shift;
1165             $self->{_force_safe_node_names} = shift if @_;
1166             return $self->{_force_safe_node_names};
1167             }
1168              
1169              
1170             # recursively stores a Data::Stag tree node in the database
1171             sub storenode {
1172             my $self = shift;
1173             my $node = shift;
1174             my @args = @_;
1175             my $dupnode = $node->duplicate;
1176              
1177             $self->make_stag_node_dbsafe($dupnode,'')
1178             if $self->force_safe_node_names;
1179             $self->add_linking_tables($dupnode);
1180             $self->_storenode($dupnode,@args);
1181             }
1182              
1183             sub _storenode {
1184             my $self = shift;
1185             my $node = shift;
1186             my $opt = shift;
1187             if (!$node) {
1188             confess("you need to pass in a node!");
1189             }
1190             my $element = $node->element;
1191             return unless $node->kids;
1192             if ($element eq 'dbstag_metadata') {
1193             my @maps = $node->get_map;
1194             $self->mapping(\@maps);
1195             my @links = $node->get_link;
1196             if (@links) {
1197             my %h =
1198             map {
1199             ($_->sget_table => [$_->sget_from, $_->sget_to])
1200             } @links;
1201             $self->linking_tables(%h);
1202             }
1203             return;
1204             }
1205              
1206             # sql can be embedded as <_sql> tags
1207             if ($element eq '_sql') {
1208             $self->_execute_sqlnode($node);
1209             return;
1210             }
1211              
1212             # check for XORT-style attributes
1213             # if ($element eq '@') {
1214             # # is this check required???
1215             # $self->current_attribute_node($node);
1216             # $self->xort_mode(1);
1217             # return;
1218             # }
1219             my $current_attribute_node;
1220             unless ($node->isterminal) {
1221             my @kids = $node->kids;
1222             my $changed = 0;
1223             @kids =
1224             map {
1225             if ($_->element eq '@') {
1226             $self->xort_mode(1);
1227             $current_attribute_node = $_;
1228             $changed = 1;
1229             trace(0, "GOT ATTR NODE");
1230             (); # omit
1231             }
1232             else {
1233             $_; # unchanged
1234             }
1235             } @kids;
1236             $node->kids(@kids) if $changed;
1237             }
1238              
1239             my $operation; # directive: force/update/lookup
1240             if ($current_attribute_node){
1241             $operation =
1242             $current_attribute_node->sget_op;
1243             }
1244              
1245             trace(0, "STORING $element\n", $node->xml) if $TRACE;
1246             my $tracenode = $self->tracenode || '';
1247             my $tracekeyval;
1248             if ($tracenode && $tracenode =~ /^(\w+)\/(.*)/) {
1249             my $nn = $1;
1250             my $tag = $2;
1251             if ($nn eq $element) {
1252             $tracekeyval = $node->get($tag);
1253             }
1254             }
1255              
1256             my $dbh = $self->dbh;
1257             my $dbschema = $self->dbschema;
1258              
1259             my $is_caching_on = $self->is_caching_on($element) || 0;
1260              
1261             my $mapping = $self->mapping || [];
1262              
1263             # each relation has zero or one primary keys;
1264             # primary keys are assumed to be single-column
1265             my $pkcol = $self->get_pk_col($element);
1266             trace(0, "PKCOL: $pkcol") if $TRACE;
1267              
1268             # DBIx::DBSchema metadata
1269             my $tableobj = $dbschema->table($element);
1270              
1271             # -- PRE-STORE CHILD NON-TERMINALS --
1272             # before storing this node, we need to
1273             # see if we first need to store any child
1274             # non-terminal nodes (in order to get their
1275             # primary keys, to use as foreign keys in
1276             # the current relation)
1277              
1278             # store non-terminal subnodes first
1279             my @ntnodes = $node->ntnodes;
1280              
1281             # keep track of nodes that have been assigned xort-style
1282             my %assigned_node_h;
1283              
1284             # GET INFORMATION FROM SUPER-NODE
1285             # some nodes may have been assigned by the calling process
1286             # (eg if the supernode is refered to by a fk from the current table)
1287             # this hash maps element names to a boolean;
1288             # this is ONLY used in conjunction with xort-style xml
1289             # we set this when we want to make sure that an element value is
1290             # NOT macro-expanded by the expansion code
1291             %assigned_node_h = %{$opt->{assigned_node_h} || {}};
1292              
1293             # the primary key value of the supernode
1294             my $parent_pk_id = $opt->{parent_pk_id};
1295             # the element type of the supernode
1296             my $parent_element = $opt->{parent_element};
1297             # -- end of info from super-node
1298              
1299             # PRE-STORE
1300             # look through the current node's children;
1301             # + some of these will be nodes that must be pre-stored BEFORE
1302             # the current node (because the current node has a fk to them)
1303             # + some of these will be nodes that must be post-stored AFTER
1304             # the current node (because they have an fk to the current node)
1305             #
1306             # one or other of these situations must be true - otherwise
1307             # nodes should not be nested!
1308             my @delayed_store = (); # keep track of non-pre-stored nodes
1309             foreach my $nt (@ntnodes) {
1310             # First check for XORT-STYLE
1311             # xort-style XML; nodes can be nested inside a non-terminal
1312             # node corresponding to a FK column
1313             # eg
1314             #
1315             # foo
1316             #
1317             #
1318             # here, what looks like a non-terminal node should actually
1319            
1320             # check all sub-nodes; if any of them are nonterminal and correspond
1321             # to a column (not a table) then add the sub-node and use the pk id
1322             # as the returned value
1323             # note: we have to explicitly check the col is not also a table
1324             # since some dbs (eg go db) have col names the same as tbl names
1325             if ($self->is_col($nt->name) &&
1326             !$nt->isterminal &&
1327             !$self->is_table($nt->name)) {
1328             my @kids = $nt->kids;
1329             if (@kids != 1) {
1330             $self->throw("non-terminal pk node should have one subnode only; ".
1331             $nt->name." has ".scalar(@kids));
1332             }
1333             my $sn_val = $self->_storenode(shift @kids);
1334             if (!defined($sn_val)) {
1335             $self->throw("no returned value for ".$nt->name);
1336             }
1337             # TRANSFORM NODE: non-terminal to terminal
1338             # replace node with return pk ID value
1339             $nt->data($sn_val);
1340              
1341             # do NOT try and expand the value assigned to this
1342             # node with a xort-macro expansion later on
1343             $assigned_node_h{$nt->name} = 1;
1344             trace(0, "ASSIGNED NON-MACRO ID for ".$nt->name." TO $sn_val") if $TRACE;
1345              
1346             # skip this ntnode - it is now a tnode
1347             next;
1348             }
1349             # -- END OF xort-style check
1350              
1351             # we want to PRE-STORE any ntnodes that
1352             # are required for foreign key relationships
1353             # within this node;
1354             # ie this node N1 has a foreign key "fk_id" that
1355             # points to ntnode N2.
1356             # if there is an intermediate alias element in
1357             # between then we need to store the ntnode too
1358             #
1359             # check for either of these conditions
1360             my ($map) =
1361             grep {
1362             $_->get_table &&
1363             $_->get_table eq $element &&
1364             ($_->get_fktable_alias &&
1365             $_->get_fktable_alias eq $nt->element ||
1366             ($_->get_fktable &&
1367             $_->get_fktable eq $nt->element && !$_->get_fktable_alias))
1368             } @$mapping;
1369             # check to see if sub-element has FK to this element
1370             if (!$map) {
1371             # my $subtable = $dbschema->table($nt->element);
1372             my $table = $dbschema->table($element);
1373             my $ntelement = $nt->element;
1374             my $subpkcol = $self->get_pk_col($ntelement);
1375            
1376             my @cns = $table->columns;
1377              
1378             my $cn; # col name (FK in current element)
1379             my $fcn; # foreign col name (PK in sub element)
1380              
1381             # HACK - ASSUME NATURAL JOIN
1382             # for example, a FK: person.dept_id => dept.dept_id
1383             if ($subpkcol ne 'id') {
1384             foreach (@cns) {
1385             if ($_ eq $subpkcol) {
1386             $cn = $_;
1387             $fcn = $_;
1388             }
1389             }
1390             }
1391              
1392             # second chance; allow base "id" style
1393             # for example, a FK: person.dept_id => dept.id
1394             # via ...
1395             if (!$cn) {
1396             if ($subpkcol eq 'id') {
1397             foreach (@cns) {
1398             if ($_ eq $ntelement."_id") {
1399             $cn = $_;
1400             $fcn = 'id';
1401             }
1402             }
1403             }
1404             }
1405             if ($cn) {
1406             $map =
1407             Data::Stag->new(map=>[
1408             [table=>$element],
1409             [col=>$cn],
1410             [fktable=>$nt->element],
1411             [fkcol=>$fcn]
1412             ]);
1413             }
1414             }
1415              
1416             # if $map is set, then we have to pre-store this subnode
1417             if ($map) {
1418             # 1:many between this and child
1419             # (eg this has fk to child)
1420             # store child before this;
1421             # use fk in this
1422             my $fktable = $map->get_fktable;
1423              
1424             my $col = $map->get_col || $self->get_pk_col($fktable);
1425              
1426             # aliases map an extra table
1427             # eg table X col X.A => Y.B
1428             # fktable_alias = A
1429             my $fktable_alias = $map->get_fktable_alias;
1430             my $orig_nt = $nt;
1431              
1432             # if we have an alias, it means the actual node
1433             # we want to store is one beneath the alias;
1434             # eg ..
1435             # we want to actually store the node foo2
1436             if ($fktable_alias) {
1437             my @nts = $nt->sgetnode($map->sget_fktable);
1438             if (!@nts) {
1439             print STDERR $nt->sxpr;
1440             confess("could not get node for: ".$map->sget_fktable);
1441             }
1442             if (@nts > 1) {
1443             print STDERR $nt->sxpr;
1444             confess("multiple nodes for: ".$map->sget_fktable);
1445             }
1446             $nt = shift @nts;
1447             if (!$nt) {
1448             print STDERR $map->sxpr;
1449             print STDERR $orig_nt->sxpr;
1450             confess("bad nodes for: ".$map->sget_fktable);
1451             }
1452             }
1453             my $fk_id = $self->_storenode($nt);
1454             if (!defined($fk_id)) {
1455             confess("ASSERTION ERROR: could not get foreign key val\n".
1456             "trying to store: $element\n".
1457             "no fk returned when storing: $fktable");
1458             }
1459             trace(0, "SETTING $element.$col=$fk_id [via ".$orig_nt->element."]") if $TRACE;
1460             $node->set($col, $fk_id);
1461             $node->unset($orig_nt->element);
1462              
1463             # do NOT try and expand the value assigned to this
1464             # node with a xort-macro expansion later on
1465             $assigned_node_h{$col} = 1;
1466             trace(0, "ASSIGNED NON-MACRO ID for ".$col) if $TRACE;
1467             }
1468             else {
1469             # 1:many between child and this
1470             # (eg child has fk to this)
1471             # store child after
1472             trace(0, "WILL STORE LATER:\n", $nt->xml) if $TRACE;
1473             $node->unset($nt->element);
1474             push(@delayed_store, $nt);
1475             }
1476             # $node->unset($nt->element); # clear it
1477             }
1478             # --- done storing kids
1479              
1480             # --- replace *IDs ---
1481             # dbstag XML allows placeholder values in primary key cols
1482             # (for now, PKs are always assumed to be autoincrement/serial ints)
1483             # placeholder PKs get remapped to a new autogenerated ID
1484             # all FKs referring to this get remapped too
1485             my @tnodes = $node->tnodes; # terminal nodes mapped to columns in db
1486             my %remap = (); # indexed by column name; new PK value
1487             if (!$self->trust_primary_key_values) {
1488             foreach my $tnode (@tnodes) {
1489             # foreign keys in XORT mode - replace macro ID with
1490             # actual database foreign key value
1491             if ($self->is_fk_col($tnode->name) && $self->xort_mode) {
1492             my $v = $tnode->data;
1493              
1494             # -- CHECK FOR MACRO EXPANSION (XORT-STYLE) --
1495             # IF this tnode was originally an ntnode that
1496             # was collapsed to a pk val, xort style, do not
1497             # try and map it to a previously assigned macro
1498             # EXAMPLE:
1499             # we start with A
1500             # we collapse too $v
1501             if ($assigned_node_h{$tnode->name}) {
1502             trace(0, "ALREADY CALCULATED; not a Macro ID:$v;; in $element/".$tnode->name) if $TRACE;
1503             # DO NOTHING
1504             }
1505             else { # NOT ASSIGNED
1506             my $actual_id =
1507             $self->macro_id_h->{$v};
1508             if (!defined($actual_id)) {
1509             $self->throw("XORT-style Macro ID:$v is undefined;; in $element/".$tnode->name);
1510             }
1511             $tnode->data($actual_id);
1512             }
1513             # -- END OF MACRO EXPANSION --
1514             }
1515             elsif ($tnode->name eq $pkcol) {
1516             my $v = $tnode->data;
1517             trace(0, "REMAP $pkcol: $v => ? [do not know new value yet]") if $TRACE;
1518             $remap{$tnode->name} = $v; # map after insert/update
1519             $node->unset($tnode->name); # discard placeholder
1520             } else {
1521             if ($self->is_fk_col($tnode->name)) {
1522             # hack!! need proper FK refs...; DBSchema wont do this
1523             my $colvalmap = $self->id_remap_idx;
1524             #my $colvalmap = $self->get_mapping_for_col($nt->elememt);
1525             if ($colvalmap) {
1526             my $v = $tnode->data;
1527             my $nv = $colvalmap->{$v};
1528             if ($nv) {
1529             trace(0, "remapping $v => $nv") if $TRACE;
1530             $tnode->data($nv);
1531             }
1532             }
1533             }
1534             }
1535             }
1536             } # -- end of ID remapping
1537            
1538             # --- Get columns that need updating/inserting ---
1539             # turn all remaining tag-val pairs into a hash
1540             my %store_hash = $node->pairs;
1541              
1542             # All columns to be stored should be terminal nodes
1543             # in the Stag tree; if not there is a problem
1544             my @refcols = grep { ref($store_hash{$_}) } keys %store_hash;
1545             if (@refcols) {
1546             foreach (@$mapping) {
1547             trace(0, $_->sxpr) if $TRACE;
1548             }
1549             confess("I can't store the current node; ".
1550             "These elements need to be mapped via FKs: ".
1551             join(', ', map {"\"@refcols\""} @refcols).
1552             "\n\nPerhaps you need to specify more schema metadata?");
1553             } # -- end of sanity check
1554              
1555             # each relation has zero or more unique keys;
1556             # unique keys may be compound (ie >1 column)
1557             my @usets = $self->get_unique_sets($element);
1558             trace(0, "USETS: ", map {"unique[ @$_ ]"} @usets) if $TRACE;
1559              
1560             # get all the columns/fields/attributes of this relation
1561             my @cols = $self->get_all_cols($element);
1562             trace(0, "COLS: @cols") if $TRACE;
1563              
1564             # store_node() will either perform an update or
1565             # an insert. if we are performing an update, we
1566             # need a query constraint to determine which row
1567             # to update.
1568             #
1569             # this hash is used to determine the key/val pairs
1570             my %unique_constr;
1571              
1572             # this is the value of the primary key of
1573             # the inserted/update row
1574             my $id;
1575              
1576             # if this relation has a primary key AND the stag node
1577             # being stored has the value of this column set, THEN
1578             # use this as the update constraint
1579             if (0 && $pkcol) {
1580             my $pk_id;
1581             $pk_id = $node->get($pkcol);
1582             if ($pk_id) {
1583             # unset the value of the pk in the node; there
1584             # is no point setting this in the UPDATE as it
1585             # is already part of the update constraint
1586             $node->unset($pkcol);
1587              
1588             # set the update constraint based on the PK value
1589             %unique_constr = ($pkcol => $pk_id);
1590              
1591             # return this value at the end
1592             $id = $pk_id;
1593             trace(0, "SETTING UPDATE CONSTR BASED ON PK $pkcol = $pk_id") if $TRACE;
1594             }
1595             } # -- end of xxxx
1596              
1597              
1598             # foreach my $sn ($node->kids) {
1599             # my $name = $sn->element;
1600             # my $nu_id = $self->id_mapping($name, $sn->data);
1601             # # do the old 2 nu mapping
1602             # # (the ids in the xml are just temporary
1603             # # for internal consistency)
1604             # $sn->data($nu_id) if $nu_id;
1605             # }
1606              
1607             if (0) {
1608             # ---- EXPERIMENTAL ----
1609             # if no unique keys are provided, assume that all
1610             # non-PK columns together provide a compound unique key
1611             # <> expedient for now!
1612             if (!@usets) {
1613             @usets = ( [grep {$_ ne $pkcol} @cols] );
1614             }
1615             }
1616             if ($pkcol) {
1617             # make single PK the first unique key set;
1618             # add to beginning as this is the most efficient
1619             unshift(@usets, [$pkcol]);
1620             }
1621              
1622             # get the column to select to get the pk for this element
1623             my $select_col = $pkcol;
1624              
1625             # -------- find update constraint by unique keys ----
1626             # if the unique_constr hash is set, we know we
1627             # are doing an UPDATE, and we know the query
1628             # constraint that will be used;
1629             #
1630             # otherwise loop through all unique keys; if
1631             # all the columns in the key are set, then we
1632             # can safely use this unique key as the update
1633             # constraint.
1634             # if no update constraint can be found, this node
1635             # is presumed not to exist in the DB and an INSERT
1636             # is performed
1637             foreach my $uset (@usets) {
1638             # we already know & have the primary key
1639             last if %unique_constr;
1640              
1641             # if we are loading up a fresh/blank slate
1642             # database then we don't need to check for
1643             # existing tuples, as everything should
1644             # have been inserted/updated this session
1645             if ($self->policy_freshbulkload) {
1646             next;
1647             }
1648              
1649             # if an xort-style attribute has op=insert
1650             # this is the same as a bulkload
1651             if ($operation && $operation eq 'insert') {
1652             next;
1653             }
1654              
1655             # already tried PK
1656             # if (scalar(@$uset) == 1 &&
1657             # $uset->[0] eq $pkcol) {
1658             # next;
1659             # }
1660             trace(0, "TRYING USET: ;@$uset; [pk=$pkcol]") if $TRACE;
1661              
1662             # get the values of the unique key columns;
1663             # %constr is a candidate unique key=>val mapping
1664             my %constr =
1665             map {
1666             my $v = $node->sget($_);
1667             $_ => $v
1668             } @$uset;
1669              
1670             # each column in the unique key must be
1671             # non-NULL; try the next unique key if
1672             # this one is unsuitable
1673              
1674             # -- COMMENTED OUT cjm 20041012
1675             # mysql auto-creates defaults for non-null fields;
1676             # we cannot use this code:
1677             # UNCOMMENTED 20050304
1678              
1679             # -- make null value part of the key
1680             # -- ADDED 20041012 - make null 0/''
1681            
1682             foreach (keys %constr) {
1683             # in pg, pk cols are sequences with defaults nextval
1684             # skip these
1685             next if $self->is_pk_col($_);
1686             if (!defined($constr{$_})) {
1687             if ($self->is_fk_col($_)) {
1688             # if xort-style, the container may be an
1689             # implicit foreign key
1690            
1691             # TODO: check element
1692             if ($parent_pk_id) {
1693             trace(0, "USING PARENT ELEMENT: $_ => $parent_pk_id");
1694             $constr{$_} = $parent_pk_id;
1695             }
1696             }
1697             else {
1698             my $colobj = $tableobj->column($_);
1699             my $default_val = $colobj->default;
1700             my $col_type = $colobj->type;
1701             if (defined $default_val) {
1702             # problem with DBIx::DBSchema
1703             if ($default_val =~ /^\'(.*)\'::/) {
1704             trace(0, "FIXING DEFAULT: $default_val => $1") if $TRACE;
1705             $default_val = $1;
1706             }
1707             if (($col_type =~ /^int/ || $col_type =~ /float/) && $default_val eq '') {
1708             # this SHOULDN'T be necessary, but appears to be required for
1709             # some configuartions. DBSchema problem?
1710             $default_val=0;
1711             }
1712             if (ref($default_val)) {
1713             # In new versions of DBIx::DBSchema (0.38, possibly older versions),
1714             # this appears to be a reference
1715             $default_val = $$default_val;
1716             if ($default_val eq "''") {
1717             $default_val = '';
1718             }
1719             }
1720             $constr{$_} = $default_val;
1721             trace(0, "USING DEFAULT[type=$col_type] $_ => \"$constr{$_}\"") if $TRACE;
1722             }
1723             }
1724             }
1725             }
1726              
1727             # TODO: check cases eg dbxref in chado; null default values...?
1728             next if grep { !defined($_) } values %constr;
1729              
1730             %unique_constr = %constr;
1731             if (!$select_col && @$uset == 1) {
1732             $select_col = $uset->[0];
1733             }
1734             trace(0, "GOT unique_constr, select_col=$select_col") if $TRACE;
1735             last;
1736             }
1737             # -- END OF @usets --
1738              
1739             # %unique_constr is set; a mapping for a unique key colset
1740             # if this is not set, then we must insert
1741              
1742             if (%unique_constr) {
1743              
1744             # -- IN-MEMORY CACHING --
1745             # check if we have already updated/inserted
1746             # this tuple this session; and if so, what
1747             # the update constraint used was
1748             if ($is_caching_on == 1 || $is_caching_on == 3) {
1749              
1750             #$self->throw("no select col for $element") unless $select_col;
1751             # fetch values of unique_constr from cache
1752             my %cached_colvals =
1753             %{$self->query_cache($element,
1754             \%unique_constr)
1755             || {}};
1756             # have we stored anything with uniq key %unique_constr before?
1757             if (%cached_colvals) {
1758             if ($pkcol) {
1759             $id = $cached_colvals{$pkcol};
1760             if ($id) {
1761             # use the cached pk id for efficiency
1762             #%unique_constr = {$pkcol => $id};
1763             trace(0, "CACHED $pkcol = $id") if $TRACE;
1764             }
1765             else {
1766             trace(0, "NO CACHED COLVAL FOR $pkcol :: ".
1767             join("; ",map {"$_ = $cached_colvals{$_}"} keys %cached_colvals)) if $TRACE;
1768             }
1769             }
1770              
1771             # yes - has it changed?
1772             foreach my $col (keys %cached_colvals) {
1773             if ($cached_colvals{$col} && $store_hash{$col} &&
1774             $cached_colvals{$col} && $store_hash{$col}) {
1775             # don't bother re-storing anything
1776             delete $store_hash{$col};
1777             }
1778             }
1779             if (%store_hash) {
1780             my @x = keys %store_hash;
1781             trace(0, "WILL STORE: @x") if $TRACE;
1782             }
1783             else {
1784             trace(0, "UNCHANGED - WILL NOT STORE; store_hash empty") if $TRACE;
1785             }
1786             }
1787             else {
1788             }
1789             }
1790             # -- END OF CACHING CHECK --
1791              
1792             # -- GET PK VAL $id BASED ON unique_constr --
1793             # (we may already have this based on memory-cache)
1794             if (!$id) {
1795              
1796             # the input node contains all the keys in %update_constr
1797             # - check to see if this relation exists in the DB
1798              
1799             my $vals;
1800             if ($is_caching_on >= 2) {
1801             $vals = [];
1802             }
1803             else {
1804             my $sql =
1805             $self->makesql($element,
1806             \%unique_constr,
1807             $select_col);
1808             trace(0, "SQL: $sql") if $TRACE;
1809             $vals =
1810             $dbh->selectcol_arrayref($sql);
1811             }
1812              
1813             if (@$vals) {
1814             # yes it does exist in DB; check if there is a
1815             # pkcol - if there is, it means we can do an
1816             # update and
1817             if ($pkcol && $select_col && $select_col eq $pkcol) {
1818             # this is the value we return at the
1819             # end
1820             $id = $vals->[0];
1821             if ($remap{$pkcol}) {
1822             #my $colvalmap = $self->get_mapping_for_col($pkcol);
1823             my $colvalmap = $self->id_remap_idx;
1824             #my $colvalmap = $self->get_mapping_for_col($element);
1825             $colvalmap->{$remap{$pkcol}} = $id;
1826             trace(0, "COLVALMAP $pkcol $remap{$pkcol} = $id") if $TRACE;
1827             }
1828             }
1829             else {
1830             # $id not set, but we will later perform an update anyway
1831             }
1832             }
1833             else {
1834             # this node is not in the DB; force insert
1835             %unique_constr = ();
1836             }
1837             }
1838             } # end of get pk val
1839              
1840             # ---- UPDATE OR INSERT -----
1841             # at this stage we know if we are updating
1842             # or inserting, depending on whether a suitable
1843             # update constraint has been found
1844              
1845             my $this_op;
1846             if (%unique_constr) {
1847             $this_op = 'update';
1848             }
1849             else {
1850             $this_op = 'insert';
1851             }
1852             if (defined $operation) {
1853             if ($operation eq 'force') {
1854             $operation = $this_op;
1855             }
1856             else {
1857             # update/lookup/insert
1858             # insert: already dealt with
1859             }
1860             }
1861             else {
1862             $operation = $this_op;
1863             }
1864              
1865             if ($operation eq 'replace') {
1866             # replace = delete followed by insert
1867             if (%unique_constr) {
1868             $self->deleterow($element,\%unique_constr);
1869             }
1870             else {
1871             $self->throw("Cannot find row to delete it:\n".$node->xml);
1872             }
1873             $operation = 'insert';
1874             }
1875              
1876             if ($operation eq 'update') {
1877             # ** UPDATE **
1878             if ($self->noupdate_h->{$element}) {
1879             if ($tracekeyval) {
1880             printf STDERR "NOUPDATE: $tracenode = $tracekeyval\n"
1881             }
1882             trace(0, sprintf("NOUPDATE on %s OR child nodes (We have %s)",
1883             $element,
1884             join('; ',values %unique_constr)
1885             )) if $TRACE;
1886             # don't return yet; there are still the delayed nodes
1887             ##return $id;
1888             }
1889             else {
1890             # if there are no fields modified,
1891             # no change
1892             foreach (keys %unique_constr) {
1893             # no point setting any column
1894             # that is part of the update constraint
1895             delete $store_hash{$_};
1896             }
1897            
1898             # only update if there are cols set that are
1899             # not part of unique constraint
1900             if (%store_hash) {
1901             if ($tracekeyval) {
1902             printf STDERR "UPDATE: $tracenode = $tracekeyval\n"
1903             }
1904              
1905             $self->updaterow($element,
1906             \%store_hash,
1907             \%unique_constr);
1908             # -- CACHE RESULTS --
1909             if ($is_caching_on == 1 || $is_caching_on == 3) {
1910             $self->update_cache($element,
1911             \%store_hash,
1912             \%unique_constr);
1913             }
1914             }
1915             else {
1916             trace(0, sprintf("NOCHANGE on %s (We have %s) id=$id",
1917             $element,
1918             join('; ',values %unique_constr)
1919             )) if $TRACE;
1920             if ($tracekeyval) {
1921             print STDERR "NOCHANGE: $tracenode = $tracekeyval\n"
1922             }
1923             }
1924             }
1925             } elsif ($operation eq 'insert') {
1926             # ** INSERT **
1927             if (%store_hash) {
1928             $id =
1929             $self->insertrow($element,
1930             \%store_hash,
1931             $pkcol);
1932             if (!$id) {
1933             # this only happens if $self->force(1) is set
1934             if (@delayed_store) {
1935             print STDERR "Insert on \"$element\" did not return a primary key ID.\n Possible causes: sequence not define [Pg]?\n";
1936             if ($self->force) {
1937             return;
1938             }
1939             else {
1940             confess("non-recoverable error");
1941             }
1942             }
1943             return;
1944             }
1945             if ($tracekeyval) {
1946             printf STDERR "INSERT: $tracenode $tracekeyval [val = $id]\n"
1947             }
1948             if ($pkcol) {
1949             if ($remap{$pkcol}) {
1950             my $colvalmap = $self->id_remap_idx;
1951             #my $colvalmap = $self->get_mapping_for_col($element);
1952             $colvalmap->{$remap{$pkcol}} = $id;
1953             trace(0, "colvalmap $remap{$pkcol} = $id") if $TRACE;
1954             }
1955             }
1956            
1957             # -- CACHE RESULTS --
1958             if ($is_caching_on) {
1959             my %cache_hash = %store_hash;
1960             if ($pkcol) {
1961             $cache_hash{$pkcol} = $id;
1962             }
1963             $self->insert_into_cache($element,
1964             \%cache_hash,
1965             \@usets);
1966             trace(0, "CACHING: $element") if $TRACE;
1967             }
1968             }
1969             }
1970             elsif ($operation eq 'delete') {
1971             if (%unique_constr) {
1972             $self->deleterow($element,\%unique_constr);
1973             }
1974             else {
1975             $self->throw("Cannot find row to delete it (perhaps unique constraint not satisfied?):\n".$node->xml);
1976             }
1977             }
1978             elsif ($operation eq 'lookup') {
1979             # lookup: do nothing, already have ID
1980             if (!$id) {
1981             $self->throw("lookup: no ID; could not find this node in db (perhaps unique constraint not satisfied?) %s:\n",$node->xml);
1982             }
1983             }
1984             else {
1985             $self->throw("cannot do op: $operation");
1986             } # -- end of UPDATE/INSERT/LOOKUP
1987            
1988              
1989             # -- DELAYED STORE --
1990             # Any non-terminal child nodes of the current one have
1991             # some kind of foreign key relationship to the current
1992             # relation. Either it is 1:many or many:1
1993             #
1994             # if the relation for the child node has a foreign key
1995             # into the current relation, we need to store the current
1996             # relation first to get the current relation's primary key.
1997             #
1998             # we have already done this, so now is the time to store
1999             # any of these child nodes
2000             if (@delayed_store) {
2001             foreach my $sn (@delayed_store) {
2002              
2003             my $fk; # foreign key column in subtable
2004             my $snname = $sn->name; # subtable name
2005              
2006             # if a mapping is used (eg in metadata), then
2007             # this takes priority
2008             foreach (@$mapping) {
2009             if ($_->name eq 'parentfk' &&
2010             $_->get_table eq $snname) {
2011             $fk = $_->get_col;
2012             }
2013             }
2014              
2015             # no mapping, by default use the current nodes primary
2016             # key (this assumes eg person.address_id is a fk to
2017             # a table with pk address_id; we will check and possibly
2018             # override this later)
2019             if (!$fk) {
2020             $fk = $pkcol;
2021             }
2022              
2023             # HACK!!
2024             # Some databases (eg GO Database) use 'id' for pk col
2025             # names; fks to this table will be of form _id
2026             if ($fk eq 'id') {
2027             $fk = $element . '_id';
2028             }
2029              
2030             # --SET SUBNODE FK--
2031             # it is necessarily true that each delayed-store subnode
2032             # must have some fk relationship back to the existing one
2033             # the subnode has a fk relation up to this one;
2034             # by default we assume that the subnode fk column is named
2035             # the same as the current pk. However, we check that this
2036             # is the case. If not, we deduce what the correct fk col is
2037             my $subtable =
2038             $dbschema->table($snname);
2039             if ($subtable->column($fk)) {
2040             # a fk col with the name as the current node pk col exists;
2041             # use it
2042              
2043             # do nothing - current value of $fk is fine
2044             }
2045             else {
2046             # deduce actual fk column
2047             # there should only be ONE subnode fk column UNSET;
2048             # this implicitly refers to the current node
2049             my @subcolumns = $subtable->columns;
2050             my @potential_fks = ();
2051             foreach my $subcolumn (@subcolumns) {
2052             if ($self->is_fk_col($subcolumn) &&
2053             !$self->is_pk_col($subcolumn)) {
2054            
2055             # Definite foreign key
2056             if (defined $sn->sget($subcolumn)) {
2057             # already set
2058             }
2059             else {
2060             push(@potential_fks, $subcolumn);
2061             }
2062             }
2063             }
2064             trace(0, "POTENTIAL FKS: @potential_fks");
2065             if (!@potential_fks) {
2066             $self->throw("I do not know what to do with the current ".
2067             "pl val ($id). There does not appear to be ".
2068             "a $fk column in $snname, and all fks in ".
2069             "the subtable $snname are currently set");
2070             }
2071             if (@potential_fks > 1) {
2072             $self->throw("There appear to be multiple potential fks ".
2073             "[ @potential_fks ]. I do not know which ".
2074             "to choose to assign the current pk val $id".
2075             " to");
2076             }
2077             $fk = shift @potential_fks;
2078             }
2079             # -- $fk value is set
2080             $sn->set($fk, $id);
2081             # -- $fk table assigned
2082              
2083             trace(0, "NOW TIME TO STORE [curr pk val = $id] [fkcol = $fk] ", $sn->xml) if $TRACE;
2084             # store subnode, passing in info on current node
2085             $self->_storenode($sn,{parent_pk_id=>$id,
2086             parent_element=>$element,
2087             assigned_node_h=>{$fk=>1}});
2088             }
2089             } # -- end of @delayed_store
2090              
2091             if ($current_attribute_node) {
2092             if ($id) {
2093             my $macro_id = $current_attribute_node->sget_id;
2094             if ($macro_id) {
2095             $self->macro_id_h->{$macro_id} = $id;
2096             trace(0, "SETTING MACRO ID MAP: $macro_id => $id") if $TRACE;
2097             }
2098             else {
2099             }
2100             }
2101             }
2102              
2103             return $id;
2104             }
2105              
2106             # --SQL directives embedded in XML--
2107              
2108             sub _execute_sqlnode {
2109             my $self = shift;
2110             my $sqlnode = shift;
2111             if ($sqlnode->element eq '_sql') {
2112             my $dbh = $self->dbh;
2113             my $op = $sqlnode->get('@/op');
2114             my $col = $sqlnode->get('@/col');
2115             my $table = $sqlnode->get('@/from');
2116             my $match = $sqlnode->get('@/match');
2117             my @subnodes = grep {$_->element ne '@'} $sqlnode->kids;
2118             if ($op eq 'delete') {
2119             my $pkey = $sqlnode->get('@/pkey');
2120             trace(0,"deleting from $table");
2121             my @vals = map {$self->_execute_sqlnode($_)} @subnodes;
2122             # do iteratively rather than in 1 SQL stmt
2123             if (@vals) {
2124             my $sql =
2125             sprintf("SELECT $pkey FROM $table WHERE $match IN (%s)",
2126             join(", ",@vals));
2127             trace(0, "SQL: $sql");
2128             my $ids_to_delete =
2129             $dbh->selectcol_arrayref($sql); # quote
2130             foreach my $id (@$ids_to_delete) {
2131             my $delete_sql =
2132             "DELETE FROM $table WHERE $pkey=$id";
2133             trace(0,"SQL: $delete_sql");
2134             $dbh->do($delete_sql);
2135             }
2136             }
2137             }
2138             elsif ($op eq "select") {
2139             my @vals = $sqlnode->get('.');
2140             my $sql =
2141             sprintf("SELECT $col FROM $table WHERE $match IN (%s)",
2142             join(", ",map {$dbh->quote($_)} @vals));
2143             trace(0, "SQL: $sql");
2144             my $ids =
2145             $dbh->selectcol_arrayref($sql);
2146             trace(0,"id list in select: @$ids");
2147             return(@$ids);
2148             }
2149             else {
2150             $self->throw("Do not understand SQL directive: $op")
2151             }
2152             }
2153             else {
2154             return $sqlnode->data;
2155             }
2156             return;
2157             }
2158              
2159             sub _process_sql {
2160             my $self = shift;
2161             my $node = shift;
2162             my $element = $node->element;
2163             if ($element eq 'in') {
2164            
2165             }
2166             else {
2167             $self->throw("Do not understand SQL directive: $element")
2168             }
2169             }
2170              
2171             # -- QUERYING --
2172              
2173             sub rmake_nesting {
2174             my $node = shift;
2175              
2176             if ($node->element eq 'composite') {
2177             my $first = $node->getnode_first;
2178             my $second = $node->getnode_second;
2179             my $head = rmake_nesting($first->data->[0]);
2180             my $tail = rmake_nesting($second->data->[0]);
2181             if ($head->isterminal) {
2182             return
2183             Data::Stag->new($head->element => [$tail]);
2184             }
2185             $head->addkid($tail);
2186             return $head;
2187             }
2188             elsif ($node->element eq 'leaf') {
2189             my $alias = $node->get_alias;
2190             my $tn = $alias || $node->get_name;
2191             return Data::Stag->new($tn=>1);
2192             }
2193             else {
2194             die;
2195             }
2196             }
2197              
2198             # if true, a metadata tag will be added to stag nodes selected from db
2199             sub include_metadata {
2200             my $self = shift;
2201             $self->{_include_metadata} = shift if @_;
2202             return $self->{_include_metadata};
2203             }
2204              
2205              
2206             # last SQL SELECT statement executed
2207             sub last_stmt {
2208             my $self = shift;
2209             $self->{_last_stmt} = shift if @_;
2210             return $self->{_last_stmt};
2211             }
2212              
2213             sub last_sql_and_args {
2214             my $self = shift;
2215             $self->{_last_sql_and_args} = shift if @_;
2216             return $self->{_last_sql_and_args};
2217             }
2218              
2219              
2220             sub sax_handler {
2221             my $self = shift;
2222             $self->{_sax_handler} = shift if @_;
2223             return $self->{_sax_handler};
2224             }
2225              
2226              
2227             # delegates to selectall_stag and turns tree to XML
2228             sub selectall_xml {
2229             my $self = shift;
2230             my $stag = $self->selectall_stag(@_);
2231             return $stag->xml;
2232             }
2233              
2234             # delegates to selectall_stag and turns tree to SAX
2235             # (candidate for optimisation - TODO - use event firing model)
2236             sub selectall_sax {
2237             my $self = shift;
2238             my ($sql, $nesting, $h) =
2239             rearrange([qw(sql nesting handler)], @_);
2240             my $stag = $self->selectall_stag(@_);
2241             $h = $h || $self->sax_handler;
2242             if (!$h) {
2243             $self->throw("You must specify the sax handler;\n".
2244             "Either use \$dbh->sax_handler(\$h), or \n".
2245             "\$dbh->selectall_sax(-sql=>\$sql, handler->\$h)");
2246             }
2247             return $stag->sax($h);
2248             }
2249              
2250             # delegates to selectall_stag and turns tree to S-Expression
2251             sub selectall_sxpr {
2252             my $self = shift;
2253             my $stag = $self->selectall_stag(@_);
2254             return $stag->sxpr;
2255             }
2256              
2257             # does not bother decomposing and nesting the results; just
2258             # returns the denormalised table from the SQL query.
2259             # arrayref of arrayrefs - rows x cols
2260             # first row of rows is column headings
2261             sub selectall_rows {
2262             my $self = shift;
2263             my ($sql, $nesting, $bind, $template) =
2264             rearrange([qw(sql nesting bind template)], @_);
2265             my $rows =
2266             $self->selectall_stag(-sql=>$sql,
2267             -nesting=>$nesting,
2268             -bind=>$bind,
2269             -template=>$template,
2270             -return_arrayref=>1,
2271             );
2272             return $rows;
2273             }
2274              
2275             # ---------------------------------------
2276             # selectall_stag(sql, nesting)
2277             #
2278             # Takes an sql string containing a SELECT statement,
2279             # parses it to get the tree structure; this can be
2280             # overridden with the nesting optional argument.
2281             #
2282             # The SELECT statement is executed, and the relations are
2283             # transformed into a stag tree
2284             #
2285             # ---------------------------------------
2286             sub selectall_stag {
2287             my $self = shift;
2288             my ($sql, $nesting, $bind, $template, $return_arrayref, $include_metadata, $aliaspolicy) =
2289             rearrange([qw(sql nesting bind template return_arrayref include_metadata aliaspolicy)], @_);
2290             my $prep_h = $self->prepare_stag(@_);
2291             my $cols = $prep_h->{cols};
2292             my $sth = $prep_h->{sth};
2293             my $exec_args = $prep_h->{exec_args};
2294              
2295             if (!defined($include_metadata)) {
2296             $include_metadata = $self->include_metadata;
2297             }
2298              
2299             # TODO - make this event based so we don't have to
2300             # load all into memory
2301             my $rows =
2302             $self->dbh->selectall_arrayref($sth, undef, @$exec_args);
2303             if ($return_arrayref) {
2304             my @hdrs = ();
2305             for (my $i=0; $i<@$cols; $i++) {
2306             my $h = $prep_h->{col_aliases_ordered}->[$i] || $cols->[$i];
2307             push(@hdrs, $h);
2308             }
2309             return [\@hdrs, @$rows];
2310             }
2311              
2312             trace(0, sprintf("Got %d rows\n", scalar(@$rows))) if $TRACE;
2313             # --- reconstruct tree from relations
2314             my $stag =
2315             $self->reconstruct(
2316             -rows=>$rows,
2317             -cols=>$cols,
2318             -alias=>$prep_h->{alias},
2319             -nesting=>$prep_h->{nesting},
2320             -aliaspolicy=>$aliaspolicy,
2321             );
2322             if ($include_metadata) {
2323             my ($last_sql, @sql_args) = @{$self->last_sql_and_args || []};
2324             my @kids = $stag->kids;
2325             my @bind_nodes;
2326             if ($bind && ref($bind) eq 'HASH') {
2327             @bind_nodes = (stag_unflatten(argset=>[%$bind]));
2328             }
2329             unshift(@kids,
2330             [dbstag_metadata=>[
2331             [sql=>$last_sql],
2332             [nesting=>$nesting],
2333             [template=>$template],
2334             @bind_nodes,
2335             (map {[exec_arg=>$_]} @sql_args)
2336             ]]);
2337             $stag->kids(@kids);
2338             }
2339             return $stag;
2340             }
2341              
2342             sub prepare_stag {
2343             my $self = shift;
2344             my ($sql, $nesting, $bind, $template, $return_arrayref, $aliaspolicy) =
2345             rearrange([qw(sql nesting bind template return_arrayref aliaspolicy)], @_);
2346              
2347             my $parser = $self->parser;
2348              
2349             my $sth;
2350             my @exec_args = ();
2351             if (ref($sql)) {
2352             $template = $sql;
2353             }
2354             if ($template) {
2355             if (!ref($template)) {
2356             $template = $self->find_template($template);
2357             }
2358             ($sql, @exec_args) = $template->get_sql_and_args($bind);
2359             }
2360             trace 0, "parsing_sql: $sql\n";
2361              
2362             # PRE-parse SQL statement for stag-specific extensions
2363             if ($sql =~ /(.*)\s+use\s+nesting\s*(.*)/si) {
2364             my ($pre, $post) = ($1, $2);
2365             my ($extracted, $remainder) =
2366             extract_bracketed($post, '()');
2367             if ($nesting) {
2368             $self->throw("nestings clash: $nesting vs $extracted");
2369             }
2370             $nesting = Data::Stag->parsestr($extracted);
2371             $sql = "$pre $remainder";
2372             }
2373              
2374              
2375             # get the parsed SQL SELECT statement as a stag node
2376             my $stmt = $parser->selectstmt($sql);
2377             if (!$stmt) {
2378             # there was some error parsing the SQL;
2379             # DBI can probably give a better explanation.
2380             eval {
2381             my $sth = $self->dbh->prepare($sql);
2382            
2383             };
2384             if ($@) {
2385             $self->throw("SQL ERROR:\n$@");
2386             }
2387             # DBI accepted it - must be a bug in the DBStag grammar
2388             $self->throw("I'm sorry but the SQL statement you gave does\n".
2389             "not conform to the more limited subset of SQL\n".
2390             "that DBStag supports. Please see the DBStag docs\n".
2391             "for details.\n".
2392             "\n".
2393             "Remember to check you explicitly declare all aliases\n".
2394             "using AS\n\n\nSQL:$sql");
2395             }
2396              
2397              
2398             trace 0, "parsed_sql: $sql\n";
2399             # trace 0, $stmt->xml;
2400             my $dbschema = $self->dbschema;
2401              
2402             $self->last_stmt($stmt);
2403              
2404             # stag node of FROM part of SQL
2405             my $fromstruct = $stmt->get_from;
2406              
2407             # --- aliases ---
2408              
2409             # keep a hash of table aliases
2410             # KEY: table alias
2411             # VAL: base table
2412             # for example, 'SELECT * FROM person AS p'
2413             # will result in $alias_h = { p => person }
2414             my $alias_h = {};
2415              
2416             # build alias hash using FROM node
2417             foreach my $sn ($fromstruct->subnodes) {
2418             get_table_alias_map($sn, $alias_h);
2419             }
2420              
2421             # as well as an alias hash map,
2422             # keep an array of stag nodes representing all the aliases
2423             my @aliases = ();
2424             foreach my $alias (keys %$alias_h) {
2425             push(@aliases,
2426             Data::Stag->new(alias=>[
2427             [name=>$alias],
2428             [table=>$alias_h->{$alias}->[0]]
2429             ]));
2430             }
2431             my $aliasstruct = Data::Stag->new(alias=>[@aliases]);
2432              
2433             # --- nestings ---
2434             #
2435             # the cartesian product that results from a SELECT can
2436             # be turned into a tree - there is more than one tree to
2437             # choose from; eg with "x NJ y NJ z" we can have trees:
2438             # [x [y [z]]]
2439             # [x [y z]]
2440             # [z [x y]]
2441             # etc
2442             #
2443             # the actual allowed nestings through the graph is constrained
2444             # by the FK relationships; we do not utilise this yet (TODO!)
2445             # later the user need only specify the root. for now they
2446             # must specify the full nesting OR allow the bracket structure
2447             # of the joins...
2448              
2449             # if the user did not explicitly supply a nesting,
2450             # guess one from the bracket structure of the FROM
2451             # clause (see rmake_nesting)
2452             # [TODO: be more clever in guessing the nesting using FKs]
2453             if (!$nesting) {
2454             $nesting = Data::Stag->new(top=>1);
2455             # my $cons = rmake_cons($fromstruct->data->[0], $nesting);
2456             $nesting = rmake_nesting($fromstruct->data->[0]);
2457             $nesting = Data::Stag->new(top=>[$nesting]);
2458             trace(0, "\n\nNesting:\n%s\n\n", $nesting->xml) if $TRACE;
2459             }
2460             if ($nesting && !ref($nesting)) {
2461             $nesting = Data::Stag->parsestr($nesting);
2462             }
2463              
2464             # keep an array of named relations used in the query -
2465             # the named relation is the alias if present;
2466             # eg
2467             # SELECT * FROM person AS p NATURAL JOIN department
2468             # the named relations here are 'p' and 'department'
2469             my @namedrelations = ();
2470             $fromstruct->iterate(sub {
2471             my $n = shift;
2472             if ($n->element eq 'leaf') {
2473             my $v = $n->sget_alias || $n->sget_name;
2474             push(@namedrelations, $v)
2475             }
2476             });
2477              
2478             # --- fetch columns ---
2479             #
2480             # loop through all the columns in the SELECT clause
2481             # making them all of a standard form; eg dealing
2482             # with functions and '*' wildcards appropriately
2483              
2484             my @col_aliases_ordered = ();
2485             my @cols =
2486             map {
2487             # $_ iterator variable is over the columns
2488             # specified in the SELECT part of the query;
2489             # each column is represented as a stag node
2490              
2491             # column name
2492             my $name = $_->get_name;
2493              
2494             # column alias, if exists
2495             # eg in 'SELECT name AS n' the alias is 'n'
2496             my $col_alias = $_->get_alias;
2497             push(@col_aliases_ordered, $col_alias);
2498              
2499             # make the name the alias; prepend named relation if supplied.
2500             # eg in 'SELECT person.name AS n' the name will become
2501             # 'person.n'
2502             if ($col_alias) {
2503             $name = $col_alias;
2504             if ($_->get_table) {
2505             $name = $_->get_table . '.'. $name;
2506             }
2507             }
2508              
2509             my $func = $_->getnode('func');
2510              
2511             # from here on determines returned value of the
2512             # map iteration:
2513              
2514             if ($func) {
2515             # a typical column node for a function looks like
2516             # this:
2517             #
2518             # (col
2519             # (func
2520             # (name "somefunc")
2521             # (args
2522             # (col
2523             # (name "x.foo")
2524             # (table "x")))))
2525             # (alias "myname"))
2526             #
2527             # if a function is included, and the function
2528             # return value is aliased, use that alias;
2529             # otherwise ...
2530              
2531             my $funcname = $func->get_name;
2532             # query the function stag node for the element
2533             # 'col'
2534             my ($col) =
2535             $func->where('col',
2536             sub {shift->get_table});
2537             my $table = $col_alias || $funcname;
2538             if (!$col_alias) {
2539             $col_alias = $funcname;
2540             }
2541             if ($col) {
2542             $table = $col->get_table;
2543             }
2544             # if ($col_alias =~ /(\w+)__(\w+)/) {
2545             # $table = $1;
2546             # $col_alias = $2;
2547             # }
2548             $name = $table . '.' . $col_alias;
2549             # return:
2550             $name;
2551             }
2552             elsif ($name =~ /^(\w+)\.\*$/) {
2553             # if the column name is of the form
2554             # RELATION.*, then replace the * with
2555             # all the actual columns from the base relation
2556             # RELATION
2557             #
2558             # the final result will be TABLE.col1, TABLE.col2,...
2559              
2560             my $tn = $1;
2561             my $tn_alias = $tn;
2562              
2563             # use base relation name to introspect schema
2564             if ($alias_h->{$tn}) {
2565             $tn = $alias_h->{$tn}->[0];
2566             }
2567             my $tbl = $dbschema->table(lc($tn));
2568             if (!$tbl) {
2569             confess("No such table as $tn");
2570             }
2571             # introspect schema to get columns for this table
2572             my @cns = $tbl->columns;
2573              
2574             # trace(0, Dumper $tbl) if $TRACE;
2575             trace(0, "TN:$tn ALIAS:$tn_alias COLS:@cns") if $TRACE;
2576              
2577             # return:
2578             map { "$tn_alias.$_" } @cns;
2579             }
2580             elsif ($name =~ /^\*$/) {
2581             # if the column name is '*' (ie select all)
2582             # then replace the * with
2583             # all the actual columns from the base relations in
2584             # the query (use FROM clause)
2585             #
2586              
2587             my %got = ();
2588             my @allcols =
2589             map {
2590             my $tn = $_;
2591             my $baserelname =
2592             $alias_h->{$tn} ?
2593             $alias_h->{$tn}->[0] : $tn;
2594             my $tbl = $dbschema->table(lc($baserelname));
2595             if (!$tbl) {
2596             confess("Don't know anything about table:$tn\n".
2597             "Maybe DBIx::DBSchema does not work for your DBMS?\n".
2598             "If $tn is a view, you may need to modify DBIxLLDBSchema");
2599             }
2600             my @cns = $tbl->columns;
2601             # @cns = grep { !$got{$_}++ } @cns;
2602             map { "$tn.$_"} @cns;
2603             } @namedrelations;
2604              
2605             # This is a bit hacky; if the user specifies
2606             # SELECT * FROM... then there is no way
2607             # to introspect the actual column returned
2608             # using DBI->selectall_arrayref
2609             #
2610             # maybe we should selectall_hashref
2611             # instead? this is generally slower; also
2612             # even if we get it with a hashref, the
2613             # result can be ambiguous since DBI only
2614             # gives us the colun names back
2615             #
2616             # to get round this we just replace the *
2617             # in the user's query (ie in the actual SQL)
2618             # with the full column list
2619             my $replace = join(', ', @allcols);
2620             # rewrite SQL statement; assum only one instance of
2621             # string '*' in these cases
2622             $sql =~ s/\*/$replace/;
2623             # return:
2624             @allcols;
2625             }
2626             else {
2627             # no * wildcard in column, and not a function;
2628             # just give back the node
2629              
2630             # return:
2631             $name
2632             }
2633             } $stmt->sgetnode_cols->getnode_col;
2634              
2635             @cols =
2636             map {
2637             if (/(\w+)__(\w+)/) {
2638             "$1.$2";
2639             }
2640             else {
2641             $_
2642             }
2643             } @cols;
2644              
2645             # ---- end of column fetching ---
2646              
2647             trace(0, "COLS:@cols") if $TRACE;
2648              
2649              
2650              
2651             # --- execute SQL SELECT statement ---
2652             if ($template) {
2653             $sth = $template->cached_sth->{$sql};
2654             if (!$sth) {
2655             $sth = $self->dbh->prepare($sql);
2656             $template->cached_sth->{$sql} = $sth;
2657             }
2658             # ($sql, $sth, @exec_args) =
2659             # $template->prepare($self->dbh, $bind);
2660             }
2661             else {
2662             $sth = $self->dbh->prepare($sql);
2663             }
2664             my $sql_or_sth = $sql;
2665             if ($sth) {
2666             $sql_or_sth = $sth;
2667             }
2668             trace(0, "SQL:$sql") if $TRACE;
2669             trace(0, "Exec_args: @exec_args") if $TRACE && @exec_args;
2670             $self->last_sql_and_args([$sql, @exec_args]);
2671             return
2672             {
2673             sth=>$sth,
2674             exec_args=>\@exec_args,
2675             cols=>\@cols,
2676             col_aliases_ordered=>\@col_aliases_ordered,
2677             alias=>$aliasstruct,
2678             nesting=>$nesting
2679             };
2680             }
2681              
2682              
2683             # ============================
2684             # get_table_alias_map(tablenode, alias hash)
2685             #
2686             # checks a tablenode (eg the stag representing
2687             # a table construct in the FROM clause) and adds
2688             # it to the alias hash if it specifies an alias
2689             # ============================
2690             sub get_table_alias_map {
2691             my $s = shift;
2692             my $h = shift;
2693              
2694             # the FROM clause is natively stored as a binary tree
2695             # (in order to group the joins by brackets) - recursively
2696             # descend building the hash map
2697              
2698             if ($s->name eq 'leaf') {
2699             my $alias = $s->get_alias;
2700             if ($alias) {
2701             $h->{$alias} = [$s->get_name];
2702             }
2703             return ($s->get_name);
2704             }
2705             elsif ($s->name eq 'composite') {
2706             my ($first, $second) =
2707             ($s->getnode_first,
2708             $s->getnode_second);
2709             my $alias = $s->get_alias;
2710             my @sn = ($first->subnodes, $second->subnodes);
2711             my @subtbls = map {
2712             get_table_alias_map($_, $h),
2713             } @sn;
2714             if ($alias) {
2715             $h->{$alias} = [@subtbls];
2716             }
2717             return @subtbls;
2718             }
2719             else {
2720             confess $s->name;
2721             }
2722             }
2723              
2724             # ============================
2725             # reconstruct(schema, rows, top, cols, constraints, nesting, aliasstruct)
2726             #
2727             # mainly called by: selectall_stag(...)
2728             #
2729             # takes an array of rows (ie the result of an SQL query, probably
2730             # involving JOINs, which is a denormalised relation) and
2731             # decomposes this relation into a tree structure
2732             #
2733             # in order to do this, it requires schema information, and a nesting
2734             # through the implicit result graph to build a tree
2735             # ============================
2736             sub reconstruct {
2737             my $self = shift;
2738             my $tree = Data::Stag->new();
2739             my ($schema, # OPTIONAL - meta data on relation
2740             $rows, # REQUIRED - relation R - array-of-array
2741             $top, # OPTIONAL - root node name
2742             $cols, # REQUIRED - array of stag nodes per column of R
2743             $constraints, # NOT USED!!!
2744             $nesting, # REQUIRED - tree representing decomposed schema
2745             $aliasstruct, # OPTIONAL - renaming of columns in R
2746             $aliaspolicy) =
2747             rearrange([qw(schema
2748             rows
2749             top
2750             cols
2751             constraints
2752             nesting
2753             alias
2754             aliaspolicy)], @_);
2755              
2756             $aliaspolicy = 'nest' unless $aliaspolicy;
2757              
2758             # --- get the schema ---
2759             #
2760             # $schema is a stag representing the schema
2761             # of the input releation R (not the schema of
2762             # the db that produced it.... hmm, this could
2763             # be misleading)
2764             #
2765             # it conforms to the following stag-struct:
2766             #
2767             #'(schema
2768             # (top? "RECORDSET-ELEMENT-NAME")
2769             # (cols?
2770             # (col+
2771             # (relation "RELATION-NAME")
2772             # (name "COLUMN-NAME")
2773             # ))
2774             # (nesting?
2775             # (* "NESTING-TREE")))
2776             #
2777             # each column represents the
2778            
2779             if (!$schema) {
2780             $schema = $tree->new(schema=>[]);
2781             }
2782             if (!ref($schema)) {
2783             # it is a string - parse it
2784             # (assume sxpr)
2785             $schema = $tree->from('sxprstr', $schema);
2786             }
2787              
2788             # TOP - this is the element name
2789             # to group the structs under.
2790             # [override if specified explicitly]
2791             if ($top) {
2792             stag_set($schema, 'top', $top);
2793             }
2794             # $top = $schema->get_top || "set";
2795             if (!$top) {
2796             if ($nesting) {
2797             # use first element in nesting
2798             $top = $nesting->element;
2799             }
2800             else {
2801             $top = 'set';
2802             }
2803             }
2804             my $topstruct = $tree->new($top, []);
2805              
2806             # COLS - this is the columns (attribute names)
2807             # in the order they appear
2808             # [override if specified explicitly]
2809             if ($cols) {
2810             my @ncols =
2811             map {
2812             if (ref($_)) {
2813             $_
2814             }
2815             else {
2816             # presume it's a string
2817             # format = RELATION.ATTRIBUTENAME
2818             if (/(\w+)\.(\w+)/) {
2819             $tree->new(col=>[
2820             [relation=>$1],
2821             [name=>$2]]);
2822             }
2823             elsif (/(\w+)/) {
2824             confess("Not implemented yet - must specify tbl for $_");
2825             $tree->new(col=>[
2826             [relation=>'unknown'],
2827             [name=>$2]]);
2828             }
2829             else {
2830             confess "I am confused by this column: $_";
2831             }
2832             }
2833             } @$cols;
2834             $schema->set_cols([@ncols]);
2835             }
2836              
2837              
2838             # NESTING - this is the tree structure in
2839             # which the relations are structured
2840             # [override if specified explicitly]
2841             if ($nesting) {
2842             if (ref($nesting)) {
2843             }
2844             else {
2845             $nesting = $tree->from('sxprstr', $nesting);
2846             }
2847             $schema->set_nesting([$nesting]);
2848             }
2849             else {
2850             $nesting = $schema->sgetnode_nesting;
2851             }
2852             if (!$nesting) {
2853             confess("no nesting!");
2854             }
2855              
2856             # --- alias structure ---
2857             #
2858             # use this to get a hash map of alias => baserelation
2859              
2860             ($aliasstruct) = $schema->getnode_aliases unless $aliasstruct;
2861             if ($aliasstruct && !ref($aliasstruct)) {
2862             $aliasstruct = $tree->from('sxprstr', $aliasstruct);
2863             }
2864             my @aliases = ();
2865             if ($aliasstruct && $aliaspolicy !~ /^a/i) {
2866             @aliases = $aliasstruct->getnode_alias;
2867             }
2868             my %alias2baserelation =
2869             map {
2870             $_->sget_name => $_->sget_table
2871             } @aliases;
2872              
2873             # column headings; (ie all columns in R)
2874             my @cols = $schema->sgetnode_cols->getnode_col();
2875              
2876             # --- primary key info ---
2877              
2878             # set the primary key for each relation (one per relation);
2879             # the default is *all* the columns in that relation
2880             my %pkey_by_relationname = (); # eg {person => [person_id]
2881             my %cols_by_relationname = (); # eg {person => [person_id, fname, lname]
2882              
2883             # loop through all columns in R, setting above hash maps
2884             foreach my $col (@cols) {
2885              
2886             # the stag struct for each $col looks like this:
2887             #
2888             # (col+
2889             # (relation "RELATION-NAME")
2890             # (name "COLUMN-NAME")
2891             # ))
2892            
2893             my $relationname = $col->get_relation;
2894             my $colname = $col->get_name;
2895              
2896             # pkey defaults to all columns in a relation
2897             # (we may override this later)
2898             $pkey_by_relationname{$relationname} = []
2899             unless $pkey_by_relationname{$relationname};
2900             push(@{$pkey_by_relationname{$relationname}},
2901             $colname);
2902              
2903             # all columns in a relation
2904             # (note: same as default PK)
2905             $cols_by_relationname{$relationname} = []
2906             unless $cols_by_relationname{$relationname};
2907             push(@{$cols_by_relationname{$relationname}},
2908             $colname);
2909             }
2910             my @relationnames = keys %pkey_by_relationname;
2911              
2912             # override PK if explicitly set as a constraint
2913             my @pks = $schema->findnode("primarykey");
2914             foreach my $pk (@pks) {
2915              
2916             # $pk looks like this:
2917             #
2918             # '(primarykey
2919             # (relation "R-NAME")
2920             # (col+ "COL-NAME"))
2921              
2922             my $relationname = $pk->get_relation;
2923             my @cols = $pk->get_col;
2924              
2925             # the hash %pkey_by_relationname should
2926             # be keyed by the named relations, not the
2927             # base relations
2928             my @aliasnames =
2929             grep {
2930             $alias2baserelation{$_} eq $relationname
2931             } keys %alias2baserelation;
2932              
2933             # relation is not aliased
2934             if (!@aliasnames) {
2935             @aliasnames = ($relationname);
2936             }
2937             foreach (@aliasnames) {
2938             $pkey_by_relationname{$_} = [@cols];
2939             }
2940             }
2941              
2942             # ------------------
2943             #
2944             # loop through denormalised rows,
2945             # putting the columns into their
2946             # respecive relations
2947             #
2948             # eg
2949             #
2950             # <----- a -----> <-- b -->
2951             # a.1 a.2 a.3 b.1 b.2
2952             #
2953             # algorithm:
2954             # use nesting/tree to walk through
2955             #
2956             # ------------------
2957              
2958             #~~~ keep a hash of all relations by their primary key vals
2959             #~~~ outer key = relationname
2960             #~~~ inner key = pkval
2961             #~~~ hash val = relation structure
2962             #~~~ my %all_relation_hh = ();
2963             #~~~ foreach my $relationname (@relationnames) {
2964             #~~~ $all_relation_hh{$relationname} = {};
2965             #~~~ }
2966              
2967             #~~~ keep an array of all relations
2968             #~~~ outer key = relationname
2969             #~~~ inner array = ordered list of relations
2970             #~~~ my %all_relation_ah = ();
2971             #~~~ foreach my $relationname (keys %pkey_by_relationname) {
2972             #~~~ $all_relation_ah{$relationname} = [];
2973             #~~~ }
2974              
2975             # start at top of nesting tree
2976             #
2977             # a typical nesting tree may look like this:
2978             #
2979             # '(tableA
2980             # (tableB "1")
2981             # (tableC
2982             # (tableD "1")))
2983             #
2984             # terminals ie "1" are ignored
2985              
2986             my ($first_in_nesting) = $nesting->subnodes;
2987             if (!$first_in_nesting) {
2988             $first_in_nesting = $nesting;
2989             }
2990             my $fipname = $first_in_nesting ? $first_in_nesting->name : '';
2991              
2992             # recursive hash representing tree
2993             #
2994             # $record =
2995             # {child_h => {
2996             # $relation_name* => {
2997             # $pk_val => $record
2998             # }
2999             # },
3000             # struct => $stag_obj
3001             # }
3002             #
3003             # this is recursively constructed using the make_a_tree() method
3004             # below. the nesting tree (see above) is traversed depth first,
3005             # constructing both the child_h hash and the resulting Stag
3006             # structure.
3007              
3008             my $top_record_h =
3009             {
3010             child_h=>{ $fipname ? ($fipname=>{}) : () },
3011             struct=>$topstruct
3012             };
3013             # loop through rows in R
3014             foreach my $row (@$rows) {
3015             my @colvals = @$row;
3016              
3017             # keep a record of all table names in
3018             # this row from R
3019             my %current_relation_h = ();
3020             for (my $i=0; $i<@cols; $i++) {
3021             my $colval = $colvals[$i];
3022             my $col = $cols[$i];
3023             my $relationname = $col->get_relation;
3024             my $colname = $col->get_name;
3025             my $relation = $current_relation_h{$relationname};
3026             if (!$relation) {
3027             $relation = {};
3028             $current_relation_h{$relationname} = $relation;
3029             }
3030             $relation->{$colname} = $colval;
3031             }
3032              
3033             # print "ROW=@$row\n";
3034             # dmp(\%pkey_by_relationname);
3035             # dmp($top_record_h);
3036              
3037             # we now have a hash of hashes -
3038             # outer keyed by relation id
3039             # inner keyed by relation attribute name
3040            
3041             # traverse depth first down nesting;
3042             # add new nodes as children of the parent
3043             $self->make_a_tree($tree,
3044             $top_record_h,
3045             $first_in_nesting,
3046             \%current_relation_h,
3047             \%pkey_by_relationname,
3048             \%cols_by_relationname,
3049             \%alias2baserelation,
3050             $aliaspolicy);
3051             }
3052             return $topstruct;
3053             }
3054             *norm = \&reconstruct;
3055             *normalise = \&reconstruct;
3056             *normalize = \&reconstruct;
3057              
3058             # ============================
3059             # make_a_tree(...) RECURSIVE
3060             #
3061             # called by: reconstruct(...)
3062             #
3063             # ============================
3064             sub make_a_tree {
3065             my $self = shift;
3066             my $tree = shift;
3067             my $parent_rec_h = shift;
3068             my $node = shift;
3069             my %current_relation_h= %{shift ||{}};
3070             my %pkey_by_relationname = %{shift ||{}};
3071             my %cols_by_relationname = %{shift ||{}};
3072             my %alias2baserelation = %{shift ||{}};
3073             my $aliaspolicy = shift;
3074              
3075             my $relationname = $node->name;
3076             my $relationrec = $current_relation_h{$relationname};
3077             my $pkcols = $pkey_by_relationname{$relationname};
3078             my $rec; # this is the next node down in the hash tree
3079              
3080             if (!$pkcols || !@$pkcols) {
3081             # if we have no columns for a particular part of
3082             # the nesting through the relation, it means it
3083             # was ommitted from the select clause - just skip
3084             # this part of the nesting.
3085             #
3086             # for example: SELECT a.*, b.* FROM a NJ a_to_b NJ b
3087             # the default nesting will be: [a [a_to_b [b]]]
3088             # the relation R will have columns:
3089             # a.c1 a.c2 b.c1 b.c2
3090             #
3091             # we want to build a resulting structure like this:
3092             # (a
3093             # (c1 "x") (c2 "y")
3094             # (b
3095             # (c1 "a") (c2 "b")))
3096             #
3097             # so we just miss out a_to_b in the nesting, because it
3098             # has no columns in the relation R.
3099             $rec = $parent_rec_h;
3100             }
3101             else {
3102              
3103             my $pkval =
3104             CORE::join("\t",
3105             map {
3106             esctab($relationrec->{$_} || '')
3107             } @$pkcols);
3108              
3109             $rec = $parent_rec_h->{child_h}->{$relationname}->{$pkval};
3110              
3111             if (!$rec) {
3112             my $relationcols = $cols_by_relationname{$relationname};
3113             my $has_non_null_val = grep {defined($relationrec->{$_})} @$relationcols;
3114             return unless $has_non_null_val;
3115             my $relationstruct =
3116             $tree->new($relationname=>[
3117             map {
3118             defined($relationrec->{$_}) ? [$_ => $relationrec->{$_}] : ()
3119             } @$relationcols
3120             ]);
3121             my $parent_relationstruct = $parent_rec_h->{struct};
3122             if (!$parent_relationstruct) {
3123             confess("no parent for $relationname");
3124             }
3125            
3126             # if we have an aliased relation, add an extra
3127             # level of nesting
3128             my $baserelation = $alias2baserelation{$relationname};
3129             if ($baserelation) {
3130              
3131             # $aliaspolicy eq 'nest' or 't'
3132             # nest base relations inside an alias node
3133             # OR use table name in place of alias name
3134             if ($aliaspolicy =~ /^t/i) {
3135             stag_add($parent_relationstruct,
3136             $baserelation,
3137             $relationstruct->data);
3138             }
3139             else {
3140             # nest
3141             my $baserelationstruct =
3142             Data::Stag->new($baserelation =>
3143             $relationstruct->data);
3144             stag_add($parent_relationstruct,
3145             $relationname,
3146             [$baserelationstruct]);
3147             }
3148             } else {
3149             # either no aliases, or $aliaspolicy eq 'a'
3150             # (in which case columns already mapped to aliases)
3151             stag_add($parent_relationstruct,
3152             $relationstruct->name,
3153             $relationstruct->data);
3154             }
3155             $rec =
3156             {struct=>$relationstruct,
3157             child_h=>{}};
3158             foreach ($node->subnodes) {
3159             # keep index of children by PK
3160             $rec->{child_h}->{$_->name} = {};
3161             }
3162             $parent_rec_h->{child_h}->{$relationname}->{$pkval} = $rec;
3163             }
3164             }
3165             foreach ($node->subnodes) {
3166             $self->make_a_tree($tree,
3167             $rec,
3168             $_,
3169             \%current_relation_h,
3170             \%pkey_by_relationname,
3171             \%cols_by_relationname,
3172             \%alias2baserelation,
3173             $aliaspolicy);
3174             }
3175             }
3176              
3177              
3178             # -------- GENERAL SUBS -----------
3179              
3180             sub esctab {
3181             my $w=shift;
3182             $w =~ s/\t/__MAGICTAB__/g;
3183             $w;
3184             }
3185              
3186             sub makesql {
3187             my $self = shift;
3188             my ($table,
3189             $where,
3190             $select,
3191             $order,
3192             $group,
3193             $distinct) =
3194             rearrange([qw(table
3195             where
3196             select
3197             order
3198             group
3199             distinct)], @_);
3200              
3201             confess("must specify table") unless $table;
3202              
3203             # array of tables
3204             if (ref($table)) {
3205             if (ref($table) eq "HASH") {
3206             $table =
3207             [
3208             map {
3209             "$table->{$_} AS $_"
3210             } keys %$table
3211             ];
3212             }
3213             }
3214             else {
3215             $table = [$table];
3216             }
3217              
3218             $where = [] unless $where;
3219             # array of ANDed where clauses
3220             if (ref($where)) {
3221             if (ref($where) eq "HASH") {
3222             $where =
3223             [
3224             map {
3225             "$_ = ".$self->quote($where->{$_})
3226             } keys %$where
3227             ];
3228             }
3229             }
3230             else {
3231             $where = [$where];
3232             }
3233              
3234             $select = ['*'] unless $select;
3235             # array of SELECT cols
3236             if (ref($select)) {
3237             if (ref($select) eq "HASH") {
3238             $select =
3239             [
3240             map {
3241             "$select->{$_} AS $_"
3242             } keys %$select
3243             ];
3244             }
3245             }
3246             else {
3247             $select = [$select];
3248             }
3249              
3250             $order = [] unless $order;
3251             # array of order tables
3252             if (ref($order)) {
3253             if (ref($order) eq "HASH") {
3254             confess("order must be an array");
3255             }
3256             }
3257             else {
3258             $order = [$order];
3259             }
3260              
3261             $group = [] unless $group;
3262             # array of group tables
3263             if (ref($group)) {
3264             if (ref($group) eq "HASH") {
3265             confess("group must be an array");
3266             }
3267             }
3268             else {
3269             $group = [$group];
3270             }
3271              
3272             $distinct = $distinct ? '' : ' DISTINCT';
3273             my $sql =
3274             sprintf("SELECT%s %s FROM %s%s%s",
3275             $distinct,
3276             join(', ', @$select),
3277             join(', ', @$table),
3278             (scalar(@$where) ?
3279             ' WHERE '.join(' AND ', @$where) : ''),
3280             (scalar(@$group) ?
3281             ' GROUP BY '.join(', ', @$group) : ''),
3282             (scalar(@$order) ?
3283             ' ORDER BY '.join(', ', @$order) : ''),
3284             );
3285             return $sql;
3286             }
3287              
3288              
3289              
3290             sub selectval {
3291             my $self = shift;
3292             trace(0, "@_") if $TRACE;
3293             return $self->dbh->selectcol_arrayref(@_)->[0];
3294             }
3295              
3296             sub insertrow {
3297             my $self = shift;
3298             my ($table, $colvalh, $pkcol) = @_;
3299            
3300             my $driver = $self->dbh->{Driver}->{Name};
3301             my @cols = keys %$colvalh;
3302             my @vals =
3303             map {
3304             defined($_) ? $colvalh->{$_} : undef
3305             } @cols;
3306             my @placeholders = map { '?' } @vals;
3307             my $sql =
3308             sprintf("INSERT INTO %s (%s) VALUES (%s)",
3309             $table,
3310             join(", ", @cols),
3311             #join(", ", @vals),
3312             join(", ", @placeholders),
3313             );
3314             if (!@cols) {
3315             $sql = "INSERT INTO $table DEFAULT VALUES";
3316             }
3317              
3318             trace(0, "SQL:$sql") if $TRACE;
3319             my $succeeded = 0;
3320             eval {
3321             my $sth = $self->dbh->prepare($sql);
3322             my $rval = $sth->execute(@vals);
3323             $succeeded = 1 if defined $rval;
3324             };
3325             if ($@) {
3326             if ($self->force) {
3327             # what about transactions??
3328             $self->warn("IN SQL: $sql\nWARNING: $@\n");
3329             return;
3330             }
3331             else {
3332             confess $@;
3333             }
3334             }
3335             return unless $succeeded;
3336             my $pkval;
3337             if ($pkcol) {
3338             # primary key value may have been specified in the xml
3339             # (this is necessary for non-surrogate pks in tables that
3340             # are to be linked to via foreign keys)
3341             $pkval = $colvalh->{$pkcol};
3342              
3343             # pk was not supplied - perhaps this is a SERIAL/AUTO_INCREMENT
3344             # column (ie surrogate integer primary key)
3345             if (!$pkval) {
3346             # assume pk is a SERIAL / AUTO_INCREMENT
3347             if ($driver eq 'Pg') {
3348             my $seqn = sprintf("%s_%s_seq",
3349             $table,
3350             $pkcol);
3351             $pkval = $self->selectval("select currval('$seqn')");
3352             trace(0, "CURRVAL $seqn = $pkval [Pg]") if $TRACE;
3353             }
3354             # this doesn't work on older
3355             # versions of DBI/DBD::mysql
3356             # seems to have been fixed Oct 2004 release
3357             elsif ($driver eq 'mysql') {
3358             $pkval = $self->dbh->last_insert_id(undef,undef,$table,$pkcol);
3359             trace(0, "CURRVAL mysql_insert_id $pkval [mysql]") if $TRACE;
3360             }
3361             else {
3362             $pkval = $self->selectval("select max($pkcol) from $table");
3363             }
3364             }
3365             trace(0, "PKVAL = $pkval") if $TRACE;
3366             }
3367             return $pkval;
3368             }
3369              
3370             sub updaterow {
3371             my $self = shift;
3372             my ($table, $set, $where) = @_;
3373              
3374             confess("must specify table") unless $table;
3375              
3376             my $dbh = $self->dbh;
3377              
3378             # array of WHERE cols
3379             if (ref($where)) {
3380             if (ref($where) eq "HASH") {
3381             $where =
3382             [
3383             map {
3384             "$_ = ".$dbh->quote($where->{$_})
3385             } keys %$where
3386             ];
3387             }
3388             }
3389             else {
3390             $where = [$where];
3391             }
3392             confess("must specify constraints") unless @$where;
3393              
3394             confess("must set update vals") unless $set;
3395             my @bind = ();
3396             # array of SET colvals
3397             if (ref($set)) {
3398             if (ref($set) eq "HASH") {
3399             $set =
3400             [
3401             map {
3402             push(@bind, defined $set->{$_} ? $set->{$_} : 'NULL');
3403             "$_ = ?"
3404             } keys %$set
3405             ];
3406             }
3407             }
3408             else {
3409             $set = [$set];
3410             }
3411            
3412             my $sql =
3413             sprintf("UPDATE %s SET %s WHERE %s",
3414             $table,
3415             join(', ', @$set),
3416             join(' AND ', @$where),
3417             );
3418             trace(0, "SQL:$sql [",join(', ',@bind)."]") if $TRACE;
3419              
3420             my $sth = $dbh->prepare($sql) || confess($sql."\n\t".$dbh->errstr);
3421             return $sth->execute(@bind) || confess($sql."\n\t".$sth->errstr);
3422             }
3423              
3424             sub deleterow {
3425             my $self = shift;
3426             my ($table, $where) = @_;
3427              
3428             confess("must specify table") unless $table;
3429              
3430             my $dbh = $self->dbh;
3431              
3432             # array of WHERE cols
3433             if (ref($where)) {
3434             if (ref($where) eq "HASH") {
3435             $where =
3436             [
3437             map {
3438             "$_ = ".$dbh->quote($where->{$_})
3439             } keys %$where
3440             ];
3441             }
3442             }
3443             else {
3444             $where = [$where];
3445             }
3446             confess("must specify constraints") unless @$where;
3447              
3448             my $sql =
3449             sprintf("DELETE FROM %s WHERE %s",
3450             $table,
3451             join(' AND ', @$where),
3452             );
3453             trace(0, "SQL:$sql") if $TRACE;
3454              
3455             my $sth = $dbh->prepare($sql) || confess($sql."\n\t".$dbh->errstr);
3456             return $sth->execute() || confess($sql."\n\t".$sth->errstr);
3457             }
3458              
3459             #$::RD_HINT = 1;
3460              
3461             $::RD_AUTOACTION = q { [@item] };
3462             sub selectgrammar {
3463             return q[
3464              
3465             {
3466             use Data::Dumper;
3467             use Data::Stag;
3468             sub N {
3469             Data::Stag->new(@_);
3470             }
3471             }
3472             ]
3473             .
3474             q[
3475              
3476             selectstmts: selectstmt ';' selectstmts
3477             selectstmts: selectstmt
3478             # selectstmt: /select/i selectcols /from/i fromtables
3479             selectstmt: /select/i selectq(?) selectcols /from/i fromtables where(?) group(?) having(?) combiner(?) order(?) limit(?) offset(?)
3480             {
3481             N(select => [
3482             [qual => $item{'selectq'}[0]],
3483             [cols => $item[3]],
3484             [from => $item[5]],
3485             # [where => $item[6]],
3486             # [group => $item{'group'}[0]],
3487             # [having => $item{'having'}[0]],
3488             ]);
3489             }
3490             |
3491             selectq: /all/i | /distinct/i
3492             { $item[1] }
3493             |
3494             # as: /\s+as\s+/i
3495             as: /as/i
3496             selectcols: selectexpr /\,/ selectcols
3497             { [$item[1], @{$item[3]}] }
3498             |
3499             selectcols: selectexpr
3500             { [$item[1]] }
3501             |
3502             selectexpr: bselectexpr as aliasname
3503             {
3504             my $col = $item{bselectexpr};
3505             $col->set_alias($item{aliasname}->[1]);
3506             $col;
3507             }
3508             |
3509             selectexpr: bselectexpr
3510             { $item[1] }
3511             |
3512             bselectexpr: funccall
3513             { $item[1] }
3514             |
3515             bselectexpr: selectcol
3516             { $item[1] }
3517             |
3518              
3519             selectcol: brackselectcol operator selectcol
3520             {
3521             N(col=>[
3522             [func => [
3523             [name => $item[2]->[1]],
3524             [args => [$item[1],$item[3]]]
3525             ]
3526             ]
3527             ]);
3528             }
3529             ### { $item[1]}
3530             |
3531             selectcol: brackselectcol
3532             { $item[1]}
3533             |
3534              
3535             brackselectcol: '(' selectcol ')'
3536             { $item[2]}
3537             |
3538              
3539             brackselectcol: bselectcol
3540             { $item[1]}
3541             |
3542              
3543             bselectcol: /(\w+)\.(\w+)/
3544             { N(col=>[
3545             [name => $item[1]],
3546             [table=>$1],
3547             ])
3548             }
3549             |
3550             bselectcol: /(\w+)\.\*/
3551             { N(col=>[
3552             [name => $item[1]],
3553             [table=>$1],
3554             ])
3555             }
3556             |
3557             bselectcol: /\*/
3558             { N(col=>[
3559             [name => $item[1]]
3560             ])
3561             }
3562             |
3563             bselectcol: /\w+/
3564             { N(col=>[
3565             [name => $item[1]]
3566             ])
3567             }
3568             |
3569             bselectcol: expr
3570             { N(col=>[
3571             [expr => $item[1]]
3572             ]) }
3573             |
3574             funccall: funcname '(' distinct(?) selectcols ')'
3575             {
3576             my $col = N(col=>[
3577             [func => [
3578             [name => $item[1]->[1]],
3579             [args => $item[4]]
3580             ]
3581             ]
3582             ]);
3583             $col;
3584             }
3585             |
3586              
3587             distinct: /distinct/i
3588              
3589             operator: '+' | '-' | '*' | '/' | '||'
3590            
3591              
3592             fromtables: jtable
3593             { [$item[1]] }
3594             |
3595             jtable: join_jtable
3596             { $item[1] }
3597             |
3598             join_jtable: qual_jtable jointype join_jtable
3599             {
3600             shift @{$item[2]};
3601             my $j =
3602             N(composite=>[
3603             [ctype=>"@{$item[2]}"],
3604             [first=>[$item[1]]],
3605             [second=>[$item[3]]]
3606             ]);
3607             $j;
3608             }
3609             |
3610             join_jtable: qual_jtable
3611             { $item[1] }
3612             |
3613             qual_jtable: alias_jtable joinqual
3614             {
3615             my $j = $item[1];
3616             $j->setnode_qual($item[2]);
3617             $j;
3618             }
3619             |
3620             qual_jtable: alias_jtable
3621             { $item[1] }
3622             |
3623             alias_jtable: brack_jtable /as\s+/i aliasname
3624             {
3625             my $j = $item[1];
3626             $j->set_alias($item[3][1]);
3627             $j;
3628             }
3629             |
3630             alias_jtable: brack_jtable
3631             { $item[1] }
3632             |
3633             brack_jtable: '(' jtable ')'
3634             { $item[2] }
3635             |
3636             brack_jtable: table
3637             { N(leaf=>[[name=>$item[1]->[1]]]) }
3638             |
3639              
3640             joinqual: /on\s+/i bool_expr
3641             { N(qual => [
3642             [type=>'on'],
3643             [expr=>"@{$item[2]}"]
3644             ])
3645             }
3646             |
3647             joinqual: /using\s+/i '(' cols ')'
3648             { N(qual =>[
3649             [type=>'using'],
3650             [expr=>"@{$item[3]}"]
3651             ])
3652             }
3653             |
3654              
3655             table: tablename
3656             { $item[1] }
3657             |
3658              
3659             funcname: /\w+/
3660             tablename: /\w+/
3661             aliasname: /\w+/
3662              
3663              
3664             cols: col(s)
3665             col: /\w+\.\w+/
3666             col: /\w+/
3667              
3668             jointype: /\,/
3669             jointype: /natural/i bjointype /join/i
3670             jointype: /natural/i /join/i
3671             jointype: bjointype /join/i
3672             jointype: /join/i
3673             bjointype: /inner/i
3674             bjointype: lrf(?) /outer/i
3675             lrf: /left/i | /right/i | /full/i
3676             bjointype: /cross/i
3677              
3678             number: float | int
3679             float: /\d*\.?\d+/ 'e' sign int
3680             float: /\d*\.\d+/
3681             int: /\d+/
3682             string: /\'.*?\'/
3683             sign: '+' | '-'
3684            
3685             exprs: '(' exprs ')'
3686             exprs: expr ',' exprs
3687             exprs: expr
3688              
3689             # bool_expr - eg in where clause
3690             bool_expr: not_bool_expr boolop bool_expr | not_bool_expr
3691             not_bool_expr: '!' brack_bool_expr | brack_bool_expr
3692             brack_bool_expr: '(' bool_expr ')' | bool_exprprim
3693             bool_exprprim: boolval | expr
3694             boolval: /true/i | /false/i | /null/i
3695              
3696             expr: brack_expr op expr | brack_expr
3697             brack_expr: '(' expr ')' | exprprim
3698             exprprim: col | val
3699             val: number | string
3700            
3701             op: /not\s+/i /like\s+/i
3702             op: /like\s+/i
3703             op: /is\s+/i /not\s+/i
3704             op: /is\s+/i
3705             op: '=' | '!=' | '<>' | '<=' | '>=' | '<' | '>'
3706             boolop: /and\s+/i | /or\s+/i | /not\s+/i
3707              
3708             # where: /where/i /.*/
3709             where: /where/i bool_expr
3710             group: /group/i /by/i exprs
3711             having: /having/i /.*/
3712             combiner: combinekwd selectstmt
3713             combinekwd: /union/i | /intersect/i | /update/i
3714             order: /order/i /by/i orderexprs
3715             orderexprs: orderexpr ',' orderexprs
3716             orderexprs: orderexpr
3717             orderexpr: expr /asc/i
3718             orderexpr: expr /desc/i
3719             orderexpr: expr /using/i op
3720             orderexpr: expr
3721             limit: /limit/i /\w+/
3722             offset: /offset/i /\d+/
3723             ];
3724             }
3725              
3726             no strict 'refs';
3727             sub AUTOLOAD {
3728             my $self = shift;
3729             my @args = @_;
3730              
3731             my $name = $AUTOLOAD;
3732             $name =~ s/.*://; # strip fully-qualified portion
3733              
3734             if ($name eq "DESTROY") {
3735             # we dont want to propagate this!!
3736             return;
3737             }
3738            
3739             unless ($self->isa("DBIx::DBStag")) {
3740             confess("no such subroutine $name");
3741             }
3742             if ($self->dbh) {
3743             if ($TRACE) {
3744             # the following check may impair performance
3745             if (grep { ref($_) } @args) {
3746             $self->throw("cannot quote @args");
3747             }
3748             }
3749             if ($self->dbh->can($name)) {
3750             return $self->dbh->$name(@args);
3751             }
3752             }
3753             confess("no such method:$name)");
3754             }
3755              
3756             sub rearrange {
3757             my($order,@param) = @_;
3758              
3759             # If there are no parameters, we simply wish to return
3760             # an undef array which is the size of the @{$order} array.
3761             return (undef) x $#{$order} unless @param;
3762              
3763             # If we've got parameters, we need to check to see whether
3764             # they are named or simply listed. If they are listed, we
3765             # can just return them.
3766             return @param unless (defined($param[0]) && $param[0]=~/^-/);
3767              
3768             # Now we've got to do some work on the named parameters.
3769             # The next few lines strip out the '-' characters which
3770             # preceed the keys, and capitalizes them.
3771             my $i;
3772             for ($i=0;$i<@param;$i+=2) {
3773             if (!defined($param[$i])) {
3774             cluck("Hmmm in $i ".CORE::join(";", @param)." == ".CORE::join(";",@$order)."\n");
3775             }
3776             else {
3777             $param[$i]=~s/^\-//;
3778             $param[$i]=~tr/a-z/A-Z/;
3779             }
3780             }
3781            
3782             # Now we'll convert the @params variable into an associative array.
3783             my(%param) = @param;
3784              
3785             my(@return_array);
3786            
3787             # What we intend to do is loop through the @{$order} variable,
3788             # and for each value, we use that as a key into our associative
3789             # array, pushing the value at that key onto our return array.
3790             my($key);
3791              
3792             foreach $key (@{$order}) {
3793             $key=~tr/a-z/A-Z/;
3794             my($value) = $param{$key};
3795             delete $param{$key};
3796             push(@return_array,$value);
3797             }
3798            
3799             # catch user misspellings resulting in unrecognized names
3800             my(@restkeys) = keys %param;
3801             if (scalar(@restkeys) > 0) {
3802             confess("@restkeys not processed in rearrange(), did you use a
3803             non-recognized parameter name ? ");
3804             }
3805             return @return_array;
3806             }
3807              
3808             #sub loadschema {
3809             # my $self = shift;
3810             # my ($ddl, $ddlf, $dialect) =
3811             # rearrange([qw(ddl ddlf dialect)], @_);
3812             # if ($ddlf) {
3813             # my $fh = FileHandle->new($ddlf) || $self->throw("no file $ddlf");
3814             # $ddl = join('',<$fh>);
3815             # $fh->close;
3816             # }
3817             # $self->throw("no DDL") unless $ddl;
3818             # if ($dialect) {
3819             # my $driver = $self->{_driver} || 'Pg';
3820             # if ($driver ne $dialect) {
3821            
3822             # }
3823             # }
3824             #}
3825              
3826             1;
3827              
3828             __END__