File Coverage

lib/MojoX/HTTP/Async.pm
Criterion Covered Total %
statement 376 413 91.0
branch 102 164 62.2
condition 59 111 53.1
subroutine 42 45 93.3
pod 7 7 100.0
total 586 740 79.1


line stmt bran cond sub pod time code
1             package MojoX::HTTP::Async;
2              
3             =encoding utf-8
4              
5             =head1 NAME
6              
7             MojoX::HTTP::Async - The simple package to execute multiple parallel requests to the same host
8              
9             =head1 SYNOPSIS
10              
11             use MojoX::HTTP::Async ();
12             use Mojo::Message::Request ();
13             use Mojo::URL ();
14              
15             # creates new instance for async requests
16             # restricts max amount of simultaneously executed requests
17             my $ua = MojoX::HTTP::Async->new('host' => 'my-site.com', 'slots' => 4);
18              
19             # let's fill slots
20             $ua->add( '/page1.html?lang=en');
21             $ua->add( 'http://my-site.com/page2.html');
22             $ua->add( Mojo::URL->new("/page/03.html") );
23             $ua->add( Mojo::Message::Request->new() );
24              
25             # non-blocking requests processing
26             while ( $ua->not_empty() ) {
27             if (my $tx = $ua->next_response) { # returns an instance of Mojo::Transaction::HTTP class
28             print $tx->res->headers->to_string;
29             } else {
30             # do something else
31             }
32             }
33              
34             # blocking requests processing
35             while (my $tx = $ua->wait_for_next_response($timeout)) {
36             # do something here
37             }
38              
39             # how to process connect timeouts
40             if (my $error = $tx->req()->error()) {
41             say $error->{code};
42             say $error->{message};
43             }
44              
45             # how to process request timeouts and other errors such as broken pipes, etc
46             if (my $error = $tx->res()->error()) {
47             say $error->{code};
48             say $error->{message};
49             }
50              
51             # makes reconnection if either slot was timeouted or was inactive too long
52             $ua->refresh_connections();
53              
54             # closes everything
55             $ua->close_all();
56              
57             =head1 DESCRIPTION
58              
59             This library allows making multiple HTTP/HTTPS requests to a particular host in non-blocking mode.
60              
61             In comparison with C, this library doesn't make a new connection on each request.
62              
63             And in comparison with C, it's more intuitive to use, and there are no Singleton restrictions.
64              
65             The instance of this class can work only with one domain and scheme: either HTTP or HTTPS.
66              
67             =head1 LICENSE
68              
69             This module is distributed under terms of Artistic Perl 5 license.
70              
71             =cut
72              
73 27     27   9351025 use 5.020;
  27         276  
74 27     27   149 use warnings;
  27         54  
  27         751  
75 27     27   131 use bytes ();
  27         54  
  27         1001  
76 27     27   180 use Socket qw/ inet_aton pack_sockaddr_in AF_INET SOCK_STREAM SOL_SOCKET SO_KEEPALIVE SO_OOBINLINE IPPROTO_TCP TCP_KEEPIDLE TCP_KEEPINTVL TCP_KEEPCNT /;
  27         76  
  27         2676  
77             #use IO::Socket::IP ();
78 27     27   279 use IO::Socket::SSL ();
  27         61  
  27         876  
79 27     27   184 use Fcntl qw/ F_SETFL O_NONBLOCK FD_CLOEXEC O_NOINHERIT /;
  27         42  
  27         1978  
80 27     27   158 use experimental qw/ signatures /;
  27         51  
  27         212  
81 27     27   4477 use Carp qw/ croak /;
  27         93  
  27         1268  
82 27     27   140 use List::Util qw/ first /;
  27         60  
  27         1762  
83 27     27   152 use Time::HiRes qw/ time /;
  27         57  
  27         189  
84 27     27   12412 use Mojo::Message::Request ();
  27         4794560  
  27         625  
85 27     27   13653 use Mojo::Message::Response ();
  27         195020  
  27         707  
86 27     27   13195 use Mojo::Transaction::HTTP ();
  27         93706  
  27         642  
87 27     27   15216 use URI ();
  27         128187  
  27         744  
88 27     27   213 use Scalar::Util qw/ blessed /;
  27         63  
  27         1692  
89 27     27   176 use Errno qw / :POSIX /;
  27         58  
  27         10457  
90              
91             our $VERSION = 0.12;
92              
93             use constant {
94 27 50       134231 IS_WIN => ($^O eq 'MSWin32') ? 1 : 0,
    50          
95             IS_NOT_WIN => ($^O ne 'MSWin32') ? 1 : 0,
96 27     27   206 };
  27         43  
