File Coverage

blib/lib/MogileFS/Worker/Replicate.pm
Criterion Covered Total %
statement 30 417 7.1
branch 0 238 0.0
condition 0 105 0.0
subroutine 10 29 34.4
pod 0 8 0.0
total 40 797 5.0


line stmt bran cond sub pod time code
1             package MogileFS::Worker::Replicate;
2             # replicates files around
3              
4 21     21   135 use strict;
  21         51  
  21         919  
5 21     21   190 use base 'MogileFS::Worker';
  21         47  
  21         3936  
6             use fields (
7 21         205 'fidtodo', # hashref { fid => 1 }
8 21     21   138 );
  21         50  
9              
10 21     21   1503 use List::Util ();
  21         43  
  21         348  
11 21     21   118 use MogileFS::Server;
  21         43  
  21         508  
12 21     21   110 use MogileFS::Util qw(error every debug);
  21         41  
  21         1684  
13 21     21   131 use MogileFS::Config;
  21         39  
  21         10416  
14 21     21   19046 use MogileFS::ReplicationRequest qw(rr_upgrade);
  21         62  
  21         1382  
15 21     21   124 use Digest;
  21         46  
  21         468  
16 21     21   41686 use MIME::Base64 qw(encode_base64);
  21         23495  
  21         168311  
