File Coverage

blib/lib/MogileFS/Connection/HTTP.pm
Criterion Covered Total %
statement 118 131 90.0
branch 28 52 53.8
condition 11 28 39.2
subroutine 18 18 100.0
pod 3 6 50.0
total 178 235 75.7


line stmt bran cond sub pod time code
1             package MogileFS::Connection::HTTP;
2 21     21   87 use strict;
  21         38  
  21         639  
3 21     21   83 use warnings;
  21         30  
  21         425  
4 21     21   7271 use MogileFS::Connection::Poolable;
  21         59  
  21         532  
5 21     21   15426 use HTTP::Response;
  21         489489  
  21         800  
6 21     21   181 use base qw(MogileFS::Connection::Poolable);
  21         32  
  21         3889  
7 21     21   123 use MogileFS::Util qw/debug/;
  21         35  
  21         1351  
8              
9             use fields (
10 21         167 'read_size_hint', # bytes to read for body
11             'http_response', # HTTP::Response object
12             'http_req', # HTTP request ("GET $URL")
13             'http_res_cb', # called on HTTP::Response (_after_ body is read)
14             'http_res_body_read',# number of bytes read in the response body
15             'http_res_content_cb' # filter for the response body (success-only)
16 21     21   99 );
  21         37  
17 21     21   12107 use Net::HTTP::NB;
  21         244398  
  21         222  
