File Coverage

blib/lib/MR/AsyncHTTP.pm
Criterion Covered Total %
statement 131 177 74.0
branch 49 96 51.0
condition 15 40 37.5
subroutine 16 18 88.8
pod 6 6 100.0
total 217 337 64.3


line stmt bran cond sub pod time code
1             package MR::AsyncHTTP;
2              
3 2     2   2063 use strict;
  2         4  
  2         99  
4 2     2   12 use warnings;
  2         4  
  2         71  
5 2     2   22 use Carp;
  2         4  
  2         169  
6 2     2   8248 use Socket;
  2         11567  
  2         1671  
7 2     2   27 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
  2         4  
  2         130  
8 2     2   16 use Time::HiRes();
  2         3  
  2         4528  
9              
10             our $VERSION = '0.01';
11              
12             =head1 NAME
13              
14             MR::AsyncHTTP - A zero-overhead Perl module for requesting HTTP in async manner, without event pools
15              
16             =head1 SYNOPSIS
17              
18             use MR::AsyncHTTP;
19             my $asynchttp = new MR::AsyncHTTP( connect_timeout=>0.6 );
20              
21             # Send a request, dont wait answer right now
22             my $req_id = $asynchttp->send_get( "http://example.com/" );
23              
24             #..work with something else..
25             my $res = $asynchttp->check_response($req_id);
26             if( !$res ) {
27             # Not ready yet, work on something other
28             }
29            
30             #..Finally, wait it.
31             my $res = $asynchttp->wait($req_id);
32              
33             #..Send a couple of requests
34             for(1..5) {
35             $asynchttp->send_get( "http://example.com/request/".$_ );
36             }
37              
38             #Dedicate some time for sysread() to free buffers
39             $asynchttp->poke;
40              
41             # Wait all responses
42             $asynchttp->wait_all;
43              
44             =head1 DESCRIPTION
45              
46             Note this module have limited functionality compared to say, LWP. Its designed to make simple requests async, thats all.
47              
48             =head2 new(%opts)
49              
50             Captain: create a new object.
51              
52             %opts can be:
53              
54             =over 4
55              
56             =item resolve_timeout
57              
58             Timeout for gethostbyname(). Default is 0.5 second. 0 to point no timeout.
59              
60             =item resolve_cache
61              
62             Enable caching result of gethostbyname(). A B means no resolve cache. 'local' means object-scope cache.
63             Other true values mean global cache between instances.
64              
65             Default is 1 (global).
66              
67             =item connect_timeout
68              
69             Set timeout for socket connect(). Default is 0.2 second. 0 to point no timeout.
70              
71             =item response_timeout
72              
73             Set timeout for response, not including connect. Affect wait() and wait_all() so they wont block like forever. Default is 1 second.
74              
75             =back
76              
77             =cut
78              
79             sub new {
80 2     2 1 5287 my ($class, %opt) = @_;
81 2 50       16 $opt{resolve_timeout} = defined($opt{resolve_timeout}) ? ($opt{resolve_timeout}+0) : 0.5;
82 2 50       13 $opt{connect_timeout} = defined($opt{connect_timeout}) ? ($opt{connect_timeout}+0) : 0.2;
83 2 50       10 $opt{resolve_cache} = defined($opt{resolve_cache}) ? $opt{resolve_cache} : 1;
84 2 50       8 $opt{response_timeout} = defined($opt{response_timeout}) ? $opt{response_timeout} : 1;
85 2         17 return bless {
86             opt => \%opt,
87             req_id => 1,
88             } => $class;
89             }
90              
91             =head2 send_get(url, headers)
92              
93             Send HTTP GET on given url. headers is a hash field=>value
94              
95             Returns ID of request. Or undef on fail (sets $@ variable); 0 on timeout
96              
97             =cut
98              
99             sub send_get {
100 6     6 1 3204 my ($self, $url, $headers) = @_;
101 6 50       31 $headers = ref($headers) eq 'HASH' ? $headers : {};
102 6         13 my $was_timeout=0;
103 6         13 my $ret = eval {
104             local $SIG{ALRM} = sub {
105 1     1   3 $was_timeout=1;
106 1         39 die "alarm\n";
107 6         190 };
108              
109             # Resolve addr.
110 6 50       160 Time::HiRes::ualarm($self->{opt}->{resolve_timeout} * 1_000_000) if $self->{opt}->{resolve_timeout}; #Convert into ms
111 6         24 my $r = $self->_parse_url($url);
112 6 50 33     68 unless( $r && $r->{hostname} && $r->{proto} eq 'http' ) {
      33        
113 0 0       0 Time::HiRes::ualarm(0) if $self->{opt}->{resolve_timeout};
114 0         0 $@ = "Cant parse url '$url'";
115 0         0 return undef;
116             }
117 6         27 $r->{ipaddr} = $self->_resolve_addr($r->{hostname});
118 6 100       42 unless( $r->{ipaddr} ) {
119 3 50       49 Time::HiRes::ualarm(0) if $self->{opt}->{resolve_timeout};
120 3         1473 carp "Cant resolve hostname '$r->{hostname}'; url='$url'";
121 3         269 $@ = "Cant resolve hostname '$r->{hostname}'";
122 3         73 return undef;
123             }
124 3 50       35 Time::HiRes::ualarm(0) if $self->{opt}->{resolve_timeout};
125              
126             # Create socket, connect
127 3 50       40 Time::HiRes::ualarm($self->{opt}->{connect_timeout} * 1_000_000) if $self->{opt}->{connect_timeout}; #Convert into ms
128 3         7 my $sock;
129 3 50       2791 unless( socket($sock, PF_INET, SOCK_STREAM, getprotobyname('tcp')) ) {
130 0 0       0 Time::HiRes::ualarm(0) if $self->{opt}->{connect_timeout};
131 0         0 $@ = "Cant create socket: $!";
132 0         0 return undef;
133             }
134 3         485 my $port = getservbyname($r->{proto}, 'tcp');
135 3 50       21 unless( $port ) {
136 0         0 close $sock;
137 0 0       0 Time::HiRes::ualarm(0) if $self->{opt}->{connect_timeout};
138 0         0 $@ = "Cant getservbyname";
139 0         0 return undef;
140             }
141 3 100       685009 unless( connect($sock, pack_sockaddr_in($port, $r->{ipaddr})) ) {
142 0         0 close $sock;
143 0 0       0 Time::HiRes::ualarm(0) if $self->{opt}->{connect_timeout};
144 0         0 $@ = "Cant connect: $!";
145 0         0 return undef;
146             }
147              
148             # Drop a request to socket
149 2         27 $headers->{Host} = $r->{hostname};
150 2         9 $headers->{Connection} = 'close';
151 2   33     32 $headers->{'User-Agent'} ||= "perl/MR::AsyncHTTP $VERSION";
152 2         6 my $wr = 1;
153 2   33     253 $wr = $wr && syswrite $sock, "GET $r->{uri} HTTP/1.1\n";
154 2         9 while( my ($key, $value) = each %{$headers} ) {
  8         37  
155 6   33     81 $wr = $wr && syswrite $sock, "$key: $value\n";
156             }
157 2   33     45 $wr = $wr && syswrite $sock, "\n";
158              
159 2 50       22 unless( $wr ) {
160 0         0 close $sock;
161 0 0       0 Time::HiRes::ualarm(0) if $self->{opt}->{connect_timeout};
162 0         0 $@ = "Cant write socket: $!";
163 0         0 return undef;
164             }
165              
166 2 50       19 unless( $self->_set_nonblocking($sock, 1) ) {
167 0         0 close $sock;
168 0 0       0 Time::HiRes::ualarm(0) if $self->{opt}->{connect_timeout};
169 0         0 $@ = "_set_nonblocking fail: $!";
170 0         0 return undef;
171             }
172              
173 2         32 my $req = {
174             id => $self->{req_id}++,
175             sock => $sock,
176             req_sent => Time::HiRes::time(),
177             r => $r,
178             url => $url
179             };
180 2         13 $self->{req}->{ $req->{id} } = $req;
181 2 50       27 Time::HiRes::ualarm(0) if $self->{opt}->{connect_timeout};
182 2         57 return $req->{id};
183             };#eval
184 6 100       44 if( $was_timeout ) {
185 1         4 $@ = "Resolve/Connection timeout";
186 1         8 return 0;
187             }
188 5         37 return $ret;
189             }
190              
191             =head2 check_response(req_id)
192              
193             Nonblocking check for response. Returns respnse hash when response is complete.
194              
195             =cut
196              
197             sub check_response {
198 3     3 1 3023 my ($self, $req_id) = @_;
199 3         9 my $req = $self->{req}->{$req_id};
200 3 50       14 unless( $req ) {
201 0         0 carp "No such request id '$req_id'. This is cleanly a bug.";
202 0         0 $@ = "No such requst id";
203 0         0 return undef;
204             }
205 3 50       11 return $req->{result} if $req->{done};
206              
207 3 100       18 $req->{result}->{body}='' unless defined $req->{result}->{body};
208 3         19 my $buf;
209             my $rd;
210             # Socket is in nonblocking mode unless we called from wait()
211 3         152962 while( $rd=sysread($req->{sock}, $buf, 4096) ) {
212 3         27426 $req->{result}->{body} .= $buf;
213             }
214 2 100 66     24 if( defined($rd) and $rd==0 ) {
215             # Got EOF, split headers
216 1         3 my $rawheaders;
217 1         54 $req->{result}->{body} =~ s/^(.*?\r?\n)\r?\n/ $rawheaders=$1; '' /se;
  1         7  
  1         19  
218 1 50       15 $req->{result}->{code} = $1 if $rawheaders =~ /^HTTP\/1\.1 (\d+)/;
219 1         17 foreach my $line ( split(/\r\n/, $rawheaders) ) {
220 15 100       56 if( $line =~ /^([^:]+): ([^\r\n]+)/ ) {
221 14         52 $req->{result}->{headers}->{$1} = $2;
222             }
223             }
224 1         6 $req->{done}=1;
225 1         11 $req->{wait_time} = Time::HiRes::time() - $req->{req_sent};
226             #Free socket.
227 1 50       134 close($req->{sock}) if $req->{sock};
228 1         3 $req->{sock}=undef;
229 1         9 return $req;
230             }
231 1         4 return undef;
232             }
233              
234             =head2 wait(req_id)
235              
236             Wait for response upto response_timeout. Returns response hash or 0 if timeout.
237              
238             =cut
239              
240             sub wait {
241 2     2 1 1424 my ($self, $req_id) = @_;
242 2         6 my $req = $self->{req}->{$req_id};
243 2 50       9 unless( $req ) {
244 0         0 carp "No such request id '$req_id'. This is cleanly a bug.";
245 0         0 $@ = "No such requst id";
246 0         0 return undef;
247             }
248 2 50       11 return $req->{result} if $req->{done};
249              
250 2         4 my $was_timeout;
251 2         3 my $ret = eval {
252             local $SIG{ALRM} = sub {
253 1     1   3 $was_timeout=1;
254 1         35 die "alarm\n";
255 2         112 };
256              
257 2 50       31 Time::HiRes::ualarm($self->{opt}->{response_timeout} * 1_000_000) if $self->{opt}->{response_timeout}; #Convert into ms
258 2         13 $self->_set_nonblocking($req->{sock}, 0); #Setup blocking mode
259 2         9 my $res = $self->check_response($req_id);
260 1         12 Time::HiRes::ualarm(0);
261 1         23 return $res;
262             };
263 2 100       11 if( $was_timeout ) {
264 1         9 $self->_set_nonblocking($req->{sock}, 1); #Setup nonblocking mode
265 1         2 $@ = "Timeout for response";
266 1         10 return 0;
267             }
268 1         16 return $ret;
269             }
270              
271             =head2 poke()
272              
273             Dedicate some time for sysread() to free system buffers.
274              
275             Returns number of requests ready for processing in scalar context or list of ready ids in list context.
276              
277             =cut
278              
279             sub poke {
280 0     0 1 0 my $self = shift;
281 0         0 my @rdy;
282 0         0 while( my ($req_id, $req) = each(%{$self->{req}}) ) {
  0         0  
283 0 0       0 if( $req->{done} ) {
284 0         0 push @rdy, $req->{id};
285 0         0 next;
286             }
287 0         0 my $res = $self->check_response($req_id);
288 0 0 0     0 push( @rdy, $res->{id} ) if( $res && $res->{id} && $res->{done} );
      0        
289             }
290 0 0       0 return wantarray ? @rdy : scalar(@rdy);
291             }
292              
293             =head2 wait_all()
294              
295             Blocking wait for all responses. Returns undef.
296              
297             =cut
298              
299             sub wait_all {
300 0     0 1 0 my $self = shift;
301 0         0 foreach my $req_id (keys %{$self->{req}}) {
  0         0  
302 0         0 $self->poke; #Before blocking on one request, make sure all other's buffers are able to continue receive
303 0         0 $self->wait($req_id);
304             }
305 0         0 return undef;
306             }
307              
308             sub DESTROY {
309 2     2   2012 my $self = shift;
310 2         6 foreach my $req (values %{$self->{req}}) {
  2         13  
311 2 100       131 close($req->{sock}) if $req->{sock};
312 2         293 $req->{sock}=undef;
313             }
314             }
315              
316             #### Private methods ####
317             sub _parse_url {
318 6     6   11 my ($self, $url) = @_;
319 6 50 33     217 if( $url && $url =~ /^([hH][tT][tT][pP]):\/\/([^\/]+)(.*)$/ ) {
320             return {
321 6 100       82 proto => lc($1),
322             hostname => $2,
323             uri => ($3 eq '' ? '/' : $3)
324             };
325             }
326 0         0 return undef;
327             }
328              
329             my $global_resolve_cache = {};
330             sub _resolve_addr {
331 6     6   33 my ($self, $hostname) = @_;
332 6 100       53 return inet_aton($hostname) if $hostname =~ /^\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}$/;
333 5 50       27 return $self->{resolve_cache}->{$hostname} if exists($self->{resolve_cache}->{$hostname}); #A local cache
334 5 50 33     54 return $global_resolve_cache->{$hostname}
      33        
335             if( $self->{opt}->{resolve_cache} and $self->{opt}->{resolve_cache} ne 'local' and exists($global_resolve_cache->{$hostname}) );
336              
337 5         29439 my $ip = gethostbyname($hostname);
338              
339 5 50       73 if( $self->{opt}->{resolve_cache} ) { #Cache result
340 5 50       42 if( $self->{opt}->{resolve_cache} eq 'local' ) {
341 0         0 $self->{resolve_cache}->{$hostname} = $ip;
342             }else{
343 5         34 $global_resolve_cache->{$hostname} = $ip;
344             }
345             }
346 5         37 return $ip;
347             }
348              
349             sub _set_nonblocking {
350 5     5   15 my ($self, $sock, $nonblocking_on) = @_;
351 5   100     453 $nonblocking_on ||= 0;
352 5 50       43 my $flags = fcntl($sock, F_GETFL, 0) or return undef;
353 5         49 $flags &= (~O_NONBLOCK);
354 5 50 100     52 fcntl($sock, F_SETFL, $flags | ($nonblocking_on && O_NONBLOCK) ) or return undef;
355 5         16 return 1;
356             }
357              
358             1;
359             __END__