| line |
stmt |
bran |
cond |
sub |
pod |
time |
code |
|
1
|
|
|
|
|
|
|
package MogileFS::Client::CallbackFile; |
|
2
|
2
|
|
|
2
|
|
71406
|
use strict; |
|
|
2
|
|
|
|
|
13
|
|
|
|
2
|
|
|
|
|
59
|
|
|
3
|
2
|
|
|
2
|
|
30
|
use warnings; |
|
|
2
|
|
|
|
|
4
|
|
|
|
2
|
|
|
|
|
61
|
|
|
4
|
2
|
|
|
2
|
|
511
|
use URI; |
|
|
2
|
|
|
|
|
7076
|
|
|
|
2
|
|
|
|
|
51
|
|
|
5
|
2
|
|
|
2
|
|
12
|
use Carp; |
|
|
2
|
|
|
|
|
3
|
|
|
|
2
|
|
|
|
|
155
|
|
|
6
|
2
|
|
|
2
|
|
470
|
use IO::Socket::INET; |
|
|
2
|
|
|
|
|
20769
|
|
|
|
2
|
|
|
|
|
24
|
|
|
7
|
2
|
|
|
2
|
|
2163
|
use File::Slurp qw/ slurp /; |
|
|
2
|
|
|
|
|
37128
|
|
|
|
2
|
|
|
|
|
122
|
|
|
8
|
2
|
|
|
2
|
|
489
|
use Try::Tiny; |
|
|
2
|
|
|
|
|
2012
|
|
|
|
2
|
|
|
|
|
113
|
|
|
9
|
2
|
|
|
2
|
|
15
|
use Socket qw/ SO_SNDBUF SOL_SOCKET IPPROTO_TCP /; |
|
|
2
|
|
|
|
|
5
|
|
|
|
2
|
|
|
|
|
308
|
|
|
10
|
2
|
|
|
2
|
|
507
|
use Time::HiRes qw/ gettimeofday tv_interval /; |
|
|
2
|
|
|
|
|
1327
|
|
|
|
2
|
|
|
|
|
14
|
|
|
11
|
2
|
|
|
2
|
|
1254
|
use Linux::PipeMagic qw/ syssendfile /; |
|
|
2
|
|
|
|
|
6134
|
|
|
|
2
|
|
|
|
|
124
|
|
|
12
|
2
|
|
|
2
|
|
739
|
use IO::AIO qw/ fadvise /; |
|
|
2
|
|
|
|
|
6051
|
|
|
|
2
|
|
|
|
|
139
|
|
|
13
|
2
|
|
|
2
|
|
889
|
use LWP::Simple qw/ head /; |
|
|
2
|
|
|
|
|
75968
|
|
|
|
2
|
|
|
|
|
14
|
|
|
14
|
|
|
|
|
|
|
|
|
15
|
2
|
|
|
2
|
|
338
|
use base qw/ MogileFS::Client::Async /; |
|
|
2
|
|
|
|
|
4
|
|
|
|
2
|
|
|
|
|
664
|
|
|
16
|
|
|
|
|
|
|
|
|
17
|
2
|
50
|
|
2
|
|
16
|
use constant TCP_CORK => ($^O eq "linux" ? 3 : 0); # XXX |
|
|
2
|
|
|
|
|
4
|
|
|
|
2
|
|
|
|
|
116
|
|
|
18
|
|
|
|
|
|
|
|
|
19
|
2
|
|
|
2
|
|
12
|
use namespace::clean; |
|
|
2
|
|
|
|
|
4
|
|
|
|
2
|
|
|
|
|
13
|
|
|
20
|
|
|
|
|
|
|
|
|
21
|
|
|
|
|
|
|
=head1 NAME |
|
22
|
|
|
|
|
|
|
|
|
23
|
|
|
|
|
|
|
MogileFS::Client::CallbackFile |
|
24
|
|
|
|
|
|
|
|
|
25
|
|
|
|
|
|
|
=head1 SYNOPSIS |
|
26
|
|
|
|
|
|
|
|
|
27
|
|
|
|
|
|
|
my $mogfs = MogileFS::Client::CallbackFile->new( ... ) |
|
28
|
|
|
|
|
|
|
|
|
29
|
|
|
|
|
|
|
open(my $read_fh, "<", "...") or die ... |
|
30
|
|
|
|
|
|
|
my $eventual_length = -s $read_fh; |
|
31
|
|
|
|
|
|
|
my $f = $mogfs->store_file_from_fh($key, $class, $read_fh, $eventual_length, \%opts); |
|
32
|
|
|
|
|
|
|
|
|
33
|
|
|
|
|
|
|
$f->($eventual_length, 0); # upload entire file |
|
34
|
|
|
|
|
|
|
|
|
35
|
|
|
|
|
|
|
$f->($eventual_length, 1); # indicate EOF |
|
36
|
|
|
|
|
|
|
|
|
37
|
|
|
|
|
|
|
=head1 DESCRIPTION |
|
38
|
|
|
|
|
|
|
|
|
39
|
|
|
|
|
|
|
This package inherits from L and provides an additional |
|
40
|
|
|
|
|
|
|
blocking API in which the data you wish to upload is read from a file when |
|
41
|
|
|
|
|
|
|
commanded by a callback function. This allows other processing to take place on |
|
42
|
|
|
|
|
|
|
data as you read it from disc or elsewhere. |
|
43
|
|
|
|
|
|
|
|
|
44
|
|
|
|
|
|
|
The trackers, and storage backends, are tried repeatedly until the file is |
|
45
|
|
|
|
|
|
|
successfully stored, or an error is thrown. |
|
46
|
|
|
|
|
|
|
|
|
47
|
|
|
|
|
|
|
The C<$key> parameter may be a closure. In this case, it is called every time |
|
48
|
|
|
|
|
|
|
before C is called, allowing a different key to be used if an |
|
49
|
|
|
|
|
|
|
upload fails, allowing for additional paranoia. |
|
50
|
|
|
|
|
|
|
|
|
51
|
|
|
|
|
|
|
=head1 SEE ALSO |
|
52
|
|
|
|
|
|
|
|
|
53
|
|
|
|
|
|
|
=over |
|
54
|
|
|
|
|
|
|
|
|
55
|
|
|
|
|
|
|
=item L |
|
56
|
|
|
|
|
|
|
|
|
57
|
|
|
|
|
|
|
=back |
|
58
|
|
|
|
|
|
|
|
|
59
|
|
|
|
|
|
|
=cut |
|
60
|
|
|
|
|
|
|
|
|
61
|
|
|
|
|
|
|
sub store_file_from_fh { |
|
62
|
0
|
|
|
0
|
0
|
|
my $self = shift; |
|
63
|
0
|
0
|
|
|
|
|
return undef if $self->{readonly}; |
|
64
|
|
|
|
|
|
|
|
|
65
|
0
|
|
|
|
|
|
my ($_key, $class, $read_fh, $eventual_length, $opts) = @_; |
|
66
|
0
|
|
0
|
|
|
|
$opts ||= {}; |
|
67
|
|
|
|
|
|
|
|
|
68
|
|
|
|
|
|
|
# Hint to Linux that doubling readahead will probably pay off. |
|
69
|
0
|
|
|
|
|
|
fadvise($read_fh, 0, 0, IO::AIO::FADV_SEQUENTIAL()); |
|
70
|
|
|
|
|
|
|
|
|
71
|
|
|
|
|
|
|
# Extra args to be passed along with the create_open and create_close commands. |
|
72
|
|
|
|
|
|
|
# Any internally generated args of the same name will overwrite supplied ones in |
|
73
|
|
|
|
|
|
|
# these hashes. |
|
74
|
0
|
|
0
|
|
|
|
my $create_open_args = $opts->{create_open_args} || {}; |
|
75
|
0
|
|
0
|
|
|
|
my $create_close_args = $opts->{create_close_args} || {}; |
|
76
|
|
|
|
|
|
|
|
|
77
|
0
|
|
|
|
|
|
my @dests; # ( [devid,path,fid], [devid,path,fid], ... ) |
|
78
|
|
|
|
|
|
|
|
|
79
|
|
|
|
|
|
|
my $key; |
|
80
|
|
|
|
|
|
|
|
|
81
|
|
|
|
|
|
|
my $get_new_dest = sub { |
|
82
|
0
|
0
|
|
0
|
|
|
if (@dests) { |
|
83
|
0
|
|
|
|
|
|
return pop @dests; |
|
84
|
|
|
|
|
|
|
} |
|
85
|
|
|
|
|
|
|
|
|
86
|
0
|
|
|
|
|
|
foreach (1..5) { |
|
87
|
0
|
0
|
|
|
|
|
$key = ref($_key) eq 'CODE' ? $_key->() : $_key; |
|
88
|
|
|
|
|
|
|
|
|
89
|
0
|
|
|
|
|
|
$self->run_hook('store_file_start', $self, $key, $class, $opts); |
|
90
|
0
|
|
|
|
|
|
$self->run_hook('new_file_start', $self, $key, $class, $opts); |
|
91
|
|
|
|
|
|
|
|
|
92
|
|
|
|
|
|
|
# Calls to the backend may be explodey. |
|
93
|
0
|
|
|
|
|
|
my $res; |
|
94
|
|
|
|
|
|
|
try { |
|
95
|
|
|
|
|
|
|
$res = $self->{backend}->do_request( |
|
96
|
|
|
|
|
|
|
create_open => { |
|
97
|
|
|
|
|
|
|
%$create_open_args, |
|
98
|
|
|
|
|
|
|
domain => $self->{domain}, |
|
99
|
|
|
|
|
|
|
class => $class, |
|
100
|
|
|
|
|
|
|
key => $key, |
|
101
|
0
|
|
0
|
|
|
|
fid => $opts->{fid} || 0, # fid should be specified, or pass 0 meaning to auto-generate one |
|
102
|
|
|
|
|
|
|
multi_dest => 1, |
|
103
|
|
|
|
|
|
|
size => $eventual_length, # not supported by current version |
|
104
|
|
|
|
|
|
|
} |
|
105
|
|
|
|
|
|
|
); |
|
106
|
|
|
|
|
|
|
} |
|
107
|
|
|
|
|
|
|
catch { |
|
108
|
0
|
|
|
|
|
|
warn "Mogile backend failed: $_"; |
|
109
|
0
|
0
|
|
|
|
|
$self->{backend}->force_disconnect() if $self->{backend}->can('force_disconnect'); |
|
110
|
0
|
|
|
|
|
|
}; |
|
111
|
|
|
|
|
|
|
|
|
112
|
0
|
0
|
|
|
|
|
unless ($res) { |
|
113
|
|
|
|
|
|
|
# Attempting to connect to the Mogile backend completely failed |
|
114
|
|
|
|
|
|
|
# let's sleep for a second to see if the problem clears. We |
|
115
|
|
|
|
|
|
|
# don't sleep for other errors as we'll arrive back here if the |
|
116
|
|
|
|
|
|
|
# network fails eventually. |
|
117
|
0
|
|
|
|
|
|
sleep 1; |
|
118
|
0
|
|
|
|
|
|
next; |
|
119
|
|
|
|
|
|
|
} |
|
120
|
|
|
|
|
|
|
|
|
121
|
0
|
|
|
|
|
|
for my $i (1..$res->{dev_count}) { |
|
122
|
|
|
|
|
|
|
push @dests, { |
|
123
|
|
|
|
|
|
|
devid => $res->{"devid_$i"}, |
|
124
|
|
|
|
|
|
|
path => $res->{"path_$i"}, |
|
125
|
|
|
|
|
|
|
fid => $res->{fid}, |
|
126
|
0
|
|
|
|
|
|
}; |
|
127
|
|
|
|
|
|
|
} |
|
128
|
0
|
0
|
|
|
|
|
if (@dests) { |
|
129
|
0
|
|
|
|
|
|
return pop @dests; |
|
130
|
|
|
|
|
|
|
} |
|
131
|
|
|
|
|
|
|
} |
|
132
|
0
|
|
|
|
|
|
die "Fail to get a destination to write to."; |
|
133
|
0
|
|
|
|
|
|
}; |
|
134
|
|
|
|
|
|
|
|
|
135
|
|
|
|
|
|
|
# When we have a hiccough in your connection, we mark $socket as undef to |
|
136
|
|
|
|
|
|
|
# indicate that we should reconnect. |
|
137
|
0
|
|
|
|
|
|
my $socket; |
|
138
|
|
|
|
|
|
|
|
|
139
|
|
|
|
|
|
|
|
|
140
|
|
|
|
|
|
|
# We keep track of where we last wrote to. |
|
141
|
|
|
|
|
|
|
my $last_written_point; |
|
142
|
|
|
|
|
|
|
|
|
143
|
|
|
|
|
|
|
# The pointing to the arrayref we're currently writing to. |
|
144
|
0
|
|
|
|
|
|
my $current_dest; |
|
145
|
0
|
|
|
|
|
|
my $create_close_timed_out; |
|
146
|
|
|
|
|
|
|
|
|
147
|
|
|
|
|
|
|
return sub { |
|
148
|
0
|
|
|
0
|
|
|
my ($available_to_read, $eof, $checksum) = @_; |
|
149
|
|
|
|
|
|
|
|
|
150
|
0
|
|
|
|
|
|
my $last_error; |
|
151
|
|
|
|
|
|
|
|
|
152
|
|
|
|
|
|
|
my $fail_write_attempt = sub { |
|
153
|
0
|
|
|
|
|
|
my ($msg) = @_; |
|
154
|
0
|
|
0
|
|
|
|
$last_error = $msg || "unknown error"; |
|
155
|
|
|
|
|
|
|
|
|
156
|
0
|
0
|
|
|
|
|
if ($opts->{on_failure}) { |
|
157
|
|
|
|
|
|
|
$opts->{on_failure}->({ |
|
158
|
|
|
|
|
|
|
url => $current_dest ? $current_dest->{path} : undef, |
|
159
|
0
|
0
|
|
|
|
|
bytes_sent => $last_written_point, |
|
160
|
|
|
|
|
|
|
total_bytes => $eventual_length, |
|
161
|
|
|
|
|
|
|
client => 'callbackfile', |
|
162
|
|
|
|
|
|
|
error => $msg, |
|
163
|
|
|
|
|
|
|
}); |
|
164
|
|
|
|
|
|
|
} |
|
165
|
|
|
|
|
|
|
|
|
166
|
0
|
|
|
|
|
|
warn $msg; |
|
167
|
0
|
|
|
|
|
|
$socket = undef; |
|
168
|
0
|
|
|
|
|
|
$last_written_point = 0; |
|
169
|
0
|
|
|
|
|
|
}; |
|
170
|
|
|
|
|
|
|
|
|
171
|
|
|
|
|
|
|
|
|
172
|
0
|
|
|
|
|
|
foreach (1..5) { |
|
173
|
0
|
|
|
|
|
|
$last_error = undef; |
|
174
|
|
|
|
|
|
|
|
|
175
|
|
|
|
|
|
|
# Create a connection to the storage backend |
|
176
|
0
|
0
|
|
|
|
|
unless ($socket) { |
|
177
|
0
|
0
|
|
|
|
|
sysseek($read_fh, 0, 0) or die "seek failed: $!"; |
|
178
|
|
|
|
|
|
|
try { |
|
179
|
0
|
|
|
|
|
|
$last_written_point = 0; |
|
180
|
0
|
|
|
|
|
|
$current_dest = $get_new_dest->(); |
|
181
|
|
|
|
|
|
|
|
|
182
|
0
|
0
|
|
|
|
|
$opts->{on_new_attempt}->($current_dest) if $opts->{on_new_attempt}; |
|
183
|
|
|
|
|
|
|
|
|
184
|
0
|
|
|
|
|
|
my $uri = URI->new($current_dest->{path}); |
|
185
|
|
|
|
|
|
|
$socket = IO::Socket::INET->new( |
|
186
|
|
|
|
|
|
|
Timeout => 10, |
|
187
|
|
|
|
|
|
|
Proto => "tcp", |
|
188
|
|
|
|
|
|
|
PeerPort => $uri->port, |
|
189
|
|
|
|
|
|
|
PeerHost => $uri->host, |
|
190
|
0
|
0
|
|
|
|
|
) or die "connect to ".$current_dest->{path}." failed: $!"; |
|
191
|
|
|
|
|
|
|
|
|
192
|
0
|
0
|
|
|
|
|
$opts->{on_connect}->() if $opts->{on_connect}; |
|
193
|
|
|
|
|
|
|
|
|
194
|
0
|
|
|
|
|
|
my $buf = 'PUT ' . $uri->path . " HTTP/1.0\r\nConnection: close\r\nContent-Length: $eventual_length\r\n\r\n"; |
|
195
|
0
|
0
|
0
|
|
|
|
setsockopt($socket, SOL_SOCKET, SO_SNDBUF, 65536) or warn "could not enlarge socket buffer: $!" if (unpack("I", getsockopt($socket, SOL_SOCKET, SO_SNDBUF)) < 65536); |
|
196
|
0
|
0
|
|
|
|
|
setsockopt($socket, IPPROTO_TCP, TCP_CORK, 1) or warn "could not set TCP_CORK" if TCP_CORK; |
|
197
|
0
|
0
|
|
|
|
|
syswrite($socket, $buf)==length($buf) or die "Could not write all: $!"; |
|
198
|
|
|
|
|
|
|
} |
|
199
|
|
|
|
|
|
|
catch { |
|
200
|
0
|
|
|
|
|
|
$fail_write_attempt->($_); |
|
201
|
0
|
|
|
|
|
|
}; |
|
202
|
|
|
|
|
|
|
} |
|
203
|
|
|
|
|
|
|
|
|
204
|
|
|
|
|
|
|
# Write as much data as we have |
|
205
|
0
|
0
|
|
|
|
|
if ($socket) { |
|
206
|
0
|
|
|
|
|
|
my $bytes_to_write = $available_to_read - $last_written_point; |
|
207
|
0
|
|
|
|
|
|
my $block_size = $bytes_to_write; |
|
208
|
|
|
|
|
|
|
|
|
209
|
0
|
|
|
|
|
|
SENDFILE: while ($bytes_to_write > 0) { |
|
210
|
0
|
|
|
|
|
|
my $c = syssendfile($socket, $read_fh, $block_size); |
|
211
|
0
|
0
|
0
|
|
|
|
if ($c > 0) { |
|
|
|
0
|
|
|
|
|
|
|
212
|
0
|
|
|
|
|
|
$last_written_point += $c; |
|
213
|
0
|
|
|
|
|
|
$bytes_to_write -= $c; |
|
214
|
|
|
|
|
|
|
} |
|
215
|
|
|
|
|
|
|
elsif ($c == -1 && $block_size > 1024*1024) { |
|
216
|
|
|
|
|
|
|
# 32 bit kernels won't even allow you to send more than 2Gb, it seems. |
|
217
|
|
|
|
|
|
|
# Retry with a smaller block size. |
|
218
|
0
|
|
|
|
|
|
$block_size = 1024*1024; |
|
219
|
|
|
|
|
|
|
} |
|
220
|
|
|
|
|
|
|
else { |
|
221
|
0
|
|
|
|
|
|
$fail_write_attempt->($_); |
|
222
|
0
|
|
|
|
|
|
warn "syssendfile failed, only $c out of $bytes_to_write written: $!"; |
|
223
|
0
|
|
|
|
|
|
last SENDFILE; |
|
224
|
|
|
|
|
|
|
} |
|
225
|
|
|
|
|
|
|
} |
|
226
|
|
|
|
|
|
|
|
|
227
|
0
|
0
|
|
|
|
|
if ($bytes_to_write < 0) { |
|
228
|
0
|
|
|
|
|
|
die "unpossible!"; |
|
229
|
|
|
|
|
|
|
} |
|
230
|
|
|
|
|
|
|
} |
|
231
|
|
|
|
|
|
|
|
|
232
|
0
|
0
|
0
|
|
|
|
if ($socket && $eof) { |
|
|
|
0
|
|
|
|
|
|
|
233
|
0
|
0
|
|
|
|
|
setsockopt($socket, IPPROTO_TCP, TCP_CORK, 0) or warn "could not unset TCP_CORK: $!" if TCP_CORK; |
|
234
|
0
|
0
|
|
|
|
|
shutdown($socket, 1) or warn "could not shutdown socket: $!"; |
|
235
|
0
|
0
|
|
|
|
|
die "File is longer than initially declared, is it still being written to? We are at $last_written_point, $eventual_length initially declared" if ($last_written_point > $eventual_length); |
|
236
|
0
|
0
|
|
|
|
|
die "Cannot be at eof, only $last_written_point out of $eventual_length written!" unless ($last_written_point == $eventual_length); |
|
237
|
|
|
|
|
|
|
|
|
238
|
0
|
|
|
|
|
|
$self->run_hook('new_file_end', $self, $key, $class, $opts); |
|
239
|
|
|
|
|
|
|
|
|
240
|
0
|
|
|
|
|
|
my $buf; |
|
241
|
|
|
|
|
|
|
try { |
|
242
|
0
|
|
|
|
|
|
$buf = slurp($socket); |
|
243
|
|
|
|
|
|
|
} |
|
244
|
|
|
|
|
|
|
catch { |
|
245
|
0
|
|
|
|
|
|
warn $_; |
|
246
|
0
|
|
|
|
|
|
}; |
|
247
|
|
|
|
|
|
|
|
|
248
|
0
|
0
|
|
|
|
|
if (!defined($buf)) { |
|
249
|
0
|
|
|
|
|
|
$fail_write_attempt->("slurp failed"); |
|
250
|
0
|
|
|
|
|
|
next; |
|
251
|
|
|
|
|
|
|
} |
|
252
|
|
|
|
|
|
|
|
|
253
|
0
|
0
|
|
|
|
|
unless(close($socket)) { |
|
254
|
0
|
|
|
|
|
|
$fail_write_attempt->($!); |
|
255
|
0
|
|
|
|
|
|
warn "could not close socket: $!"; |
|
256
|
0
|
|
|
|
|
|
next; |
|
257
|
|
|
|
|
|
|
} |
|
258
|
|
|
|
|
|
|
|
|
259
|
0
|
|
|
|
|
|
my ($top, @headers) = split /\r?\n/, $buf; |
|
260
|
0
|
0
|
|
|
|
|
if ($top =~ m{HTTP/1.[01]\s+2\d\d}) { |
|
261
|
|
|
|
|
|
|
# Woo, 200! |
|
262
|
|
|
|
|
|
|
|
|
263
|
0
|
0
|
|
|
|
|
$opts->{on_http_done}->() if $opts->{http_done}; |
|
264
|
|
|
|
|
|
|
|
|
265
|
0
|
|
|
|
|
|
my @cs; |
|
266
|
|
|
|
|
|
|
|
|
267
|
0
|
0
|
0
|
|
|
|
if (!$checksum) { |
|
|
|
0
|
|
|
|
|
|
|
268
|
|
|
|
|
|
|
try { |
|
269
|
|
|
|
|
|
|
# XXX - What's the timeout here. |
|
270
|
0
|
|
|
|
|
|
my $probe_length = (head($current_dest->{path}))[1]; |
|
271
|
0
|
0
|
|
|
|
|
die "probe failed: $probe_length vs $eventual_length" if $probe_length != $eventual_length; |
|
272
|
|
|
|
|
|
|
} |
|
273
|
|
|
|
|
|
|
catch { |
|
274
|
0
|
|
|
|
|
|
$fail_write_attempt->("HEAD check on newly written file failed: $_"); |
|
275
|
0
|
|
|
|
|
|
}; |
|
276
|
|
|
|
|
|
|
# No checksum to supply, but we have at least checked the length. |
|
277
|
|
|
|
|
|
|
} |
|
278
|
|
|
|
|
|
|
elsif ($checksum && $create_close_timed_out) { |
|
279
|
|
|
|
|
|
|
try { |
|
280
|
0
|
|
|
|
|
|
my $md5 = Digest::MD5->new(); |
|
281
|
0
|
|
|
|
|
|
my $req = HTTP::Request->new(GET => $current_dest->{path}); |
|
282
|
0
|
|
|
|
|
|
LWP::UserAgent->new->request($req, sub { $md5->add($_[0]) }); |
|
|
0
|
|
|
|
|
|
|
|
283
|
|
|
|
|
|
|
|
|
284
|
0
|
|
|
|
|
|
my $hex_checked = $md5->hexdigest(); |
|
285
|
0
|
0
|
|
|
|
|
die "Got $hex_checked, expected $checksum" if "MD5:$hex_checked" ne $checksum; |
|
286
|
|
|
|
|
|
|
} |
|
287
|
|
|
|
|
|
|
catch { |
|
288
|
0
|
|
|
|
|
|
$fail_write_attempt->("Cross network checksum failed: $_"); |
|
289
|
0
|
|
|
|
|
|
}; |
|
290
|
0
|
|
|
|
|
|
@cs = ( checksum => $checksum, checksumverify => 0 ); |
|
291
|
|
|
|
|
|
|
} |
|
292
|
|
|
|
|
|
|
else { |
|
293
|
0
|
|
|
|
|
|
@cs = ( checksum => $checksum, checksumverify => 1 ); |
|
294
|
|
|
|
|
|
|
} |
|
295
|
|
|
|
|
|
|
|
|
296
|
0
|
0
|
|
|
|
|
if (defined $last_error) { |
|
297
|
0
|
|
|
|
|
|
next; |
|
298
|
|
|
|
|
|
|
} |
|
299
|
|
|
|
|
|
|
|
|
300
|
0
|
|
|
|
|
|
my $rv; |
|
301
|
0
|
|
|
|
|
|
my $ts_sent_create_close = [gettimeofday]; |
|
302
|
|
|
|
|
|
|
try { |
|
303
|
|
|
|
|
|
|
$rv = $self->{backend}->do_request |
|
304
|
|
|
|
|
|
|
("create_close", { |
|
305
|
|
|
|
|
|
|
fid => $current_dest->{fid}, |
|
306
|
|
|
|
|
|
|
devid => $current_dest->{devid}, |
|
307
|
|
|
|
|
|
|
domain => $self->{domain}, |
|
308
|
|
|
|
|
|
|
size => $eventual_length, |
|
309
|
|
|
|
|
|
|
key => $key, |
|
310
|
|
|
|
|
|
|
path => $current_dest->{path}, |
|
311
|
0
|
|
|
|
|
|
@cs, |
|
312
|
|
|
|
|
|
|
}); |
|
313
|
|
|
|
|
|
|
} |
|
314
|
|
|
|
|
|
|
catch { |
|
315
|
0
|
|
|
|
|
|
warn "create_close exploded: $_"; |
|
316
|
0
|
0
|
|
|
|
|
$self->{backend}->force_disconnect() if $self->{backend}->can('force_disconnect'); |
|
317
|
0
|
|
|
|
|
|
}; |
|
318
|
|
|
|
|
|
|
|
|
319
|
|
|
|
|
|
|
# TODO we used to have a file check to query the size of the |
|
320
|
|
|
|
|
|
|
# file which we just uploaded to MogileFS. |
|
321
|
|
|
|
|
|
|
|
|
322
|
0
|
0
|
0
|
|
|
|
if ($rv) { |
|
|
|
0
|
0
|
|
|
|
|
|
323
|
0
|
|
|
|
|
|
$self->run_hook('store_file_end', $self, $key, $class, $opts); |
|
324
|
0
|
|
|
|
|
|
return $eventual_length; |
|
325
|
|
|
|
|
|
|
} |
|
326
|
|
|
|
|
|
|
elsif (!$create_close_timed_out && $checksum && tv_interval($ts_sent_create_close) >= $self->{backend}->{timeout}) { |
|
327
|
0
|
|
|
|
|
|
@dests = (); |
|
328
|
0
|
|
|
|
|
|
$create_close_timed_out = 1; |
|
329
|
0
|
|
|
|
|
|
$fail_write_attempt->("create_close failed, possibly timed out checksumming"); |
|
330
|
|
|
|
|
|
|
} |
|
331
|
|
|
|
|
|
|
else { |
|
332
|
|
|
|
|
|
|
# create_close may explode due to a back checksum, |
|
333
|
|
|
|
|
|
|
# or a network error sending the acknowledgement of |
|
334
|
|
|
|
|
|
|
# a successfuly upload. To handle this. if |
|
335
|
|
|
|
|
|
|
# create_close fails we always retry with a new |
|
336
|
|
|
|
|
|
|
# create_open to get a new FID. |
|
337
|
0
|
|
|
|
|
|
@dests = (); |
|
338
|
0
|
|
|
|
|
|
$fail_write_attempt->("create_close failed"); |
|
339
|
|
|
|
|
|
|
} |
|
340
|
|
|
|
|
|
|
} |
|
341
|
|
|
|
|
|
|
else { |
|
342
|
0
|
|
|
|
|
|
$fail_write_attempt->("Got non-200 from remote server $top"); |
|
343
|
0
|
|
|
|
|
|
next; |
|
344
|
|
|
|
|
|
|
} |
|
345
|
|
|
|
|
|
|
} |
|
346
|
|
|
|
|
|
|
elsif ($last_written_point == $available_to_read) { |
|
347
|
0
|
|
|
|
|
|
return; |
|
348
|
|
|
|
|
|
|
} |
|
349
|
|
|
|
|
|
|
} |
|
350
|
|
|
|
|
|
|
|
|
351
|
0
|
|
|
|
|
|
die "Mogile write failed: $last_error"; |
|
352
|
0
|
|
|
|
|
|
}; |
|
353
|
|
|
|
|
|
|
} |
|
354
|
|
|
|
|
|
|
|
|
355
|
|
|
|
|
|
|
sub store_file { |
|
356
|
0
|
|
|
0
|
1
|
|
my ($self, $key, $class, $fn, $opts) = @_; |
|
357
|
|
|
|
|
|
|
|
|
358
|
0
|
0
|
|
|
|
|
if (ref($fn)) { |
|
359
|
0
|
|
|
|
|
|
warn "not scalar!"; |
|
360
|
0
|
|
|
|
|
|
return $self->SUPER::store_file($key, $class, $fn, $opts); |
|
361
|
|
|
|
|
|
|
} |
|
362
|
|
|
|
|
|
|
|
|
363
|
0
|
0
|
|
|
|
|
open(my $fh, "<", $fn) or die "could not open '$fn': $!"; |
|
364
|
|
|
|
|
|
|
|
|
365
|
0
|
|
|
|
|
|
my $file_length = -s $fh; |
|
366
|
|
|
|
|
|
|
|
|
367
|
0
|
|
|
|
|
|
my $cb = $self->store_file_from_fh( |
|
368
|
|
|
|
|
|
|
$key, $class, $fh, $file_length, $opts |
|
369
|
|
|
|
|
|
|
); |
|
370
|
|
|
|
|
|
|
|
|
371
|
0
|
0
|
|
|
|
|
open(my $checksum, "-|", "md5sum", "-b", "--", $fn) or die "could not fork off md5sum: $!"; |
|
372
|
0
|
|
|
|
|
|
$cb->($file_length, 0); |
|
373
|
0
|
|
|
|
|
|
my $line = <$checksum>; |
|
374
|
0
|
0
|
|
|
|
|
close($checksum) or die "could not finish checksum: $!"; |
|
375
|
|
|
|
|
|
|
|
|
376
|
0
|
0
|
|
|
|
|
$line =~ /^([0-9a-f]{32})/ or die "could not find checksum"; |
|
377
|
|
|
|
|
|
|
|
|
378
|
0
|
|
|
|
|
|
$cb->($file_length, 1, "MD5:$1"); |
|
379
|
|
|
|
|
|
|
|
|
380
|
0
|
|
|
|
|
|
return $file_length; |
|
381
|
|
|
|
|
|
|
} |
|
382
|
|
|
|
|
|
|
|
|
383
|
|
|
|
|
|
|
1; |
|
384
|
|
|
|
|
|
|
|