File Coverage

blib/lib/YAHC.pm
Criterion Covered Total %
statement 398 637 62.4
branch 204 480 42.5
condition 76 204 37.2
subroutine 54 84 64.2
pod 26 26 100.0
total 758 1431 52.9


line stmt bran cond sub pod time code
1             package YAHC;
2              
3 20     20   824559 use strict;
  20         199  
  20         602  
4 20     20   123 use warnings;
  20         40  
  20         841  
5              
6             our $VERSION = '0.035';
7              
8 20     20   7778 use EV;
  20         35614  
  20         594  
9 20     20   7536 use Time::HiRes;
  20         23943  
  20         87  
10 20     20   2342 use Exporter 'import';
  20         46  
  20         840  
11 20     20   129 use Scalar::Util qw/weaken/;
  20         42  
  20         1516  
12 20     20   131 use Fcntl qw/F_GETFL F_SETFL O_NONBLOCK/;
  20         39  
  20         1012  
13 20     20   6690 use POSIX qw/EINPROGRESS EINTR EAGAIN EWOULDBLOCK strftime/;
  20         118354  
  20         144  
14 20     20   36161 use Socket qw/PF_INET SOCK_STREAM $CRLF SOL_SOCKET SO_ERROR inet_aton inet_ntoa pack_sockaddr_in/;
  20         63545  
  20         5617  
15 20 50   20   184 use constant SSL => $ENV{YAHC_NO_SSL} ? 0 : eval 'use IO::Socket::SSL 1.94 (); 1';
  20     20   45  
  20         1340  
  20         11629  
  20         1219388  
  20         285  
16 20     20   175 use constant SSL_WANT_READ => SSL ? IO::Socket::SSL::SSL_WANT_READ() : 0;
  20         46  
  20         1114  
17 20     20   120 use constant SSL_WANT_WRITE => SSL ? IO::Socket::SSL::SSL_WANT_WRITE() : 0;
  20         45  
  20         7601  
18              
19             sub YAHC::Error::NO_ERROR () { 0 }
20             sub YAHC::Error::REQUEST_TIMEOUT () { 1 << 0 }
21             sub YAHC::Error::CONNECT_TIMEOUT () { 1 << 1 }
22             sub YAHC::Error::DRAIN_TIMEOUT () { 1 << 2 }
23             sub YAHC::Error::LIFETIME_TIMEOUT () { 1 << 3 }
24             sub YAHC::Error::TIMEOUT () { 1 << 8 }
25             sub YAHC::Error::RETRY_LIMIT () { 1 << 9 }
26              
27             sub YAHC::Error::CONNECT_ERROR () { 1 << 10 }
28             sub YAHC::Error::READ_ERROR () { 1 << 11 }
29             sub YAHC::Error::WRITE_ERROR () { 1 << 12 }
30             sub YAHC::Error::REQUEST_ERROR () { 1 << 13 }
31             sub YAHC::Error::RESPONSE_ERROR () { 1 << 14 }
32             sub YAHC::Error::CALLBACK_ERROR () { 1 << 15 }
33             sub YAHC::Error::SSL_ERROR () { 1 << 16 }
34             sub YAHC::Error::TERMINAL_ERROR () { 1 << 30 }
35             sub YAHC::Error::INTERNAL_ERROR () { 1 << 31 }
36              
37             sub YAHC::State::INITIALIZED () { 0 }
38             sub YAHC::State::RESOLVE_DNS () { 5 }
39             sub YAHC::State::CONNECTING () { 10 }
40             sub YAHC::State::CONNECTED () { 15 }
41             sub YAHC::State::SSL_HANDSHAKE () { 20 }
42             sub YAHC::State::WRITING () { 25 }
43             sub YAHC::State::READING () { 30 }
44             sub YAHC::State::USER_ACTION () { 35 }
45             sub YAHC::State::COMPLETED () { 100 } # terminal state
46              
47             sub YAHC::SocketCache::GET () { 1 }
48             sub YAHC::SocketCache::STORE () { 2 }
49              
50             use constant {
51             # TCP_READ_CHUNK should *NOT* be lower than 16KB because of SSL things.
52             # https://metacpan.org/pod/distribution/IO-Socket-SSL/lib/IO/Socket/SSL.pod
53             # Another way might be if you try to sysread at least 16kByte all the time.
54             # 16kByte is the maximum size of an SSL frame and because sysread returns
55             # data from only a single SSL frame you can guarantee that there are no
56             # pending data.
57 20         156602 TCP_READ_CHUNK => 131072,
58             CALLBACKS => [ qw/init_callback connecting_callback connected_callback
59             writing_callback reading_callback callback/ ],
60 20     20   147 };
  20         43  
