File Coverage

blib/lib/MR/IProto/Cluster/Server.pm
Criterion Covered Total %
statement 21 49 42.8
branch n/a
condition n/a
subroutine 7 17 41.1
pod 3 3 100.0
total 31 69 44.9


line stmt bran cond sub pod time code
1             package MR::IProto::Cluster::Server;
2              
3             =head1 NAME
4              
5             MR::IProto::Cluster::Server - server
6              
7             =head1 DESCRIPTION
8              
9             This class is used to implement all communication with one server.
10              
11             =cut
12              
13 1     1   7 use Mouse;
  1         3  
  1         7  
14 1     1   297 use Mouse::Util::TypeConstraints;
  1         2  
  1         6  
15 1     1   67 use Scalar::Util;
  1         3  
  1         52  
16 1     1   627 use MR::IProto::Connection::Async;
  1         4  
  1         47  
17 1     1   802 use MR::IProto::Connection::Sync;
  1         4  
  1         40  
18 1     1   793 use MR::IProto::Message;
  1         2  
  1         886  
19              
20             with 'MR::IProto::Role::Debuggable';
21              
22             coerce 'MR::IProto::Cluster::Server'
23             => from 'Str'
24             => via {
25             my ($host, $port, $weight) = split /:/, $_;
26             __PACKAGE__->new(
27             host => $host,
28             port => $port,
29             defined $weight ? ( weight => $weight ) : (),
30             );
31             };
32              
33             has prefix => (
34             is => 'ro',
35             isa => 'Str',
36             default => 'MR::IProto',
37             );
38              
39             =head1 ATTRIBUTES
40              
41             =over
42              
43             =item host
44              
45             Host name or IP address.
46              
47             =cut
48              
49             has host => (
50             is => 'ro',
51             isa => 'Str',
52             required => 1,
53             );
54              
55             =item port
56              
57             Port number.
58              
59             =cut
60              
61             has port => (
62             is => 'ro',
63             isa => 'Int',
64             required => 1,
65             );
66              
67             =item weight
68              
69             Server weight.
70              
71             =cut
72              
73             has weight => (
74             is => 'ro',
75             isa => 'Int',
76             default => 1,
77             );
78              
79             =item connect_timeout
80              
81             Timeout of connect operation.
82              
83             =cut
84              
85             has connect_timeout => (
86             is => 'rw',
87             isa => 'Num',
88             default => 2,
89             );
90              
91             =item timeout
92              
93             Timeout of read and write operations.
94              
95             =cut
96              
97             has timeout => (
98             is => 'rw',
99             isa => 'Num',
100             default => 2,
101             trigger => sub {
102             my ($self, $new) = @_;
103             $self->async->set_timeout($new) if $self->has_async();
104             $self->sync->set_timeout($new) if $self->has_sync();
105             return;
106             },
107             );
108              
109             =item tcp_nodelay
110              
111             Enable TCP_NODELAY.
112              
113             =cut
114              
115             has tcp_nodelay => (
116             is => 'ro',
117             isa => 'Int',
118             default => 1,
119             );
120              
121             =item tcp_keepalive
122              
123             Enable SO_KEEPALIVE.
124              
125             =cut
126              
127             has tcp_keepalive => (
128             is => 'ro',
129             isa => 'Int',
130             default => 0,
131             );
132              
133             =item max_parallel
134              
135             Max amount of simultaneous request.
136              
137             =cut
138              
139             has max_parallel => (
140             is => 'ro',
141             isa => 'Int',
142             default => 10,
143             );
144              
145             =item active
146              
147             Is server used in balancing.
148              
149             =cut
150              
151             has active => (
152             is => 'rw',
153             isa => 'Bool',
154             default => 1,
155             );
156              
157             has on_close => (
158             is => 'rw',
159             isa => 'CodeRef',
160             );
161              
162             has async => (
163             is => 'ro',
164             isa => 'MR::IProto::Connection::Async',
165             lazy_build => 1,
166             );
167              
168             has sync => (
169             is => 'ro',
170             isa => 'MR::IProto::Connection::Sync',
171             lazy_build => 1,
172             );
173              
174             my %servers;
175              
176             =back
177              
178             =head1 PUBLIC METHODS
179              
180             =over
181              
182             =item disconnect_all
183              
184             Class method used to disconnect all iproto-connections. Very useful in case of fork().
185              
186             =cut
187              
188             sub disconnect_all {
189 0     0 1   my ($class) = @_;
190 0           foreach my $server (values %servers) {
191 0           $server->clear_async();
192 0           $server->clear_sync();
193             }
194 0           return;
195             }
196              
197             =head1 PROTECTED METHODS
198              
199             =over
200              
201             =cut
202              
203             sub BUILD {
204 0     0 1   my ($self) = @_;
205 0           $servers{Scalar::Util::refaddr($self)} = $self;
206 0           Scalar::Util::weaken($servers{Scalar::Util::refaddr($self)});
207 0           return;
208             }
209              
210             sub DEMOLISH {
211 0     0 1   my ($self) = @_;
212 0           delete $servers{Scalar::Util::refaddr($self)};
213 0           return;
214             }
215              
216             sub _build_async {
217 0     0     my ($self) = @_;
218 0           return MR::IProto::Connection::Async->new( server => $self );
219             }
220              
221             sub _build_sync {
222 0     0     my ($self) = @_;
223 0           return MR::IProto::Connection::Sync->new( server => $self );
224             }
225              
226             sub _build_debug_cb {
227 0     0     my ($self) = @_;
228 0           my $prefix = $self->prefix;
229             return sub {
230 0     0     my ($msg) = @_;
231 0           chomp $msg;
232 0           warn "$prefix: $msg\n";
233 0           return;
234 0           };
235             }
236              
237             =item _send_started( $sync, $message, $data )
238              
239             This method is called when message is started to send.
240              
241             =cut
242              
243             sub _send_started {
244 0     0     return;
245             }
246              
247             =item _recv_finished( $sync, $message, $data, $error )
248              
249             This method is called when message is received.
250              
251             =cut
252              
253             sub _recv_finished {
254 0     0     return;
255             }
256              
257             sub _debug {
258 0     0     my ($self, $msg) = @_;
259 0           $self->debug_cb->( sprintf "%s:%d: %s", $self->host, $self->port, $msg );
260 0           return;
261             }
262              
263             =back
264              
265             =head1 SEE ALSO
266              
267             L, L.
268              
269             =cut
270              
271 1     1   8 no Mouse;
  1         2  
  1         6  
272             __PACKAGE__->meta->make_immutable();
273              
274             1;