File Coverage

blib/lib/IO/Stream/EV.pm
Criterion Covered Total %
statement 24 26 92.3
branch 1 2 50.0
condition n/a
subroutine 9 9 100.0
pod n/a
total 34 37 91.8


line stmt bran cond sub pod time code
1             package IO::Stream::EV;
2              
3 27     27   168 use warnings;
  27         39  
  27         1294  
4 27     27   173 use strict;
  27         60  
  27         1150  
5              
6 27     27   167 use version; our $VERSION = qv('1.0.9');
  27         36  
  27         199  
7              
8 27     27   2954 use IO::Stream::const;
  27         140  
  27         295  
9              
10             # update DEPENDENCIES in POD & Makefile.PL & README
11 27     27   274 use Scalar::Util qw( weaken );
  27         71  
  27         2549  
12 27     27   562 use Socket qw( inet_aton sockaddr_in );
  27         140  
  27         11225  
13 27     27   4586 use EV;
  27         19059  
  27         2273  
14 27 50   27   86 BEGIN { if (!WIN32) { eval 'use EV::ADNS; 1' or die $@ }} ## no critic
  27     27   2690  
  27         498552  
  0            
  0            
15              
16              
17              
18             # States:
19             use constant RESOLVING => 1;
20             use constant CONNECTING => 2;
21             use constant HANDLING => 3;
22              
23              
24             sub new {
25             my $self = bless {
26             fh => undef,
27             _state => 0, # RESOLVING -> CONNECTING -> HANDLING
28             _r => undef, # read watcher
29             _w => undef, # write watcher
30             _t => undef, # timer watcher
31             _cb_r => undef, # read callback
32             _cb_w => undef, # write callback
33             _cb_t => undef, # timer callback
34             }, __PACKAGE__;
35              
36             my $this = $self;
37             weaken($this);
38             $self->{_cb_t} = sub { $this->T() };
39             $self->{_cb_r} = sub { $this->R() };
40             $self->{_cb_w} = sub { $this->W() };
41              
42             return $self;
43             }
44              
45             sub PREPARE {
46             my ($self, $fh, $host, $port) = @_;
47             $self->{fh} = $fh;
48             if (!defined $host) {
49             $self->{_state} = HANDLING;
50             $self->{_r} = EV::io($fh, EV::READ, $self->{_cb_r});
51             }
52             else {
53             $self->{_state} = RESOLVING;
54             resolve($host, $self, sub {
55             my ($self, $ip) = @_;
56             $self->{_state} = CONNECTING;
57             # TODO try other ip on failed connect?
58             connect $self->{fh}, sockaddr_in($port, inet_aton($ip));
59             $self->{_r} = EV::io($fh, EV::READ, $self->{_cb_r});
60             $self->{_w} = EV::io($fh, EV::WRITE, $self->{_cb_w});
61             $self->{_t} = EV::timer(TOCONNECT, 0, $self->{_cb_t});
62             $self->{_master}{ip} = $ip;
63             $self->{_master}->EVENT(RESOLVED);
64             });
65             }
66             return;
67             }
68              
69             sub WRITE {
70             my ($self) = @_;
71             if ($self->{_state} == HANDLING) {
72             $self->{_cb_w}->();
73             }
74             return;
75             }
76              
77             sub resolve {
78             my ($host, $plugin, $cb) = @_;
79             if ($host =~ /\A\d{1,3}[.]\d{1,3}[.]\d{1,3}[.]\d{1,3}\z/xms) {
80             $cb->($plugin, $host);
81             }
82             elsif (WIN32) {
83             my $iaddr = inet_aton($host);
84             if ($iaddr) {
85             $cb->($plugin, join q{.}, unpack 'C4', $iaddr);
86             }
87             else {
88             $plugin->{_master}->EVENT(0, EDNSNXDOMAIN);
89             }
90             }
91             else {
92             weaken($plugin);
93             # WARNING ADNS has own timeouts, so we don't setup own here.
94             EV::ADNS::submit $host, EV::ADNS::r_addr(), 0, sub {
95             my ($status, undef, @a) = @_;
96             return if !$plugin;
97             if ($status == EV::ADNS::s_ok()) {
98             $cb->($plugin, @a);
99             }
100             else {
101             $plugin->{_master}->EVENT(0, adns2err($status));
102             }
103             return;
104             };
105             }
106             return;
107             }
108              
109             sub adns2err {
110             my ($status) = @_;
111             return
112             $status == EV::ADNS::s_timeout() ? ETORESOLVE
113             : $status == EV::ADNS::s_nxdomain() ? EDNSNXDOMAIN
114             : $status == EV::ADNS::s_nodata() ? EDNSNODATA
115             : EDNS
116             }
117              
118             sub T {
119             my ($self) = @_;
120             my $m = $self->{_master};
121             $m->EVENT(0, $self->{_state} == CONNECTING ? ETOCONNECT : ETOWRITE);
122             return;
123             }
124              
125             sub R {
126             my ($self) = @_;
127             my $m = $self->{_master};
128             my $n = sysread $self->{fh}, $m->{in_buf}, BUFSIZE, length $m->{in_buf};
129             if (defined $n) {
130             if ($n) {
131             $m->{in_bytes} += $n;
132             $m->EVENT(IN);
133             }
134             elsif (!$m->{is_eof}) { # EOF delivered only once
135             $m->{is_eof} = 1;
136             $m->EVENT(EOF);
137             }
138             }
139             elsif ($! != EAGAIN) { # may need to handle EINTR too
140             $m->EVENT(0, $!);
141             }
142             return;
143             }
144              
145             sub W {
146             my ($self) = @_;
147             my $m = $self->{_master};
148             my $e = 0;
149              
150             if ($self->{_state} == CONNECTING) {
151             $self->{_state} = HANDLING;
152             undef $self->{_t};
153             undef $self->{_w};
154             $e |= CONNECTED;
155             }
156              
157             my $len = length $m->{out_buf};
158             my $has_out = defined $m->{out_pos} ? ($len > $m->{out_pos}) : ($len>0);
159             if ($has_out) {
160             my $n = syswrite $self->{fh}, $m->{out_buf}, BUFSIZE, $m->{out_pos}||0;
161             if (!defined $n) {
162             if ($! != EAGAIN) {
163             $m->EVENT($e, $!);
164             return; # WARNING leave {_w} unchanged
165             }
166             }
167             else {
168             $m->{out_bytes} += $n;
169             if (defined $m->{out_pos}) {
170             $m->{out_pos} += $n;
171             $has_out = $len > $m->{out_pos};
172             }
173             else {
174             substr $m->{out_buf}, 0, $n, q{};
175             $has_out = $len > $n;
176             }
177             if ($self->{_t}) {
178             $self->{_t} = EV::timer(TOWRITE, 0, $self->{_cb_t});
179             }
180             $e |= $has_out ? OUT : (OUT|SENT);
181             }
182             }
183              
184             if ($self->{_w} && !$has_out) {
185             undef $self->{_w};
186             undef $self->{_t};
187             }
188             elsif (!$self->{_w} && $has_out) {
189             $self->{_w} = EV::io($self->{fh}, EV::WRITE, $self->{_cb_w});
190             $self->{_t} = EV::timer(TOWRITE, 0, $self->{_cb_t});
191             }
192              
193             $m->EVENT($e);
194             return;
195             }
196              
197              
198             1;