File Coverage

blib/lib/Fsdb/Filter/dbsubprocess.pm
Criterion Covered Total %
statement 27 113 23.8
branch 0 50 0.0
condition 0 9 0.0
subroutine 9 23 39.1
pod 6 6 100.0
total 42 201 20.9


line stmt bran cond sub pod time code
1             #!/usr/bin/perl -w
2              
3             #
4             # dbsubprocess.pm
5             # Copyright (C) 1991-2015 by John Heidemann
6             # $Id: 630ba67be630dfda1dae8d724df2877a8f37a54e $
7             #
8             # This program is distributed under terms of the GNU general
9             # public license, version 2. See the file COPYING
10             # in $dblibdir for details.
11             #
12              
13             package Fsdb::Filter::dbsubprocess;
14              
15             =head1 NAME
16              
17             dbsubprocess - invoke a subprocess as a Fsdb filter object
18              
19             =head1 SYNOPSIS
20              
21             dbsubprocess [--] program [arguments...]
22              
23             =head1 DESCRIPTION
24              
25             Run PROGRAM as a process, with optional ARGUMENTS as program arguments,
26             feeding its standard input and standard output
27             as fsdb streams. A "--" can separate arguments to dbsubprocess
28             from the program and its arguments.
29              
30             As with similar tools, like open2, the caller is expected to take
31             care that the subprocess does not deadlock.
32              
33             This program is primarily for internal use by dbmapreduce.
34              
35             Like L, L program does have a
36             Unix command; instead it is used only from within Perl.
37              
38             =head1 OPTIONS
39              
40             =over 4
41              
42             =item B<-w> or B<--warnings>
43              
44             Enable warnings in user supplied code.
45             (Default to include warnings.)
46              
47             =item B<-E> or B<--endsub> SUB
48              
49             Call Perl SUB when the subprocess terminates.
50             The sub runs in the parent and is a Fred ending sub, see
51             L.
52              
53             =back
54              
55             =for comment
56             begin_standard_fsdb_options
57              
58             and the standard fsdb options:
59              
60             =over 4
61              
62             =item B<-d>
63              
64             Enable debugging output.
65              
66             =item B<-i> or B<--input> InputSource
67              
68             Read from InputSource, typically a file, or - for standard input,
69             or (if in Perl) a IO::Handle, Fsdb::IO objects.
70             (For this case, it cannot be Fsdb::BoundedQueue).
71              
72             =item B<-o> or B<--output> OutputDestination
73              
74             Write to OutputDestination, typically a file, or - for standard output,
75             or (if in Perl) a IO::Handle, Fsdb::IO or Fsdb::BoundedQueue objects.
76              
77             =item B<--autorun> or B<--noautorun>
78              
79             By default, programs process automatically,
80             but Fsdb::Filter objects in Perl do not run until you invoke
81             the run() method.
82             The C<--(no)autorun> option controls that behavior within Perl.
83              
84             =item B<--help>
85              
86             Show help.
87              
88             =item B<--man>
89              
90             Show full manual.
91              
92             =back
93              
94             =for comment
95             end_standard_fsdb_options
96              
97              
98             =head1 SAMPLE USAGE
99              
100             =head2 Input:
101              
102             #fsdb name id test1
103             a 1 80
104             b 2 70
105             c 3 65
106             d 4 90
107             e 5 70
108             f 6 90
109              
110             =head2 Command:
111              
112             the following perl code:
113              
114             use Fsdb::Filter::dbsubprocess;
115             my $f = new Fsdb::Filter::dbsubprocess(qw(cat));
116             $f->setup_run_finish;
117             exit 0;
118              
119             =head2 Output:
120              
121             #fsdb name id test1
122             a 1 80
123             b 2 70
124             c 3 65
125             d 4 90
126             e 5 70
127             f 6 90
128             # | dbsubprocess cat
129              
130             =head1 SEE ALSO
131              
132             L,
133             L
134              
135             =head1 CLASS FUNCTIONS
136              
137             =cut
138              
139             @ISA = qw(Fsdb::Filter);
140             ($VERSION) = 2.0;
141              
142 1     1   5 use strict;
  1         7  
  1         22  
143 1     1   4 use Pod::Usage;
  1         3  
  1         60  
144             # use IPC::Open2;
145 1     1   4 use Carp;
  1         1  
  1         36  
146 1     1   4 use IO::Pipe;
  1         1  
  1         16  
147              
148 1     1   3 use Fsdb::Support::Freds;
  1         1  
  1         14  
149 1     1   3 use Fsdb::Filter;
  1         1  
  1         13  
