File Coverage

blib/lib/MogileFS/NewHTTPFile.pm
Criterion Covered Total %
statement 24 204 11.7
branch 0 88 0.0
condition 0 30 0.0
subroutine 8 29 27.5
pod 0 3 0.0
total 32 354 9.0


line stmt bran cond sub pod time code
1             ################################################################################
2             # MogileFS::HTTPFile object
3             # NOTE: This is meant to be used within IO::WrapTie...
4             #
5              
6             package MogileFS::NewHTTPFile;
7              
8 4     4   23 use strict;
  4         9  
  4         162  
9 4     4   21 no strict 'refs';
  4         8  
  4         96  
10              
11 4     4   19 use Carp;
  4         9  
  4         277  
12 4     4   22 use POSIX qw( EAGAIN );
  4         7  
  4         66  
13 4     4   5361 use Socket qw( PF_INET SOCK_STREAM );
  4         9  
  4         220  
14 4     4   23 use Errno qw( EINPROGRESS EISCONN );
  4         7  
  4         221  
15              
16 4     4   21 use vars qw($PROTO_TCP);
  4         9  
  4         314  
17              
18 4         40 use fields ('host',
19             'sock', # IO::Socket; created only when we need it
20             'uri',
21             'data', # buffered data we have
22             'pos', # simulated file position
23             'length', # length of data field
24             'content_length', # declared length of data we will be receiving (not required)
25             'mg',
26             'fid',
27             'devid',
28             'class',
29             'key',
30             'path', # full URL to save data to
31             'backup_dests',
32             'bytes_out', # count of how many bytes we've written to the socket
33             'data_in', # storage for data we've read from the socket
34             'create_close_args', # Extra arguments hashref for the do_request of create_close during CLOSE
35 4     4   20 );
  4         6  
