File Coverage

blib/lib/LucyX/Remote/SearchClient.pm
Criterion Covered Total %
statement 61 72 84.7
branch 9 20 45.0
condition n/a
subroutine 14 16 87.5
pod 4 8 50.0
total 88 116 75.8


line stmt bran cond sub pod time code
1             # Licensed to the Apache Software Foundation (ASF) under one or more
2             # contributor license agreements. See the NOTICE file distributed with
3             # this work for additional information regarding copyright ownership.
4             # The ASF licenses this file to You under the Apache License, Version 2.0
5             # (the "License"); you may not use this file except in compliance with
6             # the License. You may obtain a copy of the License at
7             #
8             # http://www.apache.org/licenses/LICENSE-2.0
9             #
10             # Unless required by applicable law or agreed to in writing, software
11             # distributed under the License is distributed on an "AS IS" BASIS,
12             # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13             # See the License for the specific language governing permissions and
14             # limitations under the License.
15              
16 2     2   1215 use strict;
  2         4  
  2         46  
17 2     2   6 use warnings;
  2         2  
  2         92  
18              
19             package LucyX::Remote::SearchClient;
20 2     2   69 BEGIN { our @ISA = qw( Lucy::Search::Searcher ) }
21             our $VERSION = '0.006000_001';
22             $VERSION = eval $VERSION;
23 2     2   7 use Carp;
  2         2  
  2         100  
24 2     2   36 use Storable qw( nfreeze thaw );
  2         4  
  2         106  
25              
26             # Inside-out member vars.
27             our %peer_address;
28             our %sock;
29              
30 2     2   8 use IO::Socket::INET;
  2         2  
  2         10  
31              
32             sub new {
33 1     1 1 253307 my ( $either, %args ) = @_;
34 1         6 my $peer_address = delete $args{peer_address};
35 1         30 my $self = $either->SUPER::new(%args);
36 1         1257 $peer_address{$$self} = $peer_address;
37              
38             # Establish a connection.
39 1         12 my $sock = $sock{$$self} = IO::Socket::INET->new(
40             PeerAddr => $peer_address,
41             Proto => 'tcp',
42             );
43 1 50       595 confess("No socket: $!") unless $sock;
44 1         8 $sock->autoflush(1);
45 1         41 my %handshake_args = ( _action => 'handshake' );
46 1         10 my $response = $self->_rpc( \%handshake_args );
47 1 50       3 confess("Failed to connect") unless $response;
48              
49 1         4 return $self;
50             }
51              
52             sub DESTROY {
53 0     0   0 my $self = shift;
54 0 0       0 $self->close if defined $sock{$$self};
55 0         0 delete $peer_address{$$self};
56 0         0 delete $sock{$$self};
57 0         0 $self->SUPER::DESTROY;
58             }
59              
60             =for comment
61              
62             Make a remote procedure call. For every call that does not close/terminate
63             the socket connection, expect a response back that's been serialized using
64             Storable.
65              
66             =cut
67              
68             sub _rpc {
69 23     23   23 my ( $self, $args ) = @_;
70 23         37 my $sock = $sock{$$self};
71              
72 23         49 my $serialized = nfreeze($args);
73 23         792 my $packed_len = pack( 'N', length($serialized) );
74 23 50       4432 print $sock "$packed_len$serialized" or confess $!;
75              
76             # disabled
77             #my $check_val = $sock->syswrite("$packed_len$serialized");
78             #confess $! if $check_val != length($serialized) + 4;
79              
80 23         36 my $check_val;
81              
82             # Bail out if we're either closing or shutting down the server remotely.
83 23 50       45 return if $args->{_action} eq 'done';
84 23 100       42 return if $args->{_action} eq 'terminate';
85              
86             # Decode response.
87 22         71 $check_val = $sock->read( $packed_len, 4 );
88 22 50       2510 confess("Failed to read 4 bytes: $!")
89             unless $check_val == 4;
90 22         40 my $arg_len = unpack( 'N', $packed_len );
91 22         52 $check_val = $sock->read( $serialized, $arg_len );
92 22 50       93 confess("Failed to read $arg_len bytes")
93             unless $check_val == $arg_len;
94 22         41 my $response = thaw($serialized);
95 22 50       574 if ( exists $response->{retval} ) {
96 22         398 return $response->{retval};
97             }
98 0         0 return;
99             }
100              
101             sub top_docs {
102 5     5 0 7 my $self = shift;
103 5         18 my %args = ( @_, _action => 'top_docs' );
104 5         13 return $self->_rpc( \%args );
105             }
106              
107             sub terminate {
108 1     1 0 734 my $self = shift;
109 1         4 my %args = ( _action => 'terminate' );
110 1         3 return $self->_rpc( \%args );
111             }
112              
113             sub fetch_doc {
114 4     4 1 433 my ( $self, $doc_id ) = @_;
115 4         14 my %args = ( doc_id => $doc_id, _action => 'fetch_doc' );
116 4         8 return $self->_rpc( \%args );
117             }
118              
119             sub fetch_doc_vec {
120 1     1 0 1 my ( $self, $doc_id ) = @_;
121 1         3 my %args = ( doc_id => $doc_id, _action => 'fetch_doc_vec' );
122 1         3 return $self->_rpc( \%args );
123             }
124              
125             sub doc_max {
126 4     4 1 1735 my $self = shift;
127 4         17 my %args = ( _action => 'doc_max' );
128 4         9 return $self->_rpc( { _action => 'doc_max' } );
129             }
130              
131             sub doc_freq {
132 7     7 1 110 my $self = shift;
133 7         21 my %args = ( @_, _action => 'doc_freq' );
134 7         14 return $self->_rpc( \%args );
135             }
136              
137             sub close {
138 0     0 0   my $self = shift;
139 0           $self->_rpc( { _action => 'done' } );
140 0           my $sock = $sock{$$self};
141 0 0         close $sock or confess("Error when closing socket: $!");
142 0           delete $sock{$$self};
143             }
144              
145             1;
146              
147             __END__