File Coverage

blib/lib/Fsdb/Filter/dbsort.pm
Criterion Covered Total %
statement 27 148 18.2
branch 0 54 0.0
condition 0 9 0.0
subroutine 9 28 32.1
pod 10 10 100.0
total 46 249 18.4


line stmt bran cond sub pod time code
1             #!/usr/bin/perl -w
2              
3             #
4             # dbsort.pm
5             # Copyright (C) 1991-2016 by John Heidemann
6             #
7             # This program is distributed under terms of the GNU general
8             # public license, version 2. See the file COPYING
9             # in $dblibdir for details.
10             #
11              
12             package Fsdb::Filter::dbsort;
13              
14             =head1 NAME
15              
16             dbsort - sort rows based on the specified columns
17              
18             =head1 SYNOPSIS
19              
20             dbsort [-M MemLimit] [-T TemporaryDirectory] [-nNrR] column [column...]
21              
22             =head1 DESCRIPTION
23              
24             Sort all input rows as specified by the numeric or lexical columns.
25              
26             Dbsort consumes a fixed amount of memory regardless of input size.
27             (It reverts to temporary files on disk if necessary, based on the -M
28             and -T options.)
29              
30             The sort should be stable, but this has not yet been verified.
31              
32             For large inputs (those that spill to disk),
33             dbsort will do some of the merging in parallel, if possible.
34             The B<--parallelism> option can control the degree of parallelism,
35             if desired.
36              
37             =head1 OPTIONS
38              
39             General options:
40              
41             =over 4
42              
43             =item B<-M MaxMemBytes>
44              
45             Specify an approximate limit on memory usage (in bytes).
46             Larger values allow faster sorting because more operations
47             happen in-memory, provided you have enough memory.
48              
49             =item B<-T TmpDir>
50              
51             Where to put temporary files.
52             Also uses environment variable TMPDIR, if -T is
53             not specified.
54             Default is /tmp.
55              
56             =item B<--parallelism N> or B<-j N>
57              
58             Allow up to N merges to happen in parallel.
59             Default is the number of CPUs in the machine.
60              
61             =back
62              
63             Sort specification options (can be interspersed with column names):
64              
65             =over 4
66              
67             =item B<-r> or B<--descending>
68              
69             sort in reverse order (high to low)
70              
71             =item B<-R> or B<--ascending>
72              
73             sort in normal order (low to high)
74              
75             =item B<-n> or B<--numeric>
76              
77             sort numerically
78              
79             =item B<-N> or B<--lexical>
80              
81             sort lexicographically
82              
83             =back
84              
85             =for comment
86             begin_standard_fsdb_options
87              
88             This module also supports the standard fsdb options:
89              
90             =over 4
91              
92             =item B<-d>
93              
94             Enable debugging output.
95              
96             =item B<-i> or B<--input> InputSource
97              
98             Read from InputSource, typically a file name, or C<-> for standard input,
99             or (if in Perl) an IO::Handle, Fsdb::IO, or Fsdb::BoundedQueue object.
100              
101             =item B<-o> or B<--output> OutputDestination
102              
103             Write to OutputDestination, typically a file name, or C<-> for standard output,
104             or (if in Perl) an IO::Handle, Fsdb::IO, or Fsdb::BoundedQueue object.
105              
106             =item B<--autorun> or B<--noautorun>
107              
108             By default, programs process automatically,
109             but Fsdb::Filter objects in Perl do not run until you invoke
110             the run() method.
111             The C<--(no)autorun> option controls that behavior within Perl.
112              
113             =item B<--header H>
114              
115             Use H as the full Fsdb header, rather than reading a header from
116             the input.
117              
118             =item B<--help>
119              
120             Show help.
121              
122             =item B<--man>
123              
124             Show full manual.
125              
126             =back
127              
128             =for comment
129             end_standard_fsdb_options
130              
131              
132             =head1 SAMPLE USAGE
133              
134             =head2 Input:
135              
136             #fsdb cid cname
137             10 pascal
138             11 numanal
139             12 os
140              
141             =head2 Command:
142              
143             cat data.fsdb | dbsort cname
144              
145             =head2 Output:
146              
147             #fsdb cid cname
148             11 numanal
149             12 os
150             10 pascal
151             # | dbsort cname
152              
153             =head1 SEE ALSO
154              
155             L<dbmerge(1)>,
156             L<dbmapreduce(1)>,
157             L<Fsdb>
158              
159             =head1 CLASS FUNCTIONS
160              
161             =cut
162              
163              
# dbsort is an Fsdb::Filter subclass: helpers called on $self but not
# defined in this file (get_options, finish_io_option, ...) presumably
# come from there.
our @ISA = qw(Fsdb::Filter);
our $VERSION = 2.0;
166              
167 1     1   9490 use strict;
  1         3  
  1         38  