18              
19             sub new {
20 10     10 1 57 my ($self, $ip, $port) = @_;
21 10         134 my %opts = ( Host => "$ip:$port", Blocking => 0, KeepAlive => 300 );
22 10 50       181 my $sock = Net::HTTP::NB->new(%opts) or return;
23              
24 10 50       35691 $self = fields::new($self) unless ref $self;
25 10         8846 $self->SUPER::new($sock, $ip, $port); # MogileFS::Connection::Poolable->new
26              
27 10         54 return $self;
28             }
29              
30             # starts an HTTP request, returns immediately and relies on Danga::Socket
31             # to schedule the run the callback.
32             sub start {
33 16     16 0 75 my ($self, $method, $path, $opts, $http_res_cb) = @_;
34 16   50     62 $opts ||= {};
35              
36 16         53 my $err = delete $self->{mfs_err};
37 16 100       54 return $self->err_response($err, $http_res_cb) if $err;
38              
39 15         46 $self->{http_res_cb} = $http_res_cb;
40 15         41 $self->{http_res_content_cb} = $opts->{content_cb};
41 15   50     282 $self->{read_size_hint} = $opts->{read_size_hint} || 4096;
42              
43 15   50     145 my $h = $opts->{headers} || {};
44 15         132 $h->{'User-Agent'} = ref($self) . "/$MogileFS::Server::VERSION";
45 15         35 my $content = $opts->{content};
46 15 50       61 if (defined $content) {
47             # Net::HTTP::NB->format_request will set Content-Length for us
48 0         0 $h->{'Content-Type'} = 'application/octet-stream'
49             } else {
50 15         54 $content = "";
51             }
52              
53             # support full URLs for LWP compatibility
54             # some HTTP daemons don't support Absolute-URIs, so we only give
55             # them the HTTP/1.0-compatible path
56 15 50       68 if ($path =~ m{\Ahttps?://[^/]+(/.*)\z}) {
57 0         0 $path = $1;
58             }
59              
60 15         102 $self->set_timeout("node_timeout");
61              
62             # Force HTTP/1.0 to avoid potential chunked responses and force server
63             # to set Content-Length: instead. In practice, we'll never get chunked
64             # responses anyways as all known DAV servers will set Content-Length
65             # for static files...
66 15 50       55 $self->sock->http_version($method eq "GET" ? "1.0" : "1.1");
67 15         595 $h->{Connection} = "keep-alive";
68              
69             # format the request here since it sets the reader up to read
70 15         50 my $req = $self->sock->format_request($method, $path, %$h, $content);
71 15         3441 $self->{http_req} = "$method http://" . $self->key . $path;
72              
73             # we'll start watching for writes here since it's unlikely the
74             # 3-way handshake for new TCP connections is done at this point
75 15         94 $self->write($req);
76              
77             # start reading once we're done writing
78             $self->write(sub {
79             # we're connected after writing $req is successful, so
80             # change the timeout and wait for readability
81 14     14   693 $self->set_timeout("node_timeout");
82 14         140 $self->watch_read(1);
83 15         1293 });
84             }
85              
86             # called by Danga::Socket upon readability
87             sub event_read {
88 16     16 1 1634990 my ($self) = @_;
89              
90 16         75 my $content_cb = $self->{http_res_content_cb};
91 16         133 my Net::HTTP::NB $sock = $self->sock;
92 16         133 my $res = $self->{http_response};
93              
94             # read and cache HTTP response headers
95 16 100       80 unless ($res) {
96 15         76 my ($code, $mess, @headers) = eval { $sock->read_response_headers };
  15         239  
97              
98             # wait for readability on EAGAIN
99 15 100       5566 unless (defined $code) {
100 3         10 my $err = $@;
101 3 100       20 if ($err) {
102 1         26 $err =~ s/ at .*\z//s; # do not expose source file location
103 1         7 $err =~ s/\r?\n/\\n/g; # just in case
104 1         21 return $self->err("read_response_headers: $err");
105             }
106              
107             # assume EAGAIN, though $! gets clobbered by Net::HTTP::*
108 2         22 return;
109             }
110              
111             # hold onto response object until the response body is processed
112 12         342 $res = HTTP::Response->new($code, $mess, \@headers, "");
113 12         3349 $res->protocol("HTTP/" . $sock->peer_http_version);
114 12         354 $self->{http_response} = $res;
115 12 50       76 $self->{http_res_body_read} = $content_cb ? 0 : undef;
116             }
117              
118             my $body_read = sub {
119 2 50   2   53 $content_cb ? $self->{http_res_body_read} : length($res->content);
120 13         167 };
121              
122             # continue reading the response body if we have a header
123 13         46 my $rsize = $self->{read_size_hint};
124 13         23 my $buf;
125              
126 13         90 my $clen = $res->header("Content-Length");
127 13         1111 while (1) {
128 22         445 my $n = $sock->read_entity_body($buf, $rsize);
129 22 100       1758 if (!defined $n) {
130 2 50       143 if ($!{EAGAIN}) {
131             # workaround a bug in Net::HTTP::NB
132             # ref: https://rt.cpan.org/Ticket/Display.html?id=78233
133 2 50 33     256 if (defined($clen) && $clen == $body_read->()) {
134 0         0 return $self->_http_done;
135             }
136              
137             # reset the timeout if we got any body bytes
138 2         85 $self->set_timeout("node_timeout");
139 2         13 return;
140             }
141 0 0       0 next if $!{EINTR};
142 0         0 return $self->err("read_entity_body: $!");
143             }
144 20 100       64 if ($n == 0) {
145             # EOF, call the response header callback
146 11         110 return $self->_http_done;
147             }
148 9 50       44 if ($n > 0) {
149 9 50 33     93 if ($content_cb && $res->is_success) {
150 0         0 $self->{http_res_body_read} += length($buf);
151              
152             # filter the buffer through content_cb, no buffering.
153             # This will be used by tracker-side checksumming
154             # replicate does NOT use this code path for performance
155             # reasons (tracker-side checksumming is already a slow path,
156             # so there's little point in optimizing).
157             # $buf may be empty on EOF (n == 0)
158 0         0 $content_cb->($buf, $self, $res);
159              
160 0 0 0     0 if (defined($clen) && $clen == $body_read->()) {
161 0         0 return $self->_http_done;
162             }
163             } else {
164             # append to existing buffer, this is only used for
165             # PUT/DELETE/HEAD and small GET responses (monitor)
166 9         76 $res->content($res->content . $buf);
167             }
168             # continue looping until EAGAIN or EOF (n == 0)
169             }
170             }
171             }
172              
173             # this does cleanup as an extra paranoid step to prevent circular refs
174             sub close {
175 9     9 1 88 my ($self, $close_reason) = @_;
176              
177 9         29 delete $self->{http_res_cb};
178 9         28 delete $self->{http_res_content_cb};
179              
180 9         80 $self->SUPER::close($close_reason); # MogileFS::Connection::Poolable->close
181             }
182              
183             # This is only called on a socket-level error (e.g. disconnect, timeout)
184             # bad server responses (500, 403) do not trigger this
185             sub err {
186 3     3 0 10 my ($self, $reason) = @_;
187              
188             # Fake an HTTP response like LWP does on errors.
189             # delete prevents http_res_cb from being invoked twice, as event_read
190             # will delete http_res_cb on success, too
191 3         12 my $http_res_cb = delete $self->{http_res_cb};
192              
193             # don't retry if we already got a response header nor if we got a timeout
194 3 50 33     51 if ($self->retryable($reason) && $http_res_cb && !$self->{http_response}) {
      33        
195             # do not call inflight_expire here, since we need inflight_cb
196             # for retrying
197              
198 0         0 $self->close(":retry"); # trigger a retry in MogileFS::ConnectionPool
199             } else {
200             # ensure we don't call new_err on close()
201 3         57 $self->inflight_expire;
202              
203             # free the FD before invoking the callback
204 3         20 $self->close($reason);
205 3 50       18 $self->err_response($reason, $http_res_cb) if $http_res_cb;
206             }
207             }
208              
209             # Fakes an HTTP response like LWP does on errors.
210             sub err_response {
211 4     4 0 25 my ($self, $err, $http_res_cb) = @_;
212              
213 4         85 my $res = HTTP::Response->new(500, $err);
214 4   50     491 $err ||= "(unspecified error)";
215 4   100     20 my $req = $self->{http_req} || "no HTTP request made";
216 4         57 Mgd::error("$err: $req");
217 4         31 $res->header("X-MFS-Error", $err);
218 4         660 $res->protocol("HTTP/1.0");
219 4         55 $http_res_cb->($res);
220             }
221              
222             # returns true if the HTTP connection is persistent/reusable, false if not.
223             sub _http_persistent {
224 11     11   33 my ($self, $res) = @_;
225              
226             # determine if this connection is reusable:
227 11         55 my $connection = $res->header("Connection");
228 11         556 my $persist;
229              
230             # Connection: header takes precedence over protocol version
231 11 50       41 if ($connection) {
232 0 0       0 if ($connection =~ /\bkeep-alive\b/i) {
    0          
233 0         0 $persist = 1;
234             } elsif ($connection =~ /\bclose\b/i) {
235 0         0 $persist = 0;
236             }
237              
238             # if we can't make sense of the Connection: header, fall through
239             # and decided based on protocol version
240             }
241              
242             # HTTP/1.1 is persistent-by-default, HTTP/1.0 is not.
243             # Will there be HTTP/1.2?
244 11 50       60 $persist = $res->protocol eq "HTTP/1.1" unless defined $persist;
245              
246             # we're not persistent if the pool is full, either
247 11   33     228 return ($persist && $self->persist);
248             }
249              
250             # Called on successfully read HTTP response (it could be a server-side
251             # error (404,403,500...), but not a socket error between client<->server).
252             sub _http_done {
253 11     11   29 my ($self) = @_;
254              
255             # delete ensures we only fire the callback once
256 11         52 my $http_res_cb = delete $self->{http_res_cb};
257 11         40 my $res = delete $self->{http_response};
258 11         37 delete $self->{http_req};
259              
260             # ensure we don't call new_err on eventual close()
261 11         92 $self->inflight_expire;
262              
263             # free up the FD if possible
264 11 50       399 $self->close('http_close') unless $self->_http_persistent($res);
265              
266             # finally, invoke the user-supplied callback
267 11         73 $http_res_cb->($res);
268             }
269              
270             1;