File Coverage

blib/lib/MogileFS/Worker/Replicate.pm
Criterion Covered Total %
statement 33 434 7.6
branch 0 250 0.0
condition 0 113 0.0
subroutine 11 30 36.6
pod 0 8 0.0
total 44 835 5.2


line stmt bran cond sub pod time code
1             package MogileFS::Worker::Replicate;
2             # replicates files around
3              
4 21     21   122 use strict;
  21         36  
  21         530  
5 21     21   86 use base 'MogileFS::Worker';
  21         32  
  21         2213  
6             use fields (
7 21         91 'fidtodo', # hashref { fid => 1 }
8 21     21   123 );
  21         33  
9              
10 21     21   915 use List::Util ();
  21         41  
  21         293  
11 21     21   80 use MogileFS::Server;
  21         39  
  21         568  
12 21     21   103 use MogileFS::Util qw(error every debug wait_for_readability);
  21         39  
  21         994  
13 21     21   110 use MogileFS::Config;
  21         31  
  21         1832  
14 21     21   6436 use MogileFS::ReplicationRequest qw(rr_upgrade);
  21         40  
  21         910  
15 21     21   108 use Digest;
  21         28  
  21         438  
16 21     21   7672 use MIME::Base64 qw(encode_base64);
  21         11227  
  21         2059  
17              
18             sub new {
19 0     0 0   my ($class, $psock) = @_;
20 0           my $self = fields::new($class);
21 0           $self->SUPER::new($psock);
22 0           $self->{fidtodo} = {};
23 0           return $self;
24             }
25              
26             # replicator wants
27 0     0 0   sub watchdog_timeout { 90; }
28 21     21   118 use constant SOCK_TIMEOUT => 45;
  21         44  
  21         75935  
