File Coverage

blib/lib/KinoSearch1/Search/SearchClient.pm
Criterion Covered Total %
statement 64 71 90.1
branch 10 20 50.0
condition 2 6 33.3
subroutine 16 18 88.8
pod 1 9 11.1
total 93 124 75.0


line stmt bran cond sub pod time code
1             package KinoSearch1::Search::SearchClient;
2 2     2   8338 use strict;
  2         8  
  2         82  
3 2     2   12 use warnings;
  2         5  
  2         57  
4 2     2   12 use KinoSearch1::Util::ToolSet;
  2         4  
  2         313  
5 2     2   12 use base qw( KinoSearch1::Searcher );
  2         4  
  2         677  
6              
7 2     2   14 use Storable qw( nfreeze thaw );
  2         6  
  2         144  
8              
9             BEGIN {
10 2     2   26 __PACKAGE__->init_instance_vars(
11             # params/members
12             analyzer => undef,
13             peer_address => undef,
14             password => undef,
15             # members
16             similarity => undef,
17             );
18             }
19              
20 2     2   1992 use IO::Socket::INET;
  2         18781  
  2         27  
21              
22             sub init_instance {
23 1     1 1 9 my $self = shift;
24              
25 1   33     88 $self->{similarity} ||= KinoSearch1::Search::Similarity->new;
26 1         7 $self->{field_sims} = {};
27              
28             # establish a connection
29 1         18 my $sock = IO::Socket::INET->new(
30             PeerAddr => $self->{peer_address},
31             Proto => 'tcp',
32             );
33 1 50       2440 confess("No socket: $!") unless $sock;
34 1         9 $sock->autoflush(1);
35 1         57 $self->{sock} = $sock;
36              
37             # verify password
38 1         63 print $sock "$self->{password}\n";
39 1         314 chomp( my $response = <$sock> );
40 1 50       34 confess("Failed to connect: '$response'") unless $response =~ /accept/i;
41             }
42              
43             =for comment
44              
45             Make a remote procedure call. For every call that does not close/terminate
46             the socket connection, expect a response back that's been serialized using
47             Storable.
48              
49             =cut
50              
51             sub _rpc {
52 16     16   64 my ( $self, $method, $args ) = @_;
53 16         31 my $sock = $self->{sock};
54              
55 16         68 my $serialized = nfreeze($args);
56 16         1304 my $packed_len = pack( 'N', bytes::length($serialized) );
57 16         24533 print $sock "$method\n$packed_len$serialized";
58              
59             # bail out if we're either closing or shutting down the server remotely
60 16 50       83 return if $method eq 'done';
61 16 100       61 return if $method eq 'terminate';
62              
63             # decode response
64 15         109 $sock->read( $packed_len, 4 );
65 15         15710 my $arg_len = unpack( 'N', $packed_len );
66 15         47 my $check_val = read( $sock, $serialized, $arg_len );
67 15 50 33     109 confess("Tried to read $arg_len bytes, got $check_val")
68             unless ( defined $arg_len and $check_val == $arg_len );
69 15         72 return thaw($serialized);
70             }
71              
72             sub get_field_names {
73 3     3 0 7 my $self = shift;
74 3         23 return $self->_rpc( 'get_field_names', {} );
75             }
76              
77             my %search_hit_collector_args = (
78             hit_collector => undef,
79             weight => undef,
80             filter => undef,
81             sort_spec => undef,
82             );
83              
84             sub search_hit_collector {
85 3     3 0 6 my $self = shift;
86 3 50       532 confess kerror() unless verify_args( \%search_hit_collector_args, @_ );
87 3         37 my %args = ( %search_hit_collector_args, @_ );
88 3 50       12 confess("remote filtered search not supported") if defined $args{filter};
89              
90             # replace the HitCollector with a size rather than serialize it
91 3         10 my $collector = delete $args{hit_collector};
92 3 100       14 if ( a_isa_b( $collector, "KinoSearch1::Search::OffsetCollector" ) ) {
93 1         12 $args{num_wanted} = $collector->get_storage->get_max_size;
94             }
95             else {
96 2         12 $args{num_wanted} = $collector->get_max_size;
97             }
98              
99             # Make the remote call, which returns a hashref of doc => score pairs.
100             # Accumulate hits into the HitCollector if the query is valid.
101 3         13 my $score_pairs = $self->_rpc( 'search_hit_collector', \%args );
102 3         98 while ( my ( $doc, $score ) = each %$score_pairs ) {
103 5         92 $collector->collect( $doc, $score );
104             }
105             }
106              
107             sub terminate {
108 1     1 0 9 my $self = shift;
109 1         4 return $self->_rpc( 'terminate', {} );
110             }
111              
112             sub fetch_doc {
113 1     1 0 3 my ( $self, $doc_num ) = @_;
114 1         13 return $self->_rpc( 'fetch_doc', { doc_num => $doc_num } );
115             }
116              
117             sub max_doc {
118 3     3 0 6 my $self = shift;
119 3         14 return $self->_rpc( 'max_doc', {} );
120             }
121              
122             sub doc_freq {
123 4     4 0 7 my ( $self, $term ) = @_;
124 4         18 return $self->_rpc( 'doc_freq', { term => $term } );
125             }
126              
127             sub doc_freqs {
128 1     1 0 3 my ( $self, $terms ) = @_;
129 1         5 return $self->_rpc( 'doc_freqs', { terms => $terms } );
130             }
131              
132             sub close {
133 0     0 0   my $self = shift;
134 0           $self->_rpc( 'done', {} );
135 0           my $sock = $self->{sock};
136 0 0         close $sock or confess("Error when closing socket: $!");
137 0           undef $self->{sock};
138             }
139              
140             sub DESTROY {
141 0     0     my $self = shift;
142 0 0         $self->close if defined $self->{sock};
143             }
144              
145             1;
146              
147             __END__