File Coverage

blib/lib/YAHC.pm
Criterion Covered Total %
statement 383 637 60.1
branch 192 480 40.0
condition 71 201 35.3
subroutine 52 84 61.9
pod 26 26 100.0
total 724 1428 50.7


line stmt bran cond sub pod time code
1             package YAHC;
2              
3 16     16   307861 use strict;
  16         42  
  16         464  
4 16     16   89 use warnings;
  16         39  
  16         690  
5              
6             our $VERSION = '0.034';
7              
8 16     16   8665 use EV;
  16         31017  
  16         1232  
9 16     16   8055 use Time::HiRes;
  16         18334  
  16         69  
10 16     16   2379 use Exporter 'import';
  16         37  
  16         569  
11 16     16   97 use Scalar::Util qw/weaken/;
  16         34  
  16         1354  
12 16     16   97 use Fcntl qw/F_GETFL F_SETFL O_NONBLOCK/;
  16         32  
  16         666  
13 16     16   7684 use POSIX qw/EINPROGRESS EINTR EAGAIN EWOULDBLOCK strftime/;
  16         90929  
  16         157  
14 16     16   28823 use Socket qw/PF_INET SOCK_STREAM $CRLF SOL_SOCKET SO_ERROR inet_aton inet_ntoa pack_sockaddr_in/;
  16         42837  
  16         3947  
15 16 50   16   253 use constant SSL => $ENV{YAHC_NO_SSL} ? 0 : eval 'use IO::Socket::SSL 1.94 (); 1';
  16     16   37  
  16         972  
  16         11264  
  16         952589  
  16         230  
16 16     16   142 use constant SSL_WANT_READ => SSL ? IO::Socket::SSL::SSL_WANT_READ() : 0;
  16         35  
  16         839  
17 16     16   103 use constant SSL_WANT_WRITE => SSL ? IO::Socket::SSL::SSL_WANT_WRITE() : 0;
  16         35  
  16         5749  
18              
19             sub YAHC::Error::NO_ERROR () { 0 }
20             sub YAHC::Error::REQUEST_TIMEOUT () { 1 << 0 }
21             sub YAHC::Error::CONNECT_TIMEOUT () { 1 << 1 }
22             sub YAHC::Error::DRAIN_TIMEOUT () { 1 << 2 }
23             sub YAHC::Error::LIFETIME_TIMEOUT () { 1 << 3 }
24             sub YAHC::Error::TIMEOUT () { 1 << 8 }
25             sub YAHC::Error::RETRY_LIMIT () { 1 << 9 }
26              
27             sub YAHC::Error::CONNECT_ERROR () { 1 << 10 }
28             sub YAHC::Error::READ_ERROR () { 1 << 11 }
29             sub YAHC::Error::WRITE_ERROR () { 1 << 12 }
30             sub YAHC::Error::REQUEST_ERROR () { 1 << 13 }
31             sub YAHC::Error::RESPONSE_ERROR () { 1 << 14 }
32             sub YAHC::Error::CALLBACK_ERROR () { 1 << 15 }
33             sub YAHC::Error::SSL_ERROR () { 1 << 16 }
34             sub YAHC::Error::TERMINAL_ERROR () { 1 << 30 }
35             sub YAHC::Error::INTERNAL_ERROR () { 1 << 31 }
36              
37             sub YAHC::State::INITIALIZED () { 0 }
38             sub YAHC::State::RESOLVE_DNS () { 5 }
39             sub YAHC::State::CONNECTING () { 10 }
40             sub YAHC::State::CONNECTED () { 15 }
41             sub YAHC::State::SSL_HANDSHAKE () { 20 }
42             sub YAHC::State::WRITING () { 25 }
43             sub YAHC::State::READING () { 30 }
44             sub YAHC::State::USER_ACTION () { 35 }
45             sub YAHC::State::COMPLETED () { 100 } # terminal state
46              
47             sub YAHC::SocketCache::GET () { 1 }
48             sub YAHC::SocketCache::STORE () { 2 }
49              
50             use constant {
51             # TCP_READ_CHUNK should *NOT* be lower than 16KB because of SSL things.
52             # https://metacpan.org/pod/distribution/IO-Socket-SSL/lib/IO/Socket/SSL.pod
53             # Another way might be if you try to sysread at least 16kByte all the time.
54             # 16kByte is the maximum size of an SSL frame and because sysread returns
55             # data from only a single SSL frame you can guarantee that there are no
56             # pending data.
57 16         124413 TCP_READ_CHUNK => 131072,
58             CALLBACKS => [ qw/init_callback connecting_callback connected_callback
59             writing_callback reading_callback callback/ ],
60 16     16   111 };
  16         35  
