File Coverage

blib/lib/MogileFS/HTTPFile.pm
Criterion Covered Total %
statement 21 152 13.8
branch 0 52 0.0
condition 0 23 0.0
subroutine 7 27 25.9
pod 0 11 0.0
total 28 265 10.5


line stmt bran cond sub pod time code
1             package MogileFS::HTTPFile;
2 21     21   140 use strict;
  21         42  
  21         931  
3 21     21   138 use warnings;
  21         44  
  21         674  
4 21     21   129 use Carp qw(croak);
  21         42  
  21         1104  
5 21     21   27147 use Digest;
  21         15551  
  21         612  
6 21     21   150 use MogileFS::Server;
  21         50  
  21         1757  
7 21     21   123 use MogileFS::Util qw(error undeferr wait_for_readability wait_for_writeability);
  21         52  
  21         22340  
8              
9             my %sidechannel_nexterr; # host => next error log time
10              
11             # create a new MogileFS::HTTPFile instance from a URL. not called
12             # "new" because I don't want to imply that it's creating anything.
13             sub at {
14 0     0 0   my ($class, $url) = @_;
15 0           my $self = bless {}, $class;
16              
17 0 0         unless ($url =~ m!^http://([^:/]+)(?::(\d+))?(/.+)$!) {
18 0           croak "Bogus URL.\n";
19             }
20              
21 0           $self->{url} = $url;
22 0           $self->{host} = $1;
23 0           $self->{port} = $2;
24 0           $self->{uri} = $3;
25 0           return $self;
26             }
27              
28             sub device_id {
29 0     0 0   my $self = shift;
30 0 0         return $self->{devid} if $self->{devid};
31 0 0         $self->{url} =~ /\bdev(\d+)\b/
32             or die "Can't find device from URL: $self->{url}\n";
33 0           return $self->{devid} = $1;
34             }
35              
36             sub host_id {
37 0     0 0   my $self = shift;
38 0           return $self->device->hostid;
39             }
40              
41             # return MogileFS::Device object
42             sub device {
43 0     0 0   my $self = shift;
44 0           return Mgd::device_factory()->get_by_id($self->device_id);
45             }
46              
47             # return MogileFS::Host object
48             sub host {
49 0     0 0   my $self = shift;
50 0           return $self->device->host;
51             }
52              
53             # returns true on success, dies on failure
54             sub delete {
55 0     0 0   my $self = shift;
56 0           my %opts = @_;
57 0           my ($host, $port) = ($self->{host}, $self->{port});
58 0           my %http_opts = ( port => $port );
59 0           my $res;
60              
61 0     0     $self->host->http("DELETE", $self->{uri}, \%http_opts, sub { ($res) = @_ });
  0            
62              
63 0     0     Danga::Socket->SetPostLoopCallback(sub { !defined $res });
  0            
64 0           Danga::Socket->EventLoop;
65              
66 0 0 0       if ($res->code == 204 || ($res->code == 404 && $opts{ignore_missing})) {
      0        
67 0           return 1;
68             }
69 0           my $line = $res->status_line;
70 0           die "Bad response on DELETE $self->{url}: [$line]";
71             }
72              
73             # returns size of file, (doing a HEAD request and looking at content-length)
74             # returns -1 on file missing (404),
75             # returns undef on connectivity error
76             #
77             # If an optional callback is supplied, the return value is given to the
78             # callback.
79             #
80             # workers running Danga::Socket->EventLoop must supply a callback
81             # workers NOT running Danga::Socket->EventLoop msut not supply a callback
82 21     21   160 use constant FILE_MISSING => -1;
  21         54  
  21         44760  
