File Coverage

blib/lib/DBIx/Class/SQLMaker/LimitDialects.pm
Criterion Covered Total %
statement 249 250 99.6
branch 84 98 85.7
condition 29 43 67.4
subroutine 18 18 100.0
pod n/a
total 380 409 92.9


line stmt bran cond sub pod time code
1             package DBIx::Class::SQLMaker::LimitDialects;
2              
3 221     221   549389 use warnings;
  221         500  
  221         6732  
4 221     221   1173 use strict;
  221         474  
  221         657555  
5              
6             # constants are used not only here, but also in comparison tests
7             sub __rows_bindtype () {
8 1639     1639   8070 +{ sqlt_datatype => 'integer' }
9             }
10             sub __offset_bindtype () {
11 110     110   696 +{ sqlt_datatype => 'integer' }
12             }
13             sub __total_bindtype () {
14 43     43   434 +{ sqlt_datatype => 'integer' }
15             }
16              
17             =head1 NAME
18              
19             DBIx::Class::SQLMaker::LimitDialects - SQL::Abstract::Limit-like functionality for DBIx::Class::SQLMaker
20              
21             =head1 DESCRIPTION
22              
23             This module replicates a lot of the functionality originally found in
24             L<SQL::Abstract::Limit>. While simple limits would work as-is, the more
25             complex dialects that require e.g. subqueries could not be reliably
26             implemented without taking full advantage of the metadata locked within
27             L<DBIx::Class::ResultSource> classes. After reimplementation of close to
28             80% of the L<SQL::Abstract::Limit> functionality it was deemed more
29             practical to simply make an independent DBIx::Class-specific limit-dialect
30             provider.
31              
32             =head1 SQL LIMIT DIALECTS
33              
34             Note that the actual implementations listed below never use C<*> literally.
35             Instead proper re-aliasing of selectors and order criteria is done, so that
36             the limit dialects are safe to use on joined resultsets with clashing column
37             names.
38              
39             Currently the provided dialects are:
40              
41             =head2 LimitOffset
42              
43             SELECT ... LIMIT $limit OFFSET $offset
44              
45             Supported by B<PostgreSQL> and B<SQLite>
46              
47             =cut
48             sub _LimitOffset {
49 1591     1591   6887 my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
50 1591         6352 $sql .= $self->_parse_rs_attrs( $rs_attrs ) . " LIMIT ?";
51 1591         4546 push @{$self->{limit_bind}}, [ $self->__rows_bindtype => $rows ];
  1591         8972  
52 1591 100       6593 if ($offset) {
53 43         103 $sql .= " OFFSET ?";
54 43         80 push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset ];
  43         179  
55             }
56 1591         6431 return $sql;
57             }
58              
59             =head2 LimitXY
60              
61             SELECT ... LIMIT $offset, $limit
62              
63             Supported by B<MySQL> and any L<SQL::Statement> based DBD
64              
65             =cut
66             sub _LimitXY {
67 3     3   13 my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
68 3         15 $sql .= $self->_parse_rs_attrs( $rs_attrs ) . " LIMIT ";
69 3 100       9 if ($offset) {
70 2         19 $sql .= '?, ';
71 2         6 push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset ];
  2         12  
72             }
73 3         8 $sql .= '?';
74 3         7 push @{$self->{limit_bind}}, [ $self->__rows_bindtype => $rows ];
  3         17  
75              
76 3         10 return $sql;
77             }
78              
79             =head2 RowNumberOver
80              
81             SELECT * FROM (
82             SELECT *, ROW_NUMBER() OVER( ORDER BY ... ) AS RNO__ROW__INDEX FROM (
83             SELECT ...
84             )
85             ) WHERE RNO__ROW__INDEX BETWEEN ($offset+1) AND ($limit+$offset)
86              
87              
88             ANSI standard Limit/Offset implementation. Supported by B<DB2> and
89             B<< MSSQL >= 2005 >>.
90              
91             =cut
92             sub _RowNumberOver {
93 14     14   59 my ($self, $sql, $rs_attrs, $rows, $offset ) = @_;
94              
95             # get selectors, and scan the order_by (if any)
96 14         70 my $sq_attrs = $self->_subqueried_limit_attrs ( $sql, $rs_attrs );
97              
98             # make up an order if none exists
99 14   66     91 my $requested_order = (delete $rs_attrs->{order_by}) || $self->_rno_default_order;
100              
101             # the order binds (if any) will need to go at the end of the entire inner select
102 14         43 local $self->{order_bind};
103 14         60 my $rno_ord = $self->_order_by ($requested_order);
104 14         28 push @{$self->{select_bind}}, @{$self->{order_bind}};
  14         35  
  14         38  
105              
106             # this is the order supplement magic
107 14         37 my $mid_sel = $sq_attrs->{selection_outer};
108 14 100       46 if (my $extra_order_sel = $sq_attrs->{order_supplement}) {
109 4         25 for my $extra_col (sort
110 2         12 { $extra_order_sel->{$a} cmp $extra_order_sel->{$b} }
111             keys %$extra_order_sel
112             ) {
113             $sq_attrs->{selection_inner} .= sprintf (', %s AS %s',
114             $extra_col,
115 6         27 $extra_order_sel->{$extra_col},
116             );
117             }
118             }
119              
120             # and this is order re-alias magic
121 14         44 for my $map ($sq_attrs->{order_supplement}, $sq_attrs->{outer_renames}) {
122 28 100       54 for my $col (sort { (length $b) <=> (length $a) } keys %{$map||{}} ) {
  23         45  
  28         159  
123 28         57 my $re_col = quotemeta ($col);
124 28         259 $rno_ord =~ s/$re_col/$map->{$col}/;
125             }
126             }
127              
128             # whatever is left of the order_by (only where is processed at this point)
129 14         65 my $group_having = $self->_parse_rs_attrs($rs_attrs);
130              
131 14         51 my $qalias = $self->_quote ($rs_attrs->{alias});
132 14         384 my $idx_name = $self->_quote ('rno__row__index');
133              
134 14         295 push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset + 1], [ $self->__total_bindtype => $offset + $rows ];
  14         79  
135              
136 14         162 return <
137              
138             SELECT $sq_attrs->{selection_outer} FROM (
139             SELECT $mid_sel, ROW_NUMBER() OVER( $rno_ord ) AS $idx_name FROM (
140             SELECT $sq_attrs->{selection_inner} $sq_attrs->{query_leftover}${group_having}
141             ) $qalias
142             ) $qalias WHERE $idx_name >= ? AND $idx_name <= ?
143              
144             EOS
145              
146             }
147              
148             # some databases are happy with OVER (), some need OVER (ORDER BY (SELECT (1)) )
149             sub _rno_default_order {
150 8     8   33 return undef;
151             }
152              
153             =head2 SkipFirst
154              
155             SELECT SKIP $offset FIRST $limit * FROM ...
156              
157             Supported by B<Informix>, almost like LimitOffset. According to
158             L<SQL::Abstract::Limit> C<... SKIP $offset LIMIT $limit ...> is also supported.
159              
160             =cut
161             sub _SkipFirst {
162 10     10   37 my ($self, $sql, $rs_attrs, $rows, $offset) = @_;
163              
164 10 50       73 $sql =~ s/^ \s* SELECT \s+ //ix
165             or $self->throw_exception("Unrecognizable SELECT: $sql");
166              
167             return sprintf ('SELECT %s%s%s%s',
168             $offset
169             ? do {
170 8         13 push @{$self->{pre_select_bind}}, [ $self->__offset_bindtype => $offset];
  8         33  
171 8         17 'SKIP ? '
172             }
173             : ''
174             ,
175 10 100       31 do {
176 10         19 push @{$self->{pre_select_bind}}, [ $self->__rows_bindtype => $rows ];
  10         39  
177 10         38 'FIRST ? '
178             },
179             $sql,
180             $self->_parse_rs_attrs ($rs_attrs),
181             );
182             }
183              
184             =head2 FirstSkip
185              
186             SELECT FIRST $limit SKIP $offset * FROM ...
187              
188             Supported by B<Firebird/Interbase>, reverse of SkipFirst. According to
189             L<SQL::Abstract::Limit> C<... ROWS $limit TO $offset ...> is also supported.
190              
191             =cut
192             sub _FirstSkip {
193 10     10   43 my ($self, $sql, $rs_attrs, $rows, $offset) = @_;
194              
195 10 50       77 $sql =~ s/^ \s* SELECT \s+ //ix
196             or $self->throw_exception("Unrecognizable SELECT: $sql");
197              
198             return sprintf ('SELECT %s%s%s%s',
199             do {
200 10         21 push @{$self->{pre_select_bind}}, [ $self->__rows_bindtype => $rows ];
  10         58  
201 10         43 'FIRST ? '
202             },
203             $offset
204 10 100       24 ? do {
205 8         17 push @{$self->{pre_select_bind}}, [ $self->__offset_bindtype => $offset];
  8         39  
206 8         32 'SKIP ? '
207             }
208             : ''
209             ,
210             $sql,
211             $self->_parse_rs_attrs ($rs_attrs),
212             );
213             }
214              
215              
216             =head2 RowNum
217              
218             Depending on the resultset attributes one of:
219              
220             SELECT * FROM (
221             SELECT *, ROWNUM AS rownum__index FROM (
222             SELECT ...
223             ) WHERE ROWNUM <= ($limit+$offset)
224             ) WHERE rownum__index >= ($offset+1)
225              
226             or
227              
228             SELECT * FROM (
229             SELECT *, ROWNUM AS rownum__index FROM (
230             SELECT ...
231             )
232             ) WHERE rownum__index BETWEEN ($offset+1) AND ($limit+$offset)
233              
234             or
235              
236             SELECT * FROM (
237             SELECT ...
238             ) WHERE ROWNUM <= ($limit+1)
239              
240             Supported by B<Oracle>.
241              
242             =cut
243             sub _RowNum {
244 14     14   57 my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
245              
246 14         72 my $sq_attrs = $self->_subqueried_limit_attrs ($sql, $rs_attrs);
247              
248 14         64 my $qalias = $self->_quote ($rs_attrs->{alias});
249 14         252 my $idx_name = $self->_quote ('rownum__index');
250 14         232 my $order_group_having = $self->_parse_rs_attrs($rs_attrs);
251              
252              
253             # if no offset (e.g. first page) - we can skip one of the subqueries
254 14 100       53 if (! $offset) {
255 4         10 push @{$self->{limit_bind}}, [ $self->__rows_bindtype => $rows ];
  4         47  
256              
257 4         38 return <
258             SELECT $sq_attrs->{selection_outer} FROM (
259             SELECT $sq_attrs->{selection_inner} $sq_attrs->{query_leftover}${order_group_having}
260             ) $qalias WHERE ROWNUM <= ?
261             EOS
262             }
263              
264             #
265             # There are two ways to limit in Oracle, one vastly faster than the other
266             # on large resultsets: https://decipherinfosys.wordpress.com/2007/08/09/paging-and-countstopkey-optimization/
267             # However Oracle is retarded and does not preserve stable ROWNUM() values
268             # when called twice in the same scope. Therefore unless the resultset is
269             # ordered by a unique set of columns, it is not safe to use the faster
270             # method, and the slower BETWEEN query is used instead
271             #
272             # FIXME - this is quite expensive, and does not perform caching of any sort
273             # as soon as some of the SQLA-inlining work becomes viable consider adding
274             # some rudimentary caching support
275 10 100 100     54 if (
276             $rs_attrs->{order_by}
277             and
278             $rs_attrs->{result_source}->schema->storage->_order_by_is_stable(
279 5         117 @{$rs_attrs}{qw/from order_by where/}
280             )
281             ) {
282 3         9 push @{$self->{limit_bind}}, [ $self->__total_bindtype => $offset + $rows ], [ $self->__offset_bindtype => $offset + 1 ];
  3         21  
283              
284 3         50 return <
285             SELECT $sq_attrs->{selection_outer} FROM (
286             SELECT $sq_attrs->{selection_outer}, ROWNUM AS $idx_name FROM (
287             SELECT $sq_attrs->{selection_inner} $sq_attrs->{query_leftover}${order_group_having}
288             ) $qalias WHERE ROWNUM <= ?
289             ) $qalias WHERE $idx_name >= ?
290             EOS
291             }
292             else {
293 7         16 push @{$self->{limit_bind}}, [ $self->__offset_bindtype => $offset + 1 ], [ $self->__total_bindtype => $offset + $rows ];
  7         39  
294              
295 7         76 return <
296             SELECT $sq_attrs->{selection_outer} FROM (
297             SELECT $sq_attrs->{selection_outer}, ROWNUM AS $idx_name FROM (
298             SELECT $sq_attrs->{selection_inner} $sq_attrs->{query_leftover}${order_group_having}
299             ) $qalias
300             ) $qalias WHERE $idx_name BETWEEN ? AND ?
301             EOS
302             }
303             }
304              
305             # used by _Top and _FetchFirst below
306             sub _prep_for_skimming_limit {
307 49     49   141 my ( $self, $sql, $rs_attrs ) = @_;
308              
309             # get selectors
310 49         204 my $sq_attrs = $self->_subqueried_limit_attrs ($sql, $rs_attrs);
311              
312 49         146 my $requested_order = delete $rs_attrs->{order_by};
313 49         235 $sq_attrs->{order_by_requested} = $self->_order_by ($requested_order);
314 49         167 $sq_attrs->{grpby_having} = $self->_parse_rs_attrs ($rs_attrs);
315              
316             # without an offset things are easy
317 49 100       155 if (! $rs_attrs->{offset}) {
318 9         30 $sq_attrs->{order_by_inner} = $sq_attrs->{order_by_requested};
319             }
320             else {
321 40         156 $sq_attrs->{quoted_rs_alias} = $self->_quote ($rs_attrs->{alias});
322              
323             # localise as we already have all the bind values we need
324 40         789 local $self->{order_bind};
325              
326             # make up an order unless supplied or sanity check what we are given
327 40         74 my $inner_order;
328 40 100       135 if ($sq_attrs->{order_by_requested}) {
329             $self->throw_exception (
330             'Unable to safely perform "skimming type" limit with supplied unstable order criteria'
331             ) unless ($rs_attrs->{result_source}->schema->storage->_order_by_is_stable(
332             $rs_attrs->{from},
333             $requested_order,
334             $rs_attrs->{where},
335 25 50       136 ));
336              
337 25         87 $inner_order = $requested_order;
338             }
339             else {
340             $inner_order = [ map
341 15         74 { "$rs_attrs->{alias}.$_" }
342             ( @{
343 15         32 $rs_attrs->{result_source}->_identifying_column_set
344             ||
345             $self->throw_exception(sprintf(
346             'Unable to auto-construct stable order criteria for "skimming type" limit '
347 15 50       93 . "dialect based on source '%s'", $rs_attrs->{result_source}->name) );
348             } )
349             ];
350             }
351              
352 40         171 $sq_attrs->{order_by_inner} = $self->_order_by ($inner_order);
353              
354 40         81 my @out_chunks;
355 40         123 for my $ch ($self->_order_by_chunks ($inner_order)) {
356 56 100       2137 $ch = $ch->[0] if ref $ch eq 'ARRAY';
357              
358 56         194 ($ch, my $is_desc) = $self->_split_order_chunk($ch);
359              
360             # !NOTE! outside chunks come in reverse order ( !$is_desc )
361 56 100       271 push @out_chunks, { ($is_desc ? '-asc' : '-desc') => \$ch };
362             }
363              
364 40         364 $sq_attrs->{order_by_middle} = $self->_order_by (\@out_chunks);
365              
366             # this is the order supplement magic
367 40         122 $sq_attrs->{selection_middle} = $sq_attrs->{selection_outer};
368 40 100       149 if (my $extra_order_sel = $sq_attrs->{order_supplement}) {
369 19         119 for my $extra_col (sort
370 12         56 { $extra_order_sel->{$a} cmp $extra_order_sel->{$b} }
371             keys %$extra_order_sel
372             ) {
373             $sq_attrs->{selection_inner} .= sprintf (', %s AS %s',
374             $extra_col,
375 29         121 $extra_order_sel->{$extra_col},
376             );
377              
378 29         100 $sq_attrs->{selection_middle} .= ', ' . $extra_order_sel->{$extra_col};
379             }
380              
381             # Whatever order bindvals there are, they will be realiased and
382             # reselected, and need to show up at end of the initial inner select
383 19         43 push @{$self->{select_bind}}, @{$self->{order_bind}};
  19         50  
  19         47  
384             }
385              
386             # and this is order re-alias magic
387 40         148 for my $map ($sq_attrs->{order_supplement}, $sq_attrs->{outer_renames}) {
388 80 100       161 for my $col (sort { (length $b) <=> (length $a) } keys %{$map||{}}) {
  56         176  
  80         471  
389 101         220 my $re_col = quotemeta ($col);
390             $_ =~ s/$re_col/$map->{$col}/
391 101         1294 for ($sq_attrs->{order_by_middle}, $sq_attrs->{order_by_requested});
392             }
393             }
394             }
395              
396 49         150 $sq_attrs;
397             }
398              
399             =head2 Top
400              
401             SELECT * FROM
402              
403             SELECT TOP $limit FROM (
404             SELECT TOP $limit FROM (
405             SELECT TOP ($limit+$offset) ...
406             ) ORDER BY $reversed_original_order
407             ) ORDER BY $original_order
408              
409             Unreliable Top-based implementation, supported by B<< MSSQL < 2005 >>.
410              
411             =head3 CAVEAT
412              
413             Due to its implementation, this limit dialect returns B<incorrect results>
414             when $limit+$offset > total amount of rows in the resultset.
415              
416             =cut
417              
418             sub _Top {
419 26     26   110 my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
420              
421 26         121 my $lim = $self->_prep_for_skimming_limit($sql, $rs_attrs);
422              
423             $sql = sprintf ('SELECT TOP %u %s %s %s %s',
424             $rows + ($offset||0),
425             $offset ? $lim->{selection_inner} : $lim->{selection_original},
426             $lim->{query_leftover},
427             $lim->{grpby_having},
428             $lim->{order_by_inner},
429 26 100 100     261 );
430              
431             $sql = sprintf ('SELECT TOP %u %s FROM ( %s ) %s %s',
432             $rows,
433             $lim->{selection_middle},
434             $sql,
435             $lim->{quoted_rs_alias},
436             $lim->{order_by_middle},
437 26 100       159 ) if $offset;
438              
439             $sql = sprintf ('SELECT %s FROM ( %s ) %s %s',
440             $lim->{selection_outer},
441             $sql,
442             $lim->{quoted_rs_alias},
443             $lim->{order_by_requested},
444             ) if $offset and (
445             $lim->{order_by_requested} or $lim->{selection_middle} ne $lim->{selection_outer}
446 26 100 66     206 );
      66        
447              
448 26         154 return $sql;
449             }
450              
451             =head2 FetchFirst
452              
453             SELECT * FROM
454             (
455             SELECT * FROM (
456             SELECT * FROM (
457             SELECT * FROM ...
458             ) ORDER BY $reversed_original_order
459             FETCH FIRST $limit ROWS ONLY
460             ) ORDER BY $original_order
461             FETCH FIRST $limit ROWS ONLY
462             )
463              
464             Unreliable FetchFirst-based implementation, supported by B<< IBM DB2 <= V5R3 >>.
465              
466             =head3 CAVEAT
467              
468             Due to its implementation, this limit dialect returns B<incorrect results>
469             when $limit+$offset > total amount of rows in the resultset.
470              
471             =cut
472              
473             sub _FetchFirst {
474 23     23   77 my ( $self, $sql, $rs_attrs, $rows, $offset ) = @_;
475              
476 23         89 my $lim = $self->_prep_for_skimming_limit($sql, $rs_attrs);
477              
478             $sql = sprintf ('SELECT %s %s %s %s FETCH FIRST %u ROWS ONLY',
479             $offset ? $lim->{selection_inner} : $lim->{selection_original},
480             $lim->{query_leftover},
481             $lim->{grpby_having},
482             $lim->{order_by_inner},
483 23 100 100     194 $rows + ($offset||0),
484             );
485              
486             $sql = sprintf ('SELECT %s FROM ( %s ) %s %s FETCH FIRST %u ROWS ONLY',
487             $lim->{selection_middle},
488             $sql,
489             $lim->{quoted_rs_alias},
490             $lim->{order_by_middle},
491 23 100       124 $rows,
492             ) if $offset;
493              
494              
495             $sql = sprintf ('SELECT %s FROM ( %s ) %s %s',
496             $lim->{selection_outer},
497             $sql,
498             $lim->{quoted_rs_alias},
499             $lim->{order_by_requested},
500             ) if $offset and (
501             $lim->{order_by_requested} or $lim->{selection_middle} ne $lim->{selection_outer}
502 23 100 66     135 );
      66        
503              
504 23         133 return $sql;
505             }
506              
507             =head2 GenericSubQ
508              
509             SELECT * FROM (
510             SELECT ...
511             )
512             WHERE (
513             SELECT COUNT(*) FROM $original_table cnt WHERE cnt.id < $original_table.id
514             ) BETWEEN $offset AND ($offset+$rows-1)
515              
516             This is the most evil limit "dialect" (more of a hack) for I<really> stupid
517             databases. It works by ordering the set by some unique column, and calculating
518             the amount of rows that have a less-er value (thus emulating a L</RowNum>-like
519             index). Of course this implies the set can only be ordered by a single unique
520             column.
521              
522             Also note that this technique can be and often is B<excruciatingly slow>. You
523             may have much better luck using L<DBIx::Class::ResultSet/software_limit>
524             instead.
525              
526             Currently used by B<Sybase ASE>, due to lack of any other option.
527              
528             =cut
529             sub _GenericSubQ {
530 24     24   89 my ($self, $sql, $rs_attrs, $rows, $offset) = @_;
531              
532 24         64 my $main_rsrc = $rs_attrs->{result_source};
533              
534             # Explicitly require an order_by
535             # GenSubQ is slow enough as it is, just emulating things
536             # like in other cases is not wise - make the user work
537             # to shoot their DBA in the foot
538             $self->throw_exception (
539             'Generic Subquery Limit does not work on resultsets without an order. Provide a stable, '
540             . 'main-table-based order criteria.'
541 24 50       69 ) unless $rs_attrs->{order_by};
542              
543 24         109 my $usable_order_colinfo = $main_rsrc->schema->storage->_extract_colinfo_of_stable_main_source_order_by_portion(
544             $rs_attrs
545             );
546              
547             $self->throw_exception(
548             'Generic Subquery Limit can not work with order criteria based on sources other than the main one'
549             ) if (
550 24 50       193 ! keys %{$usable_order_colinfo||{}}
551             or
552             grep
553 24 50 33     62 { $_->{-source_alias} ne $rs_attrs->{alias} }
  70         207  
554             (values %$usable_order_colinfo)
555             );
556              
557             ###
558             ###
559             ### we need to know the directions after we figured out the above - reextract *again*
560             ### this is eyebleed - trying to get it to work at first
561 24         65 my $supplied_order = delete $rs_attrs->{order_by};
562              
563 24         42 my @order_bits = do {
564 24         64 local $self->{quote_char};
565 24         62 local $self->{order_bind};
566 24 100       82 map { ref $_ ? $_->[0] : $_ } $self->_order_by_chunks ($supplied_order)
  84         4215  
567             };
568              
569             # truncate to what we'll use
570 24         143 $#order_bits = ( (keys %$usable_order_colinfo) - 1 );
571              
572             # @order_bits likely will come back quoted (due to how the prefetch
573             # rewriter operates
574             # Hence supplement the column_info lookup table with quoted versions
575 24 100       116 if ($self->quote_char) {
576             $usable_order_colinfo->{$self->_quote($_)} = $usable_order_colinfo->{$_}
577 15         67 for keys %$usable_order_colinfo;
578             }
579              
580             # calculate the condition
581 24         875 my $count_tbl_alias = 'rownum__emulation';
582 24         58 my $main_alias = $rs_attrs->{alias};
583 24         565 my $main_tbl_name = $main_rsrc->name;
584              
585 24         61 my (@unqualified_names, @qualified_names, @is_desc, @new_order_by);
586              
587 24         70 for my $bit (@order_bits) {
588              
589 70         198 ($bit, my $is_desc) = $self->_split_order_chunk($bit);
590              
591 70         161 push @is_desc, $is_desc;
592 70         156 push @unqualified_names, $usable_order_colinfo->{$bit}{-colname};
593 70         126 push @qualified_names, $usable_order_colinfo->{$bit}{-fq_colname};
594              
595 70 100       249 push @new_order_by, { ($is_desc ? '-desc' : '-asc') => $usable_order_colinfo->{$bit}{-fq_colname} };
596             };
597              
598 24         55 my (@where_cond, @skip_colpair_stack);
599 24         98 for my $i (0 .. $#order_bits) {
600 70         150 my $ci = $usable_order_colinfo->{$order_bits[$i]};
601              
602 70         131 my ($subq_col, $main_col) = map { "$_.$ci->{-colname}" } ($count_tbl_alias, $main_alias);
  140         389  
603 70 100       312 my $cur_cond = { $subq_col => { ($is_desc[$i] ? '>' : '<') => { -ident => $main_col } } };
604              
605 70         195 push @skip_colpair_stack, [
606             { $main_col => { -ident => $subq_col } },
607             ];
608              
609             # we can trust the nullability flag because
610             # we already used it during _id_col_set resolution
611             #
612 70 100       159 if ($ci->{is_nullable}) {
613 24         39 push @{$skip_colpair_stack[-1]}, { $main_col => undef, $subq_col=> undef };
  24         67  
614              
615 24 100       154 $cur_cond = [
    100          
616             {
617             ($is_desc[$i] ? $subq_col : $main_col) => { '!=', undef },
618             ($is_desc[$i] ? $main_col : $subq_col) => undef,
619             },
620             {
621             $subq_col => { '!=', undef },
622             $main_col => { '!=', undef },
623             -and => $cur_cond,
624             },
625             ];
626             }
627              
628 70         253 push @where_cond, { '-and', => [ @skip_colpair_stack[0..$i-1], $cur_cond ] };
629             }
630              
631             # reuse the sqlmaker WHERE, this will not be returning binds
632 24         51 my $counted_where = do {
633 24         84 local $self->{where_bind};
634 24         100 $self->where(\@where_cond);
635             };
636              
637             # construct the rownum condition by hand
638 24         3480 my $rownum_cond;
639 24 100       68 if ($offset) {
640 15         37 $rownum_cond = 'BETWEEN ? AND ?';
641 15         29 push @{$self->{limit_bind}},
  15         85  
642             [ $self->__offset_bindtype => $offset ],
643             [ $self->__total_bindtype => $offset + $rows - 1]
644             ;
645             }
646             else {
647 9         20 $rownum_cond = '< ?';
648 9         16 push @{$self->{limit_bind}},
  9         43  
649             [ $self->__rows_bindtype => $rows ]
650             ;
651             }
652              
653             # and what we will order by inside
654 24         57 my $inner_order_sql = do {
655 24         62 local $self->{order_bind};
656              
657 24         101 my $s = $self->_order_by (\@new_order_by);
658              
659             $self->throw_exception('Inner gensubq order may not contain binds... something went wrong')
660 24 50       44 if @{$self->{order_bind}};
  24         108  
661              
662 24         68 $s;
663             };
664              
665             ### resume originally scheduled programming
666             ###
667             ###
668              
669             # we need to supply the order for the supplements to be properly calculated
670 24         286 my $sq_attrs = $self->_subqueried_limit_attrs (
671             $sql, { %$rs_attrs, order_by => \@new_order_by }
672             );
673              
674 24         118 my $in_sel = $sq_attrs->{selection_inner};
675              
676             # add the order supplement (if any) as this is what will be used for the outer WHERE
677 24         50 $in_sel .= ", $_" for sort keys %{$sq_attrs->{order_supplement}};
  24         105  
678              
679 24         96 my $group_having_sql = $self->_parse_rs_attrs($rs_attrs);
680              
681              
682             return sprintf ("
683             SELECT $sq_attrs->{selection_outer}
684             FROM (
685             SELECT $in_sel $sq_attrs->{query_leftover}${group_having_sql}
686             ) %s
687             WHERE ( SELECT COUNT(*) FROM %s %s $counted_where ) $rownum_cond
688             $inner_order_sql
689 72         913 ", map { $self->_quote ($_) } (
690             $rs_attrs->{alias},
691 24         167 $main_tbl_name,
692             $count_tbl_alias,
693             ));
694             }
695              
696              
697             # !!! THIS IS ALSO HORRIFIC !!! /me ashamed
698             #
699             # Generates inner/outer select lists for various limit dialects
700             # which result in one or more subqueries (e.g. RNO, Top, RowNum)
701             # Any non-main-table columns need to have their table qualifier
702             # turned into a column alias (otherwise names in subqueries clash
703             # and/or lose their source table)
704             #
705             # Returns mangled proto-sql, inner/outer strings of SQL QUOTED selectors
706             # with aliases (to be used in whatever select statement), and an alias
707             # index hashref of QUOTED SEL => QUOTED ALIAS pairs (to maybe be used
708             # for string-subst higher up).
709             # If an order_by is supplied, the inner select needs to bring out columns
710             # used in implicit (non-selected) orders, and the order condition itself
711             # needs to be realiased to the proper names in the outer query. Thus we
712             # also return a hashref (order doesn't matter) of QUOTED EXTRA-SEL =>
713             # QUOTED ALIAS pairs, which is a list of extra selectors that do *not*
714             # exist in the original select list
                #
                # Arguments: ($proto_sql, $rs_attrs) - the SELECT statement text and the
                # resultset attribute hashref. Keys read from $rs_attrs in this sub:
                # _selector_sql, alias, select, as, order_by.
                #
                # Keys of the returned hashref:
                #   query_leftover   - $proto_sql with the leading "SELECT <selector>" removed
                #   selection_<name> - one comma-joined string per list gathered in $sel
                #                      (the loop below fills 'original', 'inner' and 'outer')
                #   outer_renames    - rendered selector => quoted alias, for every selector
                #                      that got re-aliased (undef when none was)
                #   order_supplement - order chunk => quoted ORDER__BY__NNN alias, for order
                #                      criteria not found in the selection index (undef when none)
                #
                # Throws when $rs_attrs is not a plain hashref, when $proto_sql does not
                # begin with SELECT followed by $rs_attrs->{_selector_sql}, or when a
                # select entry has neither an -as nor a matching entry in $rs_attrs->{as}.
715             sub _subqueried_limit_attrs {
716 101     101   345 my ($self, $proto_sql, $rs_attrs) = @_;
717              
718 101 50       378 $self->throw_exception(
719             'Limit dialect implementation usable only in the context of DBIC (missing $rs_attrs)'
720             ) unless ref ($rs_attrs) eq 'HASH';
721              
722             # mangle the input sql as we will be replacing the selector entirely
                # (the substitution edits $proto_sql in place; whatever remains is
                # handed back to the caller as query_leftover)
723 101 50 33     2439 unless (
724             $rs_attrs->{_selector_sql}
725             and
726             $proto_sql =~ s/^ \s* SELECT \s* \Q$rs_attrs->{_selector_sql}//ix
727             ) {
728 0         0 $self->throw_exception("Unrecognizable SELECT: $proto_sql");
729             }
730              
                # regex-escaped copies of the name separator and of the current source
                # alias; $re_sep is used in the unqualified-name match further down
731 101         433 my ($re_sep, $re_alias) = map { quotemeta $_ } ( $self->{name_sep}, $rs_attrs->{alias} );
  202         631  
732              
733             # correlate select and as, build selection index
734 101         244 my (@sel, $in_sel_index);
735 101         199 for my $i (0 .. $#{$rs_attrs->{select}}) {
  101         441  
736              
737 425         911 my $s = $rs_attrs->{select}[$i];
                # only hashref selectors can carry their own alias (the -as key)
738 425 50       940 my $sql_alias = (ref $s) eq 'HASH' ? $s->{-as} : undef;
739              
                # list assignment: only the first value returned is kept
740 425 100       1358 my ($sql_sel) = length ref $s
741             # we throw away the @bind here deliberately
742             ? $self->_recurse_fields( $s )
743             : $self->_quote( $s )
744             ;
745              
746             push @sel, {
747             arg => $s,
748             sql => $sql_sel,
                # reference selectors are rendered a second time with quote_char
                # localized (i.e. unset) for the duration of the do block
749             unquoted_sql => ( length ref $s
750             ? do {
751 47         122 local $self->{quote_char};
752 47         152 ($self->_recurse_fields ($s))[0]; # ignore binds again
753             }
754             : $s
755             ),
                # alias precedence: explicit -as, then the positional 'as' entry,
                # otherwise the selector is rejected
756             as =>
757             $sql_alias
758             ||
759 425 100 33     9846 $rs_attrs->{as}[$i]
760             ||
761             $self->throw_exception("Select argument $i ($s) without corresponding 'as'")
762             ,
763             };
764              
765             # anything with a placeholder in it needs re-selection
766 425 100       1623 $in_sel_index->{$sql_sel}++ unless $sql_sel =~ / (?: ^ | \W ) \? (?: \W | $ ) /x;
767              
                # an explicit -as alias is indexed as well, in its quoted form
768 425 50       907 $in_sel_index->{$self->_quote ($sql_alias)}++ if $sql_alias;
769              
770             # record unqualified versions too, so we do not have
771             # to reselect the same column twice (in qualified and
772             # unqualified form)
773 425 100 66     2651 if (! ref $s && $sql_sel =~ / $re_sep (.+) $/x) {
774 378         1335 $in_sel_index->{$1}++;
775             }
776             }
777              
778              
779             # re-alias and remove any name separators from aliases,
780             # unless we are dealing with the current source alias
781             # (which will transcend the subqueries as it is necessary
782             # for possible further chaining)
783             # same for anything we do not recognize
784 101         244 my ($sel, $renamed);
785 101         287 for my $node (@sel) {
786 425         4567 push @{$sel->{original}}, $node->{sql};
  425         1046  
787              
                # NOTE(review): both patterns in the condition below are cut off after
                # "(?" in this listing. Going by the comment above and the otherwise
                # unused $re_alias, they presumably exempt names qualified by the
                # current source alias - confirm against the module source before
                # relying on this block.
788 425 100 100     3467 if (
      100        
789             ! $in_sel_index->{$node->{sql}}
790             or
791             $node->{as} =~ / (?
792             or
793             $node->{unquoted_sql} =~ / (?
794             ) {
                # rename path: flatten the dots in the alias, emit "<sql> AS <alias>"
                # for the inner query, reference it by the quoted alias in the outer
                # one, and remember the mapping for the caller
795 163         522 $node->{as} = $self->_unqualify_colname($node->{as});
796 163         468 my $quoted_as = $self->_quote($node->{as});
797 163         2878 push @{$sel->{inner}}, sprintf '%s AS %s', $node->{sql}, $quoted_as;
  163         775  
798 163         302 push @{$sel->{outer}}, $quoted_as;
  163         632  
799 163         548 $renamed->{$node->{sql}} = $quoted_as;
800             }
801             else {
                # pass-through: the inner query keeps the selector as rendered; the
                # outer query quotes the alias for reference-type selectors and the
                # plain selector string otherwise
802 262         450 push @{$sel->{inner}}, $node->{sql};
  262         647  
803 262 50       414 push @{$sel->{outer}}, $self->_quote (ref $node->{arg} ? $node->{as} : $node->{arg});
  262         920  
804             }
805             }
806              
807             # see if the order gives us anything
808 101         988 my $extra_order_sel;
809 101         389 for my $chunk ($self->_order_by_chunks ($rs_attrs->{order_by})) {
810             # order with bind
811 136 100       5392 $chunk = $chunk->[0] if (ref $chunk) eq 'ARRAY';
                # only the first value returned by _split_order_chunk is kept
812 136         449 ($chunk) = $self->_split_order_chunk($chunk);
813              
                # already present in the selection index - nothing extra to select
814 136 100       414 next if $in_sel_index->{$chunk};
815              
                # first sighting of a chunk gets the next alias in sequence
                # (ORDER__BY__000, ORDER__BY__001, ...); repeats keep their alias
816             $extra_order_sel->{$chunk} ||= $self->_quote (
817 55 50 33     236 'ORDER__BY__' . sprintf '%03d', scalar keys %{$extra_order_sel||{}}
  55         587  
818             );
819             }
820              
821             return {
822             query_leftover => $proto_sql,
                # one "selection_<name>" entry per list in $sel, joined with ', '
823 101         3385 (map {( "selection_$_" => join (', ', @{$sel->{$_}} ) )} keys %$sel ),
  303         621  
  303         1884  
824             outer_renames => $renamed,
825             order_supplement => $extra_order_sel,
826             };
827             }
828              
# Flattens a dotted column alias into a single identifier by replacing
# every '.' with '__' (e.g. 'cds.title' becomes 'cds__title').
# The invocant is accepted but not consulted. Works on a private copy,
# so the caller's value is never modified; returns the rewritten string.
sub _unqualify_colname {
  my $self = shift;
  (my $flat_name = shift) =~ s/[.]/__/g;
  return $flat_name;
}
834              
835             =head1 FURTHER QUESTIONS?
836              
837             Check the list of L<additional DBIC resources|DBIx::Class/GETTING HELP/SUPPORT>.
838              
839             =head1 COPYRIGHT AND LICENSE
840              
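841             This module is free software L<copyright|DBIx::Class/COPYRIGHT AND LICENSE>
842             by the L<DBIx::Class (DBIC) authors|DBIx::Class/AUTHORS>. You can
843             redistribute it and/or modify it under the same terms as the
844             L<DBIx::Class library|DBIx::Class/COPYRIGHT AND LICENSE>.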
845              
846             =cut
847              
848             1;