File Coverage

blib/lib/MogileFS/Backend.pm
Criterion Covered Total %
statement 110 224 49.1
branch 25 92 27.1
condition 10 41 24.3
subroutine 20 32 62.5
pod 0 11 0.0
total 165 400 41.2


line stmt bran cond sub pod time code
1             package MogileFS::Backend;
2              
3 4     4   24 use strict;
  4         8  
  4         161  
4 4     4   22 no strict 'refs';
  4         6  
  4         89  
5              
6 4     4   20 use Carp;
  4         8  
  4         353  
7 4     4   5393 use IO::Socket::INET;
  4         111571  
  4         40  
8 4     4   2618 use Socket qw( MSG_NOSIGNAL PF_INET IPPROTO_TCP SOCK_STREAM );
  4         7  
  4         1119  
9 4     4   24 use Errno qw( EINPROGRESS EWOULDBLOCK EISCONN );
  4         8  
  4         574  
10 4     4   3817 use POSIX ();
  4         27519  
  4         108  
11 4     4   34 use MogileFS::Client;
  4         10  
  4         126  
12 4     4   22 use List::Util qw/ shuffle /;
  4         9  
  4         519  
13              
14 4         42 use fields ('hosts', # arrayref of "$host:$port" of mogilefsd servers
15             'host_dead', # "$host:$port" -> $time (of last connect failure)
16             'lasterr', # string: \w+ identifier of last error
17             'lasterrstr', # string: english of last error
18             'sock_cache', # cached socket to mogilefsd tracker
19             'pref_ip', # hashref; { ip => preferred ip }
20             'timeout', # time in seconds to allow sockets to become readable
21             'last_host_connected', # "ip:port" of last host connected to
22             'last_host_idx', # array index of the last host we connected to
23             'hooks', # hash: hookname -> coderef
24 4     4   19 );
  4         7  
25              
26 4     4   411 use vars qw($FLAG_NOSIGNAL $PROTO_TCP);
  4         8  
  4         11367  
