File Coverage

blib/lib/Paranoid/IO/FileMultiplexer.pm
Criterion Covered Total %
statement 462 499 92.5
branch 125 206 60.6
condition 44 100 44.0
subroutine 46 46 100.0
pod 10 10 100.0
total 687 861 79.7


line stmt bran cond sub pod time code
1             # Paranoid::IO::FileMultiplexer -- File Multiplexer Object
2             #
3             # $Id: lib/Paranoid/IO/FileMultiplexer.pm, 2.10 2022/03/08 00:01:04 acorliss Exp $
4             #
5             # This software is free software. Similar to Perl, you can redistribute it
6             # and/or modify it under the terms of either:
7             #
8             # a) the GNU General Public License
9             # as published by the
10             # Free Software Foundation ; either version 1
11             # , or any later version
12             # , or
13             # b) the Artistic License 2.0
14             # ,
15             #
16             # subject to the following additional term: No trademark rights to
17             # "Paranoid" have been or are conveyed under any of the above licenses.
18             # However, "Paranoid" may be used fairly to describe this unmodified
19             # software, in good faith, but not as a trademark.
20             #
21             # (c) 2005 - 2021, Arthur Corliss (corliss@digitalmages.com)
22             # (tm) 2008 - 2021, Paranoid Inc. (www.paranoid.com)
23             #
24             #####################################################################
25              
26             #####################################################################
27             #
28             # Environment definitions
29             #
30             #####################################################################
31              
32             package Paranoid::IO::FileMultiplexer;
33              
34 11     11   5632 use 5.008;
  11         33  
35              
36 11     11   66 use strict;
  11         11  
  11         198  
37 11     11   33 use warnings;
  11         22  
  11         231  
38 11     11   55 use vars qw($VERSION);
  11         11  
  11         330  
39 11     11   44 use base qw(Exporter);
  11         22  
  11         649  
40 11     11   66 use Paranoid qw(:all);
  11         22  
  11         1243  
41 11     11   5269 use Paranoid::IO qw(:all);
  11         44  
  11         1804  
42 11     11   77 use Paranoid::Debug qw(:all);
  11         22  
  11         1331  
43 11     11   55 use Carp;
  11         22  
  11         484  
44 11     11   55 use Fcntl qw(:DEFAULT :flock :mode :seek);
  11         11  
  11         4081  
45 11     11   5137 use Paranoid::IO::FileMultiplexer::Block::FileHeader;
  11         33  
  11         484  
46 11     11   66 use Paranoid::IO::FileMultiplexer::Block::StreamHeader;
  11         22  
  11         308  
47 11     11   55 use Paranoid::IO::FileMultiplexer::Block::BATHeader;
  11         22  
  11         550  
48              
49             ($VERSION) = ( q$Revision: 2.10 $ =~ /(\d+(?:\.\d+)+)/sm );
50              
51 11     11   55 use constant PIOFMVER => '1.0';
  11         11  
  11         462  
52 11     11   55 use constant PERMMASK => 0666;
  11         22  
  11         385  
53 11     11   44 use constant DEFBSIZE => 4096;
  11         11  
  11         341  
54              
55 11     11   55 use constant ADDR_BAT => 0;
  11         22  
  11         341  
56 11     11   44 use constant ADDR_BLK => 1;
  11         22  
  11         495  
57 11     11   66 use constant ADDR_OFT => 2;
  11         11  
  11         41349  
