File Coverage

blib/lib/WARC/Record/Logical.pm
Criterion Covered Total %
statement 139 139 100.0
branch 32 32 100.0
condition 21 21 100.0
subroutine 21 21 100.0
pod 8 8 100.0
total 221 221 100.0


line stmt bran cond sub pod time code
1             package WARC::Record::Logical; # -*- CPerl -*-
2              
3 2     2   68332 use strict;
  2         14  
  2         53  
4 2     2   9 use warnings;
  2         4  
  2         105  
5              
6             require WARC::Record::FromVolume;
7             our @ISA = qw(WARC::Record::FromVolume);
8              
9 2     2   384 use WARC; *WARC::Record::Logical::VERSION = \$WARC::VERSION;
  2         3  
  2         73  
10              
11 2     2   10 use Carp;
  2         4  
  2         97  
12 2     2   2009 use Math::BigInt;
  2         44995  
  2         11  
13 2     2   40692 use Scalar::Util qw();
  2         4  
  2         36  
14 2     2   10 use Symbol 'geniosym';
  2         4  
  2         176  
15              
16             require WARC::Fields;
17             require WARC::Record::Logical::Block;
18             require WARC::Record::Logical::Heuristics;
19              
20             # inherit _set
21              
22             # inherit compareTo
23              
24             # This implementation uses a hash as the underlying structure.
25              
26             # Keys inherited from WARC::Record base class (via WARC::Record::FromVolume):
27             #
28             # fields
29              
30             # Keys inherited from WARC::Record::FromVolume base class:
31             #
32             # collection (optional)
33             # Parent WARC::Collection object, if available
34              
35             # Keys defined by this class:
36             #
37             # segments
38             # Array, each element is array of:
39             # SEG_REC: WARC::Record::FromVolume
40             # SEG_BASE: integer, logical offset of first octet in segment block
41             # SEG_LENGTH: integer, number of octets in segment data block
42              
43 2     2   12 use constant {SEG_REC => 0, SEG_LENGTH => 1, SEG_BASE => 2};
  2         4  
  2         132  
44 2     2   11 use constant SEGMENT_INDEX => qw/SEG_REC SEG_LENGTH SEG_BASE/;
  2         4  
  2         1465  
45              
46 6     6   833 sub DESTROY { our $_total_destroyed; $_total_destroyed++ }
  6         28  
