File Coverage

lib/DBIx/OptimalQuery.pm
Criterion Covered Total %
statement 30 1028 2.9
branch 0 512 0.0
condition 0 166 0.0
subroutine 10 42 23.8
pod 0 9 0.0
total 40 1757 2.2


line stmt bran cond sub pod time code
1             package DBIx::OptimalQuery::sth;
2              
3 8     8   808 use strict;
  8         17  
  8         211  
4 8     8   35 use warnings;
  8         10  
  8         191  
5 8     8   32 no warnings qw( uninitialized once redefine );
  8         10  
  8         243  
6              
7 8     8   10738 use DBI();
  8         121230  
  8         251  
8 8     8   62 use Carp;
  8         15  
  8         487  
9 8     8   4834 use Data::Dumper();
  8         46680  
  8         30243  
10              
11             sub Dumper {
12 0     0     local $Data::Dumper::Indent = 1;
13 0           local $Data::Dumper::SortKeys = 1;
14 0           Data::Dumper::Dumper(@_);
15             }
16              
17              
18             =comment
19             prepare a DBI sth from user defined selects, filters, sorts
20              
21             this constructor 'new' is called when a DBIx::OptimalQuery->prepare method
22             call is issued.
23              
24             my %opts = (
25             show => []
26             filter => ""
27             sort => ""
28             );
29              
30             $sth = $oq->prepare(%opts);
31             - same as -
32             $sth = DBIx::OptimalQuery::sth->new($oq,%opts);
33              
34             $sth->execute( limit => [0, 10]);
35             =cut
36             sub new {
37 0     0     my $class = shift;
38 0           my $oq = shift;
39 0           my %args = @_;
40              
41             #$$oq{error_handler}->("DEBUG: \$sth = $class->new(\$oq,\n".Dumper(\%args).")\n") if $$oq{debug};
42              
43 0           my $sth = bless \%args, $class;
44 0           $sth->{oq} = $oq;
45 0           $sth->_normalize();
46 0           $sth->create_select();
47 0           $sth->create_where();
48 0           $sth->create_order_by();
49              
50 0           return $sth;
51             }
52              
53 0     0     sub get_lo_rec { $_[0]{limit}[0] }
54 0     0     sub get_hi_rec { $_[0]{limit}[1] }
55              
56             sub set_limit {
57 0     0     my ($sth, $limit) = @_;
58 0           $$sth{limit} = $limit;
59 0           return undef;
60             }
61              
62             # execute statement
63             # notice that we can't execute other child cursors
64             # because their bind params are dependant on
65             # their parent cursor value
66             sub execute {
67 0     0     my ($sth) = @_;
68 0 0         return undef if $$sth{_already_executed};
69 0           $$sth{_already_executed}=1;
70              
71             #$$sth{oq}{error_handler}->("DEBUG: \$sth->execute()\n") if $$sth{oq}{debug};
72 0 0         return undef if $sth->count()==0;
73              
74 0           local $$sth{oq}{dbh}{LongReadLen};
75              
76             # build SQL for main cursor
77 0           { my $c = $sth->{cursors}->[0];
  0            
78 0           my @all_deps = (@{$c->{select_deps}}, @{$c->{where_deps}}, @{$c->{order_by_deps}});
  0            
  0            
  0            
79 0           my ($order) = $sth->{oq}->_order_deps(@all_deps);
80 0           my @from_deps; push @from_deps, @$_ for @$order;
  0            
81              
82             # create from_sql, from_binds
83             # vars prefixed with old_ is used for supported non sql-92 joins
84 0           my ($from_sql, @from_binds, $old_join_sql, @old_join_binds );
85              
86 0           foreach my $from_dep (@from_deps) {
87 0           my ($sql, @binds) = @{ $sth->{oq}->{joins}->{$from_dep}->[1] };
  0            
88 0 0         push @from_binds, @binds if @binds;
89              
90             # if this is the driving table join
91 0 0         if (! $sth->{oq}->{joins}->{$from_dep}->[0]) {
    0          
92              
93             # alias it if not already aliased in sql
94 0           $from_sql .= $sql.' ';
95 0 0         $from_sql .= "$from_dep" unless $sql =~ /\b$from_dep\s*$/;
96 0           $from_sql .= "\n";
97             }
98            
99            
100             # if SQL-92 type join?
101             elsif (! defined $sth->{oq}->{joins}->{$from_dep}->[2]) {
102 0           $from_sql .= $sql."\n";
103             }
104            
105             # old style join
106             else {
107 0           $from_sql .= ", ".$sql.' '.$from_dep."\n";
108 0           my ($sql, @binds) = @{ $sth->{oq}->{joins}->{$from_dep}->[2] };
  0            
109 0 0         $old_join_sql .= " AND " if $old_join_sql ne '';
110 0           $old_join_sql .= $sql;
111 0           push @old_join_binds, @binds;
112             }
113             }
114            
115            
116             # construct where clause
117 0           my $where;
118 0           { my @where;
  0            
119 0 0         push @where, '('.$old_join_sql.') ' if $old_join_sql;
120 0 0         push @where, '('.$c->{where_sql}.') ' if $c->{where_sql};
121 0 0         $where = ' WHERE '.join("\nAND ", @where) if @where;
122             }
123            
124             # generate sql and bind params
125 0           $$c{sql} = "SELECT ".join(',', @{ $c->{select_sql} })." FROM $from_sql $where ".
126 0 0         (($c->{order_by_sql}) ? "ORDER BY ".$c->{order_by_sql} : '');
127              
128 0           my @binds = (@{ $c->{select_binds} }, @from_binds, @old_join_binds,
129 0           @{$c->{where_binds}}, @{$c->{order_by_binds}} );
  0            
  0            
130 0           $$c{binds} = \@binds;
131              
132             # if clobs have been selected, find & set LongReadLen
133 0 0 0       if ($$sth{oq}{dbtype} eq 'Oracle' &&
      0        
134             $$sth{'oq'}{'AutoSetLongReadLen'} &&
135 0           scalar(@{$$c{'selected_lobs'}})) {
136              
137             my $maxlenlobsql = "SELECT greatest(".join(',',
138 0           map { "nvl(max(DBMS_LOB.GETLENGTH($_)),0)" } @{$$c{'selected_lobs'}}
  0            
139 0           ).") FROM (".$$c{'sql'}.")";
140              
141 0           my ($SetLongReadLen) = $$sth{oq}{dbh}->selectrow_array($maxlenlobsql, undef, @{$$c{'binds'}});
  0            
142              
143 0 0 0       if (! $$sth{oq}{dbh}{LongReadLen} || $SetLongReadLen > $$sth{oq}{dbh}{LongReadLen}) {
144 0           $$sth{oq}{dbh}{LongReadLen} = $SetLongReadLen;
145             }
146             }
147              
148 0           $sth->add_limit_sql();
149             }
150              
151              
152             # build children cursors
153 0           my $cursors = $sth->{cursors};
154 0           foreach my $i (1 .. $#$cursors) {
155 0           my $c = $sth->{cursors}->[$i];
156 0           my $sd = $c->{select_deps};
157              
158             # define sql and binds for joins for this child cursor
159             # in the following vars
160 0           my ($from_sql, @from_binds, $where_sql, @where_binds );
161              
162             # define vars for child cursor driving table
163             # these are handled differently since we aren't joining in parent deps
164             # they were precomputed in _normalize method when constructing $oq
165              
166             ($from_sql, @from_binds) =
167 0           @{ $sth->{oq}->{joins}->{$sd->[0]}->[3]->{new_cursor}->{sql} };
  0            
168 0           $where_sql = $sth->{oq}->{joins}->{$sd->[0]}->[3]->{new_cursor}->{'join'};
169 0           my $order_by_sql = '';
170 0 0         if ($sth->{oq}->{joins}->{$sd->[0]}->[3]->{new_cursor_order_by}) {
171 0           $order_by_sql = " ORDER BY ".$sth->{oq}->{joins}->{$sd->[0]}->[3]->{new_cursor_order_by};
172             }
173              
174 0           $from_sql .= "\n";
175              
176             # now join in all other deps normally for this cursor
177 0           foreach my $i (1 .. $#$sd) {
178 0           my $joinAlias = $sd->[$i];
179              
180 0           my ($sql, @binds) = @{ $sth->{oq}->{joins}->{$joinAlias}->[1] };
  0            
181              
182             # these will NOT be defined for sql-92 type joins
183             my ($joinWhereSql, @joinWhereBinds) =
184 0           @{ $sth->{oq}->{joins}->{$joinAlias}->[2] }
185 0 0         if defined $sth->{oq}->{joins}->{$joinAlias}->[2];
186              
187             # if SQL-92 type join?
188 0 0         if (! defined $joinWhereSql) {
189 0           $from_sql .= $sql."\n";
190 0           push @from_binds, @binds;
191             }
192              
193             # old style join
194             else {
195 0           $from_sql .= ",\n$sql $joinAlias";
196 0           push @from_binds, @binds;
197 0 0         if ($joinWhereSql) {
198 0 0         $where_sql .= " AND " if $where_sql;
199 0           $where_sql .= $joinWhereSql;
200             }
201 0           push @where_binds, @joinWhereBinds;
202             }
203             }
204              
205             # build child cursor sql
206             $c->{sql} = "
207 0           SELECT ".join(',', @{ $c->{select_sql} })."
  0            
208             FROM $from_sql
209             WHERE $where_sql
210             $order_by_sql ";
211 0           $c->{binds} = [ @{ $c->{select_binds} }, @from_binds, @where_binds ];
  0            
212              
213             # if clobs have been selected, find & set LongReadLen
214 0 0 0       if ($$sth{oq}{dbtype} eq 'Oracle' &&
      0        
215             $$sth{'oq'}{'AutoSetLongReadLen'} &&
216 0           scalar(@{$$c{'selected_lobs'}})) {
217             my ($SetLongReadLen) = $$sth{oq}{dbh}->selectrow_array("
218             SELECT greatest(".join(',',
219 0           map { "nvl(max(DBMS_LOB.GETLENGTH($_)),0)" } @{$$c{'selected_lobs'}}
  0            
220             ).")
221 0           FROM (".$$c{'sql'}.")", undef, @{$$c{'binds'}});
  0            
222 0 0 0       if (! $$sth{oq}{dbh}{LongReadLen} || $SetLongReadLen > $$sth{oq}{dbh}{LongReadLen}) {
223 0           $$sth{oq}{dbh}{LongReadLen} = $SetLongReadLen;
224             }
225             }
226             }
227              
228 0           eval {
229 0           my $c;
230              
231             # prepare all cursors
232 0           foreach $c (@$cursors) {
233 0 0         $$sth{oq}->{error_handler}->("SQL:\n".$c->{sql}."\nBINDS:\n".Dumper($c->{binds})."\n") if $$sth{oq}{debug};
234 0           $c->{sth} = $sth->{oq}->{dbh}->prepare($c->{sql});
235             }
236 0           $c = $$cursors[0];
237 0           $c->{sth}->execute( @{ $c->{binds} } );
  0            
238 0           my @fieldnames = @{ $$c{select_field_order} };
  0            
239 0           my %rec;
240 0           my @bindcols = \( @rec{ @fieldnames } );
241 0           $c->{sth}->bind_columns(@bindcols);
242 0           $c->{bind_hash} = \%rec;
243             };
244 0 0         if ($@) {
245 0           die "Problem with SQL; $@\n";
246             }
247 0           return undef;
248             }
249              
250             # function to add limit sql
251             # $sth->add_limit_sql()
252             sub add_limit_sql {
253 0     0     my ($sth) = @_;
254              
255             #$$sth{oq}{error_handler}->("DEBUG: \$sth->add_limit_sql()\n") if $$sth{oq}{debug};
256 0   0       my $lo_limit = $$sth{limit}[0] || 0;
257 0   0       my $hi_limit = $$sth{limit}[1] || $sth->count();
258 0           my $c = $sth->{cursors}->[0];
259              
260 0 0         if ($$sth{oq}{dbtype} eq 'Oracle') {
    0          
    0          
261             $c->{sql} = "
262             SELECT *
263             FROM (
264             SELECT tablernk1.*, rownum RANK
265             FROM (
266 0           ".$c->{sql}."
267             ) tablernk1
268             WHERE rownum <= ?
269             ) tablernk2
270             WHERE tablernk2.RANK >= ? ";
271 0           push @{$$c{binds}}, ($hi_limit, $lo_limit);
  0            
272 0           push @{$$c{select_field_order}}, "DBIXOQRANK";
  0            
273             }
274              
275             # sqlserver doesn't support limit/offset until Sql Server 2012 (which I don't have to test)
276             # the workaround is this ugly hack...
277             elsif ($$sth{oq}{dbtype} eq 'Microsoft SQL Server') {
278 0 0         die "missing required U_ID in select" unless exists $$sth{oq}{select}{U_ID};
279              
280 0           my $sql = $c->{sql};
281              
282             # extract order by sql, and binds in order by from sql
283 0           my $orderbysql;
284 0 0         if ($sql =~ s/\ (ORDER BY\ .*?)$//) {
    0          
285 0           $orderbysql = $1;
286 0           my $copy = $orderbysql;
287 0           my $bindCount = $copy =~ tr/,//;
288 0 0         if ($bindCount > 0) {
289 0           my @newBinds;
290 0           push @newBinds, pop @{$$c{binds}} for 1 .. $bindCount;
  0            
291 0           @{$$c{binds}} = (reverse @newBinds, @{$$c{binds}});
  0            
  0            
292             }
293 0           $orderbysql .= ", ".$$sth{oq}{select}{U_ID}[1][0];
294             } elsif (exists $$sth{oq}{select}{U_ID}) {
295 0           $orderbysql = " ORDER BY ".$$sth{oq}{select}{U_ID}[1][0];
296             }
297              
298             # remove first select keyword, and add new one with windowing
299 0 0         if ($sql =~ s/^(\s*SELECT\s*)//) {
300 0           my $limit = int($hi_limit - $lo_limit + 1);
301 0           my $lo_limit = int($lo_limit);
302              
303             # sqlserver doesn't allow placeholders for limit and offset here
304 0           $c->{sql} = "SELECT TOP $limit * FROM (SELECT ROW_NUMBER() OVER ($orderbysql) AS RANK, $sql) tablerank1 WHERE tablerank1.RANK >= $lo_limit";
305 0           unshift @{$$c{select_field_order}}, "DBIXOQRANK";
  0            
306             }
307             }
308              
309             elsif ($$sth{oq}{dbtype} eq 'Pg') {
310 0           my $a = $lo_limit - 1;
311 0           my $b = $hi_limit - $lo_limit + 1;
312 0           $c->{sql} .= "\nLIMIT ? OFFSET ?";
313 0           push @{$$c{binds}}, ($b, $a);
  0            
314             }
315              
316             else {
317 0           my $a = $lo_limit - 1;
318 0           my $b = $hi_limit - $lo_limit + 1;
319 0           $c->{sql} .= "\nLIMIT ?,?";
320 0           push @{$$c{binds}}, ($a, $b);
  0            
321             }
322              
323 0           return undef;
324             }
325              
326              
327             # normalize member variables
328             sub _normalize {
329 0     0     my $sth = shift;
330             #$$sth{oq}{error_handler}->("DEBUG: \$sth->_normalize()\n") if $$sth{oq}{debug};
331              
332             # if show is not defined - then define it
333 0 0         if (! exists $sth->{show}) {
334 0           my @select;
335 0           foreach my $select (@{ $sth->{oq}->{'select'} } ) {
  0            
336 0           push @select, $select;
337             }
338 0           $sth->{show} = \@select;
339             }
340              
341             # define filter & sort if not defined
342 0 0         $sth->{'filter'} = "" if ! exists $sth->{'filter'};
343 0 0         $sth->{'sort'} = "" if ! exists $sth->{'sort'};
344 0           $sth->{'fetch_index'} = 0;
345 0           $sth->{'count'} = undef;
346 0           $sth->{'cursors'} = undef;
347              
348 0           return undef;
349             }
350              
351              
352              
353             # define @select & @select_binds, and add deps
354             sub create_select {
355 0     0     my $sth = shift;
356             #$$sth{oq}{error_handler}->("DEBUG: \$sth->create_select()\n") if $$sth{oq}{debug};
357              
358            
359             # find all of the columns that need to be shown
360 0           my %show;
361              
362             # find all deps to be used in select including cols marked always_select
363 0           my (@deps, @select_sql, @select_binds);
364 0           { my %deps;
  0            
365              
366             # add deps, @select, @select_binds for items in show
367 0           foreach my $show (@{ $sth->{show} }) {
  0            
368 0 0         $show{$show} = 1 if exists $sth->{'oq'}->{'select'}->{$show};
369 0           foreach my $dep (@{ $sth->{'oq'}->{'select'}->{$show}->[0] }) {
  0            
370 0           $deps{$dep} = 1;
371             }
372             }
373              
374             # add deps used in always_select
375 0           foreach my $colAlias (keys %{ $sth->{'oq'}->{'select'} }) {
  0            
376 0 0         if ($sth->{'oq'}->{'select'}->{$colAlias}->[3]->{always_select} ) {
377 0           $show{$colAlias} = 1;
378 0           $deps{$_} = 1 for @{ $sth->{'oq'}->{'select'}->{$colAlias}->[0] };
  0            
379             }
380             }
381 0           @deps = keys %deps;
382             }
383              
384             # order and index deps into appropriate cursors
385 0           my ($dep_order, $dep_idx) = $sth->{oq}->_order_deps(@deps);
386              
387             # look though select again and add all cols with is_hidden option
388             # if all their deps have been fulfilled
389 0           foreach my $colAlias (keys %{ $sth->{'oq'}->{'select'} }) {
  0            
390 0 0         if ($sth->{'oq'}->{'select'}->{$colAlias}->[3]->{is_hidden}) {
391 0           my $deps = $sth->{'oq'}->{'select'}->{$colAlias}->[0];
392 0           my $all_deps_met = 1;
393 0           for (@$deps) {
394 0 0         if (! exists $dep_idx->{$_}) {
395 0           $all_deps_met = 0;
396 0           last;
397             }
398             }
399 0 0         $show{$colAlias} = 1 if $all_deps_met;
400             }
401             }
402              
403             # create main cursor structure & attach deps for main cursor
404 0           $sth->{'cursors'} = [ $sth->_get_main_cursor_template() ];
405 0           $sth->{'cursors'}->[0]->{'select_deps'} = $dep_order->[0];
406              
407             # unique counter that is used to uniquely identify cols in parent cursors
408             # to their children cursors
409 0           my $parent_bind_tag_idx = 0;
410              
411             # create other cursors (if they exist)
412             # and define how they join to their parent cursors
413             # by defining parent_join, parent_keys
414 0           foreach my $i (1 .. $#$dep_order) {
415 0           push @{ $sth->{'cursors'} }, $sth->_get_sub_cursor_template();
  0            
416 0           $sth->{'cursors'}->[$i]->{'select_deps'} = $dep_order->[$i];
417              
418             # add parent_join, parent_keys for this child cursor
419 0           my $driving_child_join_alias = $dep_order->[$i]->[0];
420 0           my $cursor_opts = $sth->{'oq'}->{'joins'}->{$driving_child_join_alias}->[3]->{new_cursor};
421 0           foreach my $part (@{ $cursor_opts->{'keys'} } ) {
  0            
422 0           my ($dep,$sql) = @$part;
423 0           my $key = 'DBIXOQMJK'.$parent_bind_tag_idx; $parent_bind_tag_idx++;
  0            
424 0           my $parent_cursor_idx = $dep_idx->{$dep};
425 0 0         die "could not find dep: $dep for new cursor" if $parent_cursor_idx eq '';
426 0           push @{ $sth->{'cursors'}->[$parent_cursor_idx]->{select_field_order} }, $key;
  0            
427 0           push @{ $sth->{'cursors'}->[$parent_cursor_idx]->{select_sql} }, "$dep.$sql AS $key";
  0            
428 0           push @{ $sth->{'cursors'}->[$i]->{'parent_keys'} }, $key;
  0            
429             }
430 0           $sth->{'cursors'}->[$i]->{'parent_join'} = $cursor_opts->{'join'};
431             }
432            
433             # plug in select_sql, select_binds for cursors
434 0           foreach my $show (keys %show) {
435 0           my $select = $sth->{'oq'}->{'select'}->{$show};
436 0 0         next if ! $select;
437              
438 0           my $cursor = $sth->{'cursors'}->[$dep_idx->{$select->[0]->[0]}];
439              
440 0           my $select_sql;
441              
442             # if type is date then use specified date format
443 0 0 0       if (! $$select[3]{select_sql} && $$select[3]{date_format}) {
444 0           my @tmp = @{ $select->[1] }; $select_sql = \ @tmp; # need a real copy cause we are going to mutate it
  0            
  0            
445 0 0 0       if ($$sth{oq}{dbtype} eq 'Oracle' ||
    0          
446             $$sth{oq}{dbtype} eq 'Pg') {
447 0           $$select_sql[0] = "to_char(".$$select_sql[0].",'".$$select[3]{date_format}."')";
448             } elsif ($$sth{oq}{dbtype} eq 'mysql') {
449 0           $$select_sql[0] = "date_format(".$$select_sql[0].",'".$$select[3]{date_format}."')";
450             } else {
451 0           die "unsupported DB";
452             }
453             }
454              
455             # else just copy the select
456             else {
457 0   0       $select_sql = $select->[3]->{select_sql} || $select->[1];
458             }
459              
460             # remember if a lob is selected
461 0 0 0       if ($$sth{oq}{dbtype} eq 'Oracle' &&
462             $sth->{oq}->get_col_types('select')->{$show} eq 'clob') {
463 0           push @{ $cursor->{selected_lobs} }, $show;
  0            
464             #$select_sql->[0] = 'to_char('.$select_sql->[0].')';
465             }
466              
467 0 0         if ($select_sql->[0] ne '') {
468 0           push @{ $cursor->{select_field_order} }, $show;
  0            
469 0           push @{ $cursor->{select_sql} }, $select_sql->[0].' AS '.$show;
  0            
470 0           push @{ $cursor->{select_binds} }, @$select_sql[1 .. $#$select_sql];
  0            
471             }
472             }
473              
474 0           return undef;
475             }
476            
477              
478              
479              
480             # template for the main cursor
481             sub _get_main_cursor_template {
482 0     0     { sth => undef,
483             sql => "",
484             binds => [],
485             selected_lobs => [],
486             select_field_order => [],
487             select_sql => [],
488             select_binds => [],
489             select_deps => [],
490             where_sql => "",
491             where_binds => [],
492             where_deps => [],
493             where_name => "",
494             order_by_sql => "",
495             order_by_binds => [],
496             order_by_deps => [],
497             order_by_name => []
498             };
499             }
500              
501             # template for explicitly defined additional cursors
502             sub _get_sub_cursor_template {
503 0     0     { sth => undef,
504             sql => "",
505             binds => [],
506             selected_lobs => [],
507             select_field_order => [],
508             select_sql => [],
509             select_deps => [],
510             select_binds => [],
511             parent_join => "",
512             parent_keys => [],
513             };
514             }
515              
516              
517              
518              
519            
520            
521              
522              
523             # modify cursor and add where clause data
524             sub create_where {
525 0     0     my ($sth) = @_;
526              
527             # define cursor where_sql, where_deps, where_name where_binds from parsed filter types
528 0           my $c = $sth->{cursors}->[0];
529 0           foreach my $filterType (qw( filter hiddenFilter forceFilter)) {
530 0 0         next if $$sth{$filterType} eq '';
531 0           my $filterArray = $$sth{oq}->parseFilter($$sth{$filterType});
532 0           my $filterSQL = $$sth{oq}->generateFilterSQL($filterArray);
533              
534 0           push @{ $$c{where_deps} }, @{ $$filterSQL{deps} };
  0            
  0            
535 0 0         if ($$c{where_sql}) {
536 0           $$c{where_sql} .= ' AND ('.$$filterSQL{sql}.')';
537             } else {
538 0           $$c{where_sql} = $$filterSQL{sql};
539             }
540 0           push @{ $$c{where_binds} }, @{ $$filterSQL{binds} };
  0            
  0            
541 0 0         $$c{where_name} = $$filterSQL{name} if $filterType eq 'filter';
542             }
543              
544 0           return undef;
545             }
546              
547              
548              
549              
550             # modify cursor and add order by data
551             sub create_order_by {
552 0     0     my ($sth) = @_;
553 0           my $c = $sth->{cursors}->[0];
554              
555 0           my $s = $$sth{oq}->parseSort($$sth{'sort'});
556 0           $$c{order_by_deps} = $$s{deps};
557 0           $$c{order_by_sql} = join(',', @{ $$s{sql} });
  0            
558 0           $$c{order_by_binds} = $$s{binds};
559 0           $$c{order_by_name} = $$s{name};
560 0           return undef;
561             }
562              
563              
564              
565              
566              
567              
568              
569              
570              
571            
572              
573              
574             # fetch next row or return undef when done
575             sub fetchrow_hashref {
576 0     0     my ($sth) = @_;
577 0 0         return undef unless $sth->count() > 0;
578 0           $sth->execute(); # execute if not already existed
579              
580             #$$sth{oq}{error_handler}->("DEBUG: \$sth->fetchrow_hashref()\n") if $$sth{oq}{debug};
581              
582 0           my $cursors = $sth->{cursors};
583 0           my $c = $cursors->[0];
584              
585             # bind hash value to column data
586 0           my $rec = $$c{bind_hash};
587              
588             # fetch record
589 0 0         if (my $v = $c->{sth}->fetch()) {
590              
591 0           foreach my $i (0 .. $#$v) {
592              
593             # if col type is decimal auto trim 0s after decimal
594 0 0 0       if ($c->{sth}->{TYPE}->[$i] eq '3' && $$v[$i] =~ /\./) {
595 0           $$v[$i] =~ s/0+$//;
596 0           $$v[$i] =~ s/\.$//;
597             }
598             }
599            
600 0           $sth->{'fetch_index'}++;
601              
602             # execute other cursors
603 0           foreach my $i (1 .. $#$cursors) {
604 0           $c = $cursors->[$i];
605              
606 0           $c->{sth}->execute( @{ $c->{binds} },
607 0           map { $$rec{$_} } @{ $c->{parent_keys} } );
  0            
  0            
608              
609 0           my $cols = $$c{select_field_order};
610 0           @$rec{ @$cols } = [];
611              
612 0           while (my @vals = $c->{sth}->fetchrow_array()) {
613 0           for (my $i=0; $i <= $#$cols; $i++) {
614 0           push @{ $$rec{$$cols[$i]} }, $vals[$i];
  0            
615             }
616             }
617 0           $c->{sth}->finish();
618             }
619 0           return $rec;
620             } else {
621 0           return undef;
622             }
623             }
624              
625             # finish sth
626             sub finish {
627 0     0     my ($sth) = @_;
628             #$$sth{oq}{error_handler}->("DEBUG: \$sth->finish()\n") if $$sth{oq}{debug};
629 0           foreach my $c (@{$$sth{cursors}}) {
  0            
630 0 0         $$c{sth}->finish() if $$c{sth};
631 0           undef $$c{sth};
632             }
633 0           return undef;
634             }
635              
636             # get count for sth
637             sub count {
638 0     0     my $sth = shift;
639              
640             # if count is not already defined, define it
641 0 0         if (! defined $sth->{count}) {
642             #$$sth{oq}{error_handler}->("DEBUG: \$sth->count()\n") if $$sth{oq}{debug};
643              
644 0           my $c = $sth->{cursors}->[0];
645              
646 0           my $drivingTable = $c->{select_deps}->[0];
647              
648             # only need to join in driving table with
649             # deps used in where clause
650 0           my ($deps) = $sth->{oq}->_order_deps($drivingTable, @{$c->{where_deps}});
  0            
651 0           my @from_deps; push @from_deps, @$_ for @$deps;
  0            
652              
653             # create from_sql, from_binds
654             # vars prefixed with old_ is used for supported non sql-92 joins
655 0           my ($from_sql, @from_binds, $old_join_sql, @old_join_binds );
656 0           foreach my $from_dep (@from_deps) {
657 0           my ($sql, @binds) = @{ $sth->{oq}->{joins}->{$from_dep}->[1] };
  0            
658 0 0         push @from_binds, @binds if @binds;
659              
660             # if this is the driving table join
661 0 0         if (! $sth->{oq}->{joins}->{$from_dep}->[0]) {
    0          
662              
663             # alias it if not already aliased in sql
664 0 0         $sql .= " $from_dep" unless $sql =~ /\b$from_dep\s*$/;
665 0           $from_sql .= $sql;
666             }
667              
668             # if SQL-92 type join?
669             elsif (! $sth->{oq}->{joins}->{$from_dep}->[2]) {
670 0           $from_sql .= "\n".$sql;
671             }
672              
673             # old style join
674             else {
675 0           $from_sql .= ",\n".$sql.' '.$from_dep;
676 0           my ($sql, @binds) = @{ $sth->{oq}->{joins}->{$from_dep}->[2] };
  0            
677 0 0         if ($sql) {
678 0 0         $old_join_sql .= " AND " if $old_join_sql ne '';
679 0           $old_join_sql .= $sql;
680             }
681 0           push @old_join_binds, @binds;
682             }
683             }
684              
685              
686             # construct where clause
687 0           my $where;
688 0           { my @where;
  0            
689 0 0         push @where, '('.$old_join_sql.') ' if $old_join_sql;
690 0 0         push @where, '('.$c->{where_sql}.') ' if $c->{where_sql};
691 0 0         $where = 'WHERE '.join("\nAND ", @where) if @where;
692             }
693              
694             # generate sql and bind params
695 0           my $sql = "
696             SELECT count(*)
697             FROM (
698             SELECT $drivingTable.*
699             FROM $from_sql
700             $where
701             ) cnt_query";
702 0           my @binds = (@from_binds, @old_join_binds, @{$c->{where_binds}});
  0            
703              
704 0           eval {
705 0 0         $$sth{oq}->{error_handler}->("SQL:\n$sql\nBINDS:\n".Dumper(\@binds)."\n") if $$sth{oq}{debug};
706 0           ($sth->{count}) = $sth->{oq}->{dbh}->selectrow_array($sql, undef, @binds);
707 0 0         }; if ($@) {
708 0           die "Problem finding count for SQL:\n$sql\nBINDS:\n".join(',',@binds)."\n\n$@\n";
709             }
710             }
711              
712 0           return $sth->{count};
713             }
714              
715 0     0     sub fetch_index { $_->{'fetch_index'} }
716              
717             sub filter_descr {
718 0     0     my $sth = shift;
719 0           return $sth->{cursors}->[0]->{'where_name'};
720             }
721              
722             sub sort_descr {
723 0     0     my $sth = shift;
724 0 0         if (wantarray) {
725 0           return @{ $sth->{cursors}->[0]->{'order_by_name'} };
  0            
726             } else {
727 0           return join(', ', @{ $sth->{cursors}->[0]->{'order_by_name'} });
  0            
728             }
729             }
730              
731              
732              
733              
734              
735              
736              
737              
738              
739              
740              
741              
742              
743              
744              
745              
746              
747              
748              
749              
750              
751              
752              
753              
754              
755              
756              
757              
758              
759              
760              
761              
762              
763              
764              
765              
766              
767              
768              
769             package DBIx::OptimalQuery;
770              
771             =comment
772              
773             use DBIx::OptimalQuery;
774             my $oq = DBIx::OptimalQuery->new(
775             select => {
776             'alias' => [dep, sql, nice_name, { OPTIONS } ]
777             }
778              
779             joins => {
780             'alias' => [dep, join_sql, where_sql, { OPTIONS } ]
781             }
782              
783             named_filters => {
784             'name' => [dep, sql, nice]
785             'name' => {
786             sql_generator => sub {
787             my %args = @_;
788             return [dep, sql, name]
789             }
790             title => "text displayed on interactive filter"
791             }
792             },
793              
794             named_sorts => {
795             'name' => [dep, sql, nice]
796             'name' => { sql_generator => sub { return [dep, sql, name] } }
797             },
798              
799            
800              
801             debug => 0 | 1
802             );
803             =cut
804              
805 8     8   74 use strict;
  8         27  
  8         242  
806 8     8   39 use Carp;
  8         14  
  8         478  
807 8     8   52 use Data::Dumper;
  8         13  
  8         364  
808 8     8   43 use DBI();
  8         14  
  8         56016  
809              
810             sub new {
811 0     0 0   my $class = shift;
812 0           my %args = @_;
813 0           my $oq = bless \%args, $class;
814              
815 0   0       $$oq{debug} ||= 0;
816             #$$oq{error_handler}->("DEBUG: $class->new(".Dumper(\%args).")\n") if $$oq{debug};
817              
818             die "BAD_PARAMS - must provide a dbh!"
819 0 0         unless $oq->{'dbh'};
820             die "BAD_PARAMS - must define a select key in call to constructor"
821 0 0         unless ref($oq->{'select'}) eq 'HASH';
822             die "BAD_PARAMS - must define a joins key in call to constructor"
823 0 0         unless ref($oq->{'joins'}) eq 'HASH';
824              
825              
826 0           $oq->_normalize();
827              
828 0           $$oq{dbtype} = $$oq{dbh}{Driver}{Name};
829 0 0         $$oq{dbtype} = $$oq{dbh}->get_info(17) if $$oq{dbtype} eq 'ODBC';
830              
831 0           return $oq;
832             }
833              
834              
835              
836             # returns [
837             # # type 1 - (selectalias operator literal)
838             # [1,$numLeftParen,$leftExpSelectAlias,$op,$rightExpLiteral,$numRightParen],
839             # # type 2 - (namedfilter, arguments)
840             # [2,$numLeftParen,$namedFilter,$argArray,$numRightParen]
841             # # type 3 - (selectalias operator selectalias)
842             # [3,$numLeftParen,$leftExpSelectAlias,$op,$rightExpSelectAlias,$numRightParen],
843             # # logic operator
844             # 'AND'|'OR'
845             # ]
846             sub parseFilter {
847 0     0 0   my ($oq, $f) = @_;
848 0           $f =~ /^\s+/; # trim leading spaces
849              
850 0           my @rv;
851 0 0         return \@rv if $f eq '';
852              
853 0           my $error;
854              
855 0           my $parenthesis=0;
856            
857 0           while (! $error) {
858 0           my $numLeftP=0;
859 0           my $numRightP=0;
860              
861             # parse opening parenthesis
862 0           while ($f =~ /\G\(\s*/gc) { $numLeftP++; $parenthesis++;}
  0            
  0            
863              
864             # if this looks like a named filter
865 0 0         if ($f=~/\G(\w+)\s*\(\s*/gc) {
866 0           my $namedFilter = $1;
867              
868 0 0         if (! exists $$oq{named_filters}{$namedFilter}) {
869 0           $error = "invalid named filter: $namedFilter";
870 0           next;
871             }
872              
873             # parse named filter arguments
874 0           my @args;
875 0           while (! $error) {
876 0 0 0       if ($f =~ /\G\)\s*/gc) {
    0 0        
877 0           last;
878             }
879             elsif ($f =~ /\G\'([^\']*)\'\s*\,?\s*/gc ||
880             $f =~ /\G\"([^\"]*)\"\s*\,?\s*/gc ||
881             $f =~ /\G([^\)\,]*)\s*\,?\s*/gc) {
882 0           push @args, $1;
883             } else {
884 0           $error = "could not parse named filter arguments";
885             }
886             }
887 0 0         next if $error;
888              
889             # parse closing parenthesis
890 0           while ($f =~ /\G\)\s*/gc) {
891 0 0         if ($parenthesis > 0) {
892 0           $parenthesis--;
893 0           $numRightP++;
894             }
895             }
896              
897 0           push @rv, [2,$numLeftP,$namedFilter,\@args,$numRightP];
898             }
899              
900             # else this is an expression
901             else {
902 0           my $lexp;
903             my $rexp;
904 0           my $typeNum = 1;
905              
906             # grab select alias used on the left side of the expression
907 0 0         if ($f=~/\G\[([^\]]+)\]\s*/gc) { $lexp = $1; }
  0 0          
908 0           elsif ($f=~/\G(\w+)\s*/gc) { $lexp = $1; }
909             else {
910 0           $error = "missing left expression";
911             }
912              
913             # make sure the select alias is valid
914 0 0         if (! $$oq{select}{$lexp}) {
915 0           $error = "invalid field $lexp";
916 0           next;
917             }
918              
919             # parse the operator
920 0           my $op;
921 0 0         if ($f =~ /\G(\!\=|\=|\<\=|\>\=|\<|\>|like|not\ ?like|contains|not\ ?contains)\s*/igc) {
922 0           $op = lc($1);
923             }
924             else {
925 0           $error = "invalid operator";
926 0           next;
927             }
928              
929             # if rexp is a select alias
930 0 0 0       if ($f=~/\G\[([^\]]+)\]\s*/gc) {
    0          
    0          
931 0           $rexp = $1;
932 0           $typeNum = 3;
933             }
934              
935             # else if rexp is a literal
936             elsif ($f =~ /\G\'([^\']*)\'\s*/gc ||
937             $f =~ /\G\"([^\"]*)\"\s*/gc) {
938 0           $rexp = $1;
939             }
940              
941             # else if rexp is a word
942             elsif ($f =~ /\G(\S+)\s*/gc) {
943 0           $rexp = $1;
944              
945             # is word a col alias?
946 0 0         if ($$oq{select}{$rexp}) {
947 0           $typeNum = 3;
948             }
949             }
950              
951             else {
952 0           $error = "missing right expression";
953 0           next;
954             }
955              
956             # parse closing parenthesis
957 0           while ($f =~ /\G\)\s*/gc) {
958 0 0         if ($parenthesis > 0) {
959 0           $parenthesis--;
960 0           $numRightP++;
961             }
962             }
963              
964 0           push @rv, [$typeNum, $numLeftP, $lexp, $op, $rexp, $numRightP];
965             }
966              
967             # parse logic operator
968 0 0         if ($f =~ /(AND|OR)\s*/gci) {
969 0           push @rv, uc($1);
970             }
971             else {
972 0           last;
973             }
974             }
975              
976 0 0         if ($error) {
977 0           my $p = pos($f);
978 0           $error .= " at ".substr($f, 0, $p).'<*>'.substr($f, $p);
979 0           die $error."\n";
980             }
981              
982 0           return \@rv;
983             }
984              
985              
986             # given a filter string, returns { sql => $sql, binds => \@binds, deps => \@deps, name => $name };
987             sub generateFilterSQL {
988 0     0 0   my ($oq, $filterArray) = @_;
989              
990             # build an array of sql tokens, bind vals, and used deps
991             # also build a formatted name
992 0           my @sql;
993             my @binds;
994 0           my %deps;
995 0           my @name;
996 0           my $parenthesis = 0;
997              
998 0           foreach my $exp (@$filterArray) {
999              
1000 0 0         if ($exp eq 'AND') {
    0          
    0          
    0          
    0          
1001 0           push @sql, "AND";
1002 0           push @name, "AND";
1003             } elsif ($exp eq 'OR') {
1004 0           push @sql, "OR";
1005 0           push @name, "OR";
1006             }
1007              
1008             # [COLALIAS] != "literal"
1009             elsif ($$exp[0]==1) {
1010 0           my ($type, $numLeftParen, $leftColAlias, $operatorName, $rval, $numRightParen) = @$exp;
1011 0           $parenthesis+=$numLeftParen;
1012 0           $parenthesis-=$numRightParen;
1013              
1014 0           my $operator = uc($operatorName);
1015              
1016             # handle left side of expression
1017 0           my ($leftDeps, $leftSql, $leftName, $leftOpts, @leftBinds, $leftType);
1018 0           ($leftDeps, $leftSql, $leftName, $leftOpts) = @{ $$oq{select}{$leftColAlias} };
  0            
1019 0 0         $leftSql = $$leftOpts{filter_sql} if $$leftOpts{filter_sql};
1020 0 0         ($leftSql, @leftBinds) = @$leftSql if ref($leftSql) eq 'ARRAY';
1021 0           $leftType = $oq->get_col_type($leftColAlias, 'filter');
1022 0   0       $leftName ||= $leftColAlias;
1023              
1024             # handle right side of expression
1025 0           my ($rightSql, $rightName, @rightBinds);
1026            
1027 0           $rightName = $rval;
1028 0 0         if ($rightName eq '') {
    0          
1029 0           $rightName = "''";
1030             } elsif ($rightName =~ /\s/) {
1031 0           $rightName = '"'.$rightName.'"';
1032             }
1033              
1034 0 0         $rval = $$leftOpts{db_formatter}->($rval) if $$leftOpts{db_formatter};
1035              
1036             # if empty check
1037 0 0         if ($rval eq '') {
1038 0 0 0       if ($leftType eq 'char' || $leftType eq 'clob') {
1039 0 0         if ($$oq{dbtype} eq 'Oracle') {
1040 0           $leftSql = "COALESCE($leftSql,'_ _')";
1041 0           $rightSql = "_ _";
1042             } else {
1043 0           $leftSql = "COALESCE($leftSql,'')";
1044 0           $rightSql = "''";
1045             }
1046 0 0         $operator = ($operator =~ /\!|NOT/i) ? '!=' : '=';
1047             } else {
1048 0 0         $operator = ($operator =~ /\!|NOT/i) ? 'IS NOT NULL' : 'IS NULL';
1049             }
1050             }
1051              
1052             # else check against literal value
1053             else {
1054             # if numeric operator
1055 0 0         if ($operator =~ /\=|\<|\>/) {
1056              
1057             # if we are doing a numeric date comparison, parse for the date in common formats
1058 0 0 0       if ($leftType eq 'date' || $leftType eq 'datetime') {
    0 0        
    0 0        
1059              
1060             # is this a calculated date?
1061 0 0         if ($rval =~ /today\s*([\+\-])\s*(\d+)\s*(minute|hour|day|week|month|year|)s?/i) {
    0          
1062 0           my $sign = $1;
1063 0           my $num = $2;
1064 0   0       my $unit = uc($3) || 'DAY';
1065 0           $rightName = "today ".$sign.$num." ".lc($unit);
1066 0 0         $rightName .= 's' if $num != 1;
1067 0 0         $num *= -1 if $sign eq '-';
1068              
1069 0 0         if ($$oq{dbtype} eq 'Oracle') {
1070 0 0         my $now = $leftType eq 'datetime' ? 'SYSDATE' : 'TRUNC(SYSDATE)';
1071 0 0         if ($unit eq 'MINUTE') {
    0          
    0          
    0          
    0          
    0          
1072 0           $rightSql = "$now+($num/1440)"
1073             } elsif ($unit eq 'HOUR') {
1074 0           $rightSql = "$now+($num/24)"
1075             } elsif ($unit eq 'DAY') {
1076 0           $rightSql = "$now+$num"
1077             } elsif ($unit eq 'WEEK') {
1078 0           $rightSql = "$now+($num*7)"
1079             } elsif ($unit eq 'MONTH') {
1080 0           $rightSql = "ADD_MONTHS($now,$num)";
1081             } elsif ($unit eq 'YEAR') {
1082 0           $rightSql = "ADD_MONTHS($now,$num*12)";
1083             }
1084             } else {
1085 0 0         my $now = $leftType eq 'datetime' ? 'NOW()' : 'CURDATE()';
1086 0           $rightSql = "DATE_ADD($now, INTERVAL $num $unit)";
1087             }
1088             }
1089              
1090             elsif ($rval =~ /today\s*/i) {
1091 0           $rightName = "today";
1092 0 0         if ($$oq{dbtype} eq 'Oracle') {
1093 0 0         $rightSql = $leftType eq 'datetime' ? 'SYSDATE' : 'TRUNC(SYSDATE)';
1094             } else {
1095 0 0         $rightSql = $leftType eq 'datetime' ? 'NOW()' : 'CURDATE()';
1096             }
1097             }
1098              
1099             # else this is a date value
1100             else {
1101              
1102 0           my ($y,$m,$d,$h,$mi,$s,$hourType);
1103 0 0         if ($rval =~ /^(\d\d\d\d)[\-\/](\d\d?)[\-\/](\d\d?)/) {
    0          
    0          
    0          
    0          
1104 0           $y = $1;
1105 0           $m = $2;
1106 0           $d = $3;
1107             } elsif ($rval =~ /^(\d\d?)[\-\/](\d\d?)[\-\/](\d\d\d\d)/) {
1108 0           $m = $1;
1109 0           $d = $2;
1110 0           $y = $3;
1111             } elsif ($rval =~ /^(\d\d?)[\-\/](\d\d?)[\-\/](\d\d)\b/) {
1112 0           $m = $1;
1113 0           $d = $2;
1114 0           $y = int('20'.$3);
1115             } elsif ($rval =~ /^(\d\d\d\d)[\-\/](\d\d?)/) {
1116 0           $y = $1;
1117 0           $m = $2;
1118             }
1119             elsif ($rval =~ /^(\d\d\d\d)/) {
1120 0           $y = $1;
1121             }
1122             else {
1123 0           die "could not parse date: $rval";
1124             }
1125              
1126             # extract time component if the type supports it
1127 0 0         if ($leftType eq 'datetime') {
1128 0 0         if ($rval =~ /\b(\d\d?)\:(\d\d?)[\:\.](\d\d?)/) {
    0          
    0          
1129 0           $h = $1;
1130 0           $mi = $2;
1131 0           $s = $3;
1132             }
1133             elsif ($rval =~ /\b(\d\d?)\:(\d\d?)/) {
1134 0           $h = $1;
1135 0           $mi = $2;
1136             }
1137             elsif ($rval =~ /\b(\d\d?)\s*(am|pm)/i) {
1138 0           $h = $1;
1139 0           $mi = '00';
1140             }
1141 0 0         if ($rval =~ /A/i) {
    0          
1142 0           $hourType='AM';
1143             } elsif ($rval =~ /P/i) {
1144 0           $hourType='PM';
1145             }
1146             }
1147              
1148             # format date expression for mysql
1149 0 0         if ($$oq{dbtype} eq 'mysql') {
1150 0           my $format = '%Y';
1151 0           my $val = "$y";
1152 0 0         if ($m) {
1153 0           $format .= '-%m';
1154 0           $val .= "-$m";
1155 0 0         if ($d) {
1156 0           $format .= '-%d';
1157 0           $val .= "-$d";
1158            
1159 0 0         if ($mi ne '') {
1160 0 0         $format .= $hourType ? ' %h' : ' %H';
1161 0           $format .= ':%i';
1162 0           $val .= " $h:$mi";
1163 0 0         if ($s ne '') {
1164 0           $format .= ':%s';
1165 0           $val .= ":$s";
1166             }
1167 0 0         if ($hourType) {
1168 0           $format .= ' %p';
1169 0           $val .= " $hourType";
1170             }
1171             }
1172             }
1173             }
1174 0           $rightSql = "STR_TO_DATE(?,'$format')";
1175 0           push @rightBinds, $val;
1176            
1177             # remove lvalue time if rval doesn't use it
1178 0 0 0       if ($leftType eq 'datetime' && $mi eq '') {
1179 0           $leftSql = "DATE($leftSql)";
1180             }
1181             }
1182              
1183             # format date expression for oracle and postgres (use compatible to_date function)
1184             else {
1185 0           my $format = 'YYYY';
1186 0           my $val = $y;
1187 0 0         if ($m) {
1188 0           $format .= "-MM";
1189 0           $val .= "-$m";
1190 0 0         if ($d) {
1191 0           $format .= "-DD";
1192 0           $val .= "-$d";
1193              
1194 0 0         if ($mi ne '') {
1195 0 0         $format .= $hourType ? ' HH' : ' HH24';
1196 0           $format .= ':MI';
1197 0           $val .= " $h:$mi";
1198 0 0         if ($s ne '') {
1199 0           $format .= ':SS';
1200 0           $val .= ":$s";
1201             }
1202 0 0         if ($hourType) {
1203 0           $format .= " $hourType";
1204 0           $val .= " $hourType";
1205             }
1206             }
1207             }
1208             }
1209              
1210 0           $rightSql = "TO_DATE(?,'$format')";
1211 0           push @rightBinds, $val;
1212              
1213             # remove lvalue time if rval doesn't use it
1214 0 0 0       if ($leftType eq 'datetime' && $mi eq '') {
1215 0           $leftSql = "TRUNC($leftSql)";
1216             }
1217             }
1218             }
1219             }
1220              
1221             # if this is a numeric comparison and rvalue is not a number, convert left side to text
1222             elsif ($leftType eq 'num' && $rval !~ /^(\-?\d*\.\d+|\-?\d+)$/) {
1223 0 0         if ($$oq{dbtype} eq 'mysql') {
1224 0           $leftSql = "CONCAT('',$leftSql)";
1225 0           $rightSql = '?';
1226 0           push @rightBinds, $rval;
1227             } else {
1228 0           $leftSql = "TO_CHAR($leftSql)";
1229 0           $rightSql = '?';
1230 0           push @rightBinds, $rval;
1231             }
1232             }
1233              
1234             # if numeric operator and field is an oracle clob, convert using to_char
1235             elsif ($$oq{dbtype} eq 'Oracle' && $leftType eq 'clob') {
1236 0           $leftSql = "TO_CHAR($leftSql)";
1237 0           $rightSql = '?';
1238 0           push @rightBinds, $rval;
1239             }
1240              
1241             else {
1242 0           $rightSql = '?';
1243 0           push @rightBinds, $rval;
1244             }
1245             }
1246              
1247             # like operator
1248             else {
1249              
1250             # convert contains operator to like
1251 0 0         if ($operatorName =~ /contains/i) {
1252 0 0 0       $leftSql = "LOWER($leftSql)" if $leftType eq 'char' || $leftType eq 'clob';
1253 0 0         $operator = $operatorName =~ /not/i ? "NOT LIKE" : "LIKE";
1254 0           $rval = '%'.lc($rval).'%';
1255             }
1256              
1257             # allow * as wildcard
1258 0 0         if ($operator =~ /like/i) {
1259 0           $rval =~ s/\*/\%/g;
1260             }
1261              
1262             # remove redundant wildcards
1263 0           $rval =~ s/\%\%+/\%/g;
1264              
1265             # if left side is date, convert to text so like operator works as expected
1266 0 0         if ($$leftOpts{date_format}) {
1267 0 0         if ($$oq{dbtype} eq 'mysql') {
1268 0           $leftSql = "DATE_FORMAT($leftSql,'$$leftOpts{date_format}')";
1269             } else {
1270 0           $leftSql = "TO_CHAR($leftSql,'$$leftOpts{date_format}')";
1271             }
1272             }
1273              
1274 0           $rightSql = '?';
1275 0           push @rightBinds, $rval;
1276             }
1277             }
1278              
1279             # if the leftSql uses a new cursor we need to write an exists expression
1280             # search dep path to see if a new_cursor is used
1281 0           my @path = ($$leftDeps[0]);
1282 0           my $i=0;
1283 0           while (1) {
1284 0 0         die "infinite dep loop detected" if ++$i==50;
1285 0           my $parentDep = $$oq{joins}{$path[-1]}[0][0];
1286 0 0         last unless $parentDep;
1287 0           push @path, $parentDep;
1288             }
1289              
1290             # find the oldest parent new cursor if it exists
1291 0           while (@path) {
1292 0 0         if ($$oq{joins}{$path[-1]}[3]{new_cursor}) {
1293 0           last;
1294             } else {
1295 0           pop @path;
1296             }
1297             }
1298            
1299             # if @path has elements, this uses a new_cursor and we must construct an exists expression
1300 0 0         if (@path) {
1301 0           @path = reverse @path;
1302 0           my ($preSql, $postSql, @preBinds);
1303 0           foreach my $joinDep (@path) {
1304 0           my ($fromSql, @fromBinds) = @{ $$oq{joins}{$joinDep}[1] };
  0            
1305              
1306             # unwrap SQL-92 join and add join to where
1307 0           $fromSql =~ s/^\s+//;
1308 0           $fromSql =~ s/^LEFT\s*//i;
1309 0           $fromSql =~ s/^OUTER\s*//i;
1310 0           $fromSql =~ s/^JOIN\s*//i;
1311              
1312 0           my $corelatedJoin;
1313 0 0         if ($fromSql =~ /^(.*)\bON\s*\((.*)\)\s*$/is) {
1314 0           $fromSql = $1;
1315 0           $corelatedJoin = $2;
1316             } else {
1317 0           die "could not parse for corelated join\n";
1318             }
1319              
1320             # in a one2many filter that has a negative operator, we need to use
1321             # a NOT EXISTS and unnegate the operator
1322 0 0         if ($rightName eq "''") {
    0          
    0          
1323 0 0         if ($operator eq '=') {
    0          
1324 0           $preSql .= "NOT ";
1325 0           $operator = '!=';
1326             }
1327             elsif ($operator eq 'IS NULL') {
1328 0           $preSql .= "NOT ";
1329 0           $operator = 'IS NOT NULL';
1330             }
1331             }
1332             elsif ($operator eq '!=') {
1333 0           $operator = '=';
1334 0           $preSql .= "NOT ";
1335             }
1336             elsif ($operator =~ s/NOT\ //) {
1337 0           $preSql .= "NOT ";
1338             }
1339 0           $preSql .= " EXISTS (\n SELECT 1\n FROM $fromSql\n WHERE ($corelatedJoin)\n AND ";
1340 0           $postSql .= ')';
1341 0           push @preBinds, @fromBinds;
1342             }
1343              
1344             # update left expression deps and binds
1345 0           $leftDeps = $$oq{joins}{$path[0]}[0];
1346 0 0         unshift @leftBinds, @preBinds if @preBinds;
1347 0           $leftSql = $preSql.$leftSql;
1348 0           $rightSql .= $postSql;
1349             }
1350              
1351 0           my $sql = '(' x $numLeftParen;
1352 0           $sql .= $leftSql;
1353 0           $sql .= ' '.$operator;
1354 0 0         $sql .= ' '.$rightSql if $rightSql ne '';
1355 0           $sql .= ')' x $numRightParen;
1356              
1357             # glue expression
1358 0           push @sql, $sql;
1359 0           push @binds, @leftBinds, @rightBinds;
1360 0           push @name, $leftName, $operatorName;
1361 0 0         push @name, $rightName if $rightName ne '';
1362 0           $deps{$_}=1 for @$leftDeps;
1363             }
1364              
1365              
1366             # namedFilter(args)
1367             elsif ($$exp[0]==2) {
1368 0           my ($type, $numLeftParen,$namedFilterAlias,$argArray,$numRightParen) = @$exp;
1369 0           $parenthesis+=$numLeftParen;
1370 0           $parenthesis-=$numRightParen;
1371              
1372             # generate filter sql, bind, name
1373 0           my $f = $$oq{named_filters}{$namedFilterAlias};
1374 0           my ($filterDeps, $filterSql, @filterBinds, $filterLabel);
1375 0 0         if (ref($f) eq 'ARRAY') {
    0          
1376 0           ($filterDeps, $filterSql, $filterLabel) = @$f;
1377             } elsif (ref($f) eq 'HASH') {
1378             die "could not find sql_generator for named_filter $namedFilterAlias\n"
1379 0 0         unless ref($$f{sql_generator}) eq 'CODE';
1380 0           ($filterDeps, $filterSql, $filterLabel) = @{ $$f{sql_generator}->(@$argArray) };
  0            
1381             } else {
1382 0           die "invalid named_filter: $namedFilterAlias\n";
1383             }
1384 0 0         ($filterSql, @filterBinds) = @$filterSql if ref($filterSql) eq 'ARRAY';
1385              
1386              
1387 0           my $sql = '(' x $numLeftParen;
1388             # put expression in parenthesis since it may contain OR expression without parenthesis which will screw up order of operations
1389 0           $sql .= '('.$filterSql.')';
1390 0           $sql .= ')' x $numRightParen;
1391              
1392             # glue expression
1393 0           push @sql, $sql;
1394 0           push @binds, @filterBinds;
1395 0           push @name, $filterLabel;
1396              
1397 0 0         if (ref($filterDeps) eq 'ARRAY') {
    0          
1398 0           $deps{$_}=1 for @$filterDeps;
1399             } elsif ($filterDeps) {
1400 0           $deps{$filterDeps}=1;
1401             }
1402             }
1403              
1404              
1405             # [COL] != [COL2]
1406             elsif ($$exp[0]==3) {
1407 0           my ($type, $numLeftParen,$leftColAlias,$operatorName,$rightColAlias,$numRightParen) = @$exp;
1408 0           $parenthesis+=$numLeftParen;
1409 0           $parenthesis-=$numRightParen;
1410              
1411 0           my $operator = uc($operatorName);
1412              
1413             # handle left side of expression
1414 0           my ($leftDeps, $leftSql, $leftName, $leftOpts, @leftBinds, $leftType);
1415 0           ($leftDeps, $leftSql, $leftName, $leftOpts) = @{ $$oq{select}{$leftColAlias} };
  0            
1416 0 0         $leftSql = $$leftOpts{filter_sql} if $$leftOpts{filter_sql};
1417 0 0         ($leftSql, @leftBinds) = @$leftSql if ref($leftSql) eq 'ARRAY';
1418 0           $leftType = $oq->get_col_type($leftColAlias, 'filter');
1419 0   0       $leftName ||= $leftColAlias;
1420              
1421             # handle right side of expression
1422 0           my ($rightDeps, $rightSql, $rightName, $rightOpts, @rightBinds, $rightType);
1423 0           ($rightDeps, $rightSql, $rightName, $rightOpts) = @{ $$oq{select}{$rightColAlias} };
  0            
1424 0 0         $rightSql = $$rightOpts{filter_sql} if $$rightOpts{filter_sql};
1425 0 0         ($rightSql, @rightBinds) = @$rightSql if ref($rightSql) eq 'ARRAY';
1426 0           $rightType = $oq->get_col_type($rightColAlias, 'filter');
1427 0   0       $rightName ||= $rightColAlias;
1428              
1429             # do type conversion to ensure types are the same
1430 0 0         if ($leftType ne $rightType) {
1431 0 0         if ($$oq{dbtype} eq 'mysql') {
1432 0 0         $leftSql = "CONCAT('', $leftSql)" unless $leftType eq 'char';
1433 0 0         $rightSql = "CONCAT('', $rightSql)" unless $rightType eq 'char';
1434             } else {
1435 0 0         $leftSql = "TO_CHAR($leftSql)" unless $leftType eq 'char';
1436 0 0         $rightSql = "TO_CHAR($rightSql)" unless $rightType eq 'char';
1437             }
1438             }
1439              
1440             # if char ensure NULL is turned into empty string so comparison works
1441 0 0         if ($leftType eq 'char') {
1442 0 0         my $nullVal = $$oq{dbtype} eq 'Oracle' ? "'_ _'" : "''";
1443 0           $leftSql = "COALESCE($leftSql,$nullVal)";
1444 0           $rightSql = "COALESCE($rightSql,$nullVal)";
1445             }
1446              
1447             # handle case insensitivity
1448 0 0         if ($operatorName =~ /contains/i) {
1449 0 0         $operator = $operatorName =~ /not/i ? "NOT LIKE" : "LIKE";
1450 0           $leftSql = "LOWER($leftSql)";
1451 0           $rightSql = "LOWER($rightSql)";
1452 0 0 0       $rightSql = $$oq{dbtype} eq 'Oracle' || $$oq{dbtype} eq 'SQLite'
1453             ? "'%'||$leftSql||'%'"
1454             : "CONCAT('%',$leftSql,'%')";
1455             }
1456              
1457 0           my $sql = '(' x $numLeftParen;
1458 0           $sql .= "$leftSql $operator $rightSql";
1459 0           $sql .= ')' x $numRightParen;
1460              
1461             # glue expression
1462 0           push @sql, $sql;
1463 0           push @binds, @leftBinds, @rightBinds;
1464 0           push @name, $leftName, $operatorName, $rightName;
1465 0           $deps{$_}=1 for @$leftDeps;
1466 0           $deps{$_}=1 for @$rightDeps;
1467             }
1468             }
1469              
1470 0           my @deps = grep { $_ } keys %deps;
  0            
1471 0           my $sql = join(' ', @sql);
1472 0           my $name = join(' ', @name);
1473              
1474             # make sure parenthesis are balanced
1475 0 0         if ($parenthesis > 0) {
1476 0           my $p = ')' x $parenthesis;
1477 0           $sql .= $p;
1478 0           $name .= $p;
1479             }
1480              
1481 0           my %rv = ( sql => $sql, binds => \@binds, deps => \@deps, name => $name );
1482 0           return \%rv;
1483             }
1484              
1485              
1486             sub parseSort {
1487 0     0 0   my ($oq, $str) = @_;
1488 0           $str =~ /^\s+/;
1489              
1490 0           my (@sql, @binds, @name, %deps);
1491              
1492 0           while (1) {
1493              
1494             # parse named sort
1495 0 0         if ($str =~ /\G(\w+)\(\s*/gc) {
    0          
    0          
    0          
1496 0           my $namedSortAlias = $1;
1497 0           my @args;
1498 0           while (1) {
1499 0 0 0       if ($str =~ /\G\)\s*/gc) {
    0 0        
      0        
      0        
1500 0           last;
1501             }
1502             elsif ($str =~ /\G(\-?\d*\.\d+)\s*\,*\s*/gc ||
1503             $str =~ /\G(\-?\d+)\s*\,*\s*/gc ||
1504             $str =~ /\G\'([^\']*)\'\s*\,*\s*/gc ||
1505             $str =~ /\G\"([^\"]*)\"\s*\,*\s*/gc ||
1506             $str =~ /\G(\w+)\s*\,*\s*/gc) {
1507 0           push @args, $1;
1508             } else {
1509 0           die "could not parse named sort arguments\n";
1510             }
1511             }
1512 0           my ($sortDeps, $sortSql, @sortBinds, $sortLabel);
1513            
1514 0           my $s = $$oq{named_sorts}{$namedSortAlias};
1515 0 0         if (ref($s) eq 'ARRAY') {
    0          
1516 0           ($sortDeps, $sortSql, $sortLabel) = @$s;
1517             } elsif (ref($s) eq 'HASH') {
1518             die "could not find sql_generator for named_srot $namedSortAlias\n"
1519 0 0         unless ref($$s{sql_generator}) eq 'CODE';
1520 0           ($sortDeps, $sortSql, $sortLabel) = @{ $$s{sql_generator}->(@args) };
  0            
1521 0 0         ($sortSql, @sortBinds) = @$sortSql if ref($sortSql) eq 'ARRAY';
1522             } else {
1523 0           die "invalid named_filter: $namedSortAlias\n";
1524             }
1525              
1526 0           push @sql, $sortSql;
1527 0           push @binds, @sortBinds;
1528 0           push @name, $sortLabel;
1529 0           $deps{$_} =1 for @$sortDeps;
1530             }
1531              
1532             # parse named sort
1533             elsif ($str =~ /\G\[?(\w+)\]?\s*/gc) {
1534 0           my $colAlias = $1;
1535 0 0         die "missing sort col: $colAlias\n" unless $$oq{select}{$colAlias};
1536 0           my @sortBinds;
1537 0           my ($sortDeps, $sortSql, $sortLabel, $opts) = @{ $$oq{select}{$colAlias} };
  0            
1538 0 0         $sortSql = $$opts{sort_sql} if $$opts{sort_sql};
1539 0 0         ($sortSql, @sortBinds) = @$sortSql if ref($sortSql) eq 'ARRAY';
1540 0   0       $sortLabel ||= $colAlias;
1541              
1542 0 0         if ($str =~ /\Gdesc\s*/gci) {
1543 0           $sortSql .= ' DESC';
1544 0           $sortLabel .= ' (reverse)';
1545             }
1546              
1547 0           push @sql, $sortSql;
1548 0           push @binds, @sortBinds;
1549 0           push @name, $sortLabel;
1550 0           $deps{$_} =1 for @$sortDeps;
1551             }
1552              
1553             elsif ($str =~ /\G$/gc) {
1554 0           last;
1555             }
1556             elsif ($str =~ /\G\,\s*/gc) {
1557 0           next;
1558             }
1559             else {
1560 0           die "could not parse sort\n";
1561             }
1562             }
1563              
1564 0           my @deps = grep { $_ } keys %deps;
  0            
1565              
1566 0           return { sql => \@sql, binds => \@binds, deps => \@deps, name => \@name };
1567             }
1568              
1569              
1570             # normalize member variables
1571             sub _normalize {
1572 0     0     my $oq = shift;
1573             #$$oq{error_handler}->("DEBUG: \$oq->_normalize()\n") if $$oq{debug};
1574              
1575 0 0         $oq->{'AutoSetLongReadLen'} = 1 unless exists $oq->{'AutoSetLongReadLen'};
1576              
1577             # make sure all option hash refs exist
1578 0   0       $oq->{'select'}->{$_}->[3] ||= {} for keys %{ $oq->{'select'} };
  0            
1579 0   0       $oq->{'joins' }->{$_}->[3] ||= {} for keys %{ $oq->{'joins'} };
  0            
1580              
1581              
1582             # since the sql & deps definitions can optionally be entered as arrays
1583             # turn all into arrays if not already
1584 0           for ( # key, index
1585             ['select', 0], ['select', 1],
1586             ['joins', 0], ['joins', 1], ['joins', 2],
1587             ['named_filters', 0], ['named_filters', 1],
1588             ['named_sorts', 0], ['named_sorts', 1] ) {
1589 0           my ($key, $i) = @$_;
1590 0   0       $oq->{$key} ||= {};
1591 0           foreach my $alias (keys %{ $oq->{$key} }) {
  0            
1592 0 0 0       if (ref($oq->{$key}->{$alias}) eq 'ARRAY' &&
      0        
1593             defined $oq->{$key}->{$alias}->[$i] &&
1594             ref($oq->{$key}->{$alias}->[$i]) ne 'ARRAY') {
1595 0           $oq->{$key}->{$alias}->[$i] = [$oq->{$key}->{$alias}->[$i]];
1596             }
1597             }
1598             }
1599              
1600             # make sure the following select options, if they exist are array references
1601 0           foreach my $col (keys %{ $oq->{'select'} }) {
  0            
1602 0           my $opts = $oq->{'select'}->{$col}->[3];
1603 0           foreach my $opt (qw( select_sql sort_sql filter_sql )) {
1604             $opts->{$opt} = [$opts->{$opt}]
1605 0 0 0       if exists $opts->{$opt} && ref($opts->{$opt}) ne 'ARRAY';
1606             }
1607              
1608             # make sure defined deps exist
1609 0           foreach my $dep (@{ $$oq{'select'}{$col}[0] }) {
  0            
1610             die "dep $dep for select $col does not exist"
1611 0 0 0       if defined $dep && ! exists $$oq{'joins'}{$dep};
1612             }
1613             }
1614              
1615             # look for new cursors and define parent child links if not already defined
1616 0           foreach my $join (keys %{ $oq->{'joins'} }) {
  0            
1617 0           my $opts = $oq->{'joins'}->{$join}->[3];
1618 0 0         if (exists $opts->{new_cursor}) {
1619 0 0         if (ref($opts->{new_cursor}) ne 'HASH') {
1620 0           $oq->_formulate_new_cursor($join);
1621             } else {
1622             die "could not find keys, join, and sql for new cursor in $join"
1623             unless exists $opts->{new_cursor}->{'keys'} &&
1624             exists $opts->{new_cursor}->{'join'} &&
1625 0 0 0       exists $opts->{new_cursor}->{'sql'};
      0        
1626             }
1627             }
1628              
1629             # make sure defined deps exist
1630 0           foreach my $dep (@{ $$oq{'joins'}{$join}[0] }) {
  0            
1631             die "dep $dep for join $join does not exist"
1632 0 0 0       if defined $dep && ! exists $$oq{'joins'}{$dep};
1633             }
1634             }
1635              
1636             # make sure deps for named_sorts exist
1637 0           foreach my $named_sort (keys %{ $$oq{'named_sorts'} }) {
  0            
1638 0           foreach my $dep (@{ $$oq{'named_sorts'}{$named_sort}->[0] }) {
  0            
1639             die "dep $dep for named_sort $named_sort does not exist"
1640 0 0 0       if defined $dep && ! exists $$oq{'joins'}{$dep};
1641             }
1642             }
1643              
1644             # make sure deps for named_filter exist
1645 0           foreach my $named_filter (keys %{ $$oq{'named_filters'} }) {
  0            
1646 0 0         if (ref($$oq{'named_filters'}{$named_filter}) eq 'ARRAY') {
1647 0           foreach my $dep (@{ $$oq{'named_filters'}{$named_filter}->[0] }) {
  0            
1648             die "dep $dep for named_sort $named_filter does not exist"
1649 0 0 0       if defined $dep && ! exists $$oq{'joins'}{$dep};
1650             }
1651             }
1652             }
1653              
1654 0           $oq->{'col_types'} = undef;
1655              
1656 0           return undef;
1657             }
1658              
1659              
1660              
1661              
1662              
1663              
1664              
1665             # defines how a child cursor joins to its parent cursor
1666             # by defining keys, join, sql in child cursor
1667             # called from the _normalize method
1668             sub _formulate_new_cursor {
1669 0     0     my $oq = shift;
1670 0           my $joinAlias = shift;
1671              
1672             #$$oq{error_handler}->("DEBUG: \$oq->_formulate_new_cursor('$joinAlias')\n") if $$oq{debug};
1673              
1674             # vars to define
1675 0           my (@keys, $join, $sql, @sqlBinds);
1676              
1677             # get join definition
1678 0           my ($fromSql, @fromBinds) = @{ $oq->{joins}->{$joinAlias}->[1] };
  0            
1679              
1680 0           my ($whereSql, @whereBinds);
1681 0           ($whereSql, @whereBinds) = @{ $oq->{joins}->{$joinAlias}->[2] }
1682 0 0         if defined $oq->{joins}->{$joinAlias}->[2];
1683              
1684             # if NOT an SQL-92 type join
1685 0 0         if (defined $whereSql) {
1686 0           $whereSql =~ s/\(\+\)/\ /g; # remove outer join notation
1687 0 0         die "BAD_PARAMS - where binds not allowed in 'new_cursor' joins"
1688             if scalar(@whereBinds);
1689             }
1690              
1691             # else is SQL-92 so separate out joins from table definition
1692             # do this by making it a pre SQL-92 type join
1693             # by defining $whereSql
1694             # and removing join sql from $fromSql
1695             else {
1696 0           $_ = $fromSql;
1697 0           m/\G\s*left\b/sicg;
1698 0           m/\G\s*join\b/sicg;
1699              
1700             # parse inline view
1701 0 0         if (m/\G\s*\(/scg) {
    0          
1702 0           $fromSql = '(';
1703 0           my $p=1;
1704 0           my $q;
1705 0   0       while ($p > 0 && m/\G(.)/scg) {
1706 0           my $c = $1;
1707 0 0 0       if ($q) { $q = '' if $c eq $q; } # if end of quote
  0 0          
    0          
    0          
    0          
1708 0           elsif ($c eq "'" || $c eq '"') { $q = $c; } # if start of quote
1709 0           elsif ($c eq '(') { $p++; } # if left paren
1710 0           elsif ($c eq ')') { $p--; } # if right paren
1711 0           $fromSql .= $c;
1712             }
1713             }
1714              
1715             # parse table name
1716             elsif (m/\G\s*(\w+)\b/scg) {
1717 0           $fromSql = $1;
1718             }
1719              
1720             else {
1721 0           die "could not parse tablename";
1722             }
1723              
1724             # include alias if it exists
1725 0 0 0       if (m/\G\s*([\d\w\_]+)\s*/scg && lc($1) ne 'on') {
1726 0           $fromSql .= ' '.$1;
1727 0           m/\G\s*on\b/cgi;
1728             }
1729              
1730             # get the whereSql
1731 0 0         if (m/\G\s*\((.*)\)\s*$/cgs) {
1732 0           $whereSql = $1;
1733             }
1734             }
1735              
1736             # define sql & sqlBinds
1737 0           $sql = $fromSql;
1738 0           @sqlBinds = @fromBinds;
1739            
1740             # parse $whereSql to create $join, and @keys
1741 0           foreach my $part (split /\b([\w\d\_]+\.[\w\d\_]+)\b/,$whereSql) {
1742 0 0         if ($part =~ /\b([\w\d\_]+)\.([\w\d\_]+)\b/) {
1743 0           my $dep = $1;
1744 0           my $sql = $2;
1745 0 0         if ($dep eq $joinAlias) {
1746 0           $join .= $part;
1747             } else {
1748 0           push @keys, [$dep, $sql];
1749 0           $join .= '?';
1750             }
1751             } else {
1752 0           $join .= $part;
1753             }
1754             }
1755              
1756             # fill in options
1757 0           $oq->{joins}->{$joinAlias}->[3]->{'new_cursor'} = {
1758             'keys' => \@keys, 'join' => $join, 'sql' => [$sql, @sqlBinds] };
1759              
1760 0           return undef;
1761             }
1762              
1763              
1764              
1765              
1766             # make sure the join counts are the same
1767             # throws exception with error when there is a problem
1768             # this can be an expensive wasteful operation and should not be done in a production env
1769             sub check_join_counts {
1770 0     0 0   my $oq = shift;
1771              
1772             #$$oq{error_handler}->("DEBUG: \$oq->check_join_counts()\n") if $$oq{debug};
1773              
1774              
1775             # since driving table count is computed first this will get set first
1776 0           my $drivingTableCount;
1777              
1778 0           foreach my $join (keys %{ $oq->{joins} }) {
  0            
1779 0           my ($cursors) = $oq->_order_deps($join);
1780 0           my @deps = map { @$_ } @$cursors; # flatten deps in cursors
  0            
1781 0           my $drivingTable = $deps[0];
1782              
1783             # now create from clause
1784 0           my ($fromSql, @fromBinds, @whereSql, @whereBinds);
1785 0           foreach my $joinAlias (@deps) {
1786 0           my ($sql, @sqlBinds) = @{ $oq->{joins}->{$joinAlias}->[1] };
  0            
1787              
1788             # if this is the driving table
1789 0 0         if (! $oq->{joins}->{$joinAlias}->[0]) {
    0          
1790             # alias it if not already aliased in sql
1791 0 0         $fromSql .= " $joinAlias" unless $sql =~ /\b$joinAlias\s*$/;
1792             }
1793              
1794             # if NOT sql-92 join
1795             elsif (defined $oq->{joins}->{$joinAlias}->[2]) {
1796 0           $fromSql .= ",\n $sql $joinAlias";
1797 0           push @fromBinds, @sqlBinds;
1798 0           my ($where_sql, @where_sqlBinds) = @{ $oq->{joins}->{$joinAlias}->[2] };
  0            
1799 0           push @whereSql, $where_sql;
1800 0           push @whereBinds, @where_sqlBinds;
1801             }
1802              
1803             # else this is an SQL-92 type join
1804             else {
1805 0           $fromSql .= "\n$sql ";
1806             }
1807             }
1808              
1809 0           my $where;
1810 0 0         $where = 'WHERE '.join("\nAND ", @whereSql) if @whereSql;
1811              
1812 0           my $sql = "
1813             SELECT count(*)
1814             FROM (
1815             SELECT $drivingTable.*
1816             FROM $fromSql
1817             $where
1818             ) OPTIMALQUERYCNTCK ";
1819 0           my @binds = (@fromBinds,@whereBinds);
1820 0           my $count;
1821 0           eval { ($count) = $oq->{dbh}->selectrow_array($sql, undef, @binds); };
  0            
1822 0 0         die "Problem executing ERROR: $@\nSQL: $sql\nBINDS: ".join(',', @binds)."\n" if $@;
1823 0 0         $drivingTableCount = $count unless defined $drivingTableCount;
1824 0 0         confess "BAD_JOIN_COUNT - driving table $drivingTable count ".
1825             "($drivingTableCount) != driving table joined with $join".
1826             " count ($count)" if $count != $drivingTableCount;
1827             }
1828              
1829 0           return undef;
1830             }
1831              
1832              
1833              
1834             =comment
1835             $oq->get_col_type($alias,$context);
1836             =cut
1837             sub type_map {
1838 0     0 0   my $oq = shift;
1839             return {
1840 0           -1 => 'char',
1841             -2 => 'clob',
1842             -3 => 'clob',
1843             -4 => 'clob',
1844             -5 => 'num',
1845             -6 => 'num',
1846             -7 => 'num',
1847             -8 => 'char',
1848             -9 => 'char',
1849             0 => 'char',
1850             1 => 'char',
1851             2 => 'num',
1852             3 => 'num', # is decimal type
1853             4 => 'num',
1854             5 => 'num',
1855             6 => 'num', # float
1856             7 => 'num',
1857             8 => 'num',
1858             9 => 'date',
1859             10 => 'char', # time (no date)
1860             11 => 'datetime',
1861             12 => 'char',
1862             16 => 'date',
1863             30 => 'clob',
1864             40 => 'clob',
1865             91 => 'date',
1866             93 => 'datetime',
1867             95 => 'date',
1868             'TIMESTAMP' => 'datetime',
1869             'INTEGER' => 'num',
1870             'TEXT' => 'char',
1871             'VARCHAR' => 'char',
1872             'varchar' => 'char'
1873             };
1874             }
1875              
1876             # $type = $oq->get_col_type($alias,$context);
1877             sub get_col_type {
1878 0     0 0   my $oq = shift;
1879 0           my $alias = shift;
1880 0   0       my $context = shift || 'default';
1881             #$$oq{error_handler}->("DEBUG: \$oq->get_col_type($alias, $context)\n") if $$oq{debug};
1882              
1883             return $oq->{'select'}->{$alias}->[3]->{'col_type'} ||
1884 0   0       $oq->get_col_types($context)->{$alias};
1885             }
1886              
1887             #{ ColAlias => Type, .. } = $oq->get_col_types($context)
1888             # where $content in ('default','sort','filter','select')
1889             sub get_col_types {
1890 0     0 0   my $oq = shift;
1891 0   0       my $context = shift || 'default';
1892             #$$oq{error_handler}->("DEBUG: \$oq->get_col_types($context)\n") if $$oq{debug};
1893             return $oq->{'col_types'}->{$context}
1894 0 0         if defined $oq->{'col_types'};
1895              
1896 0           $oq->{'col_types'} = {
1897             'default' => {}, 'sort' => {},
1898             'filter' => {}, 'select' => {} };
1899              
1900 0           my (%deps, @selectColTypeOrder, @selectColAliasOrder, @select, @selectBinds, @where);
1901 0           foreach my $selectAlias (keys %{ $oq->{'select'} } ) {
  0            
1902 0           my $s = $oq->{'select'}->{$selectAlias};
1903              
1904             # did user already define this type?
1905 0 0         if (exists $s->[3]->{'col_type'}) {
1906 0           $oq->{'col_types'}->{'default'}->{$selectAlias} = $s->[3]->{'col_type'};
1907 0           $oq->{'col_types'}->{'select' }->{$selectAlias} = $s->[3]->{'col_type'};
1908 0           $oq->{'col_types'}->{'filter' }->{$selectAlias} = $s->[3]->{'col_type'};
1909 0           $oq->{'col_types'}->{'sort' }->{$selectAlias} = $s->[3]->{'col_type'};
1910             }
1911              
1912             # else write sql to determine type with context
1913             else {
1914 0           $deps{$_} = 1 for @{ $s->[0] };
  0            
1915              
1916 0           foreach my $type (
1917             ['default', $s->[1]],
1918             ['select', $s->[3]->{'select_sql'}],
1919             ['filter', $s->[3]->{'filter_sql'}],
1920             ['sort', $s->[3]->{'sort_sql'}] ) {
1921 0 0         next if ! defined $type->[1];
1922 0           push @selectColTypeOrder, $type->[0];
1923 0           push @selectColAliasOrder, $selectAlias;
1924 0           my ($sql, @binds) = @{ $type->[1] };
  0            
1925 0           push @select, $sql;
1926 0           push @selectBinds, @binds;
1927              
1928             # this next one is needed for oracle so inline views don't get processed
1929             # kinda stupid if you ask me
1930             # don't bother though if there is binds
1931             # this isn't neccessary for mysql since an explicit limit is
1932             # defined latter
1933 0 0 0       if ($$oq{dbtype} eq 'Oracle' && $#binds == -1) {
1934 0           push @where, "to_char($sql) = NULL";
1935             }
1936             }
1937             }
1938             }
1939              
1940             # are there unknown deps?
1941 0 0         if (%deps) {
1942              
1943             # order and flatten deps
1944 0           my @deps = keys %deps;
1945 0           my ($deps) = $oq->_order_deps(@deps);
1946              
1947              
1948 0           @deps = ();
1949 0           push @deps, @$_ for @$deps;
1950              
1951             # now create from clause
1952 0           my ($fromSql, @fromBinds);
1953 0           foreach my $joinAlias (@deps) {
1954 0           my ($sql, @sqlBinds) = @{ $oq->{joins}->{$joinAlias}->[1] };
  0            
1955 0           push @fromBinds, @sqlBinds;
1956              
1957             # if this is the driving table join
1958 0 0         if (! $oq->{joins}->{$joinAlias}->[0]) {
    0          
1959              
1960             # alias it if not already aliased in sql
1961 0           $fromSql .= $sql;
1962 0 0         $fromSql .= " $joinAlias" unless $sql =~ /\b$joinAlias\s*$/;
1963             }
1964              
1965             # if NOT sql-92 join
1966             elsif (defined $oq->{joins}->{$joinAlias}->[2]) {
1967 0           $fromSql .= ",\n $sql $joinAlias";
1968             }
1969              
1970             # else this is an SQL-92 type join
1971             else {
1972 0           $fromSql .= "\n$sql ";
1973             }
1974              
1975             }
1976              
1977 0           my $where;
1978 0 0         $where .= "\nAND " if $#where > -1;
1979 0           $where .= join("\nAND ", @where);
1980              
1981 0           my @binds = (@selectBinds, @fromBinds);
1982 0           my $sql = "
1983             SELECT ".join(',', @select)."
1984             FROM $fromSql";
1985              
1986 0 0 0       if ($$oq{dbtype} eq 'Oracle' || $$oq{dbtype} eq 'Microsoft SQL Server') {
    0          
1987 0           $sql .= "
1988             WHERE 1=2
1989             $where ";
1990             }
1991              
1992             elsif ($$oq{dbtype} eq 'mysql') {
1993 0           $sql .= "
1994             LIMIT 0 ";
1995             }
1996              
1997 0           my $sth;
1998 0           eval {
1999 0           local $oq->{dbh}->{PrintError} = 0;
2000 0           local $oq->{dbh}->{RaiseError} = 1;
2001 0           $sth = $oq->{dbh}->prepare($sql);
2002 0           $sth->execute(@binds);
2003 0 0         }; if ($@) {
2004 0           confess "SQL Error in get_col_types:\n$@\n$sql\n(".join(",",@binds).")";
2005             }
2006              
2007             # read types into col_types cache in object
2008 0           my $type_map = $oq->type_map();
2009 0           for (my $i=0; $i < scalar(@selectColAliasOrder); $i++) {
2010 0           my $name = $selectColAliasOrder[$i];
2011 0           my $type_code = $sth->{TYPE}->[$i];
2012              
2013             # remove parenthesis in type_code from sqlite
2014 0           $type_code =~ s/\([^\)]*\)//;
2015            
2016 0 0         my $type = $type_map->{$type_code} or
2017             die "could not find type code: $type_code for col $name";
2018 0           $oq->{'col_types'}->{$selectColTypeOrder[$i]}->{$name} = $type;
2019              
2020             # set the type for select, filter, and sort to the default
2021             # unless they are already defined
2022 0 0         if ($selectColTypeOrder[$i] eq 'default') {
2023 0   0       $oq->{'col_types'}->{'select' }->{$name} ||= $type;
2024 0   0       $oq->{'col_types'}->{'filter' }->{$name} ||= $type;
2025 0   0       $oq->{'col_types'}->{'sort' }->{$name} ||= $type;
2026             }
2027             }
2028              
2029 0           $sth->finish();
2030             }
2031              
2032 0           return $oq->{'col_types'}->{$context};
2033             }
2034              
2035              
2036              
2037              
2038             # prepare an sth
2039             sub prepare {
2040 0     0 0   my $oq = shift;
2041             #$$oq{error_handler}->("DEBUG: \$oq->prepare(".Dumper(\@_).")\n") if $$oq{debug};
2042 0           return DBIx::OptimalQuery::sth->new($oq,@_);
2043             }
2044              
2045              
2046              
2047             # returns ARRAYREF: [order,idx]
2048             # order is [ [dep1,dep2,dep3], [dep4,dep5,dep6] ], # cursor/dep order
2049             # idx is { dep1 => 0, dep4 => 1, .. etc .. } # index of what cursor dep is in
2050             sub _order_deps {
2051 0     0     my ($oq, @deps) = @_;
2052             #$$oq{error_handler}->("DEBUG: \$oq->_order_deps(".Dumper(\@_).")\n") if $$oq{debug};
2053              
2054             # add always_join deps
2055 0           foreach my $joinAlias (keys %{ $$oq{joins} }) {
  0            
2056 0 0         push @deps, $joinAlias if $$oq{joins}{$joinAlias}[3]{always_join};
2057             }
2058              
2059             # @order is an array of array refs. Where each array ref represents deps
2060             # for a separate cursor
2061             # %idx is a hash of scalars where the hash key is the dep name and the
2062             # hash value is what index into order (which cursor number)
2063             # where you find the dep
2064 0           my (@order, %idx);
2065              
2066             # var to detect infinite recursion
2067 0           my $maxRecurse = 1000;
2068              
2069             # recursive function to order deps
2070             # each dep calls this again on all parent deps until all deps are fulfilled
2071             # then the dep is added
2072             # modfies @order & %idx
2073 0           my $place_missing_deps;
2074             $place_missing_deps = sub {
2075 0     0     my ($dep) = @_;
2076              
2077             # detect infinite recursion
2078 0           $maxRecurse--;
2079 0 0         die "BAD_JOINS - could not link joins to meet all deps" if $maxRecurse == 0;
2080              
2081             # recursion to make sure parent deps are added first
2082 0 0         if (defined $oq->{'joins'}->{$dep}->[0]) {
2083 0           foreach my $parent_dep (@{ $oq->{'joins'}->{$dep}->[0] } ) {
  0            
2084 0 0         $place_missing_deps->($parent_dep) if ! exists $idx{$parent_dep};
2085             }
2086             }
2087              
2088             # at this point all parent deps have been added,
2089             # now add this dep if it has not already been added
2090 0 0         if (! exists $idx{$dep}) {
2091              
2092             # add new cursor if dep is main driving table or has option new_cursor
2093 0 0 0       if (! defined $oq->{'joins'}->{$dep}->[0] ||
2094             exists $oq->{'joins'}->{$dep}->[3]->{new_cursor}) {
2095 0           push @order, [$dep];
2096 0           $idx{$dep} = $#order;
2097             }
2098              
2099             # place dep in @order & %idx
2100             # uses the same cursor as its parent dep
2101             # this is found by looking at the parent_idx
2102             else {
2103 0   0       my $parent_idx = $idx{$oq->{'joins'}->{$dep}->[0]->[0]} || 0;
2104 0           push @{ $order[ $parent_idx ] }, $dep;
  0            
2105 0           $idx{$dep} = $parent_idx;
2106             }
2107             }
2108 0           return undef;
2109 0           };
2110              
2111 0           $place_missing_deps->($_) for @deps;
2112              
2113 0           return (\@order, \%idx);
2114             }
2115              
2116              
2117             1;