File Coverage

blib/lib/Fsdb/Filter/dbmapreduce.pm
Criterion Covered Total %
statement 32 262 12.2
branch 0 134 0.0
condition 0 26 0.0
subroutine 11 32 34.3
pod 6 6 100.0
total 49 460 10.6


line stmt bran cond sub pod time code
1             #!/usr/bin/perl -w
2              
3             #
4             # dbmapreduce.pm
5             # Copyright (C) 1991-2016 by John Heidemann
6             #
7             # This program is distributed under terms of the GNU general
8             # public license, version 2. See the file COPYING
9             # in $dblibdir for details.
10             #
11              
12              
13             package Fsdb::Filter::dbmapreduce;
14              
15             =head1 NAME
16              
17             dbmapreduce - reduce all input rows with the same key
18              
19             =head1 SYNOPSIS
20              
21             dbmapreduce [-dMS] [-k KeyField] [-f CodeFile] [-C Filtercode] [--] [ReduceCommand [ReduceArguments...]]
22              
23             =head1 DESCRIPTION
24              
25             Group input data by KeyField,
26             then apply a function (the "reducer") to each group.
27             The reduce function can be an external program
28             given by ReduceCommand and ReduceArguments,
29             or a Perl subroutine given in CodeFile or FilterCode.
30              
31             If a "--" appears before reduce command,
32             arguments after the -- are passed to the command.
33              
34              
35             =head2 Grouping (The Mapper)
36              
37             By default the KeyField is the first field in the row.
38             Unlike Hadoop streaming, the -k KeyField option can explicitly
39             name where the key is in any column of each input row.
40              
41             By default, we sort the data to make sure data is grouped by key.
42             If the input is already grouped, the C<-S> option avoids this cost.
43              
44              
45             =head2 The Reducer
46              
47             Reduce functions default to be shell commands.
48             However, with C<-C>, one can use arbitrary Perl code
49              
50             (see the C<-C> option below for details).
51             The C<-f> option is useful to specify complex Perl code
52             somewhere other than the command line.
53              
54             Finally, as a special case, if there are no rows of input,
55             the reducer will be invoked once with the empty value (if it's an external
56             reducer) or with undef (if it's a subroutine).
57             It is expected to generate the output header,
58             and it may generate no data rows itself, or a null data row
59             of its choosing.
60              
61             =head2 Output
62              
63             For non-multi-key-aware reducers,
64             we add the KeyField used for each Reduce
65             to the output stream.
66             (If the reducer passes the key we trust that it gives a correct value.)
67             We also insure that the output field separator is the
68             same as the input field separator.
69              
70             Adding the key and adjusting the output field separator
71             is not possible for
72             multi-key-aware reducers.
73              
74              
75             =head2 Comparison to Related Work
76              
77             This program thus implements Google-style map/reduce,
78             but executed sequentially.
79              
80             For input, these systems include a map function and apply it to input data
81             to generate the key.
82             We assume this key generation (the map function)
83             has occurred ahead of time.
84              
85             We also allow the grouping key to be in any column.
86             Hadoop Streaming requires it to be in the first column.
87              
88             By default, the reducer gets exactly (and only) one key.
89             This invariant is stronger than Google and Hadoop.
90             They both pass multiple keys to the
91             reducer, insuring that each key is grouped together.
92             With the C<-M> option, we also pass multiple groups to the reducer.
93              
94             Unlike those systems, with the C<-S> option
95             we do not require the groups arrive in any particular
96             order, just that they be grouped together.
97             (They guarantee that groups arrive in lexically sorted order.)
98             However, without C<-S> we sort the input, which creates lexical ordering.
99              
100             With C<--prepend-key> we insure that the KeyField is in the output stream;
101             other systems do not enforce this.
102              
103              
104             =head2 Assumptions and requirements
105              
106             By default, data can be provided in arbitrary order
107             and the program consumes O(number of unique tags) memory,
108             and O(size of data) disk space.
109              
110             With the C<-S> option, data must arrive grouped by tag (not necessarily sorted),
111             and the program consumes O(number of tags) memory and no disk space.
112             The program will check and abort if this precondition is not met.
113              
114             With two C<-S>'s, program consumes O(1) memory, but doesn't verify
115             that the data-arrival precondition is met.
116              
117             The field separators of the input and the output
118             can now be different
119             (early versions of this tool prohibited such variation.)
120             With C<--copy-fs> we copy the input field separator to the output,
121             but only for non-multi-key-aware reducers.
122             (this used to be done automatically).
123              
124              
125             =head2 Known bugs
126              
127             As of 2013-09-21, we don't verify key order with options C<-M -S>.
128              
129              
130             =head1 OPTIONS
131              
132             =over 4
133              
134             =item B<-k> or B<--key> KeyField
135              
136             specify which column is the key for grouping (default: the first column)
137              
138             =item B<-S> or B<--pre-sorted>
139              
140             Assume data is already grouped by tag.
141             Provided twice, it removes the validation of this assertion.
142              
143             =item B<-M> or B<--multiple-ok>
144              
145             Assume the ReduceCommand can handle multiple grouped keys,
146             and the ReduceCommand is responsible for outputting the key
147             with each output row.
148             (By default, a separate ReduceCommand is run for each key,
149             and dbmapreduce adds the key to each output row.)
150              
151             =item B<-K> or B<--pass-current-key>
152              
153             Pass the current key as an argument to the external,
154             non-map-aware ReduceCommand.
155             This is only done optionally since some external commands
156             do not expect an extra argument.
157             (Internal, non-map-aware Perl reducers are always given
158             the current key as an argument.)
159              
160             =item B<--prepend-key>
161              
162             Add the current key into the reducer output
163             for non-multi-key-aware reducers only.
164             Not done by default.
165              
166             =item B<--copy-fs> or B<--copy-fieldseparator>
167              
168             Change the output field separator of a
169             non-multi-key-aware reducer to match the input's field separator.
170             Not done by default.
171              
172             =item B<--parallelism=N> or B<-j N>
173              
174             Allow up to N reducers to run in parallel.
175             Default is the number of CPUs in the machine.
176              
177             =item B<-C FILTER-CODE> or B<--filter-code=FILTER-CODE>
178              
179             Provide FILTER-CODE, Perl code that generates and returns
180             a Fsdb::Filter object that implements the reduce function.
181             The provided code should be an anonymous sub
182             that creates a Fsdb Filter that implements the reduce object.
183              
184             The reduce object will then be called with --input and --output
185             parameters that hook it into the reduce with queues.
186              
187             One sample fragment that works is just:
188              
189             dbcolstats(qw(--nolog duration))
190              
191             So this command:
192              
193             cat DATA/stats.fsdb | \
194             dbmapreduce -k experiment -C 'dbcolstats(qw(--nolog duration))'
195              
196             is the same as the example
197              
198             cat DATA/stats.fsdb | \
199             dbmapreduce -k experiment -- dbcolstats duration
200              
201             except that with C<-C> there is no forking and so things run faster.
202              
203             If C<dbmapreduce> is invoked from within Perl, then one can use
204             a code SUB as well:
205             dbmapreduce(-k => 'experiment',
206             -C => sub { dbcolstats(qw(--nolog duration)) });
207              
208             The reduce object must consume I<all> input as a Fsdb stream,
209             and close the output Fsdb stream. (If this assumption is not
210             met the map/reduce will be aborted.)
211              
212             For non-map-reduce-aware filters,
213             when the filter-generator code runs, C<$_[0]> will be the current key.
214              
215             =item B<-f CODE-FILE> or B<--code-file=CODE-FILE>
216              
217             Includes F<CODE-FILE> in the program.
218             This option is useful for more complicated perl reducer functions.
219              
220             Thus, if reducer.pl has the code:
221              
222             sub make_reducer {
223             my($current_key) = @_;
224             dbcolstats(qw(--nolog duration));
225             }
226              
227             Then the command
228              
229             cat DATA/stats.fsdb | \
230             dbmapreduce -k experiment -f reducer.pl -C make_reducer
231              
232             does the same thing as the example.
233              
234              
235             =item B<-w> or B<--warnings>
236              
237             Enable warnings in user supplied code.
238             Warnings are issued if an external reducer fails to consume all input.
239             (Default to include warnings.)
240              
241             =item B<-T TmpDir>
242              
243             where to put tmp files.
244             Also uses environment variable TMPDIR, if -T is
245             not specified.
246             Default is /tmp.
247              
248             =back
249              
250             =for comment
251             begin_standard_fsdb_options
252              
253             This module also supports the standard fsdb options:
254              
255             =over 4
256              
257             =item B<-d>
258              
259             Enable debugging output.
260              
261             =item B<-i> or B<--input> InputSource
262              
263             Read from InputSource, typically a file name, or C<-> for standard input,
264             or (if in Perl) an IO::Handle, Fsdb::IO or Fsdb::BoundedQueue object.
265              
266             =item B<-o> or B<--output> OutputDestination
267              
268             Write to OutputDestination, typically a file name, or C<-> for standard output,
269             or (if in Perl) an IO::Handle, Fsdb::IO or Fsdb::BoundedQueue object.
270              
271             =item B<--autorun> or B<--noautorun>
272              
273             By default, programs process automatically,
274             but Fsdb::Filter objects in Perl do not run until you invoke
275             the run() method.
276             The C<--(no)autorun> option controls that behavior within Perl.
277              
278             =item B<--header> H
279              
280             Use H as the full Fsdb header, rather than reading a header from
281             the input.
282              
283             =item B<--help>
284              
285             Show help.
286              
287             =item B<--man>
288              
289             Show full manual.
290              
291             =back
292              
293             =for comment
294             end_standard_fsdb_options
295              
296              
297             =head1 SAMPLE USAGE
298              
299             =head2 Input:
300              
301             #fsdb experiment duration
302             ufs_mab_sys 37.2
303             ufs_mab_sys 37.3
304             ufs_rcp_real 264.5
305             ufs_rcp_real 277.9
306              
307             =head2 Command:
308              
309             cat DATA/stats.fsdb | \
310             dbmapreduce --prepend-key -k experiment -- dbcolstats duration
311              
312             =head2 Output:
313              
314             #fsdb experiment mean stddev pct_rsd conf_range conf_low conf_high conf_pct sum sum_squared min max n
315             ufs_mab_sys 37.25 0.070711 0.18983 0.6353 36.615 37.885 0.95 74.5 2775.1 37.2 37.3 2
316             ufs_rcp_real 271.2 9.4752 3.4938 85.13 186.07 356.33 0.95 542.4 1.4719e+05 264.5 277.9 2
317             # | dbmapreduce -k experiment dbstats duration
318              
319              
320             =head1 SEE ALSO
321              
322             L<Fsdb>.
323             L<dbmultistats>
324             L<dbrowsplituniq>
325              
326              
327             =head1 CLASS FUNCTIONS
328              
329             OLD TEXT:
330             A few notes about the internal structure:
331             L<dbmapreduce> uses two to four threads (actually Freds) to run.
332             An optional thread C<$self->{_in_fred}> sorts the input.
333             The main process reads input and groups input by key.
334             Each group is passed to a
335             secondary fred C<$self->{_reducer_thread}>
336             that invokes the reducer on each group
337             and does any output.
338             If the reducer is I<not> map-aware, then
339             we create a final postprocessor thread that
340             adds the key back to the output.
341             Either the reducer or the postprocessor thread do output.
342              
343             NEW VERSION with Freds:
344              
345             A few notes about parallelism, since we have fairly different structure
346             depending on what we're doing:
347              
348             1. for multi-key aware reducers, there is no output post-processing.
349              
350             1a. if input is sorted and there is no input checking (-S -S),
351             we run the reducer in our own process.
352             (F)
353              
354             1b. with grouped input and input checking (-S),
355             we fork off an input process that checks grouping,
356             then run the reducer in our process.
357             (F)
358             xxx: case 1b not yet done
359              
360             1c. with ungrouped input,
361             we invoke an input process to do sorting,
362             then run the reducer in our process.
363             (F)
364              
365             2. for non-multi-key aware.
366             A sorter thread groups content, if necessary.
367             We break the input into groups
368             and feed them to a reducer Fred, one per group.
369             A dedicated additional Fred merges output and adds the missing key,
370             if necessary.
371             Either way, output ends up in a file.
372             A final postprocessor thread merges all the output files.
373              
374             =cut
375              
376             @ISA = qw(Fsdb::Filter);
377             $VERSION = 2.0;
378              
379 1     1   20 use 5.010;
  1         3  
380 1     1   4 use strict;
  1         1  
  1         17  
381 1     1   3 use Pod::Usage;
  1         6  
  1         61  
382 1     1   6 use Carp;
  1         2  
  1         46  
383              
384 1     1   6 use Fsdb::Filter;
  1         1  
  1         19  
385 1     1   4 use Fsdb::IO::Reader;
  1         2  
  1         15  
386 1     1   3 use Fsdb::IO::Writer;
  1         2  
  1         17  
387 1     1   304 use Fsdb::Filter::dbsubprocess;
  1         3  
  1         24  
388 1     1   7 use Fsdb::Support::NamedTmpfile;
  1         3  
  1         17  
389 1     1   4 use Fsdb::Support::OS;
  1         1  
  1         36  
390 1     1   6 use Fsdb::Filter::dbpipeline qw(dbpipeline_filter dbpipeline_sink dbsort dbcolcreate dbfilecat dbfilealter dbsubprocess);
  1         1  
  1         2224  
391              
392             my $REDUCER_GROUP_SYNCHRONIZATION_FLAG = 'reducer group synchronization flag';
393              
394             =head2 new
395              
396             $filter = new Fsdb::Filter::dbmapreduce(@arguments);
397              
398             Create a new dbmapreduce object, taking command-line arguments.
399              
400             =cut
401              
402             sub new ($@) {
403 0     0 1   my $class = shift @_;
404 0           my $self = $class->SUPER::new(@_);
405 0           bless $self, $class;
406 0           $self->set_defaults;
407 0           $self->parse_options(@_);
408 0           $self->SUPER::post_new();
409 0           return $self;
410             }
411              
412              
413             =head2 set_defaults
414              
415             $filter->set_defaults();
416              
417             Internal: set up defaults.
418              
419             =cut
420              
421             sub set_defaults ($) {
422 0     0 1   my($self) = @_;
423 0           $self->SUPER::set_defaults();
424 0           $self->{_key_column} = undef;
425 0           $self->{_pre_sorted} = 0;
426 0           $self->{_filter_generator_code} = undef;
427 0           $self->{_reduce_generator} = undef;
428 0           $self->{_reducer_is_multikey_aware} = undef;
429 0           $self->{_external_command_argv} = [];
430 0           $self->{_pass_current_key} = undef;
431 0           $self->{_prepend_key} = undef;
432 0           $self->{_copy_fscode} = undef;
433 0           $self->{_filter_generator_code} = undef;
434 0           $self->{_code_files} = [];
435 0           $self->{_warnings} = 1;
436 0           $self->{_max_parallelism} = undef;
437 0           $self->{_parallelism_available} = undef;
438 0           $self->{_test_parallelism} = undef;
439 0           $self->{_header} = undef;
440 0           $self->set_default_tmpdir;
441             }
442              
443             =head2 parse_options
444              
445             $filter->parse_options(@ARGV);
446              
447             Internal: parse command-line arguments.
448              
449             =cut
450              
451             sub parse_options ($@) {
452 0     0 1   my $self = shift @_;
453              
454 0           my(@argv) = @_;
455             $self->get_options(
456             \@argv,
457 0     0     'help|?' => sub { pod2usage(1); },
458 0     0     'man' => sub { pod2usage(-verbose => 2); },
459             'autorun!' => \$self->{_autorun},
460             'C|filter-code|code=s' => \$self->{_filter_generator_code},
461             'close!' => \$self->{_close},
462             'copy-fs|copy-fieldseparator!' => \$self->{_copy_fscode},
463             'd|debug+' => \$self->{_debug},
464             'f|code-files=s@' => $self->{_code_files},
465             'header=s' => \$self->{_header},
466 0     0     'i|input=s' => sub { $self->parse_io_option('input', @_); },
467             'j|parallelism=i' => \$self->{_max_parallelism},
468             'k|key=s' => \$self->{_key_column},
469             'K|pass-current-key!' => \$self->{_pass_current_key},
470 0     0     'prepend-key' => sub { $self->{_prepend_key} = 1; },
471 0     0     'no-prepend-key' => sub { $self->{_prepend_key} = 0; }, # set but false
472             'log!' => \$self->{_logprog},
473             'M|multiple-ok!' => \$self->{_reducer_is_multikey_aware},
474 0     0     'o|output=s' => sub { $self->parse_io_option('output', @_); },
475             'S|pre-sorted+' => \$self->{_pre_sorted},
476             'test-parallelism!' => \$self->{_test_parallelism}, # for test suite only
477             'T|tmpdir|tempdir=s' => \$self->{_tmpdir},
478             'saveoutput=s' => \$self->{_save_output},
479             'w|warnings!' => \$self->{_warnings},
480 0 0         ) or pod2usage(2);
481 0           push (@{$self->{_external_command_argv}}, @argv);
  0            
482             }
483              
484              
485             =head2 setup
486              
487             $filter->setup();
488              
489             Internal: setup, parse headers.
490              
491             =cut
492              
493             sub setup($) {
494 0     0 1   my($self) = @_;
495              
496             $self->{_prepend_key} = !$self->{_reducer_is_multikey_aware}
497 0 0         if (!defined($self->{_prepend_key}));
498             croak $self->{_prog} . ": cannot prepend keys for multikey-aware reducers.\n"
499 0 0 0       if ($self->{_prepend_key} && $self->{_reducer_is_multikey_aware});
500              
501 0           my $included_code = '';
502             #
503             # get any extra code
504             #
505 0           foreach my $code_file (@{$self->{_code_files}}) {
  0            
506 0 0         open(IN, "< $code_file") or croak $self->{_prog} . ": cannot read code from $code_file\n";
507 0           $included_code .= join('', <IN>);
508 0           close IN;
509             };
510              
511             #
512             # control parallelism
513             #
514 0   0       $self->{_max_parallelism} //= Fsdb::Support::OS::max_parallelism();
515 0 0         $self->{_max_parallelism} = 1 if ($self->{_max_parallelism} < 1); # always allow some
516 0   0       $self->{_parallelism_available} //= $self->{_max_parallelism};
517              
518             #
519             # what are we running?
520             #
521             # Figure it out, and generate a
522             # $self->{_reducer_generator_sub} that creates a
523             # filter object that will be passed to dbpipeline_open2
524             # to reduce one (or many, if map_aware_reducer) keys.
525             #
526 0 0         if ($#{$self->{_external_command_argv}} >= 0) {
  0 0          
527             # external command
528 0           my @argv = @{$self->{_external_command_argv}};
  0            
529 0 0         shift @argv if ($argv[0] eq '--');
530 0           my $empty = $self->{_empty};
531 0           my @pre_argv;
532 0 0         push @pre_argv, ($self->{_warnings} ? '--warnings' : '--nowarnings'),
533             '--nolog', '--';
534 0           my $reducer_generator_sub;
535 0 0         if ($self->{_pass_current_key}) {
536             $reducer_generator_sub = sub {
537 0   0 0     return dbsubprocess(@pre_argv, @argv, $_[0] // $empty);
538 0           };
539             } else {
540             $reducer_generator_sub = sub {
541 0     0     return dbsubprocess(@pre_argv, @argv);
542 0           };
543             };
544 0           $self->{_reducer_generator_sub} = $reducer_generator_sub;
545 0 0         print STDERR "# dbmapreduce/setup: external command is " . join(" ", @pre_argv, @argv) . "\n" if ($self->{_debug} > 2);
546             } elsif (defined($self->{_filter_generator_code})) {
547 0           my $reducer_generator_sub;
548 0 0         if (ref($self->{_filter_generator_code}) eq 'CODE') {
549 0 0         print STDERR "# dbmapreduce/setup: direct code assignment for reducer sub\n" if ($self->{_debug});
550 0           $reducer_generator_sub = $self->{_filter_generator_code};
551             } else {
552 0           my $sub_code;
553             $sub_code =
554             "use Fsdb::Filter::dbpipeline qw(:all);\n" .
555             $included_code .
556             '$reducer_generator_sub = sub {' . "\n" .
557             $self->{_filter_generator_code} .
558 0           "\n\t;\n};\n";
559 0 0         print STDERR "# dbmapreduce/setup: sub_code: $sub_code" if ($self->{_debug});
560 0           eval $sub_code;
561 0 0         $@ && croak $self->{_prog} . ": error evaluating user-provided reducer sub:\n$sub_code\nerror is: $@.\n";
562             };
563 0           $self->{_reducer_generator_sub} = $reducer_generator_sub;
564             } else {
565 0           croak $self->{_prog} . ": reducer not specified.\n";
566             };
567              
568             #
569             # do we need to group the keys for the user?
570             #
571 0           my($input_reader_aref) = ();
572 0   0       my $raw_to_raw = ($#{$self->{_external_command_argv}} >= 0 && $self->{_reducer_is_multikey_aware});
573 0 0         if ($raw_to_raw) {
574             # external and we're good? just hook it together
575             # (test case: dbmapreduce_cat.cmd)
576 0           $input_reader_aref = [-raw_fh => 1];
577             } else {
578 0           push (@$input_reader_aref, -comment_handler => $self->create_tolerant_pass_comments_sub('_cat_writer'));
579             };
580 0 0         push(@$input_reader_aref, -header => $self->{_header}) if (defined($self->{_header}));
581 0 0         if ($self->{_pre_sorted}) {
582 0           $self->finish_io_option('input', @$input_reader_aref);
583             } else {
584             # not pre-sorted, so do lexical sort
585 0 0         my $sort_column = defined($self->{_key_column}) ? $self->{_key_column} : '0';
586 0           my(@sort_args) = ('--nolog');
587 0 0         push(@sort_args, '--header' => $self->{_header}) if (defined($self->{_header}));
588 0           push(@sort_args, $sort_column);
589             my($new_reader, $new_fred) = dbpipeline_filter(
590             $self->{_input},
591 0           $input_reader_aref,
592             dbsort(@sort_args));
593 0           $self->{_pre_sorted_input} = $self->{_input};
594 0           $self->{_in} = $new_reader;
595 0           $self->{_sorter_fred} = $new_fred;
596             #
597             # We will join the sorter in finish().
598             #
599             };
600              
601             #
602             # figure out key column's name, now that we've done setup
603             #
604 0 0         if ($raw_to_raw) {
    0          
605             # raw, so no parsing input at all
606 0           $self->{_key_coli} = undef;
607             } elsif (defined($self->{_key_column})) {
608 0           $self->{_key_coli} = $self->{_in}->col_to_i($self->{_key_column});
609             croak $self->{_prog} . ": key column " . $self->{_key_column} . " is not in input stream.\n"
610 0 0         if (!defined($self->{_key_coli}));
611             } else {
612             # default to first column
613 0           $self->{_key_coli} = 0;
614 0           $self->{_key_column} = $self->{_in}->i_to_col(0);
615             };
616              
617             #
618             # setup the postprocessing thread
619             #
620 0           $self->_setup_reducer();
621              
622 0           $self->{_reducer_invocation_count} = 0;
623             # $SIG{'PIPE'} = 'IGNORE';
624             }
625              
626              
627              
628             =head2 _setup_reducer
629              
630             _setup_reducer
631              
632             (internal)
633             One Fred that runs the reducer and produces output.
634             C<_reducer_queue> sends the new key,
635             then a Fsdb stream, then EOF (undef)
636             for each group.
637             We setup the output, suppress all but the first header,
638             and add in the keys if necessary.
639              
640             =cut
641              
642             sub _setup_reducer() {
643 0     0     my($self) = @_;
644              
645 0 0         if ($self->{_reducer_is_multikey_aware}) {
646             # croak "case not yet handled--need to verify correct sort order\n" if ($self->{_pre_sorted} == 1);
647             # No need to do input checking,
648             # and reducer promises to handle whatever we give it,
649             # and we assume it outputs the key, so
650             # just start the reducer on our own input and run it here.
651 0           my $reducer = &{$self->{_reducer_generator_sub}}();
  0            
652             $reducer->parse_options('--input' => $self->{_in},
653             '--output' => $self->{_output},
654             '--saveoutput' => \$self->{_out},
655 0           '--noclose');
656 0           $reducer->setup();
657 0           $self->{_multikey_aware_reducer} = $reducer;
658 0           return;
659             } else {
660             # do nothing; we do our work below
661             };
662             }
663              
664             =head2 _key_to_string
665              
666             $self->_key_to_string($key)
667              
668             Convert a key (maybe undef) to a string for status messages.
669              
670             =cut
671              
672             sub _key_to_string($$) {
673 0     0     my($self, $key) = @_;
674 0 0         return defined($key) ? $key : '(undef)';
675             }
676              
677              
678             =head2 _open_new_key
679              
680             _open_new_key
681              
682             (internal)
683              
684             Note that new_key can be undef if there was no input.
685              
686             =cut
687              
# Start a reducer for the group identified by $new_key.
#
# Records $new_key as the current key, allocates a temporary output file,
# assembles the list of reducer modules, starts them behind dbpipeline_sink,
# and appends a tracking entry to $self->{_work_queue}.
#
# $new_key may be undef when there was no input at all; in that case a
# prepended key column is filled with $self->{_empty}.
sub _open_new_key {
    my($self, $new_key) = @_;

    if ($self->{_debug} >= 2) {
        print STDERR "# dbmapreduce: _open_new_key on " . $self->_key_to_string($new_key) . "\n";
    }

    $self->{_current_key} = $new_key;

    # Per-key reducers are never opened for a multikey-aware reducer;
    # getting here with one configured is an internal error.
    die "internal error: no more multikey here\n" if ($self->{_reducer_is_multikey_aware});

    #
    # Build the reducer: the generated module(s) first, then optional
    # post-processing stages.
    #
    my $output_file = Fsdb::Support::NamedTmpfile::alloc($self->{_tmpdir});
    my @reducer_modules = ($self->{_reducer_generator_sub}->($new_key));

    if ($self->{_copy_fscode}) {
        # Give the reducer output the same field-separator code as our input.
        push(@reducer_modules, dbfilealter('--nolog', '-F', $self->{_in}->fscode()));
    }
    if ($self->{_prepend_key}) {
        # Put the key back as the first column of the reducer output.
        my $key_value = $new_key // $self->{_empty};
        push(@reducer_modules, dbcolcreate('--no-recreate-fatal', '--nolog', '--first', '-e', $key_value, $self->{_key_column}));
    }

    print STDERR "# reducer output to $output_file (in process $$)\n" if ($self->{_debug});
    unshift(@reducer_modules, '--output' => $output_file);

    # Queue entry for this reducer; the exit callback below flips its status.
    my %queue_entry = (
        'status' => 'running',
        'output' => $output_file,
    );
    my $debug = $self->{_debug};
    # Presumably invoked when the reducer fred completes -- confirm against
    # dbpipeline_sink's handling of --fred_exit_sub.
    my $mark_done = sub {
        $queue_entry{'status'} = 'done';
        print STDERR "# dbmapreduce:reducer: output $output_file\n" if ($debug);
        print STDERR "# dbmapreduce:reducer: zero size $output_file\n" if (-z $output_file);
    };

    my($to_reducer_writer, $reducer_fred) = dbpipeline_sink(
        [-clone => $self->{_in}],
        '--fred_description' => 'dbmapreduce:dbpipeline_sink(to_reducer)',
        '--fred_exit_sub' => $mark_done,
        @reducer_modules);
    $queue_entry{'fred'} = $reducer_fred;

    # Remember where rows for this key should be sent.
    $self->{_to_reducer_writer} = $to_reducer_writer;
    $self->{_current_reducer_fastpath_sub} = $to_reducer_writer->fastpath_sub();
    push(@{$self->{_work_queue}}, \%queue_entry);
}
731              
732             =head2 _close_old_key
733              
734             _close_old_key
735              
736             Internal, finish a key.
737              
738             =cut
739              
# Finish the reducer for $key by closing the writer that feeds it.
#
# $key   - the key being finished; undef is only accepted when $final is true.
# $final - true when called at end of input.
#
# Croaks on a non-final undef key, or when a defined $key differs from
# $self->{_current_key}; dies if a multikey-aware reducer is configured.
sub _close_old_key {
    my($self, $key, $final) = @_;

    if ($self->{_debug} >= 2) {
        print STDERR "# dbmapreduce: _close_old_key on " . $self->_key_to_string($key) . "\n";
    }

    # A null key is only legal on the final close.
    if (!defined($key) && !$final) {
        croak $self->{_prog} . ": internal error: _close_old_key called on non-final null-key.\n";
    }
    if ($self->{_reducer_is_multikey_aware}) {
        die "internal error: no more multikey here\n";
    }

    # The key being closed must be the one that was opened.
    if (defined($key) && $self->{_current_key} ne $key) {
        croak $self->{_prog} . ": internal error: current key doesn't equal prior key " . $self->_key_to_string($self->{_current_key}) . " != key " . $self->_key_to_string($key) . "\n";
    }

    # Closing the writer ends the reducer's input.
    if ($self->{_debug} >= 2) {
        print STDERR "# dbmapreduce: _close_old_key closing reducer (" . ($key // "null key") . ")\n";
    }
    $self->{_to_reducer_writer}->close;
}
757              
758             =head2 _check_finished_reducers
759              
760             $self->_check_finished_reducers($force);
761              
762             Internal: see if any reducer freds finished, optionally $FORCE-ing
763             all to finish.
764              
765             This routine also enforces a maximum amount of parallelism, blocking us when we have too
766             many reducers running.
767              
768             =cut
769              
# Collect finished reducers and forward their output files, in queue order,
# to the cat writer.
#
# $force - true to block until every queued reducer has finished.  Forcing
#          is also switched on internally when the number of queued
#          reducers has reached $self->{_max_parallelism}.
#
# Croaks if any reducer reports a non-zero exit code, or if a joined
# reducer did not leave its queue entry with status 'done'.
sub _check_finished_reducers($$) {
    my($self, $force) = @_;

    my $force_status = ($force ? "forced" : "optional");
    my $backlog = scalar(@{$self->{_work_queue}});
    $self->{_cat_writer}->write_rowobj("# dbmapreduce: test_parallelism backlog $backlog, max $self->{_max_parallelism}\n") if ($self->{_test_parallelism});
    if ($backlog >= $self->{_max_parallelism}) {
        # Too many reducers outstanding: block until the queue drains.
        $force = 2;
        $force_status = "backlog-forced";
    }

    print STDERR "# dbmapreduce:_check_finished_reducers: $force_status\n" if ($self->{_debug});

    # Reap whatever has already finished; join_any's result is treated as
    # "nothing more to reap" once it is not a reference.
    for (;;) {
        my $fred_or_code = Fsdb::Support::Freds::join_any();
        last if (ref($fred_or_code) eq '');
        croak "dbmapreduce: reducer failed\n"
            if ($fred_or_code->exit_code() != 0);
        print STDERR "# dbmapreduce:_check_finished_reducers: merged fred " . $fred_or_code->info() . "\n" if ($self->{_debug});
    }

    #
    # The reducers' exit subs have updated their work queue entries.
    # Push out output from the head of the queue so results stay in order.
    # Be forceful (and block) if required.
    #
    while (@{$self->{_work_queue}}) {
        my $work_queue_href = $self->{_work_queue}->[0];
        if ($force) {
            my $fred = $work_queue_href->{fred};
            print STDERR "# dbmapreduce:_check_finished_reducers: blocking on pending fred " . $fred->info() . "\n" if ($self->{_debug});
            my $exit_code = $fred->join();
            croak "dbmapreduce: reducer " . $fred->info() . " failed, exit $exit_code\n" if ($exit_code != 0);
            croak "dbmapreduce: reducer didn't leave status done\n"
                if ($work_queue_href->{status} ne 'done');
        } elsif ($work_queue_href->{status} ne 'done') {
            # Head of the queue is still running and we may not block:
            # stop here so later outputs do not overtake it.
            last;
        }
        # This one is done, send its output file name to the cat writer.
        my $output = $work_queue_href->{output};
        print STDERR "# dbmapreduce:_check_finished_reducers: done with output $output\n" if ($self->{_debug});
        $self->{_cat_writer}->write_rowobj([$output]);
        shift(@{$self->{_work_queue}});
    }
}
815              
816              
817              
818             =head2 _mapper_run
819              
820             $filter->_mapper_run();
821              
822             Internal: run over each row, grouping them.
823             Fork off a reducer as necessary.
824              
825             =cut
# Read every input row, detect where the key column changes, and hand each
# group of rows to its own reducer.
#
# Output file names from the reducers are sent to a dbfilecat pipeline that
# writes to $self->{_output}.  If there is no input at all, a single
# reducer is opened with an undef key.
#
# Croaks if dbfilecat cannot be started, or (when $self->{_pre_sorted} is 1)
# if a key shows up in more than one group.
sub _mapper_run($) {
    my($self) = @_;

    $self->{_work_queue} = [];   # running reducers, oldest first
    my $read_fastpath_sub = $self->{_in}->fastpath_sub();
    my $reducer_fastpath_sub;

    #
    # Output merger: receives one file name per finished reducer.
    #
    print STDERR "# opening dbfilecat\n" if ($self->{_debug});
    my @writer_args = (-cols => [qw(filename)], -outputheader => 'never', -autoflush => 1);
    my($cat_writer, $cat_fred) = dbpipeline_sink(
        \@writer_args,
        '--fred_description' => 'dbmapreduce:dbpipeline_sink(cat_writer)',
        '--output' => $self->{_output},
        dbfilecat(qw(--nolog --xargs --removeinputs)));
    if ($cat_writer->error) {
        croak $self->{_prog} . ": cannot invoke dbfilecat.\n";
    }
    $self->{_cat_writer} = $cat_writer;
    $self->{_cat_fred} = $cat_fred;

    #
    # Main loop over the input rows.
    #
    my $last_key;
    my $key_coli = $self->{_key_coli};
    $self->{_key_counts} = {};
    while (my $fref = $read_fastpath_sub->()) {
        my $key = $fref->[$key_coli];

        my $same_group = (defined($last_key) && $key eq $last_key);
        if (!$same_group) {
            # A new group starts here.
            if ($self->{_pre_sorted} == 1) {
                # Seeing a key again means the input was not really grouped.
                if (defined($self->{_key_counts}{$key})) {
                    croak $self->{_prog} . ": single key ``$key'' split into multiple groups, selection of -S was invalid\n";
                }
                $self->{_key_counts}{$key} = 1;
            }
            # Wrap up the previous group, if there was one.
            if (defined($last_key)) {
                $self->_close_old_key($last_key);
                $self->_check_finished_reducers(0);
            }
            $self->_open_new_key($key);
            $last_key = $key;
            $reducer_fastpath_sub = $self->{_current_reducer_fastpath_sub};
            die "no reducer\n" if (!defined($reducer_fastpath_sub));
        }

        # Pass the row to the current reducer.
        $reducer_fastpath_sub->($fref);
    }

    # No input data: run one reducer on a null key.
    $self->_open_new_key(undef) if (!defined($last_key));

    # Close the last group and wait for all reducers (force option).
    # The cat writer itself is cleaned up in finish.
    $self->_close_old_key($last_key, 1);
    $self->_check_finished_reducers(1);
}
891              
892              
893             =head2 run
894              
895             $filter->run();
896              
897             Internal: run over each row.
898              
899             =cut
# Run the filter: delegate to the multikey-aware reducer when one is
# configured, otherwise group the input and reduce it key by key via
# _mapper_run.
sub run($) {
    my($self) = @_;

    return $self->{_multikey_aware_reducer}->run()
        if ($self->{_reducer_is_multikey_aware});

    return $self->_mapper_run();
}
909              
910             =head2 finish
911              
912             $filter->finish();
913              
914             Internal: write trailer.
915              
916             =cut
917              
# Finish the filter: join outstanding freds and write the trailer.
#
# Croaks if the input sorter or the dbfilecat pipeline reports an error.
sub finish($) {
    my($self) = @_;

    #
    # Join the input sorter, if one was started.
    #
    my $sorter_fred = $self->{_sorter_fred};
    if ($sorter_fred) {
        print STDERR "# mapreduce main: join sorter\n" if ($self->{_debug});
        $sorter_fred->join();
        if ($sorter_fred->error()) {
            croak $self->{_prog} . ": input sorter failed: " . $sorter_fred->error();
        }
    }

    if (!$self->{_reducer_is_multikey_aware}) {
        # Output the log message by routing it through cat_writer (a hack):
        # the parent's finish writes to $self->{_out}, and per the original
        # author's note it will also close it.
        $self->{_out} = $self->{_cat_writer};
        $self->SUPER::finish();
        $self->{_cat_writer}->close;

        print STDERR "# mapreduce main: join dbfilecat\n" if ($self->{_debug});
        my $cat_fred = $self->{_cat_fred};
        $cat_fred->join();
        my $error = $cat_fred->error();
        croak $self->{_prog} . ": dbfilecat erred: $error" if ($error);
    } else {
        $self->{_multikey_aware_reducer}->finish();
        # Output our log message, in-line.
        $self->SUPER::finish();
    }
}
949              
950              
951             =head1 AUTHOR and COPYRIGHT
952              
953             Copyright (C) 1991-2016 by John Heidemann
954              
955             This program is distributed under terms of the GNU general
956             public license, version 2. See the file COPYING
957             with the distribution for details.
958              
959             =cut
960              
961             1;