17              
# Constructor. Allocates the fields-based object, lets the parent class
# (MogileFS::Worker, via 'use base') initialise it with the parent socket,
# and starts out with an empty fid-todo map.
sub new {
    my $class = shift;
    my $psock = shift;

    my $self = fields::new($class);
    $self->SUPER::new($psock);

    # hashref { fid => 1 }
    $self->{fidtodo} = {};

    return $self;
}
25              
# The watchdog timeout the replicator wants: 90.
# NOTE(review): the unit is not visible in this file -- presumably seconds.
sub watchdog_timeout {
    return 90;
}
28              
# Main loop of the replicate worker.
#
# Runs a callback through every(1.0, ...).  Each pass:
#   1. tells the parent process this worker is bored and which queues
#      ('replicate', 'rebalance') it is willing to take work from;
#   2. returns early if neither queue has anything, or if the database
#      handle does not validate;
#   3. drains the 'replicate' queue through replicate_using_torepl_table();
#   4. drains the 'rebalance' queue through rebalance_devfid(), removing each
#      entry from the rebalance queue afterwards whether or not it succeeded;
#   5. asks every() not to sleep before the next pass.
sub work {
    my $self = shift;

    every(1.0, sub {
        # The first argument handed to this callback is a coderef; calling it
        # with 0 at the end of a productive pass means "don't sleep".
        my ($set_sleep) = @_;

        $self->send_to_parent("worker_bored 100 replicate rebalance");

        my $queue_todo  = $self->queue_todo('replicate');
        my $queue_todo2 = $self->queue_todo('rebalance');
        return unless (@$queue_todo || @$queue_todo2);

        return unless $self->validate_dbh;
        my $sto = Mgd::get_store();

        # NOTE: keep this call directly inside a loop body; older versions of
        # replicate_using_torepl_table() leave via 'next' and rely on it.
        while (my $todo = shift @$queue_todo) {
            $self->replicate_using_torepl_table($todo);
        }

        while (my $todo = shift @$queue_todo2) {
            $self->still_alive;

            # deserialize the arg :/
            $todo->{arg} = [split /,/, $todo->{arg}];
            my $devfid =
                MogileFS::DevFID->new($todo->{devid}, $todo->{fid});
            $self->rebalance_devfid($devfid,
                { target_devids => $todo->{arg} });

            # If files error out, we want to send the error up to syslog
            # and make a real effort to chew through the queue. Users may
            # manually re-run rebalance to retry.
            $sto->delete_fid_from_file_to_queue($todo->{fid}, REBAL_QUEUE);
        }

        $set_sleep->(0); # don't sleep.
    });
}
63              
# Attempts to replicate the single fid described by $todo (a hashref read
# here for its 'fid', 'fromdevid' and 'failcount' keys) and updates the
# to-replicate bookkeeping according to the outcome.
#
# Returns 1 on every path: by the time we get here we always at least tried
# to do something.
#
# This sub used to leave via 'next', which only worked because its caller
# invoked it from inside a loop.  It now returns explicitly, so it is safe to
# call from anywhere.
sub replicate_using_torepl_table {
    my ($self, $todo) = @_;

    my $sto = Mgd::get_store();

    my $fid = $todo->{fid};
    $self->still_alive;

    my $errcode;

    my %opts = (
        errref    => \$errcode,
        no_unlock => 1,    # to make it return an $unlock subref
    );
    $opts{source_devid} = $todo->{fromdevid} if $todo->{fromdevid};

    my ($status, $unlock) = replicate($fid, %opts);

    if ($status) {
        # $status is true: either we replicated the file ourselves, or
        # replicate() reported a non-failure outcome such as "lost_race".

        # when $status eq "lost_race", this delete is unnecessary normally
        # (somebody else presumably already deleted it if they
        # also replicated it), but in the case of running with old
        # replicators from previous versions, -or- simply if the
        # other guy's delete failed, this cleans it up....
        $sto->delete_fid_from_file_to_replicate($fid);
        $unlock->() if $unlock;
        return 1;
    }

    debug("Replication of fid=$fid failed with errcode=$errcode") if $Mgd::DEBUG >= 2;

    # ERROR CASES:

    # README: please keep this up to date if you update the replicate() function so we ensure
    # that this code always does the right thing
    #
    # -- HARMLESS --
    # failed_getting_lock        => harmless.  skip.  somebody else probably doing.
    #
    # -- ACTIONABLE --
    # too_happy                  => too many copies, attempt to rebalance.
    #
    # -- TEMPORARY; DO EXPONENTIAL BACKOFF --
    # source_down                => only source available is observed down.
    # policy_error_doing_failed  => policy plugin misbehaved.  it's looping.
    # policy_error_already_there => policy plugin misbehaved.  it suggested a device the fid is already on.
    # policy_no_suggestions      => no copy was attempted.  policy is just not happy.
    # copy_error                 => policy said to do 1+ things, we failed, it ran out of suggestions.
    #
    # -- FATAL; DON'T TRY AGAIN --
    # no_source                  => it simply exists nowhere.  not that something's down, but file_on is empty.

    # bail if we failed getting the lock, that means someone else probably
    # already did it, so we should just move on
    if ($errcode eq 'failed_getting_lock') {
        $unlock->() if $unlock;
        return 1;
    }

    # logic for setting the next try time appropriately.
    # $type is 'end_of_time', 'offset' (relative delay) or anything else
    # (absolute time); $delay is ignored for 'end_of_time'.
    my $update_nexttry = sub {
        my ($type, $delay) = @_;
        my $sto = Mgd::get_store();
        if ($type eq 'end_of_time') {
            # special; update to a time that won't happen again,
            # as we've encountered a scenario in which case we're
            # really hosed
            $sto->reschedule_file_to_replicate_absolute($fid, $sto->end_of_time);
        } elsif ($type eq "offset") {
            $sto->reschedule_file_to_replicate_relative($fid, $delay+0);
        } else {
            $sto->reschedule_file_to_replicate_absolute($fid, $delay+0);
        }
    };

    # now let's handle any error we want to consider a total failure; do not
    # retry at any point.  push this file off to the end so someone has to come
    # along and figure out what went wrong.
    if ($errcode eq 'no_source') {
        $update_nexttry->( end_of_time => 1 );
        $unlock->() if $unlock;
        return 1;
    }

    # try to shake off extra copies. fall through to the backoff logic
    # so we don't flood if it's impossible to properly weaken the fid.
    # there's a race where the fid could be checked again, but the
    # exclusive locking prevents replication clobbering.
    if ($errcode eq 'too_happy') {
        # release our lock first; rebalance_devfid() goes through
        # replicate() again for the same fid.
        $unlock->() if $unlock;
        $unlock = undef;

        my $f    = MogileFS::FID->new($fid);
        my @devs = List::Util::shuffle($f->devids);
        my $devfid;

        # First one we can delete from, we try to rebalance away from.
        for (@devs) {
            my $dev = Mgd::device_factory()->get_by_id($_);
            # Not positive 'should_read_from' needs to be here.
            # We must be able to delete off of this dev so the fid can
            # move.
            if ($dev->can_delete_from && $dev->should_read_from) {
                $devfid = MogileFS::DevFID->new($dev, $f);
                last;
            }
        }
        $self->rebalance_devfid($devfid) if $devfid;
    }

    # at this point, the rest of the errors require exponential backoff.  define what this means
    # as far as failcount -> delay to next try.
    # 15s, 1m, 5m, 30m, 1h, 2h, 4h, 8h, 24h, 24h, 24h, 24h, ...
    # The chosen delay is then scaled by a random factor in [0.8, 1.2).
    my @backoff = qw( 15 60 300 1800 3600 7200 14400 28800 );
    $update_nexttry->( offset => int(($backoff[$todo->{failcount}] || 86400) * (rand(0.4) + 0.8)) );
    $unlock->() if $unlock;
    return 1;
}
186              
# Tries to move the copy identified by $devfid off its device: replicate()
# is run with that device masked out, and the masked copy is destroyed
# afterwards when that is judged safe.
#
# $opts is an optional hashref; only the keys 'avoid_devids' and
# 'target_devids' are accepted (only 'target_devids' is read here).
#
# Return 1 on success, 0 on failure.
sub rebalance_devfid {
    my ($self, $devfid, $opts) = @_;
    $opts ||= {};
    MogileFS::Util::okay_args($opts, qw(avoid_devids target_devids));

    my $fid = $devfid->fid;

    # bail out early if this FID is no longer in the namespace (weird
    # case where file is in file_on because not yet deleted, but
    # has been replaced/deleted in 'file' table...).  not too harmful
    # (just noisy) if this line didn't exist, but it keeps things
    # cleaner on databases deliberately corrupted for fsck testing.
    return 1 unless $fid->exists;

    my $errcode;
    my ($ret, $unlock) = replicate(
        $fid,
        mask_devids   => { $devfid->devid => 1 },
        no_unlock     => 1,
        target_devids => $opts->{target_devids},
        errref        => \$errcode,
    );

    # shared failure exit: release the replication lock, log, return 0.
    my $bail = sub {
        my ($why) = @_;
        $unlock->();
        error("Rebalance for $devfid (" . $devfid->url . ") failed: $why");
        return 0;
    };

    # a false $ret is only acceptable when the fid is over-replicated
    return $bail->("Replication failed")
        if !$ret && $errcode ne "too_happy";

    my $delete_copy = 0;
    my $del_reason;

    if ($errcode eq "too_happy" || $ret eq "lost_race") {
        # for some reason, we did no work.  that could be because
        # either 1) we lost the race, as the error code implies,
        # and some other process rebalanced this first, or 2)
        # the file is over-replicated, and everybody just thinks they
        # lost the race because the replication policy said there's
        # nothing to do, even with this devfid masked away.
        # so let's figure it out... if this devfid still exists,
        # we're over-replicated, else we just lost the race
        # (and there is nothing to delete).
        if ($devfid->exists) {
            # over-replicated.  as an extra paranoid check before we
            # delete, make sure some copy other than this one is
            # currently alive & of the right size.
            for my $other ($fid->devfids) {
                next if $other->devid == $devfid->devid;
                next unless $other->size_matches;
                $delete_copy = 1;
                $del_reason  = "over_replicated";
                last;
            }
        }
    } elsif ($ret eq "would_worsen") {
        # replication has indicated that deleting an existing copy would
        # leave this fid worse off, so let's not do that.
        # this indicates a condition where there are no suitable devices to
        # copy new data onto, so let's be loud about it.
        return $bail->("no suitable destination devices available");
    } else {
        $delete_copy = 1;
        $del_reason  = "did_rebalance;ret=$ret";
    }

    my %destroy_opts =
        MogileFS::Config->config("rebalance_ignore_missing")
            ? (ignore_missing => 1)
            : ();

    if ($delete_copy) {
        eval { $devfid->destroy(%destroy_opts) };
        return $bail->("HTTP delete (due to '$del_reason') failed: $@") if $@;
    }

    $unlock->();
    return 1;
}
278              
# replicates $fid to make sure it meets its class' replicate policy.
#
# README: if you update this sub to return a new error code, please update the
# appropriate callers to know how to deal with the errors returned.
#
# $fid may be a fid object or a plain fid id.  Recognised %opts:
#    errref        -- scalar ref that receives the error code
#    no_unlock     -- don't release the lock; hand back an unlock subref
#    source_devid  -- preferred source device id
#    mask_devids   -- hashref of devids hidden from the policy and never
#                     chosen as an ideal destination
#    avoid_devids  -- hashref of devids never chosen as an ideal destination
#    target_devids -- arrayref restricting the candidate destination devices
# Any other option dies with "unknown_opts".
#
# returns either:
#    $rv
#    ($rv, $unlock_sub)    -- when 'no_unlock' %opt is used.  subref to release lock.
# $rv is one of:
#    0 = failure  (failure written to ${$opts{errref}})
#    1 = success
#    "lost_race" = skipping, we did no work and policy was already met.
#    "nofid" => fid no longer exists. skip replication.
#    "would_worsen" => the only ideal destinations were masked/avoided, so
#                      nothing was copied.
#
# error codes written through errref: failed_getting_lock, no_source,
# source_down, policy_error_doing_failed, policy_error_already_there,
# too_happy, policy_no_suggestions.
sub replicate {
    my ($fid, %opts) = @_;
    $fid = MogileFS::FID->new($fid) unless ref $fid;
    my $fidid = $fid->id;

    if ($Mgd::DEBUG >= 2) {
        debug("Replication for $fidid called, opts=".join(',',keys(%opts)));
    }

    my ($errref, $no_unlock, $fixed_source) =
        map { delete $opts{$_} } qw(errref no_unlock source_devid);
    my $mask_devids   = delete($opts{'mask_devids'})   || {};
    my $avoid_devids  = delete($opts{'avoid_devids'})  || {};
    my $target_devids = delete($opts{'target_devids'}) || []; # inverse of avoid_devids.
    die "unknown_opts" if %opts;
    die unless ref($mask_devids) eq "HASH";

    my $sto = Mgd::get_store();

    # releases the replication lock for this fid
    my $unlock = sub {
        $sto->note_done_replicating($fidid);
    };

    # Every exit below goes through "return $finish->(...)", so that the
    # context replicate() itself was called in is what wantarray sees here.
    # Call as $finish->($rv, $errmsg) or $finish->($rv, $errcode, $errmsg).
    my $finish = sub {
        my ($rv, @rest) = @_;

        my ($errcode, $errmsg);
        if (@rest == 2) {
            ($errcode, $errmsg) = @rest;
            $errmsg = "$errcode: $errmsg"; # include code with message
        } else {
            ($errmsg) = @rest;
        }
        $$errref = $errcode if $errref;

        # don't emit a warning with error() on lock failure.  not
        # a big deal, don't scare people.
        my $ret = ($errcode && $errcode eq "failed_getting_lock")
            ? 0
            : ($rv || error($errmsg));

        if ($no_unlock) {
            die "ERROR: must be called in list context w/ no_unlock" unless wantarray;
            return ($ret, $unlock);
        }

        die "ERROR: must not be called in list context w/o no_unlock" if wantarray;
        $unlock->();
        return $ret;
    };

    # hashref of devid -> MogileFS::Device
    my $devs = Mgd::device_factory()->map_by_id
        or die "No device map";

    unless ($sto->should_begin_replicating_fidid($fidid)) {
        return $finish->(0, "failed_getting_lock", "Unable to obtain lock for fid $fidid");
    }

    # if the fid doesn't even exist, consider our job done!  no point
    # replicating file contents of a file no longer in the namespace.
    return $finish->("nofid") unless $fid->exists;

    my $cls    = $fid->class;
    my $polobj = $cls->repl_policy_obj;

    # learn what devices this file is already on
    my @on_devs;         # all devices fid is on, reachable or not.
    my @on_devs_tellpol; # subset of @on_devs, to tell the policy class about
    my @on_up_devid;     # subset of @on_devs:  just devs that are readable

    for my $devid ($fid->devids) {
        my $dev = Mgd::device_factory()->get_by_id($devid)
            or next;
        push @on_devs, $dev;
        push @on_devs_tellpol, $dev
            if $dev->dstate->should_have_files && ! $mask_devids->{$devid};
        push @on_up_devid, $devid
            if $dev->should_read_from;
    }

    return $finish->(0, "no_source",   "Source is no longer available replicating $fidid") if @on_devs     == 0;
    return $finish->(0, "source_down", "No alive devices available replicating $fidid")    if @on_up_devid == 0;

    if ($fixed_source && ! grep { $_ == $fixed_source } @on_up_devid) {
        error("Fixed source dev$fixed_source requested for $fidid but not available. Trying other devices");
    }

    my %dest_failed;          # devid -> 1 for each devid we were asked to copy to, but failed.
    my %source_failed;        # devid -> 1 for each devid we had problems reading from.
    my $got_copy_request = 0; # true once replication policy asks us to move something somewhere
    my $copy_err;             # deliberately shared across iterations of the loop below

    # candidate destinations: everything, unless the caller narrowed it down
    my $dest_devs = @$target_devids
        ? +{ map { $_ => $devs->{$_} } @$target_devids }
        : $devs;

    my $rr; # MogileFS::ReplicationRequest
    while (1) {
        $rr = rr_upgrade($polobj->replicate_to(
            fid      => $fidid,
            on_devs  => \@on_devs_tellpol, # device objects fid is on (unmasked, should-have-files only)
            all_devs => $dest_devs,
            failed   => \%dest_failed,
            min      => $cls->mindevcount,
        ));

        last if $rr->is_happy;

        my @ddevs;  # dest devs, in order of preference
        my $ddevid; # dest devid we've chosen to copy to
        if (@ddevs = $rr->copy_to_one_of_ideally) {
            my @usable_ids = grep { ! $mask_devids->{$_} && ! $avoid_devids->{$_} }
                             map  { $_->id } @ddevs;

            unless (@usable_ids) {
                # once we masked devids away, there were no
                # ideal suggestions.  this is the case of rebalancing,
                # which without this check could 'worsen' the state
                # of the world.  consider the case:
                #    h1[ d1 d2 ] h2[ d3 ]
                # and files are on d1 & d3, an ideal layout.
                # if d3 is being rebalanced, and masked away, the
                # replication policy could presumably say to put
                # the file on d2, even though d3 isn't dead.
                # so instead, when masking is in effect, we don't
                # use non-ideal placement, just bailing out.

                # this used to return "lost_race" as a lie, but rebalance was
                # happily deleting the masked fid if at least one other fid
                # existed... because it assumed it was over replicated.
                # now we tell rebalance that touching this fid would make
                # things worse.
                return $finish->("would_worsen");
            }
            $ddevid = $usable_ids[0];
        } elsif (@ddevs = $rr->copy_to_one_of_desperate) {
            # TODO: reschedule a replication for 'n' minutes in future, or
            # when new hosts/devices become available or change state
            $ddevid = $ddevs[0]->id;
        } else {
            last;
        }

        $got_copy_request = 1;

        # replication policy shouldn't tell us to put a file on a device
        # we've already told it that we've failed at.  so if we get that response,
        # the policy plugin is broken and we should terminate now.
        return $finish->(0, "policy_error_doing_failed",
                         "replication policy told us to do something we already told it we failed at while replicating fid $fidid")
            if $dest_failed{$ddevid};

        # replication policy shouldn't tell us to put a file on a
        # device that it's already on.
        return $finish->(0, "policy_error_already_there",
                         "replication policy told us to put fid $fidid on dev $ddevid, but it's already there!")
            if grep { $_->id == $ddevid } @on_devs;

        # find where we're replicating from
        # TODO: use an observed good device+host as source to start.
        my $sdevid;
        my @choices = grep { ! $source_failed{$_} } @on_up_devid;
        return $finish->(0, "source_down", "No devices available replicating $fidid") unless @choices;
        if ($fixed_source && grep { $_ == $fixed_source } @choices) {
            $sdevid = $fixed_source;
        } else {
            @choices = List::Util::shuffle(@choices);
            MogileFS::run_global_hook('replicate_order_final_choices', $devs, \@choices);
            $sdevid = shift @choices;
        }

        my $worker = MogileFS::ProcManager->is_child or die;

        # prefer the digest matching the fid's stored checksum; otherwise
        # fall back to the class' hash, if the class has a hashtype.
        my $digest;
        my $fid_checksum = $fid->checksum;
        $digest = Digest->new($fid_checksum->hashname) if $fid_checksum;
        $digest ||= Digest->new($cls->hashname) if $cls->hashtype;

        my $copied = http_copy(
            sdevid   => $sdevid,
            ddevid   => $ddevid,
            fid      => $fid,
            errref   => \$copy_err,
            callback => sub { $worker->still_alive; },
            digest   => $digest,
        );

        if (!$copied) {
            die "Bogus error code: $copy_err" if $copy_err !~ /^(?:src|dest)_error$/;

            error("Failed copying fid $fidid from devid $sdevid to devid $ddevid (error type: $copy_err)");
            if ($copy_err eq "src_error") {
                $source_failed{$sdevid} = 1;

                if ($fixed_source && $fixed_source == $sdevid) {
                    error("Fixed source dev$fixed_source was requested for $fidid but failed: will try other sources");
                }
            } else {
                $dest_failed{$ddevid} = 1;
            }
            next;
        }

        # the copy worked: record it, and store a checksum if the fid
        # didn't have one yet.
        my $dfid = MogileFS::DevFID->new($ddevid, $fid);
        $dfid->add_to_db;
        if ($digest && !$fid->checksum) {
            $sto->set_checksum($fidid, $cls->hashtype, $digest->digest);
        }

        push @on_devs,         $devs->{$ddevid};
        push @on_devs_tellpol, $devs->{$ddevid};
        push @on_up_devid,     $ddevid;
    }

    # We are over replicated. Let caller decide if it should rebalance.
    return $finish->(0, "too_happy", "fid $fidid is on too many devices")
        if $rr->too_happy;

    if ($rr->is_happy) {
        return $finish->(1) if $got_copy_request;
        return $finish->("lost_race");  # some other process got to it first.  policy was happy immediately.
    }

    return $finish->(0, "policy_no_suggestions",
                     "replication policy ran out of suggestions for us replicating fid $fidid");
}
526              
# Reads an HTTP status line and the headers that follow it from $sock.
#
# Returns a hashref with the following:
# {
#   code => HTTP status code integer,
#   keep => boolean, whether to keep the connection after reading
#   len => value of the Content-Length header (integer)
# }
# 'len' is only present when a Content-Length header was seen.  'keep'
# defaults to true for HTTP >= 1.1 and is overridden by an explicit
# "Connection: keep-alive" / "Connection: close" header.
#
# Returns nothing (empty list / undef) if the stream ends before the blank
# line that terminates the headers, or if the first line is not an HTTP
# status line.
sub read_headers {
    my $sock = shift;

    # FIXME: this can block. needs to timeout.
    my $status_line = <$sock>;
    return unless defined $status_line;

    my ($version, $code) = $status_line =~ m!\AHTTP/(\d+\.\d+)\s+(\d+)!
        or return;

    my $hdr = {};
    $hdr->{keep} = $version >= 1.1;
    $hdr->{code} = $code;

    while (defined(my $line = <$sock>)) {
        # blank line: end of headers, we're done
        return $hdr if $line =~ /\A\r?\n\z/;

        if ($line =~ /\AConnection:\s*keep-alive\s*\z/is) {
            $hdr->{keep} = 1;
        } elsif ($line =~ /\AConnection:\s*close\s*\z/is) {
            $hdr->{keep} = 0;
        } elsif ($line =~ /\AContent-Length:\s*(\d+)\s*\z/is) {
            $hdr->{len} = $1;
        }
    }

    # ran out of input before the headers were terminated
    return;
}
557              
558             # copies a file from one Perlbal to another utilizing HTTP
559             sub http_copy {
560 0     0 0   my %opts = @_;
561 0           my ($sdevid, $ddevid, $fid, $intercopy_cb, $errref, $digest) =
562 0           map { delete $opts{$_} } qw(sdevid
563             ddevid
564             fid
565             callback
566             errref
567             digest
568             );
569 0 0         die if %opts;
570              
571 0 0         $fid = MogileFS::FID->new($fid) unless ref($fid);
572 0           my $fidid = $fid->id;
573 0           my $expected_clen = $fid->length;
574 0           my $clen;
575 0           my $content_md5 = '';
576 0           my ($sconn, $dconn);
577 0           my $fid_checksum = $fid->checksum;
578 0 0 0       if ($fid_checksum && $fid_checksum->hashname eq "MD5") {
579             # some HTTP servers may be able to verify Content-MD5 on PUT
580             # and reject corrupted requests. no HTTP server should reject
581             # a request for an unrecognized header
582 0           my $b64digest = encode_base64($fid_checksum->{checksum}, "");
583 0           $content_md5 = "\r\nContent-MD5: $b64digest";
584             }
585              
586 0   0 0     $intercopy_cb ||= sub {};
  0            
587              
588             my $err_common = sub {
589 0     0     my ($err, $msg) = @_;
590 0 0         $$errref = $err if $errref;
591 0 0         $sconn->close($err) if $sconn;
592 0 0         $dconn->close($err) if $dconn;
593 0           return error($msg);
594 0           };
595              
596             # handles setting unreachable magic; $error->(reachability, "message")
597             my $error_unreachable = sub {
598 0     0     return $err_common->("src_error", "Fid $fidid unreachable while replicating: $_[0]");
599 0           };
600              
601             my $dest_error = sub {
602 0     0     return $err_common->("dest_error", $_[0]);
603 0           };
604              
605             my $src_error = sub {
606 0     0     return $err_common->("src_error", $_[0]);
607 0           };
608              
609             # get some information we'll need
610 0           my $sdev = Mgd::device_factory()->get_by_id($sdevid);
611 0           my $ddev = Mgd::device_factory()->get_by_id($ddevid);
612              
613 0 0 0       return error("Error: unable to get device information: source=$sdevid, destination=$ddevid, fid=$fidid")
614             unless $sdev && $ddev;
615              
616 0           my $s_dfid = MogileFS::DevFID->new($sdev, $fid);
617 0           my $d_dfid = MogileFS::DevFID->new($ddev, $fid);
618              
619 0           my ($spath, $dpath) = (map { $_->uri_path } ($s_dfid, $d_dfid));
  0            
620 0           my ($shost, $dhost) = (map { $_->host } ($sdev, $ddev));
  0            
621              
622 0           my ($shostip, $sport) = ($shost->ip, $shost->http_port);
623 0 0         if (MogileFS::Config->config("repl_use_get_port")) {
624 0           $sport = $shost->http_get_port;
625             }
626 0           my ($dhostip, $dport) = ($dhost->ip, $dhost->http_port);
627 0 0 0       unless (defined $spath && defined $dpath && defined $shostip && defined $dhostip && $sport && $dport) {
      0        
      0        
      0        
      0        
628             # show detailed information to find out what's not configured right
629 0           error("Error: unable to replicate file fid=$fidid from device id $sdevid to device id $ddevid");
630 0           error(" http://$shostip:$sport$spath -> http://$dhostip:$dport$dpath");
631 0           return 0;
632             }
633              
634 0           my $put = "PUT $dpath HTTP/1.0\r\nConnection: keep-alive\r\n" .
635             "Content-length: $expected_clen$content_md5\r\n\r\n";
636              
637             # need by webdav servers, like lighttpd...
638 0           $ddev->vivify_directories($d_dfid->url);
639              
640             # call a hook for odd casing completely different source data
641             # for specific files.
642 0           my $shttphost;
643 0           MogileFS::run_global_hook('replicate_alternate_source',
644             $fid, \$shostip, \$sport, \$spath, \$shttphost);
645              
646 0           my $durl = "http://$dhostip:$dport$dpath";
647 0           my $surl = "http://$shostip:$sport$spath";
648             # okay, now get the file
649 0           my %sopts = ( ip => $shostip, port => $sport );
650              
651 0           my $get = "GET $spath HTTP/1.0\r\nConnection: keep-alive\r\n";
652             # plugin set a custom host.
653 0 0         $get .= "Host: $shttphost\r\n" if $shttphost;
654              
655 0           my $data = '';
656 0           my ($sock, $dsock);
657 0           my ($wcount, $bytes_to_read, $written, $remain);
658 0           my ($stries, $dtries) = (0, 0);
659              
660 0 0         retry:
661             $sconn->close("retrying") if $sconn;
662 0 0         $dconn->close("retrying") if $dconn;
663 0           $dconn = undef;
664 0           $stries++;
665 0 0         $sconn = $shost->http_conn_get(\%sopts)
666             or return $src_error->("Unable to create source socket to $shostip:$sport for $spath");
667 0           $sock = $sconn->sock;
668 0 0         unless ($sock->write("$get\r\n")) {
669 0 0 0       goto retry if $sconn->retryable && $stries == 1;
670 0           return $src_error->("Pipe closed retrieving $spath from $shostip:$sport");
671             }
672              
673             # we just want a content length
674 0           my $sres = read_headers($sock);
675 0 0         unless ($sres) {
676 0 0 0       goto retry if $sconn->retryable && $stries == 1;
677 0           return $error_unreachable->("Error: Resource $surl failed to return an HTTP response");
678             }
679 0 0 0       unless ($sres->{code} >= 200 && $sres->{code} <= 299) {
680 0           return $error_unreachable->("Error: Resource $surl failed: HTTP $sres->{code}");
681             }
682 0           $clen = $sres->{len};
683              
684 0 0         return $error_unreachable->("File $spath has unexpected content-length of $clen, not $expected_clen")
685             if $clen != $expected_clen;
686              
687             # open target for put
688 0           $dtries++;
689 0 0         $dconn = $dhost->http_conn_get
690             or return $dest_error->("Unable to create dest socket to $dhostip:$dport for $dpath");
691 0           $dsock = $dconn->sock;
692              
693 0 0         unless ($dsock->write($put)) {
694 0 0 0       goto retry if $dconn->retryable && $dtries == 1;
695 0           return $dest_error->("Pipe closed during write to $dpath on $dhostip:$dport");
696             }
697              
698             # now read data and print while we're reading.
699 0           ($written, $remain) = (0, $clen);
700 0           $bytes_to_read = 1024*1024; # read 1MB at a time until there's less than that remaining
701 0 0         $bytes_to_read = $remain if $remain < $bytes_to_read;
702 0           $wcount = 0;
703              
704 0           while ($bytes_to_read) {
705 0           my $bytes = $sock->read($data, $bytes_to_read);
706 0 0         unless (defined $bytes) {
707 0           return $src_error->("error reading midway through source: $!");
708             }
709 0 0         if ($bytes == 0) {
710 0           return $src_error->("EOF reading midway through source: $!");
711             }
712              
713             # now we've read in $bytes bytes
714 0           $remain -= $bytes;
715 0 0         $bytes_to_read = $remain if $remain < $bytes_to_read;
716 0 0         $digest->add($data) if $digest;
717              
718 0           my $data_len = $bytes;
719 0           my $data_off = 0;
720 0           while (1) {
721 0           my $wbytes = syswrite($dsock, $data, $data_len, $data_off);
722 0 0         unless (defined $wbytes) {
723             # it can take two writes to determine if a socket is dead
724             # (TCP_NODELAY and TCP_CORK are (and must be) zero here)
725 0 0 0       goto retry if (!$wcount && $dconn->retryable && $dtries == 1);
      0        
726 0           return $dest_error->("Error: syswrite failed after $written bytes with: $!; failed putting to $dpath");
727             }
728 0           $wcount++;
729 0           $written += $wbytes;
730 0           $intercopy_cb->();
731 0 0         last if ($data_len == $wbytes);
732              
733 0           $data_len -= $wbytes;
734 0           $data_off += $wbytes;
735             }
736              
737 0 0         die if $bytes_to_read < 0;
738             }
739              
740             # source connection drained, return to pool
741 0 0         if ($sres->{keep}) {
742 0           $shost->http_conn_put($sconn);
743 0           $sconn = undef;
744             } else {
745 0           $sconn->close("http_close");
746             }
747              
748             # callee will want this digest, too, so clone as "digest" is destructive
749 0 0         $digest = $digest->clone->digest if $digest;
750              
751 0 0         if ($fid_checksum) {
752 0 0         if ($digest ne $fid_checksum->{checksum}) {
753 0           my $expect = $fid_checksum->hexdigest;
754 0           $digest = unpack("H*", $digest);
755 0           return $src_error->("checksum mismatch on GET: expected: $expect actual: $digest");
756             }
757             }
758              
759             # now read in the response line (should be first line)
760 0           my $dres = read_headers($dsock);
761 0 0         unless ($dres) {
762 0 0 0       goto retry if (!$wcount && $dconn->retryable && $dtries == 1);
      0        
763 0           return $dest_error->("Error: HTTP response line not recognized writing to $durl");
764             }
765              
766             # drain the response body if there is one
767             # there may be no dres->{len}/Content-Length if there is no body
768 0 0         if ($dres->{len}) {
769 0           my $r = $dsock->read($data, $dres->{len}); # dres->{len} should be tiny
770 0 0         if (defined $r) {
771 0 0         if ($r != $dres->{len}) {
772 0           Mgd::error("Failed to read $r of Content-Length:$dres->{len} bytes for PUT response on $durl");
773 0           $dres->{keep} = 0;
774             }
775             } else {
776 0           Mgd::error("Failed to read Content-Length:$dres->{len} bytes for PUT response on $durl ($!)");
777 0           $dres->{keep} = 0;
778             }
779             }
780              
781             # return the connection back to the connection pool
782 0 0         if ($dres->{keep}) {
783 0           $dhost->http_conn_put($dconn);
784 0           $dconn = undef;
785             } else {
786 0           $dconn->close("http_close");
787             }
788              
789 0 0 0       if ($dres->{code} >= 200 && $dres->{code} <= 299) {
790 0 0         if ($digest) {
791 0   0       my $alg = ($fid_checksum && $fid_checksum->hashname) || $fid->class->hashname;
792              
793 0 0 0       if ($ddev->{reject_bad_md5} && ($alg eq "MD5")) {
794             # dest device would've rejected us with a error,
795             # no need to reread the file
796 0           return 1;
797             }
798 0           my $httpfile = MogileFS::HTTPFile->at($durl);
799 0           my $actual = $httpfile->digest($alg, $intercopy_cb);
800 0 0         if ($actual ne $digest) {
801 0           my $expect = unpack("H*", $digest);
802 0           $actual = unpack("H*", $actual);
803 0           return $dest_error->("checksum mismatch on PUT, expected: $expect actual: $digest");
804             }
805             }
806 0           return 1;
807             }
808 0           return $dest_error->("Got HTTP status code $dres->{code} PUTing to $durl");
809             }
810              
811             1;
812              
813             # Local Variables:
814             # mode: perl
815             # c-basic-indent: 4
816             # indent-tabs-mode: nil
817             # End:
818              
819             __END__