58              
59             #####################################################################
60             #
61             # Module code follows
62             #
63             #####################################################################
64              
65             sub new {
66              
67             # Purpose: Creates a PIOFM object for manipulation
68             # Returns: Object reference or undef
69             # Usage: $obj = Paranoid::IO::FileMultiplexer->new(
70             # file => $fn,
71             # readOnly => 0,
72             # perms => $perms,
73             # blockSize => $bsize,
74             # );
75              
76 154     154 1 7216 my $class = shift;
77 154         506 my %args = @_;
78 154         1925 my $self = {
79             file => undef,
80             readOnly => 0,
81             perms => PERMMASK ^ umask,
82             header => undef,
83             streams => {},
84             streamPos => {},
85             blockSize => DEFBSIZE,
86             corrupted => 0,
87             %args
88             };
89              
90             pdebug( 'entering w/f: %s bs: %s p: %s ro: %s',
91 154         847 PDLEVEL1, @args{qw(file blockSize perms readOnly)} );
92 154         473 pIn();
93              
94 154         286 bless $self, $class;
95              
96             # Mandatory file name required
97             $self = undef
98 154 100 100     792 unless defined $args{file} and length $args{file};
99              
100 154 100       341 if ( defined $self ) {
101              
102             # Enable the lock stack
103 132         330 PIOLOCKSTACK = 1;
104              
105             # Attempt to open the file
106 132 50       286 if ( $$self{ro} ) {
107 0 0       0 $self = undef unless $self->_oldFile;
108             } else {
109 132 100 100     330 $self = undef unless $self->_newFile or $self->_oldFile;
110             }
111              
112             } else {
113 22         55 pdebug( 'invalid file name: %s', PDLEVEL1, $args{file} );
114             }
115              
116 154         418 subPostamble( PDLEVEL1, '$', $self );
117              
118 154         781 return $self;
119             }
120              
121             sub _newFile {
122              
123             # Purpose: Attempts to open the file as a new file
124             # Returns: Boolean
125             # Usage: $rv = $obj->_newFile;
126              
127 132     132   209 my $self = shift;
128 132         220 my $file = $$self{file};
129 132         198 my $bsize = $$self{blockSize};
130 132         176 my $rv = 0;
131 132         198 my $header;
132              
133 132         341 subPreamble(PDLEVEL2);
134              
135 132 50       682 if ( !$$self{readOnly} ) {
136              
137             # Allocate the header object (it will fail on invalid block sizes)
138 132         693 $header =
139             Paranoid::IO::FileMultiplexer::Block::FileHeader->new( $file,
140             $bsize );
141 132 100       330 if ( defined $header ) {
142              
143             # Open the file exclusively and get an flock
144 99         308 $rv = popen( $file, O_CREAT | O_RDWR | O_EXCL, $$self{perms} );
145 99 100       242 if ($rv) {
146              
147             # Lock file
148 77         231 pflock( $file, LOCK_EX );
149              
150             # Allocate the block and write the initial signature
151 77 50       462 $rv = $header->allocate and $header->writeSig;
152 77 50       220 $$self{header} = $header if $rv;
153              
154             # Release the lock
155 77         187 pflock( $file, LOCK_UN );
156             }
157             }
158             } else {
159 0         0 pdebug( 'cannot create a new file in readOnly mode', PDLEVEL1 );
160             }
161              
162 132         506 subPostamble( PDLEVEL2, '$', $rv );
163              
164 132         660 return $rv;
165             }
166              
167             sub _oldFile {
168              
169             # Purpose: Attempts to open the file as a new file
170             # Returns: Boolean
171             # Usage: $rv = $obj->_newFile;
172              
173 55     55   110 my $self = shift;
174 55         110 my $file = $$self{file};
175 55         88 my $bsize = $$self{blockSize};
176 55         88 my $rv = 0;
177 55         88 my $header;
178              
179 55         154 subPreamble(PDLEVEL2);
180              
181             # Allocate the header object (it will fail on invalid block sizes)
182 55         231 $header = Paranoid::IO::FileMultiplexer::Block::FileHeader->new( $file,
183             $bsize );
184 55 100       132 if ( defined $header ) {
185              
186             # Open the file exclusively and get an flock
187             $rv = popen( $file, ( $$self{readOnly} ? O_RDONLY : O_RDWR ),
188 22 50       110 $$self{perms} );
189 22 50       66 if ($rv) {
190              
191             # Lock file
192 22         77 pflock( $file, LOCK_SH );
193              
194             # Read an existing signature
195 22   66     88 $rv = $header->readSig && $header->readStreams;
196 22 100       66 if ($rv) {
197 11         44 $$self{header} = $header;
198 11         33 $$self{blockSize} = $header->blockSize;
199             }
200              
201             # Release the lock
202 22         77 pflock( $file, LOCK_UN );
203             }
204             }
205              
206 55         143 subPostamble( PDLEVEL2, '$', $rv );
207              
208 55         275 return $rv;
209             }
210              
211             sub header {
212              
213             # Purpose: Returns a reference to the header object
214             # Returns: Ref
215             # Usage: $header = $obj->header;
216              
217 363     363 1 86658 my $self = shift;
218 363         1595 return $$self{header};
219             }
220              
221             sub _reload {
222              
223             # Purpose: Reloads the file header information and purges the stream
224             # cache
225             # Returns: Boolean
226             # Usage: $rv = $obj->_reload;
227              
228 8     8   25 my $self = shift;
229 8         69 my $file = $$self{file};
230 8         161 my $header = $$self{header};
231 8         62 my $rv = 1;
232              
233 8         111 subPreamble(PDLEVEL4);
234              
235 8 50       33 if ( pflock( $file, LOCK_SH ) ) {
236 8 50 33     188 if ( $header->readSig && $header->readStreams ) {
237 8         56 $$self{streams} = {};
238             } else {
239 0         0 $$self{corrupt} = 1;
240 0         0 $rv = 0;
241             }
242 8         123 pflock( $file, LOCK_UN );
243             }
244              
245 8         134 subPostamble( PDLEVEL4, '$', $rv );
246              
247 8         19 return $rv;
248             }
249              
250             sub _getStream {
251              
252             # Purpose: Retrieves or creates a stream header object
253             # Returns: Ref
254             # Usage: $ref = $obj->_getStream($name);
255              
256 2948     2948   5166 my $self = shift;
257 2948         4483 my $sname = shift;
258 2948         5923 my $header = $$self{header};
259 2948         5053 my $file = $$self{file};
260 2948         4264 my ( $rv, %streams, $stream );
261              
262 2948         8470 subPreamble( PDLEVEL2, '$$', $sname, $header );
263              
264 2948 50 33     12088 if ( defined $sname and length $sname ) {
265              
266             # Reload if header fails validation
267 2948 100       11237 $self->_reload unless $header->validateBlocks;
268              
269             # Create the stream object if we don't have one cached
270 2948 100       10276 unless ( exists $$self{streams}{$sname} ) {
271 34         289 %streams = $header->streams;
272 34 50       137 if ( exists $streams{$sname} ) {
273             $stream =
274             Paranoid::IO::FileMultiplexer::Block::StreamHeader->new(
275 34         218 $$self{file}, $streams{$sname}, $header->blockSize,
276             $sname );
277 34 50       200 if ( pflock( $file, LOCK_SH ) ) {
278 34 50 33     420 $$self{streams}{$sname} = $stream
279             if $stream->readSig
280             and $stream->readBATs;
281 34         158 pflock( $file, LOCK_UN );
282             }
283 34 50       156 unless ( exists $$self{streams}{$sname} ) {
284 0         0 pdebug( 'stream \'%s\' failed consistency checks',
285             PDLEVEL1, $sname );
286 0         0 $$self{corrupt} = 1;
287             }
288             } else {
289 0         0 pdebug( 'attempted to access a non-existent stream (%s)',
290             PDLEVEL1, $sname );
291             }
292             }
293              
294             # Retrieve a reference to the stream object
295             $stream =
296             exists $$self{streams}{$sname}
297 2948 50       8805 ? $$self{streams}{$sname}
298             : undef;
299              
300             # Reload stream signature if EOS has changed outside of this process
301 2948 50       6219 if ( defined $stream ) {
302 2948 50       10960 unless ( $stream->validateEOS ) {
303 0 0 0     0 unless ( $stream->readSig and $stream->readBATs ) {
304 0         0 $stream = undef;
305 0         0 pdebug( 'stream \'%s\' failed consistency checks',
306             PDLEVEL1, $sname );
307 0         0 $$self{corrupt} = 1;
308             }
309             }
310              
311             # Return the stream reference
312 2948         5995 $rv = $stream;
313             }
314             }
315              
316 2948         9148 subPostamble( PDLEVEL4, '$', $rv );
317              
318 2948         8112 return $rv;
319             }
320              
321             sub _getBAT {
322              
323             # Purpose: Returns a BAT which has been loaded and validated
324             # Returns: Ref
325             # Usage: $ref = $obj->_getBAT($sname, $seq);
326              
327 1215     1215   2207 my $self = shift;
328 1215         1787 my $sname = shift;
329 1215         2472 my $seq = shift;
330 1215         3261 my $file = $$self{file};
331 1215         2961 my ( $rv, $stream, @bats, $bat );
332              
333 1215         4282 subPreamble( PDLEVEL4, '$$', $sname, $seq );
334              
335 1215         3508 $stream = $self->_getStream($sname);
336 1215 50       3523 if ( defined $stream ) {
337              
338             # Get the list of BATs
339 1215         4086 @bats = $stream->bats;
340              
341 1215 50       4664 if ( $seq <= $#bats ) {
342             $bat = Paranoid::IO::FileMultiplexer::Block::BATHeader->new(
343 1215         8312 $$self{file}, $bats[$seq], $$self{blockSize}, $sname, $seq );
344 1215 50       3953 if ( pflock( $file, LOCK_SH ) ) {
345 1215 50 33     7292 $rv = $bat
      33        
346             if defined $bat
347             and $bat->readSig
348             and $bat->readData;
349 1215         4329 pflock( $file, LOCK_UN );
350             }
351 1215 50       4138 pdebug( 'BAT %s for stream \'%s\' failed consistency checks',
352             PDLEVEL1, $seq, $sname )
353             unless $rv;
354             }
355             }
356              
357 1215         3729 subPostamble( PDLEVEL4, '$', $rv );
358              
359 1215         4618 return $rv;
360             }
361              
362             sub _chkData {
363              
364             # Purpose: Checks that a data block appears to be present
365             # Returns: Boolean
366             # Usage: $rv = $obj->_chkData;
367              
368 22     22   55 my $self = shift;
369 22         44 my $bn = shift;
370 22         517 my $file = $$self{file};
371 22         44 my $bsize = $$self{blockSize};
372 22         55 my ( $rv, $block, $raw );
373              
374 22         99 subPreamble( PDLEVEL4, '$', $bn );
375              
376 22         110 $block = Paranoid::IO::FileMultiplexer::Block->new( $file, $bn, $bsize );
377 22   33     143 $rv = ( defined $block and $block->bread( \$raw, 0, 1 ) == 1 );
378              
379 22 50       99 unless ($rv) {
380 0         0 pdebug( 'data block list at dn %s but cannot be read', PDLEVEL1,
381             $bn );
382 0         0 $rv = 0;
383 0         0 $$self{corrupted} = 1;
384             }
385              
386 22         77 subPostamble( PDLEVEL4, '$', $rv );
387              
388 22         143 return $rv;
389             }
390              
391             sub _chkBAT {
392              
393             # Purpose: Checks that a BAT appears consistent
394             # Returns: Boolean
395             # Usage: $rv = $obj->_chkBAT($bn, $snmae, $seq);
396              
397 22     22   33 my $self = shift;
398 22         66 my $bn = shift;
399 22         44 my $sname = shift;
400 22         33 my $seq = shift;
401 22         44 my $file = $$self{file};
402 22         44 my $bsize = $$self{blockSize};
403 22         44 my ( $rv, $block, @data );
404              
405 22         77 subPreamble( PDLEVEL4, '$$$', $bn, $sname, $seq );
406              
407 22         99 $block = Paranoid::IO::FileMultiplexer::Block::BATHeader->new( $file, $bn,
408             $bsize, $sname, $seq );
409 22   33     154 $rv = ( defined $block and $block->readSig and $block->readData );
410              
411 22 50       99 unless ($rv) {
412 0         0 pdebug( 'BAT at bn %s fails consistency checks', PDLEVEL1, $bn );
413 0         0 $rv = 0;
414 0         0 $$self{corrupted} = 1;
415             }
416              
417 22 50       55 if ($rv) {
418 22         209 @data = $block->dataBlocks;
419 22 50       88 foreach (@data) { $rv = 0 unless $self->_chkData($_) }
  22         110  
420             }
421              
422 22         88 subPostamble( PDLEVEL4, '$', $rv );
423              
424 22         143 return $rv;
425             }
426              
427             sub _chkStream {
428              
429             # Purpose: Checks that a stream appears consistent
430             # Returns: Boolean
431             # Usage: $rv = $obj->_chkStream($bn, $sname);
432              
433 44     44   55 my $self = shift;
434 44         88 my $bn = shift;
435 44         99 my $sname = shift;
436 44         88 my $file = $$self{file};
437 44         77 my $bsize = $$self{blockSize};
438 44         77 my ( $rv, $i, $block, @bats );
439              
440 44         121 subPreamble( PDLEVEL4, '$$', $bn, $sname );
441              
442 44         209 $block =
443             Paranoid::IO::FileMultiplexer::Block::StreamHeader->new( $file, $bn,
444             $bsize, $sname );
445 44   66     253 $rv = ( defined $block and $block->readSig and $block->readBATs );
446              
447 44 100       165 unless ($rv) {
448 11         44 pdebug( 'Stream at bn %s (%s) fails consistency checks',
449             PDLEVEL1, $bn, $sname, $sname, $sname );
450 11         33 $rv = 0;
451 11         22 $$self{corrupted} = 1;
452             }
453              
454 44 100       110 if ($rv) {
455 33         176 @bats = $block->bats;
456 33         88 $i = 0;
457 33         99 foreach (@bats) {
458 22 50       110 $rv = 0 unless $self->_chkBAT( $_, $sname, $i );
459 22         66 $i++;
460             }
461             }
462              
463 44         143 subPostamble( PDLEVEL4, '$', $rv );
464              
465 44         286 return $rv;
466             }
467              
468             sub chkConsistency {
469              
470             # Purpose: Checks the file for consistency
471             # Returns: Boolean
472             # Usage: $rv = $obj->chkConsistency;
473              
474 22     22 1 66 my $self = shift;
475 22         55 my $file = $$self{file};
476 22         44 my $header = $$self{header};
477 22         55 my $bsize = $$self{blockSize};
478 22         44 my $rv = 1;
479 22         44 my %streams;
480              
481 22         77 subPreamble(PDLEVEL1);
482              
483             # TODO: There is one major flaw in this consistency check, in that is
484             # TODO: possible to list a header block as a data block in a BAT.
485             # TODO: Writes to said block will obviously introduce consistency errors
486             # TODO: and corruption in the future. Depending on the size of the file,
487             # TODO: however, doing an exhaustive search on all data blocks and making
488             # TODO: sure they're not in use as a header block could be memory
489             # TODO: intensive. We might have to bite the bullet, though.
490             #
491             # Possible solution (which isn't perfect): look for signatures and see if
492             # they load error free. I.e., any block that starts with PIOFM. If we've
493             # already passed the rest of the consistency checks, anything pointing to
494             # what looks like a header block, but doesn't pass consistency checks, we
495             # really don't care about. We might warn if it does pass, though, and
496             # then brute-force check each data block number against a full list of
497             # stream/BAT block numbers.
498              
499             # Apply a read lock for the duration
500 22 50       88 if ( pflock( $file, LOCK_SH ) ) {
501              
502             # Check header
503 22 50 33     132 if ( $header->readSig && $header->readStreams ) {
504              
505             # Check streams
506 22         121 %streams = $header->streams;
507 22         143 foreach ( sort keys %streams ) {
508 44 100       209 $rv = 0 unless $self->_chkStream( $streams{$_}, $_ );
509             }
510              
511             } else {
512 0         0 pdebug( 'file header failed consistency checks', PDLEVEL1 );
513 0         0 $$self{corrupted} = 1;
514 0         0 $rv = 0;
515             }
516              
517 22         99 pflock( $file, LOCK_UN );
518              
519             } else {
520 0         0 pdebug( 'failed to get a read lock', PDLEVEL1 );
521 0         0 $rv = 0;
522             }
523              
524 22 100       77 if ($rv) {
525 11         44 $$self{corrupted} = 0;
526             } else {
527 11         33 $$self{corrupted} = 1;
528 11         44 pdebug( 'error - setting corrupted flag to true', PDLEVEL1 );
529             }
530              
531 22         110 subPostamble( PDLEVEL1, '$', $rv );
532              
533 22         165 return $rv;
534             }
535              
536             sub _addBlock {
537              
538             # Purpose: Adds a data block to the file and updates the file header
539             # Returns: Integer (block number of new block)
540             # Usage: $bn = $self->_addBlock;
541              
542 114     114   259 my $self = shift;
543 114         252 my $header = $$self{header};
544 114         253 my ( $rv, $bn, $data );
545              
546 114         315 subPreamble(PDLEVEL2);
547              
548 114         384 $bn = $header->blocks;
549             $data =
550             Paranoid::IO::FileMultiplexer::Block->new( $$self{file}, $bn,
551 114         637 $$self{blockSize} );
552 114 50 33     598 $rv = $bn if defined $data and $data->allocate and $header->incrBlocks;
      33        
553              
554 114         401 subPostamble( PDLEVEL2, '$', $rv );
555              
556 114         775 return $rv;
557             }
558              
559             sub _addBAT {
560              
561             # Purpose: Adds a BAT to the file and updates the file header, and calls
562             # _addBlock
563             # Returns: Integer (block number of new BAT)
564             # Usage: $bn = $self->_addBAT($sname, $seq);
565              
566 44     44   132 my $self = shift;
567 44         77 my $sname = shift;
568 44         77 my $seq = shift;
569 44         132 my $header = $$self{header};
570 44         99 my ( $rv, $bn, $bat );
571              
572 44         143 subPreamble( PDLEVEL2, '$$', $sname, $seq );
573              
574 44         176 $bn = $header->blocks;
575             $bat =
576             Paranoid::IO::FileMultiplexer::Block::BATHeader->new( $$self{file},
577 44         462 $bn, $$self{blockSize}, $sname, $seq );
578 44 50 33     242 $rv = $bn
      33        
      33        
579             if defined $bat
580             and $bat->allocate
581             and $bat->writeSig
582             and $header->incrBlocks;
583              
584 44 50       341 $bat->addData( $self->_addBlock ) if defined $rv;
585              
586 44         176 subPostamble( PDLEVEL2, '$', $rv );
587              
588 44         418 return $rv;
589             }
590              
591             sub _addStream {
592              
593             # Purpose: Adds a Stream to the file and updates the file header, and calls
594             # _addBAT
595             # Returns: Integer (block number of new stream)
596             # Usage: $bn = $self->_addStream($sname);
597              
598 33     33   110 my $self = shift;
599 33         110 my $sname = shift;
600 33         110 my $header = $$self{header};
601 33         77 my ( $rv, $bn, $stream );
602              
603 33         231 subPreamble( PDLEVEL2, '$', $sname );
604              
605 33         165 $bn = $header->blocks;
606             $stream =
607             Paranoid::IO::FileMultiplexer::Block::StreamHeader->new( $$self{file},
608 33         330 $bn, $$self{blockSize}, $sname );
609 33 50 33     253 $rv = $bn
      33        
      33        
610             if defined $stream
611             and $stream->allocate
612             and $stream->writeSig
613             and $header->incrBlocks;
614              
615 33 50       253 $stream->addBAT( $self->_addBAT( $sname, 0 ) ) if defined $rv;
616              
617 33         242 subPostamble( PDLEVEL2, '$', $rv );
618              
619 33         209 return $rv;
620             }
621              
622             sub addStream {
623              
624             # Purpose: Adds the requested stream
625             # Returns: Boolean
626             # Usage: $rv = $obj->addStream($name);
627              
628 44     44 1 220 my $self = shift;
629 44         88 my $sname = shift;
630 44         121 my $file = $$self{file};
631 44         110 my $header = $$self{header};
632 44   66     242 my $bypass = $$self{readOnly} || $$self{corrupted};
633 44         88 my $rv = 0;
634              
635 44         165 subPreamble( PDLEVEL1, '$', $sname );
636              
637 44 100       143 unless ($bypass) {
638              
639             # Get an exclusive lock
640 33 50       132 if ( pflock( $file, LOCK_EX ) ) {
641              
642             # Validate file header block count
643 33         77 $rv = 1;
644 33 50       165 $rv = $self->_reload unless $header->validateBlocks;
645              
646             # Add the stream
647 33 50 50     220 $rv = $header->addStream( $sname, $header->blocks )
648             and $self->_addStream($sname)
649             if $rv;
650              
651             # Release the lock
652 33         132 pflock( $file, LOCK_UN );
653              
654             } else {
655 0         0 pdebug( 'failed to get an exclusive lock', PDLEVEL1 );
656             }
657             }
658              
659 44         231 pOut();
660 44         143 pdebug( 'leaving w/rv: %s', PDLEVEL1, $rv );
661              
662 44         242 return $rv;
663             }
664              
665             sub _calcAddr {
666              
667             # Purpose: Calculates the (BAT, Data, offset) address of the stream
668             # position
669             # Returns: Array (BAT #, Data #, offset)
670             # Usage: @addr = $self->_calcAddr($pos);
671              
672 1408     1408   3632 my $self = shift;
673 1408         2979 my $pos = shift;
674 1408         3240 my $bsize = $$self{blockSize};
675 1408         2571 my ( @rv, $bat, $max );
676              
677 1408 100       4245 if ( $pos < $bsize ) {
678 543         1784 @rv = ( 0, 0, $pos );
679             } else {
680              
681             $bat = Paranoid::IO::FileMultiplexer::Block::BATHeader->new(
682 865         5156 $$self{file}, 0, $bsize );
683 865 50       2660 if ( defined $bat ) {
684 865         2767 $max = $bat->maxData;
685              
686 865         4039 $rv[ADDR_BAT] = int( $pos / ( $max * $bsize ) );
687 865         3493 $rv[ADDR_BLK] =
688             int( ( $pos - ( $rv[ADDR_BAT] * $max * $bsize ) ) / $bsize );
689 865         3806 $rv[ADDR_OFT] = $pos -
690             ( $rv[ADDR_BAT] * $max * $bsize + $rv[ADDR_BLK] * $bsize );
691              
692             }
693             }
694              
695 1408         17034 return @rv;
696             }
697              
698             sub strmSeek {
699              
700             # Purpose: Updates the stream cursor position
701             # Returns: Integer/undef on error
702             # Usage: $rv = $obj->_strmSeek($sname, $pos, $whence);
703              
704 1702     1702 1 3069 my $self = shift;
705 1702         2449 my $sname = shift;
706 1702         2957 my $pos = shift;
707 1702         2589 my $whence = shift;
708 1702         2585 my $cur = 0;
709 1702         2770 my $rv = 1;
710              
711 1702         5135 subPreamble( PDLEVEL2, '$$$', $sname, $pos, $whence );
712              
713 1702 50       4720 $whence = SEEK_SET unless defined $whence;
714 1702 50       3637 $pos = 0 unless defined $whence;
715              
716 1702 100       3997 if ( $whence == SEEK_SET ) {
717 1180         3799 $$self{streamPos}{$sname} = $pos;
718             } else {
719 522 50       2862 $cur = $$self{streamPos}{$sname} if exists $$self{streamPos}{$sname};
720              
721 522 50       2861 if ( $whence == SEEK_CUR ) {
    50          
722 0         0 $cur += $pos;
723             } elsif ( $whence == SEEK_END ) {
724 522         2186 $cur = $$self{streams}{$sname}->eos + $pos;
725             } else {
726 0         0 pdebug( 'invalid value for whence in seek (%s)',
727             PDLEVEL1, $whence );
728 0         0 $rv = undef;
729             }
730 522         1903 $$self{streamPos}{$sname} = $cur;
731             }
732 1702 50       5549 $$self{streamPos}{$sname} = 0 if $$self{streamPos}{$sname} < 0;
733              
734 1702 50       5005 $rv = $$self{streamPos}{$sname} if defined $rv;
735 1702 100       4645 $rv = PTRUE_ZERO if $rv == 0;
736              
737 1702         5246 subPostamble( PDLEVEL2, '$', $rv );
738              
739 1702         5882 return $rv;
740             }
741              
742             sub strmTell {
743              
744             # Purpose: Returns the current stream cursor position
745             # Returns: Integer
746             # Usage: $rv = $obj->_strmTell($sname);
747              
748 1156     1156 1 2007 my $self = shift;
749 1156         1666 my $sname = shift;
750 1156         1665 my $rv;
751              
752 1156 100       3760 $$self{streamPos}{$sname} = 0 unless exists $$self{streamPos}{$sname};
753              
754 1156         3898 return $$self{streamPos}{$sname};
755             }
756              
757             sub _growStream {
758              
759             # Purpose: Grows the stream as needed to accomodate the upcoming write
760             # based on the address of the write's starting position
761             # Returns: Boolean/Integer (bn of last block added)
762             # Usage: $rv = $obj->_growStream($sname, @addr);
763              
764 577     577   1412 my $self = shift;
765 577         1127 my $sname = shift;
766 577         1595 my @addr = @_;
767 577         1126 my $file = $$self{file};
768 577         1174 my $rv = 1;
769 577         1140 my ( $max, $stream, $bat, @bats, @blocks );
770              
771 577         1770 subPreamble( PDLEVEL3, '$@', $sname, @addr );
772              
773             # Get the stream and list of bats
774 577         1872 $stream = $self->_getStream($sname);
775 577         2449 @bats = $stream->bats;
776              
777             # Start padding BATs
778 577         2929 while ( $#bats <= $addr[ADDR_BAT] ) {
779              
780             # Add a BAT
781 577 100       2389 if ( $#bats < $addr[ADDR_BAT] ) {
782              
783             # Only add a BAT if we're still below the BAT address
784 11         66 $rv = $self->_addBAT( $sname, scalar @bats );
785 11 50       55 if ($rv) {
786 11         88 $stream->addBAT($rv);
787 11         55 @bats = $stream->bats;
788             } else {
789 0         0 last;
790             }
791             }
792              
793             # Add data blocks as needed
794 577         2643 $bat = $self->_getBAT( $sname, $#bats );
795 577         3199 @blocks = $bat->dataBlocks;
796 577 50       4492 while (
797             $#bats == $addr[ADDR_BAT]
798             ? $#blocks < $addr[ADDR_BLK]
799             : !$bat->full
800             ) {
801              
802 70         341 $rv = $self->_addBlock;
803 70 50       238 if ($rv) {
804 70         350 $bat->addData($rv);
805 70         240 @blocks = $bat->dataBlocks;
806             } else {
807 0         0 last;
808             }
809             }
810              
811 577 50       3443 last if $#bats == $addr[ADDR_BAT];
812             }
813              
814 577 50       1676 pdebug( 'failed to grow the stream (%s)', PDLEVEL1, $sname ) unless $rv;
815              
816 577         1976 subPostamble( PDLEVEL3, '$', $rv );
817              
818 577         5748 return $rv;
819             }
820              
821             sub _strmWrite {
822              
823             # Purpose: Writes to the specified stream
824             # Returns: Integer/undef (bytes written/error)
825             # Usage: $bytes = $obj->_strmWrite($sname, $content);
826              
827 555     555   1107 my $self = shift;
828 555         880 my $sname = shift;
829 555         1148 my $content = shift;
830 555         1349 my $file = $$self{file};
831 555         1349 my $bsize = $$self{blockSize};
832 555         1686 my ( $rv, $stream, $bat, $block, $pos );
833 555         0 my ( @addr, @blocks, $bn, $blkLeft, $offset, $clength, $chunk, $bw );
834              
835 555         1816 subPreamble( PDLEVEL1, '$$', $sname, $content );
836              
837 555 50       1698 if ( pflock( $file, LOCK_EX ) ) {
838              
839 555         2061 $stream = $self->_getStream($sname);
840 555 50 33     5985 if ( defined $stream and defined $content and length $content ) {
      33        
841              
842             # Get the current position
843 555         2143 $pos = $self->strmTell($sname);
844              
845             # Get the address
846 555         3493 @addr = $self->_calcAddr( $pos + length $content );
847              
848             # Allocate blocks as needed
849 555 50       2643 if ( $self->_growStream( $sname, @addr ) ) {
850 555         2990 @addr = $self->_calcAddr($pos);
851              
852             # Get the specified BAT and data block
853 555         2536 $bat = $self->_getBAT( $sname, $addr[ADDR_BAT] );
854 555         2646 @blocks = $bat->dataBlocks;
855              
856             # Get the specified block
857 555         4256 $block =
858             Paranoid::IO::FileMultiplexer::Block->new( $file,
859             $blocks[ $addr[ADDR_BLK] ], $bsize );
860              
861 555 50 33     3489 if ( defined $bat and defined $block ) {
862              
863             # Start writing
864 555         1734 $offset = $rv = 0;
865 555         2108 while ( $rv < length $content ) {
866              
867             # We need to know how much room is left in the block
868 625         1799 $blkLeft = $bsize - $addr[ADDR_OFT];
869              
870             # We need to know if the remaining content will fit in
871             # that block
872 625         1566 $clength = length($content) - $offset;
873 625 100       2003 $chunk = $clength <= $blkLeft ? $clength : $blkLeft;
874              
875             # Write the chunk
876 625         2924 $bw =
877             $block->bwrite( $content, $addr[ADDR_OFT], $chunk,
878             $offset );
879 625         1603 $rv += $bw;
880 625         1158 $offset += $bw;
881 625         1316 $pos += $bw;
882              
883             # Exit if we couldn't write the full chunk
884 625 50       2196 unless ( $bw == $chunk ) {
885 0         0 pdebug(
886             'failed to write entire contents: %s bytes',
887             PDLEVEL1, $rv );
888 0         0 last;
889             }
890              
891             # Get the next block if we have bytes left
892 625 100       3586 if ( $rv < length $content ) {
893 70         284 @addr = $self->_calcAddr($pos);
894 70 50       384 unless ( $bat->sequence == $addr[ADDR_BAT] ) {
895 0         0 $bat =
896             $self->_getBAT( $sname, $addr[ADDR_BAT] );
897 0         0 @blocks = $bat->dataBlocks;
898             }
899              
900             # Get the specified block
901             $block =
902 70         312 Paranoid::IO::FileMultiplexer::Block->new(
903             $file, $blocks[ $addr[ADDR_BLK] ], $bsize );
904             }
905             }
906             }
907              
908             # Update stream position and EOS
909 555 50       1777 if ($rv) {
910 555         2746 $self->strmSeek( $sname, $pos, SEEK_SET );
911 555 100       2910 $stream->writeEOS($pos) if $stream->eos < $pos;
912             }
913              
914             }
915              
916             }
917 555         2736 pflock( $file, LOCK_UN );
918             }
919              
920 555         2095 subPostamble( PDLEVEL1, '$', $rv );
921              
922 555         7023 return $rv;
923             }
924              
925             sub strmWrite {
926              
927             # Purpose: Calls _strmWrite after making sure the file can be written to
928             # Returns: Integer/undef
929             # Usage: $bw = $obj->strmWrite($sname, $content);
930              
931 555     555 1 1579 my $self = shift;
932 555         1609 my @args = @_;
933 555   33     3860 my $bypass = $$self{readOnly} || $$self{corrupted};
934              
935 555 50       1771 pdebug( 'can\'t write to files that are corrupted or read-only',
936             PDLEVEL1 )
937             if $bypass;
938              
939 555 50       2159 return $bypass ? undef : $self->_strmWrite(@args);
940             }
941              
942             sub _strmRead {
943              
944             # Purpose: Reads from the specified stream
945             # Returns: Integer/undef (bytes read/error)
946             # Usage: $bytes = $obj->_strmRead($sname, $content, $bytes);
947              
948 68     68   119 my $self = shift;
949 68         129 my $sname = shift;
950 68         104 my $cref = shift;
951 68   50     160 my $btr = shift || 0;
952 68         156 my $file = $$self{file};
953 68         118 my $bsize = $$self{blockSize};
954 68         140 my $rv = 0;
955 68         217 my ( $stream, $pos, $eos, @addr, $content );
956 68         0 my ( $bat, @blocks, $block, $ctr, $br, $offset );
957              
958 68         274 subPreamble( PDLEVEL1, '$$$', $sname, $cref, $btr );
959              
960 68 50       320 if ( pflock( $file, LOCK_SH ) ) {
961              
962 68         286 $stream = $self->_getStream($sname);
963 68 50 33     561 if ( defined $stream and defined $cref and ref $cref eq 'SCALAR' ) {
      33        
964              
965             # Get the current position
966 68         257 $pos = $self->strmTell($sname);
967              
968             # Get the address
969 68         221 @addr = $self->_calcAddr($pos);
970              
971             # Get the End Of Stream position
972 68         263 $eos = $stream->eos;
973              
974             # Start reading
975 68         164 $$cref = '';
976 68   100     394 while ( $pos < $eos and $rv < $btr ) {
977              
978             # Get the specified BAT
979 83         293 $bat = $self->_getBAT( $sname, $addr[ADDR_BAT] );
980 83 50       243 if ( defined $bat ) {
981              
982             # Get the specified data block
983 83         346 @blocks = $bat->dataBlocks;
984 83         504 $block =
985             Paranoid::IO::FileMultiplexer::Block->new( $file,
986             $blocks[ $addr[ADDR_BLK] ], $bsize );
987 83 50       265 if ( defined $block ) {
988              
989             # Take and early out if pos equals eos
990 83 50       240 last unless $pos < $eos;
991              
992             # Figure out how much of the block we have left to
993             # read
994 83         188 $ctr = $bsize - $addr[ADDR_OFT];
995              
996             # Reduce it if the read finishes in this block
997 83 100       255 $ctr = $btr - $rv if $ctr > $btr - $rv;
998              
999             # Reduce it further if EOS is even closer
1000 83 100       259 $ctr = $eos - $pos if $ctr > $eos - $pos;
1001              
1002             # Read the chunk
1003 83         276 $br =
1004             $block->bread( \$content, $addr[ADDR_OFT], $ctr );
1005 83         203 $rv += $br;
1006 83         198 $pos += $br;
1007 83         322 @addr = $self->_calcAddr($pos);
1008 83         844 $$cref .= $content;
1009              
1010 83 50       664 unless ( $br == $ctr ) {
1011 0         0 pdebug(
1012             'failed to read entire chunk: %s/%s bytes',
1013             PDLEVEL1, $br, $ctr );
1014 0         0 last;
1015             }
1016              
1017             }
1018             }
1019             }
1020              
1021             # Update stream pointer
1022 68         319 $self->strmSeek( $sname, $pos, SEEK_SET );
1023              
1024             } else {
1025 0 0       0 if ( defined $stream ) {
1026 0         0 pdebug( 'invalid value passed for the content reference: %s',
1027             PDLEVEL1, $cref );
1028 0         0 $rv = undef;
1029             }
1030             }
1031              
1032 68         260 pflock( $file, LOCK_UN );
1033             }
1034              
1035 68         259 subPostamble( PDLEVEL1, '$', $rv );
1036              
1037 68         1179 return $rv;
1038             }
1039              
1040             sub strmRead {
1041              
1042             # Purpose: Calls _strmRead after making sure the file can be read from
1043             # Returns: Integer/undef
1044             # Usage: $br = $obj->strmRead($stream, \$content, $bytes);
1045              
1046 68     68 1 173 my $self = shift;
1047 68         218 my @args = @_;
1048 68         187 my $bypass = $$self{corrupted};
1049              
1050 68 50       237 pdebug( 'can\'t read from files that are corrupted', PDLEVEL1 )
1051             if $bypass;
1052              
1053 68 50       313 return $bypass ? undef : $self->_strmRead(@args);
1054             }
1055              
1056             sub strmAppend {
1057              
1058             # Purpose: Seeks to the end of the stream and writes new content there
1059             # Returns: Integer/undef (bytes written/error)
1060             # Usage: $bytes = $obj->_strmAppend($sname, $content);
1061              
1062 511     511 1 190044 my $self = shift;
1063 511         1499 my $sname = shift;
1064 511         1128 my $content = shift;
1065 511         1314 my $file = $$self{file};
1066 511         1247 my ( $rv, $stream, $pos );
1067              
1068 511         2372 subPreamble( PDLEVEL1, '$$', $sname, $content );
1069              
1070 511 50       2365 if ( pflock( $file, LOCK_EX ) ) {
1071 511         2191 $stream = $self->_getStream($sname);
1072 511 50       2058 if ( defined $stream ) {
1073 511         2231 $pos = $self->strmTell($sname);
1074 511 50       2278 if ( $self->strmSeek( $sname, 0, SEEK_END ) ) {
1075 511         2301 $rv = $self->strmWrite( $sname, $content );
1076 511         2545 $self->strmSeek( $sname, $pos, SEEK_SET );
1077             }
1078             }
1079             }
1080              
1081 511         2264 subPostamble( PDLEVEL1, '$', $rv );
1082              
1083 511         3059 return $rv;
1084             }
1085              
1086             sub _strmTruncate {
1087              
1088             # Purpose: Truncates the stream to the specified length. This will zero
1089             # out any data written past the new EOS.
1090             # Returns: Boolean
1091             # Usage: $rv = $obj->_strmTruncate($sname, $neos);
1092              
1093 11     11   33 my $self = shift;
1094 11         11 my $sname = shift;
1095 11         33 my $neos = shift;
1096 11         22 my $file = $$self{file};
1097 11         33 my ( $rv, $stream, $eos, $zeroes, $zl );
1098              
1099 11         66 subPreamble( PDLEVEL1, '$$', $sname, $neos );
1100              
1101 11 50       55 if ( pflock( $file, LOCK_EX ) ) {
1102 11         44 $stream = $self->_getStream($sname);
1103 11 50       66 if ( defined $stream ) {
1104 11         66 $eos = $stream->eos;
1105              
1106 11 50       66 if ( $neos < $eos ) {
1107              
1108             # Zero out old data beyond the new EOS
1109 11         55 $zl = $eos - $neos;
1110 11         528 $zeroes = pack "x$zl";
1111 11 50 33     66 $rv =
1112             $self->strmSeek( $sname, $neos, SEEK_SET )
1113             and $self->strmWrite( $sname, $zeroes )
1114             and $stream->writeEOS($neos);
1115             }
1116             }
1117             }
1118              
1119 11         66 subPostamble( PDLEVEL1, '$', $rv );
1120              
1121 11         110 return $rv;
1122             }
1123              
1124             sub strmTruncate {
1125              
1126             # Purpose: Calls _strmTruncate after making sure the file can be written to
1127             # Returns: Integer/undef
1128             # Usage: $bw = $obj->strmTruncate($sname, $neos);
1129              
1130 11     11 1 33 my $self = shift;
1131 11         44 my @args = @_;
1132 11   33     77 my $bypass = $$self{readOnly} || $$self{corrupted};
1133              
1134 11 50       55 pdebug( 'can\'t write to files that are corrupted or read-only',
1135             PDLEVEL1 )
1136             if $bypass;
1137              
1138 11 50       66 return $bypass ? undef : $self->_strmTruncate(@args);
1139             }
1140              
1141             sub DESTROY {
1142              
1143 154     154   4150 my $self = shift;
1144              
1145             pclose( $$self{file} )
1146 154 100 100     1203 if defined $$self{file} and length $$self{file};
1147              
1148 154         4354 return 1;
1149             }
1150              
1151             1;
1152              
1153             __END__