File Coverage

blib/lib/MR/IProto/Connection/Sync.pm
Criterion Covered Total %
statement 15 145 10.3
branch 0 72 0.0
condition 0 12 0.0
subroutine 5 13 38.4
pod 3 5 60.0
total 23 247 9.3


line stmt bran cond sub pod time code
1             package MR::IProto::Connection::Sync;
2              
3             =head1 NAME
4              
5             MR::IProto::Connection::Sync - sync communication
6              
7             =head1 DESCRIPTION
8              
9             Used to perform synchronous communication.
10              
11             =cut
12              
13 1     1   5 use Mouse;
  1         3  
  1         9  
14             extends 'MR::IProto::Connection';
15              
16 1     1   437 use Errno;
  1         3  
  1         48  
17 1     1   4208 use IO::Socket::INET;
  1         39190  
  1         9  
18 1     1   729 use Socket qw( TCP_NODELAY SO_KEEPALIVE SO_SNDTIMEO SO_RCVTIMEO );
  1         3  
  1         1868  
19              
20             has _socket => (
21             is => 'ro',
22             isa => 'IO::Socket::INET',
23             predicate => '_has_socket',
24             clearer => '_clear_socket',
25             lazy_build => 1,
26             );
27              
28             has _sent => (
29             is => 'ro',
30             default => sub { {} },
31             );
32              
33             has last_sync => (
34             is => 'rw',
35             isa => 'Int',
36             );
37              
38             has last_error => (
39             is => 'rw',
40             isa => 'Str',
41             default => '',
42             );
43              
44             =head1 PUBLIC METHODS
45              
46             =over
47              
48             =item fh
49              
50             Returns socket.
51              
52             =item send
53              
54             See L for more information.
55              
56             =cut
57              
58 0   0 0 1   sub fh { return $_[0]->_has_socket && $_[0]->_socket }
59              
60             sub Close {
61 0     0 0   my ($self, $reason) = @_;
62 0           $self->_handle_error(undef, undef, $reason);
63             }
64              
65             sub send {
66 0     0 1   my ($self, $msg, $payload, $callback, $no_reply, $sync) = @_;
67 0           my $server = $self->server;
68 0           my $sent = $self->_sent;
69 0           my $ok = eval {
70 0 0         if(defined $sync) {
71 0 0         die "Sync $sync already sent" if exists $sent->{$sync};
72             } else {
73 0           1 while exists $sent->{$sync = $self->_choose_sync()};
74             }
75 0           $server->_send_started($sync, $msg, $payload);
76              
77 0           my $socket = $self->_socket;
78 0 0         unless (%$sent) {
79 0           vec((my $rin = ''), fileno($socket), 1) = 1;
80 0 0         if (select((my $rout = $rin), undef, undef, 0) > 0) {
81 0 0         if (sysread($socket, my $buf, 1)) {
82 0           die "More then 0 bytes was received when nothing was waited for";
83             } else {
84             # Connection was closed by other side, socket is in CLOSE_WAIT state, reconnecting
85 0           $self->_clear_socket();
86 0           $socket = $self->_socket;
87             }
88             }
89             }
90              
91 0           my $header = $self->_pack_header($msg, length $payload, $sync);
92 0 0         if( $server->debug >= 5 ) {
93 0           $server->_debug_dump('send header: ', $header);
94 0           $server->_debug_dump('send payload: ', $payload);
95             }
96              
97 0           my $write = $header . $payload;
98 0           while( length $write ) {
99 0           my $written = syswrite($socket, $write);
100 0 0         if (!defined $written) {
101 0 0         if ($! != Errno::EINTR) {
102 0 0         $! = Errno::ETIMEDOUT if $! == Errno::EAGAIN; # Hack over SO_SNDTIMEO behaviour
103 0           die "send: $!";
104             }
105             } else {
106 0           substr $write, 0, $written, '';
107             }
108             }
109 0           1;
110             };
111 0           my $err = $@;
112 0           $self->last_sync($sync);
113 0 0         if($ok) {
114 0 0         if ($no_reply) {
115 0           $callback->(undef, undef);
116 0           $server->_recv_finished($sync, undef, undef);
117             } else {
118 0           $sent->{$sync} = $callback;
119             }
120             }
121             else {
122 0           $self->_handle_error($sync, $callback, $err);
123             }
124 0           return $ok;
125             }
126              
127             sub recv_all {
128 0     0 0   my ($self, %opts) = @_;
129 0           my $server = $self->server;
130 0           my $sent = $self->_sent;
131 0           my $dump_resp = $server->debug >= 6;
132 0           my @sync = keys %$sent;
133 0   0       my $n = $opts{max} || @sync;
134 0   0       while ($n-- and %$sent) {
135 0           my ($resp_msg, $resp_payload);
136 0           my ($sync,$callback);
137 0           my $ok = eval {
138 0           my $socket = $self->_socket;
139 0           my $resp_header;
140 0           my $to_read = 12;
141 0           while( $to_read ) {
142 0           my $read = sysread($socket, my $buf, $to_read);
143 0 0         if (!defined $read) {
    0          
144 0 0         if ($! != Errno::EINTR) {
145 0 0         $! = Errno::ETIMEDOUT if $! == Errno::EAGAIN; # Hack over SO_RCVTIMEO behaviour
146 0           die "recv: $!";
147             }
148             } elsif ($read == 0) {
149 0           die "recv: Unexpected end-of-file";
150             } else {
151 0           $resp_header .= $buf;
152 0           $to_read -= $read;
153             }
154             }
155 0 0         $server->_debug_dump('recv header: ', $resp_header) if $dump_resp;
156 0           ($resp_msg, my $resp_length, $sync) = $self->_unpack_header($resp_header);
157 0 0         $callback = delete $sent->{$sync} or die "Reply sync $sync not found";
158             #die "Request and reply sync is different: $resp_sync != $sync" unless $resp_sync == $sync;
159              
160 0           $to_read = $resp_length;
161 0           while( $to_read ) {
162 0           my $read = sysread($socket, my $buf, $to_read);
163 0 0         if (!defined $read) {
    0          
164 0 0         if ($! != Errno::EINTR) {
165 0 0         $! = Errno::ETIMEDOUT if $! == Errno::EAGAIN; # Hack over SO_RCVTIMEO behaviour
166 0           die "recv: $!";
167             }
168             } elsif ($read == 0) {
169 0           die "recv: Unexpected end-of-file";
170             } else {
171 0           $resp_payload .= $buf;
172 0           $to_read -= $read;
173             }
174             }
175 0 0         $server->_debug_dump('recv payload: ', $resp_payload) if $dump_resp;
176 0           1;
177             };
178 0 0         if($ok) {
179 0           $server->_recv_finished($sync, $resp_msg, $resp_payload);
180 0 0         die "No Callback" unless $callback;
181 0           $callback->($resp_msg, $resp_payload);
182             }
183             else {
184 0           $self->_handle_error(undef, undef, $@);
185             }
186             }
187 0           return;
188             }
189              
190             =item set_timeout( $timeout )
191              
192             Set timeout value for existing connection.
193              
194             =cut
195              
196             sub set_timeout {
197 0     0 1   my ($self, $timeout) = @_;
198 0 0         $self->_set_timeout($self->_socket, $timeout) if $self->_has_socket();
199 0           return;
200             }
201              
202             =back
203              
204             =cut
205              
206             sub _build__socket {
207 0     0     my ($self) = @_;
208 0           my $server = $self->server;
209 0 0         $server->_debug("connecting") if $server->debug >= 4;
210             my $socket = IO::Socket::INET->new(
211             PeerHost => $self->host,
212             PeerPort => $self->port,
213             Proto => 'tcp',
214             Timeout => $self->connect_timeout,
215 0 0         ) or do {
216 0           $@ =~ s/^IO::Socket::INET: (?:connect: )?//;
217 0 0         if ($@ eq 'timeout') {
218             # Hack over IO::Socket behaviour
219 0           $! = Errno::ETIMEDOUT;
220 0           $@ = "$!";
221             }
222 0           die "connect: $@";
223             };
224 0 0         $socket->sockopt(SO_KEEPALIVE, 1) if $self->tcp_keepalive;
225 0 0         $socket->setsockopt((getprotobyname('tcp'))[2], TCP_NODELAY, 1) if $self->tcp_nodelay;
226 0 0         $self->_set_timeout($socket, $self->timeout) if $self->timeout;
227 0 0         $server->_debug("connected") if $server->debug >= 4;
228 0           return $socket;
229             }
230              
231             sub _set_timeout {
232 0     0     my ($self, $socket, $timeout) = @_;
233 0           my $sec = int $timeout; # seconds
234 0           my $usec = int( ($timeout - $sec) * 1_000_000 ); # micro-seconds
235 0           my $timeval = pack "L!L!", $sec, $usec; # struct timeval;
236 0           $socket->sockopt(SO_SNDTIMEO, $timeval);
237 0           $socket->sockopt(SO_RCVTIMEO, $timeval);
238 0           return;
239             }
240              
241             sub _handle_error {
242 0     0     my ($self, $sync, $callback, $error) = @_;
243 0           my $errno = $!;
244 0 0         if (!$error) {
    0          
245 0           $error = 'Unknown error';
246             } elsif ($error =~ /^(.+?) at \S+ line \d+/s) {
247 0           $error = $1;
248             }
249 0           $self->last_error($error);
250 0           my $server = $self->server;
251 0           $server->_debug("error: $error");
252 0 0         if($self->_has_socket()) {
253 0           close($self->_socket);
254 0           $self->_clear_socket();
255             }
256 0           $server->active(0);
257 0           my $sent = $self->_sent;
258 0 0 0       if($sync && $callback) {
259 0           $server->_recv_finished($sync, undef, undef, $error, $errno);
260 0           $callback->(undef, undef, $error, $errno);
261 0           delete $sent->{$sync};
262             }
263 0           foreach my $sync (keys %$sent) {
264 0           $server->_recv_finished($sync, undef, undef, $error, $errno);
265 0           $sent->{$sync}->(undef, undef, $error, $errno);
266             }
267 0           undef %$sent;
268 0           return;
269             }
270              
271             =head1 SEE ALSO
272              
273             L, L.
274              
275             =cut
276              
277 1     1   8 no Mouse;
  1         2  
  1         10  
278             __PACKAGE__->meta->make_immutable();
279              
280             1;