File Coverage

lib/MojoX/HTTP/Async.pm
Criterion Covered Total %
statement 375 413 90.8
branch 101 164 61.5
condition 58 111 52.2
subroutine 42 45 93.3
pod 7 7 100.0
total 583 740 78.7


line stmt bran cond sub pod time code
1             package MojoX::HTTP::Async;
2              
3             =encoding utf-8
4              
5             =head1 NAME
6              
7             MojoX::HTTP::Async - The simple package to execute multiple parallel requests to the same host
8              
9             =head1 SYNOPSIS
10              
11             use MojoX::HTTP::Async ();
12             use Mojo::Message::Request ();
13             use Mojo::URL ();
14              
15             # creates new instance for async requests
16             # restricts max amount of simultaneously executed requests
17             my $ua = MojoX::HTTP::Async->new('host' => 'my-site.com', 'slots' => 4);
18              
19             # let's fill slots
20             $ua->add( '/page1.html?lang=en');
21             $ua->add( 'http://my-site.com/page2.html');
22             $ua->add( Mojo::URL->new("/page/03.html") );
23             $ua->add( Mojo::Message::Request->new() );
24              
25             # non-blocking requests processing
26             while ( $ua->not_empty() ) {
27             if (my $tx = $ua->next_response) { # returns an instance of Mojo::Transaction::HTTP class
28             print $tx->res->headers->to_string;
29             } else {
30             # do something else
31             }
32             }
33              
34             # blocking requests processing
35             while (my $tx = $ua->wait_for_next_response($timeout)) {
36             # do something here
37             }
38              
39             # how to process connect timeouts
40             if (my $error = $tx->req()->error()) {
41             say $error->{code};
42             say $error->{message};
43             }
44              
45             # how to process request timeouts and other errors sucn as broken pipes, etc
46             if (my $error = $tx->res()->error()) {
47             say $error->{code};
48             say $error->{message};
49             }
50              
51             # makes reconnection if either slot was timeouted or was inactive too long
52             $ua->refresh_connections();
53              
54             # closes everything
55             $ua->close_all();
56              
57             =head1 DESCRIPTION
58              
59             This library allows to make multiple HTTP/HTTPS request to the particular host in non-blocking mode.
60              
61             In comparison with C, this library doesn't make a new connection on each request.
62              
63             And in comparison with C, it's more intuitive how to use it, and there is no any Singleton restrictions.
64              
65             The instance of this class can work only with one domain and scheme: either HTTP or HTTPS.
66              
67             =head1 LICENSE
68              
69             This module is distributed under terms of Artistic Perl 5 license.
70              
71             =cut
72              
73 26     26   9222014 use 5.020;
  26         239  
74 26     26   134 use warnings;
  26         39  
  26         896  
75 26     26   166 use bytes ();
  26         105  
  26         949  
76 26     26   168 use Socket qw/ inet_aton pack_sockaddr_in AF_INET SOCK_STREAM SOL_SOCKET SO_KEEPALIVE SO_OOBINLINE IPPROTO_TCP TCP_KEEPIDLE TCP_KEEPINTVL TCP_KEEPCNT /;
  26         560  
  26         2513  
77             #use IO::Socket::IP ();
78 26     26   175 use IO::Socket::SSL ();
  26         46  
  26         666  
79 26     26   131 use Fcntl qw/ F_SETFL O_NONBLOCK FD_CLOEXEC O_NOINHERIT /;
  26         39  
  26         1839  
80 26     26   169 use experimental qw/ signatures /;
  26         52  
  26         227  
81 26     26   4031 use Carp qw/ croak /;
  26         33  
  26         1493  
82 26     26   497 use List::Util qw/ first /;
  26         40  
  26         1559  
83 26     26   158 use Time::HiRes qw/ time /;
  26         172  
  26         203  
84 26     26   11687 use Mojo::Message::Request ();
  26         4647612  
  26         725  
85 26     26   13144 use Mojo::Message::Response ();
  26         187349  
  26         686  
86 26     26   13087 use Mojo::Transaction::HTTP ();
  26         88258  
  26         607  
87 26     26   15597 use URI ();
  26         124667  
  26         729  
88 26     26   210 use Scalar::Util qw/ blessed /;
  26         145  
  26         1491  
89 26     26   156 use Errno qw / :POSIX /;
  26         62  
  26         9731  
