File Coverage

blib/lib/RMI/Server/Tcp.pm
Criterion Covered Total %
statement 63 78 80.7
branch 9 20 45.0
condition n/a
subroutine 11 12 91.6
pod 2 2 100.0
total 85 112 75.8


line stmt bran cond sub pod time code
1             package RMI::Server::Tcp;
2 4     4   18790 use base 'RMI::Server';
  4         128  
  4         482  
3              
4 4     4   24 use strict;
  4         12  
  4         164  
5 4     4   24 use warnings;
  4         6  
  4         160  
6 4     4   244 use version;
  4         8  
  4         138  
7             our $VERSION = $RMI::VERSION;
8              
9 4     4   478 use RMI;
  4         24  
  4         98  
10 4     4   28 use IO::Socket;
  4         32  
  4         40  
11 4     4   8774 use IO::Select;
  4         8466  
  4         228  
12 4     4   32 use Fcntl;
  4         8  
  4         5238  
13              
14             RMI::Node::_mk_ro_accessors(__PACKAGE__, qw/host port listen_socket all_select sockets_select listen_queue_size/);
15              
16             our $DEFAULT_PORT = 4409;
17              
18             sub new {
19 2     2 1 4069 my $class = shift;
20              
21 2         191 my $self = bless { port => $DEFAULT_PORT, @_ }, $class;
22 2 50       235 return unless $self;
23              
24 2 50       100 unless ($self->listen_socket) {
25 2         35 my $listen = IO::Socket::INET->new(
26             LocalHost => $self->host,
27             LocalPort => $self->port,
28             ReuseAddr => 1,
29             Listen => $self->listen_queue_size,
30             );
31 2 50       2946 unless ($listen) {
32 0         0 die "Couldn't create socket: $!";
33             }
34 2         25 $self->{listen_socket} = $listen;
35 2         39 $self->{all_select} = IO::Select->new($listen);
36 2         246 $self->{sockets_select} = IO::Select->new();
37 2         35 $self->{data_ready} = [];
38             }
39              
40 2         10 return $self;
41             }
42              
43             # Override in the base class to delegate to whichever socket returns a value next.
44             # Note, that this only receives queries, since the delegate will receive all responses
45             # to our own counter queries.
46              
47             sub receive_request_and_send_response {
48 59     59 1 134 my ($self,$timeout) = @_;
49            
50             # the list of all sockets w/ data ready
51 59         104 my $data_ready = $self->{data_ready};
52            
53             # ck for new connections and also new sockets with data
54 59         223 my $select = $self->all_select;
55 59         172 until (@$data_ready) {
56 61         245 my @new_readable = $select->can_read($timeout);
57 61 50       2082992 unless (@new_readable) {
58 0         0 return;
59             }
60 61         92 my @new_data;
61 61         212 for (my $i = 0; $i < @new_readable; $i++) {
62 61 100       294 if ($new_readable[$i] eq $self->listen_socket) {
63 2         12 $self->_accept_connection();;
64             }
65             else {
66 59         255 push @new_data, $new_readable[$i]
67             }
68             }
69 61         232 push @$data_ready, @new_data;
70             }
71            
72             # process the first socket with data
73             # delegate to the right "server" object, which manages just this particular client
74 59         112 my $ready = shift @$data_ready;
75 59         193 my $delegate_server = $self->{_server_for_socket}{$ready};
76 59         499 my $retval = $delegate_server->receive_request_and_send_response;
77 58         219 return $retval;
78             }
79              
80             # Add the given socket to the list of connected clients.
81             # if socket is undef, it blocks waiting on an incoming connection
82             sub _accept_connection {
83 2     2   9 my $self = shift;
84 2         5 my $socket = shift;
85              
86 2 50       28 unless ($socket) {
87 2         17 my $listen = $self->listen_socket;
88 2         132 $socket = $listen->accept();
89 2 50       474 unless ($socket) {
90 0         0 die "accept() failed: $!";
91             }
92             }
93              
94 2         115 my $server = RMI::Server->new(
95             reader => $socket,
96             writer => $socket,
97             peer_pid => "$socket",
98             );
99 2 50       8 unless ($server) {
100 0         0 die "failed to create RMI::Server for accepted socket";
101             }
102              
103 2         30 $self->{_server_for_socket}{$socket} = $server;
104            
105 2         12 $self->sockets_select->add($socket);
106 2         148 $self->all_select->add($socket);
107 2         60 return $socket;
108             }
109              
110             sub _close_connection {
111             # This is no longer called, and somehow the select sockets get things removed?
112 0     0     my $self = shift;
113 0           my $socket = shift;
114              
115 0 0         unless ($self->sockets_select->exists($socket)) {
116 0           warn ("Passed-in socket $socket is not on the list of connected clients");
117             }
118 0 0         unless ($self->all_select->exists($socket)) {
119 0           warn ("Passed-in socket $socket is not on the list of all clients");
120             }
121 0           print "removed $socket\n";
122              
123 0           $self->sockets_select->remove($socket);
124 0           $self->all_select->remove($socket);
125 0           $socket->close();
126 0           return 1;
127             }
128              
129             1;
130              
131              
132             =pod
133              
134             =head1 NAME
135              
136             RMI::Server::Tcp - service RMI::Client::Tcp requests
137              
138             =head1 VERSION
139              
140             This document describes RMI::Server::Tcp v0.10.
141              
142             =head1 SYNOPSIS
143              
144             $s = RMI::Server::Tcp->new(
145             port => 1234 # defaults to 4409
146             );
147             $s->run;
148            
149             =head1 DESCRIPTION
150              
151             This subclass of RMI::Server makes a TCP/IP listening socket, and accepts
152             multiple non-blocking IO connections.
153              
154             =head1 METHODS
155              
156             This class overrides the constructor for a default RMI::Server to make a
157             listening socket. Individual accepted connections get their own private
158             subordinate RMI::Server of this class.
159              
160             =head1 BUGS AND CAVEATS
161              
162             See general bugs in B for general system limitations of proxied objects.
163              
164             =head1 SEE ALSO
165              
166             B, B, B, B, B, B
167              
168             =cut
169