File Coverage

blib/lib/Fsdb/Filter/dbmerge.pm
Criterion Covered Total %
statement 38 351 10.8
branch 0 196 0.0
condition 0 49 0.0
subroutine 13 39 33.3
pod 12 12 100.0
total 63 647 9.7


line stmt bran cond sub pod time code
1             #!/usr/bin/perl -w
2              
3             #
4             # dbmerge.pm
5             # Copyright (C) 1991-2015 by John Heidemann
6             # $Id: b578ea71b5b2a54bed1fd517633261c2832cd066 $
7             #
8             # This program is distributed under terms of the GNU general
9             # public license, version 2. See the file COPYING
10             # in $dblibdir for details.
11             #
12              
13             package Fsdb::Filter::dbmerge;
14              
15             =head1 NAME
16              
17             dbmerge - merge all inputs in sorted order based on the specified columns
18              
19             =head1 SYNOPSIS
20              
21             dbmerge --input A.fsdb --input B.fsdb [-T TemporaryDirectory] [-nNrR] column [column...]
22              
23             or
24             cat A.fsdb | dbmerge --input - --input B.fsdb [-T TemporaryDirectory] [-nNrR] column [column...]
25              
26              
27             or
28             dbmerge [-T TemporaryDirectory] [-nNrR] column [column...] --inputs A.fsdb [B.fsdb ...]
29              
30              
31             =head1 DESCRIPTION
32              
33             Merge all provided, pre-sorted input files, producing one sorted result.
34             Inputs can both be specified with C<--input>, or one can come
35             from standard input and the other from C<--input>.
36             With C<--xargs>, each line of standard input is a filename for input.
37              
38             Inputs must have identical schemas (columns, column order,
39             and field separators).
40              
41             Unlike F<dbmerge2>, F<dbmerge> supports an arbitrary number of
42             input files.
43              
44             Because this program is intended to merge multiple sources,
45             it does I<not> default to reading from standard input.
46             If you wish to read from standard input, list F<-> as an explicit input source.
47              
48             Also, because we deal with multiple input files,
49             this module doesn't output anything until it's run.
50              
51             L<dbmerge> consumes a fixed amount of memory regardless of input size.
52             It therefore buffers output on disk as necessary.
53             (Merging is implemented as a series of two-way merges,
54             so disk space is O(number of records).)
55              
56             L<dbmerge> will merge data in parallel, if possible.
57             The C<--parallelism> option can control the degree of parallelism,
58             if desired.
59              
60              
61             =head1 OPTIONS
62              
63             General option:
64              
65             =over 4
66              
67             =item B<--xargs>
68              
69             Expect that input filenames are given, one-per-line, on standard input.
70             (In this case, merging can start incrementally.)
71              
72             =item B<--removeinputs>
73              
74             Delete the source files after they have been consumed.
75             (Defaults off, leaving the inputs in place.)
76              
77             =item B<-T TmpDir>
78              
79             where to put tmp files.
80             Also uses environment variable TMPDIR, if -T is
81             not specified.
82             Default is /tmp.
83              
84             =item B<--parallelism N> or B<-j N>
85              
86             Allow up to N merges to happen in parallel.
87             Default is the number of CPUs in the machine.
88              
89             =item B<--endgame> (or B<--noendgame>)
90              
91             Enable endgame mode, extra parallelism when finishing up.
92             (On by default.)
93              
94             =back
95              
96             Sort specification options (can be interspersed with column names):
97              
98             =over 4
99              
100             =item B<-r> or B<--descending>
101              
102             sort in reverse order (high to low)
103              
104             =item B<-R> or B<--ascending>
105              
106             sort in normal order (low to high)
107              
108             =item B<-n> or B<--numeric>
109              
110             sort numerically
111              
112             =item B<-N> or B<--lexical>
113              
114             sort lexicographically
115              
116             =back
117              
118             =for comment
119             begin_standard_fsdb_options
120              
121             This module also supports the standard fsdb options:
122              
123             =over 4
124              
125             =item B<-d>
126              
127             Enable debugging output.
128              
129             =item B<-i> or B<--input> InputSource
130              
131             Read from InputSource, typically a file name, or C<-> for standard input,
132             or (if in Perl) a IO::Handle, Fsdb::IO or Fsdb::BoundedQueue objects.
133              
134             =item B<-o> or B<--output> OutputDestination
135              
136             Write to OutputDestination, typically a file name, or C<-> for standard output,
137             or (if in Perl) a IO::Handle, Fsdb::IO or Fsdb::BoundedQueue objects.
138              
139             =item B<--autorun> or B<--noautorun>
140              
141             By default, programs process automatically,
142             but Fsdb::Filter objects in Perl do not run until you invoke
143             the run() method.
144             The C<--(no)autorun> option controls that behavior within Perl.
145              
146             =item B<--help>
147              
148             Show help.
149              
150             =item B<--man>
151              
152             Show full manual.
153              
154             =back
155              
156             =for comment
157             end_standard_fsdb_options
158              
159              
160             =head1 SAMPLE USAGE
161              
162             =head2 Input:
163              
164             File F<a.fsdb>:
165              
166             #fsdb cid cname
167             11 numanal
168             10 pascal
169              
170             File F<b.fsdb>:
171              
172             #fsdb cid cname
173             12 os
174             13 statistics
175              
176             These two files are both sorted by C<cname>,
177             and they have identical schemas.
178              
179             =head2 Command:
180              
181             dbmerge --input a.fsdb --input b.fsdb cname
182              
183             or
184              
185             cat a.fsdb | dbmerge --input - --input b.fsdb cname
186              
187             =head2 Output:
188              
189             #fsdb cid cname
190             11 numanal
191             12 os
192             10 pascal
193             13 statistics
194             # | dbmerge --input a.fsdb --input b.fsdb cname
195              
196             =head1 SEE ALSO
197              
198             L<dbmerge2(1)>,
199             L<dbsort(1)>,
200             L<Fsdb(3)>
201              
202             =head1 CLASS FUNCTIONS
203              
204             =cut
205              
206              
207             @ISA = qw(Fsdb::Filter);
208             ($VERSION) = 2.0;
209              
210 1     1   28 use 5.010;
  1         4  
