File Coverage

blib/lib/WARC/Record/Sponge.pm
Criterion Covered Total %
statement 159 159 100.0
branch 31 32 96.8
condition 4 4 100.0
subroutine 34 34 100.0
pod 7 7 100.0
total 235 236 99.5


line stmt bran cond sub pod time code
1             package WARC::Record::Sponge; # -*- CPerl -*-
2              
3 1     1   70360 use strict;
  1         11  
  1         28  
4 1     1   5 use warnings;
  1         2  
  1         22  
5              
6 1     1   5 use Carp;
  1         2  
  1         87  
7              
8             our @ISA = qw(WARC::Record);
9              
10 1     1   7 use File::Spec;
  1         2  
  1         33  
11 1     1   776 use File::Temp qw/:seekable/; # imports SEEK_* constants
  1         21036  
  1         143  
12 1     1   456 use MIME::Base32 qw/encode_rfc3548/;
  1         955  
  1         58  
13 1     1   7 use Symbol qw//;
  1         2  
  1         15  
14              
15 1     1   410 use WARC; *WARC::Record::Sponge::VERSION = \$WARC::VERSION;
  1         3  
  1         56  
16              
17             require WARC::Record;
18              
19 1     1   6 use overload '*{}' => \&_as_handle;
  1         1  
  1         7  
20 1     1   57 use overload fallback => 1;
  1         2  
  1         3  
21              
22             =head1 NAME
23              
24             WARC::Record::Sponge - data sponge for WARC records
25              
26             =head1 SYNOPSIS
27              
28             use Digest::SHA;
29             use WARC::Builder;
30             use WARC::Record::Sponge;
31              
32             $builder = new WARC::Builder ( ... );
33             $sponge = new WARC::Record::Sponge ( type => 'response' );
34             $sponge->begin_digest(block => sha1 => new Digest::SHA ('sha1'));
35              
36             while (<$socket>) {
37             print $sponge $_;
38             # ... other processing ...
39             $sponge->begin_digest(payload => sha1 => new Digest::SHA ('sha1'))
40             if $end_of_headers_reached;
41             }
42              
43             $builder->add($sponge); # add to growing WARC volume
44              
45             =cut
46              
47             # This implementation uses a filehandle tied to a hash.
48             # Keys defined in the inner hash:
49             #
50             # digests
51             # Hash mapping digest names to [<tag>, <digest object>] pairs
52             # file
53             # File::Temp object
54             # handle
55             # Tied file handle
56             # length
57             # Number of valid bytes in the temporary file
58             # writing
59             # Current mode; true if in "soak" phase
60              
61             =head1 DESCRIPTION
62              
63             C<WARC::Record::Sponge> objects provide a streaming interface for
64             constructing WARC records as data is received using a temporary file to
65             store the record content. This allows recording records that exceed
66             available memory.
67              
68             This class provides objects with a tied filehandle interface using a data
69             sponge model. In the "soak" phase, data is written to the handle, along
70             with markers indicating the computation of digests for that data. In the
71             "squeeze" phase, data is read back from the handle and the digests are
72             collected. The object can then be reset to return to the "soak" phase to
73             collect new data and "squeezed" again. All digest markers are removed upon
74             returning to the "soak" phase. The handle is seekable in the "squeeze"
75             phase, but append-only in the "soak" phase.
76              
77             A C<WARC::Record::Sponge> isa C<WARC::Record> and inherits the C<fields>
78             method. Header fields may be set on a C<WARC::Record::Sponge>, but all
79             fields other than "WARC-Type" are erased when the C<reset> method is used.
80              
81             =head2 Methods
82              
83             =over
84              
85             =item $sponge = new WARC::Record::Sponge ( ... )
86              
87             =cut
88              
89             our $TmpDir = File::Spec->tmpdir;
90              
91             sub new {
92 3     3 1 1771 my $class = shift;
93 3         16 my $ob = $class->SUPER::new(@_);
94 3         58 %$ob = (%$ob, digests => {}, length => undef, writing => 1);
95              
96 3         12 my $xhandle = Symbol::geniosym;
97 3         98 tie *$xhandle, $class.'::TiedHandle', $ob;
98 3         9 $ob->{handle} = $xhandle;
99              
100 3         26 $ob->{file} = File::Temp->new
101             (UNLINK => 1, DIR => $TmpDir, TEMPLATE => 'warc-block-'.('X' x 12));
102 3         1340 binmode $ob->{file}, ':raw';
103              
104 3         6 { our $_total_constructed; $_total_constructed++ }
  3         4  
  3         6  
105              
106 3         8 return $ob
107             }
108              
109 3     3   1042 sub DESTROY { our $_total_destroyed; $_total_destroyed++ }
  3         20  
