File Coverage

blib/lib/MogileFS/Client/Async.pm
Criterion Covered Total %
statement 52 196 26.5
branch 2 58 3.4
condition 4 23 17.3
subroutine 15 27 55.5
pod 5 6 83.3
total 78 310 25.1


line stmt bran cond sub pod time code
1             package MogileFS::Client::Async;
2 3     3   30258 use strict;
  3         7  
  3         222  
3 3     3   17 use warnings;
  3         6  
  3         88  
4 3     3   5398 use AnyEvent;
  3         30531  
  3         116  
5 3     3   3886 use AnyEvent::HTTP;
  3         146512  
  3         339  
6 3     3   4664 use AnyEvent::Socket;
  3         77203  
  3         554  
7 3     3   15327 use URI;
  3         22822  
  3         155  
8 3     3   31 use Carp qw/confess/;
  3         6  
  3         218  
9 3     3   3779 use POSIX qw( EAGAIN );
  3         26664  
  3         34  
10 3     3   5785 use Socket qw/ IPPROTO_TCP /;
  3         7  
  3         170  
11              
12 3     3   29 use base qw/ MogileFS::Client /;
  3         7  
  3         4914  
13              
14 3     3   392776 use IO::AIO qw/ fadvise /;
  3         16801  
  3         7929  
15              
16 3 50   3   37 use constant TCP_CORK => ($^O eq "linux" ? 3 : 0); # XXX
  3         9  
  3         500  
17              
18             our $VERSION = '0.030';
19              
20             =head1 NAME
21              
22             MogileFS::Client::Async
23              
24             =head1 SYNOPSIS
25              
26             my $mogfs = MogileFS::Client::Async->new( ... )
27              
28             $mogfs->read_to_file($key, $filename);
29              
30             $mogfs->store_file($key, $class, $filename, \%opts );
31              
32             $mogfs->store_content($key, $class, \$content, \%opts );
33              
34             =head1 DESCRIPTION
35              
36             This package provides replacement implementations of some methods in
37             L<MogileFS::Client> to allow for non-blocking IO under L<AnyEvent> and the
38             ability to read and write files stored in MogileFS without needing to store
39             the entire file in memory.
40              
41             =head1 SEE ALSO
42              
43             =over
44              
45             =item *
46             L
47              
48             =item *
49             L
50              
51             =item *
52             L
53              
54             =back
55              
56             =cut
57              
58 3     3   3656 use namespace::clean;
  3         66553  
  3         31  
