File Coverage

blib/lib/Fsdb/IO/Reader.pm
Criterion Covered Total %
statement 15 202 7.4
branch 0 106 0.0
condition 0 24 0.0
subroutine 5 27 18.5
pod 13 13 100.0
total 33 372 8.8


line stmt bran cond sub pod time code
1             #!/usr/bin/perl -w
2              
3             #
4             # Fsdb::IO::Reader.pm
5             # $Id: 2a2f291dc6b6a5e06727ae853281470c6a663aef $
6             #
7             # Copyright (C) 2005-2015 by John Heidemann
8             #
9             # This program is free software; you can redistribute it and/or
10             # modify it under the terms of the GNU General Public License,
11             # version 2, as published by the Free Software Foundation.
12             #
13             # This program is distributed in the hope that it will be useful,
14             # but WITHOUT ANY WARRANTY; without even the implied warranty of
15             # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16             # GNU General Public License for more details.
17             #
18             # You should have received a copy of the GNU General Public License along
19             # with this program; if not, write to the Free Software Foundation, Inc.,
20             # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
21             #
22              
23              
24             package Fsdb::IO::Reader;
25              
26             =head1 NAME
27              
28             Fsdb::IO::Reader - handle formatting reading from a fsdb file (handle) or queue
29              
30             =cut
31              
32             @ISA = qw(Fsdb::IO);
33             ($VERSION) = 1.1;
34              
35 2     2   15 use strict;
  2         24  
  2         71  
36 2     2   11 use IO::File;
  2         5  
  2         330  
37 2     2   15 use Carp;
  2         5  
  2         104  
38 2     2   783 use IO::Uncompress::AnyUncompress;
  2         135764  
  2         141  
39              
40 2     2   540 use Fsdb::IO;
  2         9  
  2         5446  
