File Coverage

lib/MojoX/HTTP/Async.pm
Criterion Covered Total %
statement 355 406 87.4
branch 76 154 49.3
condition 48 111 43.2
subroutine 41 44 93.1
pod 7 7 100.0
total 527 722 72.9


line stmt bran cond sub pod time code
1             package MojoX::HTTP::Async;
2              
3             =encoding utf-8
4              
5             =head1 NAME
6              
7             MojoX::HTTP::Async - simple package to execute multiple parallel requests to the same host
8              
9             =head1 SYNOPSIS
10              
11             use MojoX::HTTP::Async ();
12              
13             # creates new instance for async requests
14             # restricts max amount of simultaneously executed requests
15             my $ua = MojoX::HTTP::Async->new('host' => 'my-site.com', 'slots' => 2);
16              
17             # let's fill slots
18             $ua->add( '/page1.html?lang=en');
19             $ua->add( 'http://my-site.com/page2.html');
20              
21             # non-blocking requests processing
22             while ( $ua->not_empty() ) {
23             if (my $tx = $ua->next_response) { # returns an instance of Mojo::Transaction::HTTP class
24             print $tx->res->headers->to_string;
25             } else {
26             # do something else
27             }
28             }
29              
30             # blocking requests processing
31             while (my $tx = $ua->wait_for_next_response($timeout)) {
32             # do something here
33             }
34              
35             # how to process connect timeouts
36             if (my $error = $tx->req()->error()) {
37             say $error->{code};
38             say $error->{message};
39             }
40              
41             # how to process request timeouts and other errors such as broken pipes, etc
42             if (my $error = $tx->res()->error()) {
43             say $error->{code};
44             say $error->{message};
45             }
46              
47             # makes reconnection if either slot was timeouted or was inactive too long
48             $ua->refresh_connections();
49              
50             # close everything
51             $ua->close_all();
52              
53             =head1 DESCRIPTION
54              
55             This library allows making multiple HTTP/HTTPS requests to a particular host in non-blocking mode.
56              
57             In comparison with C, this library doesn't make a new connection on each request.
58              
59             And in comparison with C, it's more intuitive to use, and there are no Singleton restrictions.
60              
61             The instance of this class can work only with one domain and scheme: either HTTP or HTTPS.
62              
63             =cut
64              
65 9     9   1521369 use 5.026;
  9         36  
66 9     9   63 use warnings;
  9         9  
  9         234  
67 9     9   45 use bytes ();
  9         18  
  9         189  
68 9     9   36 use Socket qw/ inet_aton pack_sockaddr_in AF_INET SOCK_STREAM SOL_SOCKET SO_KEEPALIVE SO_OOBINLINE IPPROTO_TCP TCP_KEEPIDLE TCP_KEEPINTVL TCP_KEEPCNT /;
  9         18  
  9         945  
69 9     9   7380 use IO::Socket::SSL ();
  9         727389  
  9         468  
70 9     9   81 use Fcntl qw/ F_SETFL O_NONBLOCK FD_CLOEXEC /;
  9         27  
  9         801  
71 9     9   81 use experimental qw/ signatures /;
  9         18  
  9         99  
72 9     9   1737 use Carp qw/ croak /;
  9         27  
  9         531  
73 9     9   54 use List::Util qw/ first /;
  9         18  
  9         1071  
74 9     9   108 use Time::HiRes qw/ alarm time sleep /;
  9         18  
  9         108  
75 9     9   8550 use Mojo::Message::Request ();
  9         2993499  
  9         333  
76 9     9   6867 use Mojo::Message::Response ();
  9         85509  
  9         306  
77 9     9   6318 use Mojo::Transaction::HTTP ();
  9         31896  
  9         828  
78 9     9   6246 use URI ();
  9         52002  
  9         333  
79 9     9   108 use Scalar::Util qw/ blessed /;
  9         45  
  9         675  
80 9     9   81 use Errno qw / :POSIX /;
  9         27  
  9         48834  
