File Coverage

blib/lib/LucyX/Remote/ClusterSearcher.pm
Criterion Covered Total %
statement 176 199 88.4
branch 27 48 56.2
condition 6 12 50.0
subroutine 21 23 91.3
pod 4 8 50.0
total 234 290 80.6


line stmt bran cond sub pod time code
1             # Licensed to the Apache Software Foundation (ASF) under one or more
2             # contributor license agreements. See the NOTICE file distributed with
3             # this work for additional information regarding copyright ownership.
4             # The ASF licenses this file to You under the Apache License, Version 2.0
5             # (the "License"); you may not use this file except in compliance with
6             # the License. You may obtain a copy of the License at
7             #
8             # http://www.apache.org/licenses/LICENSE-2.0
9             #
10             # Unless required by applicable law or agreed to in writing, software
11             # distributed under the License is distributed on an "AS IS" BASIS,
12             # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13             # See the License for the specific language governing permissions and
14             # limitations under the License.
15              
16 2     2   1013 use strict;
  2         2  
  2         46  
17 2     2   6 use warnings;
  2         1  
  2         76  
18              
19             package LucyX::Remote::ClusterSearcher;
20 2     2   76 BEGIN { our @ISA = qw( Lucy::Search::Searcher ) }
21             our $VERSION = '0.006000_001';
22             $VERSION = eval $VERSION;
23 2     2   7 use Carp;
  2         22  
  2         120  
24 2     2   9 use Storable qw( nfreeze thaw );
  2         2  
  2         108  
25 2     2   9 use Scalar::Util qw( reftype );
  2         2  
  2         165  
26              
27             # Inside-out member vars.
28             our %shards;
29             our %num_shards;
30             our %starts;
31             our %doc_max;
32              
33 2     2   488 use IO::Socket::INET;
  2         9478  
  2         12  
