File Coverage

blib/lib/Fsdb/Filter/dbjoin.pm
Criterion Covered Total %
statement 21 367 5.7
branch 0 148 0.0
condition 0 48 0.0
subroutine 7 32 21.8
pod 7 7 100.0
total 35 602 5.8


line stmt bran cond sub pod time code
1             #!/usr/bin/perl -w
2              
3             #
4             # dbjoin.pm
5             # Copyright (C) 1991-2016 by John Heidemann
6             # $Id: 653e1186837d37a1668d02763a023b730484a03c $
7             #
8             # This program is distributed under terms of the GNU general
9             # public license, version 2. See the file COPYING
10             # in $dblibdir for details.
11             #
12              
13             package Fsdb::Filter::dbjoin;
14              
15             =head1 NAME
16              
17             dbjoin - join two tables on common columns
18              
19             =head1 SYNOPSIS
20              
21             dbjoin [-Sid] --input table1.fsdb --input table2.fsdb [-nNrR] column [column...]
22              
23             OR
24              
25             cat table1.fsdb | dbjoin [-Sid] --input table2.fsdb [-nNrR] column [column...]
26              
27             =head1 DESCRIPTION
28              
29             Does a natural, inner join on TABLE1 and TABLE2
30             on the specified columns. With the C<-a> option, or
31             with C<-t outer> it will do a natural, full outer join.
32              
33             (Database review:
34             inner joins output records only when there are matches in both tables
35             and will omit records that do not match.
36             Outer joins output all records from both tables,
37             filling with the empty value as needed.
38             Right (left) outer joins keep all elements of the right (left)
39             table, even those that don't match in the other table.)
40              
41             By default for non-hash joins, data will be sorted lexically,
42             but the usual sorting options can be mixed with the column
43             specification.
44              
45             Because two tables are required,
46             input is typically in files.
47             Standard input is accessible by the file "-".
48              
49             =head1 RESOURCE REQUIREMENTS AND PERFORMANCE
50              
51             Joins can be expensive.
52             Most databases have a query optimizer that
53             knows something about the data and so can select
54             algorithms for efficient operation;
55             in Fsdb, I<you> are that optimizer.
56              
57             For I<non-hash joins>:
58             If data is already sorted, dbjoin will run more efficiently
59             by telling dbjoin the data is sorted with the C<-S>.
60              
61             The resource requirements of L<dbjoin> vary.
62             If input data is sorted and C<-S> is given,
63             then memory consumption is bounded by
64             the sum of the largest number of records in either dataset
65             with the same value in the join column,
66             and there is no disk consumption.
67             If data is not sorted, then L<dbjoin> requires
68             disk storage the size of both input files.
69              
70             One can minimize memory consumption by making sure
71             each record of table1 matches relatively few records in table2.
72             Typically this means that table2 should be the smaller.
73             For example, given two files: people.fsdb (schema: name iso_country_code)
74             and countries.fsdb (schema: iso_country_code full_country_name),
75             then
76              
77             dbjoin -i people.fsdb -i countries.fsdb iso_country_code
78              
79             will require less memory than
80              
81             dbjoin -i countries.fsdb -i people.fsdb iso_country_code
82              
83             if there are many people per country (as one would expect).
84             If the warning "lots of matching rows accumulating in memory" appears,
85             this is the cause; try swapping the join order.
86              
87             For I<hash joins>
88             (that is, with C<-m righthash> or C<-m lefthash>):
89             all of the right table (the second input) or the left (the first)
90             is loaded into memory (and "hashed").
91             The other table need not be sorted.
92             Runtime is O(n), but memory is O(size of hashed table).
93              
94              
95             =head1 OPTIONS
96              
97             =over 4
98              
99             =item B<-a> or B<--all>
100              
101             Perform a I<full outer join>,
102             include non-matches (each record which doesn't match at
103             all will appear once).
104             Default is an I<inner join>.
105              
106             =item B<-t TYPE> or B<--type TYPE>
107              
108             Explicitly specify the join type.
109             TYPE must be one of inner, outer, left (outer), or right (outer).
110             Default: inner.
111              
112             =item B<-m METHOD> or B<--method METHOD>
113              
114             Select join method (algorithm).
115             Choices are merge, righthash, and lefthash.
116             Default: merge.
117              
118             =item B<-S> or B<--pre-sorted>
119              
120             assume (and verify) data is already sorted
121              
122             =item B<-e E> or B<--empty E>
123              
124             give value E as the value for empty (null) records
125              
126             =item B<-T TmpDir>
127              
128             where to put tmp files.
129             Also uses environment variable TMPDIR, if -T is
130             not specified.
131             Default is /tmp.
132              
133             =back
134              
135             Sort specification options (can be interspersed with column names):
136              
137             =over 4
138              
139             =item B<-r> or B<--descending>
140              
141             sort in reverse order (high to low)
142              
143             =item B<-R> or B<--ascending>
144              
145             sort in normal order (low to high)
146              
147             =item B<-n> or B<--numeric>
148              
149             sort numerically
150              
151             =item B<-N> or B<--lexical>
152              
153             sort lexicographically
154              
155             =back
156              
157              
158             =for comment
159             begin_standard_fsdb_options
160              
161             This module also supports the standard fsdb options:
162              
163             =over 4
164              
165             =item B<-d>
166              
167             Enable debugging output.
168              
169             =item B<-i> or B<--input> InputSource
170              
171             Read from InputSource, typically a file name, or C<-> for standard input,
172             or (if in Perl) a IO::Handle, Fsdb::IO or Fsdb::BoundedQueue objects.
173              
174             =item B<-o> or B<--output> OutputDestination
175              
176             Write to OutputDestination, typically a file name, or C<-> for standard output,
177             or (if in Perl) a IO::Handle, Fsdb::IO or Fsdb::BoundedQueue objects.
178              
179             =item B<--autorun> or B<--noautorun>
180              
181             By default, programs process automatically,
182             but Fsdb::Filter objects in Perl do not run until you invoke
183             the run() method.
184             The C<--(no)autorun> option controls that behavior within Perl.
185              
186             =item B<--help>
187              
188             Show help.
189              
190             =item B<--man>
191              
192             Show full manual.
193              
194             =back
195              
196             =for comment
197             end_standard_fsdb_options
198              
199              
200             =head1 SAMPLE USAGE
201              
202             =head2 Input:
203              
204             #fsdb sid cid
205             1 10
206             2 11
207             1 12
208             2 12
209              
210             And in the file F<DATA/classes>:
211              
212             #fsdb cid cname
213             10 pascal
214             11 numanal
215             12 os
216              
217             =head2 Command:
218              
219             cat DATA/reg.fsdb | dbsort -n cid | dbjoin -i - -i DATA/classes -n cid
220              
221             =head2 Output:
222              
223             #fsdb cid sid cname
224             10 1 pascal
225             11 2 numanal
226             12 1 os
227             12 2 os
228             # - COMMENTS:
229             # | /home/johnh/BIN/DB/dbsort -n cid
230             # DATA/classes COMMENTS:
231             # joined comments:
232             # | /home/johnh/BIN/DB/dbjoin - DATA/classes cid
233              
234             =head1 SEE ALSO
235              
236             L<Fsdb>.
237              
238              
239             =head1 CLASS FUNCTIONS
240              
241             =cut
242              
243             @ISA = qw(Fsdb::Filter);
244             ($VERSION) = 2.0;
245              
246 1     1   8855 use strict;
  1         1  
  1         38  
247 1     1   4 use Pod::Usage;
  1         2  
  1         144  
248 1     1   5 use Carp;
  1         1  
  1         52  
249              
250 1     1   4 use Fsdb::Filter;
  1         1  
  1         21  
251 1     1   4 use Fsdb::IO::Reader;
  1         2  
  1         21  
252 1     1   4 use Fsdb::IO::Writer;
  1         2  
  1         21  
253 1     1   3 use Fsdb::Filter::dbpipeline qw(dbpipeline_filter dbsort);
  1         1  
  1         2906  
254              
255              
256             =head2 new
257              
258             $filter = new Fsdb::Filter::dbjoin(@arguments);
259              
260             Create a new dbjoin object, taking command-line arguments.
261              
262             =cut
263              
264             sub new ($@) {
265 0     0 1   my $class = shift @_;
266 0           my $self = $class->SUPER::new(@_);
267 0           bless $self, $class;
268 0           $self->set_defaults;
269 0           $self->parse_options(@_);
270 0           $self->SUPER::post_new();
271 0           return $self;
272             }
273              
274              
275             =head2 set_defaults
276              
277             $filter->set_defaults();
278              
279             Internal: set up defaults.
280              
281             =cut
282              
283             sub set_defaults ($) {
284 0     0 1   my($self) = @_;
285 0           $self->SUPER::set_defaults();
286 0           $self->{_pre_sorted} = 0;
287 0           $self->{_sort_argv} = [];
288 0           $self->{_join_type} = 'inner';
289 0           $self->{_join_method} = 'merge';
290 0           $self->set_default_tmpdir;
291             }
292              
293             =head2 parse_options
294              
295             $filter->parse_options(@ARGV);
296              
297             Internal: parse command-line arguments.
298              
299             =cut
300              
301             sub parse_options ($@) {
302 0     0 1   my $self = shift @_;
303              
304 0           my(@argv) = @_;
305             $self->get_options(
306             \@argv,
307 0     0     'help|?' => sub { pod2usage(1); },
308 0     0     'man' => sub { pod2usage(-verbose => 2); },
309 0     0     'a|all!' => sub { $self->{_join_type} = 'outer'; },
310             'autorun!' => \$self->{_autorun},
311             'close!' => \$self->{_close},
312             'd|debug+' => \$self->{_debug},
313             'e|empty=s' => \$self->{_empty},
314 0     0     'i|input=s@' => sub { $self->parse_io_option('inputs', @_); },
315             'log!' => \$self->{_logprog},
316             'm|method=s' => \$self->{_join_method},
317 0     0     'o|output=s' => sub { $self->parse_io_option('output', @_); },
318             'S|pre-sorted+' => \$self->{_pre_sorted},
319             't|type=s' => \$self->{_join_type},
320             'T|tmpdir|tempdir=s' => \$self->{_tmpdir},
321             # sort key options:
322 0     0     'n|numeric' => sub { $self->parse_sort_option(@_); },
323 0     0     'N|lexical' => sub { $self->parse_sort_option(@_); },
324 0     0     'r|descending' => sub { $self->parse_sort_option(@_); },
325 0     0     'R|ascending' => sub { $self->parse_sort_option(@_); },
326 0     0     '<>' => sub { $self->parse_sort_option('<>', @_); },
327 0 0         ) or pod2usage(2);
328 0 0         croak $self->{_prog} . ": internal error, extra arguments.\n"
329             if ($#argv != -1);
330             }
331              
332             =head2 setup
333              
334             $filter->setup();
335              
336             Internal: setup, parse headers.
337              
338             =cut
339              
340             sub setup($) {
341 0     0 1   my($self) = @_;
342              
343             croak $self->{_prog} . ": no sorting key specified.\n"
344 0 0         if ($#{$self->{_sort_argv}} == -1);
  0            
345             croak $self->{_prog} . ": unknown join type " . $self->{_join_type} . ".\n"
346 0 0 0       if (!($self->{_join_type} eq 'inner' || $self->{_join_type} eq 'outer' || $self->{_join_type} eq 'left' || $self->{_join_type} eq 'right'));
      0        
      0        
347             croak $self->{_prog} . ": unknown join method " . $self->{_join_method} . ".\n"
348 0 0 0       if (!($self->{_join_method} eq 'merge' || $self->{_join_method} eq 'lefthash' || $self->{_join_method} eq 'righthash'));
      0        
349              
350 0           $self->setup_exactly_two_inputs;
351              
352             # hash join?
353 0           my($hashside, $fullside) = (undef, undef);
354 0 0         if ($self->{_join_method} eq 'righthash') {
    0          
355 0           ($hashside, $fullside) = (1, 0);
356             } elsif ($self->{_join_method} eq 'lefthash') {
357 0           ($hashside, $fullside) = (0, 1);
358             };
359              
360             #
361             # automatic input sorting?
362             #
363 0 0 0       if ($self->{_join_method} eq 'merge' && !$self->{_pre_sorted}) {
364 0           my(@final_sort_argv) = @{$self->{_sort_argv}};
  0            
365             unshift(@final_sort_argv, '-T', $self->{_tmpdir})
366 0 0         if (defined($self->{_tmpdir}));
367 0           foreach (0..1) {
368 0           my($new_reader, $new_fred) = dbpipeline_filter($self->{_inputs}[$_], [-comment_handler => $self->create_delay_comments_sub], dbsort('--nolog', @final_sort_argv));
369 0           $self->{_pre_sorted_inputs}[$_] = $self->{_inputs}[$_];
370 0           $self->{_ins}[$_] = $new_reader;
371 0           $self->{_in_freds}[$_] = $new_fred;
372             };
373             } else {
374 0           $self->finish_io_option('inputs', -comment_handler => $self->create_delay_comments_sub);
375             };
376              
377             # can't move next check earlier; it must be after "finish_io_option".
378             croak $self->{_prog} . ": cannot handle input data with different field separators.\n"
379 0 0         if (!defined($self->{_ins}[0]->compare($self->{_ins}[1])));
380              
381             #
382             # figure the joined columns
383             #
384 0           my $i = 0; # counts output columns
385 0           my %join_keys;
386             my $key;
387 0           my @output_columns;
388 0           my %all_keys;
389 0           my @copy_codes = ('', '');
390 0           my $build_key_from_hashside_code = '';
391 0           my $build_key_from_fullside_code = '';
392 0           my $build_hashside_result_code = '';
393 0           my $build_output_from_hit_code = '';
394 0           my $merge_output_from_hit_code = '';
395 0           my $build_output_from_full_code = '';
396 0           my @hash_input_col_to_hit_col; # not actually used
397 0           my $hit_ncol = 0;
398 0           for $key (@{$self->{_sort_argv}}) {
  0            
399 0 0         next if ($key =~ /^-/); # we deal with this later
400              
401             # figure out details for both sides
402 0           my @col_i;
403 0           foreach (0..1) {
404 0           $col_i[$_] = $self->{_ins}[$_]->col_to_i($key);
405 0 0         die($self->{_prog} . ": column ``$key'' is not in " . ($_ == 0 ? "left" : "right") ." join source.\n") if (!defined($col_i[$_]));
    0          
406             # for merge join, details are copy
407 0           $copy_codes[$_] .= '$out_fref->[' . $i . '] = $frefs[' . $_ . "]->[$col_i[$_]];\n";
408             # for hash join, also build the hash key
409 0 0         if ($self->{_join_method} ne 'merge') {
410 0 0         if ($_ == $hashside) {
411 0 0         $build_key_from_hashside_code .= '$key .= ' . ($i == 0 ? '' : '$fs . ') . '$fref->[' . $col_i[$_] . ']; ';
412 0 0         $build_hashside_result_code .= ($i == 0 ? '' : ", ") . '$fref->[' . $col_i[$_] . ']';
413 0           $hash_input_col_to_hit_col[$col_i[$_]] = $hit_ncol++;
414 0           $build_output_from_hit_code .= '$out_fref->[' . $i . '] = $hit->[' . $i . "];\n";
415             };
416 0 0         if ($_ == $fullside) {
417 0 0         $build_key_from_fullside_code .= '$key .= ' . ($i == 0 ? '' : '$fs . ') . '$fref->[' . $col_i[$_] . ']; ';
418 0           $build_output_from_full_code .= '$out_fref->[' . $i . '] = $fref->[' . $col_i[$_] . "];\n";
419             };
420             };
421             };
422              
423 0           push(@output_columns, $key);
424 0           $join_keys{$key} = $i;
425 0           $all_keys{$key} = $i;
426 0           $i++;
427             }
428              
429             #
430             # and the rest
431             #
432 0           my $copy_codes = '';
433 0           my $col_i;
434 0           foreach $key (@{$self->{_ins}[0]->cols}) {
  0            
435 0 0         next if (defined($all_keys{$key})); # already got it
436 0           push(@output_columns, $key);
437 0           $col_i = $self->{_ins}[0]->col_to_i($key);
438 0 0         defined($col_i) or die "assert";
439 0           $copy_codes[0] .= '$out_fref->[' . $i . '] = $frefs[0]->[' . $col_i . '];' . "\n";
440 0           $all_keys{$key} = $i;
441 0 0         if ($self->{_join_method} ne 'merge') {
442 0 0         if (0 == $hashside) {
443 0           $build_hashside_result_code .= ', $fref->[' . $col_i . ']';
444 0           my $hit_col = $hit_ncol++;
445 0           $hash_input_col_to_hit_col[$col_i] = $hit_col;
446 0           $build_output_from_hit_code .= '$out_fref->[' . $i . '] = $hit->[' . $hit_col . '];';
447 0           $merge_output_from_hit_code .= '$out_fref->[' . $i . '] = $hit->[' . $hit_col . '];';
448             };
449 0 0         if (0 == $fullside) {
450 0           $build_output_from_full_code .= '$out_fref->[' . $i . '] = $fref->[' . $col_i . '];';
451             };
452             };
453 0           $i++;
454             };
455 0           foreach $key (@{$self->{_ins}[1]->cols}) {
  0            
456 0 0         next if (defined($join_keys{$key}));
457             # detect duplicates that are not joined upon (error)
458             # (this represents duplicate fields in the two merged things).
459             # Reject this because we don't want to silently prefer one to the other.
460 0 0         if (defined($all_keys{$key})) {
461 0           croak $self->{_prog} . ": column $key is in both of the joined files, but is not joined upon.\nAll non-joined columns must be unique.\nBefore joining you must\nrename one of the source columns\nor remove one of the duplicate input columns.\n";
462             };
463 0           push(@output_columns, $key);
464 0           $col_i = $self->{_ins}[1]->col_to_i($key);
465 0 0         defined($col_i) or die "assert";
466 0           $copy_codes[1] .= '$out_fref->[' . $i . '] = $frefs[1]->[' . $col_i . '];' . "\n";
467 0           $all_keys{$key} = $i;
468 0 0         if ($self->{_join_method} ne 'merge') {
469 0 0         if (1 == $hashside) {
470 0           $build_hashside_result_code .= ', $fref->[' . $col_i . ']';
471 0           my $hit_col = $hit_ncol++;
472 0           $hash_input_col_to_hit_col[$col_i] = $hit_col;
473 0           $build_output_from_hit_code .= '$out_fref->[' . $i . '] = $hit->[' . $hit_col . '];';
474 0           $merge_output_from_hit_code .= '$out_fref->[' . $i . '] = $hit->[' . $hit_col . '];';
475             };
476 0 0         if (1 == $fullside) {
477 0           $build_output_from_full_code .= '$out_fref->[' . $i . '] = $fref->[' . $col_i . '];';
478             };
479             };
480 0           $i++;
481             };
482 0           $self->{_copy_codes} = \@copy_codes;
483              
484 0           $self->{_build_key_from_hashside_code} = $build_key_from_hashside_code;
485 0           $self->{_build_key_from_fullside_code} = $build_key_from_fullside_code;
486 0           $self->{_build_hashside_result_code} = $build_hashside_result_code;
487 0           $self->{_build_output_from_hit_code} = $build_output_from_hit_code;
488 0           $self->{_merge_output_from_hit_code} = $merge_output_from_hit_code;
489 0           $self->{_build_output_from_full_code} = $build_output_from_full_code;
490              
491 0           $self->finish_io_option('output', -clone => $self->{_ins}[0], -cols => \@output_columns);
492              
493             #
494             # comparison code
495             #
496 0           $self->{_compare_code} = $self->create_compare_code(@{$self->{_ins}}, 'frefs[0]', 'frefs[1]');;
  0            
497             croak $self->{_prog} . ": no join field specified.\n"
498 0 0         if (!defined($self->{_compare_code}));
499              
500 0 0         print "COMPARE CODE:\n\t" . $self->{_compare_code} . "\n" if ($self->{_debug});
501 0           foreach (0..1) {
502 0           $self->{_compare_code_ins}[$_] = $self->create_compare_code($self->{_ins}[$_], $self->{_ins}[$_], "prev_frefs[$_]", "frefs[$_]");
503             croak $self->{_prog} . ": no join field specified.\n"
504 0 0         if (!defined($self->{_compare_code_ins}[$_]));
505             };
506             }
507              
508              
509             =head2 run_merge_join
510              
511             $filter->run_merge_join();
512              
513             Internal: run over each row.
514              
515             =cut
516             sub run_merge_join($) {
517 0     0 1   my($self) = @_;
518              
519             #
520             # Eval compare_sub in this lexical context
521             # of our variables.
522             #
523 0           my @prev_frefs;
524             my @frefs;
525 0           my(@right_frefs);
526 0           my $compare_sub;
527 0           my @check_compare_subs;
528             my $code = '$compare_sub = ' . $self->{_compare_code} . "\n" .
529             '$check_compare_subs[0] = ' . $self->{_compare_code_ins}[0] . "\n" .
530 0           '$check_compare_subs[1] = ' . $self->{_compare_code_ins}[1] . "\n";
531 0           eval $code;
532 0 0         $@ && croak $self->{_prog} . ": internal eval error in compare code: $@.\n";
533              
534 0           my @fastpath_subs;
535 0           foreach (0..1) {
536 0           $fastpath_subs[$_] = $self->{_ins}[$_]->fastpath_sub();
537             };
538 0           my $out_fastpath_sub = $self->{_out}->fastpath_sub();
539              
540             #
541             # Set up some "macros".
542             #
543 0           my $out_fref = [];
544 0           my $copy_left_to_out_fref;
545             my $copy_right_to_out_fref;
546             $code = '$copy_left_to_out_fref = sub {' . "\n" . $self->{_copy_codes}[0] . "\n};\n" .
547 0           '$copy_right_to_out_fref = sub {' . "\n" . $self->{_copy_codes}[1] . "\n};\n";
548 0           eval $code;
549 0 0         $@ && croak $self->{_prog} . ": internal eval error in copy code: $@.\n$code\n";
550             my $reset_out_fref = sub {
551 0     0     $out_fref = [ ($self->{_empty}) x $self->{_out}->ncols ];
552 0           };
553             my $emit_non_match_left = sub {
554 0     0     &{$reset_out_fref}();
  0            
555 0           &{$copy_left_to_out_fref}();
  0            
556 0           &{$out_fastpath_sub}($out_fref);
  0            
557 0           };
558             my $emit_non_match_right = sub {
559 0     0     &{$reset_out_fref}();
  0            
560 0           &{$copy_right_to_out_fref}();
  0            
561 0           &{$out_fastpath_sub}($out_fref);
  0            
562 0           };
563 0 0         if ($self->{_join_type} eq 'inner') {
    0          
    0          
564 0     0     $emit_non_match_left = $emit_non_match_right = sub {};
565             } elsif ($self->{_join_type} eq 'left') {
566 0     0     $emit_non_match_right = sub {};
567             } elsif ($self->{_join_type} eq 'right') {
568 0     0     $emit_non_match_left = sub {};
569             };
570             my $advance_left = sub {
571 0     0     $prev_frefs[0] = $frefs[0];
572 0           $frefs[0] = &{$fastpath_subs[0]}();
  0            
573 0 0         if (defined($frefs[0])) {
574 0 0         &{$check_compare_subs[0]}() <= 0 or die "dbjoin: left stream is unsorted.\n";
  0            
575             };
576 0           };
577             my $advance_right = sub {
578 0     0     $prev_frefs[1] = $frefs[1];
579 0           $frefs[1] = &{$fastpath_subs[1]}();
  0            
580 0 0         if (defined($frefs[1])) {
581 0 0         &{$check_compare_subs[1]}() <= 0 or die "dbjoin: right stream is unsorted.\n";
  0            
582             };
583 0           };
584              
585             #
586             # join the data (assumes data already sorted)
587             #
588             # Algorithm (standard "Sort Merge Join"):
589             #
590             # more: walk through left (0) until it matches right
591             # emitting non-matching records as we go
592             # then we're in a match:
593             # find all matching rights and save them (maybe more than mem)
594             # then walk all matching lefts:
595             # for each one, output all the matches against the saved rights
596             # when we get a non-matching left, discard our saved rights
597             # and do more (above)
598             # finally, we may have leftover, non-matching rights, output them as non-matches
599             #
600             # Oh, and along the way, verify the inputs are actually sorted.
601             #
602             # The above algorithm (sort/merge join) is theoretically optimal
603             # at O(n log n) in n input records, but not experimentally optimal
604             # if left or right is small.
605             # In the current implementation, if |right| >> |left|, we lose.
606             #
607             # As an exercise to the reader, we could allow different
608             # algorithms. If right (or left) is small, we should just read
609             # it into memory and do a hash join against it.
610             # See the Wikipedia article on sort-merge join for details.
611             #
612              
613             # prime the pump
614 0           foreach (0..1) {
615 0           $frefs[$_] = &{$fastpath_subs[$_]}();
  0            
616             };
617              
618             #
619             # Main loop: walk through left
620             #
621             until_eof:
622 0           for (;;) {
623             # eof on either stream? quit main loop
624 0 0 0       last if (!defined($frefs[0]) || !defined($frefs[1]));
625              
626             # eat data until we have a match
627 0           my $left_right_cmp;
628              
629             until_match:
630 0           for (;;) {
631 0 0         defined($frefs[0]) or die "assert";
632 0 0         defined($frefs[1]) or die "assert";
633              
634 0           $left_right_cmp = &{$compare_sub}();
  0            
635              
636 0 0         if ($left_right_cmp < 0) {
    0          
637             # left wins, so eat left
638 0           &{$emit_non_match_left}();
  0            
639 0           &{$advance_left}();
  0            
640 0 0         last until_eof if (!defined($frefs[0]));
641             } elsif ($left_right_cmp > 0) {
642             # right wins, eat right
643 0           &{$emit_non_match_right}();
  0            
644 0           &{$advance_right}();
  0            
645 0 0         last until_eof if (!defined($frefs[1]));
646             } else {
647 0           last until_match;
648             };
649             };
650              
651             #
652             # match, whoo hoo!
653             #
654 0 0         $left_right_cmp == 0 or die "assert";
655 0 0         defined($frefs[0]) or die "assert";
656 0 0         defined($frefs[1]) or die "assert";
657              
658             # accumulate rights
659             # Sigh, we save them in memory.
660             # xxx: we should really spill to disk if we get too many.
661 0           @right_frefs = ();
662             accumulate_rights:
663 0           for (;;) {
664 0           push(@right_frefs, $frefs[1]);
665 0 0         warn "internal warning: dbjoin: lots of matching rows accumulating in memory. Fixes: dbjoin code can spill to disk (not implemented) or dbjoin user can perhaps swap file orders.\n"
666             if ($#right_frefs == 2000); # just emit warning once
667 0           &{$advance_right}();
  0            
668 0 0         last accumulate_rights if (!defined($frefs[1]));
669 0           $left_right_cmp = &{$compare_sub}();
  0            
670 0 0         last accumulate_rights if ($left_right_cmp != 0);
671             };
672 0 0 0       (!defined($frefs[1]) || $left_right_cmp != 0) or die "assert";
673             #
674             # Ok, this is a bit gross, but we do it anyway.
675             # Right is now one beyond a match.
676             # Save it aside and we'll come back to it later.
677             # This way we can iterate with $frefs[1] over our saved @right_frefs,
678             # keeping our &{$fns}()'s happy.
679             #
680 0           my $right_fref_past_match = $frefs[1];
681              
682             # now walk lefts
683              
684 0           &{$reset_out_fref}();
  0            
685             walk_lefts:
686 0           for (;;) {
687 0           &{$copy_left_to_out_fref}();
  0            
688 0           foreach my $matching_right (@right_frefs) {
689 0           $frefs[1] = $matching_right; # hacky, but satisfies next line's call
690 0           &{$copy_right_to_out_fref}();
  0            
691 0           &{$out_fastpath_sub}($out_fref);
  0            
692             };
693 0           &{$advance_left}();
  0            
694 0 0         last walk_lefts if (!defined($frefs[0]));
695 0           $left_right_cmp = &{$compare_sub}();
  0            
696 0 0         last walk_lefts if ($left_right_cmp != 0);
697             };
698 0 0 0       (!defined($frefs[0]) || $left_right_cmp != 0) or die "assert";
699             # Put back our one-beyond right. Could even be eof.
700 0           $frefs[1] = $right_fref_past_match;
701              
702             # ok, we're now past a match,
703             # and maybe at eof on one stream.
704             # loop back to try again.
705 0 0 0       (!defined($frefs[0]) || !defined($frefs[1]) || $left_right_cmp != 0) or die "assert";
      0        
706             };
707              
708             # Ok, now at least one side or the other is eof.
709             # so drain both sides.
710 0           while (defined($frefs[0])) {
711 0           &{$emit_non_match_left}();
  0            
712 0           &{$advance_left}();
  0            
713             };
714 0           while (defined($frefs[1])) {
715 0           &{$emit_non_match_right}();
  0            
716 0           &{$advance_right}();
  0            
717             };
718              
719             # Reap the threads to suppress a warning (they SHOULD be done
720             # because they gave us eof, but who knows).
721 0 0         if (!$self->{_pre_sorted}) {
722 0           foreach (0..1) {
723 0           $self->{_in_freds}[$_]->join();
724             };
725             };
726             }
727              
728              
729             =head2 run_hash_join
730              
731             $filter->run_hash_join();
732              
733             Internal: run over each row, doing a hash join.
734              
735             =cut
sub run_hash_join($$$) {
    # Join the two inputs with an in-memory hash join.
    #
    # Arguments:
    #   $self      the filter object (hash-based; see the keys read below)
    #   $hashside  index (0 or 1) into $self->{_ins} of the input that is
    #              loaded completely into memory and hashed
    #   $fullside  index (0 or 1) of the input that is streamed and probed
    #              against the hash
    #
    # Output rows are written through $self->{_out}'s fastpath sub.
    # Croaks if either block of generated code fails to compile.
    my($self, $hashside, $fullside) = @_;

    #
    # A basic hash join: (This is Wikipedia's "Classic hash join".)
    #
    # Build phase:
    #
    # Load the hashed table into memory,
    # building a hash table %h.
    #
    # Probe phase:
    #
    # Then walk the larger table,
    # checking against the hash table for matches.
    #
    # We currently require it fit in memory.
    #
    # NOTE: the code fragments taken from $self and eval'ed below are
    # compiled inside this sub, so they resolve the lexicals declared
    # here BY NAME ($out_fref and $empty visibly; presumably also
    # $fref, $hit and $fs -- confirm against the code that generates
    # the fragments).  Do not rename or remove these lexicals.
    #

    #
    # Build.
    #
    my @fastpath_subs;
    foreach (0..1) {
        $fastpath_subs[$_] = $self->{_ins}[$_]->fastpath_sub();
    };
    my $out_fastpath_sub = $self->{_out}->fastpath_sub();

    # %hashed_table maps key -> row (an array ref) when the key is unique,
    # or key -> 1 (a plain scalar marker) when several rows share the key.
    # In the latter case all rows live in @{$hashed_table_overflow{$key}}.
    my %hashed_table;
    my %hashed_table_overflow;
    my $fref;
    my $overflow_count = 0;

    my $build_key_from_hashside;
    my $build_key_from_fullside;
    # Not read by the visible code; kept because the generated code may use it.
    my $fs = $self->{_out}->fs;
    my $code = '$build_key_from_hashside = sub {' . "\n\t" . 'my($key) = ""; ' . $self->{_build_key_from_hashside_code} .
        "\n\t" . 'my @a = ( ' . $self->{_build_hashside_result_code} . ");\n\t" . 'return($key,\@a);' . "\n};\n" .
        '$build_key_from_fullside = sub {' . "\n\t" . 'my($key) = ""; ' . $self->{_build_key_from_fullside_code} .
        "\n\t" . 'return $key;' . "\n};\n";
    eval $code;
    $@ && croak $self->{_prog} . ": internal eval error in hash build code: $@.\n$code\n";

    while ($fref = &{$fastpath_subs[$hashside]}()) {
        # build the hash entry
        my($key, $aref) = &{$build_key_from_hashside}($fref);
        if (defined($hashed_table{$key})) {
            # overflow
            #
            # A single row is stored as an ARRAY ref; an overflowed key
            # is stored as the plain scalar 1, for which ref() returns ''
            # (never 'SCALAR').  So "not an ARRAY ref" is the test for
            # "already overflowed"; this matches the probe phase below.
            if (ref($hashed_table{$key}) ne 'ARRAY') {
                # already overflowed
                push(@{$hashed_table_overflow{$key}}, $aref);
            } else {
                # new overflow
                die "internal error: confused about overflow on $key\n"
                    if (defined($hashed_table_overflow{$key}));
                my @new_overflow = ($hashed_table{$key}, $aref);
                $hashed_table_overflow{$key} = \@new_overflow;
                $hashed_table{$key} = 1;   # mark new overflow
                $overflow_count++;
            };
        } else {
            $hashed_table{$key} = $aref;
        };
    };

    # Every hashed key starts out unmatched; the probe phase deletes
    # the keys it hits, leaving those needed for the outer-join dump.
    my %hashed_table_unmatched;
    foreach (keys %hashed_table) {
        $hashed_table_unmatched{$_} = 1;
    };
    my $out_fref = [];

    my $build_output_from_full;
    my $build_output_from_hit;
    my $merge_output_from_hit;
    my $empty = $self->{_empty};
    my $hit;
    my $reset_output_code = '$out_fref = [ ($empty) x ' . $self->{_out}->ncols . " ];\n";
    $code = '$build_output_from_full = sub {' . "\n" . $reset_output_code . $self->{_build_output_from_full_code} . "\n};\n" .
        '$build_output_from_hit = sub {' . "\n" . $reset_output_code . $self->{_build_output_from_hit_code} . "\n};\n" .
        '$merge_output_from_hit = sub {' . "\n" . $self->{_merge_output_from_hit_code} . "\n};\n";
    eval $code;
    $@ && croak $self->{_prog} . ": internal eval error in hash probe code: $@.\n$code\n";

    #
    # Probe.
    #
    # Probe-side rows without a match are emitted for a full outer join,
    # or when the probe side is the side the left/right join preserves.
    my($show_probe_non_matches) = ($self->{_join_type} eq 'outer') ||
        ($self->{_join_type} eq 'left' && $self->{_join_method} eq 'righthash') ||
        ($self->{_join_type} eq 'right' && $self->{_join_method} eq 'lefthash');
    while ($fref = &{$fastpath_subs[$fullside]}()) {
        # probe and join
        my $key = &{$build_key_from_fullside}();
        $hit = $hashed_table{$key};
        if (!defined($hit)) {
            # no match
            if ($show_probe_non_matches) {
                &{$build_output_from_full}();
                &{$out_fastpath_sub}($out_fref);
            };
        } else {
            delete $hashed_table_unmatched{$key};
            &{$build_output_from_full}();
            if (ref($hit) eq 'ARRAY') {
                # single hit
                &{$merge_output_from_hit}();
                &{$out_fastpath_sub}($out_fref);
            } else {
                # overflow hits
                # xxx: if you replace "my $h" with "$hit", perl-5.22.2-362.fc24.x86_64 segfaults
                foreach my $h (@{$hashed_table_overflow{$key}}) {
                    $hit = $h;
                    &{$merge_output_from_hit}();
                    &{$out_fastpath_sub}($out_fref);
                };
            };
        };
    };

    #
    # Dump extra hashes, if necessary.
    #
    # Hash-side rows that were never matched are emitted for a full outer
    # join, or when the hashed side is the side the left/right join preserves.
    if (($self->{_join_type} eq 'outer') ||
        ($self->{_join_type} eq 'left' && $self->{_join_method} eq 'lefthash') ||
        ($self->{_join_type} eq 'right' && $self->{_join_method} eq 'righthash')) {
        for my $key (sort keys %hashed_table_unmatched) {
            $hit = $hashed_table{$key};
            if (ref($hit) eq 'ARRAY') {
                # single hit
                &{$build_output_from_hit}();
                &{$out_fastpath_sub}($out_fref);
            } else {
                # overflow hits
                #
                # Use a fresh loop variable and assign it to $hit.
                # "foreach $hit (...)" would localize $hit to the loop,
                # so closures compiled earlier would keep seeing the
                # outer $hit (the overflow marker), not the current row.
                foreach my $h (@{$hashed_table_overflow{$key}}) {
                    $hit = $h;
                    &{$build_output_from_hit}();
                    &{$out_fastpath_sub}($out_fref);
                };
            };
        };
    };
}
876              
877              
878             =head2 run
879              
880             $filter->run();
881              
882             Internal: run over each row.
883              
884             =cut
sub run($) {
    # Dispatch to the join implementation named by $self->{_join_method}.
    #
    # For the hash joins, the two arguments are the index of the input
    # to hash and the index of the input to stream: "righthash" hashes
    # input 1 and streams input 0; "lefthash" does the reverse.
    #
    # Dies if the join method is not one of merge, righthash or lefthash.
    my($self) = @_;

    return $self->run_merge_join()
        if ($self->{_join_method} eq 'merge');
    return $self->run_hash_join(1, 0)
        if ($self->{_join_method} eq 'righthash');
    return $self->run_hash_join(0, 1)
        if ($self->{_join_method} eq 'lefthash');

    die $self->{_prog} . ": unknown join method " . $self->{_join_method} . "\n";
}
897              
898             =head1 AUTHOR and COPYRIGHT
899              
900             Copyright (C) 1991-2016 by John Heidemann
901              
902             This program is distributed under terms of the GNU general
903             public license, version 2. See the file COPYING
904             with the distribution for details.
905              
906             =cut
907              
908             1;