File Coverage

blib/lib/YAHC.pm
Criterion Covered Total %
statement 380 629 60.4
branch 189 474 39.8
condition 72 202 35.6
subroutine 52 84 61.9
pod 26 26 100.0
total 719 1415 50.8


line stmt bran cond sub pod time code
1             package YAHC;
2              
3 16     16   242457 use strict;
  16         18  
  16         359  
4 16     16   49 use warnings;
  16         14  
  16         472  
5              
6             our $VERSION = '0.033';
7              
8 16     16   7199 use EV;
  16         25355  
  16         378  
9 16     16   6925 use Time::HiRes;
  16         16943  
  16         46  
10 16     16   1246 use Exporter 'import';
  16         19  
  16         437  
11 16     16   58 use Scalar::Util qw/weaken/;
  16         22  
  16         1041  
12 16     16   64 use Fcntl qw/F_GETFL F_SETFL O_NONBLOCK/;
  16         15  
  16         555  
13 16     16   7528 use POSIX qw/EINPROGRESS EINTR EAGAIN EWOULDBLOCK strftime/;
  16         76315  
  16         83  
14 16     16   21458 use Socket qw/PF_INET SOCK_STREAM $CRLF SOL_SOCKET SO_ERROR inet_aton inet_ntoa pack_sockaddr_in/;
  16         37435  
  16         3202  
15 16 50   16   157 use constant SSL => $ENV{YAHC_NO_SSL} ? 0 : eval 'use IO::Socket::SSL 1.94 (); 1';
  16     16   17  
  16         724  
  16         10212  
  16         785794  
  16         171  
16 16     16   93 use constant SSL_WANT_READ => SSL ? IO::Socket::SSL::SSL_WANT_READ() : 0;
  16         17  
  16         721  
17 16     16   56 use constant SSL_WANT_WRITE => SSL ? IO::Socket::SSL::SSL_WANT_WRITE() : 0;
  16         19  
  16         4986  
18              
19             sub YAHC::Error::NO_ERROR () { 0 }
20             sub YAHC::Error::REQUEST_TIMEOUT () { 1 << 0 }
21             sub YAHC::Error::CONNECT_TIMEOUT () { 1 << 1 }
22             sub YAHC::Error::DRAIN_TIMEOUT () { 1 << 2 }
23             sub YAHC::Error::LIFETIME_TIMEOUT () { 1 << 3 }
24             sub YAHC::Error::TIMEOUT () { 1 << 8 }
25             sub YAHC::Error::RETRY_LIMIT () { 1 << 9 }
26              
27             sub YAHC::Error::CONNECT_ERROR () { 1 << 10 }
28             sub YAHC::Error::READ_ERROR () { 1 << 11 }
29             sub YAHC::Error::WRITE_ERROR () { 1 << 12 }
30             sub YAHC::Error::REQUEST_ERROR () { 1 << 13 }
31             sub YAHC::Error::RESPONSE_ERROR () { 1 << 14 }
32             sub YAHC::Error::CALLBACK_ERROR () { 1 << 15 }
33             sub YAHC::Error::SSL_ERROR () { 1 << 16 }
34             sub YAHC::Error::TERMINAL_ERROR () { 1 << 30 }
35             sub YAHC::Error::INTERNAL_ERROR () { 1 << 31 }
36              
37             sub YAHC::State::INITIALIZED () { 0 }
38             sub YAHC::State::RESOLVE_DNS () { 5 }
39             sub YAHC::State::CONNECTING () { 10 }
40             sub YAHC::State::CONNECTED () { 15 }
41             sub YAHC::State::SSL_HANDSHAKE () { 20 }
42             sub YAHC::State::WRITING () { 25 }
43             sub YAHC::State::READING () { 30 }
44             sub YAHC::State::USER_ACTION () { 35 }
45             sub YAHC::State::COMPLETED () { 100 } # terminal state
46              
47             sub YAHC::SocketCache::GET () { 1 }
48             sub YAHC::SocketCache::STORE () { 2 }
49              
50             use constant {
51             # TCP_READ_CHUNK should *NOT* be lower than 16KB because of SSL things.
52             # https://metacpan.org/pod/distribution/IO-Socket-SSL/lib/IO/Socket/SSL.pod
53             # Another way might be if you try to sysread at least 16kByte all the time.
54             # 16kByte is the maximum size of an SSL frame and because sysread returns
55             # data from only a single SSL frame you can guarantee that there are no
56             # pending data.
57 16         100584 TCP_READ_CHUNK => 131072,
58             CALLBACKS => [ qw/init_callback connecting_callback connected_callback
59             writing_callback reading_callback callback/ ],
60 16     16   66 };
  16         17  