34              
35             sub new {
36 2     2 1 507022 my ( $either, %args ) = @_;
37 2         7 my $addrs = delete $args{shards};
38 2         41 my $self = $either->SUPER::new(%args);
39 2 50       1319 confess("'shards' must be an arrayref")
40             unless reftype($addrs) eq 'ARRAY';
41 2         10 $num_shards{$$self} = scalar @$addrs;
42              
43             # Establish connections.
44 2         3 my @shards;
45 2         7 for my $addr (@$addrs) {
46 7         36 my $sock = IO::Socket::INET->new(
47             PeerAddr => $addr,
48             Proto => 'tcp',
49             Blocking => 0,
50             );
51 7 50       2672 confess("No socket: $!") unless $sock;
52 7         25 push @shards,
53             {
54             addr => $addr,
55             sock => $sock,
56             };
57             }
58 2         9 $shards{$$self} = \@shards;
59              
60             # Handshake with servers.
61 2         5 my %handshake_args = ( _action => 'handshake' );
62 2         8 my $responses = $self->_multi_rpc( \%handshake_args );
63 2         3 for my $response (@$responses) {
64 7 50       9 confess unless $response;
65             }
66              
67             # Derive doc_max and relative start offsets.
68 2         12 my $doc_max_responses = $self->_multi_rpc( { _action => 'doc_max' } );
69 2         3 my $doc_max = 0;
70 2         2 my @starts;
71 2         5 for my $shard_doc_max (@$doc_max_responses) {
72 7         5 push @starts, $doc_max;
73 7         7 $doc_max += $shard_doc_max;
74             }
75 2         35 $starts{$$self} = Lucy::Object::I32Array->new( ints => \@starts );
76 2         4 $doc_max{$$self} = $doc_max;
77              
78 2         8 return $self;
79             }
80              
81             sub DESTROY {
82 0     0   0 my $self = shift;
83 0 0       0 $self->close if defined $shards{$$self};
84 0         0 delete $shards{$$self};
85 0         0 delete $num_shards{$$self};
86 0         0 delete $starts{$$self};
87 0         0 delete $doc_max{$$self};
88 0         0 $self->SUPER::DESTROY;
89             }
90              
91             # Send a remote procedure call to all shards.
92             sub _multi_rpc {
93 30     30   33 my ( $self, $args ) = @_;
94 30         62 return $self->_rpc( $args, $shards{$$self} );
95             }
96              
97             # Send a remote procedure call to one shard.
98             sub _single_rpc {
99 26     26   33 my ( $self, $args, $shard_num ) = @_;
100 26         58 my $shard = $shards{$$self}[$shard_num];
101 26         58 my $responses = $self->_rpc( $args, [$shard] );
102 26         410 return $responses->[0];
103             }
104              
105             sub _rpc {
106 56     56   52 my ( $self, $args, $shards ) = @_;
107              
108 56         78 my $request = $self->_serialize_request($args);
109 56         50 my $timeout = 5;
110             my $shutdown = $args->{_action} eq 'done'
111 56   66     244 || $args->{_action} eq 'terminate';
112              
113 56         77 my ( $rin, $win, $ein ) = ( '', '', '' );
114              
115             # Initialize shards to send the request
116 56         88 for my $shard (@$shards) {
117 146         256 my $fileno = $shard->{sock}->fileno;
118 146         532 vec( $win, $fileno, 1 ) = 1;
119 146         161 $shard->{response} = undef;
120 146         115 $shard->{error} = undef;
121 146         133 $shard->{buf} = $request;
122 146         116 $shard->{sent} = 0;
123 146         124 $shard->{callback} = \&_cb_send;
124 146         194 $shard->{shutdown} = $shutdown;
125             }
126              
127 56         62 my $remaining = @$shards;
128              
129             # Event loop
130 56         88 while ( $remaining > 0 ) {
131 220         187 my ( $rout, $wout, $eout );
132              
133 220         351551 my $n = select( $rout = $rin, $wout = $win, $eout = $ein, $timeout );
134              
135 220 50       388 confess("select: $!") if $n == -1;
136 220 50       305 confess("I/O timeout") if $n == 0;
137              
138 220         317 for my $shard (@$shards) {
139 665 100       1037 next if !$shard->{callback};
140 548         1137 my $fileno = $shard->{sock}->fileno;
141 548 100 66     2413 next if !vec( $rout, $fileno, 1 ) && !vec( $wout, $fileno, 1 );
142             # Dispatch event
143 443         766 $shard->{callback}->( $shard, \$rin, \$win, \$ein );
144 443 100       1191 --$remaining if !$shard->{callback};
145             }
146             }
147              
148             # Collect responses and cleanup
149 56         56 my @responses;
150             my @errors;
151 56         81 for my $shard (@$shards) {
152 146 50       186 if ( defined $shard->{error} ) {
153 0         0 push( @errors, $shard->{error} . ' @ ' . $shard->{addr} );
154             }
155             else {
156 146         152 push( @responses, $shard->{response}{retval} );
157             }
158 146         109 $shard->{response} = undef;
159 146         146 $shard->{error} = undef;
160 146         138 $shard->{buf} = undef;
161             }
162 56 50       91 confess( 'RPC error: ' . join( ', ', @errors ) ) if @errors;
163 56         120 return \@responses;
164             }
165              
166             # Serialize a method name and hash-style parameters using the conventions
167             # understood by SearchServer.
168             sub _serialize_request {
169 56     56   52 my ( $self, $args ) = @_;
170 56         133 my $serialized = nfreeze($args);
171 56         1638 my $packed_len = pack( 'N', length($serialized) );
172 56         109 my $request = "$packed_len$serialized";
173 56         82 return \$request;
174             }
175              
176             # Send a (partial) request to a shard
177             sub _cb_send {
178 146     146   166 my ( $shard, $rin, $win, $ein ) = @_;
179              
180 146         112 my $msg = substr( ${ $shard->{buf} }, $shard->{sent} );
  146         291  
181 146         319 my $sent = $shard->{sock}->send($msg);
182              
183 146 50       18249 if ( !defined($sent) ) {
184 0         0 $shard->{error} = $!;
185 0         0 $shard->{callback} = undef;
186 0         0 vec( $$win, $shard->{sock}->fileno, 1 ) = 0;
187 0         0 return;
188             }
189              
190 146         226 $shard->{sent} += $sent;
191              
192 146 50       291 if ( $sent >= length($msg) ) {
193             # Complete
194 146         393 my $fileno = $shard->{sock}->fileno;
195 146         724 vec( $$win, $fileno, 1 ) = 0;
196 146 100       293 if ( $shard->{shutdown} ) {
197             # Bail out if we're either closing or shutting down the server
198             # remotely.
199 7         13 $shard->{callback} = undef;
200             }
201             else {
202             # Setup shard to read response length
203 139         158 $shard->{buf} = '';
204 139         163 $shard->{callback} = \&_cb_recv_length;
205 139         283 vec( $$rin, $fileno, 1 ) = 1;
206             }
207             }
208             }
209              
210             # Receive a (partial) response length from a shard
211             sub _cb_recv_length {
212 139     139   163 my ( $shard, $rin, $win, $ein ) = @_;
213              
214 139         113 my $data;
215 139         332 my $r = $shard->{sock}->recv( $data, 4 - length( $shard->{buf} ) );
216              
217 139 50 33     1772 if ( !defined($r) || length($data) == 0 ) {
218 0 0       0 $shard->{error} = !defined($r) ? $! : 'Remote shutdown';
219 0         0 $shard->{callback} = undef;
220 0         0 vec( $$rin, $shard->{sock}->fileno, 1 ) = 0;
221 0         0 return;
222             }
223              
224 139         194 $shard->{buf} .= $data;
225              
226 139 50       228 if ( length( $shard->{buf} ) >= 4 ) {
227             # Complete, setup shard to receive response
228 139         276 $shard->{response_size} = unpack( 'N', $shard->{buf} );
229 139         129 $shard->{buf} = '';
230 139         193 $shard->{callback} = \&_cb_recv_response;
231             }
232             }
233              
234             # Receive a (partial) response from a shard
235             sub _cb_recv_response {
236 158     158   202 my ( $shard, $rin, $win, $ein ) = @_;
237              
238 158         103 my $data;
239 158         196 my $remaining = $shard->{response_size} - length( $shard->{buf} );
240 158         316 my $r = $shard->{sock}->recv( $data, $remaining );
241              
242 158 50 33     2306 if ( !defined($r) || length($data) == 0 ) {
243 0 0       0 $shard->{error} = !defined($r) ? $! : 'Remote shutdown';
244 0         0 $shard->{callback} = undef;
245 0         0 vec( $$rin, $shard->{sock}->fileno, 1 ) = 0;
246 0         0 return;
247             }
248              
249 158         362 $shard->{buf} .= $data;
250              
251 158 100       282 if ( length( $shard->{buf} ) >= $shard->{response_size} ) {
252             # Finished
253 139         321 $shard->{response} = thaw( $shard->{buf} );
254 139         3429 $shard->{callback} = undef;
255             }
256             }
257              
258             sub top_docs {
259 5     5 0 20 my ( $self, %args ) = @_;
260 5         10 my $starts = $starts{$$self};
261 5         8 my $num_shards = $num_shards{$$self};
262 5         8 my $query = $args{query};
263 5         5 my $num_wanted = $args{num_wanted};
264 5         7 my $sort_spec = $args{sort_spec};
265              
266             # Weight query if necessary.
267 5 50       108 my $compiler
268             = $query->isa("Lucy::Search::Compiler")
269             ? $query
270             : $query->make_compiler(
271             searcher => $self,
272             boost => $query->get_boost,
273             );
274              
275             # Create HitQueue.
276 5         7 my $hit_q;
277 5 100       16 if ($sort_spec) {
278 2         43 $hit_q = Lucy::Search::HitQueue->new(
279             schema => $self->get_schema,
280             sort_spec => $sort_spec,
281             wanted => $num_wanted,
282             );
283             }
284             else {
285 3         47 $hit_q = Lucy::Search::HitQueue->new( wanted => $num_wanted, );
286             }
287              
288             # Gather remote responses and aggregate.
289 5         10 $args{_action} = 'top_docs';
290 5         15 my $responses = $self->_multi_rpc( \%args );
291 5         8 my $total_hits = 0;
292 5         15 for ( my $i = 0; $i < $num_shards; $i++ ) {
293 20         38 my $base = $starts->get($i);
294 20         17 my $sub_top_docs = $responses->[$i];
295 3         13 my @sub_match_docs = sort { $a->get_doc_id <=> $b->get_doc_id }
296 20         18 @{ $sub_top_docs->get_match_docs };
  20         81  
297 20         24 for my $match_doc (@sub_match_docs) {
298 22         51 $match_doc->set_doc_id( $match_doc->get_doc_id + $base );
299 22         77 $hit_q->insert($match_doc);
300             }
301 20         50 $total_hits += $sub_top_docs->get_total_hits;
302             }
303              
304             # Return a TopDocs object with the best of the best.
305 5         31 my $best_match_docs = $hit_q->pop_all;
306 5         127 return Lucy::Search::TopDocs->new(
307             total_hits => $total_hits,
308             match_docs => $best_match_docs,
309             );
310             }
311              
312             sub terminate {
313 0     0 0 0 my $self = shift;
314 0         0 $self->_multi_rpc( { _action => 'terminate' } );
315 0         0 return;
316             }
317              
318             sub fetch_doc {
319 19     19 1 322 my ( $self, $doc_id ) = @_;
320 19         40 my $starts = $starts{$$self};
321 19         83 my $tick = Lucy::Index::PolyReader::sub_tick( $starts, $doc_id );
322 19         66 my $start = $starts->get($tick);
323 19         64 my %args = ( doc_id => $doc_id - $start, _action => 'fetch_doc' );
324 19         46 my $hit_doc = $self->_single_rpc( \%args, $tick );
325 19         91 $hit_doc->set_doc_id($doc_id);
326 19         110 return $hit_doc;
327             }
328              
329             sub fetch_doc_vec {
330 7     7 0 122 my ( $self, $doc_id ) = @_;
331 7         14 my $starts = $starts{$$self};
332 7         25 my $tick = Lucy::Index::PolyReader::sub_tick( $starts, $doc_id );
333 7         20 my $start = $starts->get($tick);
334 7         29 my %args = ( doc_id => $doc_id - $start, _action => 'fetch_doc_vec' );
335 7         18 return $self->_single_rpc( \%args, $tick );
336             }
337              
338             sub doc_max {
339 24     24 1 1070 my $self = shift;
340 24         125 return $doc_max{$$self};
341             }
342              
343             sub doc_freq {
344 19     19 1 26 my $self = shift;
345 19         59 my %args = ( @_, _action => 'doc_freq' );
346 19         39 my $responses = $self->_multi_rpc( \%args );
347 19         19 my $doc_freq = 0;
348 19         43 $doc_freq += $_ for @$responses;
349 19         201 return $doc_freq;
350             }
351              
352             sub close {
353 2     2 0 654 my $self = shift;
354 2 50       7 return unless $shards{$$self};
355 2         7 $self->_multi_rpc( { _action => 'done' } );
356 2         4 for my $shard ( @{ $shards{$$self} } ) {
  2         5  
357 7 50       162 close $shard->{sock} or confess("Error when closing socket: $!");
358             }
359 2         23 delete $shards{$$self};
360             }
361              
362             1;
363              
364             __END__