File Coverage

blib/lib/IO/Stream/EV.pm
Criterion Covered Total %
statement 127 127 100.0
branch 38 40 95.0
condition 8 8 100.0
subroutine 25 25 100.0
pod 0 5 0.0
total 198 205 96.5


line stmt bran cond sub pod time code
1             package IO::Stream::EV;
2 27     27   384 use 5.010001;
  27         71  
3 27     27   103 use warnings;
  27         41  
  27         581  
4 27     27   111 use strict;
  27         41  
  27         410  
5 27     27   112 use utf8;
  27         40  
  27         127  
6 27     27   637 use Carp;
  27         39  
  27         1739  
7              
8             our $VERSION = 'v2.0.3';
9              
10 27     27   157 use IO::Stream::const;
  27         54  
  27         134  
11              
12 27     27   149 use Scalar::Util qw( weaken );
  27         51  
  27         1219  
13 27     27   128 use Socket qw( inet_aton sockaddr_in );
  27         78  
  27         4752  
14 27     27   3348 use EV;
  27         12904  
  27         655  
15 27     27   12480 use AnyEvent::DNS;
  27         586501  
  27         907  
16              
17             # States:
18 27     27   170 use constant RESOLVING => 1;
  27         48  
  27         1468  
19 27     27   130 use constant CONNECTING => 2;
  27         48  
  27         995  
20 27     27   123 use constant HANDLING => 3;
  27         41  
  27         24654  
