File Coverage

lib/DBIx/OptimalQuery.pm
Criterion Covered Total %
statement 30 1027 2.9
branch 0 510 0.0
condition 0 172 0.0
subroutine 10 42 23.8
pod 0 9 0.0
total 40 1760 2.2


line stmt bran cond sub pod time code
1             package DBIx::OptimalQuery::sth;
2              
3 8     8   708 use strict;
  8         14  
  8         209  
4 8     8   32 use warnings;
  8         14  
  8         170  
5 8     8   32 no warnings qw( uninitialized once redefine );
  8         10  
  8         219  
6              
7 8     8   10016 use DBI();
  8         108544  
  8         197  
8 8     8   47 use Carp;
  8         13  
  8         346  
9 8     8   3976 use Data::Dumper();
  8         42210  
  8         27607  
10              
11             sub Dumper {
12 0     0     local $Data::Dumper::Indent = 1;
13 0           local $Data::Dumper::SortKeys = 1;
14 0           Data::Dumper::Dumper(@_);
15             }
16              
17              
18             =comment
19             prepare a DBI sth from user defined selects, filters, sorts
20              
21             this constructor 'new' is called when a DBIx::OptimalQuery->prepare method
22             call is issued.
23              
24             my %opts = (
25             show => []
26             filter => ""
27             sort => ""
28             );
29              
30             $sth = $oq->prepare(%opts);
31             - same as -
32             $sth = DBIx::OptimalQuery::sth->new($oq,%opts);
33              
34             $sth->execute( limit => [0, 10]);
35             =cut
36             sub new {
37 0     0     my $class = shift;
38 0           my $oq = shift;
39 0           my %args = @_;
40              
41             #$$oq{error_handler}->("DEBUG: \$sth = $class->new(\$oq,\n".Dumper(\%args).")\n") if $$oq{debug};
42              
43 0           my $sth = bless \%args, $class;
44 0           $sth->{oq} = $oq;
45 0           $sth->_normalize();
46 0           $sth->create_select();
47 0           $sth->create_where();
48 0           $sth->create_order_by();
49              
50 0           return $sth;
51             }
52              
53 0     0     sub get_lo_rec { $_[0]{limit}[0] }
54 0     0     sub get_hi_rec { $_[0]{limit}[1] }
55              
56             sub set_limit {
57 0     0     my ($sth, $limit) = @_;
58 0           $$sth{limit} = $limit;
59 0           return undef;
60             }
61              
62             # execute statement
63             # notice that we can't execute other child cursors
64             # because their bind params are dependant on
65             # their parent cursor value
66             sub execute {
67 0     0     my ($sth) = @_;
68 0 0         return undef if $$sth{_already_executed};
69 0           $$sth{_already_executed}=1;
70              
71             #$$sth{oq}{error_handler}->("DEBUG: \$sth->execute()\n") if $$sth{oq}{debug};
72 0 0         return undef if $sth->count()==0;
73              
74 0           local $$sth{oq}{dbh}{LongReadLen};
75              
76             # build SQL for main cursor
77 0           { my $c = $sth->{cursors}->[0];
  0            
78 0           my @all_deps = (@{$c->{select_deps}}, @{$c->{where_deps}}, @{$c->{order_by_deps}});
  0            
  0            
  0            
79 0           my ($order) = $sth->{oq}->_order_deps(@all_deps);
80 0           my @from_deps; push @from_deps, @$_ for @$order;
  0            
81              
82             # create from_sql, from_binds
83             # vars prefixed with old_ is used for supported non sql-92 joins
84 0           my ($from_sql, @from_binds, $old_join_sql, @old_join_binds );
85              
86 0           foreach my $from_dep (@from_deps) {
87 0           my ($sql, @binds) = @{ $sth->{oq}->{joins}->{$from_dep}->[1] };
  0            
88 0 0         push @from_binds, @binds if @binds;
89              
90             # if this is the driving table join
91 0 0         if (! $sth->{oq}->{joins}->{$from_dep}->[0]) {
    0          
92              
93             # alias it if not already aliased in sql
94 0           $from_sql .= $sql.' ';
95 0 0         $from_sql .= "$from_dep" unless $sql =~ /\b$from_dep\s*$/;
96 0           $from_sql .= "\n";
97             }
98            
99            
100             # if SQL-92 type join?
101             elsif (! defined $sth->{oq}->{joins}->{$from_dep}->[2]) {
102 0           $from_sql .= $sql."\n";
103             }
104            
105             # old style join
106             else {
107 0           $from_sql .= ", ".$sql.' '.$from_dep."\n";
108 0           my ($sql, @binds) = @{ $sth->{oq}->{joins}->{$from_dep}->[2] };
  0            
109 0 0         $old_join_sql .= " AND " if $old_join_sql ne '';
110 0           $old_join_sql .= $sql;
111 0           push @old_join_binds, @binds;
112             }
113             }
114            
115            
116             # construct where clause
117 0           my $where;
118 0           { my @where;
  0            
119 0 0         push @where, '('.$old_join_sql.') ' if $old_join_sql;
120 0 0         push @where, '('.$c->{where_sql}.') ' if $c->{where_sql};
121 0 0         $where = ' WHERE '.join("\nAND ", @where) if @where;
122             }
123            
124             # generate sql and bind params
125 0           $$c{sql} = "SELECT ".join(',', @{ $c->{select_sql} })." FROM $from_sql $where ".
126 0 0         (($c->{order_by_sql}) ? "ORDER BY ".$c->{order_by_sql} : '');
127              
128 0           my @binds = (@{ $c->{select_binds} }, @from_binds, @old_join_binds,
129 0           @{$c->{where_binds}}, @{$c->{order_by_binds}} );
  0            
  0            
130 0           $$c{binds} = \@binds;
131              
132             # if clobs have been selected, find & set LongReadLen
133 0 0 0       if ($$sth{oq}{dbtype} eq 'Oracle' &&
      0        
134             $$sth{'oq'}{'AutoSetLongReadLen'} &&
135 0           scalar(@{$$c{'selected_lobs'}})) {
136              
137             my $maxlenlobsql = "SELECT greatest(".join(',',
138 0           map { "nvl(max(DBMS_LOB.GETLENGTH($_)),0)" } @{$$c{'selected_lobs'}}
  0            
139 0           ).") FROM (".$$c{'sql'}.")";
140              
141 0           my ($SetLongReadLen) = $$sth{oq}{dbh}->selectrow_array($maxlenlobsql, undef, @{$$c{'binds'}});
  0            
142              
143 0 0 0       if (! $$sth{oq}{dbh}{LongReadLen} || $SetLongReadLen > $$sth{oq}{dbh}{LongReadLen}) {
144 0           $$sth{oq}{dbh}{LongReadLen} = $SetLongReadLen;
145             }
146             }
147              
148 0           $sth->add_limit_sql();
149             }
150              
151              
152             # build children cursors
153 0           my $cursors = $sth->{cursors};
154 0           foreach my $i (1 .. $#$cursors) {
155 0           my $c = $sth->{cursors}->[$i];
156 0           my $sd = $c->{select_deps};
157              
158             # define sql and binds for joins for this child cursor
159             # in the following vars
160 0           my ($from_sql, @from_binds, $where_sql, @where_binds );
161              
162             # define vars for child cursor driving table
163             # these are handled differently since we aren't joining in parent deps
164             # they were precomputed in _normalize method when constructing $oq
165              
166             ($from_sql, @from_binds) =
167 0           @{ $sth->{oq}->{joins}->{$sd->[0]}->[3]->{new_cursor}->{sql} };
  0            
168 0           $where_sql = $sth->{oq}->{joins}->{$sd->[0]}->[3]->{new_cursor}->{'join'};
169 0           my $order_by_sql = '';
170 0 0         if ($sth->{oq}->{joins}->{$sd->[0]}->[3]->{new_cursor_order_by}) {
171 0           $order_by_sql = " ORDER BY ".$sth->{oq}->{joins}->{$sd->[0]}->[3]->{new_cursor_order_by};
172             }
173              
174 0           $from_sql .= "\n";
175              
176             # now join in all other deps normally for this cursor
177 0           foreach my $i (1 .. $#$sd) {
178 0           my $joinAlias = $sd->[$i];
179              
180 0           my ($sql, @binds) = @{ $sth->{oq}->{joins}->{$joinAlias}->[1] };
  0            
181              
182             # these will NOT be defined for sql-92 type joins
183             my ($joinWhereSql, @joinWhereBinds) =
184 0           @{ $sth->{oq}->{joins}->{$joinAlias}->[2] }
185 0 0         if defined $sth->{oq}->{joins}->{$joinAlias}->[2];
186              
187             # if SQL-92 type join?
188 0 0         if (! defined $joinWhereSql) {
189 0           $from_sql .= $sql."\n";
190 0           push @from_binds, @binds;
191             }
192              
193             # old style join
194             else {
195 0           $from_sql .= ",\n$sql $joinAlias";
196 0           push @from_binds, @binds;
197 0 0         if ($joinWhereSql) {
198 0 0         $where_sql .= " AND " if $where_sql;
199 0           $where_sql .= $joinWhereSql;
200             }
201 0           push @where_binds, @joinWhereBinds;
202             }
203             }
204              
205             # build child cursor sql
206             $c->{sql} = "
207 0           SELECT ".join(',', @{ $c->{select_sql} })."
  0            
208             FROM $from_sql
209             WHERE $where_sql
210             $order_by_sql ";
211 0           $c->{binds} = [ @{ $c->{select_binds} }, @from_binds, @where_binds ];
  0            
212              
213             # if clobs have been selected, find & set LongReadLen
214 0 0 0       if ($$sth{oq}{dbtype} eq 'Oracle' &&
      0        
215             $$sth{'oq'}{'AutoSetLongReadLen'} &&
216 0           scalar(@{$$c{'selected_lobs'}})) {
217             my ($SetLongReadLen) = $$sth{oq}{dbh}->selectrow_array("
218             SELECT greatest(".join(',',
219 0           map { "nvl(max(DBMS_LOB.GETLENGTH($_)),0)" } @{$$c{'selected_lobs'}}
  0            
220             ).")
221 0           FROM (".$$c{'sql'}.")", undef, @{$$c{'binds'}});
  0            
222 0 0 0       if (! $$sth{oq}{dbh}{LongReadLen} || $SetLongReadLen > $$sth{oq}{dbh}{LongReadLen}) {
223 0           $$sth{oq}{dbh}{LongReadLen} = $SetLongReadLen;
224             }
225             }
226             }
227              
228 0           eval {
229 0           my $c;
230              
231             # prepare all cursors
232 0           foreach $c (@$cursors) {
233 0 0         $$sth{oq}->{error_handler}->("SQL:\n".$c->{sql}."\nBINDS:\n".Dumper($c->{binds})."\n") if $$sth{oq}{debug};
234 0           $c->{sth} = $sth->{oq}->{dbh}->prepare($c->{sql});
235             }
236 0           $c = $$cursors[0];
237 0           $c->{sth}->execute( @{ $c->{binds} } );
  0            
238 0           my @fieldnames = @{ $$c{select_field_order} };
  0            
239 0           my %rec;
240 0           my @bindcols = \( @rec{ @fieldnames } );
241 0           $c->{sth}->bind_columns(@bindcols);
242 0           $c->{bind_hash} = \%rec;
243             };
244 0 0         if ($@) {
245 0           die "Problem with SQL; $@\n";
246             }
247 0           return undef;
248             }
249              
250             # function to add limit sql
251             # $sth->add_limit_sql()
252             sub add_limit_sql {
253 0     0     my ($sth) = @_;
254              
255             #$$sth{oq}{error_handler}->("DEBUG: \$sth->add_limit_sql()\n") if $$sth{oq}{debug};
256 0   0       my $lo_limit = $$sth{limit}[0] || 0;
257 0   0       my $hi_limit = $$sth{limit}[1] || $sth->count();
258 0           my $c = $sth->{cursors}->[0];
259              
260 0 0         if ($$sth{oq}{dbtype} eq 'Oracle') {
    0          
    0          
261             $c->{sql} = "
262             SELECT *
263             FROM (
264             SELECT tablernk1.*, rownum RANK
265             FROM (
266 0           ".$c->{sql}."
267             ) tablernk1
268             WHERE rownum <= ?
269             ) tablernk2
270             WHERE tablernk2.RANK >= ? ";
271 0           push @{$$c{binds}}, ($hi_limit, $lo_limit);
  0            
272 0           push @{$$c{select_field_order}}, "DBIXOQRANK";
  0            
273             }
274              
275             # sqlserver doesn't support limit/offset until Sql Server 2012 (which I don't have to test)
276             # the workaround is this ugly hack...
277             elsif ($$sth{oq}{dbtype} eq 'Microsoft SQL Server') {
278 0 0         die "missing required U_ID in select" unless exists $$sth{oq}{select}{U_ID};
279              
280 0           my $sql = $c->{sql};
281              
282             # extract order by sql, and binds in order by from sql
283 0           my $orderbysql;
284 0 0         if ($sql =~ s/\ (ORDER BY\ .*?)$//) {
    0          
285 0           $orderbysql = $1;
286 0           my $copy = $orderbysql;
287 0           my $bindCount = $copy =~ tr/,//;
288 0 0         if ($bindCount > 0) {
289 0           my @newBinds;
290 0           push @newBinds, pop @{$$c{binds}} for 1 .. $bindCount;
  0            
291 0           @{$$c{binds}} = (reverse @newBinds, @{$$c{binds}});
  0            
  0            
292             }
293 0           $orderbysql .= ", ".$$sth{oq}{select}{U_ID}[1][0];
294             } elsif (exists $$sth{oq}{select}{U_ID}) {
295 0           $orderbysql = " ORDER BY ".$$sth{oq}{select}{U_ID}[1][0];
296             }
297              
298             # remove first select keyword, and add new one with windowing
299 0 0         if ($sql =~ s/^(\s*SELECT\s*)//) {
300 0           my $limit = int($hi_limit - $lo_limit + 1);
301 0           my $lo_limit = int($lo_limit);
302              
303             # sqlserver doesn't allow placeholders for limit and offset here
304 0           $c->{sql} = "SELECT TOP $limit * FROM (SELECT ROW_NUMBER() OVER ($orderbysql) AS RANK, $sql) tablerank1 WHERE tablerank1.RANK >= $lo_limit";
305 0           unshift @{$$c{select_field_order}}, "DBIXOQRANK";
  0            
306             }
307             }
308              
309             elsif ($$sth{oq}{dbtype} eq 'Pg') {
310 0           my $a = $lo_limit - 1;
311 0           my $b = $hi_limit - $lo_limit + 1;
312 0           $c->{sql} .= "\nLIMIT ? OFFSET ?";
313 0           push @{$$c{binds}}, ($b, $a);
  0            
314             }
315              
316             else {
317 0           my $a = $lo_limit - 1;
318 0           my $b = $hi_limit - $lo_limit + 1;
319 0           $c->{sql} .= "\nLIMIT ?,?";
320 0           push @{$$c{binds}}, ($a, $b);
  0            
321             }
322              
323 0           return undef;
324             }
325              
326              
327             # normalize member variables
328             sub _normalize {
329 0     0     my $sth = shift;
330             #$$sth{oq}{error_handler}->("DEBUG: \$sth->_normalize()\n") if $$sth{oq}{debug};
331              
332             # if show is not defined - then define it
333 0 0         if (! exists $sth->{show}) {
334 0           my @select;
335 0           foreach my $select (@{ $sth->{oq}->{'select'} } ) {
  0            
336 0           push @select, $select;
337             }
338 0           $sth->{show} = \@select;
339             }
340              
341             # define filter & sort if not defined
342 0 0         $sth->{'filter'} = "" if ! exists $sth->{'filter'};
343 0 0         $sth->{'sort'} = "" if ! exists $sth->{'sort'};
344 0           $sth->{'fetch_index'} = 0;
345 0           $sth->{'count'} = undef;
346 0           $sth->{'cursors'} = undef;
347              
348 0           return undef;
349             }
350              
351              
352              
353             # define @select & @select_binds, and add deps
354             sub create_select {
355 0     0     my $sth = shift;
356             #$$sth{oq}{error_handler}->("DEBUG: \$sth->create_select()\n") if $$sth{oq}{debug};
357              
358            
359             # find all of the columns that need to be shown
360 0           my %show;
361              
362             # find all deps to be used in select including cols marked always_select
363 0           my (@deps, @select_sql, @select_binds);
364 0           { my %deps;
  0            
365              
366             # add deps, @select, @select_binds for items in show
367 0           foreach my $show (@{ $sth->{show} }) {
  0            
368 0 0         $show{$show} = 1 if exists $sth->{'oq'}->{'select'}->{$show};
369 0           foreach my $dep (@{ $sth->{'oq'}->{'select'}->{$show}->[0] }) {
  0            
370 0           $deps{$dep} = 1;
371             }
372             }
373              
374             # add deps used in always_select
375 0           foreach my $colAlias (keys %{ $sth->{'oq'}->{'select'} }) {
  0            
376 0 0         if ($sth->{'oq'}->{'select'}->{$colAlias}->[3]->{always_select} ) {
377 0           $show{$colAlias} = 1;
378 0           $deps{$_} = 1 for @{ $sth->{'oq'}->{'select'}->{$colAlias}->[0] };
  0            
379             }
380             }
381 0           @deps = keys %deps;
382             }
383              
384             # order and index deps into appropriate cursors
385 0           my ($dep_order, $dep_idx) = $sth->{oq}->_order_deps(@deps);
386              
387             # look though select again and add all cols with is_hidden option
388             # if all their deps have been fulfilled
389 0           foreach my $colAlias (keys %{ $sth->{'oq'}->{'select'} }) {
  0            
390 0 0         if ($sth->{'oq'}->{'select'}->{$colAlias}->[3]->{is_hidden}) {
391 0           my $deps = $sth->{'oq'}->{'select'}->{$colAlias}->[0];
392 0           my $all_deps_met = 1;
393 0           for (@$deps) {
394 0 0         if (! exists $dep_idx->{$_}) {
395 0           $all_deps_met = 0;
396 0           last;
397             }
398             }
399 0 0         $show{$colAlias} = 1 if $all_deps_met;
400             }
401             }
402              
403             # create main cursor structure & attach deps for main cursor
404 0           $sth->{'cursors'} = [ $sth->_get_main_cursor_template() ];
405 0           $sth->{'cursors'}->[0]->{'select_deps'} = $dep_order->[0];
406              
407             # unique counter that is used to uniquely identify cols in parent cursors
408             # to their children cursors
409 0           my $parent_bind_tag_idx = 0;
410              
411             # create other cursors (if they exist)
412             # and define how they join to their parent cursors
413             # by defining parent_join, parent_keys
414 0           foreach my $i (1 .. $#$dep_order) {
415 0           push @{ $sth->{'cursors'} }, $sth->_get_sub_cursor_template();
  0            
416 0           $sth->{'cursors'}->[$i]->{'select_deps'} = $dep_order->[$i];
417              
418             # add parent_join, parent_keys for this child cursor
419 0           my $driving_child_join_alias = $dep_order->[$i]->[0];
420 0           my $cursor_opts = $sth->{'oq'}->{'joins'}->{$driving_child_join_alias}->[3]->{new_cursor};
421 0           foreach my $part (@{ $cursor_opts->{'keys'} } ) {
  0            
422 0           my ($dep,$sql) = @$part;
423 0           my $key = 'DBIXOQMJK'.$parent_bind_tag_idx; $parent_bind_tag_idx++;
  0            
424 0           my $parent_cursor_idx = $dep_idx->{$dep};
425 0 0         die "could not find dep: $dep for new cursor" if $parent_cursor_idx eq '';
426 0           push @{ $sth->{'cursors'}->[$parent_cursor_idx]->{select_field_order} }, $key;
  0            
427 0           push @{ $sth->{'cursors'}->[$parent_cursor_idx]->{select_sql} }, "$dep.$sql AS $key";
  0            
428 0           push @{ $sth->{'cursors'}->[$i]->{'parent_keys'} }, $key;
  0            
429             }
430 0           $sth->{'cursors'}->[$i]->{'parent_join'} = $cursor_opts->{'join'};
431             }
432            
433             # plug in select_sql, select_binds for cursors
434 0           foreach my $show (keys %show) {
435 0           my $select = $sth->{'oq'}->{'select'}->{$show};
436 0 0         next if ! $select;
437              
438 0           my $cursor = $sth->{'cursors'}->[$dep_idx->{$select->[0]->[0]}];
439              
440 0           my $select_sql;
441              
442             # if type is date then use specified date format
443 0 0 0       if (! $$select[3]{select_sql} && $$select[3]{date_format}) {
444 0           my @tmp = @{ $select->[1] }; $select_sql = \ @tmp; # need a real copy cause we are going to mutate it
  0            
  0            
445 0 0 0       if ($$sth{oq}{dbtype} eq 'Oracle' ||
    0          
446             $$sth{oq}{dbtype} eq 'Pg') {
447 0           $$select_sql[0] = "to_char(".$$select_sql[0].",'".$$select[3]{date_format}."')";
448             } elsif ($$sth{oq}{dbtype} eq 'mysql') {
449 0           $$select_sql[0] = "date_format(".$$select_sql[0].",'".$$select[3]{date_format}."')";
450             } else {
451 0           die "unsupported DB";
452             }
453             }
454              
455             # else just copy the select
456             else {
457 0   0       $select_sql = $select->[3]->{select_sql} || $select->[1];
458             }
459              
460             # remember if a lob is selected
461 0 0 0       if ($$sth{oq}{dbtype} eq 'Oracle' &&
462             $sth->{oq}->get_col_types('select')->{$show} eq 'clob') {
463 0           push @{ $cursor->{selected_lobs} }, $show;
  0            
464             #$select_sql->[0] = 'to_char('.$select_sql->[0].')';
465             }
466              
467 0 0         if ($select_sql->[0] ne '') {
468 0           push @{ $cursor->{select_field_order} }, $show;
  0            
469 0           push @{ $cursor->{select_sql} }, $select_sql->[0].' AS '.$show;
  0            
470 0           push @{ $cursor->{select_binds} }, @$select_sql[1 .. $#$select_sql];
  0            
471             }
472             }
473              
474 0           return undef;
475             }
476            
477              
478              
479              
480             # template for the main cursor
481             sub _get_main_cursor_template {
482 0     0     { sth => undef,
483             sql => "",
484             binds => [],
485             selected_lobs => [],
486             select_field_order => [],
487             select_sql => [],
488             select_binds => [],
489             select_deps => [],
490             where_sql => "",
491             where_binds => [],
492             where_deps => [],
493             where_name => "",
494             order_by_sql => "",
495             order_by_binds => [],
496             order_by_deps => [],
497             order_by_name => []
498             };
499             }
500              
501             # template for explicitly defined additional cursors
502             sub _get_sub_cursor_template {
503 0     0     { sth => undef,
504             sql => "",
505             binds => [],
506             selected_lobs => [],
507             select_field_order => [],
508             select_sql => [],
509             select_deps => [],
510             select_binds => [],
511             parent_join => "",
512             parent_keys => [],
513             };
514             }
515              
516              
517              
518              
519            
520            
521              
522              
523             # modify cursor and add where clause data
524             sub create_where {
525 0     0     my ($sth) = @_;
526              
527             # define cursor where_sql, where_deps, where_name where_binds from parsed filter types
528 0           my $c = $sth->{cursors}->[0];
529 0           foreach my $filterType (qw( filter hiddenFilter forceFilter)) {
530 0 0         next if $$sth{$filterType} eq '';
531 0           my $filterArray = $$sth{oq}->parseFilter($$sth{$filterType});
532 0           my $filterSQL = $$sth{oq}->generateFilterSQL($filterArray);
533              
534 0           push @{ $$c{where_deps} }, @{ $$filterSQL{deps} };
  0            
  0            
535 0 0         if ($$c{where_sql}) {
536 0           $$c{where_sql} .= ' AND ('.$$filterSQL{sql}.')';
537             } else {
538 0           $$c{where_sql} = $$filterSQL{sql};
539             }
540 0           push @{ $$c{where_binds} }, @{ $$filterSQL{binds} };
  0            
  0            
541 0 0         $$c{where_name} = $$filterSQL{name} if $filterType eq 'filter';
542             }
543              
544 0           return undef;
545             }
546              
547              
548              
549              
550             # modify cursor and add order by data
551             sub create_order_by {
552 0     0     my ($sth) = @_;
553 0           my $c = $sth->{cursors}->[0];
554              
555 0           my $s = $$sth{oq}->parseSort($$sth{'sort'});
556 0           $$c{order_by_deps} = $$s{deps};
557 0           $$c{order_by_sql} = join(',', @{ $$s{sql} });
  0            
558 0           $$c{order_by_binds} = $$s{binds};
559 0           $$c{order_by_name} = $$s{name};
560 0           return undef;
561             }
562              
563              
564              
565              
566              
567              
568              
569              
570              
571            
572              
573              
574             # fetch next row or return undef when done
575             sub fetchrow_hashref {
576 0     0     my ($sth) = @_;
577 0 0         return undef unless $sth->count() > 0;
578 0           $sth->execute(); # execute if not already existed
579              
580             #$$sth{oq}{error_handler}->("DEBUG: \$sth->fetchrow_hashref()\n") if $$sth{oq}{debug};
581              
582 0           my $cursors = $sth->{cursors};
583 0           my $c = $cursors->[0];
584              
585             # bind hash value to column data
586 0           my $rec = $$c{bind_hash};
587              
588             # fetch record
589 0 0         if (my $v = $c->{sth}->fetch()) {
590              
591 0           foreach my $i (0 .. $#$v) {
592              
593             # if col type is decimal auto trim 0s after decimal
594 0 0 0       if ($c->{sth}->{TYPE}->[$i] eq '3' && $$v[$i] =~ /\./) {
595 0           $$v[$i] =~ s/0+$//;
596 0           $$v[$i] =~ s/\.$//;
597             }
598             }
599            
600 0           $sth->{'fetch_index'}++;
601              
602             # execute other cursors
603 0           foreach my $i (1 .. $#$cursors) {
604 0           $c = $cursors->[$i];
605              
606 0           $c->{sth}->execute( @{ $c->{binds} },
607 0           map { $$rec{$_} } @{ $c->{parent_keys} } );
  0            
  0            
608              
609 0           my $cols = $$c{select_field_order};
610 0           @$rec{ @$cols } = [];
611              
612 0           while (my @vals = $c->{sth}->fetchrow_array()) {
613 0           for (my $i=0; $i <= $#$cols; $i++) {
614 0           push @{ $$rec{$$cols[$i]} }, $vals[$i];
  0            
615             }
616             }
617 0           $c->{sth}->finish();
618             }
619 0           return $rec;
620             } else {
621 0           return undef;
622             }
623             }
624              
625             # finish sth
626             sub finish {
627 0     0     my ($sth) = @_;
628             #$$sth{oq}{error_handler}->("DEBUG: \$sth->finish()\n") if $$sth{oq}{debug};
629 0           foreach my $c (@{$$sth{cursors}}) {
  0            
630 0 0         $$c{sth}->finish() if $$c{sth};
631 0           undef $$c{sth};
632             }
633 0           return undef;
634             }
635              
636             # get count for sth
637             sub count {
638 0     0     my $sth = shift;
639              
640             # if count is not already defined, define it
641 0 0         if (! defined $sth->{count}) {
642             #$$sth{oq}{error_handler}->("DEBUG: \$sth->count()\n") if $$sth{oq}{debug};
643              
644 0           my $c = $sth->{cursors}->[0];
645              
646 0           my $drivingTable = $c->{select_deps}->[0];
647              
648             # only need to join in driving table with
649             # deps used in where clause
650 0           my ($deps) = $sth->{oq}->_order_deps($drivingTable, @{$c->{where_deps}});
  0            
651 0           my @from_deps; push @from_deps, @$_ for @$deps;
  0            
652              
653             # create from_sql, from_binds
654             # vars prefixed with old_ is used for supported non sql-92 joins
655 0           my ($from_sql, @from_binds, $old_join_sql, @old_join_binds );
656 0           foreach my $from_dep (@from_deps) {
657 0           my ($sql, @binds) = @{ $sth->{oq}->{joins}->{$from_dep}->[1] };
  0            
658 0 0         push @from_binds, @binds if @binds;
659              
660             # if this is the driving table join
661 0 0         if (! $sth->{oq}->{joins}->{$from_dep}->[0]) {
    0          
662              
663             # alias it if not already aliased in sql
664 0 0         $sql .= " $from_dep" unless $sql =~ /\b$from_dep\s*$/;
665 0           $from_sql .= $sql;
666             }
667              
668             # if SQL-92 type join?
669             elsif (! $sth->{oq}->{joins}->{$from_dep}->[2]) {
670 0           $from_sql .= "\n".$sql;
671             }
672              
673             # old style join
674             else {
675 0           $from_sql .= ",\n".$sql.' '.$from_dep;
676 0           my ($sql, @binds) = @{ $sth->{oq}->{joins}->{$from_dep}->[2] };
  0            
677 0 0         if ($sql) {
678 0 0         $old_join_sql .= " AND " if $old_join_sql ne '';
679 0           $old_join_sql .= $sql;
680             }
681 0           push @old_join_binds, @binds;
682             }
683             }
684              
685              
686             # construct where clause
687 0           my $where;
688 0           { my @where;
  0            
689 0 0         push @where, '('.$old_join_sql.') ' if $old_join_sql;
690 0 0         push @where, '('.$c->{where_sql}.') ' if $c->{where_sql};
691 0 0         $where = 'WHERE '.join("\nAND ", @where) if @where;
692             }
693              
694             # generate sql and bind params
695 0           my $sql = "
696             SELECT count(*)
697             FROM (
698             SELECT $drivingTable.*
699             FROM $from_sql
700             $where
701             ) cnt_query";
702 0           my @binds = (@from_binds, @old_join_binds, @{$c->{where_binds}});
  0            
703              
704 0           eval {
705 0 0         $$sth{oq}->{error_handler}->("SQL:\n$sql\nBINDS:\n".Dumper(\@binds)."\n") if $$sth{oq}{debug};
706 0           ($sth->{count}) = $sth->{oq}->{dbh}->selectrow_array($sql, undef, @binds);
707 0 0         }; if ($@) {
708 0           die "Problem finding count for SQL:\n$sql\nBINDS:\n".join(',',@binds)."\n\n$@\n";
709             }
710             }
711              
712 0           return $sth->{count};
713             }
714              
715 0     0     sub fetch_index { $_->{'fetch_index'} }
716              
717             sub filter_descr {
718 0     0     my $sth = shift;
719 0           return $sth->{cursors}->[0]->{'where_name'};
720             }
721              
722             sub sort_descr {
723 0     0     my $sth = shift;
724 0 0         if (wantarray) {
725 0           return @{ $sth->{cursors}->[0]->{'order_by_name'} };
  0            
726             } else {
727 0           return join(', ', @{ $sth->{cursors}->[0]->{'order_by_name'} });
  0            
728             }
729             }
730              
731              
732              
733              
734              
735              
736              
737              
738              
739              
740              
741              
742              
743              
744              
745              
746              
747              
748              
749              
750              
751              
752              
753              
754              
755              
756              
757              
758              
759              
760              
761              
762              
763              
764              
765              
766              
767              
768              
769             package DBIx::OptimalQuery;
770              
771             =comment
772              
773             use DBIx::OptimalQuery;
774             my $oq = DBIx::OptimalQuery->new(
775             select => {
776             'alias' => [dep, sql, nice_name, { OPTIONS } ]
777             }
778              
779             joins => {
780             'alias' => [dep, join_sql, where_sql, { OPTIONS } ]
781             }
782              
783             named_filters => {
784             'name' => [dep, sql, nice]
785             'name' => {
786             sql_generator => sub {
787             my %args = @_;
788             return [dep, sql, name]
789             }
790             title => "text displayed on interactive filter"
791             }
792             },
793              
794             named_sorts => {
795             'name' => [dep, sql, nice]
796             'name' => { sql_generator => sub { return [dep, sql, name] } }
797             },
798              
799            
800              
801             debug => 0 | 1
802             );
803             =cut
804              
805 8     8   64 use strict;
  8         15  
  8         200  
806 8     8   36 use Carp;
  8         14  
  8         395  
807 8     8   42 use Data::Dumper;
  8         13  
  8         304  
808 8     8   38 use DBI();
  8         13  
  8         52902  
809              
810             sub new {
811 0     0 0   my $class = shift;
812 0           my %args = @_;
813 0           my $oq = bless \%args, $class;
814              
815 0   0       $$oq{debug} ||= 0;
816             #$$oq{error_handler}->("DEBUG: $class->new(".Dumper(\%args).")\n") if $$oq{debug};
817              
818             die "BAD_PARAMS - must provide a dbh!"
819 0 0         unless $oq->{'dbh'};
820             die "BAD_PARAMS - must define a select key in call to constructor"
821 0 0         unless ref($oq->{'select'}) eq 'HASH';
822             die "BAD_PARAMS - must define a joins key in call to constructor"
823 0 0         unless ref($oq->{'joins'}) eq 'HASH';
824              
825              
826 0           $oq->_normalize();
827              
828 0           $$oq{dbtype} = $$oq{dbh}{Driver}{Name};
829 0 0         $$oq{dbtype} = $$oq{dbh}->get_info(17) if $$oq{dbtype} eq 'ODBC';
830              
831 0           return $oq;
832             }
833              
834              
835              
836             # returns [
837             # # type 1 - (selectalias operator literal)
838             # [1,$numLeftParen,$leftExpSelectAlias,$op,$rightExpLiteral,$numRightParen],
839             # # type 2 - (namedfilter, arguments)
840             # [2,$numLeftParen,$namedFilter,$argArray,$numRightParen]
841             # # type 3 - (selectalias operator selectalias)
842             # [3,$numLeftParen,$leftExpSelectAlias,$op,$rightExpSelectAlias,$numRightParen],
843             # # logic operator
844             # 'AND'|'OR'
845             # ]
846             sub parseFilter {
847 0     0 0   my ($oq, $f) = @_;
848 0           $f =~ /^\s+/; # trim leading spaces
849              
850 0           my @rv;
851 0 0         return \@rv if $f eq '';
852              
853 0           my $error;
854              
855 0           my $parenthesis=0;
856            
857 0           while (! $error) {
858 0           my $numLeftP=0;
859 0           my $numRightP=0;
860              
861             # parse opening parenthesis
862 0           while ($f =~ /\G\(\s*/gc) { $numLeftP++; $parenthesis++;}
  0            
  0            
863              
864             # if this looks like a named filter
865 0 0         if ($f=~/\G(\w+)\s*\(\s*/gc) {
866 0           my $namedFilter = $1;
867              
868 0 0         if (! exists $$oq{named_filters}{$namedFilter}) {
869 0           $error = "invalid named filter: $namedFilter";
870 0           next;
871             }
872              
873             # parse named filter arguments
874 0           my @args;
875 0           while (! $error) {
876 0 0 0       if ($f =~ /\G\)\s*/gc) {
    0 0        
      0        
      0        
877 0           last;
878             }
879             elsif ($f =~ /\G(\-?\d*\.\d+)\s*\,*\s*/gc ||
880             $f =~ /\G(\-?\d+)\s*\,*\s*/gc ||
881             $f =~ /\G\'([^\']*)\'\s*\,*\s*/gc ||
882             $f =~ /\G\"([^\"]*)\"\s*\,*\s*/gc ||
883             $f =~ /\G(\w+)\s*\,*\s*/gc) {
884 0           push @args, $1;
885             } else {
886 0           $error = "could not parse named filter arguments";
887             }
888             }
889 0 0         next if $error;
890              
891             # parse closing parenthesis
892 0           while ($f =~ /\G\)\s*/gc) {
893 0 0         if ($parenthesis > 0) {
894 0           $parenthesis--;
895 0           $numRightP++;
896             }
897             }
898              
899 0           push @rv, [2,$numLeftP,$namedFilter,\@args,$numRightP];
900             }
901              
902             # else this is an expression
903             else {
904 0           my $lexp;
905             my $rexp;
906 0           my $typeNum = 1;
907              
908             # grab select alias used on the left side of the expression
909 0 0         if ($f=~/\G\[([^\]]+)\]\s*/gc) { $lexp = $1; }
  0 0          
910 0           elsif ($f=~/\G(\w+)\s*/gc) { $lexp = $1; }
911             else {
912 0           $error = "missing left expression";
913             }
914              
915             # make sure the select alias is valid
916 0 0         if (! $$oq{select}{$lexp}) {
917 0           $error = "invalid field $lexp";
918 0           next;
919             }
920              
921             # parse the operator
922 0           my $op;
923 0 0         if ($f =~ /\G(\!\=|\=|\<\=|\>\=|\<|\>|like|not\ ?like|contains|not\ ?contains)\s*/igc) {
924 0           $op = lc($1);
925             }
926             else {
927 0           $error = "invalid operator";
928 0           next;
929             }
930              
931             # if rexp is a select alias
932 0 0 0       if ($f=~/\G\[([^\]]+)\]\s*/gc) {
    0          
    0          
933 0           $rexp = $1;
934 0           $typeNum = 3;
935             }
936              
937             # else if rexp is a literal
938             elsif ($f =~ /\G\'([^\']*)\'\s*/gc ||
939             $f =~ /\G\"([^\"]*)\"\s*/gc) {
940 0           $rexp = $1;
941             }
942              
943             # else if rexp is a word
944             elsif ($f =~ /\G(\S+)\s*/gc) {
945 0           $rexp = $1;
946              
947             # is word a col alias?
948 0 0         if ($$oq{select}{$rexp}) {
949 0           $typeNum = 3;
950             }
951             }
952              
953             else {
954 0           $error = "missing right expression";
955 0           next;
956             }
957              
958             # parse closing parenthesis
959 0           while ($f =~ /\G\)\s*/gc) {
960 0 0         if ($parenthesis > 0) {
961 0           $parenthesis--;
962 0           $numRightP++;
963             }
964             }
965              
966 0           push @rv, [$typeNum, $numLeftP, $lexp, $op, $rexp, $numRightP];
967             }
968              
969             # parse logic operator
970 0 0         if ($f =~ /(AND|OR)\s*/gci) {
971 0           push @rv, uc($1);
972             }
973             else {
974 0           last;
975             }
976             }
977              
978 0 0         if ($error) {
979 0           my $p = pos($f);
980 0           $error .= " at ".substr($f, 0, $p).'<*>'.substr($f, $p);
981 0           die $error."\n";
982             }
983              
984 0           return \@rv;
985             }
986              
987              
988             # given a filter string, returns { sql => $sql, binds => \@binds, deps => \@deps, name => $name };
989             sub generateFilterSQL {
990 0     0 0   my ($oq, $filterArray) = @_;
991              
992             # build an array of sql tokens, bind vals, and used deps
993             # also build a formatted name
994 0           my @sql;
995             my @binds;
996 0           my %deps;
997 0           my @name;
998 0           my $parenthesis = 0;
999              
1000 0           foreach my $exp (@$filterArray) {
1001              
1002 0 0         if ($exp eq 'AND') {
    0          
    0          
    0          
    0          
1003 0           push @sql, "AND";
1004 0           push @name, "AND";
1005             } elsif ($exp eq 'OR') {
1006 0           push @sql, "OR";
1007 0           push @name, "OR";
1008             }
1009              
1010             # [COLALIAS] != "literal"
1011             elsif ($$exp[0]==1) {
1012 0           my ($type, $numLeftParen, $leftColAlias, $operatorName, $rval, $numRightParen) = @$exp;
1013 0           $parenthesis+=$numLeftParen;
1014 0           $parenthesis-=$numRightParen;
1015              
1016 0           my $operator = uc($operatorName);
1017              
1018             # handle left side of expression
1019 0           my ($leftDeps, $leftSql, $leftName, $leftOpts, @leftBinds, $leftType);
1020 0           ($leftDeps, $leftSql, $leftName, $leftOpts) = @{ $$oq{select}{$leftColAlias} };
  0            
1021 0 0         $leftSql = $$leftOpts{filter_sql} if $$leftOpts{filter_sql};
1022 0 0         ($leftSql, @leftBinds) = @$leftSql if ref($leftSql) eq 'ARRAY';
1023 0           $leftType = $oq->get_col_type($leftColAlias, 'filter');
1024 0   0       $leftName ||= $leftColAlias;
1025              
1026             # handle right side of expression
1027 0           my ($rightSql, $rightName, @rightBinds);
1028            
1029 0           $rightName = $rval;
1030 0 0         if ($rightName eq '') {
    0          
1031 0           $rightName = "''";
1032             } elsif ($rightName =~ /\s/) {
1033 0           $rightName = '"'.$rightName.'"';
1034             }
1035              
1036 0 0         $rval = $$leftOpts{db_formatter}->($rval) if $$leftOpts{db_formatter};
1037              
1038             # if empty check
1039 0 0         if ($rval eq '') {
1040 0 0 0       if ($leftType eq 'char' || $leftType eq 'clob') {
1041 0 0         if ($$oq{dbtype} eq 'Oracle') {
1042 0           $leftSql = "COALESCE($leftSql,'_ _')";
1043 0           $rightSql = "_ _";
1044             } else {
1045 0           $leftSql = "COALESCE($leftSql,'')";
1046 0           $rightSql = "''";
1047             }
1048 0 0         $operator = ($operator =~ /\!|NOT/i) ? '!=' : '=';
1049             } else {
1050 0 0         $operator = ($operator =~ /\!|NOT/i) ? 'IS NOT NULL' : 'IS NULL';
1051             }
1052             }
1053              
1054             # else check against literal value
1055             else {
1056             # if numeric operator
1057 0 0         if ($operator =~ /\=|\<|\>/) {
1058              
1059             # if we are doing a numeric date comparison, parse for the date in common formats
1060 0 0 0       if ($leftType eq 'date' || $leftType eq 'datetime') {
    0 0        
    0 0        
1061              
1062             # is this a calculated date?
1063 0 0         if ($rval =~ /today\s*([\+\-])\s*(\d+)\s*(minute|hour|day|week|month|year|)s?/i) {
    0          
1064 0           my $sign = $1;
1065 0           my $num = $2;
1066 0   0       my $unit = uc($3) || 'DAY';
1067 0           $rightName = "today ".$sign.$num." ".lc($unit);
1068 0 0         $rightName .= 's' if $num != 1;
1069 0 0         $num *= -1 if $sign eq '-';
1070              
1071 0 0         if ($$oq{dbtype} eq 'Oracle') {
1072 0 0         my $now = $leftType eq 'datetime' ? 'SYSDATE' : 'TRUNC(SYSDATE)';
1073 0 0         if ($unit eq 'MINUTE') {
    0          
    0          
    0          
    0          
1074 0           $rightSql = "$now+($num/1440)"
1075             } elsif ($unit eq 'HOUR') {
1076 0           $rightSql = "$now+($num/24)"
1077             } elsif ($unit eq 'WEEK') {
1078 0           $rightSql = "$now+($num*7)"
1079             } elsif ($unit eq 'MONTH') {
1080 0           $rightSql = "ADD_MONTHS($now,$num)";
1081             } elsif ($unit eq 'YEAR') {
1082 0           $rightSql = "ADD_MONTHS($now,$num*12)";
1083             }
1084             } else {
1085 0 0         my $now = $leftType eq 'datetime' ? 'NOW()' : 'CURDATE()';
1086 0           $rightSql = "DATE_ADD($now, INTERVAL $num $unit)";
1087             }
1088             }
1089              
1090             elsif ($rval =~ /today\s*/i) {
1091 0           $rightName = "today";
1092 0 0         if ($$oq{dbtype} eq 'Oracle') {
1093 0 0         $rightSql = $leftType eq 'datetime' ? 'SYSDATE' : 'TRUNC(SYSDATE)';
1094             } else {
1095 0 0         $rightSql = $leftType eq 'datetime' ? 'NOW()' : 'CURDATE()';
1096             }
1097             }
1098              
1099             # else this is a date value
1100             else {
1101              
1102 0           my ($y,$m,$d,$h,$mi,$s,$hourType);
1103 0 0         if ($rval =~ /^(\d\d\d\d)[\-\/](\d\d?)[\-\/](\d\d?)/) {
    0          
    0          
    0          
    0          
1104 0           $y = $1;
1105 0           $m = $2;
1106 0           $d = $3;
1107             } elsif ($rval =~ /^(\d\d?)[\-\/](\d\d?)[\-\/](\d\d\d\d)/) {
1108 0           $m = $1;
1109 0           $d = $2;
1110 0           $y = $3;
1111             } elsif ($rval =~ /^(\d\d?)[\-\/](\d\d?)[\-\/](\d\d)\b/) {
1112 0           $m = $1;
1113 0           $d = $2;
1114 0           $y = int('20'.$3);
1115             } elsif ($rval =~ /^(\d\d\d\d)[\-\/](\d\d?)/) {
1116 0           $y = $1;
1117 0           $m = $2;
1118             }
1119             elsif ($rval =~ /^(\d\d\d\d)/) {
1120 0           $y = $1;
1121             }
1122             else {
1123 0           die "could not parse date: $rval";
1124             }
1125              
1126             # extract time component if the type supports it
1127 0 0         if ($leftType eq 'datetime') {
1128 0 0         if ($rval =~ /\b(\d\d?)\:(\d\d?)[\:\.](\d\d?)/) {
    0          
    0          
1129 0           $h = $1;
1130 0           $mi = $2;
1131 0           $s = $3;
1132             }
1133             elsif ($rval =~ /\b(\d\d?)\:(\d\d?)/) {
1134 0           $h = $1;
1135 0           $mi = $2;
1136             }
1137             elsif ($rval =~ /\b(\d\d?)\s*(am|pm)/i) {
1138 0           $h = $1;
1139 0           $mi = '00';
1140             }
1141 0 0         if ($rval =~ /A/i) {
    0          
1142 0           $hourType='AM';
1143             } elsif ($rval =~ /P/i) {
1144 0           $hourType='PM';
1145             }
1146             }
1147              
1148             # format date expression for mysql
1149 0 0         if ($$oq{dbtype} eq 'mysql') {
1150 0           my $format = '%Y';
1151 0           my $val = "$y";
1152 0 0         if ($m) {
1153 0           $format .= '-%m';
1154 0           $val .= "-$m";
1155 0 0         if ($d) {
1156 0           $format .= '-%d';
1157 0           $val .= "-$d";
1158            
1159 0 0         if ($mi ne '') {
1160 0 0         $format .= $hourType ? ' %h' : ' %H';
1161 0           $format .= ':%i';
1162 0           $val .= " $h:$mi";
1163 0 0         if ($s ne '') {
1164 0           $format .= ':%s';
1165 0           $val .= ":$s";
1166             }
1167 0 0         if ($hourType) {
1168 0           $format .= ' %p';
1169 0           $val .= " $hourType";
1170             }
1171             }
1172             }
1173             }
1174 0           $rightSql = "STR_TO_DATE(?,'$format')";
1175 0           push @rightBinds, $val;
1176            
1177             # remove lvalue time if rval doesn't use it
1178 0 0 0       if ($leftType eq 'datetime' && $mi eq '') {
1179 0           $leftSql = "DATE($leftSql)";
1180             }
1181             }
1182              
1183             # format date expression for oracle and postgres (use compatible to_date function)
1184             else {
1185 0           my $format = 'YYYY';
1186 0           my $val = $y;
1187 0 0         if ($m) {
1188 0           $format .= "-MM";
1189 0           $val .= "-$m";
1190 0 0         if ($d) {
1191 0           $format .= "-DD";
1192 0           $val .= "-$d";
1193              
1194 0 0         if ($mi ne '') {
1195 0 0         $format .= $hourType ? ' HH' : ' HH24';
1196 0           $format .= ':MI';
1197 0           $val .= " $h:$mi";
1198 0 0         if ($s ne '') {
1199 0           $format .= ':SS';
1200 0           $val .= ":$s";
1201             }
1202 0 0         if ($hourType) {
1203 0           $format .= " $hourType";
1204 0           $val .= " $hourType";
1205             }
1206             }
1207             }
1208             }
1209              
1210 0           $rightSql = "TO_DATE(?,'$format')";
1211 0           push @rightBinds, $val;
1212              
1213             # remove lvalue time if rval doesn't use it
1214 0 0 0       if ($leftType eq 'datetime' && $mi eq '') {
1215 0           $leftSql = "TRUNC($leftSql)";
1216             }
1217             }
1218             }
1219             }
1220              
1221             # if this is a numeric comparison and rvalue is not a number, convert left side to text
1222             elsif ($leftType eq 'num' && $rval !~ /^(\-?\d*\.\d+|\-?\d+)$/) {
1223 0 0         if ($$oq{dbtype} eq 'mysql') {
1224 0           $leftSql = "CONCAT('',$leftSql)";
1225 0           $rightSql = '?';
1226 0           push @rightBinds, $rval;
1227             } else {
1228 0           $leftSql = "TO_CHAR($leftSql)";
1229 0           $rightSql = '?';
1230 0           push @rightBinds, $rval;
1231             }
1232             }
1233              
1234             # if numeric operator and field is an oracle clob, convert using to_char
1235             elsif ($$oq{dbtype} eq 'Oracle' && $leftType eq 'clob') {
1236 0           $leftSql = "TO_CHAR($leftSql)";
1237 0           $rightSql = '?';
1238 0           push @rightBinds, $rval;
1239             }
1240              
1241             else {
1242 0           $rightSql = '?';
1243 0           push @rightBinds, $rval;
1244             }
1245             }
1246              
1247             # like operator
1248             else {
1249              
1250             # convert contains operator to like
1251 0 0         if ($operatorName =~ /contains/i) {
1252 0 0 0       $leftSql = "LOWER($leftSql)" if ! $leftType eq 'char' || $leftType eq 'clob';
1253 0 0         $operator = $operatorName =~ /not/i ? "NOT LIKE" : "LIKE";
1254 0           $rval = '%'.lc($rval).'%';
1255             }
1256              
1257             # allow * as wildcard
1258 0 0         if ($operator =~ /like/i) {
1259 0           $rval =~ s/\*/\%/g;
1260             }
1261              
1262             # remove redundant wildcards
1263 0           $rval =~ s/\%\%+/\%/g;
1264              
1265             # if left side is date, convert to text so like operator works as expected
1266 0 0         if ($$leftOpts{date_format}) {
1267 0 0         if ($$oq{dbtype} eq 'mysql') {
1268 0           $leftSql = "DATE_FORMAT($leftSql,'$$leftOpts{date_format}')";
1269             } else {
1270 0           $leftSql = "TO_CHAR($leftSql,'$$leftOpts{date_format}')";
1271             }
1272             }
1273              
1274 0           $rightSql = '?';
1275 0           push @rightBinds, $rval;
1276             }
1277             }
1278              
1279             # if the leftSql uses a new cursor we need to write an exists expression
1280             # search dep path to see if a new_cursor is used
1281 0           my @path = ($$leftDeps[0]);
1282 0           my $i=0;
1283 0           while (1) {
1284 0 0         die "infinite dep loop detected" if ++$i==50;
1285 0           my $parentDep = $$oq{joins}{$path[-1]}[0][0];
1286 0 0         last unless $parentDep;
1287 0           push @path, $parentDep;
1288             }
1289              
1290             # find the oldest parent new cursor if it exists
1291 0           while (@path) {
1292 0 0         if ($$oq{joins}{$path[-1]}[3]{new_cursor}) {
1293 0           last;
1294             } else {
1295 0           pop @path;
1296             }
1297             }
1298            
1299             # if @path has elements, this uses a new_cursor and we must construct an exists expression
1300 0 0         if (@path) {
1301 0           @path = reverse @path;
1302 0           my ($preSql, $postSql, @preBinds);
1303 0           foreach my $joinDep (@path) {
1304 0           my ($fromSql, @fromBinds) = @{ $$oq{joins}{$joinDep}[1] };
  0            
1305              
1306             # unwrap SQL-92 join and add join to where
1307 0           $fromSql =~ s/^\s+//;
1308 0           $fromSql =~ s/^LEFT\s*//i;
1309 0           $fromSql =~ s/^OUTER\s*//i;
1310 0           $fromSql =~ s/^JOIN\s*//i;
1311              
1312 0           my $corelatedJoin;
1313 0 0         if ($fromSql =~ /^(.*)\bON\s*\((.*)\)\s*$/is) {
1314 0           $fromSql = $1;
1315 0           $corelatedJoin = $2;
1316             } else {
1317 0           die "could not parse for corelated join\n";
1318             }
1319              
1320             # in a one2many filter that has a negative operator, we need to use
1321             # a NOT EXISTS and unnegate the operator
1322 0 0         if ($rightName eq "''") {
    0          
    0          
1323 0 0         if ($operator eq '=') {
    0          
1324 0           $preSql .= "NOT ";
1325 0           $operator = '!=';
1326             }
1327             elsif ($operator eq 'IS NULL') {
1328 0           $preSql .= "NOT ";
1329 0           $operator = 'IS NOT NULL';
1330             }
1331             }
1332             elsif ($operator eq '!=') {
1333 0           $operator = '=';
1334 0           $preSql .= "NOT ";
1335             }
1336             elsif ($operator =~ s/NOT\ //) {
1337 0           $preSql .= "NOT ";
1338             }
1339 0           $preSql .= " EXISTS (\n SELECT 1\n FROM $fromSql\n WHERE ($corelatedJoin)\n AND ";
1340 0           $postSql .= ')';
1341 0           push @preBinds, @fromBinds;
1342             }
1343              
1344             # update left expression deps and binds
1345 0           $leftDeps = $$oq{joins}{$path[0]}[0];
1346 0 0         unshift @leftBinds, @preBinds if @preBinds;
1347 0           $leftSql = $preSql.$leftSql;
1348 0           $rightSql .= $postSql;
1349             }
1350              
1351 0           my $sql = '(' x $numLeftParen;
1352 0           $sql .= $leftSql;
1353 0           $sql .= ' '.$operator;
1354 0 0         $sql .= ' '.$rightSql if $rightSql ne '';
1355 0           $sql .= ')' x $numRightParen;
1356              
1357             # glue expression
1358 0           push @sql, $sql;
1359 0           push @binds, @leftBinds, @rightBinds;
1360 0           push @name, $leftName, $operatorName;
1361 0 0         push @name, $rightName if $rightName ne '';
1362 0           $deps{$_}=1 for @$leftDeps;
1363             }
1364              
1365              
1366             # namedFilter(args)
1367             elsif ($$exp[0]==2) {
1368 0           my ($type, $numLeftParen,$namedFilterAlias,$argArray,$numRightParen) = @$exp;
1369 0           $parenthesis+=$numLeftParen;
1370 0           $parenthesis-=$numRightParen;
1371              
1372             # generate filter sql, bind, name
1373 0           my $f = $$oq{named_filters}{$namedFilterAlias};
1374 0           my ($filterDeps, $filterSql, @filterBinds, $filterLabel);
1375 0 0         if (ref($f) eq 'ARRAY') {
    0          
1376 0           ($filterDeps, $filterSql, $filterLabel) = @$f;
1377             } elsif (ref($f) eq 'HASH') {
1378             die "could not find sql_generator for named_filter $namedFilterAlias\n"
1379 0 0         unless ref($$f{sql_generator}) eq 'CODE';
1380 0           ($filterDeps, $filterSql, $filterLabel) = @{ $$f{sql_generator}->(@$argArray) };
  0            
1381             } else {
1382 0           die "invalid named_filter: $namedFilterAlias\n";
1383             }
1384 0 0         ($filterSql, @filterBinds) = @$filterSql if ref($filterSql) eq 'ARRAY';
1385              
1386              
1387 0           my $sql = '(' x $numLeftParen;
1388 0           $sql .= $filterSql;
1389 0           $sql .= ')' x $numRightParen;
1390              
1391             # glue expression
1392 0           push @sql, $sql;
1393 0           push @binds, @filterBinds;
1394 0           push @name, $filterLabel;
1395              
1396 0 0         if (ref($filterDeps) eq 'ARRAY') {
    0          
1397 0           $deps{$_}=1 for @$filterDeps;
1398             } elsif ($filterDeps) {
1399 0           $deps{$filterDeps}=1;
1400             }
1401             }
1402              
1403              
1404             # [COL] != [COL2]
1405             elsif ($$exp[0]==3) {
1406 0           my ($type, $numLeftParen,$leftColAlias,$operatorName,$rightColAlias,$numRightParen) = @$exp;
1407 0           $parenthesis+=$numLeftParen;
1408 0           $parenthesis-=$numRightParen;
1409              
1410 0           my $operator = uc($operatorName);
1411              
1412             # handle left side of expression
1413 0           my ($leftDeps, $leftSql, $leftName, $leftOpts, @leftBinds, $leftType);
1414 0           ($leftDeps, $leftSql, $leftName, $leftOpts) = @{ $$oq{select}{$leftColAlias} };
  0            
1415 0 0         $leftSql = $$leftOpts{filter_sql} if $$leftOpts{filter_sql};
1416 0 0         ($leftSql, @leftBinds) = @$leftSql if ref($leftSql) eq 'ARRAY';
1417 0           $leftType = $oq->get_col_type($leftColAlias, 'filter');
1418 0   0       $leftName ||= $leftColAlias;
1419              
1420             # handle right side of expression
1421 0           my ($rightDeps, $rightSql, $rightName, $rightOpts, @rightBinds, $rightType);
1422 0           ($rightDeps, $rightSql, $rightName, $rightOpts) = @{ $$oq{select}{$rightColAlias} };
  0            
1423 0 0         $rightSql = $$rightOpts{filter_sql} if $$rightOpts{filter_sql};
1424 0 0         ($rightSql, @rightBinds) = @$rightSql if ref($rightSql) eq 'ARRAY';
1425 0           $rightType = $oq->get_col_type($rightColAlias, 'filter');
1426 0   0       $rightName ||= $rightColAlias;
1427              
1428             # do type conversion to ensure types are the same
1429 0 0         if ($leftType ne $rightType) {
1430 0 0         if ($$oq{dbtype} eq 'mysql') {
1431 0 0         $leftSql = "CONCAT('', $leftSql)" unless $leftType eq 'char';
1432 0 0         $rightSql = "CONCAT('', $rightSql)" unless $rightType eq 'char';
1433             } else {
1434 0 0         $leftSql = "TO_CHAR($leftSql)" unless $leftType eq 'char';
1435 0 0         $rightSql = "TO_CHAR($rightSql)" unless $rightType eq 'char';
1436             }
1437             }
1438              
1439             # if char ensure NULL is turned into empty string so comparison works
1440 0 0         if ($leftType eq 'char') {
1441 0 0         my $nullVal = $$oq{dbtype} eq 'Oracle' ? "'_ _'" : "''";
1442 0           $leftSql = "COALESCE($leftSql,$nullVal)";
1443 0           $rightSql = "COALESCE($rightSql,$nullVal)";
1444             }
1445              
1446             # handle case insensitivity
1447 0 0         if ($operatorName =~ /contains/i) {
1448 0 0         $operator = $operatorName =~ /not/i ? "NOT LIKE" : "LIKE";
1449 0           $leftSql = "LOWER($leftSql)";
1450 0           $rightSql = "LOWER($rightSql)";
1451 0 0 0       $rightSql = $$oq{dbtype} eq 'Oracle' || $$oq{dbtype} eq 'SQLite'
1452             ? "'%'||$leftSql||'%'"
1453             : "CONCAT('%',$leftSql,'%')";
1454             }
1455              
1456 0           my $sql = '(' x $numLeftParen;
1457 0           $sql .= "$leftSql $operator $rightSql";
1458 0           $sql .= ')' x $numRightParen;
1459              
1460             # glue expression
1461 0           push @sql, $sql;
1462 0           push @binds, @leftBinds, @rightBinds;
1463 0           push @name, $leftName, $operatorName, $rightName;
1464 0           $deps{$_}=1 for @$leftDeps;
1465 0           $deps{$_}=1 for @$rightDeps;
1466             }
1467             }
1468              
1469 0           my @deps = grep { $_ } keys %deps;
  0            
1470 0           my $sql = join(' ', @sql);
1471 0           my $name = join(' ', @name);
1472              
1473             # make sure parenthesis are balanced
1474 0 0         if ($parenthesis > 0) {
1475 0           my $p = ')' x $parenthesis;
1476 0           $sql .= $p;
1477 0           $name .= $p;
1478             }
1479              
1480 0           my %rv = ( sql => $sql, binds => \@binds, deps => \@deps, name => $name );
1481 0           return \%rv;
1482             }
1483              
1484              
1485             sub parseSort {
1486 0     0 0   my ($oq, $str) = @_;
1487 0           $str =~ /^\s+/;
1488              
1489 0           my (@sql, @binds, @name, %deps);
1490              
1491 0           while (1) {
1492              
1493             # parse named sort
1494 0 0         if ($str =~ /\G(\w+)\(\s*/gc) {
    0          
    0          
    0          
1495 0           my $namedSortAlias = $1;
1496 0           my @args;
1497 0           while (1) {
1498 0 0 0       if ($str =~ /\G\)\s*/gc) {
    0 0        
      0        
      0        
1499 0           last;
1500             }
1501             elsif ($str =~ /\G(\-?\d*\.\d+)\s*\,*\s*/gc ||
1502             $str =~ /\G(\-?\d+)\s*\,*\s*/gc ||
1503             $str =~ /\G\'([^\']*)\'\s*\,*\s*/gc ||
1504             $str =~ /\G\"([^\"]*)\"\s*\,*\s*/gc ||
1505             $str =~ /\G(\w+)\s*\,*\s*/gc) {
1506 0           push @args, $1;
1507             } else {
1508 0           die "could not parse named sort arguments\n";
1509             }
1510             }
1511 0           my ($sortDeps, $sortSql, @sortBinds, $sortLabel);
1512            
1513 0           my $s = $$oq{named_sorts}{$namedSortAlias};
1514 0 0         if (ref($s) eq 'ARRAY') {
    0          
1515 0           ($sortDeps, $sortSql, $sortLabel) = @$s;
1516             } elsif (ref($s) eq 'HASH') {
1517             die "could not find sql_generator for named_srot $namedSortAlias\n"
1518 0 0         unless ref($$s{sql_generator}) eq 'CODE';
1519 0           ($sortDeps, $sortSql, $sortLabel) = @{ $$s{sql_generator}->(@args) };
  0            
1520 0 0         ($sortSql, @sortBinds) = @$sortSql if ref($sortSql) eq 'ARRAY';
1521             } else {
1522 0           die "invalid named_filter: $namedSortAlias\n";
1523             }
1524              
1525 0           push @sql, $sortSql;
1526 0           push @binds, @sortBinds;
1527 0           push @name, $sortLabel;
1528 0           $deps{$_} =1 for @$sortDeps;
1529             }
1530              
1531             # parse named sort
1532             elsif ($str =~ /\G\[?(\w+)\]?\s*/gc) {
1533 0           my $colAlias = $1;
1534 0 0         die "missing sort col: $colAlias\n" unless $$oq{select}{$colAlias};
1535 0           my @sortBinds;
1536 0           my ($sortDeps, $sortSql, $sortLabel, $opts) = @{ $$oq{select}{$colAlias} };
  0            
1537 0 0         $sortSql = $$opts{sort_sql} if $$opts{sort_sql};
1538 0 0         ($sortSql, @sortBinds) = @$sortSql if ref($sortSql) eq 'ARRAY';
1539 0   0       $sortLabel ||= $colAlias;
1540              
1541 0 0         if ($str =~ /\Gdesc\s*/gci) {
1542 0           $sortSql .= ' DESC';
1543 0           $sortLabel .= ' (reverse)';
1544             }
1545              
1546 0           push @sql, $sortSql;
1547 0           push @binds, @sortBinds;
1548 0           push @name, $sortLabel;
1549 0           $deps{$_} =1 for @$sortDeps;
1550             }
1551              
1552             elsif ($str =~ /\G$/gc) {
1553 0           last;
1554             }
1555             elsif ($str =~ /\G\,\s*/gc) {
1556 0           next;
1557             }
1558             else {
1559 0           die "could not parse sort\n";
1560             }
1561             }
1562              
1563 0           my @deps = grep { $_ } keys %deps;
  0            
1564              
1565 0           return { sql => \@sql, binds => \@binds, deps => \@deps, name => \@name };
1566             }
1567              
1568              
1569             # normalize member variables
1570             sub _normalize {
1571 0     0     my $oq = shift;
1572             #$$oq{error_handler}->("DEBUG: \$oq->_normalize()\n") if $$oq{debug};
1573              
1574 0 0         $oq->{'AutoSetLongReadLen'} = 1 unless exists $oq->{'AutoSetLongReadLen'};
1575              
1576             # make sure all option hash refs exist
1577 0   0       $oq->{'select'}->{$_}->[3] ||= {} for keys %{ $oq->{'select'} };
  0            
1578 0   0       $oq->{'joins' }->{$_}->[3] ||= {} for keys %{ $oq->{'joins'} };
  0            
1579              
1580              
1581             # since the sql & deps definitions can optionally be entered as arrays
1582             # turn all into arrays if not already
1583 0           for ( # key, index
1584             ['select', 0], ['select', 1],
1585             ['joins', 0], ['joins', 1], ['joins', 2],
1586             ['named_filters', 0], ['named_filters', 1],
1587             ['named_sorts', 0], ['named_sorts', 1] ) {
1588 0           my ($key, $i) = @$_;
1589 0   0       $oq->{$key} ||= {};
1590 0           foreach my $alias (keys %{ $oq->{$key} }) {
  0            
1591 0 0 0       if (ref($oq->{$key}->{$alias}) eq 'ARRAY' &&
      0        
1592             defined $oq->{$key}->{$alias}->[$i] &&
1593             ref($oq->{$key}->{$alias}->[$i]) ne 'ARRAY') {
1594 0           $oq->{$key}->{$alias}->[$i] = [$oq->{$key}->{$alias}->[$i]];
1595             }
1596             }
1597             }
1598              
1599             # make sure the following select options, if they exist are array references
1600 0           foreach my $col (keys %{ $oq->{'select'} }) {
  0            
1601 0           my $opts = $oq->{'select'}->{$col}->[3];
1602 0           foreach my $opt (qw( select_sql sort_sql filter_sql )) {
1603             $opts->{$opt} = [$opts->{$opt}]
1604 0 0 0       if exists $opts->{$opt} && ref($opts->{$opt}) ne 'ARRAY';
1605             }
1606              
1607             # make sure defined deps exist
1608 0           foreach my $dep (@{ $$oq{'select'}{$col}[0] }) {
  0            
1609             die "dep $dep for select $col does not exist"
1610 0 0 0       if defined $dep && ! exists $$oq{'joins'}{$dep};
1611             }
1612             }
1613              
1614             # look for new cursors and define parent child links if not already defined
1615 0           foreach my $join (keys %{ $oq->{'joins'} }) {
  0            
1616 0           my $opts = $oq->{'joins'}->{$join}->[3];
1617 0 0         if (exists $opts->{new_cursor}) {
1618 0 0         if (ref($opts->{new_cursor}) ne 'HASH') {
1619 0           $oq->_formulate_new_cursor($join);
1620             } else {
1621             die "could not find keys, join, and sql for new cursor in $join"
1622             unless exists $opts->{new_cursor}->{'keys'} &&
1623             exists $opts->{new_cursor}->{'join'} &&
1624 0 0 0       exists $opts->{new_cursor}->{'sql'};
      0        
1625             }
1626             }
1627              
1628             # make sure defined deps exist
1629 0           foreach my $dep (@{ $$oq{'joins'}{$join}[0] }) {
  0            
1630             die "dep $dep for join $join does not exist"
1631 0 0 0       if defined $dep && ! exists $$oq{'joins'}{$dep};
1632             }
1633             }
1634              
1635             # make sure deps for named_sorts exist
1636 0           foreach my $named_sort (keys %{ $$oq{'named_sorts'} }) {
  0            
1637 0           foreach my $dep (@{ $$oq{'named_sorts'}{$named_sort}->[0] }) {
  0            
1638             die "dep $dep for named_sort $named_sort does not exist"
1639 0 0 0       if defined $dep && ! exists $$oq{'joins'}{$dep};
1640             }
1641             }
1642              
1643             # make sure deps for named_filter exist
1644 0           foreach my $named_filter (keys %{ $$oq{'named_filters'} }) {
  0            
1645 0 0         if (ref($$oq{'named_filters'}{$named_filter}) eq 'ARRAY') {
1646 0           foreach my $dep (@{ $$oq{'named_filters'}{$named_filter}->[0] }) {
  0            
1647             die "dep $dep for named_sort $named_filter does not exist"
1648 0 0 0       if defined $dep && ! exists $$oq{'joins'}{$dep};
1649             }
1650             }
1651             }
1652              
1653 0           $oq->{'col_types'} = undef;
1654              
1655 0           return undef;
1656             }
1657              
1658              
1659              
1660              
1661              
1662              
1663              
1664             # defines how a child cursor joins to its parent cursor
1665             # by defining keys, join, sql in child cursor
1666             # called from the _normalize method
1667             sub _formulate_new_cursor {
1668 0     0     my $oq = shift;
1669 0           my $joinAlias = shift;
1670              
1671             #$$oq{error_handler}->("DEBUG: \$oq->_formulate_new_cursor('$joinAlias')\n") if $$oq{debug};
1672              
1673             # vars to define
1674 0           my (@keys, $join, $sql, @sqlBinds);
1675              
1676             # get join definition
1677 0           my ($fromSql, @fromBinds) = @{ $oq->{joins}->{$joinAlias}->[1] };
  0            
1678              
1679 0           my ($whereSql, @whereBinds);
1680 0           ($whereSql, @whereBinds) = @{ $oq->{joins}->{$joinAlias}->[2] }
1681 0 0         if defined $oq->{joins}->{$joinAlias}->[2];
1682              
1683             # if NOT an SQL-92 type join
1684 0 0         if (defined $whereSql) {
1685 0           $whereSql =~ s/\(\+\)/\ /g; # remove outer join notation
1686 0 0         die "BAD_PARAMS - where binds not allowed in 'new_cursor' joins"
1687             if scalar(@whereBinds);
1688             }
1689              
1690             # else is SQL-92 so separate out joins from table definition
1691             # do this by making it a pre SQL-92 type join
1692             # by defining $whereSql
1693             # and removing join sql from $fromSql
1694             else {
1695 0           $_ = $fromSql;
1696 0           m/\G\s*left\b/sicg;
1697 0           m/\G\s*join\b/sicg;
1698              
1699             # parse inline view
1700 0 0         if (m/\G\s*\(/scg) {
    0          
1701 0           $fromSql = '(';
1702 0           my $p=1;
1703 0           my $q;
1704 0   0       while ($p > 0 && m/\G(.)/scg) {
1705 0           my $c = $1;
1706 0 0 0       if ($q) { $q = '' if $c eq $q; } # if end of quote
  0 0          
    0          
    0          
    0          
1707 0           elsif ($c eq "'" || $c eq '"') { $q = $c; } # if start of quote
1708 0           elsif ($c eq '(') { $p++; } # if left paren
1709 0           elsif ($c eq ')') { $p--; } # if right paren
1710 0           $fromSql .= $c;
1711             }
1712             }
1713              
1714             # parse table name
1715             elsif (m/\G\s*(\w+)\b/scg) {
1716 0           $fromSql = $1;
1717             }
1718              
1719             else {
1720 0           die "could not parse tablename";
1721             }
1722              
1723             # include alias if it exists
1724 0 0 0       if (m/\G\s*([\d\w\_]+)\s*/scg && lc($1) ne 'on') {
1725 0           $fromSql .= ' '.$1;
1726 0           m/\G\s*on\b/cgi;
1727             }
1728              
1729             # get the whereSql
1730 0 0         if (m/\G\s*\((.*)\)\s*$/cgs) {
1731 0           $whereSql = $1;
1732             }
1733             }
1734              
1735             # define sql & sqlBinds
1736 0           $sql = $fromSql;
1737 0           @sqlBinds = @fromBinds;
1738            
1739             # parse $whereSql to create $join, and @keys
1740 0           foreach my $part (split /\b([\w\d\_]+\.[\w\d\_]+)\b/,$whereSql) {
1741 0 0         if ($part =~ /\b([\w\d\_]+)\.([\w\d\_]+)\b/) {
1742 0           my $dep = $1;
1743 0           my $sql = $2;
1744 0 0         if ($dep eq $joinAlias) {
1745 0           $join .= $part;
1746             } else {
1747 0           push @keys, [$dep, $sql];
1748 0           $join .= '?';
1749             }
1750             } else {
1751 0           $join .= $part;
1752             }
1753             }
1754              
1755             # fill in options
1756 0           $oq->{joins}->{$joinAlias}->[3]->{'new_cursor'} = {
1757             'keys' => \@keys, 'join' => $join, 'sql' => [$sql, @sqlBinds] };
1758              
1759 0           return undef;
1760             }
1761              
1762              
1763              
1764              
1765             # make sure the join counts are the same
1766             # throws exception with error when there is a problem
1767             # this can be an expensive wasteful operation and should not be done in a production env
1768             sub check_join_counts {
1769 0     0 0   my $oq = shift;
1770              
1771             #$$oq{error_handler}->("DEBUG: \$oq->check_join_counts()\n") if $$oq{debug};
1772              
1773              
1774             # since driving table count is computed first this will get set first
1775 0           my $drivingTableCount;
1776              
1777 0           foreach my $join (keys %{ $oq->{joins} }) {
  0            
1778 0           my ($cursors) = $oq->_order_deps($join);
1779 0           my @deps = map { @$_ } @$cursors; # flatten deps in cursors
  0            
1780 0           my $drivingTable = $deps[0];
1781              
1782             # now create from clause
1783 0           my ($fromSql, @fromBinds, @whereSql, @whereBinds);
1784 0           foreach my $joinAlias (@deps) {
1785 0           my ($sql, @sqlBinds) = @{ $oq->{joins}->{$joinAlias}->[1] };
  0            
1786              
1787             # if this is the driving table
1788 0 0         if (! $oq->{joins}->{$joinAlias}->[0]) {
    0          
1789             # alias it if not already aliased in sql
1790 0 0         $fromSql .= " $joinAlias" unless $sql =~ /\b$joinAlias\s*$/;
1791             }
1792              
1793             # if NOT sql-92 join
1794             elsif (defined $oq->{joins}->{$joinAlias}->[2]) {
1795 0           $fromSql .= ",\n $sql $joinAlias";
1796 0           push @fromBinds, @sqlBinds;
1797 0           my ($where_sql, @where_sqlBinds) = @{ $oq->{joins}->{$joinAlias}->[2] };
  0            
1798 0           push @whereSql, $where_sql;
1799 0           push @whereBinds, @where_sqlBinds;
1800             }
1801              
1802             # else this is an SQL-92 type join
1803             else {
1804 0           $fromSql .= "\n$sql ";
1805             }
1806             }
1807              
1808 0           my $where;
1809 0 0         $where = 'WHERE '.join("\nAND ", @whereSql) if @whereSql;
1810              
1811 0           my $sql = "
1812             SELECT count(*)
1813             FROM (
1814             SELECT $drivingTable.*
1815             FROM $fromSql
1816             $where
1817             ) OPTIMALQUERYCNTCK ";
1818 0           my @binds = (@fromBinds,@whereBinds);
1819 0           my $count;
1820 0           eval { ($count) = $oq->{dbh}->selectrow_array($sql, undef, @binds); };
  0            
1821 0 0         die "Problem executing ERROR: $@\nSQL: $sql\nBINDS: ".join(',', @binds)."\n" if $@;
1822 0 0         $drivingTableCount = $count unless defined $drivingTableCount;
1823 0 0         confess "BAD_JOIN_COUNT - driving table $drivingTable count ".
1824             "($drivingTableCount) != driving table joined with $join".
1825             " count ($count)" if $count != $drivingTableCount;
1826             }
1827              
1828 0           return undef;
1829             }
1830              
1831              
1832              
1833             =comment
1834             $oq->get_col_type($alias,$context);
1835             =cut
1836             sub type_map {
1837 0     0 0   my $oq = shift;
1838             return {
1839 0           -1 => 'char',
1840             -2 => 'clob',
1841             -3 => 'clob',
1842             -4 => 'clob',
1843             -5 => 'num',
1844             -6 => 'num',
1845             -7 => 'num',
1846             -8 => 'char',
1847             -9 => 'char',
1848             0 => 'char',
1849             1 => 'char',
1850             2 => 'num',
1851             3 => 'num', # is decimal type
1852             4 => 'num',
1853             5 => 'num',
1854             6 => 'num', # float
1855             7 => 'num',
1856             8 => 'num',
1857             9 => 'date',
1858             10 => 'char', # time (no date)
1859             11 => 'datetime',
1860             12 => 'char',
1861             16 => 'date',
1862             30 => 'clob',
1863             40 => 'clob',
1864             91 => 'date',
1865             93 => 'datetime',
1866             95 => 'date',
1867             'TIMESTAMP' => 'datetime',
1868             'INTEGER' => 'num',
1869             'TEXT' => 'char',
1870             'VARCHAR' => 'char',
1871             'varchar' => 'char'
1872             };
1873             }
1874              
1875             # $type = $oq->get_col_type($alias,$context);
1876             sub get_col_type {
1877 0     0 0   my $oq = shift;
1878 0           my $alias = shift;
1879 0   0       my $context = shift || 'default';
1880             #$$oq{error_handler}->("DEBUG: \$oq->get_col_type($alias, $context)\n") if $$oq{debug};
1881              
1882             return $oq->{'select'}->{$alias}->[3]->{'col_type'} ||
1883 0   0       $oq->get_col_types($context)->{$alias};
1884             }
1885              
1886             #{ ColAlias => Type, .. } = $oq->get_col_types($context)
1887             # where $content in ('default','sort','filter','select')
1888             sub get_col_types {
1889 0     0 0   my $oq = shift;
1890 0   0       my $context = shift || 'default';
1891             #$$oq{error_handler}->("DEBUG: \$oq->get_col_types($context)\n") if $$oq{debug};
1892             return $oq->{'col_types'}->{$context}
1893 0 0         if defined $oq->{'col_types'};
1894              
1895 0           $oq->{'col_types'} = {
1896             'default' => {}, 'sort' => {},
1897             'filter' => {}, 'select' => {} };
1898              
1899 0           my (%deps, @selectColTypeOrder, @selectColAliasOrder, @select, @selectBinds, @where);
1900 0           foreach my $selectAlias (keys %{ $oq->{'select'} } ) {
  0            
1901 0           my $s = $oq->{'select'}->{$selectAlias};
1902              
1903             # did user already define this type?
1904 0 0         if (exists $s->[3]->{'col_type'}) {
1905 0           $oq->{'col_types'}->{'default'}->{$selectAlias} = $s->[3]->{'col_type'};
1906 0           $oq->{'col_types'}->{'select' }->{$selectAlias} = $s->[3]->{'col_type'};
1907 0           $oq->{'col_types'}->{'filter' }->{$selectAlias} = $s->[3]->{'col_type'};
1908 0           $oq->{'col_types'}->{'sort' }->{$selectAlias} = $s->[3]->{'col_type'};
1909             }
1910              
1911             # else write sql to determine type with context
1912             else {
1913 0           $deps{$_} = 1 for @{ $s->[0] };
  0            
1914              
1915 0           foreach my $type (
1916             ['default', $s->[1]],
1917             ['select', $s->[3]->{'select_sql'}],
1918             ['filter', $s->[3]->{'filter_sql'}],
1919             ['sort', $s->[3]->{'sort_sql'}] ) {
1920 0 0         next if ! defined $type->[1];
1921 0           push @selectColTypeOrder, $type->[0];
1922 0           push @selectColAliasOrder, $selectAlias;
1923 0           my ($sql, @binds) = @{ $type->[1] };
  0            
1924 0           push @select, $sql;
1925 0           push @selectBinds, @binds;
1926              
1927             # this next one is needed for oracle so inline views don't get processed
1928             # kinda stupid if you ask me
1929             # don't bother though if there is binds
1930             # this isn't neccessary for mysql since an explicit limit is
1931             # defined latter
1932 0 0 0       if ($$oq{dbtype} eq 'Oracle' && $#binds == -1) {
1933 0           push @where, "to_char($sql) = NULL";
1934             }
1935             }
1936             }
1937             }
1938              
1939             # are there unknown deps?
1940 0 0         if (%deps) {
1941              
1942             # order and flatten deps
1943 0           my @deps = keys %deps;
1944 0           my ($deps) = $oq->_order_deps(@deps);
1945              
1946              
1947 0           @deps = ();
1948 0           push @deps, @$_ for @$deps;
1949              
1950             # now create from clause
1951 0           my ($fromSql, @fromBinds);
1952 0           foreach my $joinAlias (@deps) {
1953 0           my ($sql, @sqlBinds) = @{ $oq->{joins}->{$joinAlias}->[1] };
  0            
1954 0           push @fromBinds, @sqlBinds;
1955              
1956             # if this is the driving table join
1957 0 0         if (! $oq->{joins}->{$joinAlias}->[0]) {
    0          
1958              
1959             # alias it if not already aliased in sql
1960 0           $fromSql .= $sql;
1961 0 0         $fromSql .= " $joinAlias" unless $sql =~ /\b$joinAlias\s*$/;
1962             }
1963              
1964             # if NOT sql-92 join
1965             elsif (defined $oq->{joins}->{$joinAlias}->[2]) {
1966 0           $fromSql .= ",\n $sql $joinAlias";
1967             }
1968              
1969             # else this is an SQL-92 type join
1970             else {
1971 0           $fromSql .= "\n$sql ";
1972             }
1973              
1974             }
1975              
1976 0           my $where;
1977 0 0         $where .= "\nAND " if $#where > -1;
1978 0           $where .= join("\nAND ", @where);
1979              
1980 0           my @binds = (@selectBinds, @fromBinds);
1981 0           my $sql = "
1982             SELECT ".join(',', @select)."
1983             FROM $fromSql";
1984              
1985 0 0 0       if ($$oq{dbtype} eq 'Oracle' || $$oq{dbtype} eq 'Microsoft SQL Server') {
    0          
1986 0           $sql .= "
1987             WHERE 1=2
1988             $where ";
1989             }
1990              
1991             elsif ($$oq{dbtype} eq 'mysql') {
1992 0           $sql .= "
1993             LIMIT 0 ";
1994             }
1995              
1996 0           my $sth;
1997 0           eval {
1998 0           local $oq->{dbh}->{PrintError} = 0;
1999 0           local $oq->{dbh}->{RaiseError} = 1;
2000 0           $sth = $oq->{dbh}->prepare($sql);
2001 0           $sth->execute(@binds);
2002 0 0         }; if ($@) {
2003 0           confess "SQL Error in get_col_types:\n$@\n$sql\n(".join(",",@binds).")";
2004             }
2005              
2006             # read types into col_types cache in object
2007 0           my $type_map = $oq->type_map();
2008 0           for (my $i=0; $i < scalar(@selectColAliasOrder); $i++) {
2009 0           my $name = $selectColAliasOrder[$i];
2010 0           my $type_code = $sth->{TYPE}->[$i];
2011              
2012             # remove parenthesis in type_code from sqlite
2013 0           $type_code =~ s/\([^\)]*\)//;
2014            
2015 0 0         my $type = $type_map->{$type_code} or
2016             die "could not find type code: $type_code for col $name";
2017 0           $oq->{'col_types'}->{$selectColTypeOrder[$i]}->{$name} = $type;
2018              
2019             # set the type for select, filter, and sort to the default
2020             # unless they are already defined
2021 0 0         if ($selectColTypeOrder[$i] eq 'default') {
2022 0   0       $oq->{'col_types'}->{'select' }->{$name} ||= $type;
2023 0   0       $oq->{'col_types'}->{'filter' }->{$name} ||= $type;
2024 0   0       $oq->{'col_types'}->{'sort' }->{$name} ||= $type;
2025             }
2026             }
2027              
2028 0           $sth->finish();
2029             }
2030              
2031 0           return $oq->{'col_types'}->{$context};
2032             }
2033              
2034              
2035              
2036              
2037             # prepare an sth
2038             sub prepare {
2039 0     0 0   my $oq = shift;
2040             #$$oq{error_handler}->("DEBUG: \$oq->prepare(".Dumper(\@_).")\n") if $$oq{debug};
2041 0           return DBIx::OptimalQuery::sth->new($oq,@_);
2042             }
2043              
2044              
2045              
2046             # returns ARRAYREF: [order,idx]
2047             # order is [ [dep1,dep2,dep3], [dep4,dep5,dep6] ], # cursor/dep order
2048             # idx is { dep1 => 0, dep4 => 1, .. etc .. } # index of what cursor dep is in
2049             sub _order_deps {
2050 0     0     my ($oq, @deps) = @_;
2051             #$$oq{error_handler}->("DEBUG: \$oq->_order_deps(".Dumper(\@_).")\n") if $$oq{debug};
2052              
2053             # add always_join deps
2054 0           foreach my $joinAlias (keys %{ $$oq{joins} }) {
  0            
2055 0 0         push @deps, $joinAlias if $$oq{joins}{$joinAlias}[3]{always_join};
2056             }
2057              
2058             # @order is an array of array refs. Where each array ref represents deps
2059             # for a separate cursor
2060             # %idx is a hash of scalars where the hash key is the dep name and the
2061             # hash value is what index into order (which cursor number)
2062             # where you find the dep
2063 0           my (@order, %idx);
2064              
2065             # var to detect infinite recursion
2066 0           my $maxRecurse = 1000;
2067              
2068             # recursive function to order deps
2069             # each dep calls this again on all parent deps until all deps are fulfilled
2070             # then the dep is added
2071             # modfies @order & %idx
2072 0           my $place_missing_deps;
2073             $place_missing_deps = sub {
2074 0     0     my ($dep) = @_;
2075              
2076             # detect infinite recursion
2077 0           $maxRecurse--;
2078 0 0         die "BAD_JOINS - could not link joins to meet all deps" if $maxRecurse == 0;
2079              
2080             # recursion to make sure parent deps are added first
2081 0 0         if (defined $oq->{'joins'}->{$dep}->[0]) {
2082 0           foreach my $parent_dep (@{ $oq->{'joins'}->{$dep}->[0] } ) {
  0            
2083 0 0         $place_missing_deps->($parent_dep) if ! exists $idx{$parent_dep};
2084             }
2085             }
2086              
2087             # at this point all parent deps have been added,
2088             # now add this dep if it has not already been added
2089 0 0         if (! exists $idx{$dep}) {
2090              
2091             # add new cursor if dep is main driving table or has option new_cursor
2092 0 0 0       if (! defined $oq->{'joins'}->{$dep}->[0] ||
2093             exists $oq->{'joins'}->{$dep}->[3]->{new_cursor}) {
2094 0           push @order, [$dep];
2095 0           $idx{$dep} = $#order;
2096             }
2097              
2098             # place dep in @order & %idx
2099             # uses the same cursor as its parent dep
2100             # this is found by looking at the parent_idx
2101             else {
2102 0   0       my $parent_idx = $idx{$oq->{'joins'}->{$dep}->[0]->[0]} || 0;
2103 0           push @{ $order[ $parent_idx ] }, $dep;
  0            
2104 0           $idx{$dep} = $parent_idx;
2105             }
2106             }
2107 0           return undef;
2108 0           };
2109              
2110 0           $place_missing_deps->($_) for @deps;
2111              
2112 0           return (\@order, \%idx);
2113             }
2114              
2115              
2116             1;