110              
111             sub block {
112 3     3 1 75 my $self = shift;
113              
114 3 100       10 unless (@_) {
115             # slurp the current contents; may run out of memory but that is the
116             # caller's problem if this method is called on a record data sponge
117 1         5 my $pos = $self->{file}->sysseek(0, SEEK_CUR);
118 1         14 $self->{file}->sysseek(0, SEEK_SET);
119 1         10 my $buf = do {local $/ = undef; readline $self->{file}};
  1         5  
  1         33  
120 1         7 $self->{file}->sysseek($pos, SEEK_SET);
121 1         13 return $buf
122             }
123              
124             # otherwise, replace the file contents with the provided data
125             croak "attempt to replace block during squeeze phase"
126 2 100       246 unless $self->{writing};
127             # setting digest markers is not possible if a block is supplied
128 1         4 $self->{digests} = {};
129 1         9 $self->SUPER::block(@_);
130 1         6 $self->{file}->sysseek(0, SEEK_SET);
131 1         16 $self->{file}->syswrite($self->{block});
132 1         21 delete $self->{block};
133             }
134              
135             =item $sponge-E<gt>begin_digest ( $key , $tag , $digest )
136              
137             Insert a digest marker using a digest object that must support the C<add>,
138             C<clone>, and C<digest> methods from the C<Digest> API. All data written
139             to the record is included in all digests active when the data is written.
140              
141             =cut
142              
143             sub begin_digest {
144 2     2 1 653 my $self = shift;
145              
146 2         9 $self->{digests}{$_[0]} = [$_[1], $_[2]];
147             return
148 2         5 }
149              
150             =item $value = $sponge-E<gt>get_digest ( $key )
151              
152             Return a digest value for the data from the digest marker inserted with the
153             given key to the current end of the data. The result is a Base32 value
154             labelled with the tag given when the digest was started.
155              
156             =cut
157              
158             sub get_digest {
159 4     4 1 1206 my $self = shift;
160 4         12 my $cell = $self->{digests}{$_[0]};
161              
162 4 100       14 return undef unless $cell;
163 3         38 return $cell->[0].':'.encode_rfc3548($cell->[1]->clone->digest)
164             }
165              
166             =item $sponge-E<gt>readback
167              
168             End the "soak" phase and switch to the "squeeze" phase.
169              
170             =cut
171              
172             sub readback {
173 5     5 1 68 my $self = shift;
174              
175 5         24 $self->{length} = $self->{file}->sysseek(0, SEEK_CUR);
176 5         71 $self->{fields}->field('Content-Length', $self->{length});
177 5         10 $self->{writing} = 0;
178 5         17 $self->{file}->sysseek(0, SEEK_SET);
179              
180             return
181 5         60 }
182              
183             =item $sponge-E<gt>content_length
184              
185             Return the length of the data stored in the sponge if called in the
186             "squeeze" phase. Returns undefined if called during the "soak" phase.
187              
188             =cut
189              
190             # The inherited method will work, but this is slightly more efficient.
191 2     2 1 15 sub content_length { (shift)->{length} }
192              
193             =item $sponge-E<gt>reset
194              
195             End the "squeeze" phase and return to a new "soak" phase.
196              
197             =cut
198              
199             sub reset {
200 3     3 1 389 my $self = shift;
201              
202 3         6 $self->{length} = undef;
203 3         9 $self->{digests} = {};
204 3         6 $self->{writing} = 1;
205 3         15 $self->{file}->sysseek(0, SEEK_SET);
206              
207 3         35 foreach my $key (keys %{$self->{fields}})
  3         13  
208 6 100       17 { delete $self->{fields}{$key} unless $key eq 'WARC-Type' }
209              
210             return
211 3         10 }
212              
213             =back
214              
215             =cut
216              
217 42     42   1862 sub _as_handle { (shift)->{handle} }
218              
219             sub _add_syswrite_buffer {
220 11   100 11   18 my $self = shift; my $buf = substr $_[0], $_[2] || 0, $_[1];
  11         45  
221 11         16 foreach (values %{$self->{digests}}) { $_->[1]->add($buf) }
  11         37  
  3         13  
222             }
223              
224             {
225             package WARC::Record::Sponge::TiedHandle;
226              
227 1     1   843 use Errno qw/EBADF/; # POSIX defines this, so we should have it.
  1         2  
  1         127  
228 1     1   6 use Fcntl qw/SEEK_CUR/;
  1         2  
  1         60  
229 1     1   6 use Scalar::Util qw/weaken/;
  1         2  
  1         696  
230              
231             # The underlying object is a weak reference to the parent record sponge.
232              
233             sub TIEHANDLE {
234 3     3   7 my $class = shift;
235 3         4 my $parent = shift;
236              
237 3         6 my $ob = bless \ $parent, $class;
238 3         13 weaken $$ob;
239 3         8 return $ob
240             }
241              
242             # The Perl debugger uses the fileno operator when printing a handle.
243 1     1   2 sub FILENO { my $self = ${(shift)}; fileno $self->{file} }
  1         2  
  1         7  
244              
245 1     1   2 sub BINMODE { my $self = ${(shift)}; binmode $self->{file}, shift }
  1         3  
  1         6  
246 2     2   3 sub CLOSE { my $self = ${(shift)};
  2         6  
247 2 100       8 if ($self->{writing}) { $self->readback }
  1         3  
248 1         3 else { $self->reset }
249 2         7 return 1 }
250              
251 12     12   22 sub WRITE { my $self = ${(shift)};
  12         20  
252 12 100       35 unless ($self->{writing}) { $! = EBADF; return }
  1         3  
  1         4  
253 11         27 $self->_add_syswrite_buffer(@_);
254 11         43 $self->{file}->syswrite(@_) }
255 7     7   13 sub PRINT { my $self = ${(shift)};
  7         11  
256 7 100       28 my $buf = join(defined($,) ? $, : '', @_);
257 7 100       24 $buf .= $\ if defined($\);
258 7         13 syswrite $self, $buf }
259 1     1   2 sub PRINTF { my $self = ${(shift)}; my $fmt = shift;
  1         3  
  1         2  
260 1         5 local $\ = ''; print $self sprintf $fmt, @_ }
  1         2  
261              
262 3     3   7 sub SEEK { my $self = ${(shift)};
  3         19  
263 3 100       16 return undef if $self->{writing};
264 2         9 $self->{file}->sysseek(@_) }
265 2     2   3 sub TELL { my $self = ${(shift)};
  2         5  
266 2         8 $self->{file}->sysseek(0, SEEK_CUR) }
267              
268 3 100   3   7 sub EOF { my $self = ${(shift)}; return 1 if $self->{writing};
  3         6  
  3         13  
269 2         31 return (tell $self >= $self->{length}) }
270              
271             # This sub must rely on the aliasing effect of @_.
272             sub READ {
273 10     10   20 my $self = ${(shift)};
  10         18  
274             # args now: 0: buffer 1: length 2: offset into buffer or undef
275 10         17 my $length = $_[1];
276 10   100     40 my $offset = $_[2] || 0;
277              
278 10 100       25 if ($self->{writing}) { $! = EBADF; return undef }
  1         2  
  1         5  
279              
280             my $excess = (($length + $self->{file}->sysseek(0, SEEK_CUR))
281 9         34 - $self->{length});
282 9 100       131 $length -= $excess if $excess > 0;
283 9 100       21 return 0 unless $length;
284              
285 8         11 my $buf; my $count = sysread $self->{file}, $buf, $length;
  8         69  
286 8 50       25 return undef unless defined $count;
287              
288 8 100       24 $_[0] = '' unless defined $_[0];
289 8 100       20 $_[0] .= "\0" x ($offset - length($_[0])) if $offset > length $_[0];
290 8         21 substr $_[0], $offset, (length($_[0]) - $offset), $buf;
291 8         44 return $count;
292             }
293             }
294              
295             1;
296             __END__