211 1     1   7 use strict;
  1         2  
  1         34  
212 1     1   7 use Carp qw(croak);
  1         3  
  1         52  
213 1     1   7 use Pod::Usage;
  1         2  
  1         86  
214              
215 1     1   359 use IO::Pipe;
  1         1319  
  1         36  
216 1     1   331 use IO::Select;
  1         1717  
  1         55  
217              
218 1     1   15 use Fsdb::Filter;
  1         3  
  1         28  
219 1     1   7 use Fsdb::Filter::dbmerge2;
  1         3  
  1         28  
220 1     1   7 use Fsdb::IO::Reader;
  1         2  
  1         24  
221 1     1   6 use Fsdb::IO::Writer;
  1         3  
  1         23  
222 1     1   11 use Fsdb::Support::NamedTmpfile;
  1         4  
  1         24  
223 1     1   7 use Fsdb::Support::OS;
  1         3  
  1         40  
224 1     1   6 use Fsdb::Support::Freds;
  1         3  
  1         4282  
225              
226              
227             =head2 new
228              
229             $filter = new Fsdb::Filter::dbmerge(@arguments);
230              
231             Create a new object, taking command-line arguments.
232              
233             =cut
234              
235             sub new($@) {
236 0     0 1   my $class = shift @_;
237 0           my $self = $class->SUPER::new(@_);
238 0           bless $self, $class;
239 0           $self->set_defaults;
240 0           $self->parse_options(@_);
241 0           $self->SUPER::post_new();
242 0           return $self;
243             }
244              
245              
246             =head2 set_defaults
247              
248             $filter->set_defaults();
249              
250             Internal: set up defaults.
251              
252             =cut
253              
254             sub set_defaults($) {
255 0     0 1   my $self = shift @_;
256 0           $self->SUPER::set_defaults();
257 0           $self->{_remove_inputs} = undef;
258 0           $self->{_info}{input_count} = 2;
259 0           $self->{_sort_argv} = [];
260 0           $self->{_max_parallelism} = undef;
261 0           $self->{_parallelism_available} = undef;
262 0           $self->{_test} = '';
263 0           $self->{_xargs} = undef;
264 0           $self->{_endgame} = 1;
265 0           $self->set_default_tmpdir;
266             }
267              
268             =head2 parse_options
269              
270             $filter->parse_options(@ARGV);
271              
272             Internal: parse command-line arguments.
273              
274             =cut
275              
276             sub parse_options($@) {
277 0     0 1   my $self = shift @_;
278              
279 0           my(@argv) = @_;
280 0           my $past_sort_options = undef;
281             $self->get_options(
282             \@argv,
283 0     0     'help|?' => sub { pod2usage(1); },
284 0     0     'man' => sub { pod2usage(-verbose => 2); },
285             'autorun!' => \$self->{_autorun},
286             'close!' => \$self->{_close},
287             'd|debug+' => \$self->{_debug},
288             'endgame!' => \$self->{_endgame},
289 0     0     'i|input=s@' => sub { $self->parse_io_option('inputs', @_); },
290             'inputs!' => \$past_sort_options,
291             'j|parallelism=i' => \$self->{_max_parallelism},
292             'log!' => \$self->{_logprog},
293 0     0     'o|output=s' => sub { $self->parse_io_option('output', @_); },
294             'removeinputs!' => \$self->{_remove_inputs},
295             'test=s' => \$self->{_test},
296             'T|tmpdir|tempdir=s' => \$self->{_tmpdir},
297             'xargs!' => \$self->{_xargs},
298             # sort key options:
299 0     0     'n|numeric' => sub { $self->parse_sort_option(@_); },
300 0     0     'N|lexical' => sub { $self->parse_sort_option(@_); },
301 0     0     'r|descending' => sub { $self->parse_sort_option(@_); },
302 0     0     'R|ascending' => sub { $self->parse_sort_option(@_); },
303 0 0   0     '<>' => sub { if ($past_sort_options) {
304 0           $self->parse_io_option('inputs', @_);
305             } else {
306 0           $self->parse_sort_option('<>', @_);
307             };
308             },
309 0 0         ) or pod2usage(2);
310             }
311              
312             =head2 _pretty_fn
313              
314             _pretty_fn($fn)
315              
316             Internal: pretty-print a filename or Fsdb::BoundedQueue.
317              
318             =cut
319              
320             sub _pretty_fn {
321 0     0     my($fn) = @_;
322 0 0         return ref($fn) if (ref($fn));
323 0           return $fn;
324             }
325              
326              
327             =head2 segment_next_output
328              
329             $out = $self->segment_next_output($output_type)
330              
331             Internal: return a Fsdb::IO::Writer as $OUT
332             that either points to our output or a temporary file,
333             depending on how things are going.
334              
335             The $OUTPUT_TYPE can be 'final' or 'ipc' or 'file'.
336              
337             =cut
338              
339             sub segment_next_output($$) {
340 0     0 1   my ($self, $output_type) = @_;
341 0           my $out;
342 0 0         if ($output_type eq 'final') {
    0          
    0          
343             # $self->finish_io_option('output', -clone => $self->{_two_ins}[0]);
344             # $out = $self->{_out};
345 0           $out = $self->{_output}; # will pass this to the dbmerge2 module
346 0 0         print "# final output\n" if ($self->{_debug});
347             } elsif ($output_type eq 'file') {
348             # dump to a file for merging
349 0           my $tmpfile = Fsdb::Support::NamedTmpfile::alloc($self->{_tmpdir});
350 0           $self->{_files_cleanup}{$tmpfile} = 'NamedTmpfile';
351 0           $out = $tmpfile; # just return the name
352             } elsif ($output_type eq 'ipc') {
353             # endgame-mode: send stuff down in-memory queues
354             # $out = shared_clone(new Fsdb::BoundedQueue);
355 0           my $read_fh = new IO::Handle;
356 0           my $write_fh = new IO::Handle;
357 0           pipe $read_fh, $write_fh;
358             # Without these lines we suffer mojobake on unicode
359             # as shown in test base TEST/dbsort_merge_unicode.cmd.
360 0           binmode $read_fh, ":utf8";
361 0           binmode $write_fh, ":utf8";
362 0           $out = [ $read_fh, $write_fh ];
363             } else {
364 0           die "internal error: dbmege.pm:segment_next_output bad output_type: $output_type\n";
365             };
366 0           return $out;
367             }
368              
369              
370             =head2 segment_cleanup
371              
372             $out = $self->segment_cleanup($file);
373              
374             Internal: Clean up a file, if necessary.
375             (Sigh, used to be function pointers, but
376             not clear how they would interact with threads.)
377              
378             =cut
379              
380             sub segment_cleanup($$) {
381 0     0 1   my($self, $file) = @_;
382 0 0         if (ref($file)) {
383 0 0         if (ref($file) =~ /^IO::/) {
    0          
384 0 0         print "# closing IO::*\n" if ($self->{_debug});
385 0           $file->close;
386             } elsif (ref($file) eq 'Fsdb::BoundedQueue') {
387             # nothing to do
388             } else {
389 0           die "internal error: unknown type in dbmerge::segment_cleanup\n";
390             };
391 0           return;
392             };
393 0           my($cleanup_type) = $self->{_files_cleanup}{$file};
394 0 0         die "bad (empty) file in dbmerge::segment_cleanup\n"
395             if (!defined($file));
396 0 0         if (!defined($cleanup_type)) {
    0          
    0          
397 0 0         print "# dbmerge: segment_cleanup: no cleanup for " . _pretty_fn($file) . "\n" if ($self->{_debug});
398             # nothing
399             } elsif ($cleanup_type eq 'unlink') {
400 0 0         print "# dbmerge: segment_cleanup: cleaning up $file\n" if ($self->{_debug});
401 0           unlink($file);
402             } elsif ($cleanup_type eq 'NamedTmpfile') {
403 0 0         print "# dbmerge: segment_cleanup: NamedTmpfile::cleanup_one $file\n" if ($self->{_debug});
404 0           Fsdb::Support::NamedTmpfile::cleanup_one($file);
405             } else {
406 0           die $self->{_prog} . ": internal error, unknown segment_cleanup type $cleanup_type\n";
407             };
408             }
409              
410             =head2 _unique_id
411              
412             $id = $self->_unique_id()
413              
414             Generate a sequence number for debugging.
415              
416             =cut
417             sub _unique_id() {
418 0     0     my($self) = @_;
419 0   0       $self->{_unique_id} //= 0;
420 0           return $self->{_unique_id}++;
421             };
422              
423              
424             =head2 segments_merge2_run
425              
426             $out = $self->segments_merge2_run($out_fn, $is_final_output,
427             $in0, $in1, $id);
428              
429              
430             Internal: do the actual merge2 work (maybe our parent put us in a
431             thread, maybe not).
432              
433             =cut
434             sub segments_merge2_run($$$$$$) {
435 0     0 1   my($self, $out_fn, $is_final_output, $in0, $in1, $id) = @_;
436              
437 0           my @merge_options = qw(--autorun --nolog);
438             push(@merge_options, '--noclose', '--saveoutput' => \$self->{_out})
439 0 0         if ($is_final_output);
440              
441 0           my $debug_msg = '';
442 0 0         if ($self->{_debug}) {
443 0 0         $debug_msg = "(id $id) " . _pretty_fn($in0) . " with " . _pretty_fn($in1) . " to " . _pretty_fn($out_fn) . " " . ($is_final_output ? " (final)" : "") . " " . join(" ", @merge_options);
444             };
445             print "# segments_merge2_run: merge start $debug_msg\n"
446 0 0         if ($self->{_debug});
447             new Fsdb::Filter::dbmerge2(@merge_options,
448             '--input' => $in0,
449             '--input' => $in1,
450             '--output' => $out_fn,
451 0           @{$self->{_sort_argv}});
  0            
452             print "# segments_merge2_run: merge finish $debug_msg\n"
453 0 0         if ($self->{_debug});
454              
455 0           $self->segment_cleanup($in0);
456 0           $self->segment_cleanup($in1);
457 0 0         if (!$is_final_output) {
458             print "# segments_merge2_run: merge closing out " . ref($out_fn) . " $debug_msg\n"
459 0 0         if ($self->{_debug});
460             };
461             }
462              
463              
464             =head2 enqueue_work
465              
466             $self->enqueue_work($depth, $work);
467              
468             Internal: put $WORK on the queue at $DEPTH, updating the max count.
469              
470             =cut
471              
472             sub enqueue_work($$$) {
473 0     0 1   my($self, $depth, $work) = @_;
474 0   0       $self->{_work_max_files}[$depth] //= 0;
475 0           $self->{_work_max_files}[$depth]++;
476 0           push(@{$self->{_work}[$depth]}, $work);
  0            
477             };
478              
479             =head2 segments_merge_one_depth
480              
481             $self->segments_merge_one_depth($depth);
482              
483             Merge queued files, if any.
484              
485             Also release any queued threads.
486              
487             =cut
488              
489             sub segments_merge_one_depth($$) {
490 0     0 1   my($self, $depth) = @_;
491              
492 0           my $work_depth_ref = $self->{_work}[$depth];
493 0 0         return if ($#$work_depth_ref == -1); # no work at this dpeth for now
494              
495 0           my $closed = $self->{_work_closed}[$depth];
496              
497 0 0         print "# segments_merge_one_depth: scanning $depth\n" if ($self->{_debug});
498             #
499             # Merge the files in a binary tree.
500             #
501             # In the past, we did this in a very clever,
502             # file-system-cache-friendly order, based on ideas from
503             # "Information and Control in Gray-box Systems" by
504             # the Arpaci-Dusseaus at SOSP 2001.
505             #
506             # Unfortunately, this optimization makes the sort unstable
507             # and complicated,
508             # so it was dropped when parallelism was added.
509             #
510 0 0         while ($#{$work_depth_ref} >= ($closed ? 0 : 3)) {
  0            
511 0 0         if ($#{$work_depth_ref} == 0) {
  0            
512 0 0         last if (!$closed);
513             # one left, just punt it next
514 0 0         print "# segments_merge_one_depth: runt at depth $depth pushed to next depth.\n" if ($self->{_debug});
515 0           $self->enqueue_work($depth + 1, shift @{$work_depth_ref});
  0            
516 0 0         die "internal error\n" if ($#{$work_depth_ref} != -1);
  0            
517 0           last;
518             };
519             # are they blocked? force-start them if they are
520 0           my $waiting_on_inputs = 0;
521 0           foreach my $i (0..1) {
522 0           my $work_ref = $work_depth_ref->[$i];
523 0 0         if ($work_ref->[0] == -1) {
    0          
    0          
    0          
524 0 0         print "# segments_merge_one_depth: depth $depth forced start on $work_ref->[1].\n" if ($self->{_debug});
525 0           &{$work_ref->[2]}($work_ref); # start it
  0            
526 0           $waiting_on_inputs++;
527             } elsif ($work_ref->[0] == 0) {
528 0 0         print "# segments_merge_one_depth: depth $depth waiting on working " . _pretty_fn($work_ref->[1]) . ".\n" if ($self->{_debug});
529 0           $waiting_on_inputs++;
530             } elsif ($work_ref->[0] == 1) {
531             # running, so move fred to zombie queue; otherwise ok
532 0 0         print "# segments_merge_one_depth: pushing job to zombie list.\n" if ($self->{_debug});
533 0           push(@{$self->{_zombie_work}}, $work_ref);
  0            
534             } elsif ($work_ref->[0] == 2) {
535             # input is done
536             } else {
537 0           die "interal error: unknown status $work_ref->[0]\n";
538             };
539             };
540             # bail out if inputs are not done yet.
541 0 0         return if ($waiting_on_inputs);
542              
543             # now we KNOW we do not have blocked work
544 0           my(@two_fn) = (shift @${work_depth_ref}, shift @${work_depth_ref});
545 0           my $output_type = 'file';
546 0 0 0       if ($closed && $#{$work_depth_ref} == -1 && $depth == $#{$self->{_work}}) {
  0 0 0        
  0   0        
      0        
547 0           $output_type = 'final';
548             } elsif ($self->{_endgame} && $closed && $self->{_work_max_files}[$depth] <= $self->{_endgame_max_files}) {
549 0 0         print "# segments_merge_one_depth: endgame parallelism.\n" if ($self->{_debug});
550 0           $output_type = 'ipc';
551             };
552 0           my($out_fn) = $self->segment_next_output($output_type);
553 0 0         print "# segments_merge_one_depth: depth $depth planning " . _pretty_fn($two_fn[0][1]) . " and " . _pretty_fn($two_fn[1][1]) . " to " . _pretty_fn($out_fn) . ".\n" if ($self->{_debug});
554              
555 0           foreach my $i (0..1) {
556 0 0 0       next if (ref($two_fn[$i][1]) =~ /^(Fsdb::BoundedQueue|IO::)/ || $two_fn[$i][1] eq '-');
557 0 0         croak $self->{_prog} . ": file $two_fn[$i][1] is missing.\n"
558             if (! -f $two_fn[$i][1]);
559             };
560              
561 0 0         if ($output_type eq 'final') {
562             # last time: do it here, in-line
563             # so that we update $self->{_out} in the main thread
564 0           $self->segments_merge2_run($out_fn, 1, $two_fn[0][1], $two_fn[1][1], $self->_unique_id());
565 0           return;
566             };
567             #
568             # fork a Fred to do the merge
569             #
570 0 0         my $out_fn_reader = (ref($out_fn) eq 'ARRAY') ? $out_fn->[0] : $out_fn;
571 0 0         my $out_fn_writer = (ref($out_fn) eq 'ARRAY') ? $out_fn->[1] : $out_fn;
572 0           $out_fn = undef;
573 0           my $new_work_ref = [-1, $out_fn_reader, undef];
574 0           my $unique_id = $self->_unique_id();
575 0           my $desc = "dbmerge2($two_fn[0][1],$two_fn[1][1]) => $out_fn_reader (id $unique_id)";
576             my $start_sub = sub {
577 0     0     $self->{_parallelism_available}--;
578 0 0         $new_work_ref->[0] = ($output_type eq 'ipc' ? 1 : 0); # running
579             my $fred = new Fsdb::Support::Freds($desc, sub {
580 0 0         $out_fn_reader->close if ($output_type eq 'ipc');
581 0 0         print "# segments_merge_one_depth: Fred start $desc\n" if ($self->{_debug});
582 0           $self->segments_merge2_run($out_fn_writer, 0, $two_fn[0][1], $two_fn[1][1], $unique_id);
583 0 0 0       sleep(1) if (defined($self->{_test}) && $self->{_test} eq 'delay-finish');
584 0 0         print "# segments_merge_one_depth: Fred end $desc\n" if ($self->{_debug});
585 0           exit 0;
586             }, sub {
587 0           my($done_fred, $exit_code) = @_;
588             # we're done!
589 0 0         print "# segments_merge_one_depth: Fred post-mortem $desc\n" if ($self->{_debug});
590             # xxx: with TEST/dbmerge_3_input.cmd I sometimes get exit code 255 (!) although things complete.
591             # turned out Fsdb::Support::Freds::END was messing with $?.
592 0 0         croak "dbmerge: merge2 subprocess $desc, exit code: $exit_code\n" if ($exit_code != 0);
593 0           $new_work_ref->[0] = 2; # done
594 0           });
595 0           $new_work_ref->[2] = $fred;
596 0 0         $out_fn_writer->close if ($output_type eq 'ipc'); # discard
597 0           };
598 0           $new_work_ref->[2] = $start_sub;
599             # Put the thread in our queue,
600             # and run it if it's important (pipe or final),
601             # or if we have the capacity.
602 0           $self->enqueue_work($depth + 1, $new_work_ref);
603 0 0 0       &$start_sub() if ($output_type ne 'file' || $self->{_parallelism_available} > 0);
604 0 0         print "# segments_merge_one_depth: looping after $desc.\n" if ($self->{_debug});
605             };
606             # At this point all possible work has been queued and maybe started.
607             # If the depth is closed, the work should be empty.
608             # if not, there may be some files in the queue
609             }
610              
611             =head2 segments_xargs
612              
613             $self->segments_xargs();
614              
615             Internal: read new filenames to process (from stdin)
616             and send them to the work queue.
617              
618             Making a separate Fred to handle xargs is a lot of work,
619             but it guarantees it comes in on an IO::Handle that is selectable.
620              
621             =cut
622              
623             sub segments_xargs($) {
624 0     0 1   my($self) = @_;
625 0           my $ipc = $self->{_xargs_ipc_writer};
626              
627 0           my $num_inputs = 0;
628              
629             # read files as in fsdb format
630 0 0         if ($#{$self->{_inputs}} == 0) {
  0            
631             # hacky...
632 0           $self->{_input} = $self->{_inputs}[0];
633             };
634 0           $self->finish_io_option('input', -header => '#fsdb filename');
635 0           my $read_fastpath_sub = $self->{_in}->fastpath_sub();
636 0           while (my $fref = &$read_fastpath_sub()) {
637             # send each file for processing as level zero
638 0 0         print "# dbmerge: segments_xargs: got $fref->[0]\n" if ($self->{_debug});
639 0           $ipc->print($fref->[0] . "\n");
640 0           $num_inputs++;
641             };
642             # if ($num_inputs <= 1) {
643             # $ipc->print("-1\terror: --xargs, but zero or one input files; dbmerge needs at least two.\n"); # signal eof-f
644             # };
645 0           $ipc->close;
646 0           $self->{_ipc_writer} = undef;
647             }
648              
649              
650             =head2 segments_merge_all
651              
652             $self->segments_merge_all()
653              
654             Internal:
655             Merge queued files, if any.
656             Iterates over all depths of the merge tree,
657             and handles any forked threads.
658              
659             =head3 Merging Strategy
660              
661             Merging is done in a binary tree that is managed through the C<_work> queue.
662             It has an array of C<depth> entries,
663             one for each level of the tree.
664              
665             Items are processed in order at each level of the tree,
666             and only level-by-level, so the sort is stable.
667              
668             =head3 Parallelism Model
669              
670             Parallelism is also managed through the C<_work> queue,
671             each element of which consists of one file or stream suitable for merging.
672             The work queue contains both ready output (files or BoundedQueue streams)
673             that can be immediately handled, and pairs of semaphore/pending output
674             for work that is not yet started.
675             All manipulation of the work queue happens in the main thread
676             (with C<segments_merge_all> and C<segments_merge_one_depth>).
677              
678             We start a thread to handle each item in the work queue,
679             and limit parallelism to the C<_max_parallelism>,
680             defaulting to the number of available processors.
681              
682             There are two kinds of parallelism, regular and endgame.
683             For regular parallelism we pick two items off the work queue,
684             merge them, and put the result back on the queue as a new file.
685             Items in the work queue may not be ready. For in-progress items we
686             wait until they are done. For not-yet-started items
687             we start them, then wait until they are done.
688              
689             Endgame parallelism handles the final stages of a large merge,
690             when there are enough processors that we can start merge jobs
691             for all remaining levels of the merge tree. At this point we switch
692             from merging to files to merging into C<Fsdb::BoundedQueue> pipelines
693             that connect merge processes which start and run concurrently.
694              
695             The final merge is done in the main thread so that the main thread
696             can handle the output stream and record the merge action.
697              
698             =cut
699              
700             sub segments_merge_all($) {
701 0     0 1   my($self) = @_;
702              
703 0           my $xargs_select;
704 0 0         if ($self->{_xargs}) {
705 0           $xargs_select = IO::Select->new();
706 0           $xargs_select->add($self->{_xargs_ipc_reader});
707             };
708              
709             #
710             # Alternate forking off new merges from finished files
711             # and reaping finished processes.
712             #
713 0           my $overall_progress = 0;
714 0           my $PROGRESS_START = 0.001;
715 0           my $PROGRESS_MAX = 2;
716 0           my $PROGRESS_MULTIPLIER = 2;
717 0           my $progress_backoff = $PROGRESS_START;
718 0           for (;;) {
719             # done?
720 0           my $deepest = $#{$self->{_work}};
  0            
721 0 0 0       last if ($self->{_work_closed}[$deepest] && $#{$self->{_work}[$deepest]} <= 0);
  0            
722              
723             #
724             # First, put more work on the queue, where possible.
725             #
726             # Go through this loop multiple times because closing the last depth
727             # can actually allow work to start at the last+1 depth,
728             # and in endgame mode we risk blocking (due to flow control)
729             # if we don't start threads at all depths.
730             #
731 0           my $try_again = 1;
732 0           while ($try_again) {
733 0           foreach my $depth (0..$#{$self->{_work}}) {
  0            
734 0           $self->segments_merge_one_depth($depth);
735 0           $try_again = undef;
736 0 0 0       if ($#{$self->{_work}[$depth]} == -1 && $self->{_work_closed}[$depth]) {
  0            
737             # When one level is all in progress, we can close the next.
738 0           my $next_depth = $depth + 1;
739 0 0         if (!$self->{_work_closed}[$next_depth]) {
740 0           $self->{_work_closed}[$next_depth] = 1;
741 0           $try_again = 1;
742 0           $overall_progress++;
743 0 0         print "# segments_merge_all: closed work depth $next_depth\n" if ($self->{_debug});
744             };
745             };
746             };
747             };
748              
749             #
750             # Next, handle Freds that have finished.
751             # Reap as many as possible.
752             #
753 0 0         print "# segments_merge_all: reaping threads\n" if ($self->{_debug});
754 0           for (;;) {
755 0           my $fred_or_code = Fsdb::Support::Freds::join_any();
756 0 0         last if (ref($fred_or_code) eq '');
757 0           $overall_progress++;
758 0 0         croak "dbmerge: merge thread failed\n"
759             if ($fred_or_code->exit_code() != 0);
760 0 0         print "# segments_merge_all: merged fred " . $fred_or_code->info() . "\n" if ($self->{_debug});
761             };
762              
763             #
764             # Now start up more parallelism, if possible.
765             #
766 0           my $depth = 0;
767 0           my $i = 0;
768 0           while ($self->{_parallelism_available} > 0) {
769 0           my $work_ref = $self->{_work}[$depth][$i];
770 0 0 0       if (defined($work_ref) && $work_ref->[0] == -1) {
771             # start it (it will decrement parallelism)
772 0           &{$work_ref->[2]}($work_ref);
  0            
773 0           $overall_progress++;
774             };
775             # walk the whole work queue
776 0 0         if (++$i > $#{$self->{_work}[$depth]}) {
  0            
777 0 0         last if (++$depth > $#{$self->{_work}});
  0            
778 0           $i = 0;
779             };
780             };
781              
782             #
783             # Handle xargs, if any.
784             #
785             # Unfortunately, we busy-loop here,
786             # because we need to alternate reaping finished processes
787             # and xargs.
788             #
789             # Fortunately, this terminates when xargs are complete.
790             #
791 0 0         if ($self->{_xargs_ipc_status}) {
792 0           my(@ready) = $xargs_select->can_read($progress_backoff);
793 0           foreach my $fh (@ready) {
794 0           my ($fn) = $fh->getline;
795 0 0         if (defined($fn)) {
796 0           chomp($fn);
797             $self->{_files_cleanup}{$fn} = 'unlink'
798 0 0         if ($self->{_remove_inputs});
799 0           $self->enqueue_work(0, [2, $fn, undef]);
800 0 0         print "# xargs receive $fn\n" if ($self->{_debug});
801             } else {
802             # eof, so no more xargs
803 0           $self->{_work_closed}[0] = 1;
804 0           $self->{_xargs_ipc_status} = undef;
805             };
806 0           $overall_progress++;
807             };
808             };
809             #
810             # Avoid spinlooping.
811             #
812 0 0         if (!$overall_progress) {
813             # No progress, so stall.
814 0 0         print "# segments_merge_all: stalling for $progress_backoff\n" if ($self->{_debug});
815 0           sleep($progress_backoff);
816 0           $progress_backoff *= $PROGRESS_MULTIPLIER; # exponential backoff
817 0 0         $progress_backoff = $PROGRESS_MAX if ($progress_backoff > $PROGRESS_MAX);
818             } else {
819             # Woo-hoo, did something. Rush right back and try again.
820 0           $overall_progress = 0;
821 0           $progress_backoff = $PROGRESS_START;
822             };
823             };
824              
825             # reap endgame zombies
826 0           while (my $zombie_work_ref = shift(@{$self->{_zombie_work}})) {
  0            
827 0 0         next if ($zombie_work_ref->[0] == 2);
828 0 0         print "# waiting on zombie " . $zombie_work_ref->[2]->info() . "\n" if ($self->{_debug});
829 0           $zombie_work_ref->[2]->join();
830 0 0         croak "internal error: zombie didn't reset status\n" if ($zombie_work_ref->[0] != 2);
831             };
832              
833             # reap xargs (if it didn't already get picked up)
834 0 0         if ($self->{_xargs_fred}) {
835 0 0         print "# waiting on xargs fred\n" if ($self->{_debug});
836 0           $self->{_xargs_fred}->join();
837             };
838             #
839             }
840              
841              
842              
843             =head2 setup
844              
845             $filter->setup();
846              
847             Internal: setup, parse headers.
848              
849             =cut
850              
851             sub setup($) {
852 0     0 1   my($self) = @_;
853              
854             croak $self->{_prog} . ": no sorting key specified.\n"
855 0 0         if ($#{$self->{_sort_argv}} == -1);
  0            
856              
857 0 0 0       if (!$self->{_xargs} && $#{$self->{_inputs}} == -1) {
  0            
858 0           croak $self->{_prog} . ": no input sources specified, use --input or --xargs.\n";
859             };
860 0 0 0       if (!$self->{_xargs} && $#{$self->{_inputs}} == 0) {
  0            
861 0           croak $self->{_prog} . ": only one input source, but can't merge one file.\n";
862             };
863 0 0 0       if ($self->{_xargs} && $#{$self->{_inputs}} > 0) {
  0            
864 0           croak $self->{_prog} . ": --xargs and multiple inputs (perhaps you meant NOT --xargs?).\n";
865             };
866             # prove files exist (early error checking)
867 0           foreach (@{$self->{_inputs}}) {
  0            
868 0 0         next if (ref($_) ne ''); # skip objects
869 0 0         next if ($_ eq '-'); # special case: stdin
870 0 0         if (! -f $_) {
871 0           croak $self->{_prog} . ": input source $_ does not exist.\n";
872             };
873             };
874 0 0         if ($self->{_remove_inputs}) {
875 0           foreach (@{$self->{_inputs}}) {
  0            
876 0 0         $self->{_files_cleanup}{$_} = 'unlink'
877             if ($_ ne '-');
878             };
879             };
880             #
881             # the _work queue consists of
882             # 1. [2, filename, fred] for completed files that need to be merged.
883             # 1a. [1, IO::Handle::Pipe, fred] for files that are being processed
884             # and can be merged (but are not yet done).
885             # 2. [-1, filename, $start_sub] for blocked threads that, when started
886             # by evaling $start_sub, will go to filename.
887             # 3. [0, filename, fred] for files in the process of being computed
888             #
889             # Filename can be an Fsdb::BoundedQueue or IO::Pipe object for endgame mode threads
890             #
891             # _work_closed is set when that depth is no longer growing;
892             # at that time _work_depth_files is the maximum number of files there.
893             #
894             # Put stuff on it with $self->enqueue_work to keep the
895             # related variables correct.
896             #
897 0           $self->{_work}[0] = [];
898 0           $self->{_work_closed}[0] = 0;
899 0           $self->{_work_depth_files}[0] = 0;
900 0           $self->{_xargs_ipc_status} = undef;
901 0           $self->{_zombie_work} = []; # collect in-process but not-yet-done freds
902 0 0         if (!$self->{_xargs}) {
903 0           foreach (@{$self->{_inputs}}) {
  0            
904 0           $self->enqueue_work(0, [2, $_, undef]);
905             };
906 0           $self->{_work_closed}[0] = 1;
907             } else {
908 0           my $xargs_ipc_reader = new IO::Handle;
909 0           my $xargs_ipc_writer = new IO::Handle;
910 0 0         pipe($xargs_ipc_reader, $xargs_ipc_writer) or croak "cannot open pipe\n";
911 0           $self->{_xargs_ipc_status} = 'running';
912             $self->{_xargs_fred} = new Fsdb::Support::Freds('dbmerge:xargs',
913             sub {
914 0     0     $SIG{PIPE} = 'IGNORE'; # in case we finish before main reads anything
915 0           $xargs_ipc_reader->close;
916 0           $xargs_ipc_writer->autoflush(1);
917 0           $self->{_xargs_ipc_writer} = $xargs_ipc_writer;
918 0           $self->segments_xargs();
919 0           exit 0;
920             }, sub {
921 0     0     $self->{_xargs_ipc_status} = 'completed';
922 0           });
923 0           $xargs_ipc_reader->autoflush(1);
924 0           $xargs_ipc_writer->close;
925 0           $self->{_xargs_ipc_reader} = $xargs_ipc_reader;
926             # actual xargs reception happens in our main loop in segments_merge_all()
927             };
928             #
929             # control parallelism
930             #
931             # For the endgame, we overcommit by a large factor
932             # because in the merge tree many become blocked on the IO pipeline.
933             #
934 0   0       $self->{_max_parallelism} //= Fsdb::Support::OS::max_parallelism();
935 0   0       $self->{_parallelism_available} //= $self->{_max_parallelism};
936 0           my $viable_endgame_processes = $self->{_max_parallelism};
937 0           my $files_to_merge = 1;
938 0           while ($viable_endgame_processes > 0) {
939 0           $viable_endgame_processes -= $files_to_merge;
940 0           $files_to_merge *= 2;
941             };
942 0           $self->{_endgame_max_files} = int($files_to_merge);
943 0 0         STDOUT->autoflush(1) if ($self->{_debug});
944 0 0         print "# dbmerge: endgame_max_files: " . $self->{_endgame_max_files} . "\n" if($self->{_debug});
945             }
946              
947             =head2 run
948              
949             $filter->run();
950              
951             Internal: run over each row.
952              
953             =cut
954             sub run($) {
955 0     0 1   my($self) = @_;
956              
957 0           $self->segments_merge_all();
958             };
959              
960            
961              
962             =head1 AUTHOR and COPYRIGHT
963              
964             Copyright (C) 1991-2015 by John Heidemann
965              
966             This program is distributed under terms of the GNU general
967             public license, version 2. See the file COPYING
968             with the distribution for details.
969              
970             =cut
971              
972             1;