File Coverage

blib/lib/LucyX/Remote/ClusterSearcher.pm
Criterion Covered Total %
statement 176 199 88.4
branch 27 48 56.2
condition 6 12 50.0
subroutine 21 23 91.3
pod 4 8 50.0
total 234 290 80.6


line stmt bran cond sub pod time code
1             # Licensed to the Apache Software Foundation (ASF) under one or more
2             # contributor license agreements. See the NOTICE file distributed with
3             # this work for additional information regarding copyright ownership.
4             # The ASF licenses this file to You under the Apache License, Version 2.0
5             # (the "License"); you may not use this file except in compliance with
6             # the License. You may obtain a copy of the License at
7             #
8             # http://www.apache.org/licenses/LICENSE-2.0
9             #
10             # Unless required by applicable law or agreed to in writing, software
11             # distributed under the License is distributed on an "AS IS" BASIS,
12             # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13             # See the License for the specific language governing permissions and
14             # limitations under the License.
15              
16 2     2   1053 use strict;
  2         2  
  2         43  
17 2     2   6 use warnings;
  2         2  
  2         71  
18              
19             package LucyX::Remote::ClusterSearcher;
20 2     2   74 BEGIN { our @ISA = qw( Lucy::Search::Searcher ) }
21             our $VERSION = '0.006000_002';
22             $VERSION = eval $VERSION;
23 2     2   7 use Carp;
  2         20  
  2         84  
24 2     2   7 use Storable qw( nfreeze thaw );
  2         6  
  2         71  
25 2     2   6 use Scalar::Util qw( reftype );
  2         3  
  2         112  
26              
27             # Inside-out member vars.
28             our %shards;
29             our %num_shards;
30             our %starts;
31             our %doc_max;
32              
33 2     2   426 use IO::Socket::INET;
  2         8864  
  2         11  