150 1     1   3 use Fsdb::Filter::dbfilecat;
  1         1  
  1         17  
151 1     1   3 use Fsdb::IO::Reader;
  1         2  
  1         13  
152 1     1   3 use Fsdb::IO::Writer;
  1         1  
  1         963  
153              
154              
155             =head2 new
156              
157             $filter = new Fsdb::Filter::dbsubprocess(@arguments);
158              
159             Create a new dbsubprocess object, taking command-line arguments.
160              
161             =cut
162              
163             sub new {
164 0     0 1   my $class = shift @_;
165 0           my $self = $class->SUPER::new(@_);
166 0           bless $self, $class;
167 0           $self->set_defaults;
168 0           $self->parse_options(@_);
169 0           $self->SUPER::post_new();
170 0           return $self;
171             }
172              
173              
174             =head2 set_defaults
175              
176             $filter->set_defaults();
177              
178             Internal: set up defaults.
179              
180             =cut
181              
182             sub set_defaults ($) {
183 0     0 1   my($self) = @_;
184 0           $self->SUPER::set_defaults();
185 0           $self->{_external_command_argv} = [];
186 0           $self->{_warnings} = 1;
187 0           $self->{_ending_sub} = undef;
188             }
189              
190             =head2 parse_options
191              
192             $filter->parse_options(@ARGV);
193              
194             Internal: parse options
195              
196             =cut
197              
198             sub parse_options ($@) {
199 0     0 1   my $self = shift @_;
200              
201 0           my(@argv) = @_;
202             $self->get_options(
203             \@argv,
204 0     0     'help|?' => sub { pod2usage(1); },
205 0     0     'man' => sub { pod2usage(-verbose => 2); },
206             'autorun!' => \$self->{_autorun},
207             'close!' => \$self->{_close},
208             'd|debug+' => \$self->{_debug},
209             'E|endsub=s' => \$self->{_ending_sub},
210 0     0     'i|input=s' => sub { $self->parse_io_option('input', @_); },
211             'log!' => \$self->{_logprog},
212 0     0     'o|output=s' => sub { $self->parse_io_option('output', @_); },
213             'saveoutput=s' => \$self->{_save_output},
214             'w|warnings!' => \$self->{_warnings},
215 0 0         ) or pod2usage(2);
216 0           push (@{$self->{_external_command_argv}}, @argv);
  0            
217             }
218              
219             =head2 setup
220              
221             $filter->setup();
222              
223             Internal: setup, parse headers.
224              
225             =cut
226              
227             sub setup ($) {
228 0     0 1   my($self) = @_;
229              
230 0           shift @{$self->{_external_command_argv}}
231 0 0 0       if ($#{$self->{_external_command_argv}} >= 0 && $self->{_external_command_argv}[0] eq '--');
  0            
232             croak $self->{_prog} . ": no program given.\n"
233 0 0         if ($#{$self->{_external_command_argv}} < 0);
  0            
234              
235 0           my $input_ref = ref($self->{_input});
236 0 0 0       if ($input_ref =~ /^Fsdb::BoundedQueue/) {
    0          
    0          
    0          
    0          
237 0           croak $self->{_prog} . ": cannot handle BoundedQueue any more.\n"
238             } elsif ($input_ref =~ /^IO::/) {
239 0           $self->{_in_fileno} = $self->{_input}->fileno;
240             } elsif ($input_ref =~ /^Fsdb::IO::Reader/) {
241             # start up a converter Fred
242 0           my $pipe = new IO::Pipe;
243 0 0         croak $self->{_prog} . ": error opening pipe.\n"
244             if ($pipe->error);
245 0           my $input = $self->{_input};
246             my $input_fred = new Fsdb::Support::Freds('dbsubprocess_Fsdb::IO::Reader_converter',
247             sub {
248 0     0     $pipe->writer();
249 0           new Fsdb::Filter::dbfilecat(
250             '--autorun',
251             '--nolog',
252             '--input' => $input,
253             '--output' => $pipe);
254 0           exit 0;
255 0           });
256 0           $self->{_input_fred} = $input_fred;
257 0           $pipe->reader();
258 0           $self->{_in_fileno} = $pipe->fileno;
259             } elsif ($input_ref eq '' && $self->{_input} eq '-') {
260 0           $self->{_in_fileno} = 0; # stdin
261             } elsif ($input_ref eq '') {
262             # a file
263 0           my $fh = IO::File->new($self->{_input}, "r");
264 0           $fh->binmode;
265 0           $self->{_in_fileno} = $fh->fileno;
266             } else {
267 0           croak $self->{_prog} . ": unknown input method (ref: $input_ref).\n"
268             };
269              
270 0           my $output_ref = ref($self->{_output});
271 0 0 0       if ($output_ref =~ /^Fsdb::BoundedQueue/) {
    0          
    0          
    0          
    0          
272 0           croak $self->{_prog} . ": cannot handle BoundedQueue any more.\n"
273             } elsif ($output_ref =~ /^IO::/) {
274 0           $self->{_out_fileno} = $self->{_output}->fileno;
275             } elsif ($output_ref =~ /^Fsdb::IO::Writer/) {
276 0           croak $self->{_prog} . ": cannot handle Fsdb::IO::Writer yet.\n"
277             } elsif ($output_ref eq '' && $self->{_output} eq '-') {
278 0           $self->{_out_fileno} = 1; # stdout
279             } elsif ($output_ref eq '') {
280             # a file
281 0           my $fh = IO::File->new($self->{_output}, "w");
282 0           $fh->binmode;
283 0 0         croak $self->{_prog} . ": cannot open output file: " . $self->{_output} . ".\n"
284             if ($fh->error);
285 0           $self->{_out_fileno} = $fh->fileno;
286             croak $self->{_prog} . ": strangely unset fileno for output file: " . $self->{_output} . ".\n"
287 0 0         if (!defined($self->{_out_fileno}));
288             } else {
289 0           croak $self->{_prog} . ": unknown output method.\n"
290             };
291             }
292              
293             =head2 run
294              
295             $filter->run();
296              
297             Internal: run over all data rows.
298              
299             =cut
300             sub run ($) {
301 0     0 1   my($self) = @_;
302              
303             # catch sigpipe for failure cases in the child
304 0 0         if ($self->{_warnings}) {
305             $SIG{'PIPE'} = sub {
306 0     0     warn $self->{_prog} . ": external dbmapreduce reduce program exited with SIGPIPE (" . join(" ", @{$self->{_external_command_argv}}) . "), probably not consuming all input.\n";
  0            
307 0           };
308             } else {
309 0     0     $SIG{'PIPE'} = sub { };
310             };
311              
312             #
313             # run the subproc
314             # most of this is cribbed from IPC::Open2, but simplified.
315             #
316 0           my $child_rdr_fd = $self->{_in_fileno};
317 0 0         croak $self->{_prog} . ": internal error, in_fileno not ready.\n" if (!defined($child_rdr_fd));
318 0           my $child_wtr_fd = $self->{_out_fileno};
319 0 0         croak $self->{_prog} . ": internal error, out_fileno not ready.\n" if (!defined($child_wtr_fd));
320 0           my $args_ref = \@{$self->{_external_command_argv}};
  0            
321             my $fred = new Fsdb::Support::Freds('dbsubprocess',
322             sub {
323             # in child
324 0     0     untie *STDIN;
325 0           untie *STDOUT;
326 0 0         open \*STDIN, "<&=", $child_rdr_fd or croak $self->{_prog} . ": cannot reopen stdin from $child_rdr_fd\n";
327 0 0         open \*STDOUT, ">&=", $child_wtr_fd or croak $self->{_prog} . ": cannot reopen stdout to $child_wtr_fd\n";
328             # ignore stderr
329 0 0         exec @$args_ref or croak $self->{_prog} . ": cannot exec: " . join(" ", @$args_ref) . "\n";
330             # never returns, either way.
331 0           die; # just in case
332 0           }, $self->{_ending_sub});
333 0           $self->{_fred} = $fred;
334             }
335              
336             =head2 finish
337              
338             $filter->finish();
339              
340             Internal: run over all data rows.
341              
342             =cut
343             sub finish($) {
344 0     0 1   my($self) = @_;
345              
346             # and reap the subprocess
347 0           foreach my $fred ($self->{_input_fred}, $self->{_fred}) {
348 0 0         if (defined($fred)) {
349 0           $fred->join();
350 0 0         croak $self->{_prog} . ": fred failed: " . $fred->error()
351             if ($fred->error());
352             };
353             };
354             # fake up _out
355             my $out = IO::Handle->new_from_fd($self->{_out_fileno}, "w")
356 0 0         or croak $self->{_prog} . ": cannot write log\n";
357 0           $self->{_out} = $out;
358 0           $self->SUPER::finish(); # will close it
359             # $out->print("# " . $self->compute_program_log() . "\n");
360             }
361              
362             =head1 AUTHOR and COPYRIGHT
363              
364             Copyright (C) 1991-2015 by John Heidemann
365              
366             This program is distributed under terms of the GNU general
367             public license, version 2. See the file COPYING
368             with the distribution for details.
369              
370             =cut
371              
372             1;
373