File Coverage

blib/lib/KSx/Remote/SearchClient.pm
Criterion Covered Total %
statement 61 74 82.4
branch 7 16 43.7
condition 1 3 33.3
subroutine 16 19 84.2
pod 4 9 44.4
total 89 121 73.5


line stmt bran cond sub pod time code
1 2     2   54975 use strict;
  2         8  
  2         962  
2 2     2   18 use warnings;
  2         5  
  2         157  
3              
4             package KSx::Remote::SearchClient;
5 2     2   80 BEGIN { our @ISA = qw( KinoSearch::Search::Searcher ) }
6 2     2   247 use Carp;
  2         5  
  2         232  
7 2     2   13 use Storable qw( nfreeze thaw );
  2         11  
  2         119  
8 2     2   11 use bytes;
  2         4  
  2         16  
9 2     2   64 no bytes;
  2         4  
  2         12  
10              
11             # Inside-out member vars.
12             our %peer_address;
13             our %password;
14             our %sock;
15              
16 2     2   57855 use IO::Socket::INET;
  2         496421  
  2         21  
17              
18             sub new {
19 1     1 1 154 my ( $either, %args ) = @_;
20 1         5 my $peer_address = delete $args{peer_address};
21 1         8 my $password = delete $args{password};
22 1         47 my $self = $either->SUPER::new(%args);
23 1         19 $peer_address{$$self} = $peer_address;
24 1         7 $password{$$self} = $password;
25              
26             # Establish a connection.
27 1         23 my $sock = $sock{$$self} = IO::Socket::INET->new(
28             PeerAddr => $peer_address,
29             Proto => 'tcp',
30             );
31 1 50       2744 confess("No socket: $!") unless $sock;
32 1         15 $sock->autoflush(1);
33              
34             # Verify password.
35 1         90 print $sock "$password\n";
36 1         249 chomp( my $response = <$sock> );
37 1 50       11 confess("Failed to connect: '$response'") unless $response =~ /accept/i;
38              
39 1         10 return $self;
40             }
41              
42             sub DESTROY {
43 0     0   0 my $self = shift;
44 0         0 delete $peer_address{$$self};
45 0         0 delete $password{$$self};
46 0         0 delete $sock{$$self};
47 0         0 $self->SUPER::DESTROY;
48             }
49              
50             =for comment
51              
52             Make a remote procedure call. For every call that does not close/terminate
53             the socket connection, expect a response back that's been serialized using
54             Storable.
55              
56             =cut
57              
58             sub _rpc {
59 22     22   61 my ( $self, $method, $args ) = @_;
60 22         45 my $sock = $sock{$$self};
61              
62 22         446 my $serialized = nfreeze($args);
63 22         7544 my $packed_len = pack( 'N', bytes::length($serialized) );
64 22         14031 print $sock "$method\n$packed_len$serialized";
65              
66             # Bail out if we're either closing or shutting down the server remotely.
67 22 50       87 return if $method eq 'done';
68 22 100       73 return if $method eq 'terminate';
69              
70             # Decode response.
71 21         121 $sock->read( $packed_len, 4 );
72 21         4591 my $arg_len = unpack( 'N', $packed_len );
73 21         56 my $check_val = read( $sock, $serialized, $arg_len );
74 21 50 33     334 confess("Tried to read $arg_len bytes, got $check_val")
75             unless ( defined $arg_len and $check_val == $arg_len );
76 21         80 my $response = thaw($serialized);
77 21 50       812 if ( exists $response->{retval} ) {
78 21         786 return $response->{retval};
79             }
80 0         0 return;
81             }
82              
83             sub top_docs {
84 5     5 0 13 my $self = shift;
85 5         44 return $self->_rpc( 'top_docs', {@_} );
86             }
87              
88             sub terminate {
89 1     1 0 1846 my $self = shift;
90 1         7 return $self->_rpc( 'terminate', {} );
91             }
92              
93             sub fetch_doc {
94 4     4 1 5011 my ( $self, $doc_id ) = @_;
95 4         33 return $self->_rpc( 'fetch_doc', { doc_id => $doc_id } );
96             }
97              
98             sub fetch_doc_vec {
99 1     1 0 5 my ( $self, $doc_id ) = @_;
100 1         5 return $self->_rpc( 'fetch_doc_vec', { doc_id => $doc_id } );
101             }
102              
103             sub doc_max {
104 4     4 1 3296 my $self = shift;
105 4         21 return $self->_rpc( 'doc_max', {} );
106             }
107              
108             sub doc_freq {
109 7     7 1 289 my $self = shift;
110 7         66 return $self->_rpc( 'doc_freq', {@_} );
111             }
112              
113             sub close {
114 0     0 0   my $self = shift;
115 0           $self->_rpc( 'done', {} );
116 0           my $sock = $sock{$$self};
117 0 0         close $sock or confess("Error when closing socket: $!");
118 0           delete $sock{$$self};
119             }
120              
121             sub NUKE {
122 0     0 0   my $self = shift;
123 0 0         $self->close if defined $sock{$$self};
124             }
125              
126             1;
127              
128             __END__