59              
60 0     0 1 0 sub new_file { confess("new_file is unsupported in " . __PACKAGE__) }
61 0     0 1 0 sub edit_file { confess("edit_file is unsupported in " . __PACKAGE__) }
62 0     0 1 0 sub read_file { confess("read_file is unsupported in " . __PACKAGE__) }
63              
64             sub read_to_file {
65 1     1 0 761225 my $self = shift;
66 1         3 my $key = shift;
67 1         3 my $fn = shift;
68              
69 1         20 my @paths = $self->get_paths($key);
70              
71 0 0       0 die("No paths for $key") unless @paths;
72              
73 0         0 for (1..2) {
74 0         0 foreach my $path (@paths) {
75 0         0 my ($bytes, $write) = (0, undef);
76 0 0       0 open $write, '>', $fn or confess("Could not open $fn to write");
77              
78 0         0 my $cv = AnyEvent->condvar;
79 0         0 my $h;
80             my $guard = http_request
81             GET => $path,
82             timeout => 120, # 2m
83             on_header => sub {
84 0     0   0 my ($headers) = @_;
85 0 0       0 return 0 if ($headers->{Status} != 200);
86 0         0 $h = $headers;
87 0         0 1;
88             },
89             on_body => sub {
90 0 0   0   0 syswrite $write, $_[0] or return 0;
91 0         0 $bytes += length($_[0]);
92 0         0 1;
93             },
94             sub { # On complete!
95 0     0   0 my (undef, $headers) = @_;
96 0         0 $h = $headers;
97 0         0 close($write);
98 0         0 undef $write;
99 0         0 $cv->send;
100 0         0 1;
101 0         0 };
102 0         0 $cv->recv;
103 0 0 0     0 return $bytes if ($bytes && !$write);
104             # Error..
105 0         0 $h->{Code} = 590;
106 0         0 $h->{Reason} = "Unknown error";
107 0         0 warn("HTTP error getting mogile $key: " . $h->{Reason} . "\n");
108 0         0 close $write;
109 0         0 unlink $fn;
110             }
111             }
112 0         0 confess("Could not read $key from mogile");
113             }
114              
115             sub store_file {
116 1     1 1 5088 my $self = shift;
117 1 50       8 return undef if $self->{readonly};
118              
119 1         5 my ($key, $class, $file, $opts) = @_;
120 1   50     10 $opts ||= {};
121              
122             # Extra args to be passed along with the create_open and create_close commands.
123             # Any internally generated args of the same name will overwrite supplied ones in
124             # these hashes.
125 1   50     75 my $create_open_args = $opts->{create_open_args} || {};
126 1   50     9 my $create_close_args = $opts->{create_close_args} || {};
127              
128 1         13 $self->run_hook('store_file_start', $self, $key, $class, $opts);
129 1         14 $self->run_hook('new_file_start', $self, $key, $class, $opts);
130              
131 1 0 50     28 my $res = $self->{backend}->do_request(
132             create_open => {
133             %$create_open_args,
134             domain => $self->{domain},
135             class => $class,
136             key => $key,
137             fid => $opts->{fid} || 0, # fid should be specified, or pass 0 meaning to auto-generate one
138             multi_dest => 1,
139             }
140             ) or return undef;
141              
142 0           my $dests = []; # [ [devid,path], [devid,path], ... ]
143              
144             # determine old vs. new format to populate destinations
145 0 0         unless (exists $res->{dev_count}) {
146 0           push @$dests, [ $res->{devid}, $res->{path} ];
147             } else {
148 0           for my $i (1..$res->{dev_count}) {
149 0           push @$dests, [ $res->{"devid_$i"}, $res->{"path_$i"} ];
150             }
151             }
152              
153 0           my ($length, $error, $devid, $path);
154 0           my @dests = (@$dests, @$dests, @$dests); # 2 retries
155 0           my $try = 0;
156 0           foreach my $dest (@dests) {
157 0           $try++;
158 0           ($devid, $path) = @$dest;
159 0           my $uri = URI->new($path);
160 0           my $cv = AnyEvent->condvar;
161 0           my ($socket_guard, $socket_fh);
162             $socket_guard = tcp_connect $uri->host, $uri->port, sub {
163 0     0     my ($fh, $host, $port) = @_;
164 0           $error = $!;
165 0 0         if (!$fh) {
166 0           $cv->send;
167 0           return;
168             }
169 0           $socket_fh = $fh;
170 0 0         setsockopt($socket_fh, IPPROTO_TCP, TCP_CORK, 1) or warn "could not set TCP_CORK" if TCP_CORK;
171 0           $cv->send;
172 0     0     }, sub { 10 };
  0            
173 0           $cv->recv;
174 0 0         if (! $socket_fh) {
175 0   0       $error ||= 'unknown error';
176 0           warn("Connection error: $error to $path");
177 0           next;
178             }
179 0           undef $error;
180             # We are connected!
181 0 0         open my $fh_from, "<", $file or confess("Could not open $file");
182              
183             # Hint to Linux that doubling readahead will probably pay off.
184 0           fadvise($fh_from, 0, 0, IO::AIO::FADV_SEQUENTIAL());
185              
186 0           $length = -s $file;
187 0           my $buf = 'PUT ' . $uri->path . " HTTP/1.0\r\nConnection: close\r\nContent-Length: $length\r\n\r\n";
188 0           $cv = AnyEvent->condvar;
189 0           my $w;
190             my $timeout;
191             my $reset_timer = sub {
192 0     0     my ($type, $time) = @_;
193 0   0       $type ||= 'unknown';
194 0   0       $time ||= 60;
195 0           my $start = time();
196             $timeout = AnyEvent->timer(
197             after => $time,
198             cb => sub {
199 0           undef $w;
200 0           my $took = time() - $start;
201 0           $error = "Connection timed out duing data transfer of type $type (after $took seconds)";
202 0           $cv->send;
203             },
204 0           );
205 0           };
206             $w = AnyEvent->io( fh => $socket_fh, poll => 'w', cb => sub {
207 0     0     $reset_timer->('read');
208 0 0         if (!length($buf)) {
209 0           my $bytes = sysread $fh_from, $buf, '4096';
210 0           $reset_timer->('write');
211 0 0         if (!defined $bytes) { # Error, read FH blocking, no need to check EAGAIN
212 0           $error = $!;
213 0           $cv->send;
214 0           return;
215             }
216 0 0         if (0 == $bytes) { # EOF reading, and we already wrote everything
217 0           $cv->send;
218 0           return;
219             }
220             }
221 0           my $len = syswrite $socket_fh, $buf;
222 0           $reset_timer->('loop');
223 0 0 0       if ($len && $len > 0) {
224 0           $buf = substr $buf, $len;
225             }
226 0 0 0       if (!defined $len && $! != EAGAIN) { # Error, we could get EAGAIN as write sock non-blocking
227 0           $error = $!;
228 0           $cv->send;
229 0           return;
230             }
231 0           });
232 0           $reset_timer->('start PUT');
233 0           $cv->recv;
234 0 0         setsockopt($socket_fh, IPPROTO_TCP, TCP_CORK, 0) or warn "could not unset TCP_CORK" if TCP_CORK;
235 0 0         shutdown($socket_fh, 1) or warn "could not shutdown our socket: $!";
236 0           $cv = AnyEvent->condvar;
237             # FIXME - Cheat here, the response should be small, so we assume it'll always all be
238             # readable at once, THIS MAY NOT BE TRUE!!!
239             $w = AnyEvent->io( fh => $socket_fh, poll => 'r', cb => sub {
240 0     0     undef $timeout;
241 0           undef $w;
242 0           $cv->send;
243 0           my $buf;
244 0           do {
245 0 0         if ($socket_fh->eof) {
246 0           $error = "Connection closed unexpectedly without response";
247 0           return;
248             }
249 0           my $res; $socket_fh->read($res, 4096); $buf .= $res;
  0            
  0            
250             } while (!length($buf));
251 0           my ($top, @headers) = split /\r?\n/, $buf;
252 0 0         if ($top =~ m{HTTP/1.[01]\s+2\d\d}) {
253             # Woo, 200!
254 0           undef $error;
255             }
256             else {
257 0           $error = "Got non-200 from remote server $top";
258             }
259 0           });
260 0           $reset_timer->('response', 1200); # Wait up to 20m, as lighty
261             # may have to copy the file between
262             # disks. EWWWW
263 0           $cv->recv;
264 0           undef $timeout;
265 0 0         if ($error) {
266 0           warn("Error sending data (try $try) to $uri: $error");
267 0           next; # Retry
268             }
269 0           last; # Success
270             }
271 0 0         die("Could not write to any mogile hosts, should have tried " . scalar(@$dests) . " did try $try")
272             if $error;
273              
274 0           $self->run_hook('new_file_end', $self, $key, $class, $opts);
275              
276 0           my $rv = $self->{backend}->do_request
277             ("create_close", {
278             fid => $res->{fid},
279             devid => $devid,
280             domain => $self->{domain},
281             size => $length,
282             key => $key,
283             path => $path,
284             });
285              
286 0 0         unless ($rv) {
287 0           die "$self->{backend}->{lasterr}: $self->{backend}->{lasterrstr}";
288 0           return undef;
289             }
290              
291 0           $self->run_hook('store_file_end', $self, $key, $class, $opts);
292              
293 0           return $length;
294             }
295              
296             sub store_content {
297 0     0 1   my MogileFS::Client $self = shift;
298 0 0         return undef if $self->{readonly};
299              
300 0           my($key, $class, $content, $opts) = @_;
301              
302 0           $self->run_hook('store_content_start', $self, $key, $class, $opts);
303              
304 0 0         my $fh = $self->new_file($key, $class, undef, $opts) or return;
305 0 0         $content = ref($content) eq 'SCALAR' ? $$content : $content;
306 0           $fh->print($content);
307              
308 0           $self->run_hook('store_content_end', $self, $key, $class, $opts);
309              
310 0 0         $fh->close or return;
311 0           length($content);
312             }
313              
314             1;
315