34              
35             sub new {
36 2     2 1 505968 my ( $either, %args ) = @_;
37 2         7 my $addrs = delete $args{shards};
38 2         39 my $self = $either->SUPER::new(%args);
39 2 50       1357 confess("'shards' must be an arrayref")
40             unless reftype($addrs) eq 'ARRAY';
41 2         9 $num_shards{$$self} = scalar @$addrs;
42              
43             # Establish connections.
44 2         4 my @shards;
45 2         6 for my $addr (@$addrs) {
46 7         35 my $sock = IO::Socket::INET->new(
47             PeerAddr => $addr,
48             Proto => 'tcp',
49             Blocking => 0,
50             );
51 7 50       2239 confess("No socket: $!") unless $sock;
52 7         24 push @shards,
53             {
54             addr => $addr,
55             sock => $sock,
56             };
57             }
58 2         7 $shards{$$self} = \@shards;
59              
60             # Handshake with servers.
61 2         7 my %handshake_args = ( _action => 'handshake' );
62 2         5 my $responses = $self->_multi_rpc( \%handshake_args );
63 2         4 for my $response (@$responses) {
64 7 50       9 confess unless $response;
65             }
66              
67             # Derive doc_max and relative start offsets.
68 2         9 my $doc_max_responses = $self->_multi_rpc( { _action => 'doc_max' } );
69 2         4 my $doc_max = 0;
70 2         1 my @starts;
71 2         4 for my $shard_doc_max (@$doc_max_responses) {
72 7         5 push @starts, $doc_max;
73 7         6 $doc_max += $shard_doc_max;
74             }
75 2         44 $starts{$$self} = Lucy::Object::I32Array->new( ints => \@starts );
76 2         5 $doc_max{$$self} = $doc_max;
77              
78 2         11 return $self;
79             }
80              
81             sub DESTROY {
82 0     0   0 my $self = shift;
83 0 0       0 $self->close if defined $shards{$$self};
84 0         0 delete $shards{$$self};
85 0         0 delete $num_shards{$$self};
86 0         0 delete $starts{$$self};
87 0         0 delete $doc_max{$$self};
88 0         0 $self->SUPER::DESTROY;
89             }
90              
91             # Send a remote procedure call to all shards.
92             sub _multi_rpc {
93 30     30   33 my ( $self, $args ) = @_;
94 30         58 return $self->_rpc( $args, $shards{$$self} );
95             }
96              
97             # Send a remote procedure call to one shard.
98             sub _single_rpc {
99 26     26   34 my ( $self, $args, $shard_num ) = @_;
100 26         50 my $shard = $shards{$$self}[$shard_num];
101 26         74 my $responses = $self->_rpc( $args, [$shard] );
102 26         420 return $responses->[0];
103             }
104              
105             sub _rpc {
106 56     56   52 my ( $self, $args, $shards ) = @_;
107              
108 56         92 my $request = $self->_serialize_request($args);
109 56         59 my $timeout = 5;
110             my $shutdown = $args->{_action} eq 'done'
111 56   66     238 || $args->{_action} eq 'terminate';
112              
113 56         78 my ( $rin, $win, $ein ) = ( '', '', '' );
114              
115             # Initialize shards to send the request
116 56         93 for my $shard (@$shards) {
117 146         298 my $fileno = $shard->{sock}->fileno;
118 146         540 vec( $win, $fileno, 1 ) = 1;
119 146         173 $shard->{response} = undef;
120 146         122 $shard->{error} = undef;
121 146         188 $shard->{buf} = $request;
122 146         104 $shard->{sent} = 0;
123 146         135 $shard->{callback} = \&_cb_send;
124 146         169 $shard->{shutdown} = $shutdown;
125             }
126              
127 56         63 my $remaining = @$shards;
128              
129             # Event loop
130 56         99 while ( $remaining > 0 ) {
131 198         148 my ( $rout, $wout, $eout );
132              
133 198         352779 my $n = select( $rout = $rin, $wout = $win, $eout = $ein, $timeout );
134              
135 198 50       403 confess("select: $!") if $n == -1;
136 198 50       288 confess("I/O timeout") if $n == 0;
137              
138 198         308 for my $shard (@$shards) {
139 528 100       755 next if !$shard->{callback};
140 490         939 my $fileno = $shard->{sock}->fileno;
141 490 100 66     1924 next if !vec( $rout, $fileno, 1 ) && !vec( $wout, $fileno, 1 );
142             # Dispatch event
143 444         673 $shard->{callback}->( $shard, \$rin, \$win, \$ein );
144 444 100       1161 --$remaining if !$shard->{callback};
145             }
146             }
147              
148             # Collect responses and cleanup
149 56         49 my @responses;
150             my @errors;
151 56         102 for my $shard (@$shards) {
152 146 50       201 if ( defined $shard->{error} ) {
153 0         0 push( @errors, $shard->{error} . ' @ ' . $shard->{addr} );
154             }
155             else {
156 146         180 push( @responses, $shard->{response}{retval} );
157             }
158 146         124 $shard->{response} = undef;
159 146         135 $shard->{error} = undef;
160 146         139 $shard->{buf} = undef;
161             }
162 56 50       99 confess( 'RPC error: ' . join( ', ', @errors ) ) if @errors;
163 56         128 return \@responses;
164             }
165              
166             # Serialize a method name and hash-style parameters using the conventions
167             # understood by SearchServer.
168             sub _serialize_request {
169 56     56   57 my ( $self, $args ) = @_;
170 56         133 my $serialized = nfreeze($args);
171 56         1662 my $packed_len = pack( 'N', length($serialized) );
172 56         97 my $request = "$packed_len$serialized";
173 56         87 return \$request;
174             }
175              
176             # Send a (partial) request to a shard
177             sub _cb_send {
178 146     146   156 my ( $shard, $rin, $win, $ein ) = @_;
179              
180 146         107 my $msg = substr( ${ $shard->{buf} }, $shard->{sent} );
  146         290  
181 146         315 my $sent = $shard->{sock}->send($msg);
182              
183 146 50       31676 if ( !defined($sent) ) {
184 0         0 $shard->{error} = $!;
185 0         0 $shard->{callback} = undef;
186 0         0 vec( $$win, $shard->{sock}->fileno, 1 ) = 0;
187 0         0 return;
188             }
189              
190 146         206 $shard->{sent} += $sent;
191              
192 146 50       305 if ( $sent >= length($msg) ) {
193             # Complete
194 146         377 my $fileno = $shard->{sock}->fileno;
195 146         695 vec( $$win, $fileno, 1 ) = 0;
196 146 100       245 if ( $shard->{shutdown} ) {
197             # Bail out if we're either closing or shutting down the server
198             # remotely.
199 7         12 $shard->{callback} = undef;
200             }
201             else {
202             # Setup shard to read response length
203 139         136 $shard->{buf} = '';
204 139         167 $shard->{callback} = \&_cb_recv_length;
205 139         287 vec( $$rin, $fileno, 1 ) = 1;
206             }
207             }
208             }
209              
210             # Receive a (partial) response length from a shard
211             sub _cb_recv_length {
212 139     139   136 my ( $shard, $rin, $win, $ein ) = @_;
213              
214 139         91 my $data;
215 139         303 my $r = $shard->{sock}->recv( $data, 4 - length( $shard->{buf} ) );
216              
217 139 50 33     1611 if ( !defined($r) || length($data) == 0 ) {
218 0 0       0 $shard->{error} = !defined($r) ? $! : 'Remote shutdown';
219 0         0 $shard->{callback} = undef;
220 0         0 vec( $$rin, $shard->{sock}->fileno, 1 ) = 0;
221 0         0 return;
222             }
223              
224 139         167 $shard->{buf} .= $data;
225              
226 139 50       228 if ( length( $shard->{buf} ) >= 4 ) {
227             # Complete, setup shard to receive response
228 139         212 $shard->{response_size} = unpack( 'N', $shard->{buf} );
229 139         112 $shard->{buf} = '';
230 139         171 $shard->{callback} = \&_cb_recv_response;
231             }
232             }
233              
234             # Receive a (partial) response from a shard
235             sub _cb_recv_response {
236 159     159   165 my ( $shard, $rin, $win, $ein ) = @_;
237              
238 159         119 my $data;
239 159         193 my $remaining = $shard->{response_size} - length( $shard->{buf} );
240 159         315 my $r = $shard->{sock}->recv( $data, $remaining );
241              
242 159 50 33     2323 if ( !defined($r) || length($data) == 0 ) {
243 0 0       0 $shard->{error} = !defined($r) ? $! : 'Remote shutdown';
244 0         0 $shard->{callback} = undef;
245 0         0 vec( $$rin, $shard->{sock}->fileno, 1 ) = 0;
246 0         0 return;
247             }
248              
249 159         382 $shard->{buf} .= $data;
250              
251 159 100       289 if ( length( $shard->{buf} ) >= $shard->{response_size} ) {
252             # Finished
253 139         289 $shard->{response} = thaw( $shard->{buf} );
254 139         3398 $shard->{callback} = undef;
255             }
256             }
257              
258             sub top_docs {
259 5     5 0 20 my ( $self, %args ) = @_;
260 5         9 my $starts = $starts{$$self};
261 5         7 my $num_shards = $num_shards{$$self};
262 5         7 my $query = $args{query};
263 5         5 my $num_wanted = $args{num_wanted};
264 5         5 my $sort_spec = $args{sort_spec};
265              
266             # Weight query if necessary.
267 5 50       114 my $compiler
268             = $query->isa("Lucy::Search::Compiler")
269             ? $query
270             : $query->make_compiler(
271             searcher => $self,
272             boost => $query->get_boost,
273             );
274              
275             # Create HitQueue.
276 5         7 my $hit_q;
277 5 100       26 if ($sort_spec) {
278 2         45 $hit_q = Lucy::Search::HitQueue->new(
279             schema => $self->get_schema,
280             sort_spec => $sort_spec,
281             wanted => $num_wanted,
282             );
283             }
284             else {
285 3         45 $hit_q = Lucy::Search::HitQueue->new( wanted => $num_wanted, );
286             }
287              
288             # Gather remote responses and aggregate.
289 5         10 $args{_action} = 'top_docs';
290 5         12 my $responses = $self->_multi_rpc( \%args );
291 5         15 my $total_hits = 0;
292 5         12 for ( my $i = 0; $i < $num_shards; $i++ ) {
293 20         52 my $base = $starts->get($i);
294 20         17 my $sub_top_docs = $responses->[$i];
295 3         14 my @sub_match_docs = sort { $a->get_doc_id <=> $b->get_doc_id }
296 20         16 @{ $sub_top_docs->get_match_docs };
  20         84  
297 20         33 for my $match_doc (@sub_match_docs) {
298 22         59 $match_doc->set_doc_id( $match_doc->get_doc_id + $base );
299 22         92 $hit_q->insert($match_doc);
300             }
301 20         58 $total_hits += $sub_top_docs->get_total_hits;
302             }
303              
304             # Return a TopDocs object with the best of the best.
305 5         32 my $best_match_docs = $hit_q->pop_all;
306 5         171 return Lucy::Search::TopDocs->new(
307             total_hits => $total_hits,
308             match_docs => $best_match_docs,
309             );
310             }
311              
312             sub terminate {
313 0     0 0 0 my $self = shift;
314 0         0 $self->_multi_rpc( { _action => 'terminate' } );
315 0         0 return;
316             }
317              
318             sub fetch_doc {
319 19     19 1 397 my ( $self, $doc_id ) = @_;
320 19         48 my $starts = $starts{$$self};
321 19         85 my $tick = Lucy::Index::PolyReader::sub_tick( $starts, $doc_id );
322 19         57 my $start = $starts->get($tick);
323 19         77 my %args = ( doc_id => $doc_id - $start, _action => 'fetch_doc' );
324 19         104 my $hit_doc = $self->_single_rpc( \%args, $tick );
325 19         75 $hit_doc->set_doc_id($doc_id);
326 19         107 return $hit_doc;
327             }
328              
329             sub fetch_doc_vec {
330 7     7 0 107 my ( $self, $doc_id ) = @_;
331 7         16 my $starts = $starts{$$self};
332 7         23 my $tick = Lucy::Index::PolyReader::sub_tick( $starts, $doc_id );
333 7         16 my $start = $starts->get($tick);
334 7         26 my %args = ( doc_id => $doc_id - $start, _action => 'fetch_doc_vec' );
335 7         25 return $self->_single_rpc( \%args, $tick );
336             }
337              
338             sub doc_max {
339 24     24 1 1058 my $self = shift;
340 24         119 return $doc_max{$$self};
341             }
342              
343             sub doc_freq {
344 19     19 1 36 my $self = shift;
345 19         72 my %args = ( @_, _action => 'doc_freq' );
346 19         39 my $responses = $self->_multi_rpc( \%args );
347 19         19 my $doc_freq = 0;
348 19         41 $doc_freq += $_ for @$responses;
349 19         221 return $doc_freq;
350             }
351              
352             sub close {
353 2     2 0 653 my $self = shift;
354 2 50       7 return unless $shards{$$self};
355 2         7 $self->_multi_rpc( { _action => 'done' } );
356 2         3 for my $shard ( @{ $shards{$$self} } ) {
  2         5  
357 7 50       183 close $shard->{sock} or confess("Error when closing socket: $!");
358             }
359 2         25 delete $shards{$$self};
360             }
361              
362             1;
363              
364             __END__