97              
98             =head2 new($class, %opts)
99              
100             The class constructor.
101              
102             =over
103              
104             =item host
105              
106             It's the obligatory option.
107             Sets the name/address of remote host to be requested.
108              
109             =item port
110              
111             By default it's equal to 80.
112             Sets the port number of remote point.
113              
114             =item slots
115              
116             By default it's equal to 5.
117             Sets the maximum amount of slots.
118             These slots will be filled one by one if required.
119              
120             =item ssl
121              
122             By default it's equal to 0 (means HTTP).
123             Sets the scheme of requests: HTTP or HTTPS.
124              
125             =item ssl_opts
126              
127             It's a HashRef with options to control SSL Layer.
128             See C<IO::Socket::SSL> constructor arguments for details.
129              
130             =item connect_timeout
131              
132             By default it's equal to 1.
133             Sets connection timeout in seconds.
134              
135             If it's equal to 0, then there will be no timeout restrictions.
136              
137             =item request_timeout
138              
139             By default it's equal to 1.
140             Sets the request timeout in seconds, with microsecond granularity.
141             The awaiting time of response will be limited with this value.
142              
143             In case of 0 value there will be no time restrictions.
144              
145             =item sol_socket
146              
147             It's a HashRef with socket options.
148             The possible keys are:
149              
150             B<so_keepalive> - enables TCP KeepAlive on socket.
151             The default value is 1 (means that option is enabled).
152              
153             =item B<sol_tcp>
154              
155             WARNING: These options can be unsupported on some OS platforms.
156              
157             It's a HashRef with socket TCP-options.
158              
159             If some key is absent in HashRef then system settings will be used.
160              
161             The supported keys are shown below:
162              
163             B<tcp_keepidle> - the time (in seconds) the connection needs to remain idle before TCP starts sending keepalive probes
164              
165             B<tcp_keepintvl> - the time (in seconds) between individual keepalive probes
166              
167             B<tcp_keepcnt> - the maximum number of keepalive probes TCP should send before dropping the connection.
168              
169             =item inactivity_conn_ts
170              
171             If the last response was received C<inactivity_conn_ts> seconds or more ago,
172             then such slots will be destroyed.
173              
174             By default the value is 0 (disabled).
175              
176             =item debug
177              
178             Enables debug mode. The debug messages will be printed to STDERR.
179             By default the value is 0 (disabled).
180              
181             =back
182              
183             =cut
184              
185 5     5 1 54928 sub new ($class, %opts) {
  5         65  
  5         194  
  5         55  
186 5 50       155 croak("host is mandatory") if (! $opts{'host'});
187             my $self = bless({
188             'slots' => 5,
189             'ssl' => 0,
190             'ssl_opts' => undef,
191 5 100       365 'port' => $opts{'ssl'} ? 443 : 80,
192             'request_timeout' => 1, # 1 sec
193             'connect_timeout' => 1, # 1 sec
194             'sol_socket' => {
195             'so_keepalive' => 1,
196             },
197             'sol_tcp' => {},
198             'inactivity_conn_ts' => 0,
199             %opts,
200             '_conns' => [],
201             }, $class);
202 5         82 return $self;
203             }
204              
205 20     20   69 sub _connect ($self, $slot, $proto, $peer_addr) {
  20         64  
  20         47  
  20         44  
  20         58  
  20         50  
206              
207 20 50       130 warn("Connecting\n") if $self->{'debug'};
208              
209 20 50       1020 socket(my $socket, AF_INET, SOCK_STREAM, $proto) || croak("socket error: $!");
210 20 50       1495 connect($socket, $peer_addr) || croak("connect error: $!"); # in case of O_NONBLOCK it will return with EINPROGRESS
211              
212             # When a constant is used in an expression, Perl replaces it with its value at compile time,
213             # and may then optimize the expression further. In particular, any code in an if (CONSTANT)
214             # block will be optimized away if the constant is false.
215 20 50       207 if (&IS_NOT_WIN) {
216 20 50       226 fcntl($socket, F_SETFL, O_NONBLOCK | FD_CLOEXEC) || croak("fcntl error has occurred: $!");
217             }
218              
219 20 50       156 if (&IS_WIN) {
220 0         0 $socket = IO::Socket::IP->new_from_fd(fileno($socket), '+<');
221 0 0       0 defined($socket->blocking(0)) or croak("can't set non-blocking state on socket: $!");
222             #$socket->sockopt(O_NOINHERIT, 1) or croak("fcntl error has occurred: $!"); # the same as SOCK_CLOEXEC
223             }
224              
225 20   50     144 my $sol_socket_opts = $self->{'sol_socket'} // {};
226              
227 20 100       363 if (exists($sol_socket_opts->{'so_keepalive'})) {
228 17 50       242 setsockopt($socket, SOL_SOCKET, SO_KEEPALIVE, 1) || croak("setsockopt error has occurred while setting SO_KEEPALIVE: $!");
229              
230 17 50       106 if ($sol_socket_opts->{'so_keepalive'}) {
231 17   50     90 my $sol_tcp_opts = $self->{'sol_tcp'} // {};
232 17         53 state $SOL_TCP = &IPPROTO_TCP();
233              
234 17 100       93 if (exists($sol_tcp_opts->{'tcp_keepidle'})) {
235 8 50       73 setsockopt($socket, $SOL_TCP, TCP_KEEPIDLE, $sol_tcp_opts->{'tcp_keepidle'}) || croak("setsockopt error has occurred while setting TCP_KEEPIDLE: $!");
236             }
237              
238 17 100       82 if (exists($sol_tcp_opts->{'tcp_keepintvl'})) {
239 8 50       63 setsockopt($socket, $SOL_TCP, TCP_KEEPINTVL, $sol_tcp_opts->{'tcp_keepintvl'}) || croak("setsockopt error has occurred while setting TCP_KEEPINTVL: $!");
240             }
241              
242 17 100       67 if (exists($sol_tcp_opts->{'tcp_keepcnt'})) {
243 8 50       62 setsockopt($socket, $SOL_TCP, TCP_KEEPCNT, $sol_tcp_opts->{'tcp_keepcnt'}) || croak("setsockopt error has occurred while setting TCP_KEEPCNT: $!");
244             }
245             }
246             }
247              
248 20         110 $slot->{'connected_ts'} = time();
249 20         117 $slot->{'reader'} = $slot->{'writer'} = $slot->{'socket'} = $socket;
250 20         77 $slot->{'sock_no'} = fileno($socket);
251 20 100       124 if ($self->{'ssl'}) {
252 3   50     78 my $ssl_socket = IO::Socket::SSL->new_from_fd($socket, ($self->{'ssl_opts'} // {})->%*);
253 3 50       242746 croak("error=$!, ssl_error=" . $IO::Socket::SSL::SSL_ERROR) if (!$ssl_socket);
254 3         17 $ssl_socket->blocking(0); # just to be sure
255 3         51 $slot->{'reader'} = $slot->{'writer'} = $ssl_socket;
256             }
257             }
258              
259 20     20   60 sub _connect_slot ($self, $slot) {
  20         65  
  20         55  
  20         47  
260 20         68 my $timeout = $self->{'connect_timeout'};
261              
262 20 50       133 if ($timeout > 0) {
263 20         98 eval {
264 20     0   724 local $SIG{'ALRM'} = sub { die "alarm\n" };
  0         0  
265 20         188 alarm($timeout);
266 20         88 $self->_connect($slot, @{$self}{qw/ proto peer_addr /});
  20         187  
267 20         450 alarm(0);
268             };
269              
270 20         139 my $error = $@;
271              
272 20         116 alarm(0);
273              
274 20 50       115 if ($error) {
275 0 0       0 croak($error) if ($error ne "alarm\n");
276 0         0 $self->_mark_request_as_timeouted($slot, 'Connect timeout');
277             }
278             } else {
279 0         0 $self->_connect($slot, @{$self}{qw/ proto peer_addr /});
  0         0  
280             }
281             }
282              
283 15     15   77 sub _make_connections ($self, $amount) {
  15         67  
  15         47  
  15         58  
284              
285 15         2201 my $host_addr = inet_aton($self->{'host'});
286 15 50       168 croak("can't call inet_aton") if (! $host_addr);
287              
288 15   66     273 $self->{'peer_addr'} //= pack_sockaddr_in($self->{'port'}, $host_addr);
289 15   66     437 $self->{'proto'} //= getprotobyname("tcp");
290              
291 15         82 for (1 .. $amount) {
292 15         110 my $slot = $self->_make_slot();
293 15         187 $self->_connect_slot($slot);
294 15         75 $self->_add_slot($slot);
295             }
296             }
297              
298 15     15   40 sub _add_slot ($self, $slot) {
  15         34  
  15         171  
  15         26  
299 15 50       94 push($self->{'_conns'}->@*, $slot) if ($slot);
300             }
301              
302 15     15   80 sub _make_slot ($self) {
  15         37  
  15         58  
303             return {
304 15         424 'reader' => undef,
305             'writer' => undef,
306             'socket' => undef,
307             'sock_no' => 0,
308             'is_busy' => 0,
309             'request' => undef,
310             'tx' => undef,
311             'exp_ts' => 0,
312             'tmp_response' => undef,
313             'reconnect_is_required' => 0,
314             'last_response_ts' => 0,
315             'connected_ts' => 0,
316             };
317             }
318              
319 70497     70497   80635 sub _check_for_errors ($self, $socks2slots = {}, $error_handles = '', $reason = '') {
  70497         79514  
  70497         80490  
  70497         77790  
  70497         155865  
  70497         69744  
320              
321 70497         85842 my $message = $reason;
322              
323 70497 50       122797 if (!$message) {
324 70497 50 33     320910 $message = ($!{'EPIPE'} || $!{'ECONNRESET'} || $!{'ECONNREFUSED'} || $!{'ECONNABORTED'}) ? 'Premature connection close' : 'Unknown error';
325             }
326              
327 70497         2610310 for my $slot_no (keys %$socks2slots) {
328 98110 50       238362 if ( vec($error_handles, $slot_no, 1) != 0 ) {
329 0         0 my $slot = $socks2slots->{ $slot_no };
330 0         0 $self->_mark_response_as_broken($slot, 520, $message);
331             }
332             }
333             }
334              
335 36     36   76 sub _get_free_slot ($self) {
  36         104  
  36         72  
336              
337 36         95 my $slot;
338 22         186 my %socks2slots = map { $_->{'sock_no'} => $_ }
339 41 50 66     420 grep { !$_->{'is_busy'} && $_->{'socket'} && !$_->{'reconnect_is_required'} }
340 36         195 $self->{'_conns'}->@*;
341              
342 36 100       208 if (%socks2slots) {
343              
344 19         198 local $!;
345 19         73 my $write_handles = '';
346              
347 19         210 vec($write_handles, $_, 1) = 1 for keys %socks2slots;
348              
349 19         71 my $error_handles = $write_handles;
350 19         274 my ($nfound, $timeleft) = select(undef, $write_handles, $error_handles, 0);
351              
352 19         146 $self->_check_for_errors(\%socks2slots, $error_handles, $!);
353              
354 19 50       61 if ($nfound) {
355 19     19   238 my $slot_no = first { vec($write_handles, $_, 1) == 1 } keys %socks2slots;
  19         78  
356 19         108 $slot = $socks2slots{ $slot_no };
357             }
358             }
359              
360 36         115 return $slot;
361             }
362              
363             =head2 add ($self, $request_or_uri, $timeout = undef)
364              
365             Adds HTTP request into empty slot.
366              
367             If the request was successfully added, then it will return 1.
368             Otherwise it will return 0.
369              
370             The request can be not added into slot only in case, if there are no empty slots and new slot wasn't created due to
371             the limit of slot's amount had been reached (see C and C.
372              
373             It is recommended to always check the result code of this method.
374              
375             Example:
376              
377             my $ua = MojoX::HTTP::Async->new('host' => 'my-host.com', 'slots' => 1);
378              
379             # let's occupy the only slot
380             $ua->add('/page1.html');
381              
382             # let's wait until it's become free again
383             while ( ! $ua->add('/page2.html') ) {
384             while (my $tx = $ua->wait_for_next_response() ) {
385             # do something here
386             }
387             }
388              
389             =over
390              
391             =item $request_or_uri
392              
393             It can be either an instance of C<Mojo::Message::Request> class, or an instance of C<Mojo::URL>.
394             It also can be a simple URI string.
395              
396             If the resource contains the host, then it must be the same as the C<host> option passed to the constructor.
397              
398             Using a URI string or an instance of the C<Mojo::URL> class assumes that the GET HTTP method will be used.
399              
400             =item $timeout
401              
402             Time in seconds. Can be fractional with microseconds tolerance.
403              
404             The C<request_timeout> from the constructor will be used by default.
405              
406             =back
407              
408             =cut
409              
410 22     22 1 6765 sub add ($self, $request_or_uri, $timeout = undef) {
  22         98  
  22         131  
  22         85  
  22         93  
411 22         90 my $status = 0;
412 22         203 my $slot = $self->_get_free_slot();
413              
414 22 100 100     309 if ( ! $slot && $self->{'slots'} > scalar($self->{'_conns'}->@*) ) {
415 14         124 $self->_make_connections(1);
416 14         53 $slot = $self->_get_free_slot();
417             }
418              
419 22 100       74 if ($slot) {
420 19         52 my $request = $request_or_uri;
421 19 100 66     148 if ( !ref($request_or_uri) || ( blessed($request_or_uri) && $request_or_uri->isa('Mojo::URL') ) ) {
      100        
422 18         551 $request = Mojo::Message::Request->new();
423 18         338 $request->url()->parse($request_or_uri);
424             }
425 19 50       3546 if ($request) {
426 19         151 $self->_send_request($slot, $request, $timeout);
427 19         41 $status = 1;
428             }
429             }
430              
431 22         240 return $status;
432             }
433              
434 27     27   44 sub _clear_slot ($self, $slot, $force = 0) {
  27         39  
  27         44  
  27         53  
  27         33  
435 27         69 $slot->{'is_busy'} = 0;
436 27         55 $slot->{'exp_ts'} = 0;
437 27         46 $slot->{'tx'} = undef;
438 27         54 $slot->{'request'} = undef;
439 27         264 $slot->{'tmp_response'} = undef;
440 27 100       76 if ($force) {
441 15 50       821 close($slot->{'socket'}) if $slot->{'socket'};
442 15         69 $slot->{'socket'} = undef;
443 15         33 $slot->{'reader'} = undef;
444 15         83 $slot->{'writer'} = undef;
445 15         395 $slot->{'sock_no'} = 0;
446 15         27 $slot->{'reconnect_is_required'} = 0;
447 15         28 $slot->{'last_response_ts'} = 0;
448 15         55 $slot->{'connected_ts'} = 0;
449             }
450             }
451              
452 6     6   12 sub _mark_slot_as_broken($self, $slot) {
  6         12  
  6         11  
  6         8  
453 6         29 $slot->{'reconnect_is_required'} = 1;
454 6         13 $slot->{'is_busy'} = 1;
455 6   33     25 $slot->{'request'} //= Mojo::Message::Request->new();
456             $slot->{'tx'} //= Mojo::Transaction::HTTP->new(
457 6   33     200 'req' => $slot->{'request'},
458             'res' => Mojo::Message::Response->new()
459             );
460             }
461              
462 0     0   0 sub _mark_request_as_broken ($self, $slot, $code = 520, $msg = 'Unknown Error') {
  0         0  
  0         0  
  0         0  
  0         0  
  0         0  
463 0         0 $self->_mark_slot_as_broken($slot);li
464 0         0 $slot->{'request'}->error({'message' => $msg, 'code' => $code});
465             }
466              
467 6     6   12 sub _mark_response_as_broken ($self, $slot, $code = 520, $msg = 'Unknown Error') {
  6         15  
  6         12  
  6         12  
  6         24  
  6         9  
468 6         25 $self->_mark_slot_as_broken($slot);
469              
470 6         364 my $res = $slot->{'tx'}->res();
471 6         180 $res->error({'message' => $msg, 'code' => $code});
472 6         250 $res->headers()->content_length(0);
473 6         509 $res->code($code);
474 6         69 $res->message($msg);
475             }
476              
477 0     0   0 sub _mark_request_as_timeouted ($self, $slot, $message = 'Request timeout') {
  0         0  
  0         0  
  0         0  
  0         0  
478 0         0 $self->_mark_request_as_broken($slot, 524, $message);
479             }
480              
481 5     5   12 sub _mark_response_as_timeouted ($self, $slot, $message = 'Request timeout') {
  5         8  
  5         14  
  5         24  
  5         9  
482 5         25 $self->_mark_response_as_broken($slot, 524, $message);
483             }
484              
485 19     19   32 sub _send_request ($self, $slot, $request, $timeout = undef) {
  19         45  
  19         41  
  19         30  
  19         29  
  19         31  
486              
487 19 50       86 croak("slot is busy") if ($slot->{'is_busy'});
488 19 50       63 croak("request object is obligatory") if (!$request);
489 19 50       131 croak('request must be a descendant of Mojo::Message::Request package') if (!$request->isa('Mojo::Message::Request'));
490              
491 19 100       123 my $required_scheme = $self->{'ssl'} ? 'https' : 'http';
492 19         72 my $url = $request->url();
493 19         372 my $uri = URI->new( $url );
494 19         39505 my $scheme = $url->scheme();
495              
496 19 50 33     184 if ($scheme && $required_scheme ne $scheme) {
497 0         0 croak(sprintf("Wrong scheme in URI '%s'. It must correspond to the 'ssl' option", $uri->as_string()));
498             }
499              
500 19 50       160 if (! $uri->scheme()) {
501             # URI::_generic doesn't have C method, that's why we set the scheme by ourseleves to change C<$uri> type
502 19         741 $uri->scheme($required_scheme);
503             }
504              
505 19 50       19106 if (my $host = $uri->host()) {
506 0 0       0 if ($host ne $self->{'host'}) {
507 0         0 croak(sprintf("Wrong host in URI '%s'. It must be the same as it was specified in constructor: %s", $uri->as_string(), $self->{'host'}));
508             }
509             }
510              
511 19   33     806 $timeout //= $self->{'request_timeout'};
512              
513 19         47 my $response = '';
514 19         38 state $default_ua_hdr = 'perl/' . __PACKAGE__;
515              
516 19         251 my $h = $request->headers();
517 19 100       2376 $h->host($self->{'host'}) if (! $h->host() );
518 19 100       432 $h->user_agent($default_ua_hdr) if (! $h->user_agent() );
519              
520 19         296 $slot->{'request'} = $request;
521 19         39 $slot->{'is_busy'} = 1;
522 19 50       125 $slot->{'exp_ts'} = ($timeout > 0) ? ( time() + $timeout ) : 0;
523              
524 19         196 my $plain_request = $request->to_string();
525              
526 19 100       13641 if ($self->{'ssl'}) {
527 2         28 $slot->{'writer'}->print($plain_request);
528             } else {
529 17         35 my $socket = $slot->{'socket'};
530 17         212 my $msg_len = bytes::length($plain_request);
531 17         4580 my $sent_bytes = 0;
532 17         31 my $attempts = 10;
533              
534 17         100 local $!;
535              
536 17   66     160 while ($sent_bytes < $msg_len && $attempts--) {
537 17         844 my $bytes = syswrite($socket, $plain_request, $msg_len, $sent_bytes);
538              
539 17 50 33     175 if ($! || ! defined($bytes)) {
540 0   0     0 my $error = $! // 'Unknown error';
541 0         0 $self->_mark_request_as_broken($slot, 520, $error);
542 0         0 return;
543             }
544              
545 17         31 $sent_bytes += $bytes;
546 17 50       71 $plain_request = substr($plain_request, $bytes) if $sent_bytes < $msg_len;
547             }
548              
549 17 50       80 if ($sent_bytes < $msg_len) {
550 0   0     0 my $error = $! // 'sent message is shorter than original';
551 0         0 $self->_mark_request_as_broken($slot, 520, $error);
552 0         0 return;
553             }
554             }
555              
556 19 50 33     572 if ($slot->{'exp_ts'} && time() > $slot->{'exp_ts'}) {
557 0         0 $self->_mark_request_as_timeouted($slot);
558             }
559              
560 19         71 return;
561             }
562              
563 7     7   11 sub _try_to_read ($self, $slot) {
  7         10  
  7         11  
  7         11  
564              
565 7 50 33     64 return if $slot->{'tx'} || ! $slot->{'is_busy'};
566              
567 7         17 my $reader = $slot->{'reader'};
568 7   33     161 my $response = $slot->{'tmp_response'} // Mojo::Message::Response->new();
569              
570 7         291 $response->parse($_) while (<$reader>);
571              
572 7 100 66     9298 if ($! && !$!{'EAGAIN'} && !$!{'EWOULDBLOCK'}) { # not a "Resourse temporary unavailable" (no data)
    50 66        
      33        
573 1         46 $self->_mark_response_as_broken($slot, 520, $!);
574             } elsif ($response && $response->code()) {
575              
576 6         182 my $content = $response->content();
577              
578 6 50       39 if ($content->is_finished()) { # this is required to support "Transfer-Encoding: chunked"
579             $slot->{'tx'} = Mojo::Transaction::HTTP->new(
580 6         125 'req' => $slot->{'request'},
581             'res' => $response
582             );
583 6         114 $slot->{'tmp_response'} = undef;
584 6         27 $slot->{'last_response_ts'} = time();
585             } else {
586 0         0 $slot->{'tmp_response'} = $response;
587             }
588              
589 6 50       28 $slot->{'reconnect_is_required'} = 1 if ($content->relaxed()); # responses that are terminated with a connection close
590             }
591              
592 7 0 0     82 if (! $slot->{'tx'} && ($slot->{'exp_ts'} && time() > $slot->{'exp_ts'})) {
      33        
593 0         0 $self->_mark_response_as_timeouted($slot);
594             }
595             }
596              
597             =head2 not_empty($self)
598              
599             Returns 1 if at least one slot is busy or contains a response that has not been processed yet.
600             Otherwise the method returns 0.
601              
602             =cut
603              
604 70474     70474 1 109404 sub not_empty ($self) {
  70474         73016  
  70474         64634  
605              
606 70474         86761 my $not_empty = scalar $self->{'_conns'}->@*;
607              
608 70474         99681 for my $slot ($self->{'_conns'}->@*) {
609 127390 50 66     247436 $not_empty-- if !$slot->{'is_busy'} && !$slot->{'tx'};
610             }
611              
612 70474 100       185344 return $not_empty ? 1 : 0;
613             }
614              
615              
616             =head2 wait_for_next_response($self, $timeout = 0)
617              
618             Waits for first received response or time-outed request in any slot.
619             Returns the C<Mojo::Transaction::HTTP> instance with the result.
620              
621             =over
622              
623             =item $timeout
624              
625             Period of time in seconds. Can be fractional with microsecond tolerance.
626             The response will be marked as time-outed after this time is out.
627              
628             The default value is 0, which means that the call will block until a response is received.
629              
630             If all slots are empty, then C<undef> will be returned.
631              
632             =back
633              
634             =cut
635              
636 12     12 1 14466 sub wait_for_next_response ($self, $timeout = 0) {
  12         19  
  12         23  
  12         16  
637              
638 12         17 my $response;
639 12 50       59 my $exp_ts = $timeout ? (time() + $timeout) : 0;
640              
641 12         19 while (1) {
642 62555 50 33     453028 last if ($exp_ts && time() >= $exp_ts); # awaiting process is time-outed
643 62555 100 100     156505 last if (($response = $self->next_response()) || !$self->not_empty());
644 62543 50       5532273 select(undef, undef, undef, 1E-6) if (!$response); # sleep 1 microsecond
645             }
646              
647 12         38 return $response;
648             }
649              
650             =head2 next_response ($self)
651              
652             Returns an instance of C<Mojo::Transaction::HTTP> class.
653             If there is no response, it will return C<undef>.
654              
655             =cut
656              
657 70478     70478 1 77865 sub next_response ($self) {
  70478         79332  
  70478         76361  
658 70478   66     119283 return $self->_get_response_from_ready_slot() // $self->_get_response_from_slot();
659             }
660              
661 70466     70466   82893 sub _get_response_from_slot ($self) {
  70466         80370  
  70466         64316  
662              
663 70466         69766 my $tx;
664 70466     127376   337872 my $slot = first { $_->{'tx'} } $self->{'_conns'}->@*;
  127376         164169  
665              
666 70466 50       191637 if ($slot) {
667 0         0 $tx = $slot->{'tx'};
668 0         0 $self->_clear_slot($slot, $slot->{'reconnect_is_required'});
669             }
670              
671 70466         248446 return $tx;
672             }
673              
# _get_response_from_ready_slot($self): poll the busy slots once and return
# at most one transaction (undef when nothing is ready).
#
# Candidate slots are those that have no 'tx' yet, are not flagged
# 'reconnect_is_required' and are marked 'is_busy'. A slot yields a
# transaction either because its socket became readable and _try_to_read
# produced one, or because its 'exp_ts' deadline has passed.
674 70478     70478   69745 sub _get_response_from_ready_slot ($self) {
  70478         70826  
  70478         69036  
675              
676 70478         71730 my $tx;
# Index the candidate slots by their 'sock_no' value, which is used below
# as the bit position in the select() masks.
677 98079         314812 my %socks2slots = map { $_->{'sock_no'} => $_ }
678 127398 100 66     539870 grep { ! $_->{'tx'} && ! $_->{'reconnect_is_required'} && $_->{'is_busy'} }
679 70478         146782 $self->{'_conns'}->@*;
680              
681 70478 100       157935 if (%socks2slots) {
682              
# Localise $! so that the value set by select() is restored on scope exit.
683 70473         328266 local $!;
684 70473         102781 my $read_handles = '';
685              
686 70473         358231 vec($read_handles, $_, 1) = 1 for keys %socks2slots;
687              
# The same set of descriptors is watched for readability and for errors;
# the timeout of 0 makes select() return immediately.
688 70473         110485 my $error_handles = $read_handles;
689 70473         576423 my ($nfound, $timeleft) = select($read_handles, undef, $error_handles, 0);
690              
691 70473         300535 $self->_check_for_errors(\%socks2slots, $error_handles, $!);
692              
693 70473         120059 for my $sock_no (keys %socks2slots) {
694 98076         128339 my $slot = $socks2slots{ $sock_no };
# NOTE(review): $nfound is only tested for truth; select() returns -1 on
# failure, which is also true - confirm _check_for_errors handles that case.
695 98076 100 66     158817 if ( $nfound && vec($read_handles, $sock_no, 1) == 1 ) {
696 7         27 $self->_try_to_read($slot);
# Skip the slot unless the read left it with a 'tx' and still 'is_busy'.
697 7 50       24 next if ! $slot->{'tx'};
698 7 50       21 next if ! $slot->{'is_busy'};
699 7         14 $tx = $slot->{'tx'};
700             } else {
# Not readable: check whether the slot's deadline ('exp_ts') has passed.
701 98069 100 66     415742 if (!$slot->{'tx'} && ($slot->{'exp_ts'} && time() > $slot->{'exp_ts'})) {
      66        
# Presumably _mark_response_as_timeouted stores a transaction in
# $slot->{'tx'}, since it is read right after - verify in that helper.
702 5         49 $self->_mark_response_as_timeouted($slot);
703 5         35 $tx = $slot->{'tx'};
704             }
705             }
706              
# Only one transaction is handed out per call: clear its slot and stop.
707 98076 100       269295 if ($tx) {
708 12         49 $self->_clear_slot($slot, 0);
709 12         55 last;
710             }
711             }
712             }
713              
714 70478         235644 return $tx;
715             }
716              
717             =head2 refresh_connections ($self)
718              
719             Closes connections in slots in the following cases:
720              
721             1. The slot was marked as timeouted
722             2. The "inactivity_conn_ts" was set and the connection was expired
723             3. There are some errors in socket (for example: Connection reset by peer, Broken pipe, etc)
724              
725             Returns the number of reconnections made.
726              
727             =cut
728              
# refresh_connections($self): reconnect slots that are flagged
# 'reconnect_is_required' or that have been idle longer than
# $self->{'inactivity_conn_ts'}. Returns the number of slots reconnected.
729 5     5 1 17 sub refresh_connections ($self) {
  5         15  
  5         10  
730              
731 5         12 my $n = 0;
732 5         24 my $now = time();
# An undefined 'inactivity_conn_ts' becomes 0, which disables the
# inactivity check below.
733 5   50     53 my $keep_ts = $self->{'inactivity_conn_ts'} // 0;
734              
735 5 50       27 if (scalar $self->{'_conns'}->@*) {
736              
# Localise $! so that the value set by select() is restored on scope exit.
737 5         60 local $!;
738 5         32 my $error_handles = '';
739 5         25 my %socks2slots = map { $_->{'sock_no'} => $_ } $self->{'_conns'}->@*;
  9         58  
740              
# Poll every slot's descriptor for an exceptional condition; the timeout
# of 0 makes select() return immediately.
741 5         76 vec($error_handles, $_, 1) = 1 for keys %socks2slots;
742 5         88 select(undef, undef, $error_handles, 0);
743              
744 5         53 $self->_check_for_errors(\%socks2slots, $error_handles, $!); # broken connections will be marked as required to reconnect
745             }
746              
# Slots are visited from the last index down to 0.
747 5         26 for my $i (reverse( 0 .. $#{ $self->{'_conns'} })) {
  5         21  
748 9         93 my $slot = $self->{'_conns'}->[$i];
# Idle time is measured from 'last_response_ts', or from 'connected_ts'
# when 'last_response_ts' is false.
749 9   33     79 my $slot_exp_ts = ($slot->{'last_response_ts'} || $slot->{'connected_ts'}) + $keep_ts;
750 9   100     37 my $is_outdated = $keep_ts && $slot_exp_ts <= $now;
751              
752 9 50 33     32 warn("Outdated\n") if $self->{'debug'} && $is_outdated;
753              
754 9 100 100     49 if ($slot->{'reconnect_is_required'} || $is_outdated) {
755 5 50       21 warn("Going to reconnect\n") if $self->{'debug'};
756 5         24 $self->_clear_slot($slot, 1);
757 5         22 $self->_connect_slot($slot);
758 5         13 $n++;
759             }
760             }
761              
762 5         19 return $n;
763             }
764              
765             =head2 close_all ($self)
766              
767             Closes all opened connections and resets all slots with requests.
768              
769             =cut
770              
# close_all($self): calls _clear_slot($slot, 1) on every slot in
# $self->{'_conns'}, then replaces the slot list with an empty array.
# Returns nothing (bare return).
771 6     6 1 8288 sub close_all ($self) {
  6         28  
  6         11  
772 6         45 $self->_clear_slot($_, 1) for $self->{'_conns'}->@*;
773 6         38 $self->{'_conns'} = [];
774 6         21 return;
775             }
776              
777             =head2 DESTROY($self)
778              
779             The class destructor.
780              
781             Closes all opened sockets.
782              
783             =cut
784              
# DESTROY($self): destructor. Drains $self->{'_conns'}, closing each slot's
# socket when one is present, and warns if any slot was still marked
# 'is_busy' at destruction time.
785 5     5   14793 sub DESTROY ($self) {
  5         18  
  5         11  
786 5         13 my $in_use = 0;
# shift() removes slots one by one, so the list is empty afterwards.
787 5         44 while ( my $slot = shift($self->{'_conns'}->@*) ) {
788 5 50       466 $in_use++ if ($slot->{'is_busy'});
789 5 50       124 $slot->{'socket'}->close() if ($slot->{'socket'});
790             }
791 5 50       494 warn ref($self) ." object destroyed but still in use" if $in_use;
792             }
793              
794             1;
795             __END__