File Coverage

blib/lib/MogileFS/Connection/HTTP.pm
Criterion Covered Total %
statement 118 131 90.0
branch 28 52 53.8
condition 11 28 39.2
subroutine 18 18 100.0
pod 3 6 50.0
total 178 235 75.7


line stmt bran cond sub pod time code
1             package MogileFS::Connection::HTTP;
2 21     21   120 use strict;
  21         54  
  21         828  
3 21     21   119 use warnings;
  21         50  
  21         729  
4 21     21   11885 use MogileFS::Connection::Poolable;
  21         72  
  21         611  
5 21     21   31790 use HTTP::Response;
  21         943180  
  21         997  
6 21     21   258 use base qw(MogileFS::Connection::Poolable);
  21         62  
  21         5032  
7 21     21   145 use MogileFS::Util qw/debug/;
  21         59  
  21         1964  
8              
9             use fields (
10 21         227 'read_size_hint', # bytes to read for body
11             'http_response', # HTTP::Response object
12             'http_req', # HTTP request ("GET $URL")
13             'http_res_cb', # called on HTTP::Response (_after_ body is read)
14             'http_res_body_read',# number of bytes read in the response body
15             'http_res_content_cb' # filter for the response body (success-only)
16 21     21   133 );
  21         45  
17 21     21   28744 use Net::HTTP::NB;
  21         580794  
  21         442  
