File Coverage

blib/lib/Fsdb/Filter/dbjoin.pm
Criterion Covered Total %
statement 21 367 5.7
branch 0 148 0.0
condition 0 48 0.0
subroutine 7 32 21.8
pod 7 7 100.0
total 35 602 5.8


line stmt bran cond sub pod time code
1             #!/usr/bin/perl -w
2              
3             #
4             # dbjoin.pm
5             # Copyright (C) 1991-2016 by John Heidemann
6             # $Id: 653e1186837d37a1668d02763a023b730484a03c $
7             #
8             # This program is distributed under terms of the GNU general
9             # public license, version 2. See the file COPYING
10             # in $dblibdir for details.
11             #
12              
13             package Fsdb::Filter::dbjoin;
14              
15             =head1 NAME
16              
17             dbjoin - join two tables on common columns
18              
19             =head1 SYNOPSIS
20              
21             dbjoin [-Sid] --input table1.fsdb --input table2.fsdb [-nNrR] column [column...]
22              
23             OR
24              
25             cat table1.fsdb | dbjoin [-Sid] --input table2.fsdb [-nNrR] column [column...]
26              
27             =head1 DESCRIPTION
28              
29             Does a natural, inner join on TABLE1 and TABLE2
30             on the specified columns. With the C<-a> option, or
31             with C<-t outer> it will do a natural, full outer join.
32              
33             (Database review:
34             inner joins output records only when there are matches in both tables
35             and will omit records that do not match.
36             Outer joins output all records from both tables,
37             filling with the empty value as needed.
38             Right (left) outer joins keep all elements of the right (left)
39             table, even those that don't match in the other table.)
40              
41             By default for non-hash joins, data will be sorted lexically,
42             but the usual sorting options can be mixed with the column
43             specification.
44              
45             Because two tables are required,
46             input is typically in files.
47             Standard input is accessible by the file "-".
48              
49             If only one input is given, the first (left) input
50             is taken from stdin.
51              
52             =head1 RESOURCE REQUIREMENTS AND PERFORMANCE
53              
54             Joins can be expensive.
55             Most databases have a query optimizer that
56             knows something about the data and so can select
57             algorithms for efficient operation;
58             in Fsdb, I<you> are that optimizer.
59              
60             For I<merge joins>:
61             if data is already sorted, telling dbjoin so with the C<-S>
62             option lets it run more efficiently.
63              
64             The resource requirements of merge joins vary.
65             If input data is sorted and C<-S> is given,
66             then memory consumption is bounded by
67             the sum of the largest number of records in either dataset
68             with the same value in the join column,
69             and there is no disk consumption.
70             If data is not sorted, then L<dbsort> requires
71             disk storage the size of both input files.
72              
73             One can minimize memory consumption by making sure
74             each record of table1 matches relatively few records in table2.
75             Typically this means that table2 should be the smaller.
76             For example, given two files: people.fsdb (schema: name iso_country_code)
77             and countries.fsdb (schema: iso_country_code full_country_name),
78             then
79              
80             dbjoin -i people.fsdb -i countries.fsdb iso_country_code
81              
82             will require less memory than
83              
84             dbjoin -i countries.fsdb -i people.fsdb iso_country_code
85              
86             if there are many people per country (as one would expect).
87             If the warning "lots of matching rows accumulating in memory" appears,
88             this is the likely cause; try swapping the join order.
89              
90             For I<hash joins>
91             (that is, with C<-m righthash> or C<-m lefthash>):
92             all of the right table (the second input) or the left (the first)
93             is loaded into memory (and "hashed").
94             The other table need not be sorted.
95             Runtime is O(n), but memory is O(size of hashed table).
96              
97              
98             =head1 OPTIONS
99              
100             =over 4
101              
102             =item B<-a> or B<--all>
103              
104             Perform a I<full outer join>,
105             including non-matches (each record which doesn't match at
106             all will appear once).
107             Default is an I<inner join>.
108              
109             =item B<-t TYPE> or B<--type TYPE>
110              
111             Explicitly specify the join type.
112             TYPE must be one of inner, outer, left (outer), or right (outer).
113             Default: inner.
114              
115             =item B<-m METHOD> or B<--method METHOD>
116              
117             Select join method (algorithm).
118             Choices are merge, righthash, and lefthash.
119             Default: merge.
120              
121             =item B<-S> or B<--pre-sorted>
122              
123             assume (and verify) data is already sorted
124              
125             =item B<-e E> or B<--empty E>
126              
127             give value E as the value for empty (null) records
128              
129             =item B<-T TmpDir>
130              
131             where to put tmp files.
132             Also uses environment variable TMPDIR, if -T is
133             not specified.
134             Default is /tmp.
135              
136             =back
137              
138             Sort specification options (can be interspersed with column names):
139              
140             =over 4
141              
142             =item B<-r> or B<--descending>
143              
144             sort in reverse order (high to low)
145              
146             =item B<-R> or B<--ascending>
147              
148             sort in normal order (low to high)
149              
150             =item B<-n> or B<--numeric>
151              
152             sort numerically
153              
154             =item B<-N> or B<--lexical>
155              
156             sort lexicographically
157              
158             =back
159              
160              
161             =for comment
162             begin_standard_fsdb_options
163              
164             This module also supports the standard fsdb options:
165              
166             =over 4
167              
168             =item B<-d>
169              
170             Enable debugging output.
171              
172             =item B<-i> or B<--input> InputSource
173              
174             Read from InputSource, typically a file name, or C<-> for standard input,
175             or (if in Perl) a IO::Handle, Fsdb::IO or Fsdb::BoundedQueue objects.
176              
177             =item B<-o> or B<--output> OutputDestination
178              
179             Write to OutputDestination, typically a file name, or C<-> for standard output,
180             or (if in Perl) a IO::Handle, Fsdb::IO or Fsdb::BoundedQueue objects.
181              
182             =item B<--autorun> or B<--noautorun>
183              
184             By default, programs process automatically,
185             but Fsdb::Filter objects in Perl do not run until you invoke
186             the run() method.
187             The C<--(no)autorun> option controls that behavior within Perl.
188              
189             =item B<--help>
190              
191             Show help.
192              
193             =item B<--man>
194              
195             Show full manual.
196              
197             =back
198              
199             =for comment
200             end_standard_fsdb_options
201              
202              
203             =head1 SAMPLE USAGE
204              
205             =head2 Input:
206              
207             #fsdb sid cid
208             1 10
209             2 11
210             1 12
211             2 12
212              
213             And in the file F<DATA/classes>:
214              
215             #fsdb cid cname
216             10 pascal
217             11 numanal
218             12 os
219              
220             =head2 Command:
221              
222             cat DATA/reg.fsdb | dbsort -n cid | dbjoin -i - -i DATA/classes -n cid
223              
224             =head2 Output:
225              
226             #fsdb cid sid cname
227             10 1 pascal
228             11 2 numanal
229             12 1 os
230             12 2 os
231             # - COMMENTS:
232             # | /home/johnh/BIN/DB/dbsort -n cid
233             # DATA/classes COMMENTS:
234             # joined comments:
235             # | /home/johnh/BIN/DB/dbjoin - DATA/classes cid
236              
237             =head1 SEE ALSO
238              
239             L<Fsdb>.
240              
241              
242             =head1 CLASS FUNCTIONS
243              
244             =cut
245              
246             @ISA = qw(Fsdb::Filter);
247             ($VERSION) = 2.0;
248              
249 1     1   9058 use strict;
  1         2  
  1         31  
250 1     1   5 use Pod::Usage;
  1         2  
  1         96  
251 1     1   6 use Carp;
  1         2  
  1         49  
252              
253 1     1   4 use Fsdb::Filter;
  1         2  
  1         24  
254 1     1   4 use Fsdb::IO::Reader;
  1         3  
  1         21  
255 1     1   4 use Fsdb::IO::Writer;
  1         2  
  1         22  
256 1     1   5 use Fsdb::Filter::dbpipeline qw(dbpipeline_filter dbsort);
  1         2  
  1         2897  
257              
258              
259             =head2 new
260              
261             $filter = new Fsdb::Filter::dbjoin(@arguments);
262              
263             Create a new dbjoin object, taking command-line arguments.
264              
265             =cut
266              
# Construct a dbjoin filter object.
#
# Everything after the class name is a command-line style argument list
# (e.g. '-i', 'left.fsdb', '-i', 'right.fsdb', 'cid').  The same list is
# handed to the Fsdb::Filter base constructor and, once defaults are in
# place, to parse_options.  Returns the blessed filter.
sub new ($@) {
	my ($class, @arguments) = @_;
	my $filter = bless $class->SUPER::new(@arguments), $class;
	$filter->set_defaults;
	$filter->parse_options(@arguments);
	$filter->SUPER::post_new();
	return $filter;
}
276              
277              
278             =head2 set_defaults
279              
280             $filter->set_defaults();
281              
282             Internal: set up defaults.
283              
284             =cut
285              
# Install dbjoin's default settings on top of the base-class defaults:
# data is not assumed pre-sorted, no sort keys yet, an inner join,
# using the merge method.  The temporary directory default is set last,
# and its result is what this method returns.
sub set_defaults ($) {
	my $filter = shift;
	$filter->SUPER::set_defaults();
	my %join_defaults = (
		_pre_sorted  => 0,
		_sort_argv   => [],
		_join_type   => 'inner',
		_join_method => 'merge',
	);
	@{$filter}{keys %join_defaults} = values %join_defaults;
	return $filter->set_default_tmpdir;
}
295              
296             =head2 parse_options
297              
298             $filter->parse_options(@ARGV);
299              
300             Internal: parse command-line arguments.
301              
302             =cut
303              
# Parse dbjoin's command-line arguments into $self.
#
# Join options (-a, -t, -m, -S, -e, -T), I/O options (-i, -o) and the
# standard Fsdb flags update fields of $self.  Sort flags (-n, -N, -r, -R)
# and bare column names are all routed through parse_sort_option, so
# each flag applies to the columns that follow it.
# A Getopt failure triggers pod2usage(2); any arguments left over after
# parsing cause a croak.
sub parse_options ($@) {
	my $self = shift @_;

	my(@argv) = @_;
	$self->get_options(
		\@argv,
		'help|?' => sub { pod2usage(1); },
		'man' => sub { pod2usage(-verbose => 2); },
		# -a/--all selects a full outer join.  The option is negatable, so
		# Getopt::Long passes 1 for --all and 0 for --noall; previously the
		# value was ignored and --noall ALSO selected an outer join.
		# --noall now only undoes an outer join (an explicit -t left/right
		# is left alone).
		'a|all!' => sub {
			my ($opt_name, $enabled) = @_;
			if ($enabled) {
				$self->{_join_type} = 'outer';
			} elsif (defined($self->{_join_type}) && $self->{_join_type} eq 'outer') {
				$self->{_join_type} = 'inner';
			};
		},
		'autorun!' => \$self->{_autorun},
		'close!' => \$self->{_close},
		'd|debug+' => \$self->{_debug},
		'e|empty=s' => \$self->{_empty},
		'i|input=s@' => sub { $self->parse_io_option('inputs', @_); },
		'log!' => \$self->{_logprog},
		'm|method=s' => \$self->{_join_method},
		'o|output=s' => sub { $self->parse_io_option('output', @_); },
		'S|pre-sorted+' => \$self->{_pre_sorted},
		't|type=s' => \$self->{_join_type},
		'T|tmpdir|tempdir=s' => \$self->{_tmpdir},
		# sort key options (order relative to column names matters):
		'n|numeric' => sub { $self->parse_sort_option(@_); },
		'N|lexical' => sub { $self->parse_sort_option(@_); },
		'r|descending' => sub { $self->parse_sort_option(@_); },
		'R|ascending' => sub { $self->parse_sort_option(@_); },
		'<>' => sub { $self->parse_sort_option('<>', @_); },
	) or pod2usage(2);
	croak $self->{_prog} . ": internal error, extra arguments.\n"
		if ($#argv != -1);
}
334              
335             =head2 setup
336              
337             $filter->setup();
338              
339             Internal: setup, parse headers.
340              
341             =cut
342              
# setup: validate options, open (and, for an unsorted merge join, sort)
# both inputs, and generate the Perl source fragments that the run_*
# methods later eval to build output rows.
#
# Croaks if no sort key was given, if the join type is not one of
# inner/outer/left/right, if the join method is not one of
# merge/lefthash/righthash, if the two inputs do not compare as
# compatible (reported as differing field separators), if no join field
# yields compare code, or if a non-join column appears in both inputs.
# Dies if a join column is missing from either input.
#
# Output column order: join keys (in the order given), then the remaining
# left-input columns, then the remaining right-input columns.
#
# Fields written on $self: _ins (directly for the auto-sort case,
# presumably via finish_io_option otherwise), _pre_sorted_inputs and
# _in_freds (auto-sort case only), _copy_codes, the _build_*/_merge_*
# code strings, the output (via finish_io_option('output', ...)),
# _compare_code and _compare_code_ins.
sub setup($) {
	my($self) = @_;

	croak $self->{_prog} . ": no sorting key specified.\n"
		if ($#{$self->{_sort_argv}} == -1);
	croak $self->{_prog} . ": unknown join type " . $self->{_join_type} . ".\n"
		if (!($self->{_join_type} eq 'inner' || $self->{_join_type} eq 'outer' || $self->{_join_type} eq 'left' || $self->{_join_type} eq 'right'));
	croak $self->{_prog} . ": unknown join method " . $self->{_join_method} . ".\n"
		if (!($self->{_join_method} eq 'merge' || $self->{_join_method} eq 'lefthash' || $self->{_join_method} eq 'righthash'));

	$self->setup_exactly_two_inputs;

	# hash join?
	# $hashside is the input index (0=left, 1=right) loaded into memory;
	# $fullside is the other input, which is streamed.  Both stay undef
	# for merge joins.
	my($hashside, $fullside) = (undef, undef);
	if ($self->{_join_method} eq 'righthash') {
		($hashside, $fullside) = (1, 0);
	} elsif ($self->{_join_method} eq 'lefthash') {
		($hashside, $fullside) = (0, 1);
	};

	#
	# automatic input sorting?
	# A merge join needs both inputs sorted on the join keys; unless the
	# user promised that with -S, pipe each input through dbsort using
	# the same sort arguments (plus -T if a tmpdir was given).
	#
	if ($self->{_join_method} eq 'merge' && !$self->{_pre_sorted}) {
		my(@final_sort_argv) = @{$self->{_sort_argv}};
		unshift(@final_sort_argv, '-T', $self->{_tmpdir})
			if (defined($self->{_tmpdir}));
		foreach (0..1) {
			# $new_fred is the handle for the sorting pipeline stage;
			# it is kept so it can be join()ed when the merge finishes.
			my($new_reader, $new_fred) = dbpipeline_filter($self->{_inputs}[$_], [-comment_handler => $self->create_delay_comments_sub], dbsort('--nolog', @final_sort_argv));
			$self->{_pre_sorted_inputs}[$_] = $self->{_inputs}[$_];
			$self->{_ins}[$_] = $new_reader;
			$self->{_in_freds}[$_] = $new_fred;
		};
	} else {
		$self->finish_io_option('inputs', -comment_handler => $self->create_delay_comments_sub);
	};

	# can't move next check earlier; it must be after "finish_io_option".
	croak $self->{_prog} . ": cannot handle input data with different field separators.\n"
		if (!defined($self->{_ins}[0]->compare($self->{_ins}[1])));

	#
	# figure the joined columns
	#
	# The *_code strings below are Perl source text.  They are later
	# eval'd where $out_fref, @frefs, $fref, $hit, $key and $fs are in
	# scope, which is why those names appear inside single quotes here.
	#
	my $i = 0;  # counts output columns
	my %join_keys;
	my $key;
	my @output_columns;
	my %all_keys;
	my @copy_codes = ('', '');
	my $build_key_from_hashside_code = '';
	my $build_key_from_fullside_code = '';
	my $build_hashside_result_code = '';
	my $build_output_from_hit_code = '';
	my $merge_output_from_hit_code = '';
	my $build_output_from_full_code = '';
	my @hash_input_col_to_hit_col;   # not actually used
	my $hit_ncol = 0;
	for $key (@{$self->{_sort_argv}}) {
		next if ($key =~ /^-/);   # we deal with this later

		# figure out details for both sides
		my @col_i;
		foreach (0..1) {
			$col_i[$_] = $self->{_ins}[$_]->col_to_i($key);
			die($self->{_prog} . ": column ``$key'' is not in " . ($_ == 0 ? "left" : "right") ." join source.\n") if (!defined($col_i[$_]));
			# for merge join, details are copy
			$copy_codes[$_] .= '$out_fref->[' . $i . '] = $frefs[' . $_ . "]->[$col_i[$_]];\n";
			# for hash join, also build the hash key
			if ($self->{_join_method} ne 'merge') {
				if ($_ == $hashside) {
					# key fields are joined with $fs; no separator before the first
					$build_key_from_hashside_code .= '$key .= ' . ($i == 0 ? '' : '$fs . ') . '$fref->[' . $col_i[$_] . ']; ';
					$build_hashside_result_code .= ($i == 0 ? '' : ", ") . '$fref->[' . $col_i[$_] . ']';
					$hash_input_col_to_hit_col[$col_i[$_]] = $hit_ncol++;
					# join keys come first in both the hit row and the output
					# row, so index $i is valid on both sides here
					$build_output_from_hit_code .= '$out_fref->[' . $i . '] = $hit->[' . $i . "];\n";
				};
				if ($_ == $fullside) {
					$build_key_from_fullside_code .= '$key .= ' . ($i == 0 ? '' : '$fs . ') . '$fref->[' . $col_i[$_] . ']; ';
					$build_output_from_full_code .= '$out_fref->[' . $i . '] = $fref->[' . $col_i[$_] . "];\n";
				};
			};
		};

		push(@output_columns, $key);
		$join_keys{$key} = $i;
		$all_keys{$key} = $i;
		$i++;
	}

	#
	# and the rest
	# (non-join columns of the left input, then of the right input)
	#
	my $copy_codes = '';
	my $col_i;
	foreach $key (@{$self->{_ins}[0]->cols}) {
		next if (defined($all_keys{$key}));  # already got it
		push(@output_columns, $key);
		$col_i = $self->{_ins}[0]->col_to_i($key);
		defined($col_i) or die "assert";
		$copy_codes[0] .= '$out_fref->[' . $i . '] = $frefs[0]->[' . $col_i . '];' . "\n";
		$all_keys{$key} = $i;
		if ($self->{_join_method} ne 'merge') {
			if (0 == $hashside) {
				$build_hashside_result_code .= ', $fref->[' . $col_i . ']';
				my $hit_col = $hit_ncol++;
				$hash_input_col_to_hit_col[$col_i] = $hit_col;
				$build_output_from_hit_code .= '$out_fref->[' . $i . '] = $hit->[' . $hit_col . '];';
				$merge_output_from_hit_code .= '$out_fref->[' . $i . '] = $hit->[' . $hit_col . '];';
			};
			if (0 == $fullside) {
				$build_output_from_full_code .= '$out_fref->[' . $i . '] = $fref->[' . $col_i . '];';
			};
		};
		$i++;
	};
	foreach $key (@{$self->{_ins}[1]->cols}) {
		next if (defined($join_keys{$key}));
		# detect duplicates that are not joined upon (error)
		# (this represents duplicate fields in the two merged things).
		# Reject this because we don't want to silently prefer one to the other.
		if (defined($all_keys{$key})) {
			croak $self->{_prog} . ": column $key is in both of the joined files, but is not joined upon.\nAll non-joined columns must be unique.\nBefore joining you must\nrename one of the source columns\nor remove one of the duplicate input columns.\n";
		};
		push(@output_columns, $key);
		$col_i = $self->{_ins}[1]->col_to_i($key);
		defined($col_i) or die "assert";
		$copy_codes[1] .= '$out_fref->[' . $i . '] = $frefs[1]->[' . $col_i . '];' . "\n";
		$all_keys{$key} = $i;
		if ($self->{_join_method} ne 'merge') {
			if (1 == $hashside) {
				$build_hashside_result_code .= ', $fref->[' . $col_i . ']';
				my $hit_col = $hit_ncol++;
				$hash_input_col_to_hit_col[$col_i] = $hit_col;
				$build_output_from_hit_code .= '$out_fref->[' . $i . '] = $hit->[' . $hit_col . '];';
				$merge_output_from_hit_code .= '$out_fref->[' . $i . '] = $hit->[' . $hit_col . '];';
			};
			if (1 == $fullside) {
				$build_output_from_full_code .= '$out_fref->[' . $i . '] = $fref->[' . $col_i . '];';
			};
		};
		$i++;
	};
	$self->{_copy_codes} = \@copy_codes;

	$self->{_build_key_from_hashside_code} = $build_key_from_hashside_code;
	$self->{_build_key_from_fullside_code} = $build_key_from_fullside_code;
	$self->{_build_hashside_result_code} = $build_hashside_result_code;
	$self->{_build_output_from_hit_code} = $build_output_from_hit_code;
	$self->{_merge_output_from_hit_code} = $merge_output_from_hit_code;
	$self->{_build_output_from_full_code} = $build_output_from_full_code;

	# output takes its settings from the left input, with the joined schema
	$self->finish_io_option('output', -clone => $self->{_ins}[0], -cols => \@output_columns);

	#
	# comparison code
	# _compare_code compares the current left row against the current
	# right row; _compare_code_ins[i] compares input i's previous row
	# against its current row (used to verify the inputs are sorted).
	#
	$self->{_compare_code} = $self->create_compare_code(@{$self->{_ins}}, 'frefs[0]', 'frefs[1]');;
	croak $self->{_prog} . ": no join field specified.\n"
		if (!defined($self->{_compare_code}));

	print "COMPARE CODE:\n\t" . $self->{_compare_code} . "\n" if ($self->{_debug});
	foreach (0..1) {
		$self->{_compare_code_ins}[$_] = $self->create_compare_code($self->{_ins}[$_], $self->{_ins}[$_], "prev_frefs[$_]", "frefs[$_]");
		croak $self->{_prog} . ": no join field specified.\n"
			if (!defined($self->{_compare_code_ins}[$_]));
	};
}
510              
511              
512             =head2 run_merge_join
513              
514             $filter->run_merge_join();
515              
516             Internal: run over each rows.
517              
518             =cut
# run_merge_join: stream both (sorted) inputs and emit joined rows using
# a sort-merge join.
#
# Relies on fields prepared by setup: _compare_code, _compare_code_ins,
# _copy_codes, _ins, _out, _join_type and _empty.
# Non-matching rows are emitted (padded with _empty) according to
# _join_type: none for inner, left-only for left, right-only for right,
# both for outer.
# Dies if either input turns out not to be sorted; croaks if the
# generated compare/copy code fails to eval.
sub run_merge_join($) {
	my($self) = @_;

	#
	# Eval compare_sub in this lexical context
	# of our variables.
	# (The generated code refers to @frefs and @prev_frefs by name,
	# so they must be lexicals visible to the string eval.)
	#
	my @prev_frefs;
	my @frefs;
	my(@right_frefs);
	my $compare_sub;
	my @check_compare_subs;
	my $code = '$compare_sub = ' . $self->{_compare_code} . "\n" .
		'$check_compare_subs[0] = ' . $self->{_compare_code_ins}[0] . "\n" .
		'$check_compare_subs[1] = ' . $self->{_compare_code_ins}[1] . "\n";
	eval $code;
	$@ && croak $self->{_prog} . ": internal eval error in compare code: $@.\n";

	my @fastpath_subs;
	foreach (0..1) {
		$fastpath_subs[$_] = $self->{_ins}[$_]->fastpath_sub();
	};
	my $out_fastpath_sub = $self->{_out}->fastpath_sub();

	#
	# Set up some "macros".
	# The copy subs fill $out_fref from $frefs[0] (left) or $frefs[1]
	# (right); they close over the variable, so reassigning $out_fref in
	# $reset_out_fref is seen by them.
	#
	my $out_fref = [];
	my $copy_left_to_out_fref;
	my $copy_right_to_out_fref;
	$code = '$copy_left_to_out_fref = sub {' . "\n" . $self->{_copy_codes}[0] . "\n};\n" .
		'$copy_right_to_out_fref = sub {' . "\n" . $self->{_copy_codes}[1] . "\n};\n";
	eval $code;
	$@ && croak $self->{_prog} . ": internal eval error in copy code: $@.\n$code\n";
	my $reset_out_fref = sub {
		$out_fref = [ ($self->{_empty}) x $self->{_out}->ncols ];
	};
	my $emit_non_match_left = sub {
		&{$reset_out_fref}();
		&{$copy_left_to_out_fref}();
		&{$out_fastpath_sub}($out_fref);
	};
	my $emit_non_match_right = sub {
		&{$reset_out_fref}();
		&{$copy_right_to_out_fref}();
		&{$out_fastpath_sub}($out_fref);
	};
	# Disable the non-match emitters the join type does not want.
	if ($self->{_join_type} eq 'inner') {
		$emit_non_match_left = $emit_non_match_right = sub {};
	} elsif ($self->{_join_type} eq 'left') {
		$emit_non_match_right = sub {};
	} elsif ($self->{_join_type} eq 'right') {
		$emit_non_match_left = sub {};
	};
	# Read the next row on one side, remembering the previous row so the
	# per-input compare sub can verify sort order.  At eof the side's
	# $frefs entry becomes undef.
	my $advance_left = sub {
		$prev_frefs[0] = $frefs[0];
		$frefs[0] = &{$fastpath_subs[0]}();
		if (defined($frefs[0])) {
			&{$check_compare_subs[0]}() <= 0 or die "dbjoin: left stream is unsorted.\n";
		};
	};
	my $advance_right = sub {
		$prev_frefs[1] = $frefs[1];
		$frefs[1] = &{$fastpath_subs[1]}();
		if (defined($frefs[1])) {
			&{$check_compare_subs[1]}() <= 0 or die "dbjoin: right stream is unsorted.\n";
		};
	};

	#
	# join the data (assumes data already sorted)
	#
	# Algorithm (standard "Sort Merge Join"):
	#
	# more: walk through left (0) until it matches right
	#   emitting non-matching records as we go
	# then we're in a match:
	#   find all matching rights and save them (maybe more than mem)
	#   then walk all matching lefts:
	#     for each one, output all the matches against the saved rights
	#   when we get a non-matching left, discard our saved rights
	#   and do more (above)
	# finally, we may have leftover, non-matching rights, output them as non-matches
	#
	# Oh, and along the way, verify the inputs are actually sorted.
	#
	# The above algorithm (sort/merge join) is theoretically optimal
	# at O(n log n) in n input records, but not experimentally optimal
	# if left or right is small.
	# In the current implementation, if |right| >> |left|, we lose.
	#
	# When one side is small, a hash join is the better choice;
	# that is available separately via -m lefthash / -m righthash
	# (see run_hash_join).
	# NOTE(review): the original comment cited an external reference here
	# whose link was lost; a sort-merge join description (e.g. Wikipedia
	# "Sort-merge join") covers the details.
	#

	# prime the pump
	foreach (0..1) {
		$frefs[$_] = &{$fastpath_subs[$_]}();
	};

	#
	# Main loop: walk through left
	#
      until_eof:
	for (;;) {
		# eof on either stream? quit main loop
		last if (!defined($frefs[0]) || !defined($frefs[1]));

		# eat data until we have a match
		my $left_right_cmp;

	    until_match:
		for (;;) {
			defined($frefs[0]) or die "assert";
			defined($frefs[1]) or die "assert";

			$left_right_cmp = &{$compare_sub}();

			if ($left_right_cmp < 0) {
				# left wins, so eat left
				&{$emit_non_match_left}();
				&{$advance_left}();
				last until_eof if (!defined($frefs[0]));
			} elsif ($left_right_cmp > 0) {
				# right wins, eat right
				&{$emit_non_match_right}();
				&{$advance_right}();
				last until_eof if (!defined($frefs[1]));
			} else {
				last until_match;
			};
		};

		#
		# match, whoo hoo!
		#
		$left_right_cmp == 0 or die "assert";
		defined($frefs[0]) or die "assert";
		defined($frefs[1]) or die "assert";

		# accumulate rights
		# Sigh, we save them in memory.
		# xxx: we should really spill to disk if we get too many.
		@right_frefs = ();
	    accumulate_rights:
		for (;;) {
			push(@right_frefs, $frefs[1]);
			warn "internal warning: dbjoin: lots of matching rows accumulating in memory. Fixes: dbjoin code can spill to disk (not implemented) or dbjoin user can perhaps swap file orders.\n"
				if ($#right_frefs == 2000);  # just emit warning once
			&{$advance_right}();
			last accumulate_rights if (!defined($frefs[1]));
			$left_right_cmp = &{$compare_sub}();
			last accumulate_rights if ($left_right_cmp != 0);
		};
		(!defined($frefs[1]) || $left_right_cmp != 0) or die "assert";
		#
		# Ok, this is a bit gross, but we do it anyway.
		# Right is now one beyond a match.
		# Save it aside and we'll come back to it later.
		# This way we can iterate with $frefs[1] over our saved @right_frefs,
		# keeping our &{$fns}()'s happy.
		#
		my $right_fref_past_match = $frefs[1];

		# now walk lefts

		&{$reset_out_fref}();
	    walk_lefts:
		for (;;) {
			&{$copy_left_to_out_fref}();
			foreach my $matching_right (@right_frefs) {
				$frefs[1] = $matching_right;  # hacky, but satisfies next line's call
				&{$copy_right_to_out_fref}();
				&{$out_fastpath_sub}($out_fref);
			};
			&{$advance_left}();
			last walk_lefts if (!defined($frefs[0]));
			# $frefs[1] is still the last saved matching right here, which
			# has the same key as all the saved rights
			$left_right_cmp = &{$compare_sub}();
			last walk_lefts if ($left_right_cmp != 0);
		};
		(!defined($frefs[0]) || $left_right_cmp != 0) or die "assert";
		# Put back our one-beyond right.  Could even be eof.
		$frefs[1] = $right_fref_past_match;

		# ok, we're now past a match,
		# and maybe at eof on one stream.
		# loop back to try again.
		(!defined($frefs[0]) || !defined($frefs[1]) || $left_right_cmp != 0) or die "assert";
	};

	# Ok, now at least one side or the other is eof.
	# so drain both sides.
	while (defined($frefs[0])) {
		&{$emit_non_match_left}();
		&{$advance_left}();
	};
	while (defined($frefs[1])) {
		&{$emit_non_match_right}();
		&{$advance_right}();
	};

	# Reap the threads to suppress a warning (they SHOULD be done
	# because they gave us eof, but who knows).
	# _in_freds only exists when setup auto-sorted the inputs (no -S).
	if (!$self->{_pre_sorted}) {
		foreach (0..1) {
			$self->{_in_freds}[$_]->join();
		};
	};
}
730              
731              
732             =head2 run_hash_join
733              
734             $filter->run_hash_join();
735              
736             Internal: run over each rows, doing a hash join.
737              
738             =cut
739             sub run_hash_join($$$) {
740 0     0 1   my($self, $hashside, $fullside) = @_;
741              
742             #
743             # A basic hash join: (This is Wikipedia's "Classic hash join".)
744             #
745             # Build phase:
746             #
747             # Load the hashed table into memory,
748             # building a hash table %h.
749             #
750             # Probe phase:
751             #
752             # Then walk the larger table,
753             # checking against the hash table for matches.
754             #
755             # We currently require it fit in memory.
756             #
757              
758             #
759             # Build.
760             #
761 0           my @fastpath_subs;
762 0           foreach (0..1) {
763 0           $fastpath_subs[$_] = $self->{_ins}[$_]->fastpath_sub();
764             };
765 0           my $out_fastpath_sub = $self->{_out}->fastpath_sub();
766              
767 0           my %hashed_table;
768             my %hashed_table_overflow;
769 0           my $fref;
770 0           my $overflow_count = 0;
771              
772 0           my $build_key_from_hashside;
773             my $build_key_from_fullside;
774 0           my $fs = $self->{_out}->fs;
775             my $code = '$build_key_from_hashside = sub {' . "\n\t" . 'my($key) = ""; ' . $self->{_build_key_from_hashside_code} .
776             "\n\t" . 'my @a = ( ' . $self->{_build_hashside_result_code} . ");\n\t" . 'return($key,\@a);' . "\n};\n" .
777             '$build_key_from_fullside = sub {' . "\n\t" . 'my($key) = ""; ' . $self->{_build_key_from_fullside_code} .
778 0           "\n\t" . 'return $key;' . "\n};\n";
779 0           eval $code;
780 0 0         $@ && croak $self->{_prog} . ": internal eval error in hash build code: $@.\n$code\n";
781              
782 0           while ($fref = &{$fastpath_subs[$hashside]}()) {
  0            
783             # build the hash entry
784 0           my($key, $aref) = &{$build_key_from_hashside}($fref);
  0            
785 0 0         if (defined($hashed_table{$key})) {
786             # overflow
787 0 0         if (ref($hashed_table{$key}) eq 'SCALAR') {
788             # already overflowed
789 0           push(@{$hashed_table_overflow{$key}}, $aref);
  0            
790             } else {
791             # new overflow
792             die "internal error: confused about overflow on $key\n"
793 0 0         if (defined($hashed_table_overflow{$key}));
794 0           my @new_overflow = ($hashed_table{$key}, $aref);
795 0           $hashed_table_overflow{$key} = \@new_overflow;
796 0           $hashed_table{$key} = 1; # mark new overflow
797 0           $overflow_count++;
798             };
799             } else {
800 0           $hashed_table{$key} = $aref;
801             };
802             };
803              
804 0           my %hashed_table_unmatched;
805 0           foreach (keys %hashed_table) {
806 0           $hashed_table_unmatched{$_} = 1;
807             };
808 0           my $out_fref = [];
809              
810 0           my $build_output_from_full;
811             my $build_output_from_hit;
812 0           my $merge_output_from_hit;
813 0           my $empty = $self->{_empty};
814 0           my $hit;
815 0           my $reset_output_code = '$out_fref = [ ($empty) x ' . $self->{_out}->ncols . " ];\n";
816             $code = '$build_output_from_full = sub {' . "\n" . $reset_output_code . $self->{_build_output_from_full_code} . "\n};\n" .
817             '$build_output_from_hit = sub {' . "\n" . $reset_output_code . $self->{_build_output_from_hit_code} . "\n};\n" .
818 0           '$merge_output_from_hit = sub {' . "\n" . $self->{_merge_output_from_hit_code} . "\n};\n";
819 0           eval $code;
820 0 0         $@ && croak $self->{_prog} . ": internal eval error in hash probe code: $@.\n$code\n";
821              
822             #
823             # Probe.
824             #
825             my($show_probe_non_matches) = ($self->{_join_type} eq 'outer') ||
826             ($self->{_join_type} eq 'left' && $self->{_join_method} eq 'righthash') ||
827 0   0       ($self->{_join_type} eq 'right' && $self->{_join_method} eq 'lefthash');
828 0           while ($fref = &{$fastpath_subs[$fullside]}()) {
  0            
829             # probe and join
830 0           my $key = &{$build_key_from_fullside}();
  0            
831 0           $hit = $hashed_table{$key};
832 0 0         if (!defined($hit)) {
833             # no match
834 0 0         if ($show_probe_non_matches) {
835 0           &{$build_output_from_full}();
  0            
836 0           &{$out_fastpath_sub}($out_fref);
  0            
837             };
838             } else {
839 0           delete $hashed_table_unmatched{$key};
840 0           &{$build_output_from_full}();
  0            
841 0 0         if (ref($hit) eq 'ARRAY') {
842             # single hit
843 0           &{$merge_output_from_hit}();
  0            
844 0           &{$out_fastpath_sub}($out_fref);
  0            
845             } else {
846             # overflow hits
847             # xxx: if you replace "my $h" with "$hit", perl-5.22.2-362.fc24.x86_64 segfaults
848 0           foreach my $h (@{$hashed_table_overflow{$key}}) {
  0            
849 0           $hit = $h;
850 0           &{$merge_output_from_hit}();
  0            
851 0           &{$out_fastpath_sub}($out_fref);
  0            
852             };
853             };
854             };
855             };
856              
857             #
858             # Dump extra hashs, if necessary.
859             #
860 0 0 0       if (($self->{_join_type} eq 'outer') ||
      0        
      0        
      0        
861             ($self->{_join_type} eq 'left' && $self->{_join_method} eq 'lefthash') ||
862             ($self->{_join_type} eq 'right' && $self->{_join_method} eq 'righthash')) {
863 0           for my $key (sort keys %hashed_table_unmatched) {
864 0           $hit = $hashed_table{$key};
865 0 0         if (ref($hit) eq 'ARRAY') {
866             # single hit
867 0           &{$build_output_from_hit}();
  0            
868 0           &{$out_fastpath_sub}($out_fref);
  0            
869             } else {
870             # overflow hits
871 0           foreach $hit (@{$hashed_table_overflow{$key}}) {
  0            
872 0           &{$build_output_from_hit}();
  0            
873 0           &{$out_fastpath_sub}($out_fref);
  0            
874             };
875             };
876             };
877             };
878             }
879              
880              
881             =head2 run
882              
883             $filter->run();
884              
885             Internal: run over each rows.
886              
887             =cut
888             sub run($) {
889 0     0 1   my($self) = @_;
890 0 0         if ($self->{_join_method} eq 'merge') {
    0          
    0          
891 0           $self->run_merge_join();
892             } elsif ($self->{_join_method} eq 'righthash') {
893 0           $self->run_hash_join(1, 0);
894             } elsif ($self->{_join_method} eq 'lefthash') {
895 0           $self->run_hash_join(0, 1);
896             } else {
897 0           die $self->{_prog} . ": unknown join method " . $self->{_join_method} . "\n";
898             };
899             }
900              
901             =head1 AUTHOR and COPYRIGHT
902              
903             Copyright (C) 1991-2016 by John Heidemann
904              
905             This program is distributed under terms of the GNU general
906             public license, version 2. See the file COPYING
907             with the distribution for details.
908              
909             =cut
910              
911             1;