61              
62             our @EXPORT_OK = qw/
63             yahc_retry_conn
64             yahc_reinit_conn
65             yahc_terminal_error
66             yahc_conn_last_error
67             yahc_conn_id
68             yahc_conn_url
69             yahc_conn_target
70             yahc_conn_state
71             yahc_conn_errors
72             yahc_conn_timeline
73             yahc_conn_request
74             yahc_conn_response
75             yahc_conn_attempt
76             yahc_conn_attempts_left
77             yahc_conn_socket_cache_id
78             yahc_conn_register_error
79             yahc_conn_user_data
80             /;
81              
82             our %EXPORT_TAGS = (all => \@EXPORT_OK);
83             my $LAST_CONNECTION_ID;
84              
85             ################################################################################
86             # User facing functons
87             ################################################################################
88              
89             sub new {
90 9     9 1 2062 my ($class, $args) = @_;
91 9 100       57 $LAST_CONNECTION_ID = $$ * 1000 unless defined $LAST_CONNECTION_ID;
92              
93 9 50 66     60 die 'YAHC: ->new() expect args to be a hashref' if defined $args and ref($args) ne 'HASH';
94 9 50       30 die 'YAHC: please do `my ($yahc, $yahc_storage) = YAHC::new()` and keep both these objects in the same scope' unless wantarray;
95              
96             # wrapping target selection here allows all client share same list
97             # and more importantly to share index within the list
98 9 100       31 $args->{_target} = _wrap_host(delete $args->{host}) if $args->{host};
99 9 50       44 $args->{_backoff} = _wrap_backoff(delete $args->{backoff_delay}) if $args->{backoff_delay};
100 9 50       31 $args->{_socket_cache} = _wrap_socket_cache(delete $args->{socket_cache}) if $args->{socket_cache};
101              
102 9         14 my %storage;
103             my $self = bless {
104             loop => new EV::Loop,
105             pid => $$, # store pid to detect forks
106             storage => \%storage,
107             debug => delete $args->{debug} || $ENV{YAHC_DEBUG} || 0,
108 9   50     387 keep_timeline => delete $args->{keep_timeline} || $ENV{YAHC_TIMELINE} || 0,
      50        
109             pool_args => $args,
110             }, $class;
111              
112             # this's a radical way of avoiding circular references.
113             # let's see how it plays out in practise.
114 9         88 weaken($self->{storage});
115 9         85 weaken($self->{$_} = $storage{$_} = {}) for qw/watchers callbacks connections/;
116              
117 9 100       135 if (delete $args->{account_for_signals}) {
118 1 50       2 _log_message('YAHC: enable account_for_signals logic') if $self->{debug};
119 1     1   8 my $sigcheck = $self->{watchers}{_sigcheck} = $self->{loop}->check(sub {});
120 1         12 $sigcheck->keepalive(0);
121             }
122              
123 9         33 return $self, \%storage;
124             }
125              
126             sub request {
127 10     10 1 89 my ($self, @args) = @_;
128 10 50       25 die 'YAHC: new_request() expects arguments' unless @args;
129 10 50       22 die 'YAHC: storage object is destroyed' unless $self->{storage};
130              
131 10 50       48 my ($conn_id, $request) = (@args == 1 ? ('connection_' . $LAST_CONNECTION_ID++, $args[0]) : @args);
132             die "YAHC: Connection with name '$conn_id' already exists\n"
133 10 50       27 if exists $self->{connections}{$conn_id};
134              
135 10         17 my $pool_args = $self->{pool_args};
136 10 50 0     21 do { $request->{$_} ||= $pool_args->{$_} if $pool_args->{$_} } foreach (qw/host port scheme head
  80         111  
137             request_timeout connect_timeout
138             drain_timeout lifetime_timeout/);
139 10 100       24 if ($request->{host}) {
    50          
140 8         30 $request->{_target} = _wrap_host($request->{host});
141             } elsif ($pool_args->{_target}) {
142 2         3 $request->{_target} = $pool_args->{_target};
143             } else {
144 0         0 die "YAHC: host must be defined in request() or in new()\n";
145             }
146              
147 10 50       36 if ($request->{backoff_delay}) {
    50          
148 0         0 $request->{_backoff} = _wrap_backoff($request->{backoff_delay});
149             } elsif ($pool_args->{_backoff}) {
150 0         0 $request->{_backoff} = $pool_args->{_backoff};
151             }
152              
153 10 50       40 if ($request->{socket_cache}) {
    50          
154 0         0 $request->{_socket_cache} = _wrap_socket_cache($request->{socket_cache});
155             } elsif ($pool_args->{_socket_cache}) {
156 0         0 $request->{_socket_cache} = $pool_args->{_socket_cache};
157             }
158              
159 10   50     55 my $scheme = $request->{scheme} ||= 'http';
160 10   33     27 my $debug = delete $request->{debug} || $self->{debug};
161 10   66     25 my $keep_timeline = delete $request->{keep_timeline} || $self->{keep_timeline};
162 10         11 my $user_data = delete $request->{user_data};
163              
164             my $conn = {
165             id => $conn_id,
166             request => $request,
167             response => { status => 0 },
168             attempt => 0,
169 10 50 100     162 retries => $request->{retries} || 0,
    100 66        
    100          
    50          
170             state => YAHC::State::INITIALIZED(),
171             selected_target => [],
172             ($debug ? (debug => $debug) : ()),
173             ($keep_timeline ? (keep_timeline => $keep_timeline) : ()),
174             ($debug || $keep_timeline ? (debug_or_timeline => 1) : ()),
175             (defined $user_data ? (user_data => $user_data) : ()),
176             pid => $$,
177             };
178              
179 10         12 my %callbacks;
180 10         16 foreach (@{ CALLBACKS() }) {
  10         16  
181 60 100       95 next unless exists $request->{$_};
182 2         5 my $cb = $callbacks{$_} = delete $request->{$_};
183 2         8 $conn->{"has_$_"} = !!$cb;
184             }
185              
186 10         21 $self->{watchers}{$conn_id} = {};
187 10         21 $self->{callbacks}{$conn_id} = \%callbacks;
188 10         13 $self->{connections}{$conn_id} = $conn;
189              
190 10 50       23 _set_lifetime_timer($self, $conn_id) if $request->{lifetime_timeout};
191              
192 10 100       36 return $conn if $request->{_test}; # for testing purposes
193 2         8 _set_init_state($self, $conn_id);
194              
195             # if user fire new request in a callback we need to update stop_condition
196 2         3 my $stop_condition = $self->{stop_condition};
197 2 0 33     4 if ($stop_condition && $stop_condition->{all}) {
198 0         0 $stop_condition->{connections}{$conn_id} = 1;
199             }
200              
201 2         6 return $conn;
202             }
203              
204             sub drop {
205 0     0 1 0 my ($self, $c, $force_socket_close) = @_;
206 0 0       0 my $conn_id = ref($c) eq 'HASH' ? $c->{id} : $c;
207 0 0       0 my $conn = $self->{connections}{$conn_id} or return;
208 0 0       0 _register_in_timeline($conn, "dropping connection from pool") if exists $conn->{debug_or_timeline};
209 0 0       0 _set_completed_state($self, $conn_id, $force_socket_close) unless $conn->{state} == YAHC::State::COMPLETED();
210 0         0 return $conn;
211             }
212              
213 6     6 1 57 sub run { shift->_run(0, @_) }
214 0     0 1 0 sub run_once { shift->_run(EV::RUN_ONCE) }
215 0     0 1 0 sub run_tick { shift->_run(EV::RUN_NOWAIT) }
216 3     3 1 38 sub is_running { !!shift->{loop}->depth }
217 5     5 1 1729 sub loop { shift->{loop} }
218              
219             sub break {
220 3     3 1 18 my ($self, $reason) = @_;
221 3 50       10 return unless $self->is_running;
222 3 50 0     20 _log_message('YAHC: pid %d breaking event loop because %s', $$, ($reason || 'no reason')) if $self->{debug};
223 3         23 $self->{loop}->break(EV::BREAK_ONE)
224             }
225              
226             ################################################################################
227             # Routines to manipulate connections (also user facing)
228             ################################################################################
229              
230             sub yahc_terminal_error {
231 1 50   1 1 5 return (($_[0] & YAHC::Error::TERMINAL_ERROR()) == YAHC::Error::TERMINAL_ERROR()) ? 1 : 0;
232             }
233              
234             sub yahc_reinit_conn {
235 0     0 1 0 my ($conn, $args) = @_;
236             die "YAHC: cannot reinit completed connection\n"
237 0 0       0 if $conn->{state} >= YAHC::State::COMPLETED();
238              
239 0         0 $conn->{attempt} = 0;
240 0         0 $conn->{state} = YAHC::State::INITIALIZED();
241 0 0 0     0 return unless defined $args && ref($args) eq 'HASH';
242              
243 0         0 my $request = $conn->{request};
244 0 0       0 $request->{_target} = _wrap_host(delete $args->{host}) if $args->{host};
245 0 0       0 $request->{_backoff} = _wrap_backoff(delete $args->{backoff_delay}) if $args->{backoff_delay};
246 0 0       0 do { $request->{$_} = $args->{$_} if $args->{$_} } foreach (keys %$args);
  0         0  
247             }
248              
249             sub yahc_retry_conn {
250 0     0 1 0 my ($conn, $args) = @_;
251             die "YAHC: cannot retry completed connection\n"
252 0 0       0 if $conn->{state} >= YAHC::State::COMPLETED();
253 0 0       0 return unless yahc_conn_attempts_left($conn) > 0;
254              
255 0         0 $conn->{state} = YAHC::State::INITIALIZED();
256 0 0 0     0 return unless defined $args && ref($args) eq 'HASH';
257              
258             $conn->{request}{_backoff} = _wrap_backoff($args->{backoff_delay})
259 0 0       0 if $args->{backoff_delay};
260             }
261              
262             sub yahc_conn_last_error {
263 0     0 1 0 my $conn = shift;
264 0 0 0     0 return unless $conn->{errors} && @{ $conn->{errors} };
  0         0  
265 0 0       0 return wantarray ? @{ $conn->{errors}[-1] } : $conn->{errors}[-1];
  0         0  
266             }
267              
268 0     0 1 0 sub yahc_conn_id { $_[0]->{id} }
269 0     0 1 0 sub yahc_conn_state { $_[0]->{state} }
270 0     0 1 0 sub yahc_conn_errors { $_[0]->{errors} }
271 0     0 1 0 sub yahc_conn_timeline { $_[0]->{timeline} }
272 0     0 1 0 sub yahc_conn_request { $_[0]->{request} }
273 0     0 1 0 sub yahc_conn_response { $_[0]->{response} }
274 4     4 1 17 sub yahc_conn_attempt { $_[0]->{attempt} }
275 4 100   4 1 24 sub yahc_conn_attempts_left { $_[0]->{attempt} > $_[0]->{retries} ? 0 : $_[0]->{retries} - $_[0]->{attempt} + 1 }
276              
277             sub yahc_conn_target {
278 4     4 1 10 my $target = $_[0]->{selected_target};
279 4 50 50     8 return unless $target && scalar @{ $target };
  4         7  
280 4         30 my ($host, $ip, $port) = @{ $target };
  4         6  
281 4 50 33     29 return ($host || $ip) . ($port ne '80' && $port ne '443' ? ":$port" : '');
      33        
282             }
283              
284             sub yahc_conn_url {
285 0     0 1 0 my $target = $_[0]->{selected_target};
286 0         0 my $request = $_[0]->{request};
287 0 0 0     0 return unless $target && @{ $target };
  0         0  
288              
289 0         0 my ($host, $ip, $port, $scheme) = @{ $target };
  0         0  
290             return "$scheme://"
291             . ($host || $ip)
292             . ($port ne '80' && $port ne '443' ? ":$port" : '')
293             . ($request->{path} || "/")
294 0 0 0     0 . (defined $request->{query_string} ? ("?" . $request->{query_string}) : "");
    0 0        
      0        
295             }
296              
297             sub yahc_conn_user_data {
298 0     0 1 0 my $conn = shift;
299 0 0       0 $conn->{user_data} = $_[0] if @_;
300 0         0 return $conn->{user_data};
301             }
302              
303             ################################################################################
304             # Internals
305             ################################################################################
306              
307             sub _run {
308 6     6   11 my ($self, $how, $until_state, @cs) = @_;
309 6 50       17 die "YAHC: storage object is destroyed\n" unless $self->{storage};
310 6 50       43 die "YAHC: reentering run\n" if $self->{loop}->depth;
311              
312 6 50       21 if ($self->{pid} != $$) {
313 0 0       0 _log_message('YAHC: reinitializing event loop after forking') if $self->{debug};
314 0         0 $self->{pid} = $$;
315 0         0 $self->{loop}->loop_fork;
316              
317 0         0 my $active_connections = grep { $$ != $_->{pid} } values %{ $self->{connections} };
  0         0  
  0         0  
318 0 0       0 warn "YAHC has $active_connections active connections after a fork, consider dropping them!"
319             if $active_connections;
320             }
321              
322 6 100       14 if (defined $until_state) {
323 2         4 my $until_state_str = _strstate($until_state);
324 2 50       14 die "YAHC: unknown until_state $until_state\n" if $until_state_str =~ m/unknown/;
325              
326 2         4 my $is_all = (@cs == 0);
327 0         0 my @connections = $is_all ? values %{ $self->{connections} }
328 2 50       7 : map { $self->{connections}{$_} || () }
329 2 50       9 map { ref($_) eq 'HASH' ? $_->{id} : $_ } @cs;
  2 50       8  
330              
331             $self->{stop_condition} = {
332             all => $is_all,
333             expected_state => $until_state,
334 2         6 connections => { map { $_->{id} => 1 } grep { $_->{state} < $until_state } @connections },
  2         15  
  2         4  
335             };
336             } else {
337 4         5 delete $self->{stop_condition};
338             }
339              
340 6         8 my $loop = $self->{loop};
341 6         24 $loop->now_update;
342              
343 6 50       39 if ($self->{debug}) {
344 0         0 my $iterations = $loop->iteration;
345 0 0       0 _log_message('YAHC: pid %d entering event loop%s', $$, ($until_state ? " with until state " . _strstate($until_state) : ''));
346 0   0     0 $loop->run($how || 0);
347 0         0 _log_message('YAHC: pid %d exited from event loop after %d iterations', $$, $loop->iteration - $iterations);
348             } else {
349 6   50     2000200 $loop->run($how || 0);
350             }
351             }
352              
353             sub _check_stop_condition {
354 2     2   3 my ($self, $conn) = @_;
355 2         3 my $stop_condition = $self->{stop_condition};
356 2 50 33     21 return if !$stop_condition || $conn->{state} < $stop_condition->{expected_state};
357              
358 2         6 delete $stop_condition->{connections}{$conn->{id}};
359 2         1 my $awaiting_connections = scalar keys %{ $stop_condition->{connections} };
  2         4  
360 2         3 my $expected_state = $stop_condition->{expected_state};
361              
362 2 50       5 if ($awaiting_connections == 0) {
363 2         12 $self->break(sprintf("until state '%s' is reached", _strstate($expected_state)));
364 2         4 return 1;
365             }
366              
367             _log_message("YAHC: still have %d connections awaiting state '%s'",
368 0 0       0 $awaiting_connections, _strstate($expected_state)) if $self->{debug};
369             }
370              
371             ################################################################################
372             # IO routines
373             ################################################################################
374              
375             sub _set_init_state {
376 14     14   24 my ($self, $conn_id) = @_;
377              
378 14 50       30 my $conn = $self->{connections}{$conn_id} or die "YAHC: unknown connection id $conn_id\n";
379              
380 14         25 $conn->{response} = { status => 0 };
381 14         20 $conn->{state} = YAHC::State::INITIALIZED();
382 14 100       28 _register_in_timeline($conn, "new state %s", _strstate($conn->{state})) if exists $conn->{debug_or_timeline};
383 14 50       23 _call_state_callback($self, $conn, 'init_callback') if exists $conn->{has_init_callback};
384              
385 14         24 _close_or_cache_socket($self, $conn, 1); # force connection close if any (likely not)
386 14         20 my $watchers = _delete_watchers_but_lifetime_timer($self, $conn_id); # implicit stop of all watchers
387              
388             return _set_user_action_state($self, $conn_id, YAHC::Error::RETRY_LIMIT(), "retries limit reached")
389 14 100       28 if $conn->{attempt} > $conn->{retries};
390              
391             # don't move attempt increment before boundary check !!!
392             # otherwise we can get off-by-one error in yahc_conn_attempts_left
393 13         13 my $attempt = ++$conn->{attempt};
394 13 50 66     31 if ($attempt > 1 && exists $conn->{request}{_backoff}) {
395 0         0 my $backoff_delay = eval { $conn->{request}{_backoff}->($conn) };
  0         0  
396 0 0       0 if (my $error = $@) {
397 0         0 return _set_user_action_state($self, $conn_id, YAHC::Error::CALLBACK_ERROR() | YAHC::Error::TERMINAL_ERROR(),
398             "exception in backoff callback (close connection): $error");
399             };
400              
401 0 0       0 if ($backoff_delay) {
402 0         0 $self->{loop}->now_update;
403 0 0       0 _register_in_timeline($conn, "setting backoff_timer to %.3fs", $backoff_delay) if exists $conn->{debug_or_timeline};
404             $watchers->{backoff_timer} = $self->{loop}->timer($backoff_delay, 0, _get_safe_wrapper($self, $conn, sub {
405 0 0   0   0 _register_in_timeline($conn, "backoff timer of %.3fs expired, time for new attempt", $backoff_delay) if exists $conn->{debug_or_timeline};
406 0 0       0 _set_init_state($self, $conn_id) if _init_helper($self, $conn_id) == 1;
407 0         0 }));
408 0         0 return;
409             }
410             }
411              
412 13 50       31 if (_init_helper($self, $conn_id) == 1) {
413             _register_in_timeline($conn, "do attempt on next EV iteration, (iteration=%d)", $self->{loop}->iteration)
414 0 0       0 if exists $conn->{debug_or_timeline};
415              
416             # from EV docs:
417             # idle watcher call the callback when there are no other pending
418             # watchers of the same or higher priority. The idle watchers are
419             # being called once per event loop iteration - until stopped.
420             #
421             # so, what we do is we start idle watcher with priority 1 which is
422             # higher then 0 used by all IO watchers. As result, the callback
423             # will be called at the end of this iteration. And others if neccessary.
424              
425             my $retry_watcher = $watchers->{retry} ||= $self->{loop}->idle_ns(_get_safe_wrapper($self, $conn, sub {
426 0     0   0 shift->stop; # stop this watcher, _set_init_state will start if neccessary
427             _register_in_timeline($conn, "time for new attempt (iteration=%d)", $self->{loop}->iteration)
428 0 0       0 if exists $conn->{debug_or_timeline};
429 0         0 _set_init_state($self, $conn_id)
430 0   0     0 }));
431              
432 0         0 $retry_watcher->priority(1);
433 0         0 $retry_watcher->start;
434             };
435             }
436              
437             sub _init_helper {
438 13     13   54 my ($self, $conn_id) = @_;
439              
440 13 50       27 my $conn = $self->{connections}{$conn_id} or die "YAHC: unknown connection id $conn_id\n";
441 13 50       21 my $watchers = $self->{watchers}{$conn_id} or die "YAHC: no watchers for connection id $conn_id\n";
442              
443 13         13 my $request = $conn->{request};
444              
445 13         42 $self->{loop}->now_update; # update time for timers
446             _set_until_state_timer($self, $conn_id, 'request_timeout', YAHC::State::USER_ACTION(), YAHC::Error::TIMEOUT() | YAHC::Error::REQUEST_TIMEOUT())
447 13 50       24 if $request->{request_timeout};
448             _set_until_state_timer($self, $conn_id, 'connect_timeout', YAHC::State::CONNECTED(), YAHC::Error::TIMEOUT() | YAHC::Error::CONNECT_TIMEOUT())
449 13 50       18 if $request->{connect_timeout};
450             _set_until_state_timer($self, $conn_id, 'drain_timeout', YAHC::State::READING(), YAHC::Error::TIMEOUT() | YAHC::Error::DRAIN_TIMEOUT())
451 13 50       17 if $request->{drain_timeout};
452              
453             eval {
454 13         23 my ($host, $ip, $port, $scheme) = _get_next_target($conn);
455             _register_in_timeline($conn, "Target $scheme://$host:$port ($ip:$port) chosen for attempt #%d", $conn->{attempt})
456 13 100       30 if exists $conn->{debug_or_timeline};
457              
458 13         7 my $sock;
459 13 50       23 if (my $socket_cache = $request->{_socket_cache}) {
460 0         0 $sock = $socket_cache->(YAHC::SocketCache::GET(), $conn);
461             }
462              
463 13 50       17 if (defined $sock) {
464 0 0       0 _register_in_timeline($conn, "reuse socket") if $conn->{debug_or_timeline};
465 0         0 $watchers->{_fh} = $sock;
466 0     0   0 $watchers->{io} = $self->{loop}->io($sock, EV::WRITE, sub {});
467 0         0 _set_write_state($self, $conn_id);
468             } else {
469 13 100       26 _register_in_timeline($conn, "build new socket") if $conn->{debug_or_timeline};
470 13         26 $sock = _build_socket_and_connect($ip, $port);
471 13         26 _set_connecting_state($self, $conn_id, $sock);
472             }
473              
474 13         25 1;
475 13 50       18 } or do {
476 0   0     0 my $error = $@ || 'zombie error';
477 0         0 $error =~ s/\s+$//o;
478 0         0 yahc_conn_register_error($conn, YAHC::Error::CONNECT_ERROR(), "connection attempt %d failed: %s", $conn->{attempt}, $error);
479 0         0 return 1;
480             };
481              
482 13         33 return 0;
483             }
484              
485             sub _set_connecting_state {
486 13     13   39 my ($self, $conn_id, $sock) = @_;
487              
488 13 50       31 my $conn = $self->{connections}{$conn_id} or die "YAHC: unknown connection id $conn_id\n";
489 13 50       25 my $watchers = $self->{watchers}{$conn_id} or die "YAHC: no watchers for connection id $conn_id\n";
490              
491 13         12 $conn->{state} = YAHC::State::CONNECTING();
492 13 100       23 _register_in_timeline($conn, "new state %s", _strstate($conn->{state})) if exists $conn->{debug_or_timeline};
493 13 50       17 _call_state_callback($self, $conn, 'connecting_callback') if exists $conn->{has_connecting_callback};
494              
495             my $connecting_cb = _get_safe_wrapper($self, $conn, sub {
496 12     12   44 my $sockopt = getsockopt($sock, SOL_SOCKET, SO_ERROR);
497 12 50       18 if (!$sockopt) {
498 0         0 yahc_conn_register_error($conn, YAHC::Error::CONNECT_ERROR(), "Failed to do getsockopt(): '%s' errno=%d", "$!", $!+0);
499 0         0 _set_init_state($self, $conn_id);
500 0         0 return;
501             }
502              
503 12 100       41 if (my $err = unpack("L", $sockopt)) {
504 11   50     200 my $strerror = POSIX::strerror($err) || '';
505 11         1185 yahc_conn_register_error($conn, YAHC::Error::CONNECT_ERROR(), "Failed to connect: $strerror");
506 11         20 _set_init_state($self, $conn_id);
507 11         11 return;
508             }
509              
510 1         2 _set_connected_state($self, $conn_id);
511 13         75 });
512              
513 13         22 $watchers->{_fh} = $sock;
514 13         70 $watchers->{io} = $self->{loop}->io($sock, EV::WRITE, $connecting_cb);
515 13 50       28 _check_stop_condition($self, $conn) if exists $self->{stop_condition};
516             }
517              
518             sub _set_connected_state {
519 1     1   2 my ($self, $conn_id) = @_;
520              
521 1 50       3 my $conn = $self->{connections}{$conn_id} or die "YAHC: unknown connection id $conn_id\n";
522 1 50       3 my $watchers = $self->{watchers}{$conn_id} or die "YAHC: no watchers for connection id $conn_id\n";
523              
524 1         1 $conn->{state} = YAHC::State::CONNECTED();
525 1 50       5 _register_in_timeline($conn, "new state %s", _strstate($conn->{state})) if exists $conn->{debug_or_timeline};
526 1 50       6 _call_state_callback($self, $conn, 'connected_callback') if exists $conn->{has_connected_callback};
527              
528             my $connected_cb = _get_safe_wrapper($self, $conn, sub {
529 0 0   0   0 if ($conn->{is_ssl}) {
530 0         0 _set_ssl_handshake_state($self, $conn_id);
531             } else {
532 0         0 _set_write_state($self, $conn_id);
533             }
534 1         11 });
535              
536             #$watcher->events(EV::WRITE);
537 1         20 $watchers->{io}->cb($connected_cb);
538 1 50       4 _check_stop_condition($self, $conn) if exists $self->{stop_condition};
539             }
540              
541             sub _set_ssl_handshake_state {
542 0     0   0 my ($self, $conn_id) = @_;
543              
544 0 0       0 my $conn = $self->{connections}{$conn_id} or die "YAHC: unknown connection id $conn_id\n";
545 0 0       0 my $watchers = $self->{watchers}{$conn_id} or die "YAHC: no watchers for connection id $conn_id\n";
546              
547 0         0 $conn->{state} = YAHC::State::SSL_HANDSHAKE();
548 0 0       0 _register_in_timeline($conn, "new state %s", _strstate($conn->{state})) if exists $conn->{debug_or_timeline};
549             #_call_state_callback($self, $conn, 'writing_callback') if $conn->{has_writing_callback}; TODO
550              
551 0         0 my $fh = $watchers->{_fh};
552 0         0 my $hostname = $conn->{selected_target}[0];
553              
554             my %options = (
555             SSL_verifycn_name => $hostname,
556             IO::Socket::SSL->can_client_sni ? ( SSL_hostname => $hostname ) : (),
557 0 0       0 %{ $conn->{request}{ssl_options} || {} },
  0 0       0  
558             );
559              
560 0 0       0 if ($conn->{debug_or_timeline}) {
561 0   0     0 my $options_msg = join(', ', map { "$_=" . ($options{$_} || '') } keys %options);
  0         0  
562 0         0 _register_in_timeline($conn, "start SSL handshake with options: $options_msg");
563             }
564              
565 0 0       0 if (!IO::Socket::SSL->start_SSL($fh, %options, SSL_startHandshake => 0)) {
566 0         0 return _set_user_action_state($self, $conn_id, YAHC::Error::SSL_ERROR() | YAHC::Error::TERMINAL_ERROR(),
567             sprintf("failed to start SSL session: %s", _format_ssl_error()));
568             }
569              
570             my $handshake_cb = _get_safe_wrapper($self, $conn, sub {
571 0     0   0 my $w = shift;
572 0 0       0 if ($fh->connect_SSL) {
573 0 0       0 _register_in_timeline($conn, "SSL handshake successfully completed") if exists $conn->{debug_or_timeline};
574 0         0 return _set_write_state($self, $conn_id);
575             }
576              
577 0 0       0 if ($! == EWOULDBLOCK) {
578 0 0       0 return $w->events(EV::READ) if $IO::Socket::SSL::SSL_ERROR == SSL_WANT_READ;
579 0 0       0 return $w->events(EV::WRITE) if $IO::Socket::SSL::SSL_ERROR == SSL_WANT_WRITE;
580             }
581              
582 0         0 yahc_conn_register_error($conn, YAHC::Error::SSL_ERROR(), "Failed to complete SSL handshake: %s", _format_ssl_error());
583 0         0 _set_init_state($self, $conn_id);
584 0         0 });
585              
586 0         0 my $watcher = $watchers->{io};
587 0         0 $watcher->cb($handshake_cb);
588 0         0 $watcher->events(EV::WRITE | EV::READ);
589 0 0       0 _check_stop_condition($self, $conn) if exists $self->{stop_condition};
590             }
591              
592             sub _set_write_state {
593 1     1   8 my ($self, $conn_id) = @_;
594              
595 1 50       3 my $conn = $self->{connections}{$conn_id} or die "YAHC: unknown connection id $conn_id\n";
596 1 50       3 my $watchers = $self->{watchers}{$conn_id} or die "YAHC: no watchers for connection id $conn_id\n";
597              
598 1         2 $conn->{state} = YAHC::State::WRITING();
599 1 50       3 _register_in_timeline($conn, "new state %s", _strstate($conn->{state})) if exists $conn->{debug_or_timeline};
600 1 50       3 _call_state_callback($self, $conn, 'writing_callback') if exists $conn->{has_writing_callback};
601              
602 1         1 my $fh = $watchers->{_fh};
603 1         2 my $buf = _build_http_message($conn);
604 1         2 my $length = length($buf);
605              
606 1 50       3 warn "YAHC: HTTP message has UTF8 flag set! This will result in poor performance, see docs for details!"
607             if utf8::is_utf8($buf);
608              
609             _register_in_timeline($conn, "writing body of %d bytes\n%s", $length, ($length > 1024? substr($buf, 0, 1024) . '... (cut to 1024 bytes)' : $buf))
610 1 50       6 if exists $conn->{debug_or_timeline};
    50          
611              
612             my $write_cb = _get_safe_wrapper($self, $conn, sub {
613 1     1   1 my $w = shift;
614 1         45 my $wlen = syswrite($fh, $buf, $length);
615              
616 1 50       4 if (!defined $wlen) {
    50          
617 0 0       0 if ($conn->{is_ssl}) {
618 0 0       0 if ($! == EWOULDBLOCK) {
619 0 0       0 return $w->events(EV::READ) if $IO::Socket::SSL::SSL_ERROR == SSL_WANT_READ;
620 0 0       0 return $w->events(EV::WRITE) if $IO::Socket::SSL::SSL_ERROR == SSL_WANT_WRITE;
621             }
622              
623 0         0 yahc_conn_register_error($conn, YAHC::Error::WRITE_ERROR() | YAHC::Error::SSL_ERROR(), "Failed to send HTTPS data: %s", _format_ssl_error());
624 0         0 return _set_init_state($self, $conn_id);
625             }
626              
627 0 0 0     0 return if $! == EWOULDBLOCK || $! == EINTR || $! == EAGAIN;
      0        
628 0         0 yahc_conn_register_error($conn, YAHC::Error::WRITE_ERROR(), "Failed to send HTTP data: '%s' errno=%d", "$!", $!+0);
629 0         0 _set_init_state($self, $conn_id);
630             } elsif ($wlen == 0) {
631 0         0 yahc_conn_register_error($conn, YAHC::Error::WRITE_ERROR(), "syswrite returned 0");
632 0         0 _set_init_state($self, $conn_id);
633             } else {
634 1         3 substr($buf, 0, $wlen, '');
635 1         1 $length -= $wlen;
636 1 50       4 _set_read_state($self, $conn_id) if $length == 0;
637             }
638 1         6 });
639              
640 1         1 my $watcher = $watchers->{io};
641 1         9 $watcher->cb($write_cb);
642 1         8 $watcher->events(EV::WRITE);
643 1 50       3 _check_stop_condition($self, $conn) if exists $self->{stop_condition};
644             }
645              
646             sub _set_read_state {
647 3     3   14 my ($self, $conn_id) = @_;
648              
649 3 50       11 my $conn = $self->{connections}{$conn_id} or die "YAHC: unknown connection id $conn_id\n";
650 3 50       8 my $watchers = $self->{watchers}{$conn_id} or die "YAHC: no watchers for connection id $conn_id\n";
651              
652 3         3 $conn->{state} = YAHC::State::READING();
653 3 50       12 _register_in_timeline($conn, "new state %s", _strstate($conn->{state})) if exists $conn->{debug_or_timeline};
654 3 50       7 _call_state_callback($self, $conn, 'reading_callback') if exists $conn->{has_reading_callback};
655              
656 3         4 my $buf = '';
657 3         4 my $neck_pos = 0;
658 3         4 my $decapitated = 0;
659 3         3 my $content_length = 0;
660 3         3 my $is_chunked = 0;
661 3         3 my $fh = $watchers->{_fh};
662 3         3 my $chunk_size = 0;
663 3         3 my $body = ''; # used for chunked encoding
664              
665             my $read_cb = _get_safe_wrapper($self, $conn, sub {
666 2     2   4 my $w = shift;
667 2         38 my $rlen = sysread($fh, my $b = '', TCP_READ_CHUNK);
668 2 50       8 if (!defined $rlen) {
    50          
669 0 0       0 if ($conn->{is_ssl}) {
670 0 0       0 if ($! == EWOULDBLOCK) {
671 0 0       0 return $w->events(EV::READ) if $IO::Socket::SSL::SSL_ERROR == SSL_WANT_READ;
672 0 0       0 return $w->events(EV::WRITE) if $IO::Socket::SSL::SSL_ERROR == SSL_WANT_WRITE;
673             }
674              
675 0         0 yahc_conn_register_error($conn, YAHC::Error::READ_ERROR() | YAHC::Error::SSL_ERROR(), "Failed to receive HTTPS data: %s", _format_ssl_error());
676 0         0 return _set_init_state($self, $conn_id);
677             }
678              
679 0 0 0     0 return if $! == EWOULDBLOCK || $! == EINTR || $! == EAGAIN;
      0        
680 0         0 yahc_conn_register_error($conn, YAHC::Error::READ_ERROR(), "Failed to receive HTTP data: '%s' errno=%d", "$!", $!+0);
681 0         0 _set_init_state($self, $conn_id);
682             } elsif ($rlen == 0) {
683 0 0       0 if ($content_length > 0) {
684 0         0 yahc_conn_register_error($conn, YAHC::Error::READ_ERROR(), "Premature EOF, expect %d bytes more", $content_length - length($buf));
685             } else {
686 0         0 yahc_conn_register_error($conn, YAHC::Error::READ_ERROR(), "Premature EOF");
687             }
688 0         0 _set_init_state($self, $conn_id);
689             } else {
690 2         7 $buf .= $b;
691 2 50 33     38 if (!$decapitated && ($neck_pos = index($buf, "${CRLF}${CRLF}")) > 0) {
692 2         10 my $headers = _parse_http_headers($conn, substr($buf, 0, $neck_pos, '')); # $headers are always defined but might be empty, maybe fix later
693 2   100     11 $is_chunked = ($headers->{'Transfer-Encoding'} || '') eq 'chunked';
694              
695 2 50 66     19 if ($is_chunked && exists $headers->{'Trailer'}) {
    50 66        
696 0         0 _set_user_action_state($self, $conn_id, YAHC::Error::RESPONSE_ERROR(), "Chunked HTTP response with Trailer header");
697 0         0 return;
698             } elsif (!$is_chunked && !exists $headers->{'Content-Length'}) {
699 0         0 _set_user_action_state($self, $conn_id, YAHC::Error::RESPONSE_ERROR(), "HTTP reponse without Content-Length");
700 0         0 return;
701             }
702              
703 2         2 $decapitated = 1;
704 2         3 $content_length = $headers->{'Content-Length'};
705 2         3 substr($buf, 0, 4, ''); # 4 = length("$CRLF$CRLF")
706             }
707              
708 2 100 66     21 if ($decapitated && $is_chunked) {
    50 33        
    50 33        
      33        
709             # in order to get the smallest chunk size we need
710             # at least 4 bytes (2xCLRF), and there *MUST* be
711             # last chunk which is at least 5 bytes (0\r\n\r\n)
712             # so we can safely ignore $bufs that have less than 5 bytes
713 1         3 while (length($buf) > ($chunk_size + 4)) {
714 4         4 my $neck_pos = index($buf, ${CRLF});
715 4 50       5 if ($neck_pos > 0) {
716             # http://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html
717             # All HTTP/1.1 applications MUST be able to receive and
718             # decode the "chunked" transfer-coding, and MUST ignore
719             # chunk-extension extensions they do not understand.
720 4         6 my ($s) = split(';', substr($buf, 0, $neck_pos), 1);
721 4         5 $chunk_size = hex($s);
722              
723 4 50       10 _register_in_timeline($conn, "parsing chunk of size $chunk_size bytes") if exists $conn->{debug_or_timeline};
724 4 100       5 if ($chunk_size == 0) { # end with, but as soon as we see 0\r\n\r\n we just mark it as done
725 1         2 $conn->{response}{body} = $body;
726 1         2 _set_user_action_state($self, $conn_id);
727 1         1 return;
728             } else {
729 3 50       5 if (length($buf) >= $chunk_size + $neck_pos + 2 + 2) {
730 3         3 $body .= substr($buf, $neck_pos + 2, $chunk_size);
731 3         5 substr($buf, 0, $neck_pos + 2 + $chunk_size + 2, '');
732 3         6 $chunk_size = 0;
733             } else {
734 0         0 last; # dont have enough data in this pass, wait for one more read
735             }
736             }
737             } else {
738 0 0 0     0 last if $neck_pos < 0 && $chunk_size == 0; # in case we couldnt get the chunk size in one go, we must concat until we have something
739 0         0 _set_user_action_state($self, $conn_id, YAHC::Error::RESPONSE_ERROR(), "error processing chunked data, couldnt find CLRF[index:$neck_pos] in buf");
740 0         0 return;
741             }
742             }
743             } elsif ($decapitated && ($conn->{request}{method} && $conn->{request}{method} eq 'HEAD')) {
744 0         0 _set_user_action_state($self, $conn_id); # We are done reading headers for a HEAD request
745             } elsif ($decapitated && length($buf) >= $content_length) {
746 1 50       4 $conn->{response}{body} = (length($buf) > $content_length ? substr($buf, 0, $content_length) : $buf);
747 1         2 _set_user_action_state($self, $conn_id);
748             }
749             }
750 3         17 });
751              
752 3         3 my $watcher = $watchers->{io};
753 3         20 $watcher->cb($read_cb);
754 3         18 $watcher->events(EV::READ);
755 3 100       12 _check_stop_condition($self, $conn) if exists $self->{stop_condition};
756             }
757              
758             sub _set_user_action_state {
759 3     3   6 my ($self, $conn_id, $error, $strerror) = @_;
760 3   100     9 $error ||= YAHC::Error::NO_ERROR();
761 3   100     9 $strerror ||= '';
762              
763             # this state may be used in critical places,
764             # so it should *NEVER* throw exception
765 3 50       10 my $conn = $self->{connections}{$conn_id}
766             or warn "YAHC: try to _set_user_action_state() for unknown connection $conn_id",
767             return;
768              
769 3         5 $conn->{state} = YAHC::State::USER_ACTION();
770 3 100       10 _register_in_timeline($conn, "new state %s", _strstate($conn->{state})) if exists $conn->{debug_or_timeline};
771 3 100       11 yahc_conn_register_error($conn, $error, $strerror) if $error != YAHC::Error::NO_ERROR;
772              
773 3         9 _close_or_cache_socket($self, $conn, $error != YAHC::Error::NO_ERROR);
774 3 100       9 return _set_completed_state($self, $conn_id) unless exists $conn->{has_callback};
775              
776             eval {
777 1 0       2 _register_in_timeline($conn, "call callback%s", $error ? " error=$error, strerror='$strerror'" : '') if exists $conn->{debug_or_timeline};
    50          
778 1         2 my $cb = $self->{callbacks}{$conn_id}{callback};
779 1         3 $cb->($conn, $error, $strerror);
780 1         345 1;
781 1 50       2 } or do {
782 0   0     0 my $error = $@ || 'zombie error';
783 0         0 yahc_conn_register_error($conn, YAHC::Error::CALLBACK_ERROR() | YAHC::Error::TERMINAL_ERROR(), "Exception in user action callback (close connection): $error");
784 0         0 $self->{state} = YAHC::State::COMPLETED();
785             };
786              
787 1         4 $self->{loop}->now_update;
788              
789 1         1 my $state = $conn->{state};
790 1 50       3 if (yahc_terminal_error($error)) {
791 0 0 0     0 yahc_conn_register_error($conn, YAHC::Error::CALLBACK_ERROR() | YAHC::Error::TERMINAL_ERROR(), "ignoring changed state due to terminal error")
792             unless $state == YAHC::State::USER_ACTION() || $state == YAHC::State::COMPLETED();
793 0         0 _set_completed_state($self, $conn_id, 1);
794             return
795 0         0 }
796              
797 1 50       2 _register_in_timeline($conn, "after invoking callback state is %s", _strstate($state)) if exists $conn->{debug_or_timeline};
798              
799 1 50 33     6 if ($state == YAHC::State::INITIALIZED()) {
    50          
800 0         0 _set_init_state($self, $conn_id);
801             } elsif ($state == YAHC::State::USER_ACTION() || $state == YAHC::State::COMPLETED()) {
802 1         3 _set_completed_state($self, $conn_id);
803             } else {
804 0         0 yahc_conn_register_error($conn, YAHC::Error::CALLBACK_ERROR() | YAHC::Error::TERMINAL_ERROR(), "callback set unsupported state");
805 0         0 _set_completed_state($self, $conn_id);
806             }
807             }
808              
809             sub _set_completed_state {
810 3     3   5 my ($self, $conn_id, $force_socket_close) = @_;
811              
812             # this's a terminal state,
813             # so setting this state should *NEVER* fail
814 3         9 delete $self->{callbacks}{$conn_id};
815 3         19 my $conn = delete $self->{connections}{$conn_id};
816              
817 3 50       8 if (!defined $conn) {
818 0         0 delete($self->{watchers}{$conn_id}), # implicit stop of all watchers
819             return;
820             }
821              
822 3         5 $conn->{state} = YAHC::State::COMPLETED();
823 3 100       8 _register_in_timeline($conn, "new state %s", _strstate($conn->{state})) if exists $conn->{debug_or_timeline};
824              
825 3         6 _close_or_cache_socket($self, $conn, $force_socket_close);
826 3         6 delete $self->{watchers}{$conn_id}; # implicit stop of all watchers
827              
828 3 50       9 _check_stop_condition($self, $conn) if exists $self->{stop_condition};
829             }
830              
831             sub _build_socket_and_connect {
832 13     13   14 my ($ip, $port) = @_;
833              
834 13         10 my $sock;
835 13 50       191 socket($sock, PF_INET, SOCK_STREAM, 0)
836             or die sprintf("Failed to construct TCP socket: '%s' errno=%d\n", "$!", $!+0);
837              
838 13 50       33 my $flags = fcntl($sock, F_GETFL, 0) or die sprintf("Failed to get fcntl F_GETFL flag: '%s' errno=%d\n", "$!", $!+0);
839 13 50       31 fcntl($sock, F_SETFL, $flags | O_NONBLOCK) or die sprintf("Failed to set fcntl O_NONBLOCK flag: '%s' errno=%d\n", "$!", $!+0);
840              
841 13 50       48 my $ip_addr = inet_aton($ip) or die "Invalid IP address";
842 13         33 my $addr = pack_sockaddr_in($port, $ip_addr);
843 13 50 33     737 if (!connect($sock, $addr) && $! != EINPROGRESS) {
844 0         0 die sprintf("Failed to connect: '%s' errno=%d\n", "$!", $!+0);
845             }
846              
847 13         22 return $sock;
848             }
849              
850             sub _get_next_target {
851 17     17   24 my $conn = shift;
852 17         27 my ($host, $ip, $port, $scheme) = $conn->{request}{_target}->($conn);
853              
854             # TODO STATE_RESOLVE_DNS
855 17 100 66     125 ($host, $port) = ($1, $2) if !$port && $host =~ m/^(.+):([0-9]+)$/o;
856 17 100 66     70 $ip = $host if !$ip && $host =~ m/^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+$/o;
857 17   50     956 $ip ||= inet_ntoa(gethostbyname($host) or die "Failed to resolve $host\n");
      66        
858 17   50     69 $scheme ||= $conn->{request}{scheme} || 'http';
      33        
859 17   66     52 $port ||= $conn->{request}{port} || ($scheme eq 'https' ? 443 : 80);
      66        
860              
861 17         23 $conn->{is_ssl} = $scheme eq 'https';
862 17         17 return @{ $conn->{selected_target} = [ $host, $ip, $port, $scheme ] };
  17         68  
863             }
864              
865             # this and following functions are used in terminal state
866             # so they should *NEVER* fail
867             sub _close_or_cache_socket {
868 20     20   20 my ($self, $conn, $force_close) = @_;
869 20 50       57 my $watchers = $self->{watchers}{$conn->{id}} or return;
870 20 100       38 my $fh = delete $watchers->{_fh} or return;
871 13         24 delete $watchers->{io}; # implicit stop
872              
873 13         13 my $socket_cache = $conn->{request}{_socket_cache};
874              
875             # Stolen from Hijk. Thanks guys!!!
876             # We always close connections for 1.0 because some servers LIE
877             # and say that they're 1.0 but don't close the connection on
878             # us! An example of this. Test::HTTP::Server (used by the
879             # ShardedKV::Storage::Rest tests) is an example of such a
880             # server. In either case we can't cache a connection for a 1.0
881             # server anyway, so BEGONE!
882              
883 13 0 66     30 if ( $force_close
      0        
      33        
      0        
      33        
      0        
      0        
884             || !defined $socket_cache
885             || (($conn->{request}{proto} || '') eq 'HTTP/1.0')
886             || (($conn->{response}{proto} || '') eq 'HTTP/1.0')
887             || (($conn->{response}{head}{Connection} || '') eq 'close'))
888             {
889 13 100       21 _register_in_timeline($conn, "drop socket") if $conn->{debug_or_timeline};
890 13 100       126 close($fh) if ref($fh) eq 'GLOB'; # checking ref to avoid exception
891 13         17 return;
892             }
893              
894 0 0       0 _register_in_timeline($conn, "storing socket for later use") if $conn->{debug_or_timeline};
895 0 0       0 eval { $socket_cache->(YAHC::SocketCache::STORE(), $conn, $fh); 1; } or do {
  0         0  
  0         0  
896 0         0 yahc_conn_register_error($conn, YAHC::Error::CALLBACK_ERROR(), "Exception in socket_cache callback (ignore error): $@");
897             };
898             }
899              
900             sub yahc_conn_socket_cache_id {
901 0     0 1 0 my $conn = shift;
902 0 0       0 return unless defined $conn;
903 0 0       0 my ($host, undef, $port, $scheme) = @{ $conn->{selected_target} || [] };
  0         0  
904 0 0 0     0 return unless $host && $port && $scheme;
      0        
905             # Use $; so we can use the $socket_cache->{$$, $host, $port} idiom to access the cache.
906 0         0 return join($;, $$, $host, $port, $scheme);
907             }
908              
909             ################################################################################
910             # Timers
911             ################################################################################
912              
913             sub _set_until_state_timer {
914 0     0   0 my ($self, $conn_id, $timeout_name, $state, $error_to_report) = @_;
915              
916 0         0 my $timer_name = $timeout_name . '_timer';
917 0 0       0 my $conn = $self->{connections}{$conn_id} or die "YAHC: unknown connection id $conn_id\n";
918 0 0       0 my $watchers = $self->{watchers}{$conn_id} or die "YAHC: no watchers for connection id $conn_id\n";
919              
920 0         0 delete $watchers->{$timer_name}; # implicit stop
921 0         0 my $timeout = $conn->{request}{$timeout_name};
922 0 0       0 return unless $timeout;
923              
924             my $timer_cb = sub { # there is nothing what can throw exception
925 0 0   0   0 if ($conn->{state} < $state) {
926 0         0 yahc_conn_register_error($conn, $error_to_report, "$timeout_name of %.3fs expired", $timeout);
927 0         0 _set_init_state($self, $conn_id);
928             } else {
929 0 0       0 _register_in_timeline($conn, "delete $timer_name") if exists $conn->{debug_or_timeline};
930             }
931 0         0 };
932              
933 0 0       0 _register_in_timeline($conn, "setting $timeout_name to %.3fs", $timeout) if exists $conn->{debug_or_timeline};
934              
935             # caller should call now_update
936 0         0 my $w = $watchers->{$timer_name} = $self->{loop}->timer_ns($timeout, 0, $timer_cb);
937 0         0 $w->priority(2); # set highest priority
938 0         0 $w->start;
939             }
940              
941             sub _set_lifetime_timer {
942 0     0   0 my ($self, $conn_id) = @_;
943              
944 0 0       0 my $conn = $self->{connections}{$conn_id} or die "YAHC: unknown connection id $conn_id\n";
945 0 0       0 my $watchers = $self->{watchers}{$conn_id} or die "YAHC: no watchers for connection id $conn_id\n";
946              
947 0         0 delete $watchers->{lifetime_timer}; # implicit stop
948 0         0 my $timeout = $conn->{request}{lifetime_timeout};
949 0 0       0 return unless $timeout;
950              
951 0 0       0 _register_in_timeline($conn, "setting lifetime timer to %.3fs", $timeout) if exists $conn->{debug_or_timeline};
952              
953 0         0 $self->{loop}->now_update;
954             my $w = $watchers->{lifetime_timer} = $self->{loop}->timer_ns($timeout, 0, sub {
955             _set_user_action_state($self, $conn_id, YAHC::Error::TIMEOUT() | YAHC::Error::LIFETIME_TIMEOUT() | YAHC::Error::TERMINAL_ERROR(),
956 0 0   0   0 sprintf("lifetime_timeout of %.3fs expired", $timeout)) if $conn->{state} < YAHC::State::COMPLETED();
957 0         0 });
958              
959 0         0 $w->priority(2); # set highest priority
960 0         0 $w->start;
961             }
962              
963             ################################################################################
964             # HTTP functions
965             ################################################################################
966              
967             # copy-paste from Hijk
968             sub _build_http_message {
969 33     33   282 my $conn = shift;
970 33         27 my $request = $conn->{request};
971 33 100 100     129 my $path_and_qs = ($request->{path} || "/") . (defined $request->{query_string} ? ("?" . $request->{query_string}) : "");
972 33         32 my $has_host = 0;
973              
974             return join(
975             $CRLF,
976             ($request->{method} || "GET") . " $path_and_qs " . ($request->{protocol} || "HTTP/1.1"),
977             defined($request->{body}) ? ("Content-Length: " . length($request->{body})) : (),
978             defined($request->{head}) && @{ $request->{head} } ? (
979             map {
980 9   33     37 $has_host ||= lc($request->{head}[2*$_]) eq 'host';
981 9         67 $request->{head}[2*$_] . ": " . $request->{head}[2*$_+1]
982 7         17 } 0..$#{$request->{head}}/2
983             ) : (),
984             !$has_host ? ("Host: " . $conn->{selected_target}[0]) : (),
985             "",
986 33 100 100     323 defined($request->{body}) ? $request->{body} : ""
    100 100        
    50 100        
    100          
987             );
988             }
989              
990             sub _parse_http_headers {
991 3     3   20 my $conn = shift;
992 3         7 my $proto = substr($_[0], 0, 8);
993 3         5 my $status_code = substr($_[0], 9, 3);
994 3         9 substr($_[0], 0, index($_[0], $CRLF) + 2, ''); # 2 = length($CRLF)
995              
996 3         3 my %headers;
997 3         42 for (split /${CRLF}/o, $_[0]) {
998 19         39 my ($key, $value) = split(/: /, $_, 2);
999 19         29 $headers{$key} = $value;
1000             }
1001              
1002             $conn->{response} = {
1003 3         14 proto => $proto,
1004             status => $status_code,
1005             head => \%headers,
1006             };
1007              
1008 3 100       10 if ($conn->{debug_or_timeline}) {
1009 2         6 my $headers_str = join(' ', map { "$_='$headers{$_}'" } keys %headers);
  13         42  
1010 2         23 _register_in_timeline($conn, "headers parsed: $status_code $proto headers=$headers_str");
1011             }
1012              
1013 3         8 return \%headers;
1014             }
1015              
1016             ################################################################################
1017             # Helpers
1018             ################################################################################
1019              
1020             sub _delete_watchers_but_lifetime_timer {
1021 14     14   16 my ($self, $conn_id) = @_;
1022              
1023 14         17 my $watchers = $self->{watchers}{$conn_id};
1024 14 50 33     53 if (defined $watchers && (my $w = $watchers->{lifetime_timer})) {
1025 0         0 return $self->{watchers}{$conn_id} = { lifetime_timer => $w };
1026             }
1027              
1028 14         37 return $self->{watchers}{$conn_id} = {};
1029             }
1030              
1031             sub _wrap_host {
1032 10     10   12 my ($value) = @_;
1033 10         15 my $ref = ref($value);
1034              
1035 10 100   3   51 return sub { $value } if $ref eq '';
  3         6  
1036 3 100       7 return $value if $ref eq 'CODE';
1037              
1038 2         5 my $idx = 0;
1039 13     13   26 return sub { $value->[$idx++ % @$value]; }
1040 2 50 33     17 if $ref eq 'ARRAY' && @$value > 0;
1041              
1042 0         0 die "YAHC: unsupported host format\n";
1043             }
1044              
1045             sub _wrap_backoff {
1046 0     0   0 my ($value) = @_;
1047 0         0 my $ref = ref($value);
1048              
1049 0 0   0   0 return sub { $value } if $ref eq '';
  0         0  
1050 0 0       0 return $value if $ref eq 'CODE';
1051              
1052 0         0 die "YAHC: unsupported backoff format\n";
1053             }
1054              
1055             sub _wrap_socket_cache {
1056 0     0   0 my ($value) = @_;
1057 0         0 my $ref = ref($value);
1058              
1059 0 0       0 return $value if $ref eq 'CODE';
1060             return sub {
1061 0     0   0 my ($operation, $conn, $sock) = @_;
1062 0 0       0 if ($operation == YAHC::SocketCache::GET()) {
1063 0 0       0 my $socket_cache_id = yahc_conn_socket_cache_id($conn) or return;
1064 0         0 return delete $value->{$socket_cache_id};
1065             }
1066              
1067 0 0       0 if ($operation == YAHC::SocketCache::STORE()) {
1068 0 0       0 my $socket_cache_id = yahc_conn_socket_cache_id($conn) or return;
1069 0 0       0 close(delete $value->{$socket_cache_id}) if exists $value->{$socket_cache_id};
1070 0         0 $value->{$socket_cache_id} = $sock;
1071 0         0 return;
1072             }
1073 0 0       0 } if $ref eq 'HASH';
1074              
1075 0         0 die "YAHC: unsupported socket_cache format\n";
1076             }
1077              
1078             sub _call_state_callback {
1079 0     0   0 my ($self, $conn, $cb_name) = @_;
1080 0         0 my $cb = $self->{callbacks}{$conn->{id}}{$cb_name};
1081 0 0       0 return unless $cb;
1082              
1083 0 0       0 _register_in_timeline($conn, "calling $cb_name callback") if exists $conn->{debug_or_timeline};
1084              
1085             eval {
1086 0         0 $cb->($conn);
1087 0         0 1;
1088 0 0       0 } or do {
1089 0   0     0 my $error = $@ || 'zombie error';
1090 0         0 yahc_conn_register_error($conn, YAHC::Error::CALLBACK_ERROR(), "exception in state callback (ignore error): $error");
1091             };
1092              
1093 0         0 $self->{loop}->now_update;
1094             }
1095              
1096             sub _get_safe_wrapper {
1097 18     18   22 my ($self, $conn, $sub) = @_;
1098             return sub { eval {
1099 15         38 $sub->(@_);
1100 15         237 1;
1101 15 50   15   13 } or do {
1102 0   0     0 my $error = $@ || 'zombie error';
1103 0         0 _set_user_action_state($self, $conn->{id}, YAHC::Error::INTERNAL_ERROR() | YAHC::Error::TERMINAL_ERROR(),
1104             "exception in internal callback: $error");
1105 18         53 }};
1106             }
1107              
1108             sub _register_in_timeline {
1109 22     22   40 my ($conn, $format, @arguments) = @_;
1110 22         108 my $event = sprintf("$format", @arguments);
1111 22 50       39 _log_message("YAHC connection '%s': %s", $conn->{id}, $event) if exists $conn->{debug};
1112 22 50 100     64 push @{ $conn->{timeline} ||= [] }, [ $event, $conn->{state}, Time::HiRes::time ] if exists $conn->{keep_timeline};
  22         101  
1113             }
1114              
1115             sub yahc_conn_register_error {
1116 12     12 1 14 my ($conn, $error, $format, @arguments) = @_;
1117 12         43 my $strerror = sprintf("$format", @arguments);
1118 12 50       21 _register_in_timeline($conn, "strerror='$strerror' error=$error") if exists $conn->{debug_or_timeline};
1119 12   100     7 push @{ $conn->{errors} ||= [] }, [ $error, $strerror, [ @{ $conn->{selected_target} } ], Time::HiRes::time, $conn->{attempt} ];
  12         29  
  12         45  
1120             }
1121              
1122             sub _strstate {
1123 15     15   16 my $state = shift;
1124 15 100       35 return 'STATE_INIT' if $state eq YAHC::State::INITIALIZED();
1125 14 50       23 return 'STATE_RESOLVE_DNS' if $state eq YAHC::State::RESOLVE_DNS();
1126 14 100       23 return 'STATE_CONNECTING' if $state eq YAHC::State::CONNECTING();
1127 13 100       26 return 'STATE_CONNECTED' if $state eq YAHC::State::CONNECTED();
1128 10 100       29 return 'STATE_WRITING' if $state eq YAHC::State::WRITING();
1129 9 100       29 return 'STATE_READING' if $state eq YAHC::State::READING();
1130 4 50       8 return 'STATE_SSL_HANDSHAKE'if $state eq YAHC::State::SSL_HANDSHAKE();
1131 4 100       10 return 'STATE_USER_ACTION' if $state eq YAHC::State::USER_ACTION();
1132 2 50       7 return 'STATE_COMPLETED' if $state eq YAHC::State::COMPLETED();
1133 0           return "";
1134             }
1135              
1136             sub _log_message {
1137 0     0     my $format = shift;
1138 0           my $now = Time::HiRes::time;
1139 0           my ($sec, $ms) = split(/[.]/, $now);
1140 0   0       printf STDERR "[%s.%-5d] [$$] $format\n", POSIX::strftime('%F %T', localtime($now)), $ms || 0, @_;
1141             }
1142              
1143 0     0     sub _format_ssl_error { return sprintf("'%s' errno=%d ssl_error='%s' ssl_errno=%d", "$!", 0+$!, "$IO::Socket::SSL::SSL_ERROR", 0+$IO::Socket::SSL::SSL_ERROR); }
1144              
1145             1;
1146              
1147             __END__