File Coverage

lib/MojoX/HTTP/Async.pm
Criterion Covered Total %
statement 375 413 90.8
branch 101 164 61.5
condition 58 111 52.2
subroutine 42 45 93.3
pod 7 7 100.0
total 583 740 78.7


line stmt bran cond sub pod time code
1             package MojoX::HTTP::Async;
2              
3             =encoding utf-8
4              
5             =head1 NAME
6              
7             MojoX::HTTP::Async - The simple package to execute multiple parallel requests to the same host
8              
9             =head1 SYNOPSIS
10              
11             use MojoX::HTTP::Async ();
12             use Mojo::Message::Request ();
13             use Mojo::URL ();
14              
15             # creates new instance for async requests
16             # restricts max amount of simultaneously executed requests
17             my $ua = MojoX::HTTP::Async->new('host' => 'my-site.com', 'slots' => 4);
18              
19             # let's fill slots
20             $ua->add( '/page1.html?lang=en');
21             $ua->add( 'http://my-site.com/page2.html');
22             $ua->add( Mojo::URL->new("/page/03.html") );
23             $ua->add( Mojo::Message::Request->new() );
24              
25             # non-blocking requests processing
26             while ( $ua->not_empty() ) {
27             if (my $tx = $ua->next_response) { # returns an instance of Mojo::Transaction::HTTP class
28             print $tx->res->headers->to_string;
29             } else {
30             # do something else
31             }
32             }
33              
34             # blocking requests processing
35             while (my $tx = $ua->wait_for_next_response($timeout)) {
36             # do something here
37             }
38              
39             # how to process connect timeouts
40             if (my $error = $tx->req()->error()) {
41             say $error->{code};
42             say $error->{message};
43             }
44              
45             # how to process request timeouts and other errors such as broken pipes, etc
46             if (my $error = $tx->res()->error()) {
47             say $error->{code};
48             say $error->{message};
49             }
50              
51             # reconnects the slots which were timed out or were inactive for too long
52             $ua->refresh_connections();
53              
54             # closes everything
55             $ua->close_all();
56              
57             =head1 DESCRIPTION
58              
59             This library allows making multiple HTTP/HTTPS requests to a particular host in non-blocking mode.
60              
61             In comparison with C, this library doesn't make a new connection on each request.
62              
63             And in comparison with C, it's more intuitive to use, and there are no Singleton restrictions.
64              
65             The instance of this class can work only with one domain and scheme: either HTTP or HTTPS.
66              
67             =head1 LICENSE
68              
69             This module is distributed under terms of Artistic Perl 5 license.
70              
71             =cut
72              
73 26     26   9174272 use 5.020;
  26         299  
74 26     26   137 use warnings;
  26         49  
  26         812  
75 26     26   139 use bytes ();
  26         133  
  26         1089  
76 26     26   177 use Socket qw/ inet_aton pack_sockaddr_in AF_INET SOCK_STREAM SOL_SOCKET SO_KEEPALIVE SO_OOBINLINE IPPROTO_TCP TCP_KEEPIDLE TCP_KEEPINTVL TCP_KEEPCNT /;
  26         54  
  26         2564  
77             #use IO::Socket::IP ();
78 26     26   256 use IO::Socket::SSL ();
  26         55  
  26         696  
79 26     26   131 use Fcntl qw/ F_SETFL O_NONBLOCK FD_CLOEXEC O_NOINHERIT /;
  26         55  
  26         1867  
80 26     26   192 use experimental qw/ signatures /;
  26         68  
  26         204  
81 26     26   3995 use Carp qw/ croak /;
  26         43  
  26         1518  
82 26     26   183 use List::Util qw/ first /;
  26         36  
  26         1802  
83 26     26   162 use Time::HiRes qw/ time /;
  26         52  
  26         171  
84 26     26   11961 use Mojo::Message::Request ();
  26         4597295  
  26         915  
85 26     26   13759 use Mojo::Message::Response ();
  26         185966  
  26         696  
86 26     26   13208 use Mojo::Transaction::HTTP ();
  26         87511  
  26         616  
87 26     26   14888 use URI ();
  26         121159  
  26         734  
88 26     26   218 use Scalar::Util qw/ blessed /;
  26         56  
  26         2488  
89 26     26   165 use Errno qw / :POSIX /;
  26         49  
  26         9296  