81              
82             our $VERSION = 0.02;
83              
84             =head2 new($class, %opts)
85              
86             The class constructor.
87              
88             =over
89              
90             =item host
91              
92             It's the obligatory option.
93             Sets the name/address of remote host to be requested.
94              
95             =item port
96              
97             By default it's equal to 80.
98             Sets the port number of remote point.
99              
100             =item slots
101              
102             By default it's equal to 5.
103             Sets the maximum number of slots.
104             These slots will be filled one by one if required.
105              
106             =item ssl
107              
108             By default it's equal to 0 (means HTTP).
109             Sets the scheme of requests: HTTP or HTTPS.
110              
111             =item ssl_opts
112              
113             It's a HashRef with options to control SSL Layer.
114             See the constructor arguments of C<IO::Socket::SSL> for details.
115              
116             =item connect_timeout
117              
118             By default it's equal to 1.
119             Sets connection timeout in seconds (can be float with micro seconds accuracy).
120              
121             If it's equal to 0, then there will be no timeout restrictions.
122              
123             =item request_timeout
124              
125             By default it's equal to 1.
126             Sets the time in seconds (can be fractional, with microsecond accuracy).
127             The awaiting time of response will be limited with this value.
128              
129             In case of 0 value there will be no time restrictions.
130              
131             =item sol_socket
132              
133             It's a HashRef with socket options.
134             The possible keys are:
135              
136             =item B<so_keepalive>
137              
138             Enables TCP KeepAlive on socket.
139             The default value is 1 (means that option is enabled).
140              
141             =item B<sol_tcp>
142              
143             WARNING: These options can be unsupported on some OS platforms.
144              
145             It's a HashRef with socket TCP-options.
146              
147             If some key is absent in HashRef then system settings will be used.
148              
149             The supported keys are shown below:
150              
151             B<tcp_keepidle> - the time (in seconds) the connection needs to remain idle before TCP starts sending keepalive probes
152              
153             B<tcp_keepintvl> - the time (in seconds) between individual keepalive probes
154              
155             B<tcp_keepcnt> - the maximum number of keepalive probes TCP should send before dropping the connection.
156              
157             =item inactivity_conn_ts
158              
159             If last response was received C<inactivity_conn_ts> seconds or more ago,
160             then such slots will be destroyed in C<refresh_connections> method.
161              
162             By default the value is 0 (disabled).
163              
164             =item debug
165              
166             Enables debug mode. The debug messages will be printed to STDERR.
167             By default the value is 0 (disabled).
168              
169             =back
170              
171             =cut
172              
173 1     1 1 8815 sub new ($class, %opts) {
  1         11  
  1         51  
  1         11  
174 1 50       28 croak("host is mandatory") if (! $opts{'host'});
175             my $self = bless({
176             'slots' => 5,
177             'ssl' => 0,
178             'ssl_opts' => undef,
179 1 50       39 'port' => $opts{'ssl'} ? 443 : 80,
180             'request_timeout' => 1, # 1 sec
181             'connect_timeout' => 1, # 1 sec
182             'sol_socket' => {
183             'so_keepalive' => 1,
184             },
185             'sol_tcp' => {},
186             'inactivity_conn_ts' => 0,
187             %opts,
188             '_conns' => [],
189             }, $class);
190 1         19 return $self;
191             }
192              
193 8     8   18 sub _connect ($self, $slot, $proto, $peer_addr) {
  8         21  
  8         11  
  8         29  
  8         19  
  8         12  
194              
195 8 50       49 warn("Connecting\n") if $self->{'debug'};
196              
197 8 50       415 socket(my $socket, AF_INET, SOCK_STREAM, $proto) || croak("socket error: $!");
198 8 50       8912 connect($socket, $peer_addr) || croak("connect error: $!"); # in case of O_NONBLOCK it will return with EINPROGRESS
199 8 50       98 fcntl($socket, F_SETFL, O_NONBLOCK | FD_CLOEXEC) || croak("fcntl error has occurred: $!");
200              
201 8   50     58 my $sol_socket_opts = $self->{'sol_socket'} // {};
202              
203 8 50       33 if (exists($sol_socket_opts->{'so_keepalive'})) {
204 0 0       0 setsockopt($socket, SOL_SOCKET, SO_KEEPALIVE, 1) || croak("setsockopt error has occurred while setting SO_KEEPALIVE: $!");
205              
206 0 0       0 if ($sol_socket_opts->{'so_keepalive'}) {
207 0   0     0 my $sol_tcp_opts = $self->{'sol_tcp'} // {};
208 0         0 state $SOL_TCP = &IPPROTO_TCP();
209              
210 0 0       0 if (exists($sol_tcp_opts->{'tcp_keepidle'})) {
211 0 0       0 setsockopt($socket, $SOL_TCP, TCP_KEEPIDLE, $sol_tcp_opts->{'tcp_keepidle'}) || croak("setsockopt error has occurred while setting TCP_KEEPIDLE: $!");
212             }
213              
214 0 0       0 if (exists($sol_tcp_opts->{'tcp_keepintvl'})) {
215 0 0       0 setsockopt($socket, $SOL_TCP, TCP_KEEPINTVL, $sol_tcp_opts->{'tcp_keepintvl'}) || croak("setsockopt error has occurred while setting TCP_KEEPINTVL: $!");
216             }
217              
218 0 0       0 if (exists($sol_tcp_opts->{'tcp_keepcnt'})) {
219 0 0       0 setsockopt($socket, $SOL_TCP, TCP_KEEPCNT, $sol_tcp_opts->{'tcp_keepcnt'}) || croak("setsockopt error has occurred while setting TCP_KEEPCNT: $!");
220             }
221             }
222             }
223              
224 8         45 $slot->{'connected_ts'} = time();
225 8         21 $slot->{'reader'} = $slot->{'writer'} = $slot->{'socket'} = $socket;
226 8         26 $slot->{'sock_no'} = fileno($socket);
227 8 50       41 if ($self->{'ssl'}) {
228 0   0     0 my $ssl_socket = IO::Socket::SSL->new_from_fd($socket, ($self->{'ssl_opts'} // {})->%*);
229 0 0       0 croak("error=$!, ssl_error=" . $IO::Socket::SSL::SSL_ERROR) if (!$ssl_socket);
230 0         0 $ssl_socket->blocking(0); # just to be sure
231 0         0 $slot->{'reader'} = $slot->{'writer'} = $ssl_socket;
232             }
233             }
234              
235 8     8   12 sub _connect_slot ($self, $slot) {
  8         24  
  8         15  
  8         8  
236 8         25 my $timeout = $self->{'connect_timeout'};
237              
238 8 50       34 if ($timeout > 0) {
239 8         29 eval {
240 8     0   250 local $SIG{'ALRM'} = sub { die "alarm\n" };
  0         0  
241 8         99 alarm($timeout);
242 8         22 $self->_connect($slot, @{$self}{qw/ proto peer_addr /});
  8         44  
243 8         185 alarm(0);
244             };
245              
246 8         33 my $error = $@;
247              
248 8         47 alarm(0);
249              
250 8 50       33 if ($error) {
251 0 0       0 croak($error) if ($error ne "alarm\n");
252 0         0 $self->_mark_request_as_timeouted($slot, 'Connect timeout');
253             }
254             } else {
255 0         0 $self->_connect($slot, @{$self}{qw/ proto peer_addr /});
  0         0  
256             }
257             }
258              
259 6     6   15 sub _make_connections ($self, $amount) {
  6         18  
  6         17  
  6         9  
260              
261 6         465 my $host_addr = inet_aton($self->{'host'});
262 6 50       61 croak("can't call inet_aton") if (! $host_addr);
263              
264 6   66     75 $self->{'peer_addr'} //= pack_sockaddr_in($self->{'port'}, $host_addr);
265 6   66     92 $self->{'proto'} //= getprotobyname("tcp");
266              
267 6         28 for (1 .. $amount) {
268 6         30 my $slot = $self->_make_slot();
269 6         26 $self->_connect_slot($slot);
270 6         31 $self->_add_slot($slot);
271             }
272             }
273              
274 6     6   11 sub _add_slot ($self, $slot) {
  6         9  
  6         9  
  6         6  
275 6 50       45 push($self->{'_conns'}->@*, $slot) if ($slot);
276             }
277              
278 6     6   11 sub _make_slot ($self) {
  6         14  
  6         11  
279             return {
280 6         121 'reader' => undef,
281             'writer' => undef,
282             'socket' => undef,
283             'sock_no' => 0,
284             'is_busy' => 0,
285             'request' => undef,
286             'tx' => undef,
287             'exp_ts' => 0,
288             'tmp_response' => undef,
289             'reconnect_is_required' => 0,
290             'last_response_ts' => 0,
291             'connected_ts' => 0,
292             };
293             }
294              
295 14311     14311   15761 sub _check_for_errors ($self, $socks2slots = {}, $error_handles = '', $reason = '') {
  14311         18578  
  14311         16956  
  14311         15443  
  14311         31409  
  14311         16427  
296              
297 14311         16500 my $message = $reason;
298              
299 14311 50       25847 if (!$message) {
300 14311 50 33     70433 $message = ($!{'EPIPE'} || $!{'ECONNRESET'} || $!{'ECONNREFUSED'} || $!{'ECONNABORTED'}) ? 'Premature connection close' : 'Unknown error';
301             }
302              
303 14311         510888 for my $slot_no (keys %$socks2slots) {
304 18929 50       44039 if ( vec($error_handles, $slot_no, 1) != 0 ) {
305 0         0 my $slot = $socks2slots->{ $slot_no };
306 0         0 $self->_mark_response_as_broken($slot, 520, $message);
307             }
308             }
309             }
310              
311 16     16   30 sub _get_free_slot ($self) {
  16         32  
  16         26  
312              
313 16         27 my $slot;
314 10         58 my %socks2slots = map { $_->{'sock_no'} => $_ }
315 20 50 66     175 grep { !$_->{'is_busy'} && $_->{'socket'} && !$_->{'reconnect_is_required'} }
316 16         66 $self->{'_conns'}->@*;
317              
318 16 100       52 if (%socks2slots) {
319              
320 8         77 local $!;
321 8         16 my $write_handles = '';
322              
323 8         65 vec($write_handles, $_, 1) = 1 for keys %socks2slots;
324              
325 8         77 my $error_handles = $write_handles;
326 8         95 my ($nfound, $timeleft) = select(undef, $write_handles, $error_handles, 0);
327              
328 8         43 $self->_check_for_errors(\%socks2slots, $error_handles, $!);
329              
330 8 50       18 if ($nfound) {
331 8     8   76 my $slot_no = first { vec($write_handles, $_, 1) == 1 } keys %socks2slots;
  8         26  
332 8         89 $slot = $socks2slots{ $slot_no };
333             }
334             }
335              
336 16         45 return $slot;
337             }
338              
339             =head2 add ($self, $request_or_uri, $timeout = undef)
340              
341             Adds an HTTP request into an empty slot.
342              
343             If the request was successfully added, then it will return 1.
344             Otherwise it will return 0.
345              
346             The request may fail to be added only if there are no empty slots and a new slot wasn't created because
347             the limit on the number of slots had been reached (see C<new> and C<slots>).
348              
349             It is recommended to always check the result code of this method.
350              
351             Example:
352              
353             my $ua = MojoX::HTTP::Async->new('host' => 'my-host.com', 'slots' => 1);
354              
355             # let's occupy the only slot
356             $ua->add('/page1.html');
357              
358             # let's wait until it's become free again
359             while ( ! $ua->add('/page2.html') ) {
360             while (my $tx = $ua->wait_for_next_response() ) {
361             # do something here
362             }
363             }
364              
365             =over
366              
367             =item $request_or_uri
368              
369             It can be either an instance of C<Mojo::Message::Request> class, or an instance of C<Mojo::URL>.
370             It also can be a simple URI string.
371              
372             If the resource contains the host, then it must be the same as in the constructor C<new>.
373              
374             Using a string with URI or an instance of C<Mojo::URL> class assumes that GET HTTP method will be used.
375              
376             =item $timeout
377              
378             Time in seconds. Can be fractional with microseconds tolerance.
379              
380             The C<request_timeout> from constructor will be used by default.
381              
382             =back
383              
384             =cut
385              
386 10     10 1 800 sub add ($self, $request_or_uri, $timeout = undef) {
  10         29  
  10         25  
  10         19  
  10         21  
387 10         24 my $status = 0;
388 10         45 my $slot = $self->_get_free_slot();
389              
390 10 100 100     90 if ( ! $slot && $self->{'slots'} > scalar($self->{'_conns'}->@*) ) {
391 6         64 $self->_make_connections(1);
392 6         16 $slot = $self->_get_free_slot();
393             }
394              
395 10 100       33 if ($slot) {
396 8         15 my $request = $request_or_uri;
397 8 50 0     41 if ( !ref($request_or_uri) || ( blessed($request_or_uri) && $request_or_uri->isa('Mojo::URL') ) ) {
      33        
398 8         170 $request = Mojo::Message::Request->new();
399 8         105 $request->url()->parse($request_or_uri);
400             }
401 8 50       4173 if ($request) {
402 8         51 $self->_send_request($slot, $request, $timeout);
403 8         44 $status = 1;
404             }
405             }
406              
407 10         107 return $status;
408             }
409              
410 14     14   24 sub _clear_slot ($self, $slot, $force = 0) {
  14         18  
  14         21  
  14         20  
  14         16  
411 14         23 $slot->{'is_busy'} = 0;
412 14         23 $slot->{'exp_ts'} = 0;
413 14         19 $slot->{'tx'} = undef;
414 14         28 $slot->{'request'} = undef;
415 14         20 $slot->{'tmp_response'} = undef;
416 14 100       30 if ($force) {
417 6 50       400 close($slot->{'socket'}) if $slot->{'socket'};
418 6         24 $slot->{'socket'} = undef;
419 6         13 $slot->{'reader'} = undef;
420 6         26 $slot->{'writer'} = undef;
421 6         11 $slot->{'sock_no'} = 0;
422 6         10 $slot->{'reconnect_is_required'} = 0;
423 6         11 $slot->{'last_response_ts'} = 0;
424 6         19 $slot->{'connected_ts'} = 0;
425             }
426             }
427              
428 4     4   9 sub _mark_slot_as_broken($self, $slot) {
  4         8  
  4         5  
  4         7  
429 4         6 $slot->{'reconnect_is_required'} = 1;
430 4         8 $slot->{'is_busy'} = 1;
431 4   33     15 $slot->{'request'} //= Mojo::Message::Request->new();
432             $slot->{'tx'} //= Mojo::Transaction::HTTP->new(
433 4   33     85 'req' => $slot->{'request'},
434             'res' => Mojo::Message::Response->new()
435             );
436             }
437              
438 0     0   0 sub _mark_request_as_broken ($self, $slot, $code = 520, $msg = 'Unknown Error') {
  0         0  
  0         0  
  0         0  
  0         0  
  0         0  
439 0         0 $self->_mark_slot_as_broken($slot);li
440 0         0 $slot->{'request'}->error({'message' => $msg, 'code' => $code});
441             }
442              
443 4     4   6 sub _mark_response_as_broken ($self, $slot, $code = 520, $msg = 'Unknown Error') {
  4         7  
  4         6  
  4         4  
  4         8  
  4         5  
444 4         14 $self->_mark_slot_as_broken($slot);
445              
446 4         97 my $res = $slot->{'tx'}->res();
447 4         88 $res->error({'message' => $msg, 'code' => $code});
448 4         156 $res->headers()->content_length(0);
449 4         340 $res->code($code);
450 4         36 $res->message($msg);
451             }
452              
453 0     0   0 sub _mark_request_as_timeouted ($self, $slot, $message = 'Request timeout') {
  0         0  
  0         0  
  0         0  
  0         0  
454 0         0 $self->_mark_request_as_broken($slot, 524, $message);
455             }
456              
457 4     4   5 sub _mark_response_as_timeouted ($self, $slot, $message = 'Request timeout') {
  4         9  
  4         7  
  4         20  
  4         7  
458 4         18 $self->_mark_response_as_broken($slot, 524, $message);
459             }
460              
461 8     8   11 sub _send_request ($self, $slot, $request, $timeout = undef) {
  8         10  
  8         13  
  8         9  
  8         13  
  8         9  
462              
463 8 50       22 croak("slot is busy") if ($slot->{'is_busy'});
464 8 50       20 croak("request object is obligatory") if (!$request);
465 8 50       47 croak('request must be a descendant of Mojo::Message::Request package') if (!$request->isa('Mojo::Message::Request'));
466              
467 8 50       40 my $required_scheme = $self->{'ssl'} ? 'https' : 'http';
468 8         19 my $url = $request->url();
469 8         114 my $uri = URI->new( $url );
470 8         10757 my $scheme = $url->scheme();
471              
472 8 50 33     100 if ($scheme && $required_scheme ne $scheme) {
473 0         0 croak(sprintf("Wrong scheme in URI '%s'. It must correspond to the 'ssl' option", $uri->as_string()));
474             }
475              
476 8 50       53 if (! $uri->scheme()) {
477             # URI::_generic doesn't have C method, that's why we set the scheme by ourseleves to change C<$uri> type
478 8         243 $uri->scheme($required_scheme);
479             }
480              
481 8 50       5280 if (my $host = $uri->host()) {
482 0 0       0 if ($host ne $self->{'host'}) {
483 0         0 croak(sprintf("Wrong host in URI '%s'. It must be the same as it was specified in constructor: %s", $uri->as_string(), $self->{'host'}));
484             }
485             }
486              
487 8   33     263 $timeout //= $self->{'request_timeout'};
488              
489 8         64 my $response = '';
490 8         17 state $default_ua_hdr = 'perl/' . __PACKAGE__;
491              
492 8         71 my $h = $request->headers();
493 8 50       837 $h->host($self->{'host'}) if (! $h->host() );
494 8 50       145 $h->user_agent($default_ua_hdr) if (! $h->user_agent() );
495              
496 8         106 $slot->{'request'} = $request;
497 8         16 $slot->{'is_busy'} = 1;
498 8 50       46 $slot->{'exp_ts'} = ($timeout > 0) ? ( time() + $timeout ) : 0;
499              
500 8         49 my $plain_request = $request->to_string();
501              
502 8 50       5629 if ($self->{'ssl'}) {
503 0         0 $slot->{'writer'}->print($plain_request);
504             } else {
505 8         27 my $socket = $slot->{'socket'};
506 8         72 my $msg_len = bytes::length($plain_request);
507 8         917 my $sent_bytes = 0;
508 8         13 my $attempts = 10;
509              
510 8         52 local $!;
511              
512 8   66     91 while ($sent_bytes < $msg_len && $attempts--) {
513 8         422 my $bytes = syswrite($socket, $plain_request, $msg_len, $sent_bytes);
514              
515 8 50 33     72 if ($! || ! defined($bytes)) {
516 0   0     0 my $error = $! // 'Unknown error';
517 0         0 $self->_mark_request_as_broken($slot, 520, $error);
518 0         0 return;
519             }
520              
521 8         14 $sent_bytes += $bytes;
522 8 50       26 $plain_request = substr($plain_request, $bytes) if $sent_bytes < $msg_len;
523             }
524              
525 8 50       35 if ($sent_bytes < $msg_len) {
526 0   0     0 my $error = $! // 'sent message is shorter than original';
527 0         0 $self->_mark_request_as_broken($slot, 520, $error);
528 0         0 return;
529             }
530             }
531              
532 8 50 33     72 if ($slot->{'exp_ts'} && time() > $slot->{'exp_ts'}) {
533 0         0 $self->_mark_request_as_timeouted($slot);
534             }
535              
536 8         28 return;
537             }
538              
539 4     4   7 sub _try_to_read ($self, $slot) {
  4         5  
  4         6  
  4         5  
540              
541 4 50 33     60 return if $slot->{'tx'} || ! $slot->{'is_busy'};
542              
543 4         13 my $reader = $slot->{'reader'};
544 4   33     102 my $response = $slot->{'tmp_response'} // Mojo::Message::Response->new();
545              
546 4         220 $response->parse($_) while (<$reader>);
547              
548 4 50 33     5668 if ($! && !$!{'EAGAIN'} && !$!{'EWOULDBLOCK'}) { # not a "Resourse temporary unavailable" (no data)
    50 33        
      33        
549 0         0 $self->_mark_response_as_broken($slot, 520, $!);
550             } elsif ($response && $response->code()) {
551              
552 4         106 my $content = $response->content();
553              
554 4 50       24 if ($content->is_finished()) { # this is required to support "Transfer-Encoding: chunked"
555             $slot->{'tx'} = Mojo::Transaction::HTTP->new(
556 4         87 'req' => $slot->{'request'},
557             'res' => $response
558             );
559 4         64 $slot->{'tmp_response'} = undef;
560 4         25 $slot->{'last_response_ts'} = time();
561             } else {
562 0         0 $slot->{'tmp_response'} = $response;
563             }
564              
565 4 50       12 $slot->{'reconnect_is_required'} = 1 if ($content->relaxed()); # responses that are terminated with a connection close
566             }
567              
568 4 0 0     48 if (! $slot->{'tx'} && ($slot->{'exp_ts'} && time() > $slot->{'exp_ts'})) {
      33        
569 0         0 $self->_mark_response_as_timeouted($slot);
570             }
571             }
572              
573             =head2 not_empty($self)
574              
575             Returns 1 if at least one slot is busy or contains a not yet processed response.
576             Otherwise the method returns 0.
577              
578             =cut
579              
580 14301     14301 1 25011 sub not_empty ($self) {
  14301         15862  
  14301         12833  
581              
582 14301         17871 my $not_empty = scalar $self->{'_conns'}->@*;
583              
584 14301         21274 for my $slot ($self->{'_conns'}->@*) {
585 28602 50 66     58898 $not_empty-- if !$slot->{'is_busy'} && !$slot->{'tx'};
586             }
587              
588 14301 100       36007 return $not_empty ? 1 : 0;
589             }
590              
591              
592             =head2 wait_for_next_response($self, $timeout = 0)
593              
594             Waits for first received response or time-outed request in any slot.
595             Returns the C<Mojo::Transaction::HTTP> instance with result.
596              
597             =over
598              
599             =item $timeout
600              
601             Period of time in seconds. Can be fractional with microsecond tolerance.
602             The response will be marked as time-outed after this time is out.
603              
604             The default value is 0, which means that the call will block until the response is received.
605              
606             If all slots are empty, then C<undef> will be returned.
607              
608             =back
609              
610             =cut
611              
612 10     10 1 27328 sub wait_for_next_response ($self, $timeout = 0) {
  10         22  
  10         18  
  10         17  
613              
614 10         16 my $response;
615 10 50       56 my $exp_ts = $timeout ? (time() + $timeout) : 0;
616              
617 10         17 while (1) {
618 13765 50 33     93979 last if ($exp_ts && time() >= $exp_ts); # awaiting process is time-outed
619 13765 100 100     31425 last if (($response = $self->next_response()) || !$self->not_empty());
620 13755 50       1300534 sleep(1E-6) if (!$response); # sleep 1 microsecond
621             }
622              
623 10         31 return $response;
624             }
625              
626             =head2 next_response ($self)
627              
628             Returns an instance of C<Mojo::Transaction::HTTP> class.
629             If there is no response, it will return C<undef>.
630              
631             =cut
632              
633 14306     14306 1 21516 sub next_response ($self) {
  14306         16341  
  14306         14010  
634 14306   66     25100 return $self->_get_response_from_ready_slot() // $self->_get_response_from_slot();
635             }
636              
637 14298     14298   14792 sub _get_response_from_slot ($self) {
  14298         14090  
  14298         13525  
638              
639 14298         13361 my $tx;
640 14298     28596   72466 my $slot = first { $_->{'tx'} } $self->{'_conns'}->@*;
  28596         45278  
641              
642 14298 50       39427 if ($slot) {
643 0         0 $tx = $slot->{'tx'};
644 0         0 $self->_clear_slot($slot, $slot->{'reconnect_is_required'});
645             }
646              
647 14298         55142 return $tx;
648             }
649              
650 14306     14306   16350 sub _get_response_from_ready_slot ($self) {
  14306         14383  
  14306         15268  
651              
652 14306         14455 my $tx;
653 18917         61630 my %socks2slots = map { $_->{'sock_no'} => $_ }
654 28612 100 66     117738 grep { ! $_->{'tx'} && ! $_->{'reconnect_is_required'} && $_->{'is_busy'} }
655 14306         31873 $self->{'_conns'}->@*;
656              
657 14306 100       31000 if (%socks2slots) {
658              
659 14302         72133 local $!;
660 14302         19681 my $read_handles = '';
661              
662 14302         74133 vec($read_handles, $_, 1) = 1 for keys %socks2slots;
663              
664 14302         21638 my $error_handles = $read_handles;
665 14302         120116 my ($nfound, $timeleft) = select($read_handles, undef, $error_handles, 0);
666              
667 14302         61600 $self->_check_for_errors(\%socks2slots, $error_handles, $!);
668              
669 14302         25084 for my $sock_no (keys %socks2slots) {
670 18915         23012 my $slot = $socks2slots{ $sock_no };
671 18915 100 100     30594 if ( $nfound && vec($read_handles, $sock_no, 1) == 1 ) {
672 4         16 $self->_try_to_read($slot);
673 4 50       14 next if ! $slot->{'tx'};
674 4 50       10 next if ! $slot->{'is_busy'};
675 4         8 $tx = $slot->{'tx'};
676             } else {
677 18911 100 66     85585 if (!$slot->{'tx'} && ($slot->{'exp_ts'} && time() > $slot->{'exp_ts'})) {
      66        
678 4         20 $self->_mark_response_as_timeouted($slot);
679 4         26 $tx = $slot->{'tx'};
680             }
681             }
682              
683 18915 100       49758 if ($tx) {
684 8         40 $self->_clear_slot($slot, 0);
685 8         32 last;
686             }
687             }
688             }
689              
690 14306         46091 return $tx;
691             }
692              
693             =head2 refresh_connections ($self)
694              
695             Closes connections in slots in the following cases:
696              
697             1. The slot was marked as timeouted
698             2. C<inactivity_conn_ts> was set and the connection was expired
699             3. There are some errors in socket (for example: Connection reset by peer, Broken pipe, etc)
700              
701             =cut
702              
703 1     1 1 2 sub refresh_connections ($self) {
  1         2  
  1         2  
704              
705 1         2 my $n = 0;
706 1         3 my $now = time();
707 1   50     4 my $keep_ts = $self->{'inactivity_conn_ts'} // 0;
708              
709 1 50       3 if (scalar $self->{'_conns'}->@*) {
710              
711 1         5 local $!;
712 1         2 my $error_handles = '';
713 1         3 my %socks2slots = map { $_->{'sock_no'} => $_ } $self->{'_conns'}->@*;
  2         6  
714              
715 1         8 vec($error_handles, $_, 1) = 1 for keys %socks2slots;
716 1         11 select(undef, undef, $error_handles, 0);
717              
718 1         6 $self->_check_for_errors(\%socks2slots, $error_handles, $!); # broken connections will be marked as required to reconnect
719             }
720              
721 1         4 for my $i (reverse( 0 .. $#{ $self->{'_conns'} })) {
  1         3  
722 2         5 my $slot = $self->{'_conns'}->[$i];
723 2   33     26 my $slot_exp_ts = ($slot->{'last_response_ts'} || $slot->{'connected_ts'}) + $keep_ts;
724 2   33     12 my $is_outdated = $keep_ts && $slot_exp_ts <= $now;
725              
726 2 50 33     8 warn("Outdated\n") if $self->{'debug'} && $is_outdated;
727              
728 2 50 33     9 if ($slot->{'reconnect_is_required'} || $is_outdated) {
729 2 50       7 warn("Going to reconnect\n") if $self->{'debug'};
730 2         8 $self->_clear_slot($slot, 1);
731 2         9 $self->_connect_slot($slot);
732 2         7 $n++;
733             }
734             }
735              
736 1         6 return $n;
737             }
738              
739             =head2 close_all ($self)
740              
741             Closes all opened connections and resets all slots with requests.
742              
743             =cut
744              
745 2     2 1 3017 sub close_all ($self) {
  2         4  
  2         6  
746 2         10 $self->_clear_slot($_, 1) for $self->{'_conns'}->@*;
747 2         14 $self->{'_conns'} = [];
748 2         7 return;
749             }
750              
751             =head2 DESTROY($class)
752              
753             The class destructor.
754              
755             Closes all opened sockets.
756              
757             =cut
758              
759 1     1   10696 sub DESTROY ($self) {
  1         3  
  1         2  
760 1         2 my $in_use = 0;
761 1         10 while ( my $slot = shift($self->{'_conns'}->@*) ) {
762 2 50       47 $in_use++ if ($slot->{'is_busy'});
763 2 50       53 $slot->{'socket'}->close() if ($slot->{'socket'});
764             }
765 1 50       69 warn ref($self) ." object destroyed but still in use" if $in_use;
766             }
767              
768             1;
769             __END__