61              
62             our @EXPORT_OK = qw/
63             yahc_retry_conn
64             yahc_reinit_conn
65             yahc_terminal_error
66             yahc_conn_last_error
67             yahc_conn_id
68             yahc_conn_url
69             yahc_conn_target
70             yahc_conn_state
71             yahc_conn_errors
72             yahc_conn_timeline
73             yahc_conn_request
74             yahc_conn_response
75             yahc_conn_attempt
76             yahc_conn_attempts_left
77             yahc_conn_socket_cache_id
78             yahc_conn_register_error
79             yahc_conn_user_data
80             /;
81              
82             our %EXPORT_TAGS = (all => \@EXPORT_OK);
83             my $LAST_CONNECTION_ID;
84              
85             ################################################################################
86             # User facing functons
87             ################################################################################
88              
89             sub new {
90 11     11 1 3375 my ($class, $args) = @_;
91 11 100       89 $LAST_CONNECTION_ID = $$ * 1000 unless defined $LAST_CONNECTION_ID;
92              
93 11 50 66     92 die 'YAHC: ->new() expect args to be a hashref' if defined $args and ref($args) ne 'HASH';
94 11 50       98 die 'YAHC: please do `my ($yahc, $yahc_storage) = YAHC::new()` and keep both these objects in the same scope' unless wantarray;
95              
96             # wrapping target selection here allows all client share same list
97             # and more importantly to share index within the list
98 11 100       57 $args->{_target} = _wrap_host(delete $args->{host}) if $args->{host};
99 11 50       40 $args->{_backoff} = _wrap_backoff(delete $args->{backoff_delay}) if $args->{backoff_delay};
100 11 50       63 $args->{_socket_cache} = _wrap_socket_cache(delete $args->{socket_cache}) if $args->{socket_cache};
101              
102 11         24 my %storage;
103             my $self = bless {
104             loop => delete($args->{loop}) || new EV::Loop,
105             pid => $$, # store pid to detect forks
106             storage => \%storage,
107             debug => delete $args->{debug} || $ENV{YAHC_DEBUG} || 0,
108 11   66     626 keep_timeline => delete $args->{keep_timeline} || $ENV{YAHC_TIMELINE} || 0,
      50        
      50        
109             pool_args => $args,
110             }, $class;
111              
112             # this's a radical way of avoiding circular references.
113             # let's see how it plays out in practise.
114 11         119 weaken($self->{storage});
115 11         118 weaken($self->{$_} = $storage{$_} = {}) for qw/watchers callbacks connections/;
116              
117 11 100       42 if (delete $args->{account_for_signals}) {
118 1 50       6 _log_message('YAHC: enable account_for_signals logic') if $self->{debug};
119 1     1   14 my $sigcheck = $self->{watchers}{_sigcheck} = $self->{loop}->check(sub {});
120 1         18 $sigcheck->keepalive(0);
121             }
122              
123 11         60 return $self, \%storage;
124             }
125              
126             sub request {
127 16     16 1 11576 my ($self, @args) = @_;
128 16 50       58 die 'YAHC: new_request() expects arguments' unless @args;
129 16 50       54 die 'YAHC: storage object is destroyed' unless $self->{storage};
130              
131 16 50       109 my ($conn_id, $request) = (@args == 1 ? ('connection_' . $LAST_CONNECTION_ID++, $args[0]) : @args);
132             die "YAHC: Connection with name '$conn_id' already exists\n"
133 16 50       58 if exists $self->{connections}{$conn_id};
134              
135 16         34 my $pool_args = $self->{pool_args};
136 16 50 0     50 do { $request->{$_} ||= $pool_args->{$_} if $pool_args->{$_} } foreach (qw/host port scheme head
  128         224  
137             request_timeout connect_timeout
138             drain_timeout lifetime_timeout/);
139 16 100       45 if ($request->{host}) {
    50          
140 14         65 $request->{_target} = _wrap_host($request->{host});
141             } elsif ($pool_args->{_target}) {
142 2         5 $request->{_target} = $pool_args->{_target};
143             } else {
144 0         0 die "YAHC: host must be defined in request() or in new()\n";
145             }
146              
147 16 50       72 if ($request->{backoff_delay}) {
    50          
148 0         0 $request->{_backoff} = _wrap_backoff($request->{backoff_delay});
149             } elsif ($pool_args->{_backoff}) {
150 0         0 $request->{_backoff} = $pool_args->{_backoff};
151             }
152              
153 16 50       79 if ($request->{socket_cache}) {
    50          
154 0         0 $request->{_socket_cache} = _wrap_socket_cache($request->{socket_cache});
155             } elsif ($pool_args->{_socket_cache}) {
156 0         0 $request->{_socket_cache} = $pool_args->{_socket_cache};
157             }
158              
159 16   50     91 my $scheme = $request->{scheme} ||= 'http';
160 16   33     118 my $debug = delete $request->{debug} || $self->{debug};
161 16   66     74 my $keep_timeline = delete $request->{keep_timeline} || $self->{keep_timeline};
162 16         30 my $user_data = delete $request->{user_data};
163              
164             my $conn = {
165             id => $conn_id,
166             request => $request,
167             response => { status => 0 },
168             attempt => 0,
169 16 50 100     314 retries => $request->{retries} || 0,
    100 66        
    100          
    50          
170             state => YAHC::State::INITIALIZED(),
171             selected_target => [],
172             ($debug ? (debug => $debug) : ()),
173             ($keep_timeline ? (keep_timeline => $keep_timeline) : ()),
174             ($debug || $keep_timeline ? (debug_or_timeline => 1) : ()),
175             (defined $user_data ? (user_data => $user_data) : ()),
176             pid => $$,
177             };
178              
179 16         37 my %callbacks;
180 16         26 foreach (@{ CALLBACKS() }) {
  16         46  
181 96 100       205 next unless exists $request->{$_};
182 3         9 my $cb = $callbacks{$_} = delete $request->{$_};
183 3         13 $conn->{"has_$_"} = !!$cb;
184             }
185              
186 16         44 $self->{watchers}{$conn_id} = {};
187 16         47 $self->{callbacks}{$conn_id} = \%callbacks;
188 16         31 $self->{connections}{$conn_id} = $conn;
189              
190 16 50       41 _set_lifetime_timer($self, $conn_id) if $request->{lifetime_timeout};
191              
192 16 100       66 return $conn if $request->{_test}; # for testing purposes
193 3         20 _set_init_state($self, $conn_id);
194              
195             # if user fire new request in a callback we need to update stop_condition
196 3         5 my $stop_condition = $self->{stop_condition};
197 3 0 33     9 if ($stop_condition && $stop_condition->{all}) {
198 0         0 $stop_condition->{connections}{$conn_id} = 1;
199             }
200              
201 3         28 return $conn;
202             }
203              
204             sub drop {
205 0     0 1 0 my ($self, $c, $force_socket_close) = @_;
206 0 0       0 my $conn_id = ref($c) eq 'HASH' ? $c->{id} : $c;
207 0 0       0 my $conn = $self->{connections}{$conn_id} or return;
208 0 0       0 _register_in_timeline($conn, "dropping connection from pool") if exists $conn->{debug_or_timeline};
209 0 0       0 _set_completed_state($self, $conn_id, $force_socket_close) unless $conn->{state} == YAHC::State::COMPLETED();
210 0         0 return $conn;
211             }
212              
213 11     11 1 145 sub run { shift->_run(0, @_) }
214 0     0 1 0 sub run_once { shift->_run(EV::RUN_ONCE) }
215 0     0 1 0 sub run_tick { shift->_run(EV::RUN_NOWAIT) }
216 3     3 1 25 sub is_running { !!shift->{loop}->depth }
217 10     10 1 167983 sub loop { shift->{loop} }
218              
219             sub break {
220 3     3 1 21 my ($self, $reason) = @_;
221 3 50       14 return unless $self->is_running;
222 3 50 0     13 _log_message('YAHC: pid %d breaking event loop because %s', $$, ($reason || 'no reason')) if $self->{debug};
223 3         19 $self->{loop}->break(EV::BREAK_ONE)
224             }
225              
226             ################################################################################
227             # Routines to manipulate connections (also user facing)
228             ################################################################################
229              
230             sub yahc_terminal_error {
231 6 100   6 1 1044 return (($_[0] & YAHC::Error::TERMINAL_ERROR()) == YAHC::Error::TERMINAL_ERROR()) ? 1 : 0;
232             }
233              
234             sub yahc_reinit_conn {
235 0     0 1 0 my ($conn, $args) = @_;
236             die "YAHC: cannot reinit completed connection\n"
237 0 0       0 if $conn->{state} >= YAHC::State::COMPLETED();
238              
239 0         0 $conn->{attempt} = 0;
240 0         0 $conn->{state} = YAHC::State::INITIALIZED();
241 0 0 0     0 return unless defined $args && ref($args) eq 'HASH';
242              
243 0         0 my $request = $conn->{request};
244 0 0       0 $request->{_target} = _wrap_host(delete $args->{host}) if $args->{host};
245 0 0       0 $request->{_backoff} = _wrap_backoff(delete $args->{backoff_delay}) if $args->{backoff_delay};
246 0 0       0 do { $request->{$_} = $args->{$_} if $args->{$_} } foreach (keys %$args);
  0         0  
247             }
248              
249             sub yahc_retry_conn {
250 0     0 1 0 my ($conn, $args) = @_;
251             die "YAHC: cannot retry completed connection\n"
252 0 0       0 if $conn->{state} >= YAHC::State::COMPLETED();
253 0 0       0 return unless yahc_conn_attempts_left($conn) > 0;
254              
255 0         0 $conn->{state} = YAHC::State::INITIALIZED();
256 0 0 0     0 return unless defined $args && ref($args) eq 'HASH';
257              
258             $conn->{request}{_backoff} = _wrap_backoff($args->{backoff_delay})
259 0 0       0 if $args->{backoff_delay};
260             }
261              
262             sub yahc_conn_last_error {
263 1     1 1 2955 my $conn = shift;
264 1 50 33     6 return unless $conn->{errors} && @{ $conn->{errors} };
  1         6  
265 1 50       4 return wantarray ? @{ $conn->{errors}[-1] } : $conn->{errors}[-1];
  1         4  
266             }
267              
268 0     0 1 0 sub yahc_conn_id { $_[0]->{id} }
269 0     0 1 0 sub yahc_conn_state { $_[0]->{state} }
270 0     0 1 0 sub yahc_conn_errors { $_[0]->{errors} }
271 0     0 1 0 sub yahc_conn_timeline { $_[0]->{timeline} }
272 0     0 1 0 sub yahc_conn_request { $_[0]->{request} }
273 0     0 1 0 sub yahc_conn_response { $_[0]->{response} }
274 4     4 1 18 sub yahc_conn_attempt { $_[0]->{attempt} }
275 4 100   4 1 25 sub yahc_conn_attempts_left { $_[0]->{attempt} > $_[0]->{retries} ? 0 : $_[0]->{retries} - $_[0]->{attempt} + 1 }
276              
277             sub yahc_conn_target {
278 4     4 1 12 my $target = $_[0]->{selected_target};
279 4 50 50     9 return unless $target && scalar @{ $target };
  4         10  
280 4         4 my ($host, $ip, $port) = @{ $target };
  4         8  
281 4 50 33     27 return ($host || $ip) . ($port ne '80' && $port ne '443' ? ":$port" : '');
      33        
282             }
283              
284             sub yahc_conn_url {
285 0     0 1 0 my $target = $_[0]->{selected_target};
286 0         0 my $request = $_[0]->{request};
287 0 0 0     0 return unless $target && @{ $target };
  0         0  
288              
289 0         0 my ($host, $ip, $port, $scheme) = @{ $target };
  0         0  
290             return "$scheme://"
291             . ($host || $ip)
292             . ($port ne '80' && $port ne '443' ? ":$port" : '')
293             . ($request->{path} || "/")
294 0 0 0     0 . (defined $request->{query_string} ? ("?" . $request->{query_string}) : "");
    0 0        
      0        
295             }
296              
297             sub yahc_conn_user_data {
298 4     4 1 2090 my $conn = shift;
299 4 100       11 $conn->{user_data} = $_[0] if @_;
300 4         13 return $conn->{user_data};
301             }
302              
303             ################################################################################
304             # Internals
305             ################################################################################
306              
307             sub _run {
308 11     11   35 my ($self, $how, $until_state, @cs) = @_;
309 11 50       41 die "YAHC: storage object is destroyed\n" unless $self->{storage};
310 11 50       73 die "YAHC: reentering run\n" if $self->{loop}->depth;
311              
312 11 50       53 if ($self->{pid} != $$) {
313 0 0       0 _log_message('YAHC: reinitializing event loop after forking') if $self->{debug};
314 0         0 $self->{pid} = $$;
315 0         0 $self->{loop}->loop_fork;
316              
317 0         0 my $active_connections = grep { $$ != $_->{pid} } values %{ $self->{connections} };
  0         0  
  0         0  
318 0 0       0 warn "YAHC has $active_connections active connections after a fork, consider dropping them!"
319             if $active_connections;
320             }
321              
322 11 100       43 if (defined $until_state) {
323 2         5 my $until_state_str = _strstate($until_state);
324 2 50       18 die "YAHC: unknown until_state $until_state\n" if $until_state_str =~ m/unknown/;
325              
326 2         5 my $is_all = (@cs == 0);
327 0         0 my @connections = $is_all ? values %{ $self->{connections} }
328 2 50       10 : map { $self->{connections}{$_} || () }
329 2 50       10 map { ref($_) eq 'HASH' ? $_->{id} : $_ } @cs;
  2 50       9  
330              
331             $self->{stop_condition} = {
332             all => $is_all,
333             expected_state => $until_state,
334 2         5 connections => { map { $_->{id} => 1 } grep { $_->{state} < $until_state } @connections },
  2         22  
  2         7  
335             };
336             } else {
337 9         19 delete $self->{stop_condition};
338             }
339              
340 11         25 my $loop = $self->{loop};
341 11         50 $loop->now_update;
342              
343 11 50       33 if ($self->{debug}) {
344 0         0 my $iterations = $loop->iteration;
345 0 0       0 _log_message('YAHC: pid %d entering event loop%s', $$, ($until_state ? " with until state " . _strstate($until_state) : ''));
346 0   0     0 $loop->run($how || 0);
347 0         0 _log_message('YAHC: pid %d exited from event loop after %d iterations', $$, $loop->iteration - $iterations);
348             } else {
349 11   50     2000321 $loop->run($how || 0);
350             }
351             }
352              
353             sub _check_stop_condition {
354 2     2   6 my ($self, $conn) = @_;
355 2         5 my $stop_condition = $self->{stop_condition};
356 2 50 33     24 return if !$stop_condition || $conn->{state} < $stop_condition->{expected_state};
357              
358 2         7 delete $stop_condition->{connections}{$conn->{id}};
359 2         4 my $awaiting_connections = scalar keys %{ $stop_condition->{connections} };
  2         7  
360 2         4 my $expected_state = $stop_condition->{expected_state};
361              
362 2 50       6 if ($awaiting_connections == 0) {
363 2         6 $self->break(sprintf("until state '%s' is reached", _strstate($expected_state)));
364 2         7 return 1;
365             }
366              
367             _log_message("YAHC: still have %d connections awaiting state '%s'",
368 0 0       0 $awaiting_connections, _strstate($expected_state)) if $self->{debug};
369             }
370              
371             ################################################################################
372             # IO routines
373             ################################################################################
374              
375             sub _set_init_state {
376 16     16   56 my ($self, $conn_id) = @_;
377              
378 16 50       42 my $conn = $self->{connections}{$conn_id} or die "YAHC: unknown connection id $conn_id\n";
379              
380 16         48 $conn->{response} = { status => 0 };
381 16         24 $conn->{state} = YAHC::State::INITIALIZED();
382 16 100       51 _register_in_timeline($conn, "new state %s", _strstate($conn->{state})) if exists $conn->{debug_or_timeline};
383 16 50       29 _call_state_callback($self, $conn, 'init_callback') if exists $conn->{has_init_callback};
384              
385 16         48 _close_or_cache_socket($self, $conn, 1); # force connection close if any (likely not)
386 16         66 my $watchers = _delete_watchers_but_lifetime_timer($self, $conn_id); # implicit stop of all watchers
387              
388             return _set_user_action_state($self, $conn_id, YAHC::Error::RETRY_LIMIT(), "retries limit reached")
389 16 100       49 if $conn->{attempt} > $conn->{retries};
390              
391             # don't move attempt increment before boundary check !!!
392             # otherwise we can get off-by-one error in yahc_conn_attempts_left
393 14         20 my $attempt = ++$conn->{attempt};
394 14 50 66     47 if ($attempt > 1 && exists $conn->{request}{_backoff}) {
395 0         0 my $backoff_delay = eval { $conn->{request}{_backoff}->($conn) };
  0         0  
396 0 0       0 if (my $error = $@) {
397 0         0 return _set_user_action_state($self, $conn_id, YAHC::Error::CALLBACK_ERROR() | YAHC::Error::TERMINAL_ERROR(),
398             "exception in backoff callback (close connection): $error");
399             };
400              
401 0 0       0 if ($backoff_delay) {
402 0         0 $self->{loop}->now_update;
403 0 0       0 _register_in_timeline($conn, "setting backoff_timer to %.3fs", $backoff_delay) if exists $conn->{debug_or_timeline};
404             $watchers->{backoff_timer} = $self->{loop}->timer($backoff_delay, 0, _get_safe_wrapper($self, $conn, sub {
405 0 0   0   0 _register_in_timeline($conn, "backoff timer of %.3fs expired, time for new attempt", $backoff_delay) if exists $conn->{debug_or_timeline};
406 0 0       0 _set_init_state($self, $conn_id) if _init_helper($self, $conn_id) == 1;
407 0         0 }));
408 0         0 return;
409             }
410             }
411              
412 14 50       39 if (_init_helper($self, $conn_id) == 1) {
413             _register_in_timeline($conn, "do attempt on next EV iteration, (iteration=%d)", $self->{loop}->iteration)
414 0 0       0 if exists $conn->{debug_or_timeline};
415              
416             # from EV docs:
417             # idle watcher call the callback when there are no other pending
418             # watchers of the same or higher priority. The idle watchers are
419             # being called once per event loop iteration - until stopped.
420             #
421             # so, what we do is we start idle watcher with priority 1 which is
422             # higher then 0 used by all IO watchers. As result, the callback
423             # will be called at the end of this iteration. And others if neccessary.
424              
425             my $retry_watcher = $watchers->{retry} ||= $self->{loop}->idle_ns(_get_safe_wrapper($self, $conn, sub {
426 0     0   0 shift->stop; # stop this watcher, _set_init_state will start if neccessary
427             _register_in_timeline($conn, "time for new attempt (iteration=%d)", $self->{loop}->iteration)
428 0 0       0 if exists $conn->{debug_or_timeline};
429 0         0 _set_init_state($self, $conn_id)
430 0   0     0 }));
431              
432 0         0 $retry_watcher->priority(1);
433 0         0 $retry_watcher->start;
434             };
435             }
436              
437             sub _init_helper {
438 14     14   28 my ($self, $conn_id) = @_;
439              
440 14 50       29 my $conn = $self->{connections}{$conn_id} or die "YAHC: unknown connection id $conn_id\n";
441 14 50       38 my $watchers = $self->{watchers}{$conn_id} or die "YAHC: no watchers for connection id $conn_id\n";
442              
443 14         17 my $request = $conn->{request};
444              
445 14         52 $self->{loop}->now_update; # update time for timers
446             _set_until_state_timer($self, $conn_id, 'request_timeout', YAHC::State::USER_ACTION(), YAHC::Error::TIMEOUT() | YAHC::Error::REQUEST_TIMEOUT())
447 14 50       26 if $request->{request_timeout};
448             _set_until_state_timer($self, $conn_id, 'connect_timeout', YAHC::State::CONNECTED(), YAHC::Error::TIMEOUT() | YAHC::Error::CONNECT_TIMEOUT())
449 14 50       24 if $request->{connect_timeout};
450             _set_until_state_timer($self, $conn_id, 'drain_timeout', YAHC::State::READING(), YAHC::Error::TIMEOUT() | YAHC::Error::DRAIN_TIMEOUT())
451 14 50       29 if $request->{drain_timeout};
452              
453             eval {
454 14         29 my ($host, $ip, $port, $scheme) = _get_next_target($conn);
455             _register_in_timeline($conn, "Target $scheme://$host:$port ($ip:$port) chosen for attempt #%d", $conn->{attempt})
456 14 100       57 if exists $conn->{debug_or_timeline};
457              
458 14         21 my $sock;
459 14 50       30 if (my $socket_cache = $request->{_socket_cache}) {
460 0         0 $sock = $socket_cache->(YAHC::SocketCache::GET(), $conn);
461             }
462              
463 14 50       33 if (defined $sock) {
464 0 0       0 _register_in_timeline($conn, "reuse socket") if $conn->{debug_or_timeline};
465 0         0 $watchers->{_fh} = $sock;
466 0     0   0 $watchers->{io} = $self->{loop}->io($sock, EV::WRITE, sub {});
467 0         0 _set_write_state($self, $conn_id);
468             } else {
469 14 100       33 _register_in_timeline($conn, "build new socket") if $conn->{debug_or_timeline};
470 14         34 $sock = _build_socket_and_connect($ip, $port);
471 14         54 _set_connecting_state($self, $conn_id, $sock);
472             }
473              
474 14         38 1;
475 14 50       22 } or do {
476 0   0     0 my $error = $@ || 'zombie error';
477 0         0 $error =~ s/\s+$//o;
478 0         0 yahc_conn_register_error($conn, YAHC::Error::CONNECT_ERROR(), "connection attempt %d failed: %s", $conn->{attempt}, $error);
479 0         0 return 1;
480             };
481              
482 14         46 return 0;
483             }
484              
485             sub _set_connecting_state {
486 14     14   32 my ($self, $conn_id, $sock) = @_;
487              
488 14 50       42 my $conn = $self->{connections}{$conn_id} or die "YAHC: unknown connection id $conn_id\n";
489 14 50       34 my $watchers = $self->{watchers}{$conn_id} or die "YAHC: no watchers for connection id $conn_id\n";
490              
491 14         68 $conn->{state} = YAHC::State::CONNECTING();
492 14 100       44 _register_in_timeline($conn, "new state %s", _strstate($conn->{state})) if exists $conn->{debug_or_timeline};
493 14 50       35 _call_state_callback($self, $conn, 'connecting_callback') if exists $conn->{has_connecting_callback};
494              
495             my $connecting_cb = _get_safe_wrapper($self, $conn, sub {
496 13     13   61 my $sockopt = getsockopt($sock, SOL_SOCKET, SO_ERROR);
497 13 50       29 if (!$sockopt) {
498 0         0 yahc_conn_register_error($conn, YAHC::Error::CONNECT_ERROR(), "Failed to do getsockopt(): '%s' errno=%d", "$!", $!+0);
499 0         0 _set_init_state($self, $conn_id);
500 0         0 return;
501             }
502              
503 13 100       40 if (my $err = unpack("L", $sockopt)) {
504 12   50     218 my $strerror = POSIX::strerror($err) || '';
505 12         2227 yahc_conn_register_error($conn, YAHC::Error::CONNECT_ERROR(), "Failed to connect: $strerror");
506 12         30 _set_init_state($self, $conn_id);
507 12         19 return;
508             }
509              
510 1         8 _set_connected_state($self, $conn_id);
511 14         96 });
512              
513 14         45 $watchers->{_fh} = $sock;
514 14         99 $watchers->{io} = $self->{loop}->io($sock, EV::WRITE, $connecting_cb);
515 14 50       40 _check_stop_condition($self, $conn) if exists $self->{stop_condition};
516             }
517              
518             sub _set_connected_state {
519 1     1   2 my ($self, $conn_id) = @_;
520              
521 1 50       3 my $conn = $self->{connections}{$conn_id} or die "YAHC: unknown connection id $conn_id\n";
522 1 50       4 my $watchers = $self->{watchers}{$conn_id} or die "YAHC: no watchers for connection id $conn_id\n";
523              
524 1         2 $conn->{state} = YAHC::State::CONNECTED();
525 1 50       3 _register_in_timeline($conn, "new state %s", _strstate($conn->{state})) if exists $conn->{debug_or_timeline};
526 1 50       2 _call_state_callback($self, $conn, 'connected_callback') if exists $conn->{has_connected_callback};
527              
528             my $connected_cb = _get_safe_wrapper($self, $conn, sub {
529 0 0   0   0 if ($conn->{is_ssl}) {
530 0         0 _set_ssl_handshake_state($self, $conn_id);
531             } else {
532 0         0 _set_write_state($self, $conn_id);
533             }
534 1         5235 });
535              
536             #$watcher->events(EV::WRITE);
537 1         21 $watchers->{io}->cb($connected_cb);
538 1 50       9 _check_stop_condition($self, $conn) if exists $self->{stop_condition};
539             }
540              
541             sub _set_ssl_handshake_state {
542 0     0   0 my ($self, $conn_id) = @_;
543              
544 0 0       0 my $conn = $self->{connections}{$conn_id} or die "YAHC: unknown connection id $conn_id\n";
545 0 0       0 my $watchers = $self->{watchers}{$conn_id} or die "YAHC: no watchers for connection id $conn_id\n";
546              
547 0         0 $conn->{state} = YAHC::State::SSL_HANDSHAKE();
548 0 0       0 _register_in_timeline($conn, "new state %s", _strstate($conn->{state})) if exists $conn->{debug_or_timeline};
549             #_call_state_callback($self, $conn, 'writing_callback') if $conn->{has_writing_callback}; TODO
550              
551 0         0 my $fh = $watchers->{_fh};
552 0         0 my $hostname = $conn->{selected_target}[0];
553              
554             my %options = (
555             SSL_verifycn_name => $hostname,
556             IO::Socket::SSL->can_client_sni ? ( SSL_hostname => $hostname ) : (),
557 0 0       0 %{ $conn->{request}{ssl_options} || {} },
  0 0       0  
558             );
559              
560 0 0       0 if ($conn->{debug_or_timeline}) {
561 0   0     0 my $options_msg = join(', ', map { "$_=" . ($options{$_} || '') } keys %options);
  0         0  
562 0         0 _register_in_timeline($conn, "start SSL handshake with options: $options_msg");
563             }
564              
565 0 0       0 if (!IO::Socket::SSL->start_SSL($fh, %options, SSL_startHandshake => 0)) {
566 0         0 return _set_user_action_state($self, $conn_id, YAHC::Error::SSL_ERROR() | YAHC::Error::TERMINAL_ERROR(),
567             sprintf("failed to start SSL session: %s", _format_ssl_error()));
568             }
569              
570             my $handshake_cb = _get_safe_wrapper($self, $conn, sub {
571 0     0   0 my $w = shift;
572 0 0       0 if ($fh->connect_SSL) {
573 0 0       0 _register_in_timeline($conn, "SSL handshake successfully completed") if exists $conn->{debug_or_timeline};
574 0         0 return _set_write_state($self, $conn_id);
575             }
576              
577 0 0       0 if ($! == EWOULDBLOCK) {
578 0 0       0 return $w->events(EV::READ) if $IO::Socket::SSL::SSL_ERROR == SSL_WANT_READ;
579 0 0       0 return $w->events(EV::WRITE) if $IO::Socket::SSL::SSL_ERROR == SSL_WANT_WRITE;
580             }
581              
582 0         0 yahc_conn_register_error($conn, YAHC::Error::SSL_ERROR(), "Failed to complete SSL handshake: %s", _format_ssl_error());
583 0         0 _set_init_state($self, $conn_id);
584 0         0 });
585              
586 0         0 my $watcher = $watchers->{io};
587 0         0 $watcher->cb($handshake_cb);
588 0         0 $watcher->events(EV::WRITE | EV::READ);
589 0 0       0 _check_stop_condition($self, $conn) if exists $self->{stop_condition};
590             }
591              
592             sub _set_write_state {
593 1     1   13 my ($self, $conn_id) = @_;
594              
595 1 50       6 my $conn = $self->{connections}{$conn_id} or die "YAHC: unknown connection id $conn_id\n";
596 1 50       4 my $watchers = $self->{watchers}{$conn_id} or die "YAHC: no watchers for connection id $conn_id\n";
597              
598 1         2 $conn->{state} = YAHC::State::WRITING();
599 1 50       7 _register_in_timeline($conn, "new state %s", _strstate($conn->{state})) if exists $conn->{debug_or_timeline};
600 1 50       4 _call_state_callback($self, $conn, 'writing_callback') if exists $conn->{has_writing_callback};
601              
602 1         3 my $fh = $watchers->{_fh};
603 1         4 my $buf = _build_http_message($conn);
604 1         3 my $length = length($buf);
605              
606 1 50       4 warn "YAHC: HTTP message has UTF8 flag set! This will result in poor performance, see docs for details!"
607             if utf8::is_utf8($buf);
608              
609             _register_in_timeline($conn, "writing body of %d bytes\n%s", $length, ($length > 1024? substr($buf, 0, 1024) . '... (cut to 1024 bytes)' : $buf))
610 1 50       6 if exists $conn->{debug_or_timeline};
    50          
611              
612             my $write_cb = _get_safe_wrapper($self, $conn, sub {
613 1     1   2 my $w = shift;
614 1         24 my $wlen = syswrite($fh, $buf, $length);
615              
616 1 50       6 if (!defined $wlen) {
    50          
617 0 0       0 if ($conn->{is_ssl}) {
618 0 0       0 if ($! == EWOULDBLOCK) {
619 0 0       0 return $w->events(EV::READ) if $IO::Socket::SSL::SSL_ERROR == SSL_WANT_READ;
620 0 0       0 return $w->events(EV::WRITE) if $IO::Socket::SSL::SSL_ERROR == SSL_WANT_WRITE;
621             }
622              
623 0         0 yahc_conn_register_error($conn, YAHC::Error::WRITE_ERROR() | YAHC::Error::SSL_ERROR(), "Failed to send HTTPS data: %s", _format_ssl_error());
624 0         0 return _set_init_state($self, $conn_id);
625             }
626              
627 0 0 0     0 return if $! == EWOULDBLOCK || $! == EINTR || $! == EAGAIN;
      0        
628 0         0 yahc_conn_register_error($conn, YAHC::Error::WRITE_ERROR(), "Failed to send HTTP data: '%s' errno=%d", "$!", $!+0);
629 0         0 _set_init_state($self, $conn_id);
630             } elsif ($wlen == 0) {
631 0         0 yahc_conn_register_error($conn, YAHC::Error::WRITE_ERROR(), "syswrite returned 0");
632 0         0 _set_init_state($self, $conn_id);
633             } else {
634 1         4 substr($buf, 0, $wlen, '');
635 1         20 $length -= $wlen;
636 1 50       7 _set_read_state($self, $conn_id) if $length == 0;
637             }
638 1         10 });
639              
640 1         2 my $watcher = $watchers->{io};
641 1         13 $watcher->cb($write_cb);
642 1         7 $watcher->events(EV::WRITE);
643 1 50       5 _check_stop_condition($self, $conn) if exists $self->{stop_condition};
644             }
645              
646             sub _set_read_state {
647 8     8   66 my ($self, $conn_id) = @_;
648              
649 8 50       42 my $conn = $self->{connections}{$conn_id} or die "YAHC: unknown connection id $conn_id\n";
650 8 50       26 my $watchers = $self->{watchers}{$conn_id} or die "YAHC: no watchers for connection id $conn_id\n";
651              
652 8         16 $conn->{state} = YAHC::State::READING();
653 8 50       43 _register_in_timeline($conn, "new state %s", _strstate($conn->{state})) if exists $conn->{debug_or_timeline};
654 8 50       25 _call_state_callback($self, $conn, 'reading_callback') if exists $conn->{has_reading_callback};
655              
656 8         18 my $buf = '';
657 8         13 my $neck_pos = 0;
658 8         26 my $decapitated = 0;
659 8         13 my $content_length = 0;
660 8         22 my $no_content_length = 0;
661 8         13 my $is_chunked = 0;
662 8         13 my $fh = $watchers->{_fh};
663 8         15 my $chunk_size = 0;
664 8         13 my $body = ''; # used for chunked encoding
665              
666             my $read_cb = _get_safe_wrapper($self, $conn, sub {
667 9     9   15 my $w = shift;
668 9         161 my $rlen = sysread($fh, my $b = '', TCP_READ_CHUNK);
669 9 50       34 if (!defined $rlen) {
    100          
670 0 0       0 if ($conn->{is_ssl}) {
671 0 0       0 if ($! == EWOULDBLOCK) {
672 0 0       0 return $w->events(EV::READ) if $IO::Socket::SSL::SSL_ERROR == SSL_WANT_READ;
673 0 0       0 return $w->events(EV::WRITE) if $IO::Socket::SSL::SSL_ERROR == SSL_WANT_WRITE;
674             }
675              
676 0         0 yahc_conn_register_error($conn, YAHC::Error::READ_ERROR() | YAHC::Error::SSL_ERROR(), "Failed to receive HTTPS data: %s", _format_ssl_error());
677 0         0 return _set_init_state($self, $conn_id);
678             }
679              
680 0 0 0     0 return if $! == EWOULDBLOCK || $! == EINTR || $! == EAGAIN;
      0        
681 0         0 yahc_conn_register_error($conn, YAHC::Error::READ_ERROR(), "Failed to receive HTTP data: '%s' errno=%d", "$!", $!+0);
682 0         0 _set_init_state($self, $conn_id);
683             } elsif ($rlen == 0) {
684 2 50       7 if ($no_content_length) {
685 2         95 $conn->{response}{body} = $buf.$b;
686 2         7 _set_user_action_state($self, $conn_id);
687 2         3 return;
688             }
689              
690 0 0       0 if ($content_length > 0) {
691 0         0 yahc_conn_register_error($conn, YAHC::Error::READ_ERROR(), "Premature EOF, expect %d bytes more", $content_length - length($buf));
692             } else {
693 0         0 yahc_conn_register_error($conn, YAHC::Error::READ_ERROR(), "Premature EOF");
694             }
695 0         0 _set_init_state($self, $conn_id);
696             } else {
697 7         80 $buf .= $b;
698 7 50 33     112 if (!$decapitated && ($neck_pos = index($buf, "${CRLF}${CRLF}")) > 0) {
699 7         48 my $headers = _parse_http_headers($conn, substr($buf, 0, $neck_pos, '')); # $headers are always defined but might be empty, maybe fix later
700 7   100     44 $is_chunked = ($headers->{'transfer-encoding'} || '') eq 'chunked';
701              
702 7 50 66     23 if ($is_chunked && exists $headers->{'trailer'}) {
703 0         0 _set_user_action_state($self, $conn_id, YAHC::Error::RESPONSE_ERROR(), "Chunked HTTP response with Trailer header");
704 0         0 return;
705             }
706              
707 7         13 $decapitated = 1;
708 7         18 substr($buf, 0, 4, ''); # 4 = length("$CRLF$CRLF")
709              
710             # Attempt to correctly determine content length, see RFC 2616 section 4.4
711 7 50 50     106 if (($conn->{request}->{method} || '') eq 'HEAD' || $conn->{response}->{status} =~ /^(1..|204|304)$/) { # 1.
    100 33        
    100          
712 0         0 $content_length = 0;
713             } elsif ($is_chunked) { # 2. (sort of, should actually also care for non-chunked transfer encodings)
714             # No content length, use chunked transfer encoding instead
715             } elsif (exists $headers->{'content-length'}) { # 3.
716 4         10 $content_length = $headers->{'content-length'};
717 4 100       26 if ($content_length !~ m#\A[0-9]+\z#) {
718 1         4 _set_user_action_state($self, $conn_id, YAHC::Error::RESPONSE_ERROR(), "Not-numeric Content-Length received on the response");
719 1         2 return;
720             }
721             } else {
722             # byteranges (point .4 on the spec) not supported
723 2         3 $no_content_length = 1;
724             }
725             }
726              
727 6 100 66     89 if ($decapitated && $is_chunked) {
    100 66        
      66        
728             # in order to get the smallest chunk size we need
729             # at least 4 bytes (2xCLRF), and there *MUST* be
730             # last chunk which is at least 5 bytes (0\r\n\r\n)
731             # so we can safely ignore $bufs that have less than 5 bytes
732 1         4 while (length($buf) > ($chunk_size + 4)) {
733 4         13 my $neck_pos = index($buf, ${CRLF});
734 4 50       8 if ($neck_pos > 0) {
735             # http://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html
736             # All HTTP/1.1 applications MUST be able to receive and
737             # decode the "chunked" transfer-coding, and MUST ignore
738             # chunk-extension extensions they do not understand.
739 4         8 my ($s) = split(';', substr($buf, 0, $neck_pos), 1);
740 4         6 $chunk_size = hex($s);
741              
742 4 50       15 _register_in_timeline($conn, "parsing chunk of size $chunk_size bytes") if exists $conn->{debug_or_timeline};
743 4 100       8 if ($chunk_size == 0) { # end with, but as soon as we see 0\r\n\r\n we just mark it as done
744 1         2 $conn->{response}{body} = $body;
745 1         3 _set_user_action_state($self, $conn_id);
746 1         2 return;
747             } else {
748 3 50       7 if (length($buf) >= $chunk_size + $neck_pos + 2 + 2) {
749 3         4 $body .= substr($buf, $neck_pos + 2, $chunk_size);
750 3         6 substr($buf, 0, $neck_pos + 2 + $chunk_size + 2, '');
751 3         5 $chunk_size = 0;
752             } else {
753 0         0 last; # dont have enough data in this pass, wait for one more read
754             }
755             }
756             } else {
757 0 0 0     0 last if $neck_pos < 0 && $chunk_size == 0; # in case we couldnt get the chunk size in one go, we must concat until we have something
758 0         0 _set_user_action_state($self, $conn_id, YAHC::Error::RESPONSE_ERROR(), "error processing chunked data, couldnt find CLRF[index:$neck_pos] in buf");
759 0         0 return;
760             }
761             }
762             } elsif ($decapitated && !$no_content_length && length($buf) >= $content_length) {
763 3 100       17 $conn->{response}{body} = (length($buf) > $content_length ? substr($buf, 0, $content_length) : $buf);
764 3         11 _set_user_action_state($self, $conn_id);
765             }
766             }
767 8         63 });
768              
769 8         21 my $watcher = $watchers->{io};
770 8         64 $watcher->cb($read_cb);
771 8         46 $watcher->events(EV::READ);
772 8 100       40 _check_stop_condition($self, $conn) if exists $self->{stop_condition};
773             }
774              
775             sub _set_user_action_state {
776 9     9   30 my ($self, $conn_id, $error, $strerror) = @_;
777 9   100     39 $error ||= YAHC::Error::NO_ERROR();
778 9   100     33 $strerror ||= '';
779              
780             # this state may be used in critical places,
781             # so it should *NEVER* throw exception
782 9 50       31 my $conn = $self->{connections}{$conn_id}
783             or warn "YAHC: try to _set_user_action_state() for unknown connection $conn_id",
784             return;
785              
786 9         18 $conn->{state} = YAHC::State::USER_ACTION();
787 9 100       35 _register_in_timeline($conn, "new state %s", _strstate($conn->{state})) if exists $conn->{debug_or_timeline};
788 9 100       39 yahc_conn_register_error($conn, $error, $strerror) if $error != YAHC::Error::NO_ERROR;
789              
790 9         33 _close_or_cache_socket($self, $conn, $error != YAHC::Error::NO_ERROR);
791 9 100       39 return _set_completed_state($self, $conn_id) unless exists $conn->{has_callback};
792              
793             eval {
794 2 0       6 _register_in_timeline($conn, "call callback%s", $error ? " error=$error, strerror='$strerror'" : '') if exists $conn->{debug_or_timeline};
    50          
795 2         5 my $cb = $self->{callbacks}{$conn_id}{callback};
796 2         10 $cb->($conn, $error, $strerror);
797 2         503 1;
798 2 50       5 } or do {
799 0   0     0 my $error = $@ || 'zombie error';
800 0         0 yahc_conn_register_error($conn, YAHC::Error::CALLBACK_ERROR() | YAHC::Error::TERMINAL_ERROR(), "Exception in user action callback (close connection): $error");
801 0         0 $self->{state} = YAHC::State::COMPLETED();
802             };
803              
804 2         10 $self->{loop}->now_update;
805              
806 2         2 my $state = $conn->{state};
807 2 50       7 if (yahc_terminal_error($error)) {
808 0 0 0     0 yahc_conn_register_error($conn, YAHC::Error::CALLBACK_ERROR() | YAHC::Error::TERMINAL_ERROR(), "ignoring changed state due to terminal error")
809             unless $state == YAHC::State::USER_ACTION() || $state == YAHC::State::COMPLETED();
810 0         0 _set_completed_state($self, $conn_id, 1);
811             return
812 0         0 }
813              
814 2 50       7 _register_in_timeline($conn, "after invoking callback state is %s", _strstate($state)) if exists $conn->{debug_or_timeline};
815              
816 2 50 33     789 if ($state == YAHC::State::INITIALIZED()) {
    50          
817 0         0 _set_init_state($self, $conn_id);
818             } elsif ($state == YAHC::State::USER_ACTION() || $state == YAHC::State::COMPLETED()) {
819 2         8 _set_completed_state($self, $conn_id);
820             } else {
821 0         0 yahc_conn_register_error($conn, YAHC::Error::CALLBACK_ERROR() | YAHC::Error::TERMINAL_ERROR(), "callback set unsupported state");
822 0         0 _set_completed_state($self, $conn_id);
823             }
824             }
825              
826             sub _set_completed_state {
827 9     9   23 my ($self, $conn_id, $force_socket_close) = @_;
828              
829             # this's a terminal state,
830             # so setting this state should *NEVER* fail
831 9         28 delete $self->{callbacks}{$conn_id};
832 9         22 my $conn = delete $self->{connections}{$conn_id};
833              
834 9 50       29 if (!defined $conn) {
835 0         0 delete($self->{watchers}{$conn_id}), # implicit stop of all watchers
836             return;
837             }
838              
839 9         27 $conn->{state} = YAHC::State::COMPLETED();
840 9 100       37 _register_in_timeline($conn, "new state %s", _strstate($conn->{state})) if exists $conn->{debug_or_timeline};
841              
842 9         25 _close_or_cache_socket($self, $conn, $force_socket_close);
843 9         21 delete $self->{watchers}{$conn_id}; # implicit stop of all watchers
844              
845 9 50       30 _check_stop_condition($self, $conn) if exists $self->{stop_condition};
846             }
847              
848             sub _build_socket_and_connect {
849 14     14   31 my ($ip, $port) = @_;
850              
851 14         26 my $sock;
852 14 50       319 socket($sock, PF_INET, SOCK_STREAM, 0)
853             or die sprintf("Failed to construct TCP socket: '%s' errno=%d\n", "$!", $!+0);
854              
855 14 50       61 my $flags = fcntl($sock, F_GETFL, 0) or die sprintf("Failed to get fcntl F_GETFL flag: '%s' errno=%d\n", "$!", $!+0);
856 14 50       48 fcntl($sock, F_SETFL, $flags | O_NONBLOCK) or die sprintf("Failed to set fcntl O_NONBLOCK flag: '%s' errno=%d\n", "$!", $!+0);
857              
858 14 50       74 my $ip_addr = inet_aton($ip) or die "Invalid IP address";
859 14         52 my $addr = pack_sockaddr_in($port, $ip_addr);
860 14 50 33     1208 if (!connect($sock, $addr) && $! != EINPROGRESS) {
861 0         0 die sprintf("Failed to connect: '%s' errno=%d\n", "$!", $!+0);
862             }
863              
864 14         58 return $sock;
865             }
866              
867             sub _get_next_target {
868 18     18   31 my $conn = shift;
869 18         37 my ($host, $ip, $port, $scheme) = $conn->{request}{_target}->($conn);
870              
871             # TODO STATE_RESOLVE_DNS
872 18 100 66     138 ($host, $port) = ($1, $2) if !$port && $host =~ m/^(.+):([0-9]+)$/o;
873 18 100 66     77 $ip = $host if !$ip && $host =~ m/^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+$/o;
874 18   50     1000 $ip ||= inet_ntoa(gethostbyname($host) or die "Failed to resolve $host\n");
      66        
875 18   50     100 $scheme ||= $conn->{request}{scheme} || 'http';
      33        
876 18   66     69 $port ||= $conn->{request}{port} || ($scheme eq 'https' ? 443 : 80);
      66        
877              
878 18         40 $conn->{is_ssl} = $scheme eq 'https';
879 18         23 return @{ $conn->{selected_target} = [ $host, $ip, $port, $scheme ] };
  18         93  
880             }
881              
882             # this and following functions are used in terminal state
883             # so they should *NEVER* fail
884             sub _close_or_cache_socket {
885 34     34   63 my ($self, $conn, $force_close) = @_;
886 34 50       93 my $watchers = $self->{watchers}{$conn->{id}} or return;
887 34 100       103 my $fh = delete $watchers->{_fh} or return;
888 19         76 delete $watchers->{io}; # implicit stop
889              
890 19         30 my $socket_cache = $conn->{request}{_socket_cache};
891              
892             # Stolen from Hijk. Thanks guys!!!
893             # We always close connections for 1.0 because some servers LIE
894             # and say that they're 1.0 but don't close the connection on
895             # us! An example of this. Test::HTTP::Server (used by the
896             # ShardedKV::Storage::Rest tests) is an example of such a
897             # server. In either case we can't cache a connection for a 1.0
898             # server anyway, so BEGONE!
899              
900 19 0 66     69 if ( $force_close
      0        
      33        
      0        
      33        
      0        
      0        
901             || !defined $socket_cache
902             || (($conn->{request}{proto} || '') eq 'HTTP/1.0')
903             || (($conn->{response}{proto} || '') eq 'HTTP/1.0')
904             || (($conn->{response}{head}{connection} || '') eq 'close'))
905             {
906 19 100       47 _register_in_timeline($conn, "drop socket") if $conn->{debug_or_timeline};
907 19 100       166 close($fh) if ref($fh) eq 'GLOB'; # checking ref to avoid exception
908 19         39 return;
909             }
910              
911 0 0       0 _register_in_timeline($conn, "storing socket for later use") if $conn->{debug_or_timeline};
912 0 0       0 eval { $socket_cache->(YAHC::SocketCache::STORE(), $conn, $fh); 1; } or do {
  0         0  
  0         0  
913 0         0 yahc_conn_register_error($conn, YAHC::Error::CALLBACK_ERROR(), "Exception in socket_cache callback (ignore error): $@");
914             };
915             }
916              
917             sub yahc_conn_socket_cache_id {
918 0     0 1 0 my $conn = shift;
919 0 0       0 return unless defined $conn;
920 0 0       0 my ($host, undef, $port, $scheme) = @{ $conn->{selected_target} || [] };
  0         0  
921 0 0 0     0 return unless $host && $port && $scheme;
      0        
922             # Use $; so we can use the $socket_cache->{$$, $host, $port} idiom to access the cache.
923 0         0 return join($;, $$, $host, $port, $scheme);
924             }
925              
926             ################################################################################
927             # Timers
928             ################################################################################
929              
930             sub _set_until_state_timer {
931 0     0   0 my ($self, $conn_id, $timeout_name, $state, $error_to_report) = @_;
932              
933 0         0 my $timer_name = $timeout_name . '_timer';
934 0 0       0 my $conn = $self->{connections}{$conn_id} or die "YAHC: unknown connection id $conn_id\n";
935 0 0       0 my $watchers = $self->{watchers}{$conn_id} or die "YAHC: no watchers for connection id $conn_id\n";
936              
937 0         0 delete $watchers->{$timer_name}; # implicit stop
938 0         0 my $timeout = $conn->{request}{$timeout_name};
939 0 0       0 return unless $timeout;
940              
941             my $timer_cb = sub { # there is nothing what can throw exception
942 0 0   0   0 if ($conn->{state} < $state) {
943 0         0 yahc_conn_register_error($conn, $error_to_report, "$timeout_name of %.3fs expired", $timeout);
944 0         0 _set_init_state($self, $conn_id);
945             } else {
946 0 0       0 _register_in_timeline($conn, "delete $timer_name") if exists $conn->{debug_or_timeline};
947             }
948 0         0 };
949              
950 0 0       0 _register_in_timeline($conn, "setting $timeout_name to %.3fs", $timeout) if exists $conn->{debug_or_timeline};
951              
952             # caller should call now_update
953 0         0 my $w = $watchers->{$timer_name} = $self->{loop}->timer_ns($timeout, 0, $timer_cb);
954 0         0 $w->priority(2); # set highest priority
955 0         0 $w->start;
956             }
957              
958             sub _set_lifetime_timer {
959 0     0   0 my ($self, $conn_id) = @_;
960              
961 0 0       0 my $conn = $self->{connections}{$conn_id} or die "YAHC: unknown connection id $conn_id\n";
962 0 0       0 my $watchers = $self->{watchers}{$conn_id} or die "YAHC: no watchers for connection id $conn_id\n";
963              
964 0         0 delete $watchers->{lifetime_timer}; # implicit stop
965 0         0 my $timeout = $conn->{request}{lifetime_timeout};
966 0 0       0 return unless $timeout;
967              
968 0 0       0 _register_in_timeline($conn, "setting lifetime timer to %.3fs", $timeout) if exists $conn->{debug_or_timeline};
969              
970 0         0 $self->{loop}->now_update;
971             my $w = $watchers->{lifetime_timer} = $self->{loop}->timer_ns($timeout, 0, sub {
972             _set_user_action_state($self, $conn_id, YAHC::Error::TIMEOUT() | YAHC::Error::LIFETIME_TIMEOUT() | YAHC::Error::TERMINAL_ERROR(),
973 0 0   0   0 sprintf("lifetime_timeout of %.3fs expired", $timeout)) if $conn->{state} < YAHC::State::COMPLETED();
974 0         0 });
975              
976 0         0 $w->priority(2); # set highest priority
977 0         0 $w->start;
978             }
979              
980             ################################################################################
981             # HTTP functions
982             ################################################################################
983              
984             # copy-paste from Hijk
985             sub _build_http_message {
986 33     33   637 my $conn = shift;
987 33         47 my $request = $conn->{request};
988 33 100 100     143 my $path_and_qs = ($request->{path} || "/") . (defined $request->{query_string} ? ("?" . $request->{query_string}) : "");
989 33         40 my $has_host = 0;
990              
991             return join(
992             $CRLF,
993             ($request->{method} || "GET") . " $path_and_qs " . ($request->{protocol} || "HTTP/1.1"),
994             defined($request->{body}) ? ("Content-Length: " . length($request->{body})) : (),
995             defined($request->{head}) && @{ $request->{head} } ? (
996             map {
997 9   33     44 $has_host ||= lc($request->{head}[2*$_]) eq 'host';
998 9         78 $request->{head}[2*$_] . ": " . $request->{head}[2*$_+1]
999 7         26 } 0..$#{$request->{head}}/2
1000             ) : (),
1001             !$has_host ? ("Host: " . $conn->{selected_target}[0]) : (),
1002             "",
1003 33 100 100     357 defined($request->{body}) ? $request->{body} : ""
    100 100        
    50 100        
    100          
1004             );
1005             }
1006              
1007             sub _parse_http_headers {
1008 8     8   123 my $conn = shift;
1009 8         21 my $proto = substr($_[0], 0, 8);
1010 8         17 my $status_code = substr($_[0], 9, 3);
1011 8         30 substr($_[0], 0, index($_[0], $CRLF) + 2, ''); # 2 = length($CRLF)
1012              
1013 8         13 my %headers;
1014 8         98 for (split /${CRLF}/o, $_[0]) {
1015 32         108 my ($key, $value) = split(/: /, $_, 2);
1016 32         104 $headers{lc $key} = $value;
1017             }
1018              
1019             $conn->{response} = {
1020 8         52 proto => $proto,
1021             status => $status_code,
1022             head => \%headers,
1023             };
1024              
1025 8 100       26 if ($conn->{debug_or_timeline}) {
1026 7         33 my $headers_str = join(' ', map { "$_='$headers{$_}'" } keys %headers);
  26         175  
1027 7         47 _register_in_timeline($conn, "headers parsed: $status_code $proto headers=$headers_str");
1028             }
1029              
1030 8         30 return \%headers;
1031             }
1032              
1033             ################################################################################
1034             # Helpers
1035             ################################################################################
1036              
1037             sub _delete_watchers_but_lifetime_timer {
1038 16     16   41 my ($self, $conn_id) = @_;
1039              
1040 16         33 my $watchers = $self->{watchers}{$conn_id};
1041 16 50 33     70 if (defined $watchers && (my $w = $watchers->{lifetime_timer})) {
1042 0         0 return $self->{watchers}{$conn_id} = { lifetime_timer => $w };
1043             }
1044              
1045 16         41 return $self->{watchers}{$conn_id} = {};
1046             }
1047              
1048             sub _wrap_host {
1049 16     16   39 my ($value) = @_;
1050 16         38 my $ref = ref($value);
1051              
1052 16 100   3   110 return sub { $value } if $ref eq '';
  3         9  
1053 4 100       11 return $value if $ref eq 'CODE';
1054              
1055 3         6 my $idx = 0;
1056 14     14   39 return sub { $value->[$idx++ % @$value]; }
1057 3 50 33     28 if $ref eq 'ARRAY' && @$value > 0;
1058              
1059 0         0 die "YAHC: unsupported host format\n";
1060             }
1061              
1062             sub _wrap_backoff {
1063 0     0   0 my ($value) = @_;
1064 0         0 my $ref = ref($value);
1065              
1066 0 0   0   0 return sub { $value } if $ref eq '';
  0         0  
1067 0 0       0 return $value if $ref eq 'CODE';
1068              
1069 0         0 die "YAHC: unsupported backoff format\n";
1070             }
1071              
1072             sub _wrap_socket_cache {
1073 0     0   0 my ($value) = @_;
1074 0         0 my $ref = ref($value);
1075              
1076 0 0       0 return $value if $ref eq 'CODE';
1077             return sub {
1078 0     0   0 my ($operation, $conn, $sock) = @_;
1079 0 0       0 if ($operation == YAHC::SocketCache::GET()) {
1080 0 0       0 my $socket_cache_id = yahc_conn_socket_cache_id($conn) or return;
1081 0         0 return delete $value->{$socket_cache_id};
1082             }
1083              
1084 0 0       0 if ($operation == YAHC::SocketCache::STORE()) {
1085 0 0       0 my $socket_cache_id = yahc_conn_socket_cache_id($conn) or return;
1086 0 0       0 close(delete $value->{$socket_cache_id}) if exists $value->{$socket_cache_id};
1087 0         0 $value->{$socket_cache_id} = $sock;
1088 0         0 return;
1089             }
1090 0 0       0 } if $ref eq 'HASH';
1091              
1092 0         0 die "YAHC: unsupported socket_cache format\n";
1093             }
1094              
1095             sub _call_state_callback {
1096 0     0   0 my ($self, $conn, $cb_name) = @_;
1097 0         0 my $cb = $self->{callbacks}{$conn->{id}}{$cb_name};
1098 0 0       0 return unless $cb;
1099              
1100 0 0       0 _register_in_timeline($conn, "calling $cb_name callback") if exists $conn->{debug_or_timeline};
1101              
1102             eval {
1103 0         0 $cb->($conn);
1104 0         0 1;
1105 0 0       0 } or do {
1106 0   0     0 my $error = $@ || 'zombie error';
1107 0         0 yahc_conn_register_error($conn, YAHC::Error::CALLBACK_ERROR(), "exception in state callback (ignore error): $error");
1108             };
1109              
1110 0         0 $self->{loop}->now_update;
1111             }
1112              
1113             sub _get_safe_wrapper {
1114 24     24   75 my ($self, $conn, $sub) = @_;
1115             return sub { eval {
1116 23         58 $sub->(@_);
1117 23         510 1;
1118 23 50   23   76 } or do {
1119 0   0     0 my $error = $@ || 'zombie error';
1120 0         0 _set_user_action_state($self, $conn->{id}, YAHC::Error::INTERNAL_ERROR() | YAHC::Error::TERMINAL_ERROR(),
1121             "exception in internal callback: $error");
1122 24         110 }};
1123             }
1124              
1125             sub _register_in_timeline {
1126 48     48   128 my ($conn, $format, @arguments) = @_;
1127 48         205 my $event = sprintf("$format", @arguments);
1128 48 50       106 _log_message("YAHC connection '%s': %s", $conn->{id}, $event) if exists $conn->{debug};
1129 48 50 100     112 push @{ $conn->{timeline} ||= [] }, [ $event, $conn->{state}, Time::HiRes::time ] if exists $conn->{keep_timeline};
  48         260  
1130             }
1131              
1132             sub yahc_conn_register_error {
1133 15     15 1 30 my ($conn, $error, $format, @arguments) = @_;
1134 15         45 my $strerror = sprintf("$format", @arguments);
1135 15 100       38 _register_in_timeline($conn, "strerror='$strerror' error=$error") if exists $conn->{debug_or_timeline};
1136 15   100     17 push @{ $conn->{errors} ||= [] }, [ $error, $strerror, [ @{ $conn->{selected_target} } ], Time::HiRes::time, $conn->{attempt} ];
  15         42  
  15         69  
1137             }
1138              
1139             sub _strstate {
1140 30     30   56 my $state = shift;
1141 30 100       87 return 'STATE_INIT' if $state eq YAHC::State::INITIALIZED();
1142 29 50       70 return 'STATE_RESOLVE_DNS' if $state eq YAHC::State::RESOLVE_DNS();
1143 29 100       60 return 'STATE_CONNECTING' if $state eq YAHC::State::CONNECTING();
1144 28 100       70 return 'STATE_CONNECTED' if $state eq YAHC::State::CONNECTED();
1145 25 100       48 return 'STATE_WRITING' if $state eq YAHC::State::WRITING();
1146 24 100       72 return 'STATE_READING' if $state eq YAHC::State::READING();
1147 14 50       28 return 'STATE_SSL_HANDSHAKE'if $state eq YAHC::State::SSL_HANDSHAKE();
1148 14 100       44 return 'STATE_USER_ACTION' if $state eq YAHC::State::USER_ACTION();
1149 7 50       36 return 'STATE_COMPLETED' if $state eq YAHC::State::COMPLETED();
1150 0           return "";
1151             }
1152              
1153             sub _log_message {
1154 0     0     my $format = shift;
1155 0           my $now = Time::HiRes::time;
1156 0           my ($sec, $ms) = split(/[.]/, $now);
1157 0   0       printf STDERR "[%s.%-5d] [$$] $format\n", POSIX::strftime('%F %T', localtime($now)), $ms || 0, @_;
1158             }
1159              
1160 0     0     sub _format_ssl_error { return sprintf("'%s' errno=%d ssl_error='%s' ssl_errno=%d", "$!", 0+$!, "$IO::Socket::SSL::SSL_ERROR", 0+$IO::Socket::SSL::SSL_ERROR); }
1161              
1162             1;
1163              
1164             __END__