27             eval { $FLAG_NOSIGNAL = MSG_NOSIGNAL; };
28              
29             sub new {
30 1     1 0 3 my MogileFS::Backend $self = shift;
31 1 50       11 $self = fields::new($self) unless ref $self;
32              
33 1         90 return $self->_init(@_);
34             }
35              
36             sub reload {
37 0     0 0 0 my MogileFS::Backend $self = shift;
38 0 0       0 return undef unless $self;
39              
40 0         0 return $self->_init(@_);
41             }
42              
43             sub _init {
44 1     1   4 my MogileFS::Backend $self = shift;
45              
46 1         2 my %args = @_;
47              
48             # FIXME: add actual validation
49             {
50 1 50       2 $self->{hosts} = $args{hosts} or
  1         7  
51             _fail("constructor requires parameter 'hosts'");
52              
53 1 50       6 _fail("'hosts' argument must be an arrayref")
54             unless ref $self->{hosts} eq 'ARRAY';
55              
56 1         12 _fail("'hosts' argument must be of form: 'host:port'")
57 1 50       2 if grep(! /:\d+$/, @{$self->{hosts}});
58              
59 1 50 33     6 _fail("'timeout' argument must be a number")
60             if $args{timeout} && $args{timeout} !~ /^\d+$/;
61 1   50     10 $self->{timeout} = $args{timeout} || 3;
62             }
63              
64 1         2 $self->{hosts} = [ shuffle(@{ $self->{hosts} }) ];
  1         50  
65              
66 1         3 $self->{host_dead} = {};
67              
68 1         9 return $self;
69             }
70              
71             sub run_hook {
72 0     0 0 0 my MogileFS::Backend $self = shift;
73 0   0     0 my $hookname = shift || return;
74              
75 0         0 my $hook = $self->{hooks}->{$hookname};
76 0 0       0 return unless $hook;
77              
78 0         0 eval { $hook->(@_) };
  0         0  
79              
80 0 0       0 warn "MogileFS::Backend hook '$hookname' threw error: $@\n" if $@;
81             }
82              
83             sub add_hook {
84 0     0 0 0 my MogileFS::Backend $self = shift;
85 0   0     0 my $hookname = shift || return;
86              
87 0 0       0 if (@_) {
88 0         0 $self->{hooks}->{$hookname} = shift;
89             } else {
90 0         0 delete $self->{hooks}->{$hookname};
91             }
92             }
93              
94             sub set_pref_ip {
95 0     0 0 0 my MogileFS::Backend $self = shift;
96 0         0 $self->{pref_ip} = shift;
97 0 0 0     0 $self->{pref_ip} = undef
98             unless $self->{pref_ip} &&
99             ref $self->{pref_ip} eq 'HASH';
100             }
101              
102             sub _wait_for_readability {
103 0     0   0 my ($fileno, $timeout) = @_;
104 0 0 0     0 return 0 unless $fileno && $timeout;
105              
106 0         0 my $rin = '';
107 0         0 vec($rin, $fileno, 1) = 1;
108             # FIXME: signals/ptrace attach can interrupt the select. we should resume selecting
109             # and keep track of hires time remaining
110 0         0 my $nfound = select($rin, undef, undef, $timeout);
111              
112             # undef/0 are failure, 1 is success
113 0 0       0 return $nfound ? 1 : 0;
114             }
115              
116             sub do_request {
117 1     1 0 3 my MogileFS::Backend $self = shift;
118 1         4 my ($cmd, $args) = @_;
119              
120 1 50 33     12 _fail("invalid arguments to do_request")
121             unless $cmd && $args;
122              
123 1 50       7 local $SIG{'PIPE'} = "IGNORE" unless $FLAG_NOSIGNAL;
124              
125 1         4 my $sock = $self->{sock_cache};
126 1         6 my $argstr = _encode_url_string(%$args);
127 1         6 my $req = "$cmd $argstr\r\n";
128 1         4 my $reqlen = length($req);
129 1         3 my $rv = 0;
130              
131 1 50       4 if ($sock) {
132             # try our cached one, but assume it might be bogus
133 0         0 $self->run_hook('do_request_start', $cmd, $self->{last_host_connected});
134 0         0 _debug("SOCK: cached = $sock, REQ: $req");
135 0         0 $rv = send($sock, $req, $FLAG_NOSIGNAL);
136 0 0 0     0 if ($! || ! defined $rv) {
    0          
137             # undef is error, but $! may not be populated, we've found
138 0         0 $self->run_hook('do_request_send_error', $cmd, $self->{last_host_connected});
139 0         0 undef $self->{sock_cache};
140             } elsif ($rv != $reqlen) {
141 0         0 $self->run_hook('do_request_length_mismatch', $cmd, $self->{last_host_connected});
142 0         0 return _fail("send() didn't return expected length ($rv, not $reqlen)");
143             }
144             }
145              
146 1 50       51 unless ($rv) {
147 1 50       6 $sock = $self->_get_sock
148             or return _fail("couldn't connect to mogilefsd backend");
149 0         0 $self->run_hook('do_request_start', $cmd, $self->{last_host_connected});
150 0         0 _debug("SOCK: $sock, REQ: $req");
151 0         0 $rv = send($sock, $req, $FLAG_NOSIGNAL);
152 0 0       0 if ($!) {
    0          
153 0         0 $self->run_hook('do_request_send_error', $cmd, $self->{last_host_connected});
154 0         0 return _fail("error talking to mogilefsd tracker: $!");
155             } elsif ($rv != $reqlen) {
156 0         0 $self->run_hook('do_request_length_mismatch', $cmd, $self->{last_host_connected});
157 0         0 return _fail("send() didn't return expected length ($rv, not $reqlen)");
158             }
159 0         0 $self->{sock_cache} = $sock;
160             }
161              
162             # wait up to 3 seconds for the socket to come to life
163 0 0       0 unless (_wait_for_readability(fileno($sock), $self->{timeout})) {
164 0         0 close($sock);
165 0         0 $self->run_hook('do_request_read_timeout', $cmd, $self->{last_host_connected});
166 0         0 undef $self->{sock_cache};
167 0         0 return _fail("timed out after $self->{timeout}s against $self->{last_host_connected} when sending command: [$req]");
168             }
169              
170             # guard against externally-modified $/ changes. patch from
171             # Andreas J. Koenig. in practice nobody should do this, though,
172             # and this line should be unnecessary.
173 0         0 local $/ = "\n";
174              
175 0         0 my $line = <$sock>;
176              
177 0         0 $self->run_hook('do_request_finished', $cmd, $self->{last_host_connected});
178              
179 0         0 _debug("RESPONSE: $line");
180              
181 0 0       0 unless (defined $line) {
182 0         0 undef $self->{sock_cache};
183 0         0 return _fail("socket closed on read");
184             }
185              
186             # ERR
187 0 0       0 if ($line =~ /^ERR\s+(\w+)\s*(\S*)/) {
188 0         0 $self->{'lasterr'} = $1;
189 0 0       0 $self->{'lasterrstr'} = $2 ? _unescape_url_string($2) : undef;
190 0         0 _debug("LASTERR: $1 $2");
191 0         0 return undef;
192             }
193              
194             # OK
195 0 0       0 if ($line =~ /^OK\s+\d*\s*(\S*)/) {
196 0         0 my $args = _decode_url_string($1);
197 0         0 _debug("RETURN_VARS: ", $args);
198 0         0 return $args;
199             }
200              
201 0         0 undef $self->{sock_cache};
202 0         0 _fail("invalid response from server: [$line]");
203 0         0 return undef;
204             }
205              
206             sub errstr {
207 0     0 0 0 my MogileFS::Backend $self = shift;
208 0 0       0 return unless $self->{'lasterr'};
209 0         0 return join(" ", $self->{'lasterr'}, $self->{'lasterrstr'});
210             }
211              
212             sub errcode {
213 0     0 0 0 my MogileFS::Backend $self = shift;
214 0         0 return $self->{lasterr};
215             }
216              
217             sub last_tracker {
218 0     0 0 0 my $self = shift;
219 0         0 return $self->{last_host_connected};
220             }
221              
222             sub err {
223 0     0 0 0 my MogileFS::Backend $self = shift;
224 0 0       0 return $self->{lasterr} ? 1 : 0;
225             }
226              
227             sub force_disconnect {
228 1     1 0 2 my MogileFS::Backend $self = shift;
229 1         4 undef $self->{sock_cache};
230 1         3 return;
231             }
232              
233             ################################################################################
234             # MogileFS::Backend class methods
235             #
236              
237             sub _fail {
238 1     1   218 croak "MogileFS::Backend: $_[0]";
239             }
240              
241             *_debug = *MogileFS::Client::_debug;
242              
243             sub _connect_sock { # sock, sin, timeout
244 1     1   4 my ($sock, $sin, $timeout) = @_;
245 1   50     11 $timeout ||= 0.25;
246              
247             # make the socket non-blocking for the connection if wanted, but
248             # unconditionally set it back to blocking mode at the end
249              
250 1 50       5 if ($timeout) {
251 1         11 IO::Handle::blocking($sock, 0);
252             } else {
253 0         0 IO::Handle::blocking($sock, 1);
254             }
255              
256 1         107 my $ret = connect($sock, $sin);
257              
258 1 50 33     32 if (!$ret && $timeout && $!==EINPROGRESS) {
      33        
259              
260 1         2 my $win='';
261 1         6 vec($win, fileno($sock), 1) = 1;
262              
263 1 50       15 if (select(undef, $win, undef, $timeout) > 0) {
264 1         7 $ret = connect($sock, $sin);
265             # EISCONN means connected & won't re-connect, so success
266 1 50 33     11 $ret = 1 if !$ret && $!==EISCONN;
267             }
268             }
269              
270             # turn blocking back on, as we expect to do blocking IO on our sockets
271 1 50       20 IO::Handle::blocking($sock, 1) if $timeout;
272              
273 1         8 return $ret;
274             }
275              
276             sub _sock_to_host { # (host)
277 1     1   2 my MogileFS::Backend $self = shift;
278 1         3 my $host = shift;
279              
280             # create a socket and try to do a non-blocking connect
281 1         8 my ($ip, $port) = $host =~ /^(.*):(\d+)$/;
282 1         4 my $sock = "Sock_$host";
283 1         2 my $connected = 0;
284 1   33     776 my $proto = $PROTO_TCP ||= getprotobyname('tcp');
285 1         3 my $sin;
286              
287             # try preferred ips
288 1 50 33     7 if ($self->{pref_ip} && (my $prefip = $self->{pref_ip}->{$ip})) {
289 0         0 _debug("using preferred ip $prefip over $ip");
290 0         0 socket($sock, PF_INET, SOCK_STREAM, $proto);
291 0         0 $sin = Socket::sockaddr_in($port, Socket::inet_aton($prefip));
292 0 0       0 if (_connect_sock($sock, $sin, 0.1)) {
293 0         0 $connected = 1;
294 0         0 $self->{last_host_connected} = "$prefip:$port";
295             } else {
296 0         0 _debug("failed connect to preferred ip $prefip");
297 0         0 close $sock;
298             }
299             }
300              
301             # now try the original ip
302 1 50       4 unless ($connected) {
303 1         48 socket($sock, PF_INET, SOCK_STREAM, $proto);
304 1 50       18 my $aton_ip = Socket::inet_aton($ip)
305             or return undef;
306 1         8 $sin = Socket::sockaddr_in($port, $aton_ip);
307 1 50       18 return undef unless _connect_sock($sock, $sin);
308 0         0 $self->{last_host_connected} = $host;
309             }
310              
311             # just throw back the socket we have so far
312 0         0 return $sock;
313             }
314              
315             # return a new mogilefsd socket, trying different hosts until one is found,
316             # or undef if they're all dead
317             sub _get_sock {
318 1     1   3 my MogileFS::Backend $self = shift;
319 1 50       4 return undef unless $self;
320              
321 1         1 my $size = scalar(@{$self->{hosts}});
  1         3  
322 1 50       4 my $tries = $size > 15 ? 15 : $size;
323              
324 1 50       5 unless (defined($self->{last_host_idx})) {
325 1         6 $self->{last_host_idx} = int(rand() * $size);
326             }
327              
328 1         8 my $now = time();
329 1         3 my $sock;
330 1         3 foreach (1..$tries) {
331 1         5 $self->{last_host_idx} = ($self->{last_host_idx}+1) % $size;
332 1         4 my $host = $self->{hosts}->[$self->{last_host_idx}];
333              
334             # try dead hosts every 5 seconds
335 1 50 33     9 next if $self->{host_dead}->{$host} &&
336             $self->{host_dead}->{$host} > $now - 5;
337              
338 1 50       5 last if $sock = $self->_sock_to_host($host);
339              
340             # mark sock as dead
341 1         10 _debug("marking host dead: $host @ $now");
342 1         6 $self->{host_dead}->{$host} = $now;
343             }
344              
345 1         7 return $sock;
346             }
347              
348             sub _escape_url_string {
349 0     0   0 my $str = shift;
350 0         0 $str =~ s/([^a-zA-Z0-9_\,\-.\/\\\: ])/uc sprintf("%%%02x",ord($1))/eg;
  0         0  
351 0         0 $str =~ tr/ /+/;
352 0         0 return $str;
353             }
354              
355             sub _unescape_url_string {
356 0     0   0 my $str = shift;
357 0         0 $str =~ s/%([a-fA-F0-9][a-fA-F0-9])/pack("C", hex($1))/eg;
  0         0  
358 0         0 $str =~ tr/+/ /;
359 0         0 return $str;
360             }
361              
362             sub _encode_url_string {
363 1     1   4 my %args = @_;
364 1 50       6 return "" unless %args;
365 0           return join("&",
366 0           map { _escape_url_string($_) . '=' .
367             _escape_url_string($args{$_}) }
368 0           grep { defined $args{$_} } keys %args
369             );
370             }
371              
372             sub _decode_url_string {
373 0     0     my $arg = shift;
374 0 0         my $buffer = ref $arg ? $arg : \$arg;
375 0           my $hashref = {}; # output hash
376              
377 0           my $pair;
378 0           my @pairs = split(/&/, $$buffer);
379 0           my ($name, $value);
380 0           foreach $pair (@pairs) {
381 0           ($name, $value) = split(/=/, $pair);
382 0           $value =~ tr/+/ /;
383 0           $value =~ s/%([a-fA-F0-9][a-fA-F0-9])/pack("C", hex($1))/eg;
  0            
384 0           $name =~ tr/+/ /;
385 0           $name =~ s/%([a-fA-F0-9][a-fA-F0-9])/pack("C", hex($1))/eg;
  0            
386 0 0         $hashref->{$name} .= $hashref->{$name} ? "\0$value" : $value;
387             }
388              
389 0           return $hashref;
390             }
391              
392             1;