File Coverage

blib/lib/WARC/Record/Logical.pm
Criterion Covered Total %
statement 139 139 100.0
branch 32 32 100.0
condition 21 21 100.0
subroutine 21 21 100.0
pod 8 8 100.0
total 221 221 100.0


line stmt bran cond sub pod time code
1             package WARC::Record::Logical; # -*- CPerl -*-
2              
3 2     2   56084 use strict;
  2         12  
  2         46  
4 2     2   8 use warnings;
  2         3  
  2         84  
5              
6             require WARC::Record::FromVolume;
7             our @ISA = qw(WARC::Record::FromVolume);
8              
9 2     2   339 use WARC; *WARC::Record::Logical::VERSION = \$WARC::VERSION;
  2         4  
  2         77  
10              
11 2     2   17 use Carp;
  2         4  
  2         94  
12 2     2   1929 use Math::BigInt;
  2         42133  
  2         8  
13 2     2   38125 use Scalar::Util qw();
  2         4  
  2         31  
14 2     2   8 use Symbol 'geniosym';
  2         3  
  2         142  
15              
16             require WARC::Fields;
17             require WARC::Record::Logical::Block;
18             require WARC::Record::Logical::Heuristics;
19              
20             # inherit _set
21              
22             # inherit compareTo
23              
24             # This implementation uses a hash as the underlying structure.
25              
26             # Keys inherited from WARC::Record base class (via WARC::Record::FromVolume):
27             #
28             # fields
29              
30             # Keys inherited from WARC::Record::FromVolume base class:
31             #
32             # collection (optional)
33             # Parent WARC::Collection object, if available
34              
35             # Keys defined by this class:
36             #
37             # segments
38             # Array, each element is array of:
39             # SEG_REC: WARC::Record::FromVolume
40             # SEG_BASE: integer, logical offset of first octet in segment block
41             # SEG_LENGTH: integer, number of octets in segment data block
42              
43 2     2   10 use constant {SEG_REC => 0, SEG_LENGTH => 1, SEG_BASE => 2};
  2         4  
  2         118  
44 2     2   10 use constant SEGMENT_INDEX => qw/SEG_REC SEG_LENGTH SEG_BASE/;
  2         4  
  2         1463  
45              
46 6     6   689 sub DESTROY { our $_total_destroyed; $_total_destroyed++ }
  6         25  
