File Coverage

blib/lib/Fsdb/IO/Reader.pm
Criterion Covered Total %
statement 15 202 7.4
branch 0 106 0.0
condition 0 24 0.0
subroutine 5 27 18.5
pod 13 13 100.0
total 33 372 8.8


line stmt bran cond sub pod time code
1             #!/usr/bin/perl -w
2              
3             #
4             # Fsdb::IO::Reader.pm
5             # $Id: 2a2f291dc6b6a5e06727ae853281470c6a663aef $
6             #
7             # Copyright (C) 2005-2015 by John Heidemann
8             #
9             # This program is free software; you can redistribute it and/or
10             # modify it under the terms of the GNU General Public License,
11             # version 2, as published by the Free Software Foundation.
12             #
13             # This program is distributed in the hope that it will be useful,
14             # but WITHOUT ANY WARRANTY; without even the implied warranty of
15             # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
16             # GNU General Public License for more details.
17             #
18             # You should have received a copy of the GNU General Public License along
19             # with this program; if not, write to the Free Software Foundation, Inc.,
20             # 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301, USA.
21             #
22              
23              
24             package Fsdb::IO::Reader;
25              
26             =head1 NAME
27              
28             Fsdb::IO::Reader - handle formatting reading from a fsdb file (handle) or queue
29              
30             =cut
31              
32             @ISA = qw(Fsdb::IO);
33             ($VERSION) = 1.1;
34              
35 2     2   9 use strict;
  2         2  
  2         64  
36 2     2   7 use IO::File;
  2         3  
  2         301  
37 2     2   10 use Carp;
  2         2  
  2         101  
38 2     2   1312 use IO::Uncompress::AnyUncompress;
  2         160127  
  2         142  
39              
40 2     2   1576 use Fsdb::IO;
  2         5  
  2         4317  