90              
91             our $VERSION = 0.13;
92              
93             use constant {
94 26 50       128535 IS_WIN => ($^O eq 'MSWin32') ? 1 : 0,
    50          
95             IS_NOT_WIN => ($^O ne 'MSWin32') ? 1 : 0,
96 26     26   196 };
  26         55  
97              
98             =head2 new($class, %opts)
99              
100             The class constructor.
101              
102             =over
103              
104             =item host
105              
106             It's the obligatory option.
107             Sets the name/address of remote host to be requested.
108              
109             =item port
110              
111             By default it's equal to 80.
112             Sets the port number of remote point.
113              
114             =item slots
115              
116             By default it's equal to 5.
117             Sets the maximum amount of slots.
118             These slots will be filled one by one if required.
119              
120             =item ssl
121              
122             By default it's equal to 0 (means HTTP).
123             Sets the scheme of requests: HTTP or HTTPS.
124              
125             =item ssl_opts
126              
127             It's a HashRef with options to control SSL Layer.
128             See C<IO::Socket::SSL> constructor arguments for details.
129              
130             =item connect_timeout
131              
132             By default it's equal to 1.
133             Sets connection timeout in seconds.
134              
135             If it's equal to 0, then there will be no timeout restrictions.
136              
137             =item request_timeout
138              
139             By default it's equal to 1.
140             Sets the request timeout in seconds; fractional values are accepted with microsecond granularity.
141             The awaiting time of response will be limited with this value.
142              
143             In case of 0 value there will be no time restrictions.
144              
145             =item sol_socket
146              
147             It's a HashRef with socket options.
148             The possible keys are:
149              
150             B<so_keepalive> - enables TCP KeepAlive on socket.
151             The default value is 1 (means that option is enabled).
152              
153             =item B<sol_tcp>
154              
155             WARNING: These options can be unsupported on some OS platforms.
156              
157             It's a HashRef with socket TCP-options.
158              
159             If some key is absent in HashRef then system settings will be used.
160              
161             The supported keys are shown below:
162              
163             B<tcp_keepidle> - the time (in seconds) the connection needs to remain idle before TCP starts sending keepalive probes
164              
165             B<tcp_keepintvl> - the time (in seconds) between individual keepalive probes
166              
167             B<tcp_keepcnt> - the maximum number of keepalive probes TCP should send before dropping the connection.
168              
169             =item inactivity_conn_ts
170              
171             If the last response was received C<inactivity_conn_ts> seconds or more ago,
172             then such slots will be destroyed.
173              
174             By default the value is 0 (disabled).
175              
176             =item debug
177              
178             Enables debug mode. The debug messages will be printed to STDERR.
179             By default the value is 0 (disabled).
180              
181             =back
182              
183             =cut
184              
185 5     5 1 55777 sub new ($class, %opts) {
  5         70  
  5         216  
  5         44  
186 5 50       142 croak("host is mandatory") if (! $opts{'host'});
187             my $self = bless({
188             'slots' => 5,
189             'ssl' => 0,
190             'ssl_opts' => undef,
191 5 100       296 'port' => $opts{'ssl'} ? 443 : 80,
192             'request_timeout' => 1, # 1 sec
193             'connect_timeout' => 1, # 1 sec
194             'sol_socket' => {
195             'so_keepalive' => 1,
196             },
197             'sol_tcp' => {},
198             'inactivity_conn_ts' => 0,
199             %opts,
200             '_conns' => [],
201             }, $class);
202 5         84 return $self;
203             }
204              
205 19     19   50 sub _connect ($self, $slot, $proto, $peer_addr) {
  19         47  
  19         72  
  19         45  
  19         31  
  19         53  
206              
207 19 50       133 warn("Connecting\n") if $self->{'debug'};
208              
209 19 50       1039 socket(my $socket, AF_INET, SOCK_STREAM, $proto) || croak("socket error: $!");
210 19 50       1422 connect($socket, $peer_addr) || croak("connect error: $!"); # in case of O_NONBLOCK it will return with EINPROGRESS
211              
212             # When a constant is used in an expression, Perl replaces it with its value at compile time,
213             # and may then optimize the expression further. In particular, any code in an if (CONSTANT)
214             # block will be optimized away if the constant is false.
215 19 50       174 if (&IS_NOT_WIN) {
216 19 50       219 fcntl($socket, F_SETFL, O_NONBLOCK | FD_CLOEXEC) || croak("fcntl error has occurred: $!");
217             }
218              
219 19 50       140 if (&IS_WIN) {
220 0         0 $socket = IO::Socket::IP->new_from_fd(fileno($socket), '+<');
221 0 0       0 defined($socket->blocking(0)) or croak("can't set non-blocking state on socket: $!");
222             #$socket->sockopt(O_NOINHERIT, 1) or croak("fcntl error has occurred: $!"); # the same as SOCK_CLOEXEC
223             }
224              
225 19   50     129 my $sol_socket_opts = $self->{'sol_socket'} // {};
226              
227 19 100       283 if (exists($sol_socket_opts->{'so_keepalive'})) {
228 17 50       215 setsockopt($socket, SOL_SOCKET, SO_KEEPALIVE, 1) || croak("setsockopt error has occurred while setting SO_KEEPALIVE: $!");
229              
230 17 50       94 if ($sol_socket_opts->{'so_keepalive'}) {
231 17   50     81 my $sol_tcp_opts = $self->{'sol_tcp'} // {};
232 17         48 state $SOL_TCP = &IPPROTO_TCP();
233              
234 17 100       68 if (exists($sol_tcp_opts->{'tcp_keepidle'})) {
235 8 50       70 setsockopt($socket, $SOL_TCP, TCP_KEEPIDLE, $sol_tcp_opts->{'tcp_keepidle'}) || croak("setsockopt error has occurred while setting TCP_KEEPIDLE: $!");
236             }
237              
238 17 100       84 if (exists($sol_tcp_opts->{'tcp_keepintvl'})) {
239 8 50       58 setsockopt($socket, $SOL_TCP, TCP_KEEPINTVL, $sol_tcp_opts->{'tcp_keepintvl'}) || croak("setsockopt error has occurred while setting TCP_KEEPINTVL: $!");
240             }
241              
242 17 100       73 if (exists($sol_tcp_opts->{'tcp_keepcnt'})) {
243 8 50       64 setsockopt($socket, $SOL_TCP, TCP_KEEPCNT, $sol_tcp_opts->{'tcp_keepcnt'}) || croak("setsockopt error has occurred while setting TCP_KEEPCNT: $!");
244             }
245             }
246             }
247              
248 19         92 $slot->{'connected_ts'} = time();
249 19         127 $slot->{'reader'} = $slot->{'writer'} = $slot->{'socket'} = $socket;
250 19         76 $slot->{'sock_no'} = fileno($socket);
251 19 100       110 if ($self->{'ssl'}) {
252 2   50     60 my $ssl_socket = IO::Socket::SSL->new_from_fd($socket, ($self->{'ssl_opts'} // {})->%*);
253 2 50       171713 croak("error=$!, ssl_error=" . $IO::Socket::SSL::SSL_ERROR) if (!$ssl_socket);
254 2         12 $ssl_socket->blocking(0); # just to be sure
255 2         37 $slot->{'reader'} = $slot->{'writer'} = $ssl_socket;
256             }
257             }
258              
259 19     19   52 sub _connect_slot ($self, $slot) {
  19         38  
  19         35  
  19         56  
260 19         44 my $timeout = $self->{'connect_timeout'};
261              
262 19 50       126 if ($timeout > 0) {
263 19         88 eval {
264 19     0   733 local $SIG{'ALRM'} = sub { die "alarm\n" };
  0         0  
265 19         212 alarm($timeout);
266 19         73 $self->_connect($slot, @{$self}{qw/ proto peer_addr /});
  19         159  
267 19         420 alarm(0);
268             };
269              
270 19         64 my $error = $@;
271              
272 19         103 alarm(0);
273              
274 19 50       111 if ($error) {
275 0 0       0 croak($error) if ($error ne "alarm\n");
276 0         0 $self->_mark_request_as_timeouted($slot, 'Connect timeout');
277             }
278             } else {
279 0         0 $self->_connect($slot, @{$self}{qw/ proto peer_addr /});
  0         0  
280             }
281             }
282              
283 14     14   69 sub _make_connections ($self, $amount) {
  14         71  
  14         46  
  14         46  
284              
285 14         2207 my $host_addr = inet_aton($self->{'host'});
286 14 50       154 croak("can't call inet_aton") if (! $host_addr);
287              
288 14   66     264 $self->{'peer_addr'} //= pack_sockaddr_in($self->{'port'}, $host_addr);
289 14   66     448 $self->{'proto'} //= getprotobyname("tcp");
290              
291 14         69 for (1 .. $amount) {
292 14         107 my $slot = $self->_make_slot();
293 14         116 $self->_connect_slot($slot);
294 14         78 $self->_add_slot($slot);
295             }
296             }
297              
298 14     14   39 sub _add_slot ($self, $slot) {
  14         47  
  14         110  
  14         23  
299 14 50       104 push($self->{'_conns'}->@*, $slot) if ($slot);
300             }
301              
302 14     14   57 sub _make_slot ($self) {
  14         42  
  14         54  
303             return {
304 14         594 'reader' => undef,
305             'writer' => undef,
306             'socket' => undef,
307             'sock_no' => 0,
308             'is_busy' => 0,
309             'request' => undef,
310             'tx' => undef,
311             'exp_ts' => 0,
312             'tmp_response' => undef,
313             'reconnect_is_required' => 0,
314             'last_response_ts' => 0,
315             'connected_ts' => 0,
316             };
317             }
318              
319 99242     99242   109720 sub _check_for_errors ($self, $socks2slots = {}, $error_handles = '', $reason = '') {
  99242         119588  
  99242         126928  
  99242         120123  
  99242         237620  
  99242         106637  
320              
321 99242         109873 my $message = $reason;
322              
323 99242 50       167579 if (!$message) {
324 99242 50 33     463577 $message = ($!{'EPIPE'} || $!{'ECONNRESET'} || $!{'ECONNREFUSED'} || $!{'ECONNABORTED'}) ? 'Premature connection close' : 'Unknown error';
325             }
326              
327 99242         3700439 for my $slot_no (keys %$socks2slots) {
328 126974 50       314808 if ( vec($error_handles, $slot_no, 1) != 0 ) {
329 0         0 my $slot = $socks2slots->{ $slot_no };
330 0         0 $self->_mark_response_as_broken($slot, 520, $message);
331             }
332             }
333             }
334              
335 35     35   86 sub _get_free_slot ($self) {
  35         83  
  35         71  
336              
337 35         87 my $slot;
338 22         203 my %socks2slots = map { $_->{'sock_no'} => $_ }
339 41 50 66     360 grep { !$_->{'is_busy'} && $_->{'socket'} && !$_->{'reconnect_is_required'} }
340 35         189 $self->{'_conns'}->@*;
341              
342 35 100       176 if (%socks2slots) {
343              
344 19         179 local $!;
345 19         51 my $write_handles = '';
346              
347 19         199 vec($write_handles, $_, 1) = 1 for keys %socks2slots;
348              
349 19         70 my $error_handles = $write_handles;
350 19         254 my ($nfound, $timeleft) = select(undef, $write_handles, $error_handles, 0);
351              
352 19         135 $self->_check_for_errors(\%socks2slots, $error_handles, $!);
353              
354 19 50       51 if ($nfound) {
355 19     19   229 my $slot_no = first { vec($write_handles, $_, 1) == 1 } keys %socks2slots;
  19         101  
356 19         113 $slot = $socks2slots{ $slot_no };
357             }
358             }
359              
360 35         111 return $slot;
361             }
362              
363             =head2 add ($self, $request_or_uri, $timeout = undef)
364              
365             Adds HTTP request into empty slot.
366              
367             If the request was successfully added, then it will return 1.
368             Otherwise it will return 0.
369              
370             The request is not added only in the case when there are no empty slots and a new slot can't be created because
371             the limit on the number of slots has been reached (see the C<slots> option of C<new>).
372              
373             It is recommended to always check the result code of this method.
374              
375             Example:
376              
377             my $ua = MojoX::HTTP::Async->new('host' => 'my-host.com', 'slots' => 1);
378              
379             # let's occupy the only slot
380             $ua->add('/page1.html');
381              
382             # let's wait until it's become free again
383             while ( ! $ua->add('/page2.html') ) {
384             while (my $tx = $ua->wait_for_next_response() ) {
385             # do something here
386             }
387             }
388              
389             =over
390              
391             =item $request_or_uri
392              
393             It can be either an instance of C<Mojo::Message::Request> class, or an instance of C<Mojo::URL>.
394             It also can be a simple URI string.
395              
396             If the resource contains the host, then it must be the same as in the constructor C<new>.
397              
398             Using a string with URI or an instance of C<Mojo::URL> class assumes that the GET HTTP method will be used.
399              
400             =item $timeout
401              
402             Time in seconds. Can be fractional with microseconds tolerance.
403              
404             The C<request_timeout> from the constructor will be used by default.
405              
406             =back
407              
408             =cut
409              
410 22     22 1 6805 sub add ($self, $request_or_uri, $timeout = undef) {
  22         95  
  22         139  
  22         75  
  22         69  
411 22         94 my $status = 0;
412 22         204 my $slot = $self->_get_free_slot();
413              
414 22 100 100     311 if ( ! $slot && $self->{'slots'} > scalar($self->{'_conns'}->@*) ) {
415 13         119 $self->_make_connections(1);
416 13         82 $slot = $self->_get_free_slot();
417             }
418              
419 22 100       72 if ($slot) {
420 19         61 my $request = $request_or_uri;
421 19 100 66     108 if ( !ref($request_or_uri) || ( blessed($request_or_uri) && $request_or_uri->isa('Mojo::URL') ) ) {
      100        
422 18         561 $request = Mojo::Message::Request->new();
423 18         345 $request->url()->parse($request_or_uri);
424             }
425 19 50       3339 if ($request) {
426 19         106 $self->_send_request($slot, $request, $timeout);
427 19         40 $status = 1;
428             }
429             }
430              
431 22         228 return $status;
432             }
433              
434 26     26   38 sub _clear_slot ($self, $slot, $force = 0) {
  26         35  
  26         43  
  26         32  
  26         33  
435 26         398 $slot->{'is_busy'} = 0;
436 26         50 $slot->{'exp_ts'} = 0;
437 26         47 $slot->{'tx'} = undef;
438 26         47 $slot->{'request'} = undef;
439 26         250 $slot->{'tmp_response'} = undef;
440 26 100       77 if ($force) {
441 14 50       949 close($slot->{'socket'}) if $slot->{'socket'};
442 14         62 $slot->{'socket'} = undef;
443 14         28 $slot->{'reader'} = undef;
444 14         64 $slot->{'writer'} = undef;
445 14         32 $slot->{'sock_no'} = 0;
446 14         19 $slot->{'reconnect_is_required'} = 0;
447 14         24 $slot->{'last_response_ts'} = 0;
448 14         56 $slot->{'connected_ts'} = 0;
449             }
450             }
451              
452 6     6   12 sub _mark_slot_as_broken($self, $slot) {
  6         9  
  6         13  
  6         8  
453 6         14 $slot->{'reconnect_is_required'} = 1;
454 6         59 $slot->{'is_busy'} = 1;
455 6   33     37 $slot->{'request'} //= Mojo::Message::Request->new();
456             $slot->{'tx'} //= Mojo::Transaction::HTTP->new(
457 6   33     223 'req' => $slot->{'request'},
458             'res' => Mojo::Message::Response->new()
459             );
460             }
461              
462 0     0   0 sub _mark_request_as_broken ($self, $slot, $code = 520, $msg = 'Unknown Error') {
  0         0  
  0         0  
  0         0  
  0         0  
  0         0  
463 0         0 $self->_mark_slot_as_broken($slot);li
464 0         0 $slot->{'request'}->error({'message' => $msg, 'code' => $code});
465             }
466              
467 6     6   12 sub _mark_response_as_broken ($self, $slot, $code = 520, $msg = 'Unknown Error') {
  6         11  
  6         12  
  6         9  
  6         9  
  6         10  
468 6         29 $self->_mark_slot_as_broken($slot);
469              
470 6         337 my $res = $slot->{'tx'}->res();
471 6         207 $res->error({'message' => $msg, 'code' => $code});
472 6         258 $res->headers()->content_length(0);
473 6         551 $res->code($code);
474 6         60 $res->message($msg);
475             }
476              
477 0     0   0 sub _mark_request_as_timeouted ($self, $slot, $message = 'Request timeout') {
  0         0  
  0         0  
  0         0  
  0         0  
478 0         0 $self->_mark_request_as_broken($slot, 524, $message);
479             }
480              
481 6     6   14 sub _mark_response_as_timeouted ($self, $slot, $message = 'Request timeout') {
  6         10  
  6         9  
  6         26  
  6         14  
482 6         36 $self->_mark_response_as_broken($slot, 524, $message);
483             }
484              
485 19     19   34 sub _send_request ($self, $slot, $request, $timeout = undef) {
  19         42  
  19         50  
  19         33  
  19         28  
  19         35  
486              
487 19 50       121 croak("slot is busy") if ($slot->{'is_busy'});
488 19 50       71 croak("request object is obligatory") if (!$request);
489 19 50       123 croak('request must be a descendant of Mojo::Message::Request package') if (!$request->isa('Mojo::Message::Request'));
490              
491 19 100       119 my $required_scheme = $self->{'ssl'} ? 'https' : 'http';
492 19         86 my $url = $request->url();
493 19         363 my $uri = URI->new( $url );
494 19         39605 my $scheme = $url->scheme();
495              
496 19 50 33     150 if ($scheme && $required_scheme ne $scheme) {
497 0         0 croak(sprintf("Wrong scheme in URI '%s'. It must correspond to the 'ssl' option", $uri->as_string()));
498             }
499              
500 19 50       124 if (! $uri->scheme()) {
501             # URI::_generic doesn't have C<host> method, that's why we set the scheme by ourselves to change C<$uri> type
502 19         748 $uri->scheme($required_scheme);
503             }
504              
505 19 50       19621 if (my $host = $uri->host()) {
506 0 0       0 if ($host ne $self->{'host'}) {
507 0         0 croak(sprintf("Wrong host in URI '%s'. It must be the same as it was specified in constructor: %s", $uri->as_string(), $self->{'host'}));
508             }
509             }
510              
511 19   33     759 $timeout //= $self->{'request_timeout'};
512              
513 19         36 my $response = '';
514 19         42 state $default_ua_hdr = 'perl/' . __PACKAGE__;
515              
516 19         244 my $h = $request->headers();
517 19 100       2418 $h->host($self->{'host'}) if (! $h->host() );
518 19 100       445 $h->user_agent($default_ua_hdr) if (! $h->user_agent() );
519              
520 19         308 $slot->{'request'} = $request;
521 19         46 $slot->{'is_busy'} = 1;
522 19 50       135 $slot->{'exp_ts'} = ($timeout > 0) ? ( time() + $timeout ) : 0;
523              
524 19         188 my $plain_request = $request->to_string();
525              
526 19 100       13586 if ($self->{'ssl'}) {
527 2         30 $slot->{'writer'}->print($plain_request);
528             } else {
529 17         38 my $socket = $slot->{'socket'};
530 17         255 my $msg_len = bytes::length($plain_request);
531 17         4961 my $sent_bytes = 0;
532 17         30 my $attempts = 10;
533              
534 17         93 local $!;
535              
536 17   66     150 while ($sent_bytes < $msg_len && $attempts--) {
537 17         900 my $bytes = syswrite($socket, $plain_request, $msg_len, $sent_bytes);
538              
539 17 50 33     170 if ($! || ! defined($bytes)) {
540 0   0     0 my $error = $! // 'Unknown error';
541 0         0 $self->_mark_request_as_broken($slot, 520, $error);
542 0         0 return;
543             }
544              
545 17         28 $sent_bytes += $bytes;
546 17 50       69 $plain_request = substr($plain_request, $bytes) if $sent_bytes < $msg_len;
547             }
548              
549 17 50       78 if ($sent_bytes < $msg_len) {
550 0   0     0 my $error = $! // 'sent message is shorter than original';
551 0         0 $self->_mark_request_as_broken($slot, 520, $error);
552 0         0 return;
553             }
554             }
555              
556 19 50 33     524 if ($slot->{'exp_ts'} && time() > $slot->{'exp_ts'}) {
557 0         0 $self->_mark_request_as_timeouted($slot);
558             }
559              
560 19         68 return;
561             }
562              
563 6     6   9 sub _try_to_read ($self, $slot) {
  6         10  
  6         9  
  6         7  
564              
565 6 50 33     66 return if $slot->{'tx'} || ! $slot->{'is_busy'};
566              
567 6         13 my $reader = $slot->{'reader'};
568 6   33     115 my $response = $slot->{'tmp_response'} // Mojo::Message::Response->new();
569              
570 6         292 $response->parse($_) while (<$reader>);
571              
572 6 50 33     8680 if ($! && !$!{'EAGAIN'} && !$!{'EWOULDBLOCK'}) { # not a "Resourse temporary unavailable" (no data)
    50 33        
      33        
573 0         0 $self->_mark_response_as_broken($slot, 520, $!);
574             } elsif ($response && $response->code()) {
575              
576 6         196 my $content = $response->content();
577              
578 6 50       39 if ($content->is_finished()) { # this is required to support "Transfer-Encoding: chunked"
579             $slot->{'tx'} = Mojo::Transaction::HTTP->new(
580 6         114 'req' => $slot->{'request'},
581             'res' => $response
582             );
583 6         91 $slot->{'tmp_response'} = undef;
584 6         24 $slot->{'last_response_ts'} = time();
585             } else {
586 0         0 $slot->{'tmp_response'} = $response;
587             }
588              
589 6 50       18 $slot->{'reconnect_is_required'} = 1 if ($content->relaxed()); # responses that are terminated with a connection close
590             }
591              
592 6 0 0     54 if (! $slot->{'tx'} && ($slot->{'exp_ts'} && time() > $slot->{'exp_ts'})) {
      33        
593 0         0 $self->_mark_response_as_timeouted($slot);
594             }
595             }
596              
597             =head2 not_empty($self)
598              
599             Returns 1 if at least one slot is busy or contains a response which has not been processed yet.
600             Otherwise the method returns 0.
601              
602             =cut
603              
604 99219     99219 1 133757 sub not_empty ($self) {
  99219         101007  
  99219         99413  
605              
606 99219         125164 my $not_empty = scalar $self->{'_conns'}->@*;
607              
608 99219         144873 for my $slot ($self->{'_conns'}->@*) {
609 184680 50 66     375103 $not_empty-- if !$slot->{'is_busy'} && !$slot->{'tx'};
610             }
611              
612 99219 100       248389 return $not_empty ? 1 : 0;
613             }
614              
615              
616             =head2 wait_for_next_response($self, $timeout = 0)
617              
618             Waits for first received response or time-outed request in any slot.
619             Returns the C instance with result.
620              
621             =over
622              
623             =item $timeout
624              
625             Period of time in seconds. Can be fractional with microsecond tolerance.
626             The response will be marked as time-outed after this time is out.
627              
628             The default value is 0, which means that the call will block until a response is received.
629              
630             If all slots are empty, then C<undef> will be returned.
631              
632             =back
633              
634             =cut
635              
636 12     12 1 14718 sub wait_for_next_response ($self, $timeout = 0) {
  12         22  
  12         20  
  12         18  
637              
638 12         17 my $response;
639 12 50       65 my $exp_ts = $timeout ? (time() + $timeout) : 0;
640              
641 12         18 while (1) {
642 91136 50 33     658557 last if ($exp_ts && time() >= $exp_ts); # awaiting process is time-outed
643 91136 100 100     235910 last if (($response = $self->next_response()) || !$self->not_empty());
644 91124 50       8044041 select(undef, undef, undef, 1E-6) if (!$response); # sleep 1 microsecond
645             }
646              
647 12         42 return $response;
648             }
649              
650             =head2 next_response ($self)
651              
652             Returns an instance of C<Mojo::Transaction::HTTP> class.
653             If there is no response, it will return C<undef>.
654              
655             =cut
656              
657 99223     99223 1 106266 sub next_response ($self) {
  99223         123267  
  99223         97520  
658 99223   66     167904 return $self->_get_response_from_ready_slot() // $self->_get_response_from_slot();
659             }
660              
661 99211     99211   111634 sub _get_response_from_slot ($self) {
  99211         101279  
  99211         95589  
662              
663 99211         100933 my $tx;
664 99211     184666   460402 my $slot = first { $_->{'tx'} } $self->{'_conns'}->@*;
  184666         245446  
665              
666 99211 50       257120 if ($slot) {
667 0         0 $tx = $slot->{'tx'};
668 0         0 $self->_clear_slot($slot, $slot->{'reconnect_is_required'});
669             }
670              
671 99211         338040 return $tx;
672             }
673              
674 99223     99223   96730 sub _get_response_from_ready_slot ($self) {
  99223         100976  
  99223         99843  
675              
676 99223         102741 my $tx;
677 126943         439557 my %socks2slots = map { $_->{'sock_no'} => $_ }
678 184688 100 66     761716 grep { ! $_->{'tx'} && ! $_->{'reconnect_is_required'} && $_->{'is_busy'} }
679 99223         216036 $self->{'_conns'}->@*;
680              
681 99223 100       204959 if (%socks2slots) {
682              
683 99218         455532 local $!;
684 99218         159770 my $read_handles = '';
685              
686 99218         500041 vec($read_handles, $_, 1) = 1 for keys %socks2slots;
687              
688 99218         141917 my $error_handles = $read_handles;
689 99218         805131 my ($nfound, $timeleft) = select($read_handles, undef, $error_handles, 0);
690              
691 99218         421018 $self->_check_for_errors(\%socks2slots, $error_handles, $!);
692              
693 99218         176906 for my $sock_no (keys %socks2slots) {
694 126941         154742 my $slot = $socks2slots{ $sock_no };
695 126941 100 100     221139 if ( $nfound && vec($read_handles, $sock_no, 1) == 1 ) {
696 6         26 $self->_try_to_read($slot);
697 6 50       19 next if ! $slot->{'tx'};
698 6 50       24 next if ! $slot->{'is_busy'};
699 6         54 $tx = $slot->{'tx'};
700             } else {
701 126935 100 66     549816 if (!$slot->{'tx'} && ($slot->{'exp_ts'} && time() > $slot->{'exp_ts'})) {
      66        
702 6         78 $self->_mark_response_as_timeouted($slot);
703 6         43 $tx = $slot->{'tx'};
704             }
705             }
706              
707 126941 100       355905 if ($tx) {
708 12         46 $self->_clear_slot($slot, 0);
709 12         53 last;
710             }
711             }
712             }
713              
714 99223         319589 return $tx;
715             }
716              
717             =head2 refresh_connections ($self)
718              
719             Closes connections in slots in the following cases:
720              
721             1. The slot was marked as timed out
722             2. The "inactivity_conn_ts" was set and the connection has expired
723             3. The socket reported an error (for example: Connection reset by peer, Broken pipe, etc)
724              
725             Returns the number of reconnections made.
726              
727             =cut
728              
729 5     5 1 15 sub refresh_connections ($self) {
  5         14  
  5         8  
730              
731 5         11 my $n = 0;
732 5         30 my $now = time();
733 5   50     59 my $keep_ts = $self->{'inactivity_conn_ts'} // 0;
734              
735 5 50       24 if (scalar $self->{'_conns'}->@*) {
736              
737 5         60 local $!;
738 5         16 my $error_handles = '';
739 5         25 my %socks2slots = map { $_->{'sock_no'} => $_ } $self->{'_conns'}->@*;
  9         58  
740              
741 5         83 vec($error_handles, $_, 1) = 1 for keys %socks2slots;
742 5         87 select(undef, undef, $error_handles, 0);
743              
744 5         38 $self->_check_for_errors(\%socks2slots, $error_handles, $!); # broken connections will be marked as required to reconnect
745             }
746              
747 5         22 for my $i (reverse( 0 .. $#{ $self->{'_conns'} })) {
  5         20  
748 9         20 my $slot = $self->{'_conns'}->[$i];
749 9   33     146 my $slot_exp_ts = ($slot->{'last_response_ts'} || $slot->{'connected_ts'}) + $keep_ts;
750 9   100     34 my $is_outdated = $keep_ts && $slot_exp_ts <= $now;
751              
752 9 50 33     31 warn("Outdated\n") if $self->{'debug'} && $is_outdated;
753              
754 9 100 100     45 if ($slot->{'reconnect_is_required'} || $is_outdated) {
755 5 50       15 warn("Going to reconnect\n") if $self->{'debug'};
756 5         23 $self->_clear_slot($slot, 1);
757 5         24 $self->_connect_slot($slot);
758 5         14 $n++;
759             }
760             }
761              
762 5         28 return $n;
763             }
764              
765             =head2 close_all ($self)
766              
767             Closes all open connections and resets all slots with requests.
768              
769             =cut
770              
771 5     5 1 7524 sub close_all ($self) {
  5         12  
  5         9  
772 5         31 $self->_clear_slot($_, 1) for $self->{'_conns'}->@*;
773 5         32 $self->{'_conns'} = [];
774 5         18 return;
775             }
776              
777             =head2 DESTROY($self)
778              
779             The class destructor.
780              
781             Closes all open sockets.
782              
783             =cut
784              
785 5     5   13220 sub DESTROY ($self) {
  5         16  
  5         14  
786 5         12 my $in_use = 0;
787 5         44 while ( my $slot = shift($self->{'_conns'}->@*) ) {
788 5 50       526 $in_use++ if ($slot->{'is_busy'});
789 5 50       133 $slot->{'socket'}->close() if ($slot->{'socket'});
790             }
791 5 50       458 warn ref($self) ." object destroyed but still in use" if $in_use;
792             }
793              
794             1;
795             __END__