File Coverage

blib/lib/DR/Tarantool/AEConnection.pm
Criterion Covered Total %
statement 111 140 79.2
branch 23 48 47.9
condition 10 26 38.4
subroutine 28 36 77.7
pod 0 15 0.0
total 172 265 64.9


line stmt bran cond sub pod time code
1 6     6   26998 use utf8;
  6         8  
  6         26  
2 6     6   146 use strict;
  6         8  
  6         125  
3 6     6   19 use warnings;
  6         6  
  6         161  
4              
5             package DR::Tarantool::AEConnection;
6 6     6   6098 use AnyEvent;
  6         24983  
  6         164  
7 6     6   3234 use AnyEvent::Socket ();
  6         133013  
  6         246  
8 6     6   80 use Carp;
  6         5  
  6         365  
9 6     6   3215 use List::MoreUtils ();
  6         4883  
  6         112  
10 6     6   28 use Scalar::Util ();
  6         6  
  6         6343  
11              
12             sub _errno() {
13 12     12   84 while (my ($k, $v) = each(%!)) {
14 804 100       8943 return $k if $v;
15             }
16 6         51 return $!;
17             }
18              
19             sub new {
20 6     6 0 599975 my ($class, %opts) = @_;
21              
22 6         16 $opts{state} = 'init';
23 6   50     34 $opts{host} ||= '127.0.0.1';
24 6 50       48 croak 'port is undefined' unless $opts{port};
25              
26              
27 6   50 0   47 $opts{on}{connected} ||= sub { };
  0         0  
28 6   50 0   31 $opts{on}{connfail} ||= sub { };
  0         0  
29 6   50 0   72 $opts{on}{disconnect} ||= sub { };
  0         0  
30 6   50 2   35 $opts{on}{error} ||= sub { };
  2         4  
31 6   50 10   30 $opts{on}{reconnecting} ||= sub { };
  10         21  
32              
33 6         12 $opts{success_connects} = 0;
34 6         12 $opts{wbuf} = '';
35              
36 6         22 $opts{read} = { any => [] };
37              
38 6   33     37 bless \%opts => ref($class) || $class;
39             }
40              
41              
42             sub on {
43 8     8 0 16 my ($self, $name, $cb) = @_;
44 8 50       27 croak "wrong event name: $name" unless exists $self->{on}{$name};
45 8   50 0   33 $self->{on}{$name} = $cb || sub { };
  0         0  
46 8         23 $self;
47             }
48              
49 6     6 0 609 sub fh { $_[0]->{fh} }
50 85     85 0 7554 sub state { $_[0]->{state} }
51 16     16 0 72 sub host { $_[0]->{host} }
52 16     16 0 224 sub port { $_[0]->{port} }
53 0     0 0 0 sub error { $_[0]->{error} }
54 2     2 0 8 sub errno { $_[0]->{errno} }
55 13     13 0 40 sub reconnect_always { $_[0]->{reconnect_always} }
56 27     27 0 384 sub reconnect_period { $_[0]->{reconnect_period} }
57             sub timeout {
58 17     17 0 31 my ($self) = @_;
59 17 50       123 return $self->{timeout} if @_ == 1;
60 0         0 return $self->{timeout} = $_[1];
61             }
62              
63              
64             sub set_error {
65 2     2 0 849 my ($self, $error, $errno) = @_;
66 2   33     13 $errno ||= $error;
67 2         5 $self->{state} = 'error';
68 2         5 $self->{error} = $error;
69 2         37 $self->{errno} = $errno;
70 2         8 $self->{on}{error}($self);
71 2         5 $self->{guard} = {};
72 2         6 $self->{wbuf} = '';
73              
74 2         6 $self->_check_reconnect;
75            
76             }
77              
78             sub _check_reconnect {
79 15     15   58 Scalar::Util::weaken(my $self = shift);
80 15 50       36 return if $self->state eq 'connected';
81 15 50       35 return if $self->state eq 'connecting';
82 15 50       46 return if $self->{guard}{rc};
83              
84 15 100       36 return unless $self->reconnect_period;
85 13 100       44 unless ($self->reconnect_always) {
86 3 100       32 return unless $self->{success_connects};
87             }
88              
89             $self->{guard}{rc} = AE::timer $self->reconnect_period, 0, sub {
90 10 50   10   989241 return unless $self;
91 10         63 delete $self->{guard}{rc};
92 10         70 $self->{on}{reconnecting}($self);
93 10         59 $self->connect;
94 12         31 };
95             }
96              
97             sub connect {
98 16     16 0 86 Scalar::Util::weaken(my $self = shift);
99              
100 16 50 33     55 return if $self->state eq 'connected' or $self->state eq 'connecting';
101              
102 16         36 $self->{state} = 'connecting';
103 16         41 $self->{error} = undef;
104 16         31 $self->{errno} = undef;
105 16         45 $self->{guard} = {};
106              
107             $self->{guard}{c} = AnyEvent::Socket::tcp_connect
108             $self->host,
109             $self->port,
110             sub {
111 15     15   1445 $self->{guard} = {};
112 15         90 my ($fh) = @_;
113 15 100       123 if ($fh) {
114 3         8 $self->{fh} = $fh;
115 3         93 $self->{state} = 'connected';
116 3         8 $self->{success_connects}++;
117 3 50       14 $self->push_write('') if length $self->{wbuf};
118 3         15 $self->{on}{connected}($self);
119 3         43 return;
120             }
121            
122 12         56 $self->{error} = $!;
123 12         38 $self->{errno} = _errno;
124 12         29 $self->{state} = 'connfail';
125 12         20 $self->{guard} = {};
126 12         61 $self->{on}{connfail}($self);
127 12 50       105 return unless $self;
128 12         35 $self->_check_reconnect;
129             },
130 16     16   4567 sub {
131              
132             }
133 16         59 ;
134              
135 16 100       2418 if (defined $self->timeout) {
136             $self->{guard}{t} = AE::timer $self->timeout, 0, sub {
137 1     1   15 delete $self->{guard}{t};
138 1 50       2 return unless $self->state eq 'connecting';
139              
140 1         2 $self->{error} = 'Connection timeout';
141 1         2 $self->{errno} = 'ETIMEOUT';
142 1         2 $self->{state} = 'connfail';
143 1         2 $self->{guard} = {};
144 1         3 $self->{on}{connfail}($self);
145 1         14 $self->_check_reconnect;
146 1         2 };
147             }
148            
149 16         149 $self;
150             }
151              
152             sub disconnect {
153 0     0 0   Scalar::Util::weaken(my $self = shift);
154 0 0 0       return if $self->state eq 'disconnect' or $self->state eq 'init';
155              
156 0           $self->{guard} = {};
157 0           $self->{error} = 'Disconnected';
158 0           $self->{errno} = 'SUCCESS';
159 0           $self->{state} = 'disconnect';
160 0           $self->{wbuf} = '';
161 0           $self->{on}{disconnect}($self);
162             }
163              
164              
165             sub push_write {
166 0     0 0   Scalar::Util::weaken(my $self = shift);
167 0           my ($str) = @_;
168              
169 0           $self->{wbuf} .= $str;
170              
171 0 0         return unless $self->state eq 'connected';
172 0 0         return unless length $self->{wbuf};
173 0 0         return if $self->{guard}{write};
174              
175             $self->{guard}{write} = AE::io $self->fh, 1, sub {
176 0     0     my $l = syswrite $self->fh, $self->{wbuf};
177 0 0         unless(defined $l) {
178 0 0         return if $!{EINTR};
179 0           $self->set_error($!, _errno);
180 0           return;
181             }
182 0           substr $self->{wbuf}, 0, $l, '';
183 0 0         return if length $self->{wbuf};
184 0           delete $self->{guard}{write};
185 0           };
186             }
187              
188              
189              
190              
191             1;