47              
48             sub _dbg_dump {
49 3     3   13 my $self = shift;
50              
51             my $out = 'WARC logical record ['
52 3         3 .(scalar @{$self->{segments}})." segments] containing:\n";
  3         9  
53             my @out =
54 3         6 map {s/^/ /gm; $_} map {$_->[SEG_REC]->_dbg_dump} @{$self->{segments}};
  20         99  
  20         30  
  20         45  
  3         5  
55 3         28 $out .= join("\n", @out);
56              
57 3         17 return $out;
58             }
59              
60             # inherit new
61              
62             sub _read {
63 17     17   121 my $class = shift;
64 17         23 my $member = shift;
65              
66 17         19 my %ob = ();
67 17 100       65 $ob{collection} = $member->{collection} if defined $member->{collection};
68              
69 17         40 my $member_segment_number = $member->field('WARC-Segment-Number');
70 17 100       218 croak "attempting to load logical record for non-segmented record"
71             unless $member_segment_number;
72              
73             # find the first segment
74 16         31 my $first_segment = undef; my @clues = ();
  16         21  
75             SEGMENT: {
76 16 100       18 if ($member_segment_number == 1) {
  16         30  
77 8         13 $first_segment = $member; # <-- that was easy...
78             } else { # ... less easy: go find the first segment...
79 8         17 my $segment_origin_id = $member->field('WARC-Segment-Origin-ID');
80 8 100       118 croak "record segment lacks required 'WARC-Segment-Origin-ID' field"
81             unless $segment_origin_id;
82 7 100 100     26 if (defined $member->{collection}
83             && $member->{collection}->searchable('record_id')) {
84             $first_segment = $member->{collection}->search
85 3         21 (record_id => $segment_origin_id);
86 3 100       21 next SEGMENT if defined $first_segment;
87 1         88 carp "index failed to locate first segment by Record-ID";
88             # ... and onwards to heuristics ...
89             }
90 5         55 ($first_segment, @clues) =
91             WARC::Record::Logical::Heuristics::find_first_segment ($member);
92             }
93             }
94 15 100       297 croak "failed to locate first segment of logical record"
95             unless defined $first_segment;
96              
97             # find the other segments
98 12         16 my @pool = ();
99             SEGMENT: {
100 12 100 100     13 if (defined $member->{collection}
  12         39  
101             && $member->{collection}->searchable('segment_origin_id')) {
102             @pool = $member->{collection}->search
103 7         80 (segment_origin_id => $first_segment->id);
104 20         31 @pool = map {$_->[0]} sort {$a->[1] <=> $b->[1]}
  41         90  
105 7         15 map {[$_, $_->field('WARC-Segment-Number')]} @pool;
  20         39  
106             last SEGMENT if # we have all of the segments
107             (@pool
108 7 100 100     27 && ($pool[-1]->field('WARC-Segment-Number') == (1+@pool))
      100        
109             && ($pool[-1]->field('WARC-Segment-Total-Length')));
110 4         377 carp "index failed to locate all segments by Origin-ID";
111             # ... and onwards to heuristics ...
112             }
113 9         232 push @pool, (WARC::Record::Logical::Heuristics::find_continuation
114             ($first_segment, @pool, @clues));
115             # sort again in case heuristics added more records
116 13         25 @pool = map {$_->[0]} sort {$a->[1] <=> $b->[1]}
  11         25  
117 9         39 map {[$_, $_->field('WARC-Segment-Number')]} @pool;
  13         29  
118             }
119 12 100       284 croak "failed to locate any continuation segments for logical record"
120             unless scalar @pool > 0;
121              
122             # assemble logical record segments
123 9         12 my @record = ($first_segment);
124             {
125 9         13 my $i = 0;
  9         10  
126 9         17 while ($i < @pool) {
127 24         50 my $segment_number = $pool[$i]->field('WARC-Segment-Number');
128 24         40 push @record, $pool[$i];
129 24   100     61 $i++ # skip duplicate segments heuristics may have found
130             while $i < @pool
131             && $segment_number == $pool[$i]->field('WARC-Segment-Number');
132             }
133             }
134              
135             # verify logical record
136 9         20 for (my $i = 0; $i < @record; $i++) {
137 33 100       69 croak "logical record segment missing or out-of-place"
138             unless $record[$i]->field('WARC-Segment-Number') == (1+$i);
139 32 100 100     91 croak "logical record segment not part of record (corrupted index?)"
140             unless $i == 0
141             || $record[$i]->field('WARC-Segment-Origin-ID') eq $record[0]->id;
142             }
143 7 100       17 croak "final segment lacks required 'WARC-Segment-Total-Length' header"
144             unless $record[-1]->field('WARC-Segment-Total-Length');
145              
146             # assemble logical record header
147 6         16 my $fields = $record[0]->fields->clone;
148             {
149             # Set "Content-Length" to the total length
150 6         7 $fields->field('Content-Length',
  6         23  
151             $record[-1]->field('WARC-Segment-Total-Length'));
152             # Transfer any other non-segment-related headers that appear on the
153             # last segment and are not present at the first segment.
154 6         9 foreach my $key (grep !m/^WARC-Segment-/, keys %{$record[-1]->fields}) {
  6         12  
155 22 100       36 $fields->field($key, $record[-1]->field($key))
156             unless defined $fields->field($key);
157             }
158             # Delete the block digest header, since it is from a segment.
159 6         22 $fields->field('WARC-Block-Digest' => []);
160             # Delete all segment-related headers
161 6         10 my %fields; tie %fields, ref $fields, $fields;
  6         18  
162 6         16 my @segment_headers = grep m/^WARC-Segment/, keys %fields;
163 6         23 $fields->field($_ => []) for @segment_headers;
164 6         16 untie %fields;
165             }
166 6         18 $fields->set_readonly;
167 6         11 $ob{fields} = $fields;
168              
169             # assemble logical record data
170 6         8 my @segments = ();
171             {
172 2     2   14 use integer;
  2         12  
  2         9  
  6         9  
173 6         8 my $running_base = 0;
174 6         12 for (my $i = 0; $i < @record; $i++) {
175 27         2064 my @row = ();
176              
177 27         46 $row[SEG_REC] = $record[$i];
178 27         56 $row[SEG_LENGTH] = 0+$record[$i]->field('Content-Length');
179              
180 27 100 100     91 $running_base = Math::BigInt->new($running_base)
181             if ((not ref $running_base)
182             && (($running_base + $row[SEG_LENGTH]) < $running_base));
183 27         87 $row[SEG_BASE] = $running_base;
184              
185 27         40 $segments[$i] = \@row;
186 27         56 $running_base += $row[SEG_LENGTH];
187             }
188             }
189 6         148 $ob{segments} = \@segments;
190              
191 6         7 { our $_total_read; $_total_read++ }
  6         7  
  6         7  
192              
193 6         11 my $self = bless \%ob, $class;
194              
195 6         14 $_->[SEG_REC]->{logical} = $self for @{$self->{segments}};
  6         65  
196 6         8 Scalar::Util::weaken $_->[SEG_REC]->{logical} for @{$self->{segments}};
  6         33  
197              
198 6         37 return $self;
199             }
200              
201 2     2 1 7 sub protocol { (shift)->{segments}[0][SEG_REC]->protocol }
202 20     20 1 977 sub volume { (shift)->{segments}[0][SEG_REC]->volume }
203 14     14 1 950 sub offset { (shift)->{segments}[0][SEG_REC]->offset }
204              
205 2     2 1 8 sub logical { shift }
206              
207             sub segments {
208 13 100   13 1 35 if (wantarray) {
209 10         12 return map {$_->[SEG_REC]} @{(shift)->{segments}}
  35         68  
  10         22  
210             } else {
211 3         4 return scalar @{(shift)->{segments}}
  3         14  
212             }
213             }
214              
215 2     2 1 861 sub next { (shift)->{segments}[-1][SEG_REC]->next }
216              
217             sub open_block {
218 4     4 1 848 my $self = shift;
219              
220 4         11 my $xhandle = Symbol::geniosym;
221 4         104 tie *$xhandle, 'WARC::Record::Logical::Block', $self;
222              
223 4         19 return $xhandle;
224             }
225              
226 2     2 1 7 sub open_continued { (shift)->open_block }
227              
228             # inherit replay
229              
230             # inherit open_payload
231              
232             1;
233             __END__