41              
42             =head1 SAMPLE CODE
43              
44             Sample code reading an input stream:
45              
46             $in = new Fsdb::IO::Reader(-file => '-');
47             $in->error and die "cannot open stdin as fsdb: " . $in->error . "\n";
48             my @arow;
49             while ($in->read_row_to_aref(\@arow) {
50             # do something
51             };
52             $in->close;
53              
54             =cut
55              
56             =head1 METHODS
57              
58             =head2 new
59              
60             $fsdb = new Fsdb::IO::Reader(-file => $filename);
61             $fsdb = new Fsdb::IO::Reader(-header => "#fsdb -F t foo bar", -fh => $file_handle);
62              
63             Creates a new reader object from FILENAME.
64             (FILENAME can also be a IO::Handle object.)
65             Always succeeds, but
66             check the C method to test for failure.
67              
68             =head3 Options:
69              
70             =over 4
71              
72             =item B
73             See also the options in Fsdb::IO, including
74             C<-file>, C<-header>.
75              
76             =item B<-file FILENAME>
77             Open and read the given filename.
78              
79             =item B<-comment_handler $ref>
80              
81             Define how comments are handled. If $REF is a Fsdb::IO::Writer
82             object, comments are written to that stream as they are encountered.
83             if $REF is a ref to a scalar, then we assume that scalar
84             will be filled in with a Fsdb::IO::Writer object later and treat
85             it the same.
86             If it is of type code, then it is assumed to be a callback function
87             of the form:
88              
89             sub comment_handler ($) { my $comment = @_; }
90              
91             where the one argument will be a string with the unparsed comment
92             (with leading # and trailing newline).
93              
94             By default, or if $ref is undef, comments are consumed.
95              
96             There are several support routines to handle comments in a pipeline;
97             see L,
98             L,
99             L.
100              
101             =back
102              
103             User-specified -header arguments override a header provided in the input source.
104              
105             =cut
106              
107             sub new {
108 0     0 1   my $class = shift @_;
109 0           my $self = $class->SUPER::new(@_);
110 0           bless $self, $class;
111             #
112             # new instance variables
113 0           $self->{_unreadq} = [];
114             # Could pass out the code so rowobj_sub propages down to fastpath.
115             # Skip that for now.
116             # $self->{_read_rowobj_code} = ' die; '; # placeholders
117 0     0     $self->{_read_rowobj_sub} = sub { die; };
  0            
118             #
119 0           $self->config(@_);
120             #
121             # setup:
122 0 0 0       if (! ($self->{_fh} || $self->{_queue})) {
123 0           $self->{_error} = "cannot setup filehandle";
124 0           return $self;
125             };
126 0 0 0       if ($self->{_fh} && ref($self->{_fh}) eq 'IO::Pipe') {
127             # don't do this if we're IO::Pipe::End, since it's already been done
128 0           $self->{_fh}->reader();
129             };
130 0           $self->comment_handler_to_sub;
131             # Note: reader/writer difference: readers have io subs before headers; writers only after.
132 0           $self->create_io_subs();
133              
134 0 0         if (!defined($self->{_headerrow})) {
135             # get the header from the file (must not have been specified by the user)
136 0           $self->read_headerrow;
137 0           $self->parse_headerrow;
138             };
139 0 0         if (defined($self->{_headerrow})) {
140 0           $self->{_header_set} = 1; # go read-only
141             # rebuild io subs in case the fscode changed
142 0           $self->create_io_subs();
143             } else {
144 0           $self->{_error} = "no header line";
145 0           return $self;
146             };
147              
148 0           return $self;
149             }
150              
151             =head2 config_one
152              
153             documented in new
154              
155             =cut
156             sub config_one {
157 0     0 1   my($self, $aaref) = @_;
158 0 0         if ($aaref->[0] eq '-file') {
    0          
159 0           shift @$aaref;
160 0           my($file) = shift @$aaref;
161 0           my $fh;
162 0           my $mode = $self->default_binmode();
163 0 0         if ($file eq '-') {
164 0           $fh = new IO::Handle;
165 0           $fh->fdopen(fileno(STDIN),"<");
166 0           binmode $fh, $mode;
167             } else {
168 0           $fh = new IO::File $file, "<$mode";
169             };
170 0 0         if ($fh) {
171 0           $self->{_fh} = $fh;
172             } else {
173 0           $self->{_error} = "cannot open $file";
174             };
175             } elsif ($aaref->[0] eq '-comment_handler') {
176 0           shift @$aaref;
177 0           $self->{_comment_handler} = shift @$aaref;
178 0           $self->comment_handler_to_sub;
179             } else {
180 0           $self->SUPER::config_one($aaref);
181             };
182             }
183              
184             =head2 comment_handler_to_sub;
185              
186             internal use only: parses and sets up the comment handle callback.
187             (At input, _comment_sub is as given by -comment_handler,
188             but at exit it is always an anon function.
189              
190             =cut
191             sub comment_handler_to_sub {
192 0     0 1   my($self) = @_;
193 0 0         if (!defined($self->{_comment_handler})) {
    0          
    0          
    0          
194             # just consume comments
195 0     0     $self->{_comment_sub} = sub {};
196             } elsif (ref($self->{_comment_handler}) eq 'CODE') {
197             # assume the user did the right thing passing in a sub
198 0           $self->{_comment_sub} = $self->{_comment_handler};
199             } elsif (ref($self->{_comment_handler}) =~ /^Fsdb::IO::Writer/) {
200             # write a pass-through
201 0     0     $self->{_comment_sub} = sub { $self->{_comment_handler}->write_raw(@_); }
202 0           } elsif (ref($self->{_comment_handler}) eq 'SCALAR') {
203             # write a pass-through, but with one level of indirection
204             # (This trick is necessary because often the Writer
205             # cannot be opened before the Reader is created.)
206 0     0     $self->{_comment_sub} = sub { ${$self->{_comment_handler}}->write_raw(@_); }
  0            
207 0           } else {
208 0           croak "correct_comment_handler: invalid -comment_handler argument\n";
209             };
210             }
211              
212             =head2 _enable_compression
213              
214             $self->_enable_compression
215              
216             internal use only: switch from uncompressed to compressed.
217              
218             =cut
219             sub _enable_compression($) {
220 0     0     my($self) = @_;
221 0 0         return if (!$self->{_compression});
222              
223 0           my $phy_fh = $self->{_fh};
224 0           binmode($phy_fh, ":raw");
225             $self->{_fh} = new IO::Uncompress::AnyUncompress $phy_fh
226 0 0         or croak "Fsdb::IO::Reader: cannot switch to compression " . $self->{_compression};
227             # xxx: we now should push our encoding onto this new fh,
228             # but not clear how IO::Uncompress handles that.
229             }
230              
231              
232             =head2 create_io_subs
233              
234             $self->create_io_subs()
235              
236             internal use only: create a thunk that returns rowobjs.
237              
238             =cut
239             sub create_io_subs() {
240 0     0 1   my($self) = @_;
241 0 0         return if ($self->{_error});
242 0 0 0       croak "confusion: too many IO sources" if (defined($self->{_fh}) && defined($self->{_queue}));
243 0 0         if (defined($self->{_fh})) {
    0          
244 0 0 0       $self->_enable_compression() if ($self->{_compression} && $self->{_header_set});
245             # need to unserialize data from a file handle
246 0 0 0       if ($self->{_rscode} eq 'D') {
    0          
247             #
248             # Normal line-by-line (rowized) format.
249             # Carefully optimized.
250             #
251 0           my $fh = $self->{_fh};
252 0           my $fsre = $self->{_fsre};
253             $self->{_read_rowobj_sub} = sub {
254 0     0     my $line = $fh->getline;
255 0 0         return undef if (!defined($line)); # eof
256 0 0         return $line if ($line =~ /^\s*\#/); # comment, no longer chomped;
257 0           chomp $line;
258             # Note that, technically, the next line is meaningless
259             # if we haven't yet parsed the header.
260             # We assume read_headerrow will sort that out adequately.
261 0           my @f = split(/$fsre/, $line);
262 0           return \@f; # a row
263 0           };
264             } elsif ($self->{_rscode} eq 'C' || $self->{_rscode} eq 'I') {
265             #
266             # Colized-format.
267             # Not particularly optimized.
268             #
269 0           my $fh = $self->{_fh};
270 0           my $fsre = $self->{_fsre};
271             # set up buffers for partial objects
272 0           $self->{_rowize_eof} = undef;
273 0           $self->{_rowize_partial_row} = [ ($self->{_empty}) x ($self->ncols) ];
274 0           $self->{_rowize_started_row} = undef;
275             $self->{_read_rowobj_sub} = sub {
276 0 0   0     return undef if ($self->{_rowize_eof});
277             # get a row
278 0           for (;;) {
279             # get a line to build up a full row
280 0           my $line = $fh->getline;
281 0 0         if (!defined($line)) {
282 0           $self->{_rowize_eof} = 1;
283 0           last; # exit infinite for
284             }; # eof
285 0 0         return $line if ($line =~ /^\s*\#/); # comment is fast-path return
286 0 0         if ($line =~ /^\s*$/) {
287 0 0         last if ($self->{_rowize_started_row});
288 0           next; # skip blank lines before content
289             };
290             # parse one field, carefully
291 0           my($key, $value) = ($line =~ /^([^:]+):\s+(.*)$/);
292 0 0         croak("unparsable line '$line' (format should be ''key: value''\n") if (!defined($key));
293 0 0         croak("contents of line contain column separator: <$line>, will correct\n") if ($value =~ /$fsre/);
294 0 0 0       $value = $self->{_empty} if (!defined($value) || $value eq '');
295 0           my $i = $self->{_cols_to_i}->{$key};
296 0 0         croak ("unknown column '$key' in '$line'.\n") if (!defined($i));
297 0           $self->{_rowize_partial_row}[$i] = $value;
298 0           $self->{_rowize_started_row} = 1;
299             };
300             # special case eof
301 0 0 0       return undef if ($self->{_rowize_eof} && !$self->{_rowize_started_row});
302             # now return the new row
303 0           my @f = @{$self->{_rowize_partial_row}}; # copy (maybe not needed?)
  0            
304 0           $self->{_rowize_partial_row} = [ ($self->{_empty}) x ($self->ncols) ]; # reset
305 0           $self->{_rowize_started_row} = undef;
306 0           return \@f;
307 0           };
308             } else {
309 0           croak "undefined rscode " . $self->{_rscode} . "\n";
310             };
311             } elsif (defined($self->{_queue})) {
312             # data is preformatted from a queue
313 0           my $queue = $self->{_queue};
314             $self->{_read_rowobj_sub} = sub {
315 0     0     return $queue->dequeue;
316 0           };
317             } else {
318 0           croak "confusion: no IO source\n";
319             };
320             }
321              
322              
323             =head2 read_headerrow
324              
325             internal use only; reads the header
326              
327             =cut
328             sub read_headerrow {
329 0     0 1   my($self) = @_;
330 0 0         return if ($self->{_error});
331 0           my $headerrow = &{$self->{_read_rowobj_sub}};
  0            
332             # Note special case: if ref($headerrow) than read_rowobj_sub
333             # parsed the line for us and it wasn't a comment. Bad user! No header!
334 0 0 0       if (!defined($headerrow) || ref($headerrow)) {
335 0           my $printable_hr = $headerrow;
336 0 0         if (!defined($printable_hr)) {
    0          
337 0           $printable_hr = "[EOF]";
338             } elsif (ref($printable_hr) ne 'SCALAR') {
339 0           $printable_hr = "$printable_hr";
340 0           $printable_hr =~ s/\(.*\)//;
341             } else {
342 0 0         $printable_hr = substr($printable_hr, 0, 200) . " ..."
343             if (length($printable_hr) > 200);
344 0           $printable_hr =~ s/[^[:print:]]+//g;
345             };
346 0           $self->{_error} = "no header line (saw: $printable_hr)";
347 0           return;
348             };
349             # Note: internally, headers are newlineless.
350 0           chomp $headerrow;
351 0           $self->{_headerrow} = $headerrow;
352             };
353              
354              
355             # =head2 read_attributes
356             #
357             # Read the attributes. Called automatically to get attributes,
358             # if any.
359             #
360             # =cut
361             # sub read_attributes {
362             # my($self) = @_;
363             # croak "double attribute read.\n" if ($self->{_attributes_set});
364             # $self->{_attributes_set} = 1;
365             #
366             # my $fref;
367             # while ($fref = $self->read_rowobj) {
368             # last if (!defined($fref)); # eof!
369             # last if (ref($fref)); # data (expected exit path)
370             # last if ($fref !~ /^#%\s+([^:])+:\s+(.*)$/);
371             # $self->{_attributes}{$1} = $2;
372             # };
373             # # put the last thing back
374             # $self->unread_rowobj($fref);
375             # # sigh, we now blown the fastpath :-(
376             # };
377             #
378             # =head2 check_attributes
379             #
380             # internal use only; check that attributes have been read.
381             # (for a writer, they always are)
382             #
383             # =cut
384             # sub check_attributes {
385             # return if ($self->{_attributes_set});
386             # if (!defined($self->{_headerrow})) {
387             # $self->read_headerrow;
388             # $self->parse_headerrow;
389             # };
390             # $self->read_attributes;
391             # }
392             #
393              
394              
395              
396             =head2 read_rowobj
397              
398             $rowobj = $fsdb->read_rowobj;
399              
400             Reads a line of input and returns a "row object",
401             either a scalar string for a comment or header,
402             or an array reference for a row,
403             or undef on end-of-stream.
404             This routine is the fastest way to do full-featured fsdb-formatted IO.
405             (Although see also Fsdb::Reader::fastpath_sub.)
406              
407             Unlike all the other routines (including fastpath_sub),
408             read_rowobj does not do comment processing (calling comment_sub).
409              
410             =cut
411             sub read_rowobj {
412 0     0 1   my($self) = @_;
413 0 0         return undef if (defined($self->{_error}));
414              
415             # first, check unread
416 0 0         if ($#{$self->{_unreadq}} >= 0) {
  0            
417 0           my $frontref = shift @{$self->{_unreadq}};
  0            
418 0           return $frontref;
419             };
420              
421 0           return &{$self->{_read_rowobj_sub}};
  0            
422             }
423              
424              
425             =head2 read_row_to_aref
426              
427             $fsdb->read_row_to_aref(\@a);
428              
429             Then $a[0] is the 0th column, etc.
430             Returns undef if the read fails, typically due to EOF.
431              
432             =cut
433              
434             sub read_row_to_aref {
435 0     0 1   my($self, $aref) = @_;
436              
437 0           while (1) {
438 0           my $rowobj = $self->read_rowobj;
439 0 0         if (!defined($rowobj)) {
    0          
440 0           return undef; # eof
441             } elsif (!ref($rowobj)) {
442             # comment
443 0           &{$self->{_comment_sub}}($rowobj);
  0            
444             } else {
445             # assert(ref($rowobj) eq 'ARRAY');
446 0           @$aref = @$rowobj;
447 0           return 1;
448             };
449             };
450             }
451              
452             =head2 unread_rowobj
453              
454             $fsdb->unread_rowobj($fref)
455              
456             Put an fref back into the stream.
457              
458             =cut
459              
460             sub unread_rowobj {
461 0     0 1   my($self, $fref) = @_;
462             croak "unread_fref attempted with active fastpath\n"
463 0 0         if ($self->{_fastpath_active});
464 0           unshift @{$self->{_unreadq}}, $fref;
  0            
465             }
466              
467             =head2 unread_row_from_aref
468              
469             $fsdb->unread_row_from_aref(\@a);
470              
471             Put array @a back into the file.
472              
473             =cut
474              
475             sub unread_row_from_aref {
476 0     0 1   my($self, $aref) = @_;
477             croak "unread_row_from_aref attempted with active fastpath\n"
478 0 0         if ($self->{_fastpath_active});
479 0           my @a = @$aref; # make a copy
480 0           unshift @{$self->{_unreadq}}, \@a;
  0            
481             }
482              
483             =head2 read_row_to_href
484              
485             $fsdb->read_row_to_href(\%h);
486              
487             Read the next row into hash C<%h>.
488             Then $h{'colname'} is the value of that column.
489             Returns undef if the read fails, typically due to EOF.
490              
491             =cut
492              
493             sub read_row_to_href {
494 0     0 1   my($self, $href) = @_;
495 0           my @a;
496 0 0         $self->read_row_to_aref(\@a) or return undef;
497 0           foreach my $i (0..$#{$self->{_cols}}) {
  0            
498 0           $href->{$self->{_cols}[$i]} = $a[$i];
499             };
500 0           return 1;
501             }
502              
503             =head2 unread_row_from_href
504              
505             $fsdb->unread_row_from_href(\%h);
506              
507             Put hash %h back into the file.
508              
509             =cut
510              
511             sub unread_row_from_href {
512 0     0 1   my($self, $href) = @_;
513 0           my @a = ('-' x $#{$self->{_cols}}); # null record
  0            
514 0           foreach (keys %$href) {
515 0           my($i) = $self->{_cols_to_i}->{$_};
516 0 0         defined($i) or croak "column name $_ is not in current file";
517 0           $a[$i] = $href->{$_};
518             };
519 0           $self->unread_row_from_aref(\@a);
520             }
521              
522              
523             =head2 fastpath_ok
524              
525             $fsdb->fastpath_ok();
526              
527             Check if we can do fast-path IO
528             (post-header, no pending unread rows, no errors).
529              
530             =cut
531             sub fastpath_ok {
532 0     0 1   my($self) = @_;
533              
534 0 0         return undef if (defined($self->{_error}));
535 0 0         return undef if (!defined($self->{_headerrow}));
536 0 0         return undef if ($#{$self->{_unreadq}} >= 0);
  0            
537 0           return 1;
538             }
539              
540             =head2 fastpath_sub
541              
542             $sub = $fsdb->fastpath_sub()
543             $row_aref = &$sub();
544              
545             Return an anonymous sub that does read fast-path when called.
546             This code stub returns a new $aref
547             corresponding with a data line,
548             and handles comments as specified by -comment_handler
549              
550             =cut
551             sub fastpath_sub {
552 0     0 1   my($self) = @_;
553              
554 0 0         $self->fastpath_ok or croak "not able to do read fastpath\n";
555 0           $self->{_fastpath_active} = 1;
556             # use lexical variables to emulate static to avoid object resolution
557             {
558 0           my $fh = $self->{_fh};
  0            
559 0           my $fsre = $self->{_fsre};
560 0           my $read_rowobj_sub = $self->{_read_rowobj_sub};
561 0           my $comment_sub = $self->{_comment_sub};
562 0 0         croak "Fsdb::IO::Reader::fastpath_sub missing comment handling subroutine.\n"
563             if (!defined($comment_sub));
564             # xxx: this code should track read_row_to_aref
565             my $fastpath = sub {
566 0     0     while (1) {
567 0           my $rowobj = &$read_rowobj_sub;
568 0 0         if (!defined($rowobj)) {
    0          
569 0           return undef; # eof
570             } elsif (!ref($rowobj)) {
571             # comment
572 0           &$comment_sub($rowobj);
573             } else {
574             # assert(ref($rowobj) eq 'ARRAY')
575 0           return $rowobj;
576             };
577             };
578 0           };
579             # for more visibility:
580             # $fastpath = sub { my @a:shared; $self->read_row_to_aref(\@a); return \@a; };
581 0           return $fastpath;
582             }
583             }
584              
585              
586             1;