90              
91             our $VERSION = 0.14;
92              
93             use constant {
94 26 50       132451 IS_WIN => ($^O eq 'MSWin32') ? 1 : 0,
    50          
95             IS_NOT_WIN => ($^O ne 'MSWin32') ? 1 : 0,
96 26     26   211 };
  26         45  
97              
98             =head2 new($class, %opts)
99              
100             The class constructor.
101              
102             =over
103              
104             =item host
105              
106             It's the obligatory option.
107             Sets the name/address of remote host to be requested.
108              
109             =item port
110              
111             By default it's equal to 80.
112             Sets the port number of remote point.
113              
114             =item slots
115              
116             By default it's equal to 5.
117             Sets the maximum amount of slots.
118             These slot will be filled one by one if required.
119              
120             =item ssl
121              
122             By default it's equal to 0 (means HTTP).
123             Sets the scheme of requests: HTTP or HTTPS.
124              
125             =item ssl_opts
126              
127             It's a HashRef with options to control SSL Layer.
128             See C constructor arguments for details.
129              
130             =item connect_timeout
131              
132             By default it's equal to 1.
133             Sets connection timeout in seconds.
134              
135             If it's equal to 0, then there will be no timeout restrictions.
136              
137             =item request_timeout
138              
139             By default it's equal to 1.
140             Sets the time in seconds with granular accuracy as micro seconds.
141             The awaiting time of response will be limited with this value.
142              
143             In case of 0 value there will be no time restrictions.
144              
145             =item sol_socket
146              
147             It's a HashRef with socket options.
148             The possible keys are:
149              
150             B - enables TCP KeepAlive on socket.
151             The default value is 1 (means that option is enabled).
152              
153             =item B
154              
155             WARNING: These options can be unsupported on some OS platforms.
156              
157             It's a HashRef with socket TCP-options.
158              
159             If some key is absent in HashRef then system settings will be used.
160              
161             The supported keys are shown below:
162              
163             B - the time (in seconds) the connection needs to remain idle before TCP starts sending keepalive probes
164              
165             B - the time (in seconds) between individual keepalive probes
166              
167             B - the maximum number of keepalive probes TCP should send before dropping the connection.
168              
169             =item inactivity_conn_ts
170              
171             If last response was received C seconds or more ago,
172             then such slots will be destroyed.
173              
174             By default the value is 0 (disabled).
175              
176             =item debug
177              
178             Enables debug mode. The dbug messages will be printed in STDERR.
179             By default the value is 0 (disabled).
180              
181             =back
182              
183             =cut
184              
185 5     5 1 63099 sub new ($class, %opts) {
  5         81  
  5         261  
  5         58  
186 5 50       138 croak("host is mandatory") if (! $opts{'host'});
187             my $self = bless({
188             'slots' => 5,
189             'ssl' => 0,
190             'ssl_opts' => undef,
191 5 100       357 'port' => $opts{'ssl'} ? 443 : 80,
192             'request_timeout' => 1, # 1 sec
193             'connect_timeout' => 1, # 1 sec
194             'sol_socket' => {
195             'so_keepalive' => 1,
196             },
197             'sol_tcp' => {},
198             'inactivity_conn_ts' => 0,
199             %opts,
200             '_conns' => [],
201             }, $class);
202 5         98 return $self;
203             }
204              
205 19     19   50 sub _connect ($self, $slot, $proto, $peer_addr) {
  19         59  
  19         39  
  19         38  
  19         49  
  19         76  
206              
207 19 50       117 warn("Connecting\n") if $self->{'debug'};
208              
209 19 50       1031 socket(my $socket, AF_INET, SOCK_STREAM, $proto) || croak("socket error: $!");
210 19 50       1427 connect($socket, $peer_addr) || croak("connect error: $!"); # in case of O_NONBLOCK it will return with EINPROGRESS
211              
212             # When a constant is used in an expression, Perl replaces it with its value at compile time,
213             # and may then optimize the expression further. In particular, any code in an if (CONSTANT)
214             # block will be optimized away if the constant is false.
215 19 50       222 if (&IS_NOT_WIN) {
216 19 50       235 fcntl($socket, F_SETFL, O_NONBLOCK | FD_CLOEXEC) || croak("fcntl error has occurred: $!");
217             }
218              
219 19 50       168 if (&IS_WIN) {
220 0         0 $socket = IO::Socket::IP->new_from_fd(fileno($socket), '+<');
221 0 0       0 defined($socket->blocking(0)) or croak("can't set non-blocking state on socket: $!");
222             #$socket->sockopt(O_NOINHERIT, 1) or croak("fcntl error has occurred: $!"); # the same as SOCK_CLOEXEC
223             }
224              
225 19   50     145 my $sol_socket_opts = $self->{'sol_socket'} // {};
226              
227 19 100       346 if (exists($sol_socket_opts->{'so_keepalive'})) {
228 17 50       211 setsockopt($socket, SOL_SOCKET, SO_KEEPALIVE, 1) || croak("setsockopt error has occurred while setting SO_KEEPALIVE: $!");
229              
230 17 50       105 if ($sol_socket_opts->{'so_keepalive'}) {
231 17   50     102 my $sol_tcp_opts = $self->{'sol_tcp'} // {};
232 17         47 state $SOL_TCP = &IPPROTO_TCP();
233              
234 17 100       64 if (exists($sol_tcp_opts->{'tcp_keepidle'})) {
235 8 50       74 setsockopt($socket, $SOL_TCP, TCP_KEEPIDLE, $sol_tcp_opts->{'tcp_keepidle'}) || croak("setsockopt error has occurred while setting TCP_KEEPIDLE: $!");
236             }
237              
238 17 100       72 if (exists($sol_tcp_opts->{'tcp_keepintvl'})) {
239 8 50       69 setsockopt($socket, $SOL_TCP, TCP_KEEPINTVL, $sol_tcp_opts->{'tcp_keepintvl'}) || croak("setsockopt error has occurred while setting TCP_KEEPINTVL: $!");
240             }
241              
242 17 100       79 if (exists($sol_tcp_opts->{'tcp_keepcnt'})) {
243 8 50       59 setsockopt($socket, $SOL_TCP, TCP_KEEPCNT, $sol_tcp_opts->{'tcp_keepcnt'}) || croak("setsockopt error has occurred while setting TCP_KEEPCNT: $!");
244             }
245             }
246             }
247              
248 19         121 $slot->{'connected_ts'} = time();
249 19         139 $slot->{'reader'} = $slot->{'writer'} = $slot->{'socket'} = $socket;
250 19         66 $slot->{'sock_no'} = fileno($socket);
251 19 100       128 if ($self->{'ssl'}) {
252 2   50     83 my $ssl_socket = IO::Socket::SSL->new_from_fd($socket, ($self->{'ssl_opts'} // {})->%*);
253 2 50       174639 croak("error=$!, ssl_error=" . $IO::Socket::SSL::SSL_ERROR) if (!$ssl_socket);
254 2         105 $ssl_socket->blocking(0); # just to be sure
255 2         87 $slot->{'reader'} = $slot->{'writer'} = $ssl_socket;
256             }
257             }
258              
259 19     19   52 sub _connect_slot ($self, $slot) {
  19         63  
  19         75  
  19         50  
260 19         80 my $timeout = $self->{'connect_timeout'};
261              
262 19 50       114 if ($timeout > 0) {
263 19         84 eval {
264 19     0   713 local $SIG{'ALRM'} = sub { die "alarm\n" };
  0         0  
265 19         257 alarm($timeout);
266 19         95 $self->_connect($slot, @{$self}{qw/ proto peer_addr /});
  19         188  
267 19         454 alarm(0);
268             };
269              
270 19         70 my $error = $@;
271              
272 19         114 alarm(0);
273              
274 19 50       150 if ($error) {
275 0 0       0 croak($error) if ($error ne "alarm\n");
276 0         0 $self->_mark_request_as_timeouted($slot, 'Connect timeout');
277             }
278             } else {
279 0         0 $self->_connect($slot, @{$self}{qw/ proto peer_addr /});
  0         0  
280             }
281             }
282              
283 14     14   61 sub _make_connections ($self, $amount) {
  14         76  
  14         72  
  14         41  
284              
285 14         2244 my $host_addr = inet_aton($self->{'host'});
286 14 50       233 croak("can't call inet_aton") if (! $host_addr);
287              
288 14   66     276 $self->{'peer_addr'} //= pack_sockaddr_in($self->{'port'}, $host_addr);
289 14   66     455 $self->{'proto'} //= getprotobyname("tcp");
290              
291 14         83 for (1 .. $amount) {
292 14         126 my $slot = $self->_make_slot();
293 14         107 $self->_connect_slot($slot);
294 14         82 $self->_add_slot($slot);
295             }
296             }
297              
298 14     14   51 sub _add_slot ($self, $slot) {
  14         36  
  14         178  
  14         56  
299 14 50       162 push($self->{'_conns'}->@*, $slot) if ($slot);
300             }
301              
302 14     14   46 sub _make_slot ($self) {
  14         63  
  14         54  
303             return {
304 14         560 'reader' => undef,
305             'writer' => undef,
306             'socket' => undef,
307             'sock_no' => 0,
308             'is_busy' => 0,
309             'request' => undef,
310             'tx' => undef,
311             'exp_ts' => 0,
312             'tmp_response' => undef,
313             'reconnect_is_required' => 0,
314             'last_response_ts' => 0,
315             'connected_ts' => 0,
316             };
317             }
318              
319 98892     98892   109235 sub _check_for_errors ($self, $socks2slots = {}, $error_handles = '', $reason = '') {
  98892         117182  
  98892         118447  
  98892         117701  
  98892         227205  
  98892         104825  
320              
321 98892         134358 my $message = $reason;
322              
323 98892 50       178887 if (!$message) {
324 98892 50 33     476171 $message = ($!{'EPIPE'} || $!{'ECONNRESET'} || $!{'ECONNREFUSED'} || $!{'ECONNABORTED'}) ? 'Premature connection close' : 'Unknown error';
325             }
326              
327 98892         3632142 for my $slot_no (keys %$socks2slots) {
328 126506 50       322284 if ( vec($error_handles, $slot_no, 1) != 0 ) {
329 0         0 my $slot = $socks2slots->{ $slot_no };
330 0         0 $self->_mark_response_as_broken($slot, 520, $message);
331             }
332             }
333             }
334              
335 35     35   75 sub _get_free_slot ($self) {
  35         100  
  35         90  
336              
337 35         82 my $slot;
338 22         217 my %socks2slots = map { $_->{'sock_no'} => $_ }
339 41 50 66     418 grep { !$_->{'is_busy'} && $_->{'socket'} && !$_->{'reconnect_is_required'} }
340 35         184 $self->{'_conns'}->@*;
341              
342 35 100       187 if (%socks2slots) {
343              
344 19         210 local $!;
345 19         61 my $write_handles = '';
346              
347 19         244 vec($write_handles, $_, 1) = 1 for keys %socks2slots;
348              
349 19         66 my $error_handles = $write_handles;
350 19         287 my ($nfound, $timeleft) = select(undef, $write_handles, $error_handles, 0);
351              
352 19         166 $self->_check_for_errors(\%socks2slots, $error_handles, $!);
353              
354 19 50       56 if ($nfound) {
355 19     19   239 my $slot_no = first { vec($write_handles, $_, 1) == 1 } keys %socks2slots;
  19         80  
356 19         119 $slot = $socks2slots{ $slot_no };
357             }
358             }
359              
360 35         154 return $slot;
361             }
362              
363             =head2 add ($self, $request_or_uri, $timeout = undef)
364              
365             Adds HTTP request into empty slot.
366              
367             If the request was successfully added, then it will return 1.
368             Otherwise it will return 0.
369              
370             The request can be not added into slot only in case, if there are no empty slots and new slot wasn't created due to
371             the limit of slot's amount had been reached (see C and C.
372              
373             It's recommendable always to check result code of this method.
374              
375             Example:
376              
377             my $ua = MojoX::HTTP::Async->new('host' => 'my-host.com', 'slots' => 1);
378              
379             # let's occupy the only slot
380             $ua->add('/page1.html');
381              
382             # let's wait until it's become free again
383             while ( ! $ua->add('/page2.html') ) {
384             while (my $tx = $ua->wait_for_next_response() ) {
385             # do something here
386             }
387             }
388              
389             =over
390              
391             =item $request_or_uri
392              
393             It can be either an instance of C class, or an instance of C.
394             It also can be a simple URI srtring.
395              
396             If the resourse contains the host, then it must be the same as in the constructor C.
397              
398             Using of string with URI or an instance of C class assumes that GET HTTP method will be used.
399              
400             =item $timeout
401              
402             Time in seconds. Can be fractional with microseconds tolerance.
403              
404             The C from conmtrucor will be used by default.
405              
406             =back
407              
408             =cut
409              
410 22     22 1 7158 sub add ($self, $request_or_uri, $timeout = undef) {
  22         83  
  22         197  
  22         81  
  22         39  
411 22         86 my $status = 0;
412 22         206 my $slot = $self->_get_free_slot();
413              
414 22 100 100     435 if ( ! $slot && $self->{'slots'} > scalar($self->{'_conns'}->@*) ) {
415 13         132 $self->_make_connections(1);
416 13         75 $slot = $self->_get_free_slot();
417             }
418              
419 22 100       85 if ($slot) {
420 19         68 my $request = $request_or_uri;
421 19 100 66     136 if ( !ref($request_or_uri) || ( blessed($request_or_uri) && $request_or_uri->isa('Mojo::URL') ) ) {
      100        
422 18         616 $request = Mojo::Message::Request->new();
423 18         447 $request->url()->parse($request_or_uri);
424             }
425 19 50       3871 if ($request) {
426 19         137 $self->_send_request($slot, $request, $timeout);
427 19         42 $status = 1;
428             }
429             }
430              
431 22         257 return $status;
432             }
433              
434 26     26   46 sub _clear_slot ($self, $slot, $force = 0) {
  26         51  
  26         46  
  26         51  
  26         37  
435 26         51 $slot->{'is_busy'} = 0;
436 26         50 $slot->{'exp_ts'} = 0;
437 26         55 $slot->{'tx'} = undef;
438 26         47 $slot->{'request'} = undef;
439 26         247 $slot->{'tmp_response'} = undef;
440 26 100       75 if ($force) {
441 14 50       885 close($slot->{'socket'}) if $slot->{'socket'};
442 14         66 $slot->{'socket'} = undef;
443 14         30 $slot->{'reader'} = undef;
444 14         67 $slot->{'writer'} = undef;
445 14         27 $slot->{'sock_no'} = 0;
446 14         24 $slot->{'reconnect_is_required'} = 0;
447 14         23 $slot->{'last_response_ts'} = 0;
448 14         53 $slot->{'connected_ts'} = 0;
449             }
450             }
451              
452 6     6   14 sub _mark_slot_as_broken($self, $slot) {
  6         15  
  6         13  
  6         8  
453 6         12 $slot->{'reconnect_is_required'} = 1;
454 6         23 $slot->{'is_busy'} = 1;
455 6   33     31 $slot->{'request'} //= Mojo::Message::Request->new();
456             $slot->{'tx'} //= Mojo::Transaction::HTTP->new(
457 6   33     213 'req' => $slot->{'request'},
458             'res' => Mojo::Message::Response->new()
459             );
460             }
461              
462 0     0   0 sub _mark_request_as_broken ($self, $slot, $code = 520, $msg = 'Unknown Error') {
  0         0  
  0         0  
  0         0  
  0         0  
  0         0  
463 0         0 $self->_mark_slot_as_broken($slot);li
464 0         0 $slot->{'request'}->error({'message' => $msg, 'code' => $code});
465             }
466              
467 6     6   14 sub _mark_response_as_broken ($self, $slot, $code = 520, $msg = 'Unknown Error') {
  6         8  
  6         11  
  6         12  
  6         10  
  6         9  
468 6         24 $self->_mark_slot_as_broken($slot);
469              
470 6         380 my $res = $slot->{'tx'}->res();
471 6         178 $res->error({'message' => $msg, 'code' => $code});
472 6         276 $res->headers()->content_length(0);
473 6         596 $res->code($code);
474 6         66 $res->message($msg);
475             }
476              
477 0     0   0 sub _mark_request_as_timeouted ($self, $slot, $message = 'Request timeout') {
  0         0  
  0         0  
  0         0  
  0         0  
478 0         0 $self->_mark_request_as_broken($slot, 524, $message);
479             }
480              
481 6     6   13 sub _mark_response_as_timeouted ($self, $slot, $message = 'Request timeout') {
  6         11  
  6         10  
  6         32  
  6         14  
482 6         28 $self->_mark_response_as_broken($slot, 524, $message);
483             }
484              
485 19     19   44 sub _send_request ($self, $slot, $request, $timeout = undef) {
  19         43  
  19         68  
  19         54  
  19         34  
  19         30  
486              
487 19 50       101 croak("slot is busy") if ($slot->{'is_busy'});
488 19 50       66 croak("request object is obligatory") if (!$request);
489 19 50       167 croak('request must be a descendant of Mojo::Message::Request package') if (!$request->isa('Mojo::Message::Request'));
490              
491 19 100       113 my $required_scheme = $self->{'ssl'} ? 'https' : 'http';
492 19         78 my $url = $request->url();
493 19         399 my $uri = URI->new( $url );
494 19         46539 my $scheme = $url->scheme();
495              
496 19 50 33     177 if ($scheme && $required_scheme ne $scheme) {
497 0         0 croak(sprintf("Wrong scheme in URI '%s'. It must correspond to the 'ssl' option", $uri->as_string()));
498             }
499              
500 19 50       117 if (! $uri->scheme()) {
501             # URI::_generic doesn't have C method, that's why we set the scheme by ourseleves to change C<$uri> type
502 19         843 $uri->scheme($required_scheme);
503             }
504              
505 19 50       21664 if (my $host = $uri->host()) {
506 0 0       0 if ($host ne $self->{'host'}) {
507 0         0 croak(sprintf("Wrong host in URI '%s'. It must be the same as it was specified in constructor: %s", $uri->as_string(), $self->{'host'}));
508             }
509             }
510              
511 19   33     817 $timeout //= $self->{'request_timeout'};
512              
513 19         54 my $response = '';
514 19         39 state $default_ua_hdr = 'perl/' . __PACKAGE__;
515              
516 19         246 my $h = $request->headers();
517 19 100       2606 $h->host($self->{'host'}) if (! $h->host() );
518 19 100       457 $h->user_agent($default_ua_hdr) if (! $h->user_agent() );
519              
520 19         307 $slot->{'request'} = $request;
521 19         40 $slot->{'is_busy'} = 1;
522 19 50       166 $slot->{'exp_ts'} = ($timeout > 0) ? ( time() + $timeout ) : 0;
523              
524 19         190 my $plain_request = $request->to_string();
525              
526 19 100       15168 if ($self->{'ssl'}) {
527 2         42 $slot->{'writer'}->print($plain_request);
528             } else {
529 17         52 my $socket = $slot->{'socket'};
530 17         220 my $msg_len = bytes::length($plain_request);
531 17         4865 my $sent_bytes = 0;
532 17         32 my $attempts = 10;
533              
534 17         98 local $!;
535              
536 17   66     151 while ($sent_bytes < $msg_len && $attempts--) {
537 17         911 my $bytes = syswrite($socket, $plain_request, $msg_len, $sent_bytes);
538              
539 17 50 33     178 if ($! || ! defined($bytes)) {
540 0   0     0 my $error = $! // 'Unknown error';
541 0         0 $self->_mark_request_as_broken($slot, 520, $error);
542 0         0 return;
543             }
544              
545 17         30 $sent_bytes += $bytes;
546 17 50       85 $plain_request = substr($plain_request, $bytes) if $sent_bytes < $msg_len;
547             }
548              
549 17 50       82 if ($sent_bytes < $msg_len) {
550 0   0     0 my $error = $! // 'sent message is shorter than original';
551 0         0 $self->_mark_request_as_broken($slot, 520, $error);
552 0         0 return;
553             }
554             }
555              
556 19 50 33     631 if ($slot->{'exp_ts'} && time() > $slot->{'exp_ts'}) {
557 0         0 $self->_mark_request_as_timeouted($slot);
558             }
559              
560 19         76 return;
561             }
562              
563 6     6   14 sub _try_to_read ($self, $slot) {
  6         12  
  6         8  
  6         10  
564              
565 6 50 33     98 return if $slot->{'tx'} || ! $slot->{'is_busy'};
566              
567 6         16 my $reader = $slot->{'reader'};
568 6   33     161 my $response = $slot->{'tmp_response'} // Mojo::Message::Response->new();
569              
570 6         289 $response->parse($_) while (<$reader>);
571              
572 6 50 33     9245 if ($! && !$!{'EAGAIN'} && !$!{'EWOULDBLOCK'}) { # not a "Resourse temporary unavailable" (no data)
    50 33        
      33        
573 0         0 $self->_mark_response_as_broken($slot, 520, $!);
574             } elsif ($response && $response->code()) {
575              
576 6         176 my $content = $response->content();
577              
578 6 50       39 if ($content->is_finished()) { # this is required to support "Transfer-Encoding: chunked"
579             $slot->{'tx'} = Mojo::Transaction::HTTP->new(
580 6         149 'req' => $slot->{'request'},
581             'res' => $response
582             );
583 6         97 $slot->{'tmp_response'} = undef;
584 6         22 $slot->{'last_response_ts'} = time();
585             } else {
586 0         0 $slot->{'tmp_response'} = $response;
587             }
588              
589 6 50       18 $slot->{'reconnect_is_required'} = 1 if ($content->relaxed()); # responses that are terminated with a connection close
590             }
591              
592 6 0 0     57 if (! $slot->{'tx'} && ($slot->{'exp_ts'} && time() > $slot->{'exp_ts'})) {
      33        
593 0         0 $self->_mark_response_as_timeouted($slot);
594             }
595             }
596              
597             =head2 not_empty($self)
598              
599             Returns 1 if there even one slot is busy or slot contains a not processed response.
600             Otherwise the method returns 0.
601              
602             =cut
603              
604 98869     98869 1 142819 sub not_empty ($self) {
  98869         103501  
  98869         88510  
605              
606 98869         119317 my $not_empty = scalar $self->{'_conns'}->@*;
607              
608 98869         155147 for my $slot ($self->{'_conns'}->@*) {
609 184056 50 66     382158 $not_empty-- if !$slot->{'is_busy'} && !$slot->{'tx'};
610             }
611              
612 98869 100       249046 return $not_empty ? 1 : 0;
613             }
614              
615              
616             =head2 wait_for_next_response($self, $timeout = 0)
617              
618             Waits for first received response or time-outed request in any slot.
619             Returns the C instance with result.
620              
621             =over
622              
623             =item $timeout
624              
625             Period of time in seconds. Can be fractial with microsecond tollerance.
626             The response will be marked as time-outed after this time is out.
627              
628             The default value is 0, which means that request will have been blocked until the response is received.
629              
630             If all slots are empty, then C will be returned.
631              
632             =back
633              
634             =cut
635              
636 12     12 1 14729 sub wait_for_next_response ($self, $timeout = 0) {
  12         25  
  12         20  
  12         13  
637              
638 12         17 my $response;
639 12 50       55 my $exp_ts = $timeout ? (time() + $timeout) : 0;
640              
641 12         82 while (1) {
642 90871 50 33     643939 last if ($exp_ts && time() >= $exp_ts); # awaiting process is time-outed
643 90871 100 100     248285 last if (($response = $self->next_response()) || !$self->not_empty());
644 90859 50       8180305 select(undef, undef, undef, 1E-6) if (!$response); # sleep 1 microsecond
645             }
646              
647 12         47 return $response;
648             }
649              
650             =head2 next_response ($self)
651              
652             Returns an instance of C class.
653             If there is no response, it will return C.
654              
655             =cut
656              
657 98873     98873 1 108740 sub next_response ($self) {
  98873         113466  
  98873         99801  
658 98873   66     186185 return $self->_get_response_from_ready_slot() // $self->_get_response_from_slot();
659             }
660              
661 98861     98861   108714 sub _get_response_from_slot ($self) {
  98861         98033  
  98861         90341  
662              
663 98861         95235 my $tx;
664 98861     184042   451335 my $slot = first { $_->{'tx'} } $self->{'_conns'}->@*;
  184042         234928  
665              
666 98861 50       253878 if ($slot) {
667 0         0 $tx = $slot->{'tx'};
668 0         0 $self->_clear_slot($slot, $slot->{'reconnect_is_required'});
669             }
670              
671 98861         341132 return $tx;
672             }
673              
674 98873     98873   99284 sub _get_response_from_ready_slot ($self) {
  98873         103607  
  98873         104049  
675              
676 98873         96834 my $tx;
677 126475         411153 my %socks2slots = map { $_->{'sock_no'} => $_ }
678 184064 100 66     804485 grep { ! $_->{'tx'} && ! $_->{'reconnect_is_required'} && $_->{'is_busy'} }
679 98873         216910 $self->{'_conns'}->@*;
680              
681 98873 100       208043 if (%socks2slots) {
682              
683 98868         456117 local $!;
684 98868         137618 my $read_handles = '';
685              
686 98868         476064 vec($read_handles, $_, 1) = 1 for keys %socks2slots;
687              
688 98868         148643 my $error_handles = $read_handles;
689 98868         781221 my ($nfound, $timeleft) = select($read_handles, undef, $error_handles, 0);
690              
691 98868         407115 $self->_check_for_errors(\%socks2slots, $error_handles, $!);
692              
693 98868         165287 for my $sock_no (keys %socks2slots) {
694 126474         151946 my $slot = $socks2slots{ $sock_no };
695 126474 100 100     213737 if ( $nfound && vec($read_handles, $sock_no, 1) == 1 ) {
696 6         28 $self->_try_to_read($slot);
697 6 50       20 next if ! $slot->{'tx'};
698 6 50       25 next if ! $slot->{'is_busy'};
699 6         14 $tx = $slot->{'tx'};
700             } else {
701 126468 100 66     562307 if (!$slot->{'tx'} && ($slot->{'exp_ts'} && time() > $slot->{'exp_ts'})) {
      66        
702 6         39 $self->_mark_response_as_timeouted($slot);
703 6         41 $tx = $slot->{'tx'};
704             }
705             }
706              
707 126474 100       334746 if ($tx) {
708 12         50 $self->_clear_slot($slot, 0);
709 12         54 last;
710             }
711             }
712             }
713              
714 98873         317764 return $tx;
715             }
716              
717             =head2 refresh_connections ($self)
718              
719             Closes connections in slots in the following cases:
720              
721             1. The slot was marked as timeouted
722             2. The "inactivity_conn_ts" was set and the connection was expired
723             3. There are some errors in socket (for example: Connection reset by peer, Broken pipe, etc)
724              
725             Returns the amount of made reconnections.
726              
727             =cut
728              
729 5     5 1 17 sub refresh_connections ($self) {
  5         19  
  5         11  
730              
731 5         18 my $n = 0;
732 5         31 my $now = time();
733 5   50     79 my $keep_ts = $self->{'inactivity_conn_ts'} // 0;
734              
735 5 50       32 if (scalar $self->{'_conns'}->@*) {
736              
737 5         77 local $!;
738 5         18 my $error_handles = '';
739 5         33 my %socks2slots = map { $_->{'sock_no'} => $_ } $self->{'_conns'}->@*;
  9         88  
740              
741 5         97 vec($error_handles, $_, 1) = 1 for keys %socks2slots;
742 5         86 select(undef, undef, $error_handles, 0);
743              
744 5         81 $self->_check_for_errors(\%socks2slots, $error_handles, $!); # broken connections will be marked as required to reconnect
745             }
746              
747 5         32 for my $i (reverse( 0 .. $#{ $self->{'_conns'} })) {
  5         23  
748 9         25 my $slot = $self->{'_conns'}->[$i];
749 9   33     135 my $slot_exp_ts = ($slot->{'last_response_ts'} || $slot->{'connected_ts'}) + $keep_ts;
750 9   100     51 my $is_outdated = $keep_ts && $slot_exp_ts <= $now;
751              
752 9 50 33     52 warn("Outdated\n") if $self->{'debug'} && $is_outdated;
753              
754 9 100 100     55 if ($slot->{'reconnect_is_required'} || $is_outdated) {
755 5 50       18 warn("Going to reconnect\n") if $self->{'debug'};
756 5         23 $self->_clear_slot($slot, 1);
757 5         25 $self->_connect_slot($slot);
758 5         14 $n++;
759             }
760             }
761              
762 5         30 return $n;
763             }
764              
765             =head2 close_all ($self)
766              
767             Closes all opened connections and resets all slots with requests.
768              
769             =cut
770              
771 5     5 1 9797 sub close_all ($self) {
  5         15  
  5         9  
772 5         33 $self->_clear_slot($_, 1) for $self->{'_conns'}->@*;
773 5         42 $self->{'_conns'} = [];
774 5         19 return;
775             }
776              
777             =head2 DESTROY($self)
778              
779             The class destructor.
780              
781             Closes all opened sockets.
782              
783             =cut
784              
785 5     5   14377 sub DESTROY ($self) {
  5         32  
  5         18  
786 5         16 my $in_use = 0;
787 5         42 while ( my $slot = shift($self->{'_conns'}->@*) ) {
788 5 50       727 $in_use++ if ($slot->{'is_busy'});
789 5 50       136 $slot->{'socket'}->close() if ($slot->{'socket'});
790             }
791 5 50       539 warn ref($self) ." object destroyed but still in use" if $in_use;
792             }
793              
794             1;
795             __END__