36              
37 0     0 0   sub path { _getset(shift, 'path'); }
38 0     0 0   sub class { _getset(shift, 'class', @_); }
39 0     0 0   sub key { _getset(shift, 'key', @_); }
40              
41             sub _parse_url {
42 0     0     my MogileFS::NewHTTPFile $self = shift;
43 0           my $url = shift;
44 0 0         return 0 unless $url =~ m!http://(.+?)(/.+)$!;
45 0           $self->{host} = $1;
46 0           $self->{uri} = $2;
47 0           $self->{path} = $url;
48 0           return 1;
49             }
50              
51             sub TIEHANDLE {
52 0     0     my MogileFS::NewHTTPFile $self = shift;
53 0 0         $self = fields::new($self) unless ref $self;
54              
55 0           my %args = @_;
56 0 0         return undef unless $self->_parse_url($args{path});
57              
58 0           $self->{data} = '';
59 0           $self->{length} = 0;
60 0   0       $self->{backup_dests} = $args{backup_dests} || [];
61 0           $self->{content_length} = $args{content_length} + 0;
62 0           $self->{pos} = 0;
63 0           $self->{$_} = $args{$_} foreach qw(mg fid devid class key);
64 0           $self->{bytes_out} = 0;
65 0           $self->{data_in} = '';
66 0   0       $self->{create_close_args} = $args{create_close_args} || {};
67              
68 0           return $self;
69             }
70             *new = *TIEHANDLE;
71              
72             sub _sock_to_host { # (host)
73 0     0     my MogileFS::NewHTTPFile $self = shift;
74 0           my $host = shift;
75              
76             # setup
77 0           my ($ip, $port) = $host =~ /^(.*):(\d+)$/;
78 0           my $sock = "Sock_$host";
79 0   0       my $proto = $PROTO_TCP ||= getprotobyname('tcp');
80 0           my $sin;
81              
82             # create the socket
83 0           socket($sock, PF_INET, SOCK_STREAM, $proto);
84 0           $sin = Socket::sockaddr_in($port, Socket::inet_aton($ip));
85              
86             # unblock the socket
87 0           IO::Handle::blocking($sock, 0);
88              
89             # attempt a connection
90 0           my $ret = connect($sock, $sin);
91 0 0 0       if (!$ret && $! == EINPROGRESS) {
92 0           my $win = '';
93 0           vec($win, fileno($sock), 1) = 1;
94              
95             # watch for writeability
96 0 0         if (select(undef, $win, undef, 3) > 0) {
97 0           $ret = connect($sock, $sin);
98              
99             # EISCONN means connected & won't re-connect, so success
100 0 0 0       $ret = 1 if !$ret && $! == EISCONN;
101             }
102             }
103              
104             # just throw back the socket we have
105 0 0         return $sock if $ret;
106 0           return undef;
107             }
108              
109             sub _connect_sock {
110 0     0     my MogileFS::NewHTTPFile $self = shift;
111 0 0         return 1 if $self->{sock};
112              
113 0           my @down_hosts;
114              
115 0   0       while (!$self->{sock} && $self->{host}) {
116             # attempt to connect
117 0 0         return 1 if
118             $self->{sock} = $self->_sock_to_host($self->{host});
119              
120 0           push @down_hosts, $self->{host};
121 0 0         if (my $dest = shift @{$self->{backup_dests}}) {
  0            
122             # dest is [$devid,$path]
123 0           _debug("connecting to $self->{host} (dev $self->{devid}) failed; now trying $dest->[1] (dev $dest->[0])");
124 0 0         $self->_parse_url($dest->[1]) or _fail("bogus URL");
125 0           $self->{devid} = $dest->[0];
126             } else {
127 0           $self->{host} = undef;
128             }
129             }
130              
131 0           _fail("unable to open socket to storage node (tried: @down_hosts): $!");
132             }
133              
134             # abstracted read; implements what ends up being a blocking read but
135             # does it in terms of non-blocking operations.
136             sub _getline {
137 0     0     my MogileFS::NewHTTPFile $self = shift;
138 0   0       my $timeout = shift || 3;
139 0 0         return undef unless $self->{sock};
140              
141             # short cut if we already have data read
142 0 0         if ($self->{data_in} =~ s/^(.*?\r?\n)//) {
143 0           return $1;
144             }
145              
146 0           my $rin = '';
147 0           vec($rin, fileno($self->{sock}), 1) = 1;
148              
149             # nope, we have to read a line
150 0           my $nfound;
151 0           my $t1 = Time::HiRes::time();
152 0           while ($nfound = select($rin, undef, undef, $timeout)) {
153 0           my $data;
154 0           my $bytesin = sysread($self->{sock}, $data, 1024);
155 0 0         if (defined $bytesin) {
156             # we can also get 0 here, which means EOF. no error, but no data.
157 0 0         $self->{data_in} .= $data if $bytesin;
158             } else {
159 0 0         next if $! == EAGAIN;
160 0           _fail("error reading from node for device $self->{devid}: $!");
161             }
162              
163             # return a line if we got one
164 0 0         if ($self->{data_in} =~ s/^(.*?\r?\n)//) {
165 0           return $1;
166             }
167              
168             # and if we got no data, it's time to return EOF
169 0 0         unless ($bytesin) {
170 0           $@ = "\$bytesin is 0";
171 0           return undef;
172             }
173             }
174              
175             # if we got here, nothing was readable in our time limit
176 0           my $t2 = Time::HiRes::time();
177 0           $@ = sprintf("not readable in %0.02f seconds", $t2-$t1);
178 0           return undef;
179             }
180              
181             # abstracted write function that uses non-blocking I/O and checking for
182             # writeability to ensure that we don't get stuck doing a write if the
183             # node we're talking to goes down. also handles logic to fall back to
184             # a backup node if we're on our first write and the first node is down.
185             # this entire function is a blocking function, it just uses intelligent
186             # non-blocking write functionality.
187             #
188             # this function returns success (1) or it croaks on failure.
189             sub _write {
190 0     0     my MogileFS::NewHTTPFile $self = shift;
191 0 0         return undef unless $self->{sock};
192              
193 0           my $win = '';
194 0           vec($win, fileno($self->{sock}), 1) = 1;
195              
196             # setup data and counters
197 0           my $data = shift();
198 0           my $bytesleft = length($data);
199 0           my $bytessent = 0;
200              
201             # main sending loop for data, will keep looping until all of the data
202             # we've been asked to send is sent
203 0           my $nfound;
204 0   0       while ($bytesleft && ($nfound = select(undef, $win, undef, 3))) {
205 0           my $bytesout = syswrite($self->{sock}, $data, $bytesleft, $bytessent);
206 0 0         if (defined $bytesout) {
207             # update our myriad counters
208 0           $bytessent += $bytesout;
209 0           $self->{bytes_out} += $bytesout;
210 0           $bytesleft -= $bytesout;
211             } else {
212             # if we get EAGAIN, restart the select loop, else fail
213 0 0         next if $! == EAGAIN;
214 0           _fail("error writing to node for device $self->{devid}: $!");
215             }
216             }
217 0 0         return 1 unless $bytesleft;
218              
219             # at this point, we had a socket error, since we have bytes left, and
220             # the loop above didn't finish sending them. if this was our first
221             # write, let's try to fall back to a different host.
222 0 0         unless ($self->{bytes_out}) {
223 0 0         if (my $dest = shift @{$self->{backup_dests}}) {
  0            
224             # dest is [$devid,$path]
225 0 0         $self->_parse_url($dest->[1]) or _fail("bogus URL");
226 0           $self->{devid} = $dest->[0];
227 0           $self->_connect_sock;
228              
229             # now repass this write to try again
230 0           return $self->_write($data);
231             }
232             }
233              
234             # total failure (croak)
235 0           $self->{sock} = undef;
236 0           _fail(sprintf("unable to write to any allocated storage node, last tried dev %s on host %s uri %s. Had sent %s bytes, %s bytes left", $self->{devid}, $self->{host}, $self->{uri}, $self->{bytes_out}, $bytesleft));
237             }
238              
239             sub PRINT {
240 0     0     my MogileFS::NewHTTPFile $self = shift;
241              
242             # get data to send to server
243 0           my $data = shift;
244 0           my $newlen = length $data;
245 0           $self->{pos} += $newlen;
246              
247             # now make socket if we don't have one
248 0 0 0       if (!$self->{sock} && $self->{content_length}) {
249 0           $self->_connect_sock;
250 0           $self->_write("PUT $self->{uri} HTTP/1.0\r\nContent-length: $self->{content_length}\r\n\r\n");
251             }
252              
253             # write some data to our socket
254 0 0         if ($self->{sock}) {
255             # save the first 1024 bytes of data so that we can seek back to it
256             # and do some work later
257 0 0         if ($self->{length} < 1024) {
258 0 0         if ($self->{length} + $newlen > 1024) {
259 0           $self->{length} = 1024;
260 0           $self->{data} .= substr($data, 0, 1024 - $self->{length});
261             } else {
262 0           $self->{length} += $newlen;
263 0           $self->{data} .= $data;
264             }
265             }
266              
267             # actually write
268 0           $self->_write($data);
269             } else {
270             # or not, just stick it on our queued data
271 0           $self->{data} .= $data;
272 0           $self->{length} += $newlen;
273             }
274             }
275             *print = *PRINT;
276              
277             sub CLOSE {
278 0     0     my MogileFS::NewHTTPFile $self = shift;
279              
280             # if we're closed and we have no sock...
281 0 0         unless ($self->{sock}) {
282 0           $self->_connect_sock;
283 0           $self->_write("PUT $self->{uri} HTTP/1.0\r\nContent-length: $self->{length}\r\n\r\n");
284 0           $self->_write($self->{data});
285             }
286              
287             # set a message in $! and $@
288             my $err = sub {
289 0     0     $@ = "$_[0]\n";
290 0           return undef;
291 0           };
292              
293             # get response from put
294 0 0         if ($self->{sock}) {
295 0           my $line = $self->_getline(6); # wait up to 6 seconds for response to PUT.
296              
297 0 0         return $err->("Unable to read response line from server ($self->{sock}) after PUT of $self->{length} to $self->{uri}. _getline says: $@")
298             unless defined $line;
299              
300 0 0         if ($line =~ m!^HTTP/\d+\.\d+\s+(\d+)!) {
301             # all 2xx responses are success
302 0 0 0       unless ($1 >= 200 && $1 <= 299) {
303 0           my $errcode = $1;
304             # read through to the body
305 0           my ($found_header, $body);
306 0           while (defined (my $l = $self->_getline)) {
307             # remove trailing stuff
308 0           $l =~ s/[\r\n\s]+$//g;
309 0 0         $found_header = 1 unless $l;
310 0 0         next unless $found_header;
311              
312             # add line to the body, with a space for readability
313 0           $body .= " $l";
314             }
315 0 0         $body = substr($body, 0, 512) if length $body > 512;
316 0           return $err->("HTTP response $errcode from upload of $self->{uri} to $self->{sock}: $body");
317             }
318             } else {
319 0           return $err->("Response line not understood from $self->{sock}: $line");
320             }
321 0           $self->{sock}->close;
322             }
323              
324 0           my MogileFS $mg = $self->{mg};
325 0           my $domain = $mg->{domain};
326              
327 0           my $fid = $self->{fid};
328 0           my $devid = $self->{devid};
329 0           my $path = $self->{path};
330              
331 0           my $create_close_args = $self->{create_close_args};
332              
333 0   0       my $key = shift || $self->{key};
334              
335 0 0         my $rv = $mg->{backend}->do_request
336             ("create_close", {
337             %$create_close_args,
338             fid => $fid,
339             devid => $devid,
340             domain => $domain,
341             size => $self->{content_length} ? $self->{content_length} : $self->{length},
342             key => $key,
343             path => $path,
344             });
345 0 0         unless ($rv) {
346             # set $@, as our callers expect $@ to contain the error message that
347             # failed during a close. since we failed in the backend, we have to
348             # do this manually.
349 0           return $err->("$mg->{backend}->{lasterr}: $mg->{backend}->{lasterrstr}");
350             }
351              
352 0           return 1;
353             }
354             *close = *CLOSE;
355              
356             sub TELL {
357             # return our current pos
358 0     0     return $_[0]->{pos};
359             }
360             *tell = *TELL;
361              
362             sub SEEK {
363             # simply set pos...
364 0 0   0     _fail("seek past end of file") if $_[1] > $_[0]->{length};
365 0           $_[0]->{pos} = $_[1];
366             }
367             *seek = *SEEK;
368              
369             sub EOF {
370 0 0   0     return ($_[0]->{pos} >= $_[0]->{length}) ? 1 : 0;
371             }
372             *eof = *EOF;
373              
374 0     0     sub BINMODE {
375             # no-op, we're always in binary mode
376             }
377             *binmode = *BINMODE;
378              
379             sub READ {
380 0     0     my MogileFS::NewHTTPFile $self = shift;
381 0           my $count = $_[1] + 0;
382              
383 0           my $max = $self->{length} - $self->{pos};
384 0 0         $max = $count if $count < $max;
385              
386 0           $_[0] = substr($self->{data}, $self->{pos}, $max);
387 0           $self->{pos} += $max;
388              
389 0           return $max;
390             }
391             *read = *READ;
392              
393              
394             ################################################################################
395             # MogileFS::NewHTTPFile class methods
396             #
397              
398             sub _fail {
399 0     0     croak "MogileFS::NewHTTPFile: $_[0]";
400             }
401              
402             sub _debug {
403 0     0     MogileFS::Client::_debug(@_);
404             }
405              
406             sub _getset {
407 0     0     my MogileFS::NewHTTPFile $self = shift;
408 0           my $what = shift;
409              
410 0 0         if (@_) {
411             # note: we're a TIEHANDLE interface, so we're not QUITE like a
412             # normal class... our parameters tend to come in via an arrayref
413 0           my $val = shift;
414 0 0         $val = shift(@$val) if ref $val eq 'ARRAY';
415 0           return $self->{$what} = $val;
416             } else {
417 0           return $self->{$what};
418             }
419             }
420              
421             sub _fid {
422 0     0     my MogileFS::NewHTTPFile $self = shift;
423 0           return $self->{fid};
424             }
425              
426             1;