File Coverage

blib/lib/HTTP/LoadGen/Run.pm
Criterion Covered Total %
statement 16 18 88.8
branch 1 2 50.0
condition n/a
subroutine 6 6 100.0
pod n/a
total 23 26 88.4


line stmt bran cond sub pod time code
1             package HTTP::LoadGen::Run;
2              
3             BEGIN {
4             {
5 3     3   202 package HTTP::LoadGen::Run::_dbg;
6             use Filter::Simple sub {
7 3 50       8093 s/^(\s*)#D /$1/mg if $ENV{"HTTP__LoadGen__Run__dbg"};
8 3     3   7599 };
  3         102638  
  3         29  
9             }
10 3         21 HTTP::LoadGen::Run::_dbg->import;
11             }
12              
13 3     3   239 use strict;
  3         20  
  3         107  
14 3     3   15 use warnings;
  3         5  
  3         108  
15 3     3   15 no warnings qw!uninitialized!;
  3         5  
  3         110  
16 3     3   3645 use Coro;
  0            
  0            
17             use Coro::Signal ();
18             use AnyEvent;
19             use AnyEvent::TLS;
20             use AnyEvent::Socket ();
21             use AnyEvent::Handle ();
22             use Errno qw/EPIPE/;
23              
24             {
25             our $VERSION = '0.03';
26              
27             use Exporter qw/import/;
28             our @EXPORT=qw/RC_STATUS RC_STATUSLINE RC_STARTTIME RC_CONNTIME RC_FIRSTTIME
29             RC_HEADERTIME RC_BODYTIME RC_HEADERS RC_BODY RC_HTTPVERSION
30             RC_DNSCACHED RC_CONNCACHED
31             RQ_METHOD RQ_SCHEME RQ_HOST RQ_PORT RQ_URI RQ_PARAM
32             KEEPALIVE_USE KEEPALIVE_STORE KEEPALIVE/;
33             }
34              
35             # this is if defined a hash reference.
36             # The elements are hostname=>ddd.ddd.ddd.ddd
37             # It is overwritten only if said so by parameters.
38             my $dnscache;
39             sub dnscache () : lvalue {$dnscache};
40              
41             # In normal mode this is "$destip $destport"=>[$connectionhandle1, ...].
42             # In debugging mode (if $ENV{"HTTP__LoadGen__Run__dbg"}) $connectionhandle
43             # is replaced by [$connectionhandle, $localport, $localip]
44             my %conncache;
45             sub conncache () {\%conncache}
46             {
47             no warnings 'redefine';
48             *conncache=\&HTTP::LoadGen::conncache if exists $INC{'HTTP/LoadGen.pm'};
49             }
50              
51             my %tls_cache;
52             sub tlscache () {\%tls_cache}
53             {
54             no warnings 'redefine';
55             *tlscache=\&HTTP::LoadGen::tlscache if exists $INC{'HTTP/LoadGen.pm'};
56             }
57              
58             use constant {
59             KEEPALIVE_USE=>1, # use a kept alive connection if available
60             KEEPALIVE_STORE=>2, # keep the connection alive if possible
61             KEEPALIVE=>3, # USE|STORE combined
62              
63             RQ_METHOD=>0, # req params see $el in run_urllist
64             RQ_SCHEME=>1,
65             RQ_HOST=>2,
66             RQ_PORT=>3,
67             RQ_URI=>4,
68             RQ_PARAM=>5,
69              
70             RC_STATUS=>0, # indices into run_url's result
71             RC_STATUSLINE=>1,
72             RC_HTTPVERSION=>2,
73             RC_STARTTIME=>3,
74             RC_CONNTIME=>4,
75             RC_FIRSTTIME=>5,
76             RC_HEADERTIME=>6,
77             RC_BODYTIME=>7,
78             RC_HEADERS=>8,
79             RC_BODY=>9,
80             RC_DNSCACHED=>10,
81             RC_CONNCACHED=>11,
82              
83             DEFAULT_TIMEOUT=>300, # connection inactivity timeout
84             };
85              
86             sub build_req {
87             my ($method, $scheme, $host, $port, $uri, $param)=@_;
88             my $hdr=$param->{headers} || [];
89             my ($need_host_hdr, @h, $body);
90              
91             my $eol="\015\012";
92              
93             $need_host_hdr=1;
94             for (my $i=0; $i<@$hdr; $i+=2) {
95             push @h, $hdr->[$i].': '.$hdr->[$i+1];
96             undef $need_host_hdr if lc($hdr->[$i]) eq 'host';
97             }
98             unshift @h, 'Host: '.$host.($scheme eq 'https'
99             ? ($port==443 ? '' : ':'.$port)
100             : ($port==80 ? '' : ':'.$port))
101             if $need_host_hdr;
102              
103             if (exists $param->{body}) {
104             $body=$param->{body};
105             push @h, 'Content-Length: '.length $body if length $body;
106             } else {
107             $body='';
108             }
109              
110             return (join( $eol, "\u$method $uri HTTP/1.1", @h ).$eol.$eol.$body);
111             }
112              
113             sub gen_cb {
114             my ($store_time)=@_;
115             my $sig=Coro::Signal->new;
116             my @queue;
117             return (sub {
118             if (defined $$store_time) {
119             $$$store_time=AE::now;
120             undef $$store_time;
121             }
122             push @queue, [@_];
123             $sig->send if $sig->awaited;
124             },
125             sub {$sig->wait unless @queue; @{shift @queue}});
126             }
127              
128             # use push_read($cb) here since the built-in 'line' doesn't handle
129             # lines with single embedded \r correctly, e.g. "aa\rbb\r\n" is not
130             # accepted as a 'line'
131             sub readln {
132             my ($handle, $cb, $wait)=@_;
133             $handle->push_read
134             (sub {
135             $_[0]->{rbuf} =~ s/^(([^\015\012]|\015(?!\012))*)(\015?\012)// or return;
136             $cb->($_[0], $1, $2);
137             1;
138             });
139             wantarray ? ($wait->())[1,2] : ($wait->())[1];
140             }
141              
142             sub readchunk {
143             my ($handle, $cb, $wait, $len)=@_;
144             $handle->push_read(chunk=>$len, $cb);
145             ($wait->())[1];
146             }
147              
148             sub readchunked {
149             my ($handle, $cb, $wait)=@_;
150             my $body='';
151             my ($len, $xlen);
152              
153             {
154             $xlen=readln $handle, $cb, $wait;
155             return unless length $xlen;
156             $len=hex $xlen;
157             #D warn " --> readchunked: about to read chunk of $len (hex:$xlen) bytes\n";
158             if( $len==0 ) {
159             readln $handle, $cb, $wait; # read the line break after the 0-chunk
160             return $body;
161             }
162             $body.=readchunk $handle, $cb, $wait, $len;
163             readln $handle, $cb, $wait; # read the line break after the chunk
164             redo;
165             }
166             }
167              
168             sub readEOF {
169             my ($handle, $cb, $wait)=@_;
170             $handle->on_eof(undef);
171             $handle->on_error(sub {$cb->(delete $_[0]->{rbuf})});
172             $handle->on_read(sub {});
173             ($wait->())[0];
174             }
175              
176             sub tlsctx {
177             my ($ctx)=@_;
178             return $ctx unless ref $ctx eq 'HASH';
179             my $c=tlscache;
180             warn "c=$c, ctx=$c->{$ctx}";
181             return $c->{$ctx}||=AnyEvent::TLS->new(%$ctx);
182             }
183              
184             sub config_handle {
185             my ($handle, $cb, $err, $eof, $restart, $was_cached)=@_;
186             $handle->on_error
187             (sub {
188             if ($!==EPIPE || $!==0 and $was_cached) {
189             $$restart=1;
190             } else {
191             @{$err}[RC_STATUS, RC_STATUSLINE]=(599, $_[2]);
192             }
193             #D warn "Caught Error: $_[2] / fatal=$_[1]\n";
194             $cb->();
195             });
196             $handle->on_eof
197             (sub {
198             $$eof=1;
199             #D warn "Caught EOF\n";
200             $cb->();
201             });
202             $handle->on_starttls
203             (sub {
204             @{$err}[RC_STATUS, RC_STATUSLINE]=(599, $_[2]) unless $_[1];
205             #D if( $_[1] ) {
206             #D warn "TLS Handshake done\n";
207             #D } else {
208             #D warn "TLS Handshake failed: $_[2]\n";
209             #D }
210             $cb->();
211             });
212             }
213              
214             my @no_response_body_codes;
215             my %no_response_body_methods;
216              
217             sub register_no_response_body_method {
218             $no_response_body_methods{$_[0]}=1;
219             }
220              
221             sub delete_no_response_body_method {
222             delete $no_response_body_methods{$_[0]};
223             }
224              
225             sub register_no_response_body_code {
226             $no_response_body_codes[$_[0]]=1;
227             }
228              
229             sub delete_no_response_body_code {
230             undef $no_response_body_codes[$_[0]];
231             }
232              
233             sub no_response_body {
234             #my ($code, $method)=@_;
235             $no_response_body_codes[$_[0]] ||
236             exists $no_response_body_methods{$_[1]};
237             }
238              
239             BEGIN {
240             for (100..199, 204, 304) {
241             register_no_response_body_code $_;
242             }
243             register_no_response_body_method 'HEAD';
244             }
245              
246             sub run_url {
247             my ($method, $scheme, $host, $port, $uri, $param)=@_;
248             $method=uc $method;
249             $scheme=lc $scheme;
250             $host=lc $host;
251              
252             my $store_time;
253             my ($cb, $wait)=gen_cb \$store_time;
254             my (@rc, @err, $eof, $line);
255              
256             #D warn "Starting $method $scheme://$host:$port$uri\n";
257              
258             my $ip;
259             if( defined $dnscache ) {
260             if( exists $dnscache->{$host} ) {
261             $ip=$dnscache->{$host};
262             $rc[RC_DNSCACHED]=1;
263             } else {
264             AnyEvent::Socket::inet_aton $host, $cb;
265             my @addr=$wait->();
266             if( @addr ) {
267             $dnscache->{$host}=$ip=AnyEvent::Socket::format_address $addr[0];
268             } else {
269             @err[RC_STATUS, RC_STATUSLINE]=(599, "Cannot resolve host $host");
270             return (\@err, undef);
271             }
272             $rc[RC_DNSCACHED]=0;
273             }
274             } else {
275             AnyEvent::Socket::inet_aton $host, $cb;
276             my @addr=$wait->();
277             if( @addr ) {
278             $ip=AnyEvent::Socket::format_address $addr[0];
279             } else {
280             @err[RC_STATUS, RC_STATUSLINE]=(599, "Cannot resolve host $host");
281             return (\@err, undef);
282             }
283             $rc[RC_DNSCACHED]=0;
284             }
285              
286             #D warn "$host resolves to IP $ip".($rc[RC_DNSCACHED]?' (cached)':'')."\n";
287              
288             my $conncache=conncache;
289              
290             my ($connh, $restart);
291             #D my ($lip, $lport); # only used when debugging
292             RESTART: {
293             #D warn "Restarting connection to $ip:$port\n" if $restart;
294             undef $restart;
295             undef $connh;
296             undef $store_time;
297             undef $eof;
298              
299             my $key;
300              
301             if( exists $param->{keepalive} and
302             $param->{keepalive}&KEEPALIVE_USE and
303             exists $conncache->{$key="$ip $port"} and
304             $connh=do{my $l=$conncache->{$key};
305             shift @$l while(@$l and !$l->[0]->[1]); # drop all unusables
306             shift @$l} ) {
307             #D ($lport, $lip)=@{$connh}[2,3];
308             $connh=$connh->[0];
309              
310             $rc[RC_CONNCACHED]=1;
311              
312             #D warn "Using kept-alive connection ".
313             #D $lip.':'.$lport." ==> $ip:$port\n";
314              
315             $rc[RC_STARTTIME]=$rc[RC_CONNTIME]=AE::now;
316             config_handle $connh, $cb, \@err, \$eof, \$restart, 1;
317             } else {
318             $rc[RC_CONNCACHED]=0;
319             AnyEvent::Socket::tcp_connect $ip, $port, $cb, sub {
320             $rc[RC_STARTTIME]=AE::now;
321             $store_time=\$rc[RC_CONNTIME];
322             exists $param->{conn_timeout} ? $param->{conn_timeout} : 0;
323             };
324              
325             unless( ($connh)=$wait->() ) {
326             #D warn "Connection to $ip failed: $!";
327             @err[RC_STATUS, RC_STATUSLINE]=(599, "Connection failed: $!");
328             return (\@err, $connh);
329             }
330              
331             #D ($lport, $lip)=AnyEvent::Socket::unpack_sockaddr getsockname $connh;
332             #D $lip=AnyEvent::Socket::format_address $lip;
333              
334             #D warn "New connection established ".
335             #D $lip.':'.$lport." ==> $ip:$port\n";
336              
337             $connh=AnyEvent::Handle->new( fh=>$connh,
338             timeout=>(exists $param->{timeout}
339             ? $param->{timeout}
340             : DEFAULT_TIMEOUT),
341             peername=>$host,
342             (exists $param->{tls_ctx}
343             ? (tls_ctx=>tlsctx $param->{tls_ctx})
344             : ()) );
345              
346             config_handle $connh, $cb, \@err, \$eof, \$restart, 0;
347             if ($scheme eq "https") {
348             #D warn "Starting TLS\n";
349             $connh->starttls('connect');
350             $wait->();
351             return (\@err, $connh) if @err;
352             }
353             }
354              
355             #D {
356             #D my $rq=build_req $method, $scheme, $host, $port, $uri, $param;
357             #D $rq=~s/\n?\z/\n/;
358             #D warn "--Sending Request----------------------------------------\n".
359             #D $rq.
360             #D "--Response Header----------------------------------------\n";
361             #D }
362              
363             $connh->push_write(build_req $method, $scheme, $host, $port, $uri, $param);
364              
365             # read status line
366             $store_time=\$rc[RC_FIRSTTIME];
367             $line=readln $connh, $cb, $wait;
368             #D if( $restart ) {
369             #D warn "RESTARTING\n".
370             #D " line='$line'\n".
371             #D "----rbuf-----------------------------------------------\n".
372             #D $connh->{rbuf}.
373             #D "\n----end of rbuf----------------------------------------\n";
374             #D }
375             redo RESTART if $restart;
376              
377             return (\@err, $connh) if @err; # error
378              
379             #D warn "$line\n";
380             unless (@rc[RC_HTTPVERSION, RC_STATUS, RC_STATUSLINE]=
381             $line=~m!^HTTP/(\d+\.\d+)\s+(\d+)(?:\s+(.+))!) {
382             redo RESTART if length !$line and $rc[RC_CONNCACHED];
383              
384             @err[RC_STATUS, RC_STATUSLINE]=(599, "Invalid HTTP status line: $line");
385             return (\@err, $connh);
386             }
387             }
388              
389             # read header
390             $rc[RC_HEADERS]=\my %headers;
391             my ($name, $value);
392             while (defined($line=readln $connh, $cb, $wait) and length $line) {
393             #D warn "$line\n";
394             if( ($name, $value)=$line=~/^(\S+)\s*:\s*(.+)/ ) {
395             $name=lc $name;
396             push @{$headers{$name}}, $value;
397             } elsif(!defined $name) {
398             @err[RC_STATUS, RC_STATUSLINE]=(599, "Invalid HTTP header block");
399             return (\@err, $connh);
400             } else { # MIME continuation lines
401             $line=~s/^\s+//;
402             my $l=$headers{$name};
403             $l->[$#{$l}].=$line;
404             }
405             }
406              
407             $rc[RC_HEADERTIME]=AE::now;
408              
409             # don't read the response body if the message MUST NOT include one
410             # STATUS 1xx, 204, 304 and HEAD requests
411              
412             unless (no_response_body $rc[RC_STATUS], $method) {
413             if( exists $headers{'transfer-encoding'} and
414             $headers{'transfer-encoding'}->[0] ne 'identity' ) {
415             # according to RFC2616 section 4.4 anything other than 'identity'
416             # means 'chunked'
417             $rc[RC_BODY]=readchunked $connh, $cb, $wait;
418             return (\@err, $connh) if @err;
419             } elsif(exists $headers{'content-length'}) {
420             $rc[RC_BODY]=readchunk $connh, $cb, $wait,
421             $headers{'content-length'}->[0];
422             return (\@err, $connh) if @err;
423             #} elsif( ct=multipart/byteranges ) { # not implemented
424             } else {
425             $rc[RC_BODY]=readEOF $connh, $cb, $wait;
426             return (\@err, $connh) if @err;
427             }
428             }
429              
430             $rc[RC_BODYTIME]=AE::now;
431              
432             #D warn "--Response Body------------------------------------------\n".
433             #D ($ENV{"HTTP__LoadGen__Run__dbg"}>1
434             #D ? do {my $s=$rc[RC_BODY]; $s=~s/\n?$/\n/; $s}
435             #D : "BODY omitted: set HTTP__LoadGen__Run__dbg>1 to get it\n")
436             #D unless(no_response_body $rc[RC_STATUS], $method);
437             #D warn "---------------------------------------------------------\n";
438              
439             # update connection cache
440             if(!$eof and
441             exists $param->{keepalive} and
442             ($param->{keepalive} & KEEPALIVE_STORE) and
443             ($rc[RC_HTTPVERSION]>=1.1 &&
444             !(exists $headers{connection} and
445             $headers{connection}->[0]=~/close/i) or
446             $rc[RC_HTTPVERSION]<1.1 &&
447             (exists $headers{connection} and
448             $headers{connection}->[0]=~/keep-alive/i))) {
449             my $ccel=[$connh, 1];
450             #D push @$ccel, $lport, $lip;
451             $connh->on_starttls(undef);
452             $connh->on_read(undef);
453             $connh->on_eof(undef);
454             # EOF as well as any other error is now handled by on_error
455             $connh->on_error(sub {
456             #D warn "Connection ($ccel->[3]:$ccel->[2])=>($ip:$port) closed while cached: $_[2]\n";
457             $ccel->[1]=0;
458             });
459             push @{$conncache->{"$ip $port"}}, $ccel;
460             }
461              
462             return (\@rc, $connh);
463             }
464              
465             sub run_urllist {
466             my ($o)=@_;
467             my ($times, $before, $after, $itgenerator)=
468             @{$o}{qw/times before after InitURLs/};
469              
470             for( my $i=0; $times<=0 or $i<$times; $i++ ) {
471             my ($el, $rc, $connh);
472             for( my $it=$itgenerator->(); $el=$it->($rc, $el); ) {
473             $before->($el) if $before;
474             ($rc, $connh)=run_url @$el;
475             if($after) {
476             $after->($rc, $el, $connh) and return;
477             }
478             }
479             }
480             }
481              
482             1;
483             __END__