File Coverage

blib/lib/Plucene/Index/Writer.pm
Criterion Covered Total %
statement 135 144 93.7
branch 22 38 57.8
condition 15 20 75.0
subroutine 31 32 96.8
pod 7 7 100.0
total 210 241 87.1


line stmt bran cond sub pod time code
1             package Plucene::Index::Writer;
2              
3             =head1 NAME
4              
5             Plucene::Index::Writer - write an index.
6              
7             =head1 SYNOPSIS
8              
9             my $writer = Plucene::Index::Writer->new($path, $analyser, $create);
10              
11             $writer->add_document($doc);
12             $writer->add_indexes(@dirs);
13              
14             $writer->optimize; # called before close
15            
16             my $doc_count = $writer->doc_count;
17              
18             my $mergefactor = $writer->mergefactor;
19              
20             $writer->set_mergefactor($value);
21              
22             =head1 DESCRIPTION
23              
24             This is the writer class.
25              
26             If an index will not have more documents added for a while and optimal search
27             performance is desired, then the C<optimize> method should be called before the
28             index is closed.
29              
30             =head1 METHODS
31              
32             =cut
33              
34 17     17   4055 use strict;
  17         35  
  17         606  
35 17     17   93 use warnings;
  17         34  
  17         718  
36              
37 17     17   173 use Carp qw/cluck croak/;
  17         37  
  17         1313  
38 17     17   112 use Fcntl qw(O_EXCL O_CREAT O_WRONLY);
  17         33  
  17         951  
39 17     17   99 use File::Path qw(mkpath);
  17         55  
  17         1091  
40 17     17   105 use List::Util qw(sum);
  17         31  
  17         2344  
41 17     17   36677 use File::Temp qw(tempdir);
  17         277606  
  17         1390  
42              
43 17     17   11911 use Plucene::Index::DocumentWriter;
  17         82  
  17         575  
44 17     17   691 use Plucene::Index::SegmentInfos;
  17         42  
  17         425  
45 17     17   109 use Plucene::Index::SegmentInfo;
  17         36  
  17         144  
46 17     17   1374 use Plucene::Index::SegmentReader;
  17         41  
  17         210  
47 17     17   12815 use Plucene::Index::SegmentMerger;
  17         51  
  17         222  
48 17     17   586 use Plucene::Utils;
  17         37  
  17         1481  
49              
50 17     17   111 use constant MAX_FIELD_LENGTH => 10_000;
  17         35  
  17         36106  