168 1     1   7 use Pod::Usage;
  1         5  
  1         129  
169 1     1   8 use Carp;
  1         3  
  1         66  
170              
171 1     1   8 use Fsdb::IO::Reader;
  1         3  
  1         27  
172 1     1   6 use Fsdb::IO::Writer;
  1         2  
  1         25  
173 1     1   7 use Fsdb::Filter;
  1         93  
  1         30  
174 1     1   7 use Fsdb::Support::NamedTmpfile;
  1         3  
  1         34  
175 1     1   499 use Fsdb::Filter::dbmerge;
  1         4  
  1         46  
176 1     1   515 use Fsdb::Filter::dbpipeline qw(dbpipeline_sink dbmerge);
  1         6  
  1         3144  
177              
# Approximate bytes of bookkeeping overhead per field of a row held in
# memory; run() charges it (number of columns + 1) times for every row.
my $PERL_MEM_OVERHEAD = 50;
# The user's memory limit (-M) is divided by this factor before use, as a
# (very rough) allowance for Perl's own memory usage.
my $PERL_MEM_SCALING = 2;
180              
181              
182             =head2 new
183              
184             $filter = Fsdb::Filter::dbsort->new(@arguments);
185              
186             Create a new object, taking command-line arguments.
187              
188             =cut
189              
# Construct a dbsort filter from command-line style arguments.
# The same arguments go to the base-class constructor and to our own
# option parser; the base class's post_new() is called last.
sub new ($@) {
    my ($class, @args) = @_;
    my $self = bless $class->SUPER::new(@args), $class;
    $self->set_defaults;
    $self->parse_options(@args);
    $self->SUPER::post_new();
    return $self;
}
199              
200              
201             =head2 set_defaults
202              
203             $filter->set_defaults();
204              
205             Internal: set up defaults.
206              
207             =cut
208              
# Set dbsort's defaults after letting the base class set its own:
# a 256 MiB memory limit (-M), an empty sort specification, the default
# temporary directory, and no explicit parallelism (-j) or header.
sub set_defaults ($) {
    my ($self) = @_;
    $self->SUPER::set_defaults();
    @{$self}{qw(_max_memory _mem_debug _sort_argv)} = (256 * 1024 * 1024, undef, []);
    $self->set_default_tmpdir;
    @{$self}{qw(_max_parallelism _header)} = (undef, undef);
}
219              
220             =head2 parse_options
221              
222             $filter->parse_options(@ARGV);
223              
224             Internal: parse command-line arguments.
225              
226             =cut
227              
# Parse command-line arguments into $self.
#
# Sort-key flags (-n/-N/-r/-R) and bare column names are all routed, in the
# order given, to parse_sort_option, so they may be interspersed.
# Croaks if anything is left over after option parsing.
sub parse_options ($@) {
    my $self = shift @_;
    my @remaining = @_;

    # The four sort-ordering flags share a single handler.
    my $sort_flag = sub { $self->parse_sort_option(@_); };

    $self->get_options(
        \@remaining,
        'help|?'             => sub { pod2usage(1); },
        'man'                => sub { pod2usage(-verbose => 2); },
        'autorun!'           => \$self->{_autorun},
        'close!'             => \$self->{_close},
        'd|debug+'           => \$self->{_debug},
        'header=s'           => \$self->{_header},
        'i|input=s'          => sub { $self->parse_io_option('input', @_); },
        'j|parallelism=i'    => \$self->{_max_parallelism},
        'log!'               => \$self->{_logprog},
        'M|maxmemory=i'      => \$self->{_max_memory},
        'o|output=s'         => sub { $self->parse_io_option('output', @_); },
        'T|tmpdir|tempdir=s' => \$self->{_tmpdir},
        # sort key options:
        'n|numeric'          => $sort_flag,
        'N|lexical'          => $sort_flag,
        'r|descending'       => $sort_flag,
        'R|ascending'        => $sort_flag,
        '<>'                 => sub { $self->parse_sort_option('<>', @_); },
    ) or pod2usage(2);

    croak $self->{_prog} . ": internal error, extra arguments.\n"
        if (@remaining);
}
256              
257              
258             =head2 setup
259              
260             $filter->setup();
261              
262             Internal: setup, parse headers.
263              
264             =cut
265              
# Validate the sort specification, finish opening the input, and compile
# the row-comparison sub used by segment_overflow().
#
# Sets $self->{_compare_code} (generated Perl source) and
# $self->{_compare_sub} (the compiled code ref).
# Croaks if no sort key was given, if no compare code could be generated,
# or if the generated code fails to compile.
sub setup($) {
    my($self) = @_;

    # _sort_argv is an array ref: check how many entries it holds, not the
    # reference itself (a reference never compares equal to -1).
    croak $self->{_prog} . ": no sorting key specified.\n"
        if (!defined($self->{_sort_argv}) || $#{$self->{_sort_argv}} == -1);

    #
    # setup final IO
    #
    my(@finish_args) = (-comment_handler => $self->create_delay_comments_sub);
    push(@finish_args, -header => $self->{_header}) if (defined($self->{_header}));
    $self->finish_io_option('input', @finish_args);

    # Generate the comparison as Perl source text.
    $self->{_compare_code} = $self->create_compare_code($self->{_in}, $self->{_in});
    croak $self->{_prog} . ": no sort field specified.\n"
        if (!defined($self->{_compare_code}));
    print "COMPARE CODE:\n\t" . $self->{_compare_code} . "\n" if ($self->{_debug});

    # The compare code is generated text, so a string eval is required;
    # it runs in this scope and can therefore see lexicals such as $self.
    my $compare_sub;
    eval '$self->{_compare_sub} = $compare_sub = ' . $self->{_compare_code} . ';';
    $@ && croak $self->{_prog} . ": internal eval error in compare code: $@.\n";
}
288              
289             =head2 segment_start
290              
291             $self->segment_start(\@rows);
292              
293             Sorting happens internally,
294             to handle large things in pieces if necessary.
295              
296             call C<$self->segment_start> to init things and to restart after an overflow
297             C<$self->segment_overflow> to close one segment and start the next,
298             and C<$self->segment_merge_finish> to put them back together again.
299              
300             Note that we don't invoke the merge code unless the data
301             exceeds some threshold size, so small sorts happen completely
302             in memory.
303              
304             Once we give up on memory, all the merging happens by making
305             passes over the disk files.
306              
307             =cut
308              
# Begin a fresh in-memory segment by emptying the caller's row buffer.
# run() calls this before reading input; segment_overflow() calls it again
# after spilling a segment to a temporary file.
sub segment_start ($\@) {
    my (undef, $buffer) = @_;
    $#$buffer = -1;    # truncate in place so the caller's array is emptied
}
314              
315             =head2 segment_next_output
316              
317             ($out, $final_output) = $self->segment_next_output($input_finished);
318              
319             Internal: return ($OUT, $FINAL_OUTPUT), where $OUT is either our
320             final Fsdb::IO::Writer output (and $FINAL_OUTPUT is true) or the
321             name of a new temporary file that will be merged later.
322              
323             =cut
324              
# Decide where the next sorted segment should be written.
#
# Returns ($out, $final_output).  When nothing has been spilled yet and all
# input has been read, this segment is the whole answer: $out is our real
# output writer and $final_output is true.  The output is opened here unless
# segment_merge_start() already opened it.  Otherwise $out is the name of a
# freshly allocated temporary file, which is also queued on _files_to_merge.
sub segment_next_output($$) {
    my ($self, $input_finished) = @_;
    my $final_output = ($#{$self->{_files_to_merge}} == -1 && $input_finished);

    if (!$final_output) {
        # dump to a file for merging
        my $tmpfile = Fsdb::Support::NamedTmpfile::alloc($self->{_tmpdir});
        push(@{$self->{_files_to_merge}}, $tmpfile);
        print "# dbsort segment_next_output: intermediate file: $tmpfile\n" if ($self->{_debug});
        return ($tmpfile, $final_output);
    }

    # With a merge thread running, its setup already opened our output.
    $self->finish_io_option('output', -clone => $self->{_in})
        unless (defined($self->{_merge_fred}));
    print "# dbsort segment_next_output: final output\n" if ($self->{_debug});
    return ($self->{_out}, $final_output);
}
346              
347             =head2 segment_overflow
348              
349             $self->segment_overflow(\@rows, $input_finished)
350              
351             Called to sort @ROWS, writing them to the appropriate place.
352             $INPUT_FINISHED is set if all input has been read.
353              
354             =cut
355              
# Sort the buffered rows of the current segment and write them out.
#
# $rows_ref holds the segment's rows; $input_finished is true once all
# input has been read.  If this is the only segment, the rows go straight
# to our final output.  Otherwise they go to a temporary file, which is
# closed and handed to the merge thread, and the buffer is emptied for the
# next segment.  Dies (internal error) if segment_next_output returns a
# writer/filename that disagrees with its final-output flag.
sub segment_overflow($\@$) {
    my($self, $rows_ref, $input_finished) = @_;

    my $compare_sub = $self->{_compare_sub};
    my(@sorted_rows) = sort $compare_sub @{$rows_ref};

    my ($out_fn, $final_output) = $self->segment_next_output($input_finished);
    my $out;
    if (ref($out_fn) =~ /^Fsdb::IO::Writer/) {
        # Got a live writer: that only happens for the final output.
        die "dbsort segment_overflow: surprise writer and NOT final_output\n"
            if (!$final_output);
        $out = $out_fn;
    } else {
        # Got a temporary file name: open a writer on it.
        die "dbsort segment_overflow: surprise filename and final_output\n"
            if ($final_output);
        # Explicit class-method call: this package defines its own sub new,
        # which makes indirect-object "new Class(...)" ambiguous.
        $out = Fsdb::IO::Writer->new(-file => $out_fn, -clone => $self->{_in});
    };

    my $write_fastpath_sub = $out->fastpath_sub;
    $write_fastpath_sub->($_) foreach (@sorted_rows);

    if (!$final_output) {
        # Intermediate segment: finish the file, queue it for merging,
        # and empty the buffer for the next segment.
        $out->close;
        $self->segment_merge_start($out_fn);
        $self->segment_start($rows_ref);
    };
}
392              
393             =head2 segment_merge_start
394              
395             $self->segment_merge_start($fn);
396              
397             Start merging on file $FN.
398             Fork off a merge thread, if necessary.
399              
400             =cut
# Queue the temporary file $fn for merging.
#
# On the first call, open our output and start a dbmerge pipeline (via
# dbpipeline_sink) that writes to it.  Keep the writer that feeds it file
# names and the merge-thread handle.  Every call then sends $fn down that
# writer.
sub segment_merge_start($$) {
    my($self, $fn) = @_;

    unless (defined($self->{_merge_fred})) {
        # create our output so we can give it to merge-thread
        $self->finish_io_option('output', -clone => $self->{_in}); # , -outputheader => 'never');
        print "# forking merge thread\n" if ($self->{_debug});

        my @merge_args = (
            qw(--nolog --noclose --removeinputs --xargs),
            (defined($self->{_max_parallelism})
                ? ('--parallelism', $self->{_max_parallelism}) : ()),
            (defined($self->{_tmpdir}) ? ('-T', $self->{_tmpdir}) : ()),
            @{$self->{_sort_argv}},
        );
        my($writer, $merge_fred) = dbpipeline_sink(
            [-cols => [qw(filename)], -autoflush => 1],
            '--output' => $self->{_out},
            dbmerge(@merge_args));
        croak "dbsort: internal error in invoking dbmerge\n"
            if (!defined($writer) || !defined($merge_fred));
        $self->{_merge_writer} = $writer;
        $self->{_merge_fred} = $merge_fred;
    }

    print "# dbsort segment_merge_start: sending merge thread: $fn\n" if ($self->{_debug});
    $self->{_merge_writer}->write_row($fn);
}
428              
429              
430             =head2 segment_merge_finish
431              
432             $self->segment_merge_finish();
433              
434             Merge queued files, if any.
435             Just call L<dbmerge(1)> to do all the real work.
436              
437             =cut
438              
# Finish the on-disk merge, if one was started.
#
# Closing the merge writer signals that no more files are coming, and
# join() then waits for the merge thread to finish.  Does nothing when no
# merge thread exists or no files were queued.
sub segment_merge_finish($) {
    my($self) = @_;
    return if (!defined($self->{_merge_fred})
               || $#{$self->{_files_to_merge}} == -1);

    print "# final output\n" if ($self->{_debug});
    $self->{_merge_writer}->close();   # tell it we're done
    $self->{_merge_fred}->join();      # and make it do its work
    $self->{_merge_fred} = undef;
}
451              
452              
453             =head2 run
454              
455             $filter->run();
456              
457             Internal: read all input rows, sort them, and write the output.
458              
459             =cut
# Read all input and sort it in memory-bounded segments.
#
# Rows are buffered until their estimated size exceeds the -M limit divided
# by $PERL_MEM_SCALING.  Each full buffer is sorted and spilled via
# segment_overflow().  The last buffer is flushed with $input_finished set,
# then any spilled segments are merged.  If there was no input at all, the
# output is still opened so finish() can log.
sub run ($) {
    my($self) = @_;

    #
    # read in and set up the data
    #
    $self->{_files_to_merge} = [];
    my $read_fastpath_sub = $self->{_in}->fastpath_sub();

    my @rows;    # rows of the current segment (array refs)
    my $memory_used = 0;
    my $scaled_max_memory = int($self->{_max_memory} / (1.0 * $PERL_MEM_SCALING));
    # Fixed per-row cost estimate: one overhead unit per column, plus one.
    my $row_mem_overhead = $PERL_MEM_OVERHEAD * ($#{$self->{_in}->cols} + 2);

    $self->segment_start(\@rows);
    # Call the reader with an explicit empty argument list; a bare
    # &$read_fastpath_sub would silently hand it our own @_.
    while (my $fref = $read_fastpath_sub->()) {
        push(@rows, $fref);
        $memory_used += $row_mem_overhead;
        $memory_used += length($_) foreach (@$fref);
        if ($memory_used > $scaled_max_memory) {
            $self->segment_overflow(\@rows);
            $memory_used = 0;
        };
    };

    # handle end case (spill any records still queued)
    $self->segment_overflow(\@rows, 1) if ($#rows > -1);
    # merge, if necessary
    $self->segment_merge_finish();
    # handle the null case: no output
    if ($#rows == -1 && $#{$self->{_files_to_merge}} == -1) {
        # open _out, just so we can log ourselves in finish()
        $self->finish_io_option('output', -clone => $self->{_in});
    };
}
498            
499              
500              
501             =head1 AUTHOR and COPYRIGHT
502              
503             Copyright (C) 1991-2015 by John Heidemann
504              
505             This program is distributed under terms of the GNU general
506             public license, version 2. See the file COPYING
507             with the distribution for details.
508              
509             =cut
510              
511             1;