18              
19             sub new {
20 10     10 1 78 my ($self, $ip, $port) = @_;
21 10         160 my %opts = ( Host => "$ip:$port", Blocking => 0, KeepAlive => 300 );
22 10 50       793 my $sock = Net::HTTP::NB->new(%opts) or return;
23              
24 10 50       37290 $self = fields::new($self) unless ref $self;
25 10         15624 $self->SUPER::new($sock, $ip, $port); # MogileFS::Connection::Poolable->new
26              
27 10         59 return $self;
28             }
29              
30             # starts an HTTP request, returns immediately and relies on Danga::Socket
31             # to schedule the run the callback.
32             sub start {
33 16     16 0 46 my ($self, $method, $path, $opts, $http_res_cb) = @_;
34 16   50     61 $opts ||= {};
35              
36 16         49 my $err = delete $self->{mfs_err};
37 16 100       115 return $self->err_response($err, $http_res_cb) if $err;
38              
39 15         46 $self->{http_res_cb} = $http_res_cb;
40 15         45 $self->{http_res_content_cb} = $opts->{content_cb};
41 15   50     286 $self->{read_size_hint} = $opts->{read_size_hint} || 4096;
42              
43 15   50     116 my $h = $opts->{headers} || {};
44 15         125 $h->{'User-Agent'} = ref($self) . "/$MogileFS::Server::VERSION";
45 15         26 my $content = $opts->{content};
46 15 50       119 if (defined $content) {
47             # Net::HTTP::NB->format_request will set Content-Length for us
48 0         0 $h->{'Content-Type'} = 'application/octet-stream'
49             } else {
50 15         52 $content = "";
51             }
52              
53             # support full URLs for LWP compatibility
54             # some HTTP daemons don't support Absolute-URIs, so we only give
55             # them the HTTP/1.0-compatible path
56 15 50       128 if ($path =~ m{\Ahttps?://[^/]+(/.*)\z}) {
57 0         0 $path = $1;
58             }
59              
60 15         101 $self->set_timeout("node_timeout");
61              
62             # Force HTTP/1.0 to avoid potential chunked responses and force server
63             # to set Content-Length: instead. In practice, we'll never get chunked
64             # responses anyways as all known DAV servers will set Content-Length
65             # for static files...
66 15 50       60 $self->sock->http_version($method eq "GET" ? "1.0" : "1.1");
67 15         575 $h->{Connection} = "keep-alive";
68              
69             # format the request here since it sets the reader up to read
70 15         56 my $req = $self->sock->format_request($method, $path, %$h, $content);
71 15         2218 $self->{http_req} = "$method http://" . $self->key . $path;
72              
73             # we'll start watching for writes here since it's unlikely the
74             # 3-way handshake for new TCP connections is done at this point
75 15         124 $self->write($req);
76              
77             # start reading once we're done writing
78             $self->write(sub {
79             # we're connected after writing $req is successful, so
80             # change the timeout and wait for readability
81 14     14   992 $self->set_timeout("node_timeout");
82 14         88 $self->watch_read(1);
83 15         23678 });
84             }
85              
86             # called by Danga::Socket upon readability
87             sub event_read {
88 15     15 1 1747526 my ($self) = @_;
89              
90 15         83 my $content_cb = $self->{http_res_content_cb};
91 15         266 my Net::HTTP::NB $sock = $self->sock;
92 15         202 my $res = $self->{http_response};
93              
94             # read and cache HTTP response headers
95 15 100       112 unless ($res) {
96 14         146 my ($code, $mess, @headers) = eval { $sock->read_response_headers };
  14         333  
97              
98             # wait for readability on EAGAIN
99 14 100       6234 unless (defined $code) {
100 2         57 my $err = $@;
101 2 100       47 if ($err) {
102 1         88 $err =~ s/ at .*\z//s; # do not expose source file location
103 1         34 $err =~ s/\r?\n/\\n/g; # just in case
104 1         36 return $self->err("read_response_headers: $err");
105             }
106              
107             # assume EAGAIN, though $! gets clobbered by Net::HTTP::*
108 1         21 return;
109             }
110              
111             # hold onto response object until the response body is processed
112 12         517 $res = HTTP::Response->new($code, $mess, \@headers, "");
113 12         4570 $res->protocol("HTTP/" . $sock->peer_http_version);
114 12         463 $self->{http_response} = $res;
115 12 50       78 $self->{http_res_body_read} = $content_cb ? 0 : undef;
116             }
117              
118             my $body_read = sub {
119 2 50   2   40 $content_cb ? $self->{http_res_body_read} : length($res->content);
120 13         585 };
121              
122             # continue reading the response body if we have a header
123 13         44 my $rsize = $self->{read_size_hint};
124 13         37 my $buf;
125              
126 13         109 my $clen = $res->header("Content-Length");
127 13         953 while (1) {
128 23         493 my $n = $sock->read_entity_body($buf, $rsize);
129 23 100       1808 if (!defined $n) {
130 2 50       123 if ($!{EAGAIN}) {
131             # workaround a bug in Net::HTTP::NB
132             # ref: https://rt.cpan.org/Ticket/Display.html?id=78233
133 2 50 33     182 if (defined($clen) && $clen == $body_read->()) {
134 0         0 return $self->_http_done;
135             }
136              
137             # reset the timeout if we got any body bytes
138 2         124 $self->set_timeout("node_timeout");
139 2         12 return;
140             }
141 0 0       0 next if $!{EINTR};
142 0         0 return $self->err("read_entity_body: $!");
143             }
144 21 100       88 if ($n == 0) {
145             # EOF, call the response header callback
146 11         110 return $self->_http_done;
147             }
148 10 50       60 if ($n > 0) {
149 10 50 33     135 if ($content_cb && $res->is_success) {
150 0         0 $self->{http_res_body_read} += length($buf);
151              
152             # filter the buffer through content_cb, no buffering.
153             # This will be used by tracker-side checksumming
154             # replicate does NOT use this code path for performance
155             # reasons (tracker-side checksumming is already a slow path,
156             # so there's little point in optimizing).
157             # $buf may be empty on EOF (n == 0)
158 0         0 $content_cb->($buf, $self, $res);
159              
160 0 0 0     0 if (defined($clen) && $clen == $body_read->()) {
161 0         0 return $self->_http_done;
162             }
163             } else {
164             # append to existing buffer, this is only used for
165             # PUT/DELETE/HEAD and small GET responses (monitor)
166 10         110 $res->content($res->content . $buf);
167             }
168             # continue looping until EAGAIN or EOF (n == 0)
169             }
170             }
171             }
172              
173             # this does cleanup as an extra paranoid step to prevent circular refs
174             sub close {
175 9     9 1 108 my ($self, $close_reason) = @_;
176              
177 9         30 delete $self->{http_res_cb};
178 9         43 delete $self->{http_res_content_cb};
179              
180 9         98 $self->SUPER::close($close_reason); # MogileFS::Connection::Poolable->close
181             }
182              
183             # This is only called on a socket-level error (e.g. disconnect, timeout)
184             # bad server responses (500, 403) do not trigger this
185             sub err {
186 3     3 0 39 my ($self, $reason) = @_;
187              
188             # Fake an HTTP response like LWP does on errors.
189             # delete prevents http_res_cb from being invoked twice, as event_read
190             # will delete http_res_cb on success, too
191 3         22 my $http_res_cb = delete $self->{http_res_cb};
192              
193             # don't retry if we already got a response header nor if we got a timeout
194 3 50 33     109 if ($self->retryable($reason) && $http_res_cb && !$self->{http_response}) {
      33        
195             # do not call inflight_expire here, since we need inflight_cb
196             # for retrying
197              
198 0         0 $self->close(":retry"); # trigger a retry in MogileFS::ConnectionPool
199             } else {
200             # ensure we don't call new_err on close()
201 3         55 $self->inflight_expire;
202              
203             # free the FD before invoking the callback
204 3         38 $self->close($reason);
205 3 50       31 $self->err_response($reason, $http_res_cb) if $http_res_cb;
206             }
207             }
208              
209             # Fakes an HTTP response like LWP does on errors.
210             sub err_response {
211 4     4 0 11 my ($self, $err, $http_res_cb) = @_;
212              
213 4         139 my $res = HTTP::Response->new(500, $err);
214 4   50     631 $err ||= "(unspecified error)";
215 4   100     31 my $req = $self->{http_req} || "no HTTP request made";
216 4         102 Mgd::error("$err: $req");
217 4         27 $res->header("X-MFS-Error", $err);
218 4         793 $res->protocol("HTTP/1.0");
219 4         59 $http_res_cb->($res);
220             }
221              
222             # returns true if the HTTP connection is persistent/reusable, false if not.
223             sub _http_persistent {
224 11     11   27 my ($self, $res) = @_;
225              
226             # determine if this connection is reusable:
227 11         60 my $connection = $res->header("Connection");
228 11         495 my $persist;
229              
230             # Connection: header takes precedence over protocol version
231 11 50       41 if ($connection) {
232 0 0       0 if ($connection =~ /\bkeep-alive\b/i) {
    0          
233 0         0 $persist = 1;
234             } elsif ($connection =~ /\bclose\b/i) {
235 0         0 $persist = 0;
236             }
237              
238             # if we can't make sense of the Connection: header, fall through
239             # and decided based on protocol version
240             }
241              
242             # HTTP/1.1 is persistent-by-default, HTTP/1.0 is not.
243             # Will there be HTTP/1.2?
244 11 50       77 $persist = $res->protocol eq "HTTP/1.1" unless defined $persist;
245              
246             # we're not persistent if the pool is full, either
247 11   33     1478 return ($persist && $self->persist);
248             }
249              
250             # Called on successfully read HTTP response (it could be a server-side
251             # error (404,403,500...), but not a socket error between client<->server).
252             sub _http_done {
253 11     11   28 my ($self) = @_;
254              
255             # delete ensures we only fire the callback once
256 11         53 my $http_res_cb = delete $self->{http_res_cb};
257 11         53 my $res = delete $self->{http_response};
258 11         60 delete $self->{http_req};
259              
260             # ensure we don't call new_err on eventual close()
261 11         171 $self->inflight_expire;
262              
263             # free up the FD if possible
264 11 50       79 $self->close('http_close') unless $self->_http_persistent($res);
265              
266             # finally, invoke the user-supplied callback
267 11         67 $http_res_cb->($res);
268             }
269              
270             1;