File Coverage

lib/DR/Tnt/LowLevel/Connector/Sync.pm
Criterion Covered Total %
statement 18 58 31.0
branch 0 18 0.0
condition 0 5 0.0
subroutine 6 11 54.5
pod 0 1 0.0
total 24 93 25.8


line stmt bran cond sub pod time code
1 3     3   2773 use utf8;
  3         4  
  3         13  
2 3     3   69 use strict;
  3         4  
  3         40  
3 3     3   10 use warnings;
  3         4  
  3         92  
4              
5             package DR::Tnt::LowLevel::Connector::Sync;
6 3     3   14 use Mouse;
  3         4  
  3         11  
7 3     3   1927 use IO::Socket::INET;
  3         46825  
  3         13  
8 3     3   1065 use IO::Socket::UNIX;
  3         5  
  3         15  
9              
10             extends 'DR::Tnt::LowLevel::Connector';
11              
12             sub _connect {
13 0     0     my ($self, $cb) = @_;
14              
15 0           my $fh;
16 0 0 0       if ($self->host eq 'unix' or $self->host eq 'unix/') {
17 0           $fh = IO::Socket::UNIX->new(
18             Type => SOCK_STREAM,
19             Peer => $self->port,
20             );
21             } else {
22 0           $fh = IO::Socket::INET->new(
23             PeerHost => $self->host,
24             PeerPort => $self->port,
25             Proto => 'tcp',
26             );
27             }
28              
29 0 0         unless ($fh) {
30 0           $cb->(ER_SOCKET => $!);
31 0           return;
32             }
33              
34 0           $self->_set_fh($fh);
35 0           $cb->(OK => 'Socket connected');
36              
37 0           return;
38             }
39            
40             sub _handshake {
41 0     0     my ($self, $cb) = @_;
42             $self->sread(128, sub {
43 0     0     my ($state, $message, $hs) = @_;
44 0 0         unless ($state eq 'OK') {
45 0           pop;
46 0           goto \&$cb;
47             }
48 0           $cb->(OK => 'handshake was read', $hs);
49 0           });
50             }
51              
52             sub send_pkt {
53 0     0 0   my ($self, $pkt, $cb) = @_;
54              
55 0           while (1) {
56 0           my $done = syswrite $self->fh, $pkt;
57 0 0         unless (defined $done) {
58 0           $cb->(ER_SOCKET => $!);
59 0           return;
60             }
61 0 0         if ($done == length $pkt) {
62 0           $cb->(OK => 'swrite done');
63 0           return;
64             }
65 0 0         substr $pkt, 0, $done, '' if $done;
66             }
67             }
68              
69             sub _wait_something {
70 0     0     my ($self) = @_;
71 0 0         return unless $self->fh;
72              
73 0           do {
74 0           my $blob = '';
75 0           my $done = sysread $self->fh, $blob, 4096;
76              
77 0 0         unless ($done) {
78 0 0         unless (defined $done) {
79 0   0       $self->socket_error($! // 'Connection lost');
80 0           return;
81             }
82 0           $self->socket_error('Remote host closed connection');
83 0           return;
84             }
85 0           $self->rbuf($self->rbuf . $blob);
86              
87             } until $self->check_rbuf;
88             }
89              
90             after handshake => sub {
91             my ($self) = @_;
92             $self->_wait_something;
93             };
94              
95             after wait_response => sub {
96             my ($self) = @_;
97             $self->_wait_something;
98             };
99              
100             __PACKAGE__->meta->make_immutable;