29              
30             sub work {
31 0     0 0   my $self = shift;
32              
33             every(1.0, sub {
34 0     0     $self->send_to_parent("worker_bored 100 replicate rebalance");
35              
36 0           my $queue_todo = $self->queue_todo('replicate');
37 0           my $queue_todo2 = $self->queue_todo('rebalance');
38 0 0 0       return unless (@$queue_todo || @$queue_todo2);
39              
40 0 0         return unless $self->validate_dbh;
41 0           my $sto = Mgd::get_store();
42              
43 0           while (my $todo = shift @$queue_todo) {
44 0           my $fid = $todo->{fid};
45 0           $self->replicate_using_torepl_table($todo);
46             }
47 0           while (my $todo = shift @$queue_todo2) {
48 0           $self->still_alive;
49             # deserialize the arg :/
50 0           $todo->{arg} = [split /,/, $todo->{arg}];
51             my $devfid =
52 0           MogileFS::DevFID->new($todo->{devid}, $todo->{fid});
53             $self->rebalance_devfid($devfid,
54 0           { target_devids => $todo->{arg} });
55              
56             # If files error out, we want to send the error up to syslog
57             # and make a real effort to chew through the queue. Users may
58             # manually re-run rebalance to retry.
59 0           $sto->delete_fid_from_file_to_queue($todo->{fid}, REBAL_QUEUE);
60             }
61 0           $_[0]->(0); # don't sleep.
62 0           });
63             }
64              
65             # return 1 if we did something (or tried to do something), return 0 if
66             # there was nothing to be done.
67             sub replicate_using_torepl_table {
68 0     0 0   my $self = shift;
69 0           my $todo = shift;
70              
71             # find some fids to replicate, prioritize based on when they should be tried
72 0           my $sto = Mgd::get_store();
73              
74 0           my $fid = $todo->{fid};
75 0           $self->still_alive;
76              
77 0           my $errcode;
78              
79             my %opts;
80 0           $opts{errref} = \$errcode;
81 0           $opts{no_unlock} = 1; # to make it return an $unlock subref
82 0 0         $opts{source_devid} = $todo->{fromdevid} if $todo->{fromdevid};
83              
84 0           my ($status, $unlock) = replicate($fid, %opts);
85              
86 0 0         if ($status) {
87             # $status is either 0 (failure, handled below), 1 (success, we actually
88             # replicated this file), or 2 (success, but someone else replicated it).
89              
90             # when $staus eq "lost_race", this delete is unnecessary normally
91             # (somebody else presumably already deleted it if they
92             # also replicated it), but in the case of running with old
93             # replicators from previous versions, -or- simply if the
94             # other guy's delete failed, this cleans it up....
95 0           $sto->delete_fid_from_file_to_replicate($fid);
96 0 0         $unlock->() if $unlock;
97 0           next;
98             }
99              
100 0 0         debug("Replication of fid=$fid failed with errcode=$errcode") if $Mgd::DEBUG >= 2;
101              
102             # ERROR CASES:
103              
104             # README: please keep this up to date if you update the replicate() function so we ensure
105             # that this code always does the right thing
106             #
107             # -- HARMLESS --
108             # failed_getting_lock => harmless. skip. somebody else probably doing.
109             #
110             # -- ACTIONABLE --
111             # too_happy => too many copies, attempt to rebalance.
112             #
113             # -- TEMPORARY; DO EXPONENTIAL BACKOFF --
114             # source_down => only source available is observed down.
115             # policy_error_doing_failed => policy plugin fucked up. it's looping.
116             # policy_error_already_there => policy plugin fucked up. it's dumb.
117             # policy_no_suggestions => no copy was attempted. policy is just not happy.
118             # copy_error => policy said to do 1+ things, we failed, it ran out of suggestions.
119             #
120             # -- FATAL; DON'T TRY AGAIN --
121             # no_source => it simply exists nowhere. not that something's down, but file_on is empty.
122              
123             # bail if we failed getting the lock, that means someone else probably
124             # already did it, so we should just move on
125 0 0         if ($errcode eq 'failed_getting_lock') {
126 0 0         $unlock->() if $unlock;
127 0           next;
128             }
129              
130             # logic for setting the next try time appropriately
131             my $update_nexttry = sub {
132 0     0     my ($type, $delay) = @_;
133 0           my $sto = Mgd::get_store();
134 0 0         if ($type eq 'end_of_time') {
    0          
135             # special; update to a time that won't happen again,
136             # as we've encountered a scenario in which case we're
137             # really hosed
138 0           $sto->reschedule_file_to_replicate_absolute($fid, $sto->end_of_time);
139             } elsif ($type eq "offset") {
140 0           $sto->reschedule_file_to_replicate_relative($fid, $delay+0);
141             } else {
142 0           $sto->reschedule_file_to_replicate_absolute($fid, $delay+0);
143             }
144 0           };
145              
146             # now let's handle any error we want to consider a total failure; do not
147             # retry at any point. push this file off to the end so someone has to come
148             # along and figure out what went wrong.
149 0 0         if ($errcode eq 'no_source') {
150 0           $update_nexttry->( end_of_time => 1 );
151 0 0         $unlock->() if $unlock;
152 0           next;
153             }
154              
155             # try to shake off extra copies. fall through to the backoff logic
156             # so we don't flood if it's impossible to properly weaken the fid.
157             # there's a race where the fid could be checked again, but the
158             # exclusive locking prevents replication clobbering.
159 0 0         if ($errcode eq 'too_happy') {
160 0 0         $unlock->() if $unlock;
161 0           $unlock = undef;
162 0           my $f = MogileFS::FID->new($fid);
163 0           my @devs = List::Util::shuffle($f->devids);
164 0           my $devfid;
165             # First one we can delete from, we try to rebalance away from.
166 0           for (@devs) {
167 0           my $dev = Mgd::device_factory()->get_by_id($_);
168             # Not positive 'should_read_from' needs to be here.
169             # We must be able to delete off of this dev so the fid can
170             # move.
171 0 0 0       if ($dev->can_delete_from && $dev->should_read_from) {
172 0           $devfid = MogileFS::DevFID->new($dev, $f);
173 0           last;
174             }
175             }
176 0 0         if ($devfid) {
177 0 0         if ($self->rebalance_devfid($devfid)) {
178             # disable exponential backoff below if we rebalanced due to
179             # excessive replication:
180 0           $todo->{failcount} = 0;
181             }
182             }
183             }
184              
185             # at this point, the rest of the errors require exponential backoff. define what this means
186             # as far as failcount -> delay to next try.
187             # 15s, 1m, 5m, 30m, 1h, 2h, 4h, 8h, 24h, 24h, 24h, 24h, ...
188 0           my @backoff = qw( 15 60 300 1800 3600 7200 14400 28800 );
189 0   0       $update_nexttry->( offset => int(($backoff[$todo->{failcount}] || 86400) * (rand(0.4) + 0.8)) );
190 0 0         $unlock->() if $unlock;
191 0           return 1;
192             }
193              
194             # Return 1 on success, 0 on failure or no-op.
195             sub rebalance_devfid {
196 0     0 0   my ($self, $devfid, $opts) = @_;
197 0   0       $opts ||= {};
198 0           MogileFS::Util::okay_args($opts, qw(avoid_devids target_devids));
199              
200 0           my $fid = $devfid->fid;
201              
202             # bail out early if this FID is no longer in the namespace (weird
203             # case where file is in file_on because not yet deleted, but
204             # has been replaced/deleted in 'file' table...). not too harmful
205             # (just noisy) if this line didn't exist, but whatever... it
206             # makes stuff cleaner on my intentionally-corrupted-for-fsck-testing
207             # dev machine...
208 0 0         return 1 if ! $fid->exists;
209              
210 0           my $errcode;
211             my ($ret, $unlock) = replicate($fid,
212             mask_devids => { $devfid->devid => 1 },
213             no_unlock => 1,
214             target_devids => $opts->{target_devids},
215 0           errref => \$errcode,
216             );
217              
218             my $fail = sub {
219 0     0     my $error = shift;
220 0           $unlock->();
221 0           error("Rebalance for $devfid (" . $devfid->url . ") failed: $error");
222 0           return 0;
223 0           };
224              
225 0 0 0       unless ($ret || $errcode eq "too_happy") {
226 0           return $fail->("Replication failed");
227             }
228              
229 0           my $should_delete = 0;
230 0           my $del_reason;
231              
232 0 0 0       if ($errcode eq "too_happy" || $ret eq "lost_race") {
    0          
233             # for some reason, we did no work. that could be because
234             # either 1) we lost the race, as the error code implies,
235             # and some other process rebalanced this first, or 2)
236             # the file is over-replicated, and everybody just thinks they
237             # lost the race because the replication policy said there's
238             # nothing to do, even with this devfid masked away.
239             # so let's figure it out... if this devfid still exists,
240             # we're over-replicated, else we just lost the race.
241 0 0         if ($devfid->exists) {
242             # over-replicated
243              
244             # see if some copy, besides this one we want
245             # to delete, is currently alive & of right size..
246             # just as extra paranoid check before we delete it
247 0           foreach my $test_df ($fid->devfids) {
248 0 0         next if $test_df->devid == $devfid->devid;
249 0 0         if ($test_df->size_matches) {
250 0           $should_delete = 1;
251 0           $del_reason = "over_replicated";
252 0           last;
253             }
254             }
255             } else {
256             # lost race
257 0           $should_delete = 0; # no-op
258             }
259             } elsif ($ret eq "would_worsen") {
260             # replication has indicated we would be making ruining this fid's day
261             # if we delete an existing copy, so lets not do that.
262             # this indicates a condition where there're no suitable devices to
263             # copy new data onto, so lets be loud about it.
264 0           return $fail->("no suitable destination devices available");
265             } else {
266 0           $should_delete = 1;
267 0           $del_reason = "did_rebalance;ret=$ret";
268             }
269              
270 0           my %destroy_opts;
271              
272 0 0         $destroy_opts{ignore_missing} = 1
273             if MogileFS::Config->config("rebalance_ignore_missing");
274              
275 0 0         if ($should_delete) {
276 0           eval { $devfid->destroy(%destroy_opts) };
  0            
277 0 0         if ($@) {
278 0           return $fail->("HTTP delete (due to '$del_reason') failed: $@");
279             }
280             }
281              
282 0           $unlock->();
283 0           return $should_delete;
284             }
285              
286             # replicates $fid to make sure it meets its class' replicate policy.
287             #
288             # README: if you update this sub to return a new error code, please update the
289             # appropriate callers to know how to deal with the errors returned.
290             #
291             # returns either:
292             # $rv
293             # ($rv, $unlock_sub) -- when 'no_unlock' %opt is used. subref to release lock.
294             # $rv is one of:
295             # 0 = failure (failure written to ${$opts{errref}})
296             # 1 = success
297             # "lost_race" = skipping, we did no work and policy was already met.
298             # "nofid" => fid no longer exists. skip replication.
299             sub replicate {
300 0     0 0   my ($fid, %opts) = @_;
301 0 0         $fid = MogileFS::FID->new($fid) unless ref $fid;
302 0           my $fidid = $fid->id;
303              
304 0 0         debug("Replication for $fidid called, opts=".join(',',keys(%opts))) if $Mgd::DEBUG >= 2;
305              
306 0           my $errref = delete $opts{'errref'};
307 0           my $no_unlock = delete $opts{'no_unlock'};
308 0           my $fixed_source = delete $opts{'source_devid'};
309 0   0       my $mask_devids = delete $opts{'mask_devids'} || {};
310 0   0       my $avoid_devids = delete $opts{'avoid_devids'} || {};
311 0   0       my $target_devids = delete $opts{'target_devids'} || []; # inverse of avoid_devids.
312 0 0         die "unknown_opts" if %opts;
313 0 0         die unless ref $mask_devids eq "HASH";
314              
315 0           my $sdevid;
316              
317 0           my $sto = Mgd::get_store();
318             my $unlock = sub {
319 0     0     $sto->note_done_replicating($fidid);
320 0           };
321              
322             my $retunlock = sub {
323 0     0     my $rv = shift;
324 0           my ($errmsg, $errcode);
325 0 0         if (@_ == 2) {
326 0           ($errcode, $errmsg) = @_;
327 0           $errmsg = "$errcode: $errmsg"; # include code with message
328             } else {
329 0           ($errmsg) = @_;
330             }
331 0 0         $$errref = $errcode if $errref;
332              
333 0           my $ret;
334 0 0 0       if ($errcode && $errcode eq "failed_getting_lock") {
335             # don't emit a warning with error() on lock failure. not
336             # a big deal, don't scare people.
337 0           $ret = 0;
338             } else {
339 0 0         $ret = $rv ? $rv : error($errmsg);
340             }
341 0 0         if ($no_unlock) {
342 0 0         die "ERROR: must be called in list context w/ no_unlock" unless wantarray;
343 0           return ($ret, $unlock);
344             } else {
345 0 0         die "ERROR: must not be called in list context w/o no_unlock" if wantarray;
346 0           $unlock->();
347 0           return $ret;
348             }
349 0           };
350              
351             # hashref of devid -> MogileFS::Device
352 0 0         my $devs = Mgd::device_factory()->map_by_id
353             or die "No device map";
354              
355 0 0         return $retunlock->(0, "failed_getting_lock", "Unable to obtain lock for fid $fidid")
356             unless $sto->should_begin_replicating_fidid($fidid);
357              
358             # if the fid doesn't even exist, consider our job done! no point
359             # replicating file contents of a file no longer in the namespace.
360 0 0         return $retunlock->("nofid") unless $fid->exists;
361              
362 0           my $cls = $fid->class;
363 0           my $polobj = $cls->repl_policy_obj;
364              
365             # learn what this devices file is already on
366 0           my @on_devs; # all devices fid is on, reachable or not.
367             my @on_devs_tellpol; # subset of @on_devs, to tell the policy class about
368 0           my @on_up_devid; # subset of @on_devs: just devs that are readable
369              
370 0           foreach my $devid ($fid->devids) {
371 0 0         my $d = Mgd::device_factory()->get_by_id($devid)
372             or next;
373 0           push @on_devs, $d;
374 0 0 0       if ($d->dstate->should_have_files && ! $mask_devids->{$devid}) {
375 0           push @on_devs_tellpol, $d;
376             }
377 0 0         if ($d->should_read_from) {
378 0           push @on_up_devid, $devid;
379             }
380             }
381              
382 0 0         return $retunlock->(0, "no_source", "Source is no longer available replicating $fidid") if @on_devs == 0;
383 0 0         return $retunlock->(0, "source_down", "No alive devices available replicating $fidid") if @on_up_devid == 0;
384              
385 0 0 0       if ($fixed_source && ! grep { $_ == $fixed_source } @on_up_devid) {
  0            
386 0           error("Fixed source dev$fixed_source requested for $fidid but not available. Trying other devices");
387             }
388              
389 0           my %dest_failed; # devid -> 1 for each devid we were asked to copy to, but failed.
390             my %source_failed; # devid -> 1 for each devid we had problems reading from.
391 0           my $got_copy_request = 0; # true once replication policy asks us to move something somewhere
392 0           my $copy_err;
393              
394 0           my $dest_devs = $devs;
395 0 0         if (@$target_devids) {
396 0           $dest_devs = {map { $_ => $devs->{$_} } @$target_devids};
  0            
397             }
398              
399 0           my $rr; # MogileFS::ReplicationRequest
400 0           while (1) {
401 0           $rr = rr_upgrade($polobj->replicate_to(
402             fid => $fidid,
403             on_devs => \@on_devs_tellpol, # all device objects fid is on, dead or otherwise
404             all_devs => $dest_devs,
405             failed => \%dest_failed,
406             min => $cls->mindevcount,
407             ));
408              
409 0 0         last if $rr->is_happy;
410              
411 0           my @ddevs; # dest devs, in order of preference
412             my $ddevid; # dest devid we've chosen to copy to
413 0 0         if (@ddevs = $rr->copy_to_one_of_ideally) {
    0          
414 0 0         if (my @not_masked_ids = (grep { ! $mask_devids->{$_} &&
415 0   0       ! $avoid_devids->{$_}
416             }
417 0           map { $_->id } @ddevs)) {
418 0           $ddevid = $not_masked_ids[0];
419             } else {
420             # once we masked devids away, there were no
421             # ideal suggestions. this is the case of rebalancing,
422             # which without this check could 'worsen' the state
423             # of the world. consider the case:
424             # h1[ d1 d2 ] h2[ d3 ]
425             # and files are on d1 & d3, an ideal layout.
426             # if d3 is being rebalanced, and masked away, the
427             # replication policy could presumably say to put
428             # the file on d2, even though d3 isn't dead.
429             # so instead, when masking is in effect, we don't
430             # use non-ideal placement, just bailing out.
431              
432             # this used to return "lost_race" as a lie, but rebalance was
433             # happily deleting the masked fid if at least one other fid
434             # existed... because it assumed it was over replicated.
435             # now we tell rebalance that touching this fid would be
436             # stupid.
437 0           return $retunlock->("would_worsen");
438             }
439             } elsif (@ddevs = $rr->copy_to_one_of_desperate) {
440             # TODO: reschedule a replication for 'n' minutes in future, or
441             # when new hosts/devices become available or change state
442 0           $ddevid = $ddevs[0]->id;
443             } else {
444 0           last;
445             }
446              
447 0           $got_copy_request = 1;
448              
449             # replication policy shouldn't tell us to put a file on a device
450             # we've already told it that we've failed at. so if we get that response,
451             # the policy plugin is broken and we should terminate now.
452 0 0         if ($dest_failed{$ddevid}) {
453 0           return $retunlock->(0, "policy_error_doing_failed",
454             "replication policy told us to do something we already told it we failed at while replicating fid $fidid");
455             }
456              
457             # replication policy shouldn't tell us to put a file on a
458             # device that it's already on. that's just stupid.
459 0 0         if (grep { $_->id == $ddevid } @on_devs) {
  0            
460 0           return $retunlock->(0, "policy_error_already_there",
461             "replication policy told us to put fid $fidid on dev $ddevid, but it's already there!");
462             }
463              
464             # find where we're replicating from
465             {
466             # TODO: use an observed good device+host as source to start.
467 0           my @choices = grep { ! $source_failed{$_} } @on_up_devid;
  0            
  0            
468 0 0         return $retunlock->(0, "source_down", "No devices available replicating $fidid") unless @choices;
469 0 0 0       if ($fixed_source && grep { $_ == $fixed_source } @choices) {
  0            
470 0           $sdevid = $fixed_source;
471             } else {
472 0           @choices = List::Util::shuffle(@choices);
473 0           MogileFS::run_global_hook('replicate_order_final_choices', $devs, \@choices);
474 0           $sdevid = shift @choices;
475             }
476             }
477              
478 0 0         my $worker = MogileFS::ProcManager->is_child or die;
479 0           my $digest;
480 0           my $fid_checksum = $fid->checksum;
481 0 0         $digest = Digest->new($fid_checksum->hashname) if $fid_checksum;
482 0 0 0       $digest ||= Digest->new($cls->hashname) if $cls->hashtype;
483              
484             my $rv = http_copy(
485             sdevid => $sdevid,
486             ddevid => $ddevid,
487             fid => $fid,
488             errref => \$copy_err,
489 0     0     callback => sub { $worker->still_alive; },
490 0           digest => $digest,
491             );
492 0 0 0       die "Bogus error code: $copy_err" if !$rv && $copy_err !~ /^(?:src|dest)_error$/;
493              
494 0 0         unless ($rv) {
495 0           error("Failed copying fid $fidid from devid $sdevid to devid $ddevid (error type: $copy_err)");
496 0 0         if ($copy_err eq "src_error") {
497 0           $source_failed{$sdevid} = 1;
498              
499 0 0 0       if ($fixed_source && $fixed_source == $sdevid) {
500 0           error("Fixed source dev$fixed_source was requested for $fidid but failed: will try other sources");
501             }
502              
503             } else {
504 0           $dest_failed{$ddevid} = 1;
505             }
506 0           next;
507             }
508              
509 0           my $dfid = MogileFS::DevFID->new($ddevid, $fid);
510 0           $dfid->add_to_db;
511 0 0 0       if ($digest && !$fid->checksum) {
512 0           $sto->set_checksum($fidid, $cls->hashtype, $digest->digest);
513             }
514              
515 0           push @on_devs, $devs->{$ddevid};
516 0           push @on_devs_tellpol, $devs->{$ddevid};
517 0           push @on_up_devid, $ddevid;
518             }
519              
520             # We are over replicated. Let caller decide if it should rebalance.
521 0 0         if ($rr->too_happy) {
522 0           return $retunlock->(0, "too_happy", "fid $fidid is on too many devices");
523             }
524              
525 0 0         if ($rr->is_happy) {
526 0 0         return $retunlock->(1) if $got_copy_request;
527 0           return $retunlock->("lost_race"); # some other process got to it first. policy was happy immediately.
528             }
529              
530 0           return $retunlock->(0, "policy_no_suggestions",
531             "replication policy ran out of suggestions for us replicating fid $fidid");
532             }
533              
534             # Returns a hashref with the following:
535             # {
536             # code => HTTP status code integer,
537             # keep => boolean, whether to keep the connection after reading
538             # len => value of the Content-Length header (integer)
539             # }
540             # Returns undef on timeout
541             sub read_headers {
542 0     0 0   my ($sock, $intercopy_cb) = @_;
543 0           my $head = '';
544              
545 0           do {
546 0 0         wait_for_readability(fileno($sock), SOCK_TIMEOUT) or return;
547 0           $intercopy_cb->();
548 0           my $r = sysread($sock, $head, 1024, length($head));
549 0 0 0       if (defined $r) {
    0          
550 0 0         return if $r == 0; # EOF
551             } elsif ($!{EAGAIN} || $!{EINTR}) {
552             # loop again
553             } else {
554 0           return;
555             }
556             } until ($head =~ /\r?\n\r?\n/);
557              
558 0           my $data;
559 0           ($head, $data) = split(/\r?\n\r?\n/, $head, 2);
560 0           my @head = split(/\r?\n/, $head);
561 0           $head = shift(@head);
562 0 0         $head =~ m!\AHTTP/(\d+\.\d+)\s+(\d+)! or return;
563 0           my %rv = ( keep => $1 >= 1.1, code => $2 );
564              
565 0           foreach my $line (@head) {
566 0 0         if ($line =~ /\AConnection:\s*keep-alive\s*\z/is) {
    0          
    0          
567 0           $rv{keep} = 1;
568             } elsif ($line =~ /\AConnection:\s*close\s*\z/is) {
569 0           $rv{keep} = 0;
570             } elsif ($line =~ /\AContent-Length:\s*(\d+)\s*\z/is) {
571 0           $rv{len} = $1;
572             }
573             }
574 0           return (\%rv, $data);
575             }
576              
577             # copies a file from one Perlbal to another utilizing HTTP
578             sub http_copy {
579 0     0 0   my %opts = @_;
580             my ($sdevid, $ddevid, $fid, $intercopy_cb, $errref, $digest) =
581 0           map { delete $opts{$_} } qw(sdevid
  0            
582             ddevid
583             fid
584             callback
585             errref
586             digest
587             );
588 0 0         die if %opts;
589              
590 0 0         $fid = MogileFS::FID->new($fid) unless ref($fid);
591 0           my $fidid = $fid->id;
592 0           my $expected_clen = $fid->length;
593 0           my $clen;
594 0           my $content_md5 = '';
595 0           my ($sconn, $dconn);
596 0           my $fid_checksum = $fid->checksum;
597 0 0 0       if ($fid_checksum && $fid_checksum->hashname eq "MD5") {
598             # some HTTP servers may be able to verify Content-MD5 on PUT
599             # and reject corrupted requests. no HTTP server should reject
600             # a request for an unrecognized header
601 0           my $b64digest = encode_base64($fid_checksum->{checksum}, "");
602 0           $content_md5 = "\r\nContent-MD5: $b64digest";
603             }
604              
605 0   0 0     $intercopy_cb ||= sub {};
606              
607             my $err_common = sub {
608 0     0     my ($err, $msg) = @_;
609 0 0         $$errref = $err if $errref;
610 0 0         $sconn->close($err) if $sconn;
611 0 0         $dconn->close($err) if $dconn;
612 0           return error($msg);
613 0           };
614              
615             # handles setting unreachable magic; $error->(reachability, "message")
616             my $error_unreachable = sub {
617 0     0     return $err_common->("src_error", "Fid $fidid unreachable while replicating: $_[0]");
618 0           };
619              
620             my $dest_error = sub {
621 0     0     return $err_common->("dest_error", $_[0]);
622 0           };
623              
624             my $src_error = sub {
625 0     0     return $err_common->("src_error", $_[0]);
626 0           };
627              
628             # get some information we'll need
629 0           my $sdev = Mgd::device_factory()->get_by_id($sdevid);
630 0           my $ddev = Mgd::device_factory()->get_by_id($ddevid);
631              
632 0 0 0       return error("Error: unable to get device information: source=$sdevid, destination=$ddevid, fid=$fidid")
633             unless $sdev && $ddev;
634              
635 0           my $s_dfid = MogileFS::DevFID->new($sdev, $fid);
636 0           my $d_dfid = MogileFS::DevFID->new($ddev, $fid);
637              
638 0           my ($spath, $dpath) = (map { $_->uri_path } ($s_dfid, $d_dfid));
  0            
639 0           my ($shost, $dhost) = (map { $_->host } ($sdev, $ddev));
  0            
640              
641 0           my ($shostip, $sport) = ($shost->ip, $shost->http_port);
642 0 0         if (MogileFS::Config->config("repl_use_get_port")) {
643 0           $sport = $shost->http_get_port;
644             }
645 0           my ($dhostip, $dport) = ($dhost->ip, $dhost->http_port);
646 0 0 0       unless (defined $spath && defined $dpath && defined $shostip && defined $dhostip && $sport && $dport) {
      0        
      0        
      0        
      0        
647             # show detailed information to find out what's not configured right
648 0           error("Error: unable to replicate file fid=$fidid from device id $sdevid to device id $ddevid");
649 0           error(" http://$shostip:$sport$spath -> http://$dhostip:$dport$dpath");
650 0           return 0;
651             }
652              
653 0           my $put = "PUT $dpath HTTP/1.0\r\nConnection: keep-alive\r\n" .
654             "Content-length: $expected_clen$content_md5\r\n\r\n";
655              
656             # need by webdav servers, like lighttpd...
657 0           $ddev->vivify_directories($d_dfid->url);
658              
659             # call a hook for odd casing completely different source data
660             # for specific files.
661 0           my $shttphost;
662 0           MogileFS::run_global_hook('replicate_alternate_source',
663             $fid, \$shostip, \$sport, \$spath, \$shttphost);
664              
665 0           my $durl = "http://$dhostip:$dport$dpath";
666 0           my $surl = "http://$shostip:$sport$spath";
667             # okay, now get the file
668 0           my %sopts = ( ip => $shostip, port => $sport );
669              
670 0           my $get = "GET $spath HTTP/1.0\r\nConnection: keep-alive\r\n";
671             # plugin set a custom host.
672 0 0         $get .= "Host: $shttphost\r\n" if $shttphost;
673              
674 0           my ($sock, $dsock);
675 0           my ($wcount, $bytes_to_read, $written, $remain);
676 0           my ($stries, $dtries) = (0, 0);
677 0           my ($sres, $data, $bytes);
678              
679 0 0         retry:
680             $sconn->close("retrying") if $sconn;
681 0 0         $dconn->close("retrying") if $dconn;
682 0           $dconn = undef;
683 0           $stries++;
684 0 0         $sconn = $shost->http_conn_get(\%sopts)
685             or return $src_error->("Unable to create source socket to $shostip:$sport for $spath");
686 0           $sock = $sconn->sock;
687 0 0         unless ($sock->write("$get\r\n")) {
688 0 0 0       goto retry if $sconn->retryable && $stries == 1;
689 0           return $src_error->("Pipe closed retrieving $spath from $shostip:$sport");
690             }
691              
692             # we just want a content length
693 0           ($sres, $data) = read_headers($sock, $intercopy_cb);
694 0 0         unless ($sres) {
695 0 0 0       goto retry if $sconn->retryable && $stries == 1;
696 0           return $error_unreachable->("Error: Resource $surl failed to return an HTTP response");
697             }
698 0 0 0       unless ($sres->{code} >= 200 && $sres->{code} <= 299) {
699 0           return $error_unreachable->("Error: Resource $surl failed: HTTP $sres->{code}");
700             }
701 0           $clen = $sres->{len};
702              
703 0 0         return $error_unreachable->("File $spath has unexpected content-length of $clen, not $expected_clen")
704             if $clen != $expected_clen;
705              
706             # open target for put
707 0           $dtries++;
708 0 0         $dconn = $dhost->http_conn_get
709             or return $dest_error->("Unable to create dest socket to $dhostip:$dport for $dpath");
710 0           $dsock = $dconn->sock;
711              
712 0 0         unless ($dsock->write($put)) {
713 0 0 0       goto retry if $dconn->retryable && $dtries == 1;
714 0           return $dest_error->("Pipe closed during write to $dpath on $dhostip:$dport");
715             }
716              
717             # now read data and print while we're reading.
718 0           $bytes = length($data);
719 0           ($written, $remain) = (0, $clen);
720 0           $bytes_to_read = 1024*1024; # read 1MB at a time until there's less than that remaining
721 0 0         $bytes_to_read = $remain if $remain < $bytes_to_read;
722 0           $wcount = 0;
723              
724 0           while ($bytes_to_read) {
725 0 0         unless (defined $bytes) {
726 0           read_again:
727             $bytes = sysread($sock, $data, $bytes_to_read);
728 0 0         unless (defined $bytes) {
729 0 0 0       if ($!{EAGAIN} || $!{EINTR}) {
730 0 0         wait_for_readability(fileno($sock), SOCK_TIMEOUT) and
731             goto read_again;
732             }
733 0           return $src_error->("error reading midway through source: $!");
734             }
735 0 0         if ($bytes == 0) {
736 0           return $src_error->("EOF reading midway through source: $!");
737             }
738             }
739              
740             # now we've read in $bytes bytes
741 0           $remain -= $bytes;
742 0 0         $bytes_to_read = $remain if $remain < $bytes_to_read;
743 0 0         $digest->add($data) if $digest;
744              
745 0           my $data_len = $bytes;
746 0           $bytes = undef;
747 0           my $data_off = 0;
748 0           while (1) {
749 0           my $wbytes = syswrite($dsock, $data, $data_len, $data_off);
750 0 0         unless (defined $wbytes) {
751             # it can take two writes to determine if a socket is dead
752             # (TCP_NODELAY and TCP_CORK are (and must be) zero here)
753 0 0 0       goto retry if (!$wcount && $dconn->retryable && $dtries == 1);
      0        
754 0           return $dest_error->("Error: syswrite failed after $written bytes with: $!; failed putting to $dpath");
755             }
756 0           $wcount++;
757 0           $written += $wbytes;
758 0           $intercopy_cb->();
759 0 0         last if ($data_len == $wbytes);
760              
761 0           $data_len -= $wbytes;
762 0           $data_off += $wbytes;
763             }
764              
765 0 0         die if $bytes_to_read < 0;
766             }
767              
768             # source connection drained, return to pool
769 0 0         if ($sres->{keep}) {
770 0           $shost->http_conn_put($sconn);
771 0           $sconn = undef;
772             } else {
773 0           $sconn->close("http_close");
774             }
775              
776             # callee will want this digest, too, so clone as "digest" is destructive
777 0 0         $digest = $digest->clone->digest if $digest;
778              
779 0 0         if ($fid_checksum) {
780 0 0         if ($digest ne $fid_checksum->{checksum}) {
781 0           my $expect = $fid_checksum->hexdigest;
782 0           $digest = unpack("H*", $digest);
783 0           return $src_error->("checksum mismatch on GET: expected: $expect actual: $digest");
784             }
785             }
786              
787             # now read in the response line (should be first line)
788 0           my ($dres, $ddata) = read_headers($dsock, $intercopy_cb);
789 0 0         unless ($dres) {
790 0 0 0       goto retry if (!$wcount && $dconn->retryable && $dtries == 1);
      0        
791 0           return $dest_error->("Error: HTTP response line not recognized writing to $durl");
792             }
793              
794             # drain the response body if there is one
795             # there may be no dres->{len}/Content-Length if there is no body
796 0   0       my $dlen = ($dres->{len} || 0) - length($ddata);
797 0 0         if ($dlen > 0) {
    0          
798 0           my $r = $dsock->read($data, $dlen); # dres->{len} should be tiny
799 0 0         if (defined $r) {
800 0 0         if ($r != $dlen) {
801 0           Mgd::error("Failed to read $r of Content-Length:$dres->{len} bytes for PUT response on $durl");
802 0           $dres->{keep} = 0;
803             }
804             } else {
805 0           Mgd::error("Failed to read Content-Length:$dres->{len} bytes for PUT response on $durl ($!)");
806 0           $dres->{keep} = 0;
807             }
808             } elsif ($dlen < 0) {
809 0           Mgd::error("strange response Content-Length:$dres->{len} with ".
810             length($ddata) .
811             " extra bytes for PUT response on $durl ($!)");
812 0           $dres->{keep} = 0;
813             }
814              
815             # return the connection back to the connection pool
816 0 0         if ($dres->{keep}) {
817 0           $dhost->http_conn_put($dconn);
818 0           $dconn = undef;
819             } else {
820 0           $dconn->close("http_close");
821             }
822              
823 0 0 0       if ($dres->{code} >= 200 && $dres->{code} <= 299) {
824 0 0         if ($digest) {
825 0   0       my $alg = ($fid_checksum && $fid_checksum->hashname) || $fid->class->hashname;
826              
827 0 0 0       if ($ddev->{reject_bad_md5} && ($alg eq "MD5")) {
828             # dest device would've rejected us with a error,
829             # no need to reread the file
830 0           return 1;
831             }
832 0           my $httpfile = MogileFS::HTTPFile->at($durl);
833 0           my $actual = $httpfile->digest($alg, $intercopy_cb);
834 0 0         if ($actual ne $digest) {
835 0           my $expect = unpack("H*", $digest);
836 0           $actual = unpack("H*", $actual);
837 0           return $dest_error->("checksum mismatch on PUT, expected: $expect actual: $digest");
838             }
839             }
840 0           return 1;
841             }
842 0           return $dest_error->("Got HTTP status code $dres->{code} PUTing to $durl");
843             }
844              
845             1;
846              
847             # Local Variables:
848             # mode: perl
849             # c-basic-indent: 4
850             # indent-tabs-mode: nil
851             # End:
852              
853             __END__