21              
22              
23             sub new {
24 50     50 0 316 my $self = bless {
25             fh => undef,
26             _state => 0, # RESOLVING -> CONNECTING -> HANDLING
27             _r => undef, # read watcher
28             _w => undef, # write watcher
29             _t => undef, # timer watcher
30             _cb_r => undef, # read callback
31             _cb_w => undef, # write callback
32             _cb_t => undef, # timer callback
33             }, __PACKAGE__;
34              
35 50         78 my $this = $self;
36 50         169 weaken($this);
37 50     3   234 $self->{_cb_t} = sub { $this->T() };
  3         939197  
38 50     416   138 $self->{_cb_r} = sub { $this->R() };
  416         3263  
39 50     1292   157 $self->{_cb_w} = sub { $this->W() };
  1292         8003  
40              
41 50         115 return $self;
42             }
43              
44             sub PREPARE {
45 50     50 0 110 my ($self, $fh, $host, $port) = @_;
46 50         99 $self->{fh} = $fh;
47 50 100       148 if (!defined $host) {
48 31         53 $self->{_state} = HANDLING;
49 31         229 $self->{_r} = EV::io($fh, EV::READ, $self->{_cb_r});
50             }
51             else {
52 19         43 $self->{_state} = RESOLVING;
53             _resolve($host, $self, sub {
54 17     17   33 my ($self, $ip) = @_;
55 17         57 $self->{_state} = CONNECTING;
56             # TODO try other ip on failed connect?
57 17         111 connect $self->{fh}, sockaddr_in($port, inet_aton($ip));
58 17         1744 $self->{_r} = EV::io($fh, EV::READ, $self->{_cb_r});
59 17         67 $self->{_w} = EV::io($fh, EV::WRITE, $self->{_cb_w});
60 17         122 $self->{_t} = EV::timer(TOCONNECT, 0, $self->{_cb_t});
61 17         43 $self->{_master}{ip} = $ip;
62 17         66 $self->{_master}->EVENT(RESOLVED);
63 19         114 });
64             }
65 50         166 return;
66             }
67              
68             sub WRITE {
69 234     234   342 my ($self) = @_;
70 234 100       451 if ($self->{_state} == HANDLING) {
71 222         365 $self->{_cb_w}->();
72             }
73 234         343 return;
74             }
75              
76             sub _resolve {
77 19     19   56 my ($host, $plugin, $cb) = @_;
78 19 100       122 if ($host =~ /\A\d{1,3}[.]\d{1,3}[.]\d{1,3}[.]\d{1,3}\z/xms) {
79 15         45 $cb->($plugin, $host);
80             }
81             else {
82 4         12 weaken($plugin);
83             # AnyEvent::DNS has own timeouts, so we don't setup own here.
84             AnyEvent::DNS::a $host, sub {
85 3     3   10767 my (@a) = @_;
86 3 50       11 return if !$plugin;
87 3 100       8 if (@a) {
88 2         5 $cb->($plugin, @a);
89             }
90             else {
91 1         6 $plugin->{_master}->EVENT(0, EDNS);
92             }
93 3         107036 return;
94 4         24 };
95             }
96 19         18247 return;
97             }
98              
99             sub T {
100 3     3 0 12 my ($self) = @_;
101 3         12 my $m = $self->{_master};
102 3 100       39 $m->EVENT(0, $self->{_state} == CONNECTING ? ETOCONNECT : ETOWRITE);
103 3         509 return;
104             }
105              
106             sub R {
107 416     416 0 643 my ($self) = @_;
108 416         588 my $m = $self->{_master};
109 416         6177 my $n = sysread $self->{fh}, $m->{in_buf}, BUFSIZE, length $m->{in_buf};
110 416 100       2037 if (defined $n) {
    100          
111 413 100       613 if ($n) {
    100          
112 388         524 $m->{in_bytes} += $n;
113 388         1027 $m->EVENT(IN);
114             }
115             elsif (!$m->{is_eof}) { # EOF delivered only once
116 24         45 $m->{is_eof} = 1;
117 24         65 $m->EVENT(EOF);
118             }
119             }
120             elsif ($! != EAGAIN) { # may need to handle EINTR too
121 2         8 $m->EVENT(0, $!);
122             }
123 414         4017586 return;
124             }
125              
126             sub W {
127 1292     1292 0 1754 my ($self) = @_;
128 1292         1651 my $m = $self->{_master};
129 1292         1354 my $e = 0;
130              
131 1292 100       2179 if ($self->{_state} == CONNECTING) {
132 16         33 $self->{_state} = HANDLING;
133 16         77 undef $self->{_t};
134 16         42 undef $self->{_w};
135 16         41 $e |= CONNECTED;
136             }
137              
138 1292         1598 my $len = length $m->{out_buf};
139 1292 100       2160 my $has_out = defined $m->{out_pos} ? ($len > $m->{out_pos}) : ($len>0);
140 1292 100       1823 if ($has_out) {
141 1288   100     21199 my $n = syswrite $self->{fh}, $m->{out_buf}, BUFSIZE, $m->{out_pos}||0;
142 1288 100       3294 if (!defined $n) {
143 1 50       11 if ($! != EAGAIN) {
144 1         4 $m->EVENT($e, $!);
145 1         3 return; # WARNING leave {_w} unchanged
146             }
147             }
148             else {
149 1287         1773 $m->{out_bytes} += $n;
150 1287 100       1626 if (defined $m->{out_pos}) {
151 50         51 $m->{out_pos} += $n;
152 50         74 $has_out = $len > $m->{out_pos};
153             }
154             else {
155 1237         1956 substr $m->{out_buf}, 0, $n, q{};
156 1237         1580 $has_out = $len > $n;
157             }
158 1287 100       2274 if ($self->{_t}) {
159 1054         4779 $self->{_t} = EV::timer(TOWRITE, 0, $self->{_cb_t});
160             }
161 1287 100       2347 $e |= $has_out ? OUT : (OUT|SENT);
162             }
163             }
164              
165 1291 100 100     4584 if ($self->{_w} && !$has_out) {
    100 100        
166 6         18 undef $self->{_w};
167 6         19 undef $self->{_t};
168             }
169             elsif (!$self->{_w} && $has_out) {
170 8         52 $self->{_w} = EV::io($self->{fh}, EV::WRITE, $self->{_cb_w});
171 8         41 $self->{_t} = EV::timer(TOWRITE, 0, $self->{_cb_t});
172             }
173              
174 1291         4029 $m->EVENT($e);
175 1291         2447996 return;
176             }
177              
178              
179             1;