File Coverage

blib/lib/Fsdb/Filter/dbfilepivot.pm
Criterion Covered Total %
statement 24 126 19.0
branch 0 48 0.0
condition 0 9 0.0
subroutine 8 21 38.1
pod 5 5 100.0
total 37 209 17.7


line stmt bran cond sub pod time code
1             #!/usr/bin/perl
2              
3             #
4             # dbfilepivot.pm
5             # Copyright (C) 2011-2015 by John Heidemann
6             # $Id: d20486d484af7c41f6af347c47b45385d4a92712 $
7             #
8             # This program is distributed under terms of the GNU general
9             # public license, version 2. See the file COPYING
10             # in $dblibdir for details.
11             #
12              
13             package Fsdb::Filter::dbfilepivot;
14              
15             =head1 NAME
16              
17             dbfilepivot - pivot a table, converting multiple rows into a single wide row
18              
19             =head1 SYNOPSIS
20              
21             dbfilepivot [-e empty] -k KeyField -p PivotField [-v ValueField]
22              
23             =head1 DESCRIPTION
24              
25             Pivot a table, converting multiple rows corresponding to the
26             same key into a single wide row.
27              
28             In a normalized database, one might have data with a schema like
29             (id, attribute, value),
30             but sometimes it's more convenient to see the data with a schema like
31             (id, attribute1, attribute2).
32             (For example, gnuplot's stacked histograms require denormalized data.)
33             Dbfilepivot converts the normalized format to the denormalized,
34             but sometimes useful, format.
35             Here the "id" is the key, the attribute is the "pivot",
36             and the value is, well, the optional "value".
37              
38             An example is clearer. A gradebook usually looks like:
39              
40             #fsdb name hw_1 hw_2 hw_3
41             John 97 98 99
42             Paul - 80 82
43              
44             but a properly normalized format would represent it as:
45              
46             #fsdb name hw score
47             John 1 97
48             John 2 98
49             John 3 99
50             Paul 2 80
51             Paul 3 82
52              
53             This tool converts the second form into the first, when used as
54              
55             dbfilepivot -k name -p hw -v score
56              
57             Here name is the I<key> column that indicates which rows belong
58             to the same entity,
59             hw is the I<pivot> column that will indicate which column
60             in the output is relevant,
61             and score is the I<value> that indicates what goes in the
62             output.
63              
64             The pivot creates a new column C<pivot_tag1>, C<pivot_tag2>, etc.
65             for each tag, the contents of the pivot field in the input.
66             It then populates those new columns with the contents of the value field
67             in the input.
68              
69             If no value column is specified, then values are either empty or 1.
70              
71             Dbfilepivot assumes all lines with the same key are adjacent
72             in the input source, like L<dbmapreduce> with the F<-S> option.
73             To enforce this invariant, by default, it I<requires> input be sorted by key.
74              
75             Dbfilepivot makes two passes over its data
76             and so requires temporary disk space equal to the input size.
77              
78             Memory usage is proportional to the number of unique pivot values.
79              
80             =head1 OPTIONS
81              
82             =over 4
83              
84             =item B<-k> or B<--key> KeyField
85              
86             specify which column is the key for grouping.
87             Required (no default).
88              
89             =item B<-p> or B<--pivot> PivotField
90              
91             specify which column is the key to indicate which column in the output
92             is relevant.
93             Required (no default).
94              
95             =item B<-v> or B<--value> ValueField
96              
97             Specify which column is the value in the output.
98             If none is given, 1 is used for the value.
99              
100             =item B<-C S> or B<--element-separator S>
101              
102             Specify the separator I<S> used to join the name of the input's pivot column
103             with its contents.
104             (Defaults to a single underscore.)
105              
106             =item B<-e E> or B<--empty E>
107              
108             give value E as the value for empty (null) records
109              
110             =item B<-S> or B<--pre-sorted>
111              
112             Assume data is already grouped by key.
113             Provided twice, it removes the validation of this assertion.
114             By default, we sort by key.
115              
116             =item B<-T TmpDir>
117              
118             where to put tmp files.
119             Also uses environment variable TMPDIR, if -T is
120             not specified.
121             Default is /tmp.
122              
123             =back
124              
125             =for comment
126             begin_standard_fsdb_options
127              
128             This module also supports the standard fsdb options:
129              
130             =over 4
131              
132             =item B<-d>
133              
134             Enable debugging output.
135              
136             =item B<-i> or B<--input> InputSource
137              
138             Read from InputSource, typically a file name, or C<-> for standard input,
139             or (if in Perl) a IO::Handle, Fsdb::IO or Fsdb::BoundedQueue objects.
140              
141             =item B<-o> or B<--output> OutputDestination
142              
143             Write to OutputDestination, typically a file name, or C<-> for standard output,
144             or (if in Perl) a IO::Handle, Fsdb::IO or Fsdb::BoundedQueue objects.
145              
146             =item B<--autorun> or B<--noautorun>
147              
148             By default, programs process automatically,
149             but Fsdb::Filter objects in Perl do not run until you invoke
150             the run() method.
151             The C<--(no)autorun> option controls that behavior within Perl.
152              
153             =item B<--help>
154              
155             Show help.
156              
157             =item B<--man>
158              
159             Show full manual.
160              
161             =back
162              
163             =for comment
164             end_standard_fsdb_options
165              
166              
167             =head1 SAMPLE USAGE
168              
169             =head2 Input:
170              
171             #fsdb name hw score
172             John 1 97
173             John 2 98
174             John 3 99
175             Paul 2 80
176             Paul 3 82
177              
178             =head2 Command:
179              
180             cat data.fsdb | dbfilepivot -k name -p hw -v score
181              
182             =head2 Output:
183              
184             #fsdb name hw_1 hw_2 hw_3
185             John 97 98 99
186             Paul - 80 82
187              
188             =head1 SEE ALSO
189              
190             L.
191             L.
192             L.
193             L.
194              
195              
196             =head1 CLASS FUNCTIONS
197              
198             =cut
199              
200             @ISA = qw(Fsdb::Filter);
201             $VERSION = 2.0;
202              
203 1     1   8128 use strict;
  1         2  
  1         37  
204 1     1   4 use Pod::Usage;
  1         2  
  1         136  
205 1     1   4 use Carp;
  1         2  
  1         66  
206              
207 1     1   6 use Fsdb::Filter;
  1         1  
  1         22  
208 1     1   4 use Fsdb::IO::Reader;
  1         1  
  1         25  
209 1     1   5 use Fsdb::IO::Writer;
  1         1  
  1         17  
210 1     1   4 use Fsdb::IO::Replayable;
  1         2  
  1         27  
211 1     1   4 use Fsdb::Filter::dbpipeline qw(dbpipeline_filter dbsort);
  1         1  
  1         1500  
212              
213              
214             =head2 new
215              
216             $filter = new Fsdb::Filter::dbfilepivot(@arguments);
217              
218             Create a new dbfilepivot object, taking command-line arguments.
219              
220             =cut
221              
222             sub new ($@) {
223 0     0 1   my $class = shift @_;
224 0           my $self = $class->SUPER::new(@_);
225 0           bless $self, $class;
226 0           $self->set_defaults;
227 0           $self->parse_options(@_);
228 0           $self->SUPER::post_new();
229 0           return $self;
230             }
231              
232              
233             =head2 set_defaults
234              
235             $filter->set_defaults();
236              
237             Internal: set up defaults.
238              
239             =cut
240              
241             sub set_defaults ($) {
242 0     0 1   my($self) = @_;
243 0           $self->SUPER::set_defaults();
244 0           $self->{_elem_separator} = '_';
245 0 0         $self->{_tmpdir} = defined($ENV{'TMPDIR'}) ? $ENV{'TMPDIR'} : "/tmp";
246 0           $self->{_key_column} = undef;
247 0           $self->{_pivot_column} = undef;
248 0           $self->{_value_column} = undef;
249 0           $self->{_pre_sorted} = 0;
250 0           $self->{_sort_order} = undef;
251 0           $self->{_sort_as_numeric} = undef;
252             }
253              
254             =head2 parse_options
255              
256             $filter->parse_options(@ARGV);
257              
258             Internal: parse command-line arguments.
259              
260             =cut
261              
262             sub parse_options ($@) {
263 0     0 1   my $self = shift @_;
264              
265 0           my(@argv) = @_;
266             $self->get_options(
267             \@argv,
268 0     0     'help|?' => sub { pod2usage(1); },
269 0     0     'man' => sub { pod2usage(-verbose => 2); },
270             'autorun!' => \$self->{_autorun},
271             'close!' => \$self->{_close},
272             'C|element-separator=s' => \$self->{_elem_separator},
273             'd|debug+' => \$self->{_debug},
274             'e|empty=s' => \$self->{_empty},
275 0     0     'i|input=s' => sub { $self->parse_io_option('input', @_); },
276             'k|key=s' => \$self->{_key_column},
277             'log!' => \$self->{_logprog},
278 0     0     'o|output=s' => sub { $self->parse_io_option('output', @_); },
279             'p|pivot=s' => \$self->{_pivot_column},
280             'S|pre-sorted+' => \$self->{_pre_sorted},
281             'T|tmpdir|tempdir=s' => \$self->{_tmpdir},
282             'v|value=s' => \$self->{_value_column},
283             # sort key options:
284 0     0     'n|numeric' => sub { $self->{_sort_as_numeric} = 1; },
285 0     0     'N|lexical' => sub { $self->{_sort_as_numeric} = undef; },
286 0     0     'r|descending' => sub { $self->{_sort_order} = -1; },
287 0     0     'R|ascending' => sub { $self->{_sort_order} = 1; },
288 0 0         ) or pod2usage(2);
289 0 0         pod2usage(2) if ($#argv != -1);
290             }
291              
292             =head2 setup
293              
294             $filter->setup();
295              
296             Internal: setup, parse headers.
297              
298             =cut
299              
300             sub setup ($) {
301 0     0 1   my($self) = @_;
302              
303             croak $self->{_prog} . ": invalid empty value (single quote).\n"
304 0 0         if ($self->{_empty} eq "'");
305              
306             #
307             # guarantee data is sorted
308             # (swap reader if necessary)
309 0 0         if ($self->{_pre_sorted}) {
310             # pre-sorted, so just read it
311 0           $self->finish_io_option('input', -comment_handler => $self->create_pass_comments_sub('_replayable_writer'));
312 0           $self->{_sorter_fred} = undef;
313             } else {
314             # not sorted, so sort it and read that
315 0           my @sort_args = ('--nolog', $self->{_key_column});
316 0 0         unshift(@sort_args, '--descending') if ($self->{_sort_order} == -1);
317 0 0         unshift(@sort_args, ($self->{_sort_as_numeric} ? '--numeric' : '--lexical'));
318 0           my($new_reader, $new_fred) = dbpipeline_filter($self->{_input}, [-comment_handler => $self->create_delay_comments_sub], dbsort(@sort_args));
319 0           $self->{_pre_sorted_input} = $self->{_input};
320 0           $self->{_in} = $new_reader;
321 0           $self->{_sorter_fred} = $new_fred;
322             };
323              
324 0 0         pod2usage(2) if (!defined($self->{_key_column}));
325 0           $self->{_key_coli} = $self->{_in}->col_to_i($self->{_key_column});
326             croak $self->{_prog} . ": key column " . $self->{_key_column} . " is not in input stream.\n"
327 0 0         if (!defined($self->{_key_coli}));
328              
329 0 0         pod2usage(2) if (!defined($self->{_pivot_column}));
330 0           $self->{_pivot_coli} = $self->{_in}->col_to_i($self->{_pivot_column});
331             croak $self->{_prog} . ": pivot column " . $self->{_pivot_column} . " is not in input stream.\n"
332 0 0         if (!defined($self->{_pivot_coli}));
333              
334 0 0         if (defined($self->{_value_column})) {
335 0           $self->{_value_coli} = $self->{_in}->col_to_i($self->{_value_column});
336             croak $self->{_prog} . ": value column " . $self->{_value_column} . " is not in input stream.\n"
337 0 0         if (!defined($self->{_value_coli}));
338             };
339              
340             #
341             # Read the data to find all possible pivots,
342             # saving a copy as we go.
343             #
344 0           $self->{_replayable} = new Fsdb::IO::Replayable(-writer_args => [ -clone => $self->{_in} ], -reader_args => [ -comment_handler => $self->create_pass_comments_sub ]);
345 0           my $save_out = $self->{_replayable_writer} = $self->{_replayable}->writer;
346 0           my $read_fastpath_sub = $self->{_in}->fastpath_sub();
347 0           my $save_write_fastpath_sub = $save_out->fastpath_sub;
348 0           my $fref;
349             my %pivots;
350 0           my $npivots = 0;
351             my $loop = q(
352             # first pass: reading data to find all possible pivots
353             while ($fref = &$read_fastpath_sub) {
354             my $value = $fref->[) . $self->{_pivot_coli} . q@];
355 0           if ($value ne '@ . $self->{_empty} . q@') {
356             $npivots++ if (!defined($pivots{$value}));
357             $pivots{$value} = 1;
358             };
359             &$save_write_fastpath_sub($fref);
360             };
361             @;
362 0 0         print $loop if ($self->{_debug});
363 0           eval $loop;
364 0 0         $@ && croak $self->{_prog} . ": internal eval error: $@.\n";
365              
366 0 0         if (defined($self->{_sorter_fred})) {
367 0           $self->{_sorter_fred}->join();
368 0           $self->{_sorter_fred} = undef;
369             };
370              
371 0           $self->{_replayable}->close;
372 0 0         croak $self->{_prog} . ": no input data or pivots\n"
373             if ($npivots == 0);
374              
375              
376             #
377             # Now that we know the pivots, make the new columns.
378             #
379             # kill the old pivot column, and value if given.
380             my @new_cols = grep(!($_ eq $self->{_pivot_column} ||
381             (defined($self->{_value_column}) && $_ eq $self->{_value_column})),
382 0   0       @{$self->{_in}->cols});
  0            
383 0           $self->finish_io_option('output', -clone => $self->{_in}, -cols => \@new_cols, -outputheader => 'delay');
384 0           my %tag_colis;
385             my %new_columns;
386 0           foreach (sort keys %pivots) {
387             # xxx: could try to sort numerically if all pivots are numbers
388 0           my $new_column = $self->{_pivot_column} . $self->{_elem_separator} . $_;
389 0           $new_columns{$new_column} = 1;
390             $self->{_out}->col_create($new_column)
391 0 0         or croak $self->{_prog} . ": cannot create column $new_column (maybe it already existed?)\n";
392 0           $tag_colis{$_} = $self->{_out}->col_to_i($new_column);
393             };
394 0           $self->{_tag_colis_href} = \%tag_colis;
395             # write the mapping code.
396 0           my $old_mapping_code = '';
397             # first the old bits
398 0           foreach (@{$self->{_in}->cols}) {
  0            
399             next if ($_ eq $self->{_pivot_column} ||
400 0 0 0       (defined($self->{_value_column}) && $_ eq $self->{_value_column}));
      0        
401             $old_mapping_code .= '$nf[' . $self->{_out}->col_to_i($_) . '] = ' .
402 0           '$fref->[' . $self->{_in}->col_to_i($_) . '];' . "\n";
403             };
404 0           $self->{_old_mapping_code} = $old_mapping_code;
405             # and initialize the new
406 0           my $new_initialization_code = '';
407 0           foreach (sort keys %new_columns) {
408 0           $new_initialization_code .= '$nf[' . $self->{_out}->col_to_i($_) . '] = ' . "\n";
409             };
410 0           $new_initialization_code .= "\t'" . $self->{_empty} . "';\n";
411 0           $self->{_new_initialization_code} = $new_initialization_code;
412             }
413              
414             =head2 run
415              
416             $filter->run();
417              
418             Internal: run over each row.
419              
420             =cut
421             sub run ($) {
422 0     0 1   my($self) = @_;
423              
424 0           my $replayable_reader = $self->{_replayable}->reader;
425 0           my $read_fastpath_sub = $replayable_reader->fastpath_sub();
426 0           my $write_fastpath_sub = $self->{_out}->fastpath_sub();
427              
428             #
429             # Basic idea: mapreduce on the input
430             # with a multikey aware reducer.
431             #
432             # We don't actually run mapreduce
433             # because (sadly) it's easier to do it in-line
434             # given we assume sorted input.
435             #
436 0           my $emit_nf_code = '&$write_fastpath_sub(\@nf);';
437             my $check_ordering_code = '
438 0           die "' . $self->{_prog} . q': keys $old_key and $new_key are out-of-order\n" if ($old_key gt $new_key);
439             ';
440 0 0         $check_ordering_code = '' if ($self->{_pre_sorted} > 1);
441 0 0         my $value_value = (defined($self->{_value_column})) ? '$fref->[' . $self->{_value_coli} . ']' : '1';
442 0           my $tag_colis_href = $self->{_tag_colis_href};
443             my($loop) = q'
444             {
445             my $old_key = undef;
446             my $fref;
447             my @nf;
448             while ($fref = &$read_fastpath_sub()) {
449             my $new_key = $fref->[' . $self->{_key_coli} . '];
450             if (!defined($old_key) || $old_key ne $new_key) {
451             if (defined($old_key)) {
452             ' . $emit_nf_code .
453             $check_ordering_code . '
454             };
455             ' . $self->{_new_initialization_code} . '
456             ' . $self->{_old_mapping_code} . '
457             $old_key = $new_key;
458             };
459 0           $nf[$tag_colis_href->{$fref->[' . $self->{_pivot_coli} . ']}] = ' . $value_value . ';
460             };
461             if (defined($old_key)) {
462             ' . $emit_nf_code . "
463             };
464             }\n";
465 0 0         print $loop if ($self->{_debug});
466 0           eval $loop;
467 0 0         if ($@) {
468             # propagate sort failure cleanly
469 0 0         if ($@ =~ /^$self->{_prog}/) {
470 0           croak "$@";
471             } else {
472 0           croak $self->{_prog} . ": internal eval error: $@.\n";
473             };
474             };
475             }
476              
477              
478             =head1 AUTHOR and COPYRIGHT
479              
480             Copyright (C) 2011-2015 by John Heidemann
481              
482             This program is distributed under terms of the GNU general
483             public license, version 2. See the file COPYING
484             with the distribution for details.
485              
486             =cut
487              
488             1;