51              
52             our $max_merge_docs = ~0;
53              
54             =head2 new
55              
56             my $writer = Plucene::Index::Writer->new($path, $analyser, $create);
57              
58             This will create a new Plucene::Index::Writer object.
59            
60             The third argument to the constructor determines whether a new index is
61             created, or whether an existing index is opened for the addition of new
62             documents.
63              
64             =cut
65              
66             sub new {
67 18     18 1 78 my ($class, $path, $analyzer, $create) = @_;
68 18 50       120 $create = 0 unless defined $create;
69 18 50       840 if (!-d $path) {
70 0 0       0 croak "Couldn't write into $path - it doesn't exist" unless $create;
71 0 0       0 mkpath($path) or croak "Couldn't create $path - $!";
72             }
73              
74 18         84 my $lock = "$path/write.lock";
75              
76 18         216 my $self = bless {
77             directory => $path,
78             analyzer => $analyzer,
79             lock => $lock, # There are many like it, but this one is mine
80             segmentinfos => new Plucene::Index::SegmentInfos(),
81             tmp_directory => tempdir(CLEANUP => 1),
82             mergefactor => 10,
83             }, $class;
84              
85 18         11031 local *FH;
86 18 50       1964 sysopen FH, $lock, O_EXCL | O_CREAT | O_WRONLY
87             or croak "Couldn't get lock";
88 18         247 close *FH;
89              
90             do_locked {
91 18 100   18   291 $create
92             ? $self->{segmentinfos}->write($path)
93             : $self->{segmentinfos}->read($path);
94             }
95 18         276 "$path/commit.lock";
96              
97 18         155 return $self;
98             }
99              
100             =head2 mergefactor / set_mergefactor
101              
102             my $mergefactor = $writer->mergefactor;
103              
104             $writer->set_mergefactor($value);
105              
106             Get / set the mergefactor. It defaults to 10.
107              
108             =cut
109              
110 310     310 1 1559 sub mergefactor { $_[0]->{mergefactor} }
111              
112             sub set_mergefactor {
113 3   100 3 1 19 $_[0]->{mergefactor} = $_[1] || $_[0]->mergefactor || 10;
114             }
115              
116             sub DESTROY {
117 18     18   804 my $self = shift;
118 18 50       1565 unlink $self->{lock} if $self->{lock};
119 18         107 $self->_flush;
120             }
121              
122             =head2 doc_count
123              
124             my $doc_count = $writer->doc_count;
125              
126             =cut
127              
128 2     2 1 25 sub doc_count { sum map $_->doc_count(), $_[0]->{segmentinfos}->segments }
129              
130             =head2 add_document
131              
132             $writer->add_document($doc);
133              
134             Adds a document to the index. After the document has been added, a merge takes
135             place if there are more than C<< $writer->mergefactor >> segments
136             in the index. This defaults to 10, but can be set to whatever value is optimal
137             for your application.
138            
139             =cut
140              
141             sub add_document {
142 257     257 1 1062 my ($self, $doc) = @_;
143              
144 257         2460 my $dw = Plucene::Index::DocumentWriter->new($self->{tmp_directory},
145             $self->{analyzer}, MAX_FIELD_LENGTH);
146 257         951 my $segname = $self->_new_segname;
147 257         1141 $dw->add_document($segname, $doc);
148              
149             #lock $self;
150 257         5717 $self->{segmentinfos}->add_element(
151             Plucene::Index::SegmentInfo->new({
152             name => $segname,
153             doc_count => 1,
154             dir => $self->{tmp_directory} }));
155 257         1895 $self->_maybe_merge_segments;
156             }
157              
158             sub _new_segname {
159 309     309   1604 "_" . $_[0]->{segmentinfos}->{counter}++ # Urgh
160             }
161              
162             sub _flush {
163 18     18   46 my $self = shift;
164 18         114 my @segs = $self->{segmentinfos}->segments;
165 18         60 my $min_segment = $#segs;
166 18         50 my $doc_count = 0;
167 18   100     179 while ($min_segment >= 0
168             and $segs[$min_segment]->dir eq $self->{tmp_directory}) {
169 19         178 $doc_count += $segs[$min_segment]->doc_count;
170 19         117 $min_segment--;
171             }
172 18 50 100     259 if ( $min_segment < 0
      66        
173             or ($doc_count + $segs[$min_segment]->doc_count > $self->mergefactor)
174             or !($segs[-1]->dir eq $self->{tmp_directory})) {
175 18         101 $min_segment++;
176             }
177 18 50       82 return if $min_segment > @segs;
178 18         92 $self->_merge_segments($min_segment);
179             }
180              
181             =head2 optimize
182              
183             $writer->optimize;
184              
185             Merges all segments together into a single segment, optimizing an index
186             for search. This should be the last method called on an indexer, as it
187             invalidates the writer object.
188              
189             =cut
190              
191             sub optimize {
192 19     19 1 120 my $self = shift;
193 19         38 my $segments;
194 19   33     87 while (
      66        
195             ($segments = scalar $self->{segmentinfos}->segments) > 1
196             or
197              
198             # If it's fragmented
199             (
200             $segments == 1 and # or it's not fragmented
201             (
202             Plucene::Index::SegmentReader->has_deletions( # but has deletions
203             $self->{segmentinfos}->info(0))))
204             ) {
205 15         58 my $minseg = $segments - $self->mergefactor;
206 15 50       101 $self->_merge_segments($minseg < 0 ? 0 : $minseg);
207             }
208             }
209              
210             =head2 add_indexes
211              
212             $writer->add_indexes(@dirs);
213              
214             Merges all segments from an array of indexes into this index.
215              
216             This may be used to parallelize batch indexing. A large document
217             collection can be broken into sub-collections. Each sub-collection can be
218             indexed in parallel, on a different thread, process or machine. The
219             complete index can then be created by merging sub-collection indexes
220             with this method.
221              
222             After this completes, the index is optimized.
223              
224             =cut
225              
226             sub add_indexes {
227 0     0 1 0 my ($self, @dirs) = @_;
228 0         0 $self->optimize;
229 0         0 for my $dir (@dirs) {
230 0         0 my $sis = new Plucene::Index::SegmentInfos;
231 0         0 $sis->read($dir);
232 0         0 $self->{segmentinfos}->add_element($_) for $sis->segments;
233             }
234 0         0 $self->optimize;
235             }
236              
237             # Incremental segment merger.
238             # Or even this code - SC
239             sub _maybe_merge_segments {
240 257     257   473 my $self = shift;
241 257         1249 my $target_merge_docs = $self->mergefactor;
242 257         1346 while ($target_merge_docs <= $max_merge_docs) {
243 276 50       959 cluck("No segments defined!") unless $self->{segmentinfos};
244 276         1407 my $min_seg = scalar $self->{segmentinfos}->segments;
245 276         509 my $merge_docs = 0;
246 276         844 while (--$min_seg >= 0) {
247 1515         9297 my $si = $self->{segmentinfos}->info($min_seg);
248 1515 100       4463 last if $si->doc_count >= $target_merge_docs;
249 1368         8501 $merge_docs += $si->doc_count;
250             }
251 276 100       130992 last unless $merge_docs >= $target_merge_docs;
252 19         103 $self->_merge_segments($min_seg + 1);
253 19         897 $target_merge_docs *= $self->mergefactor;
254             }
255             }
256              
257             # Pops segments off of segmentInfos stack down to minSegment, merges
258             # them, and pushes the merged index onto the top of the segmentInfos stack.
259             sub _merge_segments {
260 52     52   118 my $self = shift;
261 52         112 my $min_segment = shift;
262 52         201 my $mergedname = $self->_new_segname;
263 52         116 my $mergedcount = 0;
264 52         703 my $merger = Plucene::Index::SegmentMerger->new({
265             dir => $self->{directory},
266             segment => $mergedname
267             });
268 52         680 my @to_delete;
269 52         238 my @segments = $self->{segmentinfos}->segments;
270 52 100       944 return if $#segments < $min_segment;
271              
272 39         216 for my $si (@segments[ $min_segment .. $#segments ]) {
273 274         2503 my $reader = Plucene::Index::SegmentReader->new($si);
274 274         1147 $merger->add($reader);
275 274 50 66     868 push @to_delete, $reader
276             if $reader->directory eq $self->{directory}
277             or $reader->directory eq $self->{tmp_directory};
278 274         4668 $mergedcount += $si->doc_count;
279             }
280 39         426 $merger->merge;
281              
282 39         525 $self->{segmentinfos}->{segments} = # This is a bit naughty
283             [
284             ($self->{segmentinfos}->segments)[ 0 .. $min_segment - 1 ],
285             Plucene::Index::SegmentInfo->new({
286             name => $mergedname,
287             dir => $self->{directory},
288             doc_count => $mergedcount
289             }) ];
290             do_locked {
291 39     39   314 $self->{segmentinfos}->write($self->{directory});
292 39         389 $self->_delete_segments(@to_delete);
293             }
294 39         1332 "$self->{directory}/commit.lock";
295              
296             }
297              
298             sub _delete_segments {
299 39     39   150 my ($self, @to_delete) = @_;
300 39         213 my @try_later = $self->_delete($self->_read_deletable_files);
301 39         109 for my $reader (@to_delete) {
302 274         1145 for my $file ($reader->files) {
303 2316         9595 push @try_later, $self->_delete("$reader->{directory}/$file");
304             }
305             }
306 39         221 $self->_write_deletable_files(@try_later);
307             }
308              
309             sub _delete {
310 2355     2355   4575 my ($self, @files) = @_;
311 2355         2711 my @failed;
312 2355 50       4240 for (@files) { unlink $_ or push @failed, $_ }
  2316         161141  
313 2355         7617 return @failed;
314             }
315              
316             sub _read_deletable_files {
317 39     39   131 my $self = shift;
318 39 100       1227 return unless -e (my $dfile = "$self->{directory}/deletable");
319 23 50       1100 open my $fh, $dfile or die $!;
320 23         832 chomp(my @files = <$fh>);
321 23         660 return @files;
322             }
323              
324             sub _write_deletable_files {
325 39     39   119 my ($self, @files) = @_;
326 39         155 my $dfile = "$self->{directory}/deletable";
327 39 50       4326 open my $fh, ">" . $dfile . ".new" or die $!;
328 39         149 print $fh "$_\n" for @files;
329 39         524 close($fh);
330 39         3418 rename $dfile . ".new", $dfile;
331             }
332              
333             1;