41              
42             =head1 SAMPLE CODE
43              
44             Sample code reading an input stream:
45              
46             $in = new Fsdb::IO::Reader(-file => '-');
47             $in->error and die "cannot open stdin as fsdb: " . $in->error . "\n";
48             my @arow;
49             while ($in->read_row_to_aref(\@arow) {
50             # do something
51             };
52             $in->close;
53              
54             =cut
55              
56             =head1 METHODS
57              
58             =head2 new
59              
60             $fsdb = new Fsdb::IO::Reader(-file => $filename);
61             $fsdb = new Fsdb::IO::Reader(-header => "#fsdb -F t foo bar", -fh => $file_handle);
62              
63             Creates a new reader object from FILENAME.
64             (FILENAME can also be a IO::Handle object.)
65             Always succeeds, but
66             check the C method to test for failure.
67              
68             =head3 Options:
69              
70             =over 4
71              
72             =item B
73             See also the options in Fsdb::IO, including
74             C<-file>, C<-header>.
75              
76             =item B<-file FILENAME>
77             Open and read the given filename.
78              
79             =item B<-comment_handler $ref>
80              
81             Define how comments are handled. If $REF is a Fsdb::IO::Writer
82             object, comments are written to that stream as they are encountered.
83             if $REF is a ref to a scalar, then we assume that scalar
84             will be filled in with a Fsdb::IO::Writer object later and treat
85             it the same.
86             If it is of type code, then it is assumed to be a callback function
87             of the form:
88              
89             sub comment_handler ($) { my $comment = @_; }
90              
91             where the one argument will be a string with the unparsed comment
92             (with leading # and trailing newline).
93              
94             By default, or if $ref is undef, comments are consumed.
95              
96             A typical handler if you have an output Fsdb stream is:
97              
98             sub { $out->write_raw(@_); };
99              
100             (That is the code created by L.)
101              
102             There are several support routines to handle comments in a pipeline;
103             see L,
104             L,
105             L.
106              
107             =back
108              
109             User-specified -header arguments override a header provided in the input source.
110              
111             =cut
112              
113             sub new {
114 0     0 1   my $class = shift @_;
115 0           my $self = $class->SUPER::new(@_);
116 0           bless $self, $class;
117             #
118             # new instance variables
119 0           $self->{_unreadq} = [];
120             # Could pass out the code so rowobj_sub propages down to fastpath.
121             # Skip that for now.
122             # $self->{_read_rowobj_code} = ' die; '; # placeholders
123 0     0     $self->{_read_rowobj_sub} = sub { die; };
  0            
124             #
125 0           $self->config(@_);
126             #
127             # setup:
128 0 0 0       if (! ($self->{_fh} || $self->{_queue})) {
129 0           $self->{_error} = "cannot setup filehandle";
130 0           return $self;
131             };
132 0 0 0       if ($self->{_fh} && ref($self->{_fh}) eq 'IO::Pipe') {
133             # don't do this if we're IO::Pipe::End, since it's already been done
134 0           $self->{_fh}->reader();
135             };
136 0           $self->comment_handler_to_sub;
137             # Note: reader/writer difference: readers have io subs before headers; writers only after.
138 0           $self->create_io_subs();
139              
140 0 0         if (!defined($self->{_headerrow})) {
141             # get the header from the file (must not have been specified by the user)
142 0           $self->read_headerrow;
143 0           $self->parse_headerrow;
144             };
145 0 0         if (defined($self->{_headerrow})) {
146 0           $self->{_header_set} = 1; # go read-only
147             # rebuild io subs in case the fscode changed
148 0           $self->create_io_subs();
149             } else {
150 0           $self->{_error} = "no header line";
151 0           return $self;
152             };
153              
154 0           return $self;
155             }
156              
157             =head2 config_one
158              
159             documented in new
160              
161             =cut
162             sub config_one {
163 0     0 1   my($self, $aaref) = @_;
164 0 0         if ($aaref->[0] eq '-file') {
    0          
165 0           shift @$aaref;
166 0           my($file) = shift @$aaref;
167 0           my $fh;
168 0           my $mode = $self->default_binmode();
169 0 0         if ($file eq '-') {
170 0           $fh = new IO::Handle;
171 0           $fh->fdopen(fileno(STDIN),"<");
172 0           binmode $fh, $mode;
173             } else {
174 0           $fh = new IO::File $file, "<$mode";
175             };
176 0 0         if ($fh) {
177 0           $self->{_fh} = $fh;
178             } else {
179 0           $self->{_error} = "cannot open $file";
180             };
181             } elsif ($aaref->[0] eq '-comment_handler') {
182 0           shift @$aaref;
183 0           $self->{_comment_handler} = shift @$aaref;
184 0           $self->comment_handler_to_sub;
185             } else {
186 0           $self->SUPER::config_one($aaref);
187             };
188             }
189              
190             =head2 comment_handler_to_sub;
191              
192             internal use only: parses and sets up the comment handle callback.
193             (At input, _comment_sub is as given by -comment_handler,
194             but at exit it is always an anon function.
195              
196             =cut
197             sub comment_handler_to_sub {
198 0     0 1   my($self) = @_;
199 0 0         if (!defined($self->{_comment_handler})) {
    0          
    0          
    0          
200             # just consume comments
201 0     0     $self->{_comment_sub} = sub {};
202             } elsif (ref($self->{_comment_handler}) eq 'CODE') {
203             # assume the user did the right thing passing in a sub
204 0           $self->{_comment_sub} = $self->{_comment_handler};
205             } elsif (ref($self->{_comment_handler}) =~ /^Fsdb::IO::Writer/) {
206             # write a pass-through
207 0     0     $self->{_comment_sub} = sub { $self->{_comment_handler}->write_raw(@_); }
208 0           } elsif (ref($self->{_comment_handler}) eq 'SCALAR') {
209             # write a pass-through, but with one level of indirection
210             # (This trick is necessary because often the Writer
211             # cannot be opened before the Reader is created.)
212 0     0     $self->{_comment_sub} = sub { ${$self->{_comment_handler}}->write_raw(@_); }
  0            
213 0           } else {
214 0           croak "correct_comment_handler: invalid -comment_handler argument\n";
215             };
216             }
217              
218             =head2 _enable_compression
219              
220             $self->_enable_compression
221              
222             internal use only: switch from uncompressed to compressed.
223              
224             =cut
225             sub _enable_compression($) {
226 0     0     my($self) = @_;
227 0 0         return if (!$self->{_compression});
228              
229 0           my $phy_fh = $self->{_fh};
230 0           binmode($phy_fh, ":raw");
231             $self->{_fh} = new IO::Uncompress::AnyUncompress $phy_fh
232 0 0         or croak "Fsdb::IO::Reader: cannot switch to compression " . $self->{_compression};
233             # xxx: we now should push our encoding onto this new fh,
234             # but not clear how IO::Uncompress handles that.
235             }
236              
237              
238             =head2 create_io_subs
239              
240             $self->create_io_subs()
241              
242             internal use only: create a thunk that returns rowobjs.
243              
244             =cut
245             sub create_io_subs() {
246 0     0 1   my($self) = @_;
247 0 0         return if ($self->{_error});
248 0 0 0       croak "confusion: too many IO sources" if (defined($self->{_fh}) && defined($self->{_queue}));
249 0 0         if (defined($self->{_fh})) {
    0          
250 0 0 0       $self->_enable_compression() if ($self->{_compression} && $self->{_header_set});
251             # need to unserialize data from a file handle
252 0 0 0       if ($self->{_rscode} eq 'D') {
    0          
253             #
254             # Normal line-by-line (rowized) format.
255             # Carefully optimized.
256             #
257 0           my $fh = $self->{_fh};
258 0           my $fsre = $self->{_fsre};
259             $self->{_read_rowobj_sub} = sub {
260 0     0     my $line = $fh->getline;
261 0 0         return undef if (!defined($line)); # eof
262 0 0         return $line if ($line =~ /^\s*\#/); # comment, no longer chomped;
263 0           chomp $line;
264             # Note that, technically, the next line is meaningless
265             # if we haven't yet parsed the header.
266             # We assume read_headerrow will sort that out adequately.
267 0           my @f = split(/$fsre/, $line);
268 0           return \@f; # a row
269 0           };
270             } elsif ($self->{_rscode} eq 'C' || $self->{_rscode} eq 'I') {
271             #
272             # Colized-format.
273             # Not particularly optimized.
274             #
275 0           my $fh = $self->{_fh};
276 0           my $fsre = $self->{_fsre};
277             # set up buffers for partial objects
278 0           $self->{_rowize_eof} = undef;
279 0           $self->{_rowize_partial_row} = [ ($self->{_empty}) x ($self->ncols) ];
280 0           $self->{_rowize_started_row} = undef;
281             $self->{_read_rowobj_sub} = sub {
282 0 0   0     return undef if ($self->{_rowize_eof});
283             # get a row
284 0           for (;;) {
285             # get a line to build up a full row
286 0           my $line = $fh->getline;
287 0 0         if (!defined($line)) {
288 0           $self->{_rowize_eof} = 1;
289 0           last; # exit infinite for
290             }; # eof
291 0 0         return $line if ($line =~ /^\s*\#/); # comment is fast-path return
292 0 0         if ($line =~ /^\s*$/) {
293 0 0         last if ($self->{_rowize_started_row});
294 0           next; # skip blank lines before content
295             };
296             # parse one field, carefully
297 0           my($key, $value) = ($line =~ /^([^:]+):\s+(.*)$/);
298 0 0         croak("unparsable line '$line' (format should be ''key: value''\n") if (!defined($key));
299 0 0         croak("contents of line contain column separator: <$line>, will correct\n") if ($value =~ /$fsre/);
300 0 0 0       $value = $self->{_empty} if (!defined($value) || $value eq '');
301 0           my $i = $self->{_cols_to_i}->{$key};
302 0 0         croak ("unknown column '$key' in '$line'.\n") if (!defined($i));
303 0           $self->{_rowize_partial_row}[$i] = $value;
304 0           $self->{_rowize_started_row} = 1;
305             };
306             # special case eof
307 0 0 0       return undef if ($self->{_rowize_eof} && !$self->{_rowize_started_row});
308             # now return the new row
309 0           my @f = @{$self->{_rowize_partial_row}}; # copy (maybe not needed?)
  0            
310 0           $self->{_rowize_partial_row} = [ ($self->{_empty}) x ($self->ncols) ]; # reset
311 0           $self->{_rowize_started_row} = undef;
312 0           return \@f;
313 0           };
314             } else {
315 0           croak "undefined rscode " . $self->{_rscode} . "\n";
316             };
317             } elsif (defined($self->{_queue})) {
318             # data is preformatted from a queue
319 0           my $queue = $self->{_queue};
320             $self->{_read_rowobj_sub} = sub {
321 0     0     return $queue->dequeue;
322 0           };
323             } else {
324 0           croak "confusion: no IO source\n";
325             };
326             }
327              
328              
329             =head2 read_headerrow
330              
331             internal use only; reads the header
332              
333             =cut
334             sub read_headerrow {
335 0     0 1   my($self) = @_;
336 0 0         return if ($self->{_error});
337 0           my $headerrow = &{$self->{_read_rowobj_sub}};
  0            
338             # Note special case: if ref($headerrow) than read_rowobj_sub
339             # parsed the line for us and it wasn't a comment. Bad user! No header!
340 0 0 0       if (!defined($headerrow) || ref($headerrow)) {
341 0           my $printable_hr = $headerrow;
342 0 0         if (!defined($printable_hr)) {
    0          
343 0           $printable_hr = "[EOF]";
344             } elsif (ref($printable_hr) ne 'SCALAR') {
345 0           $printable_hr = "$printable_hr";
346 0           $printable_hr =~ s/\(.*\)//;
347             } else {
348 0 0         $printable_hr = substr($printable_hr, 0, 200) . " ..."
349             if (length($printable_hr) > 200);
350 0           $printable_hr =~ s/[^[:print:]]+//g;
351             };
352 0           $self->{_error} = "no header line (saw: $printable_hr)";
353 0           return;
354             };
355             # Note: internally, headers are newlineless.
356 0           chomp $headerrow;
357 0           $self->{_headerrow} = $headerrow;
358             };
359              
360              
361             # =head2 read_attributes
362             #
363             # Read the attributes. Called automatically to get attributes,
364             # if any.
365             #
366             # =cut
367             # sub read_attributes {
368             # my($self) = @_;
369             # croak "double attribute read.\n" if ($self->{_attributes_set});
370             # $self->{_attributes_set} = 1;
371             #
372             # my $fref;
373             # while ($fref = $self->read_rowobj) {
374             # last if (!defined($fref)); # eof!
375             # last if (ref($fref)); # data (expected exit path)
376             # last if ($fref !~ /^#%\s+([^:])+:\s+(.*)$/);
377             # $self->{_attributes}{$1} = $2;
378             # };
379             # # put the last thing back
380             # $self->unread_rowobj($fref);
381             # # sigh, we now blown the fastpath :-(
382             # };
383             #
384             # =head2 check_attributes
385             #
386             # internal use only; check that attributes have been read.
387             # (for a writer, they always are)
388             #
389             # =cut
390             # sub check_attributes {
391             # return if ($self->{_attributes_set});
392             # if (!defined($self->{_headerrow})) {
393             # $self->read_headerrow;
394             # $self->parse_headerrow;
395             # };
396             # $self->read_attributes;
397             # }
398             #
399              
400              
401              
402             =head2 read_rowobj
403              
404             $rowobj = $fsdb->read_rowobj;
405              
406             Reads a line of input and returns a "row object",
407             either a scalar string for a comment or header,
408             or an array reference for a row,
409             or undef on end-of-stream.
410             This routine is the fastest way to do full-featured fsdb-formatted IO.
411             (Although see also Fsdb::Reader::fastpath_sub.)
412              
413             Unlike all the other routines (including fastpath_sub),
414             read_rowobj does not do comment processing (calling comment_sub).
415              
416             =cut
417             sub read_rowobj {
418 0     0 1   my($self) = @_;
419 0 0         return undef if (defined($self->{_error}));
420              
421             # first, check unread
422 0 0         if ($#{$self->{_unreadq}} >= 0) {
  0            
423 0           my $frontref = shift @{$self->{_unreadq}};
  0            
424 0           return $frontref;
425             };
426              
427 0           return &{$self->{_read_rowobj_sub}};
  0            
428             }
429              
430              
431             =head2 read_row_to_aref
432              
433             $fsdb->read_row_to_aref(\@a);
434              
435             Then $a[0] is the 0th column, etc.
436             Returns undef if the read fails, typically due to EOF.
437              
438             =cut
439              
440             sub read_row_to_aref {
441 0     0 1   my($self, $aref) = @_;
442              
443 0           while (1) {
444 0           my $rowobj = $self->read_rowobj;
445 0 0         if (!defined($rowobj)) {
    0          
446 0           return undef; # eof
447             } elsif (!ref($rowobj)) {
448             # comment
449 0           &{$self->{_comment_sub}}($rowobj);
  0            
450             } else {
451             # assert(ref($rowobj) eq 'ARRAY');
452 0           @$aref = @$rowobj;
453 0           return 1;
454             };
455             };
456             }
457              
458             =head2 unread_rowobj
459              
460             $fsdb->unread_rowobj($fref)
461              
462             Put an fref back into the stream.
463              
464             =cut
465              
466             sub unread_rowobj {
467 0     0 1   my($self, $fref) = @_;
468             croak "unread_fref attempted with active fastpath\n"
469 0 0         if ($self->{_fastpath_active});
470 0           unshift @{$self->{_unreadq}}, $fref;
  0            
471             }
472              
473             =head2 unread_row_from_aref
474              
475             $fsdb->unread_row_from_aref(\@a);
476              
477             Put array @a back into the file.
478              
479             =cut
480              
481             sub unread_row_from_aref {
482 0     0 1   my($self, $aref) = @_;
483             croak "unread_row_from_aref attempted with active fastpath\n"
484 0 0         if ($self->{_fastpath_active});
485 0           my @a = @$aref; # make a copy
486 0           unshift @{$self->{_unreadq}}, \@a;
  0            
487             }
488              
489             =head2 read_row_to_href
490              
491             $fsdb->read_row_to_href(\%h);
492              
493             Read the next row into hash C<%h>.
494             Then $h{'colname'} is the value of that column.
495             Returns undef if the read fails, typically due to EOF.
496              
497             =cut
498              
499             sub read_row_to_href {
500 0     0 1   my($self, $href) = @_;
501 0           my @a;
502 0 0         $self->read_row_to_aref(\@a) or return undef;
503 0           foreach my $i (0..$#{$self->{_cols}}) {
  0            
504 0           $href->{$self->{_cols}[$i]} = $a[$i];
505             };
506 0           return 1;
507             }
508              
509             =head2 unread_row_from_href
510              
511             $fsdb->unread_row_from_href(\%h);
512              
513             Put hash %h back into the file.
514              
515             =cut
516              
517             sub unread_row_from_href {
518 0     0 1   my($self, $href) = @_;
519 0           my @a = ('-' x $#{$self->{_cols}}); # null record
  0            
520 0           foreach (keys %$href) {
521 0           my($i) = $self->{_cols_to_i}->{$_};
522 0 0         defined($i) or croak "column name $_ is not in current file";
523 0           $a[$i] = $href->{$_};
524             };
525 0           $self->unread_row_from_aref(\@a);
526             }
527              
528              
529             =head2 fastpath_ok
530              
531             $fsdb->fastpath_ok();
532              
533             Check if we can do fast-path IO
534             (post-header, no pending unread rows, no errors).
535              
536             =cut
537             sub fastpath_ok {
538 0     0 1   my($self) = @_;
539              
540 0 0         return undef if (defined($self->{_error}));
541 0 0         return undef if (!defined($self->{_headerrow}));
542 0 0         return undef if ($#{$self->{_unreadq}} >= 0);
  0            
543 0           return 1;
544             }
545              
546             =head2 fastpath_sub
547              
548             $sub = $fsdb->fastpath_sub()
549             $row_aref = &$sub();
550              
551             Return an anonymous sub that does read fast-path when called.
552             This code stub returns a new $aref
553             corresponding with a data line,
554             and handles comments as specified by -comment_handler
555              
556             =cut
557             sub fastpath_sub {
558 0     0 1   my($self) = @_;
559              
560 0 0         $self->fastpath_ok or croak "not able to do read fastpath\n";
561 0           $self->{_fastpath_active} = 1;
562             # use lexical variables to emulate static to avoid object resolution
563             {
564 0           my $fh = $self->{_fh};
  0            
565 0           my $fsre = $self->{_fsre};
566 0           my $read_rowobj_sub = $self->{_read_rowobj_sub};
567 0           my $comment_sub = $self->{_comment_sub};
568 0 0         croak "Fsdb::IO::Reader::fastpath_sub missing comment handling subroutine.\n"
569             if (!defined($comment_sub));
570             # xxx: this code should track read_row_to_aref
571             my $fastpath = sub {
572 0     0     while (1) {
573 0           my $rowobj = &$read_rowobj_sub;
574 0 0         if (!defined($rowobj)) {
    0          
575 0           return undef; # eof
576             } elsif (!ref($rowobj)) {
577             # comment
578 0           &$comment_sub($rowobj);
579             } else {
580             # assert(ref($rowobj) eq 'ARRAY')
581 0           return $rowobj;
582             };
583             };
584 0           };
585             # for more visibility:
586             # $fastpath = sub { my @a:shared; $self->read_row_to_aref(\@a); return \@a; };
587 0           return $fastpath;
588             }
589             }
590              
591              
592             1;