83             sub size {
84 0     0 0   my ($self, $cb) = @_;
85 0           my %opts = ( port => $self->{port} );
86              
87 0 0         if ($cb) { # run asynchronously
88 0 0         if (defined $self->{_size}) {
89 0     0     Danga::Socket->AddTimer(0, sub { $cb->($self->{_size}) });
  0            
90             } else {
91             $self->host->http("HEAD", $self->{uri}, \%opts, sub {
92 0     0     $cb->($self->on_size_response(@_));
93 0           });
94             }
95 0           return undef;
96             } else { # run synchronously
97 0 0         return $self->{_size} if defined $self->{_size};
98              
99 0           my $res;
100 0     0     $self->host->http("HEAD", $self->{uri}, \%opts, sub { ($res) = @_ });
  0            
101              
102 0     0     Danga::Socket->SetPostLoopCallback(sub { !defined $res });
  0            
103 0           Danga::Socket->EventLoop;
104              
105 0           return $self->on_size_response($res);
106             }
107             }
108              
109             sub on_size_response {
110 0     0 0   my ($self, $res) = @_;
111              
112 0 0         if ($res->is_success) {
113 0           my $size = $res->header('content-length');
114 0 0 0       if (! defined $size &&
115             $res->header('server') =~ m/^lighttpd/) {
116             # lighttpd 1.4.x (main release) does not return content-length for
117             # 0 byte files.
118 0           $self->{_size} = 0;
119 0           return 0;
120             }
121 0           $self->{_size} = $size;
122 0           return $size;
123             } else {
124 0 0         if ($res->code == 404) {
125 0           return FILE_MISSING;
126             }
127 0           return undeferr("Failed HEAD check for $self->{url} (" . $res->code . "): "
128             . $res->message);
129             }
130             }
131              
132             sub digest_mgmt {
133 0     0 0   my ($self, $alg, $ping_cb, $reason) = @_;
134 0           my $mogconn = $self->host->mogstored_conn;
135 0           my $node_timeout = MogileFS->config("node_timeout");
136 0           my $sock;
137             my $rv;
138 0           my $expiry;
139              
140             # assuming the storage node can checksum at >=2MB/s, low expectations here
141 0           my $response_timeout = $self->size / (2 * 1024 * 1024);
142 0 0 0       if ($reason && $reason eq "fsck") {
143             # fsck has low priority in mogstored and is concurrency-limited,
144             # so this may be queued indefinitely behind digest requests for
145             # large files
146 0           $response_timeout += 3600;
147             } else {
148             # account for disk/network latency:
149 0           $response_timeout += $node_timeout;
150             }
151              
152 0 0         $reason = defined($reason) ? " $reason" : "";
153 0           my $uri = $self->{uri};
154 0           my $req = "$alg $uri$reason\r\n";
155 0           my $reqlen = length $req;
156              
157             # a dead/stale socket may not be detected until we try to recv on it
158             # after sending a request
159 0           my $retries = 2;
160              
161 0           my $host = $self->{host};
162              
163             retry:
164 0           $sock = eval { $mogconn->sock($node_timeout) };
  0            
165 0 0         if (defined $sock) {
166 0           delete $sidechannel_nexterr{$host};
167             } else {
168             # avoid flooding logs with identical messages
169 0           my $err = $@;
170 0   0       my $next = $sidechannel_nexterr{$host} || 0;
171 0           my $now = time();
172 0 0         return if $now < $next;
173 0           $sidechannel_nexterr{$host} = $now + 300;
174 0           return undeferr("sidechannel failure on $alg $uri: $err");
175             }
176              
177 0           $rv = send($sock, $req, 0);
178 0 0 0       if ($! || $rv != $reqlen) {
179 0           my $err = $!;
180 0           $mogconn->mark_dead;
181 0 0         if ($retries-- <= 0) {
182 0           $req =~ tr/\r\n//d;
183 0 0         $err = $err ? "send() error ($req): $err" :
184             "short send() ($req): $rv != $reqlen";
185 0           $err = $mogconn->{ip} . ":" . $mogconn->{port} . " $err";
186 0           return undeferr($err);
187             }
188 0           goto retry;
189             }
190              
191 0           $expiry = Time::HiRes::time() + $response_timeout;
192 0   0       while (!wait_for_readability(fileno($sock), 1.0) &&
193             (Time::HiRes::time() < $expiry)) {
194 0           $ping_cb->();
195             }
196              
197 0           $rv = <$sock>;
198 0 0         if (! $rv) {
    0          
    0          
    0          
199 0           $mogconn->mark_dead;
200 0 0         return undeferr("EOF from mogstored") if ($retries-- <= 0);
201 0           goto retry;
202             } elsif ($rv =~ /^\Q$uri\E \Q$alg\E=([a-f0-9]{32,128})\r\n/) {
203 0           my $hexdigest = $1;
204              
205 0           my $checksum = eval {
206 0           MogileFS::Checksum->from_string(0, "$alg:$hexdigest")
207             };
208 0 0         return undeferr("$alg failed for $uri: $@") if $@;
209 0           return $checksum->{checksum};
210             } elsif ($rv =~ /^\Q$uri\E \Q$alg\E=-1\r\n/) {
211             # FIXME, this could be another error like EMFILE/ENFILE
212 0           return FILE_MISSING;
213             } elsif ($rv =~ /^ERROR /) {
214 0           return; # old server, fallback to HTTP
215             }
216              
217 0           chomp($rv);
218 0           return undeferr("mogstored failed to handle ($alg $uri): $rv");
219             }
220              
221             sub digest_http {
222 0     0 0   my ($self, $alg, $ping_cb) = @_;
223              
224 0           my $digest = Digest->new($alg);
225             my %opts = (
226             port => $self->{port},
227             # default (4K) is tiny, use 1M like replicate
228             read_size_hint => 0x100000,
229             content_cb => sub {
230 0     0     $digest->add($_[0]);
231 0           $ping_cb->();
232             },
233 0           );
234              
235 0           my $res;
236 0     0     $self->host->http("GET", $self->{uri}, \%opts, sub { ($res) = @_ });
  0            
237              
238             # TODO: async interface for workers already running Danga::Socket->EventLoop
239 0     0     Danga::Socket->SetPostLoopCallback(sub { !defined $res });
  0            
240 0           Danga::Socket->EventLoop;
241              
242 0 0         return $digest->digest if $res->is_success;
243 0 0         return FILE_MISSING if $res->code == 404;
244 0           return undeferr("Failed $alg (GET) check for $self->{url} (" . $res->code . "): "
245             . $res->message);
246             }
247              
248             sub digest {
249 0     0 0   my ($self, $alg, $ping_cb, $reason) = @_;
250 0           my $digest = $self->digest_mgmt($alg, $ping_cb, $reason);
251              
252 0 0 0       return $digest if ($digest && $digest ne FILE_MISSING);
253              
254 0           $self->digest_http($alg, $ping_cb);
255             }
256              
257             1;