47              
48             sub _dbg_dump {
49 3     3   17 my $self = shift;
50              
51             my $out = 'WARC logical record ['
52 3         7 .(scalar @{$self->{segments}})." segments] containing:\n";
  3         11  
53             my @out =
54 3         8 map {s/^/ /gm; $_} map {$_->[SEG_REC]->_dbg_dump} @{$self->{segments}};
  20         138  
  20         39  
  20         56  
  3         9  
55 3         37 $out .= join("\n", @out);
56              
57 3         24 return $out;
58             }
59              
60             # inherit new
61              
62             sub _read {
63 17     17   142 my $class = shift;
64 17         25 my $member = shift;
65              
66 17         27 my %ob = ();
67 17 100       82 $ob{collection} = $member->{collection} if defined $member->{collection};
68              
69 17         51 my $member_segment_number = $member->field('WARC-Segment-Number');
70 17 100       265 croak "attempting to load logical record for non-segmented record"
71             unless $member_segment_number;
72              
73             # find the first segment
74 16         35 my $first_segment = undef; my @clues = ();
  16         25  
75             SEGMENT: {
76 16 100       24 if ($member_segment_number == 1) {
  16         37  
77 8         13 $first_segment = $member; # <-- that was easy...
78             } else { # ... less easy: go find the first segment...
79 8         30 my $segment_origin_id = $member->field('WARC-Segment-Origin-ID');
80 8 100       131 croak "record segment lacks required 'WARC-Segment-Origin-ID' field"
81             unless $segment_origin_id;
82 7 100 100     32 if (defined $member->{collection}
83             && $member->{collection}->searchable('record_id')) {
84             $first_segment = $member->{collection}->search
85 3         29 (record_id => $segment_origin_id);
86 3 100       16 next SEGMENT if defined $first_segment;
87 1         109 carp "index failed to locate first segment by Record-ID";
88             # ... and onwards to heuristics ...
89             }
90 5         72 ($first_segment, @clues) =
91             WARC::Record::Logical::Heuristics::find_first_segment ($member);
92             }
93             }
94 15 100       404 croak "failed to locate first segment of logical record"
95             unless defined $first_segment;
96              
97             # find the other segments
98 12         20 my @pool = ();
99             SEGMENT: {
100 12 100 100     17 if (defined $member->{collection}
  12         46  
101             && $member->{collection}->searchable('segment_origin_id')) {
102             @pool = $member->{collection}->search
103 7         79 (segment_origin_id => $first_segment->id);
104 20         37 @pool = map {$_->[0]} sort {$a->[1] <=> $b->[1]}
  41         140  
105 7         18 map {[$_, $_->field('WARC-Segment-Number')]} @pool;
  20         55  
106             last SEGMENT if # we have all of the segments
107             (@pool
108 7 100 100     37 && ($pool[-1]->field('WARC-Segment-Number') == (1+@pool))
      100        
109             && ($pool[-1]->field('WARC-Segment-Total-Length')));
110 4         538 carp "index failed to locate all segments by Origin-ID";
111             # ... and onwards to heuristics ...
112             }
113 9         293 push @pool, (WARC::Record::Logical::Heuristics::find_continuation
114             ($first_segment, @pool, @clues));
115             # sort again in case heuristics added more records
116 13         29 @pool = map {$_->[0]} sort {$a->[1] <=> $b->[1]}
  11         31  
117 9         50 map {[$_, $_->field('WARC-Segment-Number')]} @pool;
  13         36  
118             }
119 12 100       439 croak "failed to locate any continuation segments for logical record"
120             unless scalar @pool > 0;
121              
122             # assemble logical record segments
123 9         20 my @record = ($first_segment);
124             {
125 9         11 my $i = 0;
  9         12  
126 9         21 while ($i < @pool) {
127 24         64 my $segment_number = $pool[$i]->field('WARC-Segment-Number');
128 24         49 push @record, $pool[$i];
129 24   100     73 $i++ # skip duplicate segments heuristics may have found
130             while $i < @pool
131             && $segment_number == $pool[$i]->field('WARC-Segment-Number');
132             }
133             }
134              
135             # verify logical record
136 9         27 for (my $i = 0; $i < @record; $i++) {
137 33 100       77 croak "logical record segment missing or out-of-place"
138             unless $record[$i]->field('WARC-Segment-Number') == (1+$i);
139 32 100 100     113 croak "logical record segment not part of record (corrupted index?)"
140             unless $i == 0
141             || $record[$i]->field('WARC-Segment-Origin-ID') eq $record[0]->id;
142             }
143 7 100       17 croak "final segment lacks required 'WARC-Segment-Total-Length' header"
144             unless $record[-1]->field('WARC-Segment-Total-Length');
145              
146             # assemble logical record header
147 6         18 my $fields = $record[0]->fields->clone;
148             {
149             # Set "Content-Length" to the total length
150 6         12 $fields->field('Content-Length',
  6         16  
151             $record[-1]->field('WARC-Segment-Total-Length'));
152             # Transfer any other non-segment-related headers that appear on the
153             # last segment and are not present at the first segment.
154 6         12 foreach my $key (grep !m/^WARC-Segment-/, keys %{$record[-1]->fields}) {
  6         17  
155 22 100       53 $fields->field($key, $record[-1]->field($key))
156             unless defined $fields->field($key);
157             }
158             # Delete the block digest header, since it is from a segment.
159 6         29 $fields->field('WARC-Block-Digest' => []);
160             # Delete all segment-related headers
161 6         11 my %fields; tie %fields, ref $fields, $fields;
  6         23  
162 6         25 my @segment_headers = grep m/^WARC-Segment/, keys %fields;
163 6         32 $fields->field($_ => []) for @segment_headers;
164 6         22 untie %fields;
165             }
166 6         20 $fields->set_readonly;
167 6         11 $ob{fields} = $fields;
168              
169             # assemble logical record data
170 6         11 my @segments = ();
171             {
172 2     2   16 use integer;
  2         4  
  2         7  
  6         10  
173 6         9 my $running_base = 0;
174 6         17 for (my $i = 0; $i < @record; $i++) {
175 27         2544 my @row = ();
176              
177 27         60 $row[SEG_REC] = $record[$i];
178 27         73 $row[SEG_LENGTH] = 0+$record[$i]->field('Content-Length');
179              
180 27 100 100     117 $running_base = Math::BigInt->new($running_base)
181             if ((not ref $running_base)
182             && (($running_base + $row[SEG_LENGTH]) < $running_base));
183 27         135 $row[SEG_BASE] = $running_base;
184              
185 27         51 $segments[$i] = \@row;
186 27         70 $running_base += $row[SEG_LENGTH];
187             }
188             }
189 6         183 $ob{segments} = \@segments;
190              
191 6         12 { our $_total_read; $_total_read++ }
  6         8  
  6         9  
192              
193 6         16 my $self = bless \%ob, $class;
194              
195 6         10 $_->[SEG_REC]->{logical} = $self for @{$self->{segments}};
  6         87  
196 6         10 Scalar::Util::weaken $_->[SEG_REC]->{logical} for @{$self->{segments}};
  6         40  
197              
198 6         32 return $self;
199             }
200              
201 2     2 1 10 sub protocol { (shift)->{segments}[0][SEG_REC]->protocol }
202 20     20 1 1157 sub volume { (shift)->{segments}[0][SEG_REC]->volume }
203 14     14 1 1200 sub offset { (shift)->{segments}[0][SEG_REC]->offset }
204              
205 2     2 1 9 sub logical { shift }
206              
207             sub segments {
208 13 100   13 1 46 if (wantarray) {
209 10         17 return map {$_->[SEG_REC]} @{(shift)->{segments}}
  35         71  
  10         28  
210             } else {
211 3         6 return scalar @{(shift)->{segments}}
  3         19  
212             }
213             }
214              
215 2     2 1 1024 sub next { (shift)->{segments}[-1][SEG_REC]->next }
216              
217             sub open_block {
218 4     4 1 1065 my $self = shift;
219              
220 4         16 my $xhandle = Symbol::geniosym;
221 4         131 tie *$xhandle, 'WARC::Record::Logical::Block', $self;
222              
223 4         25 return $xhandle;
224             }
225              
226 2     2 1 7 sub open_continued { (shift)->open_block }
227              
228             # inherit replay
229              
230             # inherit open_payload
231              
232             1;
233             __END__