61              
62             our @EXPORT_OK = qw/
63             yahc_retry_conn
64             yahc_reinit_conn
65             yahc_terminal_error
66             yahc_conn_last_error
67             yahc_conn_id
68             yahc_conn_url
69             yahc_conn_target
70             yahc_conn_state
71             yahc_conn_errors
72             yahc_conn_timeline
73             yahc_conn_request
74             yahc_conn_response
75             yahc_conn_attempt
76             yahc_conn_attempts_left
77             yahc_conn_socket_cache_id
78             yahc_conn_register_error
79             yahc_conn_user_data
80             /;
81              
82             our %EXPORT_TAGS = (all => \@EXPORT_OK);
83             my $LAST_CONNECTION_ID;
84              
85             ################################################################################
86             # User facing functons
87             ################################################################################
88              
89             sub new {
90 9     9 1 2867 my ($class, $args) = @_;
91 9 100       75 $LAST_CONNECTION_ID = $$ * 1000 unless defined $LAST_CONNECTION_ID;
92              
93 9 50 66     58 die 'YAHC: ->new() expect args to be a hashref' if defined $args and ref($args) ne 'HASH';
94 9 50       41 die 'YAHC: please do `my ($yahc, $yahc_storage) = YAHC::new()` and keep both these objects in the same scope' unless wantarray;
95              
96             # wrapping target selection here allows all client share same list
97             # and more importantly to share index within the list
98 9 100       67 $args->{_target} = _wrap_host(delete $args->{host}) if $args->{host};
99 9 50       186 $args->{_backoff} = _wrap_backoff(delete $args->{backoff_delay}) if $args->{backoff_delay};
100 9 50       35 $args->{_socket_cache} = _wrap_socket_cache(delete $args->{socket_cache}) if $args->{socket_cache};
101              
102 9         21 my %storage;
103             my $self = bless {
104             loop => new EV::Loop,
105             pid => $$, # store pid to detect forks
106             storage => \%storage,
107             debug => delete $args->{debug} || $ENV{YAHC_DEBUG} || 0,
108 9   50     426 keep_timeline => delete $args->{keep_timeline} || $ENV{YAHC_TIMELINE} || 0,
      50        
109             pool_args => $args,
110             }, $class;
111              
112             # this's a radical way of avoiding circular references.
113             # let's see how it plays out in practise.
114 9         122 weaken($self->{storage});
115 9         106 weaken($self->{$_} = $storage{$_} = {}) for qw/watchers callbacks connections/;
116              
117 9 100       35 if (delete $args->{account_for_signals}) {
118 1 50       5 _log_message('YAHC: enable account_for_signals logic') if $self->{debug};
119 1     1   14 my $sigcheck = $self->{watchers}{_sigcheck} = $self->{loop}->check(sub {});
120 1         26 $sigcheck->keepalive(0);
121             }
122              
123 9         42 return $self, \%storage;
124             }
125              
126             sub request {
127 10     10 1 120 my ($self, @args) = @_;
128 10 50       81 die 'YAHC: new_request() expects arguments' unless @args;
129 10 50       38 die 'YAHC: storage object is destroyed' unless $self->{storage};
130              
131 10 50       57 my ($conn_id, $request) = (@args == 1 ? ('connection_' . $LAST_CONNECTION_ID++, $args[0]) : @args);
132             die "YAHC: Connection with name '$conn_id' already exists\n"
133 10 50       42 if exists $self->{connections}{$conn_id};
134              
135 10         21 my $pool_args = $self->{pool_args};
136 10 50 0     31 do { $request->{$_} ||= $pool_args->{$_} if $pool_args->{$_} } foreach (qw/host port scheme head
  80         181  
137             request_timeout connect_timeout
138             drain_timeout lifetime_timeout/);
139 10 100       33 if ($request->{host}) {
    50          
140 8         48 $request->{_target} = _wrap_host($request->{host});
141             } elsif ($pool_args->{_target}) {
142 2         3 $request->{_target} = $pool_args->{_target};
143             } else {
144 0         0 die "YAHC: host must be defined in request() or in new()\n";
145             }
146              
147 10 50       52 if ($request->{backoff_delay}) {
    50          
148 0         0 $request->{_backoff} = _wrap_backoff($request->{backoff_delay});
149             } elsif ($pool_args->{_backoff}) {
150 0         0 $request->{_backoff} = $pool_args->{_backoff};
151             }
152              
153 10 50       51 if ($request->{socket_cache}) {
    50          
154 0         0 $request->{_socket_cache} = _wrap_socket_cache($request->{socket_cache});
155             } elsif ($pool_args->{_socket_cache}) {
156 0         0 $request->{_socket_cache} = $pool_args->{_socket_cache};
157             }
158              
159 10   50     77 my $scheme = $request->{scheme} ||= 'http';
160 10   33     80 my $debug = delete $request->{debug} || $self->{debug};
161 10   66     45 my $keep_timeline = delete $request->{keep_timeline} || $self->{keep_timeline};
162 10         25 my $user_data = delete $request->{user_data};
163              
164             my $conn = {
165             id => $conn_id,
166             request => $request,
167             response => { status => 0 },
168             attempt => 0,
169 10 50 100     239 retries => $request->{retries} || 0,
    100 66        
    100          
    50          
170             state => YAHC::State::INITIALIZED(),
171             selected_target => [],
172             ($debug ? (debug => $debug) : ()),
173             ($keep_timeline ? (keep_timeline => $keep_timeline) : ()),
174             ($debug || $keep_timeline ? (debug_or_timeline => 1) : ()),
175             (defined $user_data ? (user_data => $user_data) : ()),
176             pid => $$,
177             };
178              
179 10         26 my %callbacks;
180 10         22 foreach (@{ CALLBACKS() }) {
  10         33  
181 60 100       151 next unless exists $request->{$_};
182 2         7 my $cb = $callbacks{$_} = delete $request->{$_};
183 2         7 $conn->{"has_$_"} = !!$cb;
184             }
185              
186 10         34 $self->{watchers}{$conn_id} = {};
187 10         38 $self->{callbacks}{$conn_id} = \%callbacks;
188 10         21 $self->{connections}{$conn_id} = $conn;
189              
190 10 50       35 _set_lifetime_timer($self, $conn_id) if $request->{lifetime_timeout};
191              
192 10 100       49 return $conn if $request->{_test}; # for testing purposes
193 2         11 _set_init_state($self, $conn_id);
194              
195             # if user fire new request in a callback we need to update stop_condition
196 2         4 my $stop_condition = $self->{stop_condition};
197 2 0 33     11 if ($stop_condition && $stop_condition->{all}) {
198 0         0 $stop_condition->{connections}{$conn_id} = 1;
199             }
200              
201 2         7 return $conn;
202             }
203              
204             sub drop {
205 0     0 1 0 my ($self, $c, $force_socket_close) = @_;
206 0 0       0 my $conn_id = ref($c) eq 'HASH' ? $c->{id} : $c;
207 0 0       0 my $conn = $self->{connections}{$conn_id} or return;
208 0 0       0 _register_in_timeline($conn, "dropping connection from pool") if exists $conn->{debug_or_timeline};
209 0 0       0 _set_completed_state($self, $conn_id, $force_socket_close) unless $conn->{state} == YAHC::State::COMPLETED();
210 0         0 return $conn;
211             }
212              
213 6     6 1 93 sub run { shift->_run(0, @_) }
214 0     0 1 0 sub run_once { shift->_run(EV::RUN_ONCE) }
215 0     0 1 0 sub run_tick { shift->_run(EV::RUN_NOWAIT) }
216 3     3 1 33 sub is_running { !!shift->{loop}->depth }
217 5     5 1 2314 sub loop { shift->{loop} }
218              
219             sub break {
220 3     3 1 32 my ($self, $reason) = @_;
221 3 50       17 return unless $self->is_running;
222 3 50 0     22 _log_message('YAHC: pid %d breaking event loop because %s', $$, ($reason || 'no reason')) if $self->{debug};
223 3         43 $self->{loop}->break(EV::BREAK_ONE)
224             }
225              
226             ################################################################################
227             # Routines to manipulate connections (also user facing)
228             ################################################################################
229              
230             sub yahc_terminal_error {
231 1 50   1 1 6 return (($_[0] & YAHC::Error::TERMINAL_ERROR()) == YAHC::Error::TERMINAL_ERROR()) ? 1 : 0;
232             }
233              
234             sub yahc_reinit_conn {
235 0     0 1 0 my ($conn, $args) = @_;
236             die "YAHC: cannot reinit completed connection\n"
237 0 0       0 if $conn->{state} >= YAHC::State::COMPLETED();
238              
239 0         0 $conn->{attempt} = 0;
240 0         0 $conn->{state} = YAHC::State::INITIALIZED();
241 0 0 0     0 return unless defined $args && ref($args) eq 'HASH';
242              
243 0         0 my $request = $conn->{request};
244 0 0       0 $request->{_target} = _wrap_host(delete $args->{host}) if $args->{host};
245 0 0       0 $request->{_backoff} = _wrap_backoff(delete $args->{backoff_delay}) if $args->{backoff_delay};
246 0 0       0 do { $request->{$_} = $args->{$_} if $args->{$_} } foreach (keys %$args);
  0         0  
247             }
248              
249             sub yahc_retry_conn {
250 0     0 1 0 my ($conn, $args) = @_;
251             die "YAHC: cannot retry completed connection\n"
252 0 0       0 if $conn->{state} >= YAHC::State::COMPLETED();
253 0 0       0 return unless yahc_conn_attempts_left($conn) > 0;
254              
255 0         0 $conn->{state} = YAHC::State::INITIALIZED();
256 0 0 0     0 return unless defined $args && ref($args) eq 'HASH';
257              
258             $conn->{request}{_backoff} = _wrap_backoff($args->{backoff_delay})
259 0 0       0 if $args->{backoff_delay};
260             }
261              
262             sub yahc_conn_last_error {
263 0     0 1 0 my $conn = shift;
264 0 0 0     0 return unless $conn->{errors} && @{ $conn->{errors} };
  0         0  
265 0 0       0 return wantarray ? @{ $conn->{errors}[-1] } : $conn->{errors}[-1];
  0         0  
266             }
267              
268 0     0 1 0 sub yahc_conn_id { $_[0]->{id} }
269 0     0 1 0 sub yahc_conn_state { $_[0]->{state} }
270 0     0 1 0 sub yahc_conn_errors { $_[0]->{errors} }
271 0     0 1 0 sub yahc_conn_timeline { $_[0]->{timeline} }
272 0     0 1 0 sub yahc_conn_request { $_[0]->{request} }
273 0     0 1 0 sub yahc_conn_response { $_[0]->{response} }
274 4     4 1 22 sub yahc_conn_attempt { $_[0]->{attempt} }
275 4 100   4 1 34 sub yahc_conn_attempts_left { $_[0]->{attempt} > $_[0]->{retries} ? 0 : $_[0]->{retries} - $_[0]->{attempt} + 1 }
276              
277             sub yahc_conn_target {
278 4     4 1 15 my $target = $_[0]->{selected_target};
279 4 50 50     12 return unless $target && scalar @{ $target };
  4         14  
280 4         8 my ($host, $ip, $port) = @{ $target };
  4         8  
281 4 50 33     33 return ($host || $ip) . ($port ne '80' && $port ne '443' ? ":$port" : '');
      33        
282             }
283              
284             sub yahc_conn_url {
285 0     0 1 0 my $target = $_[0]->{selected_target};
286 0         0 my $request = $_[0]->{request};
287 0 0 0     0 return unless $target && @{ $target };
  0         0  
288              
289 0         0 my ($host, $ip, $port, $scheme) = @{ $target };
  0         0  
290             return "$scheme://"
291             . ($host || $ip)
292             . ($port ne '80' && $port ne '443' ? ":$port" : '')
293             . ($request->{path} || "/")
294 0 0 0     0 . (defined $request->{query_string} ? ("?" . $request->{query_string}) : "");
    0 0        
      0        
295             }
296              
297             sub yahc_conn_user_data {
298 0     0 1 0 my $conn = shift;
299 0 0       0 $conn->{user_data} = $_[0] if @_;
300 0         0 return $conn->{user_data};
301             }
302              
303             ################################################################################
304             # Internals
305             ################################################################################
306              
307             sub _run {
308 6     6   22 my ($self, $how, $until_state, @cs) = @_;
309 6 50       28 die "YAHC: storage object is destroyed\n" unless $self->{storage};
310 6 50       55 die "YAHC: reentering run\n" if $self->{loop}->depth;
311              
312 6 50       68 if ($self->{pid} != $$) {
313 0 0       0 _log_message('YAHC: reinitializing event loop after forking') if $self->{debug};
314 0         0 $self->{pid} = $$;
315 0         0 $self->{loop}->loop_fork;
316              
317 0         0 my $active_connections = grep { $$ != $_->{pid} } values %{ $self->{connections} };
  0         0  
  0         0  
318 0 0       0 warn "YAHC has $active_connections active connections after a fork, consider dropping them!"
319             if $active_connections;
320             }
321              
322 6 100       41 if (defined $until_state) {
323 2         8 my $until_state_str = _strstate($until_state);
324 2 50       17 die "YAHC: unknown until_state $until_state\n" if $until_state_str =~ m/unknown/;
325              
326 2         6 my $is_all = (@cs == 0);
327 0         0 my @connections = $is_all ? values %{ $self->{connections} }
328 2 50       24 : map { $self->{connections}{$_} || () }
329 2 50       8 map { ref($_) eq 'HASH' ? $_->{id} : $_ } @cs;
  2 50       13  
330              
331             $self->{stop_condition} = {
332             all => $is_all,
333             expected_state => $until_state,
334 2         8 connections => { map { $_->{id} => 1 } grep { $_->{state} < $until_state } @connections },
  2         19  
  2         7  
335             };
336             } else {
337 4         11 delete $self->{stop_condition};
338             }
339              
340 6         19 my $loop = $self->{loop};
341 6         45 $loop->now_update;
342              
343 6 50       22 if ($self->{debug}) {
344 0         0 my $iterations = $loop->iteration;
345 0 0       0 _log_message('YAHC: pid %d entering event loop%s', $$, ($until_state ? " with until state " . _strstate($until_state) : ''));
346 0   0     0 $loop->run($how || 0);
347 0         0 _log_message('YAHC: pid %d exited from event loop after %d iterations', $$, $loop->iteration - $iterations);
348             } else {
349 6   50     2000195 $loop->run($how || 0);
350             }
351             }
352              
353             sub _check_stop_condition {
354 2     2   5 my ($self, $conn) = @_;
355 2         5 my $stop_condition = $self->{stop_condition};
356 2 50 33     18 return if !$stop_condition || $conn->{state} < $stop_condition->{expected_state};
357              
358 2         17 delete $stop_condition->{connections}{$conn->{id}};
359 2         5 my $awaiting_connections = scalar keys %{ $stop_condition->{connections} };
  2         9  
360 2         5 my $expected_state = $stop_condition->{expected_state};
361              
362 2 50       13 if ($awaiting_connections == 0) {
363 2         12 $self->break(sprintf("until state '%s' is reached", _strstate($expected_state)));
364 2         7 return 1;
365             }
366              
367             _log_message("YAHC: still have %d connections awaiting state '%s'",
368 0 0       0 $awaiting_connections, _strstate($expected_state)) if $self->{debug};
369             }
370              
371             ################################################################################
372             # IO routines
373             ################################################################################
374              
375             sub _set_init_state {
376 14     14   36 my ($self, $conn_id) = @_;
377              
378 14 50       40 my $conn = $self->{connections}{$conn_id} or die "YAHC: unknown connection id $conn_id\n";
379              
380 14         47 $conn->{response} = { status => 0 };
381 14         33 $conn->{state} = YAHC::State::INITIALIZED();
382 14 100       60 _register_in_timeline($conn, "new state %s", _strstate($conn->{state})) if exists $conn->{debug_or_timeline};
383 14 50       29 _call_state_callback($self, $conn, 'init_callback') if exists $conn->{has_init_callback};
384              
385 14         43 _close_or_cache_socket($self, $conn, 1); # force connection close if any (likely not)
386 14         34 my $watchers = _delete_watchers_but_lifetime_timer($self, $conn_id); # implicit stop of all watchers
387              
388             return _set_user_action_state($self, $conn_id, YAHC::Error::RETRY_LIMIT(), "retries limit reached")
389 14 100       42 if $conn->{attempt} > $conn->{retries};
390              
391             # don't move attempt increment before boundary check !!!
392             # otherwise we can get off-by-one error in yahc_conn_attempts_left
393 13         23 my $attempt = ++$conn->{attempt};
394 13 50 66     50 if ($attempt > 1 && exists $conn->{request}{_backoff}) {
395 0         0 my $backoff_delay = eval { $conn->{request}{_backoff}->($conn) };
  0         0  
396 0 0       0 if (my $error = $@) {
397 0         0 return _set_user_action_state($self, $conn_id, YAHC::Error::CALLBACK_ERROR() | YAHC::Error::TERMINAL_ERROR(),
398             "exception in backoff callback (close connection): $error");
399             };
400              
401 0 0       0 if ($backoff_delay) {
402 0         0 $self->{loop}->now_update;
403 0 0       0 _register_in_timeline($conn, "setting backoff_timer to %.3fs", $backoff_delay) if exists $conn->{debug_or_timeline};
404             $watchers->{backoff_timer} = $self->{loop}->timer($backoff_delay, 0, _get_safe_wrapper($self, $conn, sub {
405 0 0   0   0 _register_in_timeline($conn, "backoff timer of %.3fs expired, time for new attempt", $backoff_delay) if exists $conn->{debug_or_timeline};
406 0 0       0 _set_init_state($self, $conn_id) if _init_helper($self, $conn_id) == 1;
407 0         0 }));
408 0         0 return;
409             }
410             }
411              
412 13 50       33 if (_init_helper($self, $conn_id) == 1) {
413             _register_in_timeline($conn, "do attempt on next EV iteration, (iteration=%d)", $self->{loop}->iteration)
414 0 0       0 if exists $conn->{debug_or_timeline};
415              
416             # from EV docs:
417             # idle watcher call the callback when there are no other pending
418             # watchers of the same or higher priority. The idle watchers are
419             # being called once per event loop iteration - until stopped.
420             #
421             # so, what we do is we start idle watcher with priority 1 which is
422             # higher then 0 used by all IO watchers. As result, the callback
423             # will be called at the end of this iteration. And others if neccessary.
424              
425             my $retry_watcher = $watchers->{retry} ||= $self->{loop}->idle_ns(_get_safe_wrapper($self, $conn, sub {
426 0     0   0 shift->stop; # stop this watcher, _set_init_state will start if neccessary
427             _register_in_timeline($conn, "time for new attempt (iteration=%d)", $self->{loop}->iteration)
428 0 0       0 if exists $conn->{debug_or_timeline};
429 0         0 _set_init_state($self, $conn_id)
430 0   0     0 }));
431              
432 0         0 $retry_watcher->priority(1);
433 0         0 $retry_watcher->start;
434             };
435             }
436              
437             sub _init_helper {
438 13     13   25 my ($self, $conn_id) = @_;
439              
440 13 50       41 my $conn = $self->{connections}{$conn_id} or die "YAHC: unknown connection id $conn_id\n";
441 13 50       32 my $watchers = $self->{watchers}{$conn_id} or die "YAHC: no watchers for connection id $conn_id\n";
442              
443 13         21 my $request = $conn->{request};
444              
445 13         53 $self->{loop}->now_update; # update time for timers
446             _set_until_state_timer($self, $conn_id, 'request_timeout', YAHC::State::USER_ACTION(), YAHC::Error::TIMEOUT() | YAHC::Error::REQUEST_TIMEOUT())
447 13 50       28 if $request->{request_timeout};
448             _set_until_state_timer($self, $conn_id, 'connect_timeout', YAHC::State::CONNECTED(), YAHC::Error::TIMEOUT() | YAHC::Error::CONNECT_TIMEOUT())
449 13 50       29 if $request->{connect_timeout};
450             _set_until_state_timer($self, $conn_id, 'drain_timeout', YAHC::State::READING(), YAHC::Error::TIMEOUT() | YAHC::Error::DRAIN_TIMEOUT())
451 13 50       29 if $request->{drain_timeout};
452              
453             eval {
454 13         29 my ($host, $ip, $port, $scheme) = _get_next_target($conn);
455             _register_in_timeline($conn, "Target $scheme://$host:$port ($ip:$port) chosen for attempt #%d", $conn->{attempt})
456 13 100       42 if exists $conn->{debug_or_timeline};
457              
458 13         18 my $sock;
459 13 50       33 if (my $socket_cache = $request->{_socket_cache}) {
460 0         0 $sock = $socket_cache->(YAHC::SocketCache::GET(), $conn);
461             }
462              
463 13 50       33 if (defined $sock) {
464 0 0       0 _register_in_timeline($conn, "reuse socket") if $conn->{debug_or_timeline};
465 0         0 $watchers->{_fh} = $sock;
466 0     0   0 $watchers->{io} = $self->{loop}->io($sock, EV::WRITE, sub {});
467 0         0 _set_write_state($self, $conn_id);
468             } else {
469 13 100       32 _register_in_timeline($conn, "build new socket") if $conn->{debug_or_timeline};
470 13         30 $sock = _build_socket_and_connect($ip, $port);
471 13         35 _set_connecting_state($self, $conn_id, $sock);
472             }
473              
474 13         46 1;
475 13 50       28 } or do {
476 0   0     0 my $error = $@ || 'zombie error';
477 0         0 $error =~ s/\s+$//o;
478 0         0 yahc_conn_register_error($conn, YAHC::Error::CONNECT_ERROR(), "connection attempt %d failed: %s", $conn->{attempt}, $error);
479 0         0 return 1;
480             };
481              
482 13         43 return 0;
483             }
484              
485             sub _set_connecting_state {
486 13     13   48 my ($self, $conn_id, $sock) = @_;
487              
488 13 50       58 my $conn = $self->{connections}{$conn_id} or die "YAHC: unknown connection id $conn_id\n";
489 13 50       33 my $watchers = $self->{watchers}{$conn_id} or die "YAHC: no watchers for connection id $conn_id\n";
490              
491 13         22 $conn->{state} = YAHC::State::CONNECTING();
492 13 100       30 _register_in_timeline($conn, "new state %s", _strstate($conn->{state})) if exists $conn->{debug_or_timeline};
493 13 50       33 _call_state_callback($self, $conn, 'connecting_callback') if exists $conn->{has_connecting_callback};
494              
495             my $connecting_cb = _get_safe_wrapper($self, $conn, sub {
496 12     12   58 my $sockopt = getsockopt($sock, SOL_SOCKET, SO_ERROR);
497 12 50       30 if (!$sockopt) {
498 0         0 yahc_conn_register_error($conn, YAHC::Error::CONNECT_ERROR(), "Failed to do getsockopt(): '%s' errno=%d", "$!", $!+0);
499 0         0 _set_init_state($self, $conn_id);
500 0         0 return;
501             }
502              
503 12 100       59 if (my $err = unpack("L", $sockopt)) {
504 11   50     211 my $strerror = POSIX::strerror($err) || '';
505 11         1412 yahc_conn_register_error($conn, YAHC::Error::CONNECT_ERROR(), "Failed to connect: $strerror");
506 11         32 _set_init_state($self, $conn_id);
507 11         22 return;
508             }
509              
510 1         9 _set_connected_state($self, $conn_id);
511 13         81 });
512              
513 13         35 $watchers->{_fh} = $sock;
514 13         82 $watchers->{io} = $self->{loop}->io($sock, EV::WRITE, $connecting_cb);
515 13 50       50 _check_stop_condition($self, $conn) if exists $self->{stop_condition};
516             }
517              
518             sub _set_connected_state {
519 1     1   4 my ($self, $conn_id) = @_;
520              
521 1 50       4 my $conn = $self->{connections}{$conn_id} or die "YAHC: unknown connection id $conn_id\n";
522 1 50       3 my $watchers = $self->{watchers}{$conn_id} or die "YAHC: no watchers for connection id $conn_id\n";
523              
524 1         3 $conn->{state} = YAHC::State::CONNECTED();
525 1 50       6 _register_in_timeline($conn, "new state %s", _strstate($conn->{state})) if exists $conn->{debug_or_timeline};
526 1 50       3 _call_state_callback($self, $conn, 'connected_callback') if exists $conn->{has_connected_callback};
527              
528             my $connected_cb = _get_safe_wrapper($self, $conn, sub {
529 0 0   0   0 if ($conn->{is_ssl}) {
530 0         0 _set_ssl_handshake_state($self, $conn_id);
531             } else {
532 0         0 _set_write_state($self, $conn_id);
533             }
534 1         11 });
535              
536             #$watcher->events(EV::WRITE);
537 1         20 $watchers->{io}->cb($connected_cb);
538 1 50       5 _check_stop_condition($self, $conn) if exists $self->{stop_condition};
539             }
540              
541             sub _set_ssl_handshake_state {
542 0     0   0 my ($self, $conn_id) = @_;
543              
544 0 0       0 my $conn = $self->{connections}{$conn_id} or die "YAHC: unknown connection id $conn_id\n";
545 0 0       0 my $watchers = $self->{watchers}{$conn_id} or die "YAHC: no watchers for connection id $conn_id\n";
546              
547 0         0 $conn->{state} = YAHC::State::SSL_HANDSHAKE();
548 0 0       0 _register_in_timeline($conn, "new state %s", _strstate($conn->{state})) if exists $conn->{debug_or_timeline};
549             #_call_state_callback($self, $conn, 'writing_callback') if $conn->{has_writing_callback}; TODO
550              
551 0         0 my $fh = $watchers->{_fh};
552 0         0 my $hostname = $conn->{selected_target}[0];
553              
554             my %options = (
555             SSL_verifycn_name => $hostname,
556             IO::Socket::SSL->can_client_sni ? ( SSL_hostname => $hostname ) : (),
557 0 0       0 %{ $conn->{request}{ssl_options} || {} },
  0 0       0  
558             );
559              
560 0 0       0 if ($conn->{debug_or_timeline}) {
561 0   0     0 my $options_msg = join(', ', map { "$_=" . ($options{$_} || '') } keys %options);
  0         0  
562 0         0 _register_in_timeline($conn, "start SSL handshake with options: $options_msg");
563             }
564              
565 0 0       0 if (!IO::Socket::SSL->start_SSL($fh, %options, SSL_startHandshake => 0)) {
566 0         0 return _set_user_action_state($self, $conn_id, YAHC::Error::SSL_ERROR() | YAHC::Error::TERMINAL_ERROR(),
567             sprintf("failed to start SSL session: %s", _format_ssl_error()));
568             }
569              
570             my $handshake_cb = _get_safe_wrapper($self, $conn, sub {
571 0     0   0 my $w = shift;
572 0 0       0 if ($fh->connect_SSL) {
573 0 0       0 _register_in_timeline($conn, "SSL handshake successfully completed") if exists $conn->{debug_or_timeline};
574 0         0 return _set_write_state($self, $conn_id);
575             }
576              
577 0 0       0 if ($! == EWOULDBLOCK) {
578 0 0       0 return $w->events(EV::READ) if $IO::Socket::SSL::SSL_ERROR == SSL_WANT_READ;
579 0 0       0 return $w->events(EV::WRITE) if $IO::Socket::SSL::SSL_ERROR == SSL_WANT_WRITE;
580             }
581              
582 0         0 yahc_conn_register_error($conn, YAHC::Error::SSL_ERROR(), "Failed to complete SSL handshake: %s", _format_ssl_error());
583 0         0 _set_init_state($self, $conn_id);
584 0         0 });
585              
586 0         0 my $watcher = $watchers->{io};
587 0         0 $watcher->cb($handshake_cb);
588 0         0 $watcher->events(EV::WRITE | EV::READ);
589 0 0       0 _check_stop_condition($self, $conn) if exists $self->{stop_condition};
590             }
591              
592             sub _set_write_state {
593 1     1   14 my ($self, $conn_id) = @_;
594              
595 1 50       5 my $conn = $self->{connections}{$conn_id} or die "YAHC: unknown connection id $conn_id\n";
596 1 50       8 my $watchers = $self->{watchers}{$conn_id} or die "YAHC: no watchers for connection id $conn_id\n";
597              
598 1         3 $conn->{state} = YAHC::State::WRITING();
599 1 50       8 _register_in_timeline($conn, "new state %s", _strstate($conn->{state})) if exists $conn->{debug_or_timeline};
600 1 50       4 _call_state_callback($self, $conn, 'writing_callback') if exists $conn->{has_writing_callback};
601              
602 1         2 my $fh = $watchers->{_fh};
603 1         4 my $buf = _build_http_message($conn);
604 1         3 my $length = length($buf);
605              
606 1 50       5 warn "YAHC: HTTP message has UTF8 flag set! This will result in poor performance, see docs for details!"
607             if utf8::is_utf8($buf);
608              
609             _register_in_timeline($conn, "writing body of %d bytes\n%s", $length, ($length > 1024? substr($buf, 0, 1024) . '... (cut to 1024 bytes)' : $buf))
610 1 50       8 if exists $conn->{debug_or_timeline};
    50          
611              
612             my $write_cb = _get_safe_wrapper($self, $conn, sub {
613 1     1   2 my $w = shift;
614 1         42 my $wlen = syswrite($fh, $buf, $length);
615              
616 1 50       6 if (!defined $wlen) {
    50          
617 0 0       0 if ($conn->{is_ssl}) {
618 0 0       0 if ($! == EWOULDBLOCK) {
619 0 0       0 return $w->events(EV::READ) if $IO::Socket::SSL::SSL_ERROR == SSL_WANT_READ;
620 0 0       0 return $w->events(EV::WRITE) if $IO::Socket::SSL::SSL_ERROR == SSL_WANT_WRITE;
621             }
622              
623 0         0 yahc_conn_register_error($conn, YAHC::Error::WRITE_ERROR() | YAHC::Error::SSL_ERROR(), "Failed to send HTTPS data: %s", _format_ssl_error());
624 0         0 return _set_init_state($self, $conn_id);
625             }
626              
627 0 0 0     0 return if $! == EWOULDBLOCK || $! == EINTR || $! == EAGAIN;
      0        
628 0         0 yahc_conn_register_error($conn, YAHC::Error::WRITE_ERROR(), "Failed to send HTTP data: '%s' errno=%d", "$!", $!+0);
629 0         0 _set_init_state($self, $conn_id);
630             } elsif ($wlen == 0) {
631 0         0 yahc_conn_register_error($conn, YAHC::Error::WRITE_ERROR(), "syswrite returned 0");
632 0         0 _set_init_state($self, $conn_id);
633             } else {
634 1         4 substr($buf, 0, $wlen, '');
635 1         38 $length -= $wlen;
636 1 50       8 _set_read_state($self, $conn_id) if $length == 0;
637             }
638 1         12 });
639              
640 1         2 my $watcher = $watchers->{io};
641 1         13 $watcher->cb($write_cb);
642 1         10 $watcher->events(EV::WRITE);
643 1 50       8 _check_stop_condition($self, $conn) if exists $self->{stop_condition};
644             }
645              
646             sub _set_read_state {
647 3     3   25 my ($self, $conn_id) = @_;
648              
649 3 50       15 my $conn = $self->{connections}{$conn_id} or die "YAHC: unknown connection id $conn_id\n";
650 3 50       17 my $watchers = $self->{watchers}{$conn_id} or die "YAHC: no watchers for connection id $conn_id\n";
651              
652 3         8 $conn->{state} = YAHC::State::READING();
653 3 50       19 _register_in_timeline($conn, "new state %s", _strstate($conn->{state})) if exists $conn->{debug_or_timeline};
654 3 50       13 _call_state_callback($self, $conn, 'reading_callback') if exists $conn->{has_reading_callback};
655              
656 3         7 my $buf = '';
657 3         8 my $neck_pos = 0;
658 3         7 my $decapitated = 0;
659 3         6 my $content_length = 0;
660 3         6 my $no_content_length = 0;
661 3         7 my $is_chunked = 0;
662 3         6 my $fh = $watchers->{_fh};
663 3         6 my $chunk_size = 0;
664 3         9 my $body = ''; # used for chunked encoding
665              
666             my $read_cb = _get_safe_wrapper($self, $conn, sub {
667 2     2   5 my $w = shift;
668 2         40 my $rlen = sysread($fh, my $b = '', TCP_READ_CHUNK);
669 2 50       15 if (!defined $rlen) {
    50          
670 0 0       0 if ($conn->{is_ssl}) {
671 0 0       0 if ($! == EWOULDBLOCK) {
672 0 0       0 return $w->events(EV::READ) if $IO::Socket::SSL::SSL_ERROR == SSL_WANT_READ;
673 0 0       0 return $w->events(EV::WRITE) if $IO::Socket::SSL::SSL_ERROR == SSL_WANT_WRITE;
674             }
675              
676 0         0 yahc_conn_register_error($conn, YAHC::Error::READ_ERROR() | YAHC::Error::SSL_ERROR(), "Failed to receive HTTPS data: %s", _format_ssl_error());
677 0         0 return _set_init_state($self, $conn_id);
678             }
679              
680 0 0 0     0 return if $! == EWOULDBLOCK || $! == EINTR || $! == EAGAIN;
      0        
681 0         0 yahc_conn_register_error($conn, YAHC::Error::READ_ERROR(), "Failed to receive HTTP data: '%s' errno=%d", "$!", $!+0);
682 0         0 _set_init_state($self, $conn_id);
683             } elsif ($rlen == 0) {
684 0 0       0 if ($no_content_length) {
685 0         0 $conn->{response}{body} = $buf.$b;
686 0         0 _set_user_action_state($self, $conn_id);
687 0         0 return;
688             }
689              
690 0 0       0 if ($content_length > 0) {
691 0         0 yahc_conn_register_error($conn, YAHC::Error::READ_ERROR(), "Premature EOF, expect %d bytes more", $content_length - length($buf));
692             } else {
693 0         0 yahc_conn_register_error($conn, YAHC::Error::READ_ERROR(), "Premature EOF");
694             }
695 0         0 _set_init_state($self, $conn_id);
696             } else {
697 2         19 $buf .= $b;
698 2 50 33     42 if (!$decapitated && ($neck_pos = index($buf, "${CRLF}${CRLF}")) > 0) {
699 2         28 my $headers = _parse_http_headers($conn, substr($buf, 0, $neck_pos, '')); # $headers are always defined but might be empty, maybe fix later
700 2   100     15 $is_chunked = ($headers->{'Transfer-Encoding'} || '') eq 'chunked';
701              
702 2 50 66     12 if ($is_chunked && exists $headers->{'Trailer'}) {
703 0         0 _set_user_action_state($self, $conn_id, YAHC::Error::RESPONSE_ERROR(), "Chunked HTTP response with Trailer header");
704 0         0 return;
705             }
706              
707 2         5 $decapitated = 1;
708 2         5 substr($buf, 0, 4, ''); # 4 = length("$CRLF$CRLF")
709              
710             # Attempt to correctly determine content length, see RFC 2616 section 4.4
711 2 50 50     42 if (($conn->{request}->{method} || '') eq 'HEAD' || $conn->{response}->{status} =~ /^(1..|204|304)$/) { # 1.
    100 33        
    50          
712 0         0 $content_length = 0;
713             } elsif ($is_chunked) { # 2. (sort of, should actually also care for non-chunked transfer encodings)
714             # No content length, use chunked transfer encoding instead
715             } elsif (exists $headers->{'Content-Length'}) { # 3.
716 1         3 $content_length = $headers->{'Content-Length'};
717 1 50       15 if ($content_length !~ m#\A[0-9]+\z#) {
718 0         0 _set_user_action_state($self, $conn_id, YAHC::Error::RESPONSE_ERROR(), "Not-numeric Content-Length received on the response");
719 0         0 return;
720             }
721             } else {
722             # byteranges (point .4 on the spec) not supported
723 0         0 $no_content_length = 1;
724             }
725             }
726              
727 2 100 66     25 if ($decapitated && $is_chunked) {
    50 33        
      33        
728             # in order to get the smallest chunk size we need
729             # at least 4 bytes (2xCLRF), and there *MUST* be
730             # last chunk which is at least 5 bytes (0\r\n\r\n)
731             # so we can safely ignore $bufs that have less than 5 bytes
732 1         5 while (length($buf) > ($chunk_size + 4)) {
733 4         6 my $neck_pos = index($buf, ${CRLF});
734 4 50       8 if ($neck_pos > 0) {
735             # http://www.w3.org/Protocols/rfc2616/rfc2616-sec3.html
736             # All HTTP/1.1 applications MUST be able to receive and
737             # decode the "chunked" transfer-coding, and MUST ignore
738             # chunk-extension extensions they do not understand.
739 4         10 my ($s) = split(';', substr($buf, 0, $neck_pos), 1);
740 4         31 $chunk_size = hex($s);
741              
742 4 50       23 _register_in_timeline($conn, "parsing chunk of size $chunk_size bytes") if exists $conn->{debug_or_timeline};
743 4 100       10 if ($chunk_size == 0) { # end with, but as soon as we see 0\r\n\r\n we just mark it as done
744 1         3 $conn->{response}{body} = $body;
745 1         4 _set_user_action_state($self, $conn_id);
746 1         2 return;
747             } else {
748 3 50       8 if (length($buf) >= $chunk_size + $neck_pos + 2 + 2) {
749 3         7 $body .= substr($buf, $neck_pos + 2, $chunk_size);
750 3         4 substr($buf, 0, $neck_pos + 2 + $chunk_size + 2, '');
751 3         9 $chunk_size = 0;
752             } else {
753 0         0 last; # dont have enough data in this pass, wait for one more read
754             }
755             }
756             } else {
757 0 0 0     0 last if $neck_pos < 0 && $chunk_size == 0; # in case we couldnt get the chunk size in one go, we must concat until we have something
758 0         0 _set_user_action_state($self, $conn_id, YAHC::Error::RESPONSE_ERROR(), "error processing chunked data, couldnt find CLRF[index:$neck_pos] in buf");
759 0         0 return;
760             }
761             }
762             } elsif ($decapitated && !$no_content_length && length($buf) >= $content_length) {
763 1 50       7 $conn->{response}{body} = (length($buf) > $content_length ? substr($buf, 0, $content_length) : $buf);
764 1         5 _set_user_action_state($self, $conn_id);
765             }
766             }
767 3         38 });
768              
769 3         8 my $watcher = $watchers->{io};
770 3         35 $watcher->cb($read_cb);
771 3         24 $watcher->events(EV::READ);
772 3 100       18 _check_stop_condition($self, $conn) if exists $self->{stop_condition};
773             }
774              
775             sub _set_user_action_state {
776 3     3   10 my ($self, $conn_id, $error, $strerror) = @_;
777 3   100     18 $error ||= YAHC::Error::NO_ERROR();
778 3   100     15 $strerror ||= '';
779              
780             # this state may be used in critical places,
781             # so it should *NEVER* throw exception
782 3 50       23 my $conn = $self->{connections}{$conn_id}
783             or warn "YAHC: try to _set_user_action_state() for unknown connection $conn_id",
784             return;
785              
786 3         20 $conn->{state} = YAHC::State::USER_ACTION();
787 3 100       21 _register_in_timeline($conn, "new state %s", _strstate($conn->{state})) if exists $conn->{debug_or_timeline};
788 3 100       17 yahc_conn_register_error($conn, $error, $strerror) if $error != YAHC::Error::NO_ERROR;
789              
790 3         13 _close_or_cache_socket($self, $conn, $error != YAHC::Error::NO_ERROR);
791 3 100       17 return _set_completed_state($self, $conn_id) unless exists $conn->{has_callback};
792              
793             eval {
794 1 0       16 _register_in_timeline($conn, "call callback%s", $error ? " error=$error, strerror='$strerror'" : '') if exists $conn->{debug_or_timeline};
    50          
795 1         4 my $cb = $self->{callbacks}{$conn_id}{callback};
796 1         5 $cb->($conn, $error, $strerror);
797 1         519 1;
798 1 50       3 } or do {
799 0   0     0 my $error = $@ || 'zombie error';
800 0         0 yahc_conn_register_error($conn, YAHC::Error::CALLBACK_ERROR() | YAHC::Error::TERMINAL_ERROR(), "Exception in user action callback (close connection): $error");
801 0         0 $self->{state} = YAHC::State::COMPLETED();
802             };
803              
804 1         6 $self->{loop}->now_update;
805              
806 1         2 my $state = $conn->{state};
807 1 50       5 if (yahc_terminal_error($error)) {
808 0 0 0     0 yahc_conn_register_error($conn, YAHC::Error::CALLBACK_ERROR() | YAHC::Error::TERMINAL_ERROR(), "ignoring changed state due to terminal error")
809             unless $state == YAHC::State::USER_ACTION() || $state == YAHC::State::COMPLETED();
810 0         0 _set_completed_state($self, $conn_id, 1);
811             return
812 0         0 }
813              
814 1 50       4 _register_in_timeline($conn, "after invoking callback state is %s", _strstate($state)) if exists $conn->{debug_or_timeline};
815              
816 1 50 33     8 if ($state == YAHC::State::INITIALIZED()) {
    50          
817 0         0 _set_init_state($self, $conn_id);
818             } elsif ($state == YAHC::State::USER_ACTION() || $state == YAHC::State::COMPLETED()) {
819 1         5 _set_completed_state($self, $conn_id);
820             } else {
821 0         0 yahc_conn_register_error($conn, YAHC::Error::CALLBACK_ERROR() | YAHC::Error::TERMINAL_ERROR(), "callback set unsupported state");
822 0         0 _set_completed_state($self, $conn_id);
823             }
824             }
825              
826             sub _set_completed_state {
827 3     3   10 my ($self, $conn_id, $force_socket_close) = @_;
828              
829             # this's a terminal state,
830             # so setting this state should *NEVER* fail
831 3         15 delete $self->{callbacks}{$conn_id};
832 3         23 my $conn = delete $self->{connections}{$conn_id};
833              
834 3 50       11 if (!defined $conn) {
835 0         0 delete($self->{watchers}{$conn_id}), # implicit stop of all watchers
836             return;
837             }
838              
839 3         8 $conn->{state} = YAHC::State::COMPLETED();
840 3 100       14 _register_in_timeline($conn, "new state %s", _strstate($conn->{state})) if exists $conn->{debug_or_timeline};
841              
842 3         11 _close_or_cache_socket($self, $conn, $force_socket_close);
843 3         9 delete $self->{watchers}{$conn_id}; # implicit stop of all watchers
844              
845 3 50       14 _check_stop_condition($self, $conn) if exists $self->{stop_condition};
846             }
847              
848             sub _build_socket_and_connect {
849 13     13   24 my ($ip, $port) = @_;
850              
851 13         19 my $sock;
852 13 50       204 socket($sock, PF_INET, SOCK_STREAM, 0)
853             or die sprintf("Failed to construct TCP socket: '%s' errno=%d\n", "$!", $!+0);
854              
855 13 50       45 my $flags = fcntl($sock, F_GETFL, 0) or die sprintf("Failed to get fcntl F_GETFL flag: '%s' errno=%d\n", "$!", $!+0);
856 13 50       44 fcntl($sock, F_SETFL, $flags | O_NONBLOCK) or die sprintf("Failed to set fcntl O_NONBLOCK flag: '%s' errno=%d\n", "$!", $!+0);
857              
858 13 50       57 my $ip_addr = inet_aton($ip) or die "Invalid IP address";
859 13         44 my $addr = pack_sockaddr_in($port, $ip_addr);
860 13 50 33     747 if (!connect($sock, $addr) && $! != EINPROGRESS) {
861 0         0 die sprintf("Failed to connect: '%s' errno=%d\n", "$!", $!+0);
862             }
863              
864 13         45 return $sock;
865             }
866              
867             sub _get_next_target {
868 17     17   42 my $conn = shift;
869 17         43 my ($host, $ip, $port, $scheme) = $conn->{request}{_target}->($conn);
870              
871             # TODO STATE_RESOLVE_DNS
872 17 100 66     145 ($host, $port) = ($1, $2) if !$port && $host =~ m/^(.+):([0-9]+)$/o;
873 17 100 66     96 $ip = $host if !$ip && $host =~ m/^[0-9]+\.[0-9]+\.[0-9]+\.[0-9]+$/o;
874 17   50     947 $ip ||= inet_ntoa(gethostbyname($host) or die "Failed to resolve $host\n");
      66        
875 17   50     97 $scheme ||= $conn->{request}{scheme} || 'http';
      33        
876 17   66     78 $port ||= $conn->{request}{port} || ($scheme eq 'https' ? 443 : 80);
      66        
877              
878 17         39 $conn->{is_ssl} = $scheme eq 'https';
879 17         24 return @{ $conn->{selected_target} = [ $host, $ip, $port, $scheme ] };
  17         90  
880             }
881              
882             # this and following functions are used in terminal state
883             # so they should *NEVER* fail
884             sub _close_or_cache_socket {
885 20     20   41 my ($self, $conn, $force_close) = @_;
886 20 50       63 my $watchers = $self->{watchers}{$conn->{id}} or return;
887 20 100       79 my $fh = delete $watchers->{_fh} or return;
888 13         49 delete $watchers->{io}; # implicit stop
889              
890 13         25 my $socket_cache = $conn->{request}{_socket_cache};
891              
892             # Stolen from Hijk. Thanks guys!!!
893             # We always close connections for 1.0 because some servers LIE
894             # and say that they're 1.0 but don't close the connection on
895             # us! An example of this. Test::HTTP::Server (used by the
896             # ShardedKV::Storage::Rest tests) is an example of such a
897             # server. In either case we can't cache a connection for a 1.0
898             # server anyway, so BEGONE!
899              
900 13 0 66     40 if ( $force_close
      0        
      33        
      0        
      33        
      0        
      0        
901             || !defined $socket_cache
902             || (($conn->{request}{proto} || '') eq 'HTTP/1.0')
903             || (($conn->{response}{proto} || '') eq 'HTTP/1.0')
904             || (($conn->{response}{head}{Connection} || '') eq 'close'))
905             {
906 13 100       36 _register_in_timeline($conn, "drop socket") if $conn->{debug_or_timeline};
907 13 100       144 close($fh) if ref($fh) eq 'GLOB'; # checking ref to avoid exception
908 13         32 return;
909             }
910              
911 0 0       0 _register_in_timeline($conn, "storing socket for later use") if $conn->{debug_or_timeline};
912 0 0       0 eval { $socket_cache->(YAHC::SocketCache::STORE(), $conn, $fh); 1; } or do {
  0         0  
  0         0  
913 0         0 yahc_conn_register_error($conn, YAHC::Error::CALLBACK_ERROR(), "Exception in socket_cache callback (ignore error): $@");
914             };
915             }
916              
917             sub yahc_conn_socket_cache_id {
918 0     0 1 0 my $conn = shift;
919 0 0       0 return unless defined $conn;
920 0 0       0 my ($host, undef, $port, $scheme) = @{ $conn->{selected_target} || [] };
  0         0  
921 0 0 0     0 return unless $host && $port && $scheme;
      0        
922             # Use $; so we can use the $socket_cache->{$$, $host, $port} idiom to access the cache.
923 0         0 return join($;, $$, $host, $port, $scheme);
924             }
925              
926             ################################################################################
927             # Timers
928             ################################################################################
929              
930             sub _set_until_state_timer {
931 0     0   0 my ($self, $conn_id, $timeout_name, $state, $error_to_report) = @_;
932              
933 0         0 my $timer_name = $timeout_name . '_timer';
934 0 0       0 my $conn = $self->{connections}{$conn_id} or die "YAHC: unknown connection id $conn_id\n";
935 0 0       0 my $watchers = $self->{watchers}{$conn_id} or die "YAHC: no watchers for connection id $conn_id\n";
936              
937 0         0 delete $watchers->{$timer_name}; # implicit stop
938 0         0 my $timeout = $conn->{request}{$timeout_name};
939 0 0       0 return unless $timeout;
940              
941             my $timer_cb = sub { # there is nothing what can throw exception
942 0 0   0   0 if ($conn->{state} < $state) {
943 0         0 yahc_conn_register_error($conn, $error_to_report, "$timeout_name of %.3fs expired", $timeout);
944 0         0 _set_init_state($self, $conn_id);
945             } else {
946 0 0       0 _register_in_timeline($conn, "delete $timer_name") if exists $conn->{debug_or_timeline};
947             }
948 0         0 };
949              
950 0 0       0 _register_in_timeline($conn, "setting $timeout_name to %.3fs", $timeout) if exists $conn->{debug_or_timeline};
951              
952             # caller should call now_update
953 0         0 my $w = $watchers->{$timer_name} = $self->{loop}->timer_ns($timeout, 0, $timer_cb);
954 0         0 $w->priority(2); # set highest priority
955 0         0 $w->start;
956             }
957              
958             sub _set_lifetime_timer {
959 0     0   0 my ($self, $conn_id) = @_;
960              
961 0 0       0 my $conn = $self->{connections}{$conn_id} or die "YAHC: unknown connection id $conn_id\n";
962 0 0       0 my $watchers = $self->{watchers}{$conn_id} or die "YAHC: no watchers for connection id $conn_id\n";
963              
964 0         0 delete $watchers->{lifetime_timer}; # implicit stop
965 0         0 my $timeout = $conn->{request}{lifetime_timeout};
966 0 0       0 return unless $timeout;
967              
968 0 0       0 _register_in_timeline($conn, "setting lifetime timer to %.3fs", $timeout) if exists $conn->{debug_or_timeline};
969              
970 0         0 $self->{loop}->now_update;
971             my $w = $watchers->{lifetime_timer} = $self->{loop}->timer_ns($timeout, 0, sub {
972             _set_user_action_state($self, $conn_id, YAHC::Error::TIMEOUT() | YAHC::Error::LIFETIME_TIMEOUT() | YAHC::Error::TERMINAL_ERROR(),
973 0 0   0   0 sprintf("lifetime_timeout of %.3fs expired", $timeout)) if $conn->{state} < YAHC::State::COMPLETED();
974 0         0 });
975              
976 0         0 $w->priority(2); # set highest priority
977 0         0 $w->start;
978             }
979              
980             ################################################################################
981             # HTTP functions
982             ################################################################################
983              
984             # copy-paste from Hijk
985             sub _build_http_message {
986 33     33   830 my $conn = shift;
987 33         64 my $request = $conn->{request};
988 33 100 100     204 my $path_and_qs = ($request->{path} || "/") . (defined $request->{query_string} ? ("?" . $request->{query_string}) : "");
989 33         64 my $has_host = 0;
990              
991             return join(
992             $CRLF,
993             ($request->{method} || "GET") . " $path_and_qs " . ($request->{protocol} || "HTTP/1.1"),
994             defined($request->{body}) ? ("Content-Length: " . length($request->{body})) : (),
995             defined($request->{head}) && @{ $request->{head} } ? (
996             map {
997 9   33     72 $has_host ||= lc($request->{head}[2*$_]) eq 'host';
998 9         121 $request->{head}[2*$_] . ": " . $request->{head}[2*$_+1]
999 7         33 } 0..$#{$request->{head}}/2
1000             ) : (),
1001             !$has_host ? ("Host: " . $conn->{selected_target}[0]) : (),
1002             "",
1003 33 100 100     472 defined($request->{body}) ? $request->{body} : ""
    100 100        
    50 100        
    100          
1004             );
1005             }
1006              
1007             sub _parse_http_headers {
1008 3     3   46 my $conn = shift;
1009 3         15 my $proto = substr($_[0], 0, 8);
1010 3         14 my $status_code = substr($_[0], 9, 3);
1011 3         15 substr($_[0], 0, index($_[0], $CRLF) + 2, ''); # 2 = length($CRLF)
1012              
1013 3         9 my %headers;
1014 3         77 for (split /${CRLF}/o, $_[0]) {
1015 19         84 my ($key, $value) = split(/: /, $_, 2);
1016 19         68 $headers{$key} = $value;
1017             }
1018              
1019             $conn->{response} = {
1020 3         22 proto => $proto,
1021             status => $status_code,
1022             head => \%headers,
1023             };
1024              
1025 3 100       15 if ($conn->{debug_or_timeline}) {
1026 2         10 my $headers_str = join(' ', map { "$_='$headers{$_}'" } keys %headers);
  13         66  
1027 2         31 _register_in_timeline($conn, "headers parsed: $status_code $proto headers=$headers_str");
1028             }
1029              
1030 3         16 return \%headers;
1031             }
1032              
1033             ################################################################################
1034             # Helpers
1035             ################################################################################
1036              
1037             sub _delete_watchers_but_lifetime_timer {
1038 14     14   32 my ($self, $conn_id) = @_;
1039              
1040 14         25 my $watchers = $self->{watchers}{$conn_id};
1041 14 50 33     108 if (defined $watchers && (my $w = $watchers->{lifetime_timer})) {
1042 0         0 return $self->{watchers}{$conn_id} = { lifetime_timer => $w };
1043             }
1044              
1045 14         44 return $self->{watchers}{$conn_id} = {};
1046             }
1047              
1048             sub _wrap_host {
1049 10     10   34 my ($value) = @_;
1050 10         24 my $ref = ref($value);
1051              
1052 10 100   3   80 return sub { $value } if $ref eq '';
  3         10  
1053 3 100       10 return $value if $ref eq 'CODE';
1054              
1055 2         3 my $idx = 0;
1056 13     13   38 return sub { $value->[$idx++ % @$value]; }
1057 2 50 33     20 if $ref eq 'ARRAY' && @$value > 0;
1058              
1059 0         0 die "YAHC: unsupported host format\n";
1060             }
1061              
1062             sub _wrap_backoff {
1063 0     0   0 my ($value) = @_;
1064 0         0 my $ref = ref($value);
1065              
1066 0 0   0   0 return sub { $value } if $ref eq '';
  0         0  
1067 0 0       0 return $value if $ref eq 'CODE';
1068              
1069 0         0 die "YAHC: unsupported backoff format\n";
1070             }
1071              
1072             sub _wrap_socket_cache {
1073 0     0   0 my ($value) = @_;
1074 0         0 my $ref = ref($value);
1075              
1076 0 0       0 return $value if $ref eq 'CODE';
1077             return sub {
1078 0     0   0 my ($operation, $conn, $sock) = @_;
1079 0 0       0 if ($operation == YAHC::SocketCache::GET()) {
1080 0 0       0 my $socket_cache_id = yahc_conn_socket_cache_id($conn) or return;
1081 0         0 return delete $value->{$socket_cache_id};
1082             }
1083              
1084 0 0       0 if ($operation == YAHC::SocketCache::STORE()) {
1085 0 0       0 my $socket_cache_id = yahc_conn_socket_cache_id($conn) or return;
1086 0 0       0 close(delete $value->{$socket_cache_id}) if exists $value->{$socket_cache_id};
1087 0         0 $value->{$socket_cache_id} = $sock;
1088 0         0 return;
1089             }
1090 0 0       0 } if $ref eq 'HASH';
1091              
1092 0         0 die "YAHC: unsupported socket_cache format\n";
1093             }
1094              
1095             sub _call_state_callback {
1096 0     0   0 my ($self, $conn, $cb_name) = @_;
1097 0         0 my $cb = $self->{callbacks}{$conn->{id}}{$cb_name};
1098 0 0       0 return unless $cb;
1099              
1100 0 0       0 _register_in_timeline($conn, "calling $cb_name callback") if exists $conn->{debug_or_timeline};
1101              
1102             eval {
1103 0         0 $cb->($conn);
1104 0         0 1;
1105 0 0       0 } or do {
1106 0   0     0 my $error = $@ || 'zombie error';
1107 0         0 yahc_conn_register_error($conn, YAHC::Error::CALLBACK_ERROR(), "exception in state callback (ignore error): $error");
1108             };
1109              
1110 0         0 $self->{loop}->now_update;
1111             }
1112              
1113             sub _get_safe_wrapper {
1114 18     18   41 my ($self, $conn, $sub) = @_;
1115             return sub { eval {
1116 15         61 $sub->(@_);
1117 15         269 1;
1118 15 50   15   29 } or do {
1119 0   0     0 my $error = $@ || 'zombie error';
1120 0         0 _set_user_action_state($self, $conn->{id}, YAHC::Error::INTERNAL_ERROR() | YAHC::Error::TERMINAL_ERROR(),
1121             "exception in internal callback: $error");
1122 18         73 }};
1123             }
1124              
1125             sub _register_in_timeline {
1126 22     22   72 my ($conn, $format, @arguments) = @_;
1127 22         136 my $event = sprintf("$format", @arguments);
1128 22 50       62 _log_message("YAHC connection '%s': %s", $conn->{id}, $event) if exists $conn->{debug};
1129 22 50 100     60 push @{ $conn->{timeline} ||= [] }, [ $event, $conn->{state}, Time::HiRes::time ] if exists $conn->{keep_timeline};
  22         140  
1130             }
1131              
1132             sub yahc_conn_register_error {
1133 12     12 1 29 my ($conn, $error, $format, @arguments) = @_;
1134 12         37 my $strerror = sprintf("$format", @arguments);
1135 12 50       29 _register_in_timeline($conn, "strerror='$strerror' error=$error") if exists $conn->{debug_or_timeline};
1136 12   100     18 push @{ $conn->{errors} ||= [] }, [ $error, $strerror, [ @{ $conn->{selected_target} } ], Time::HiRes::time, $conn->{attempt} ];
  12         36  
  12         57  
1137             }
1138              
1139             sub _strstate {
1140 15     15   36 my $state = shift;
1141 15 100       59 return 'STATE_INIT' if $state eq YAHC::State::INITIALIZED();
1142 14 50       40 return 'STATE_RESOLVE_DNS' if $state eq YAHC::State::RESOLVE_DNS();
1143 14 100       39 return 'STATE_CONNECTING' if $state eq YAHC::State::CONNECTING();
1144 13 100       41 return 'STATE_CONNECTED' if $state eq YAHC::State::CONNECTED();
1145 10 100       29 return 'STATE_WRITING' if $state eq YAHC::State::WRITING();
1146 9 100       45 return 'STATE_READING' if $state eq YAHC::State::READING();
1147 4 50       17 return 'STATE_SSL_HANDSHAKE'if $state eq YAHC::State::SSL_HANDSHAKE();
1148 4 100       17 return 'STATE_USER_ACTION' if $state eq YAHC::State::USER_ACTION();
1149 2 50       11 return 'STATE_COMPLETED' if $state eq YAHC::State::COMPLETED();
1150 0           return "";
1151             }
1152              
1153             sub _log_message {
1154 0     0     my $format = shift;
1155 0           my $now = Time::HiRes::time;
1156 0           my ($sec, $ms) = split(/[.]/, $now);
1157 0   0       printf STDERR "[%s.%-5d] [$$] $format\n", POSIX::strftime('%F %T', localtime($now)), $ms || 0, @_;
1158             }
1159              
1160 0     0     sub _format_ssl_error { return sprintf("'%s' errno=%d ssl_error='%s' ssl_errno=%d", "$!", 0+$!, "$IO::Socket::SSL::SSL_ERROR", 0+$IO::Socket::SSL::SSL_ERROR); }
1161              
1162             1;
1163              
1164             __END__