File Coverage

blib/lib/MogileFS/Worker/Reaper.pm
Criterion Covered Total %
statement 24 93 25.8
branch 0 26 0.0
condition 0 5 0.0
subroutine 8 19 42.1
pod 0 7 0.0
total 32 150 21.3


line stmt bran cond sub pod time code
1             package MogileFS::Worker::Reaper;
2             # deletes files
3              
4 21     21   142 use strict;
  21         55  
  21         1078  
5 21     21   130 use base 'MogileFS::Worker';
  21         54  
  21         3777  
6 21     21   141 use MogileFS::Server;
  21         55  
  21         605  
7 21     21   122 use MogileFS::Util qw(error debug);
  21         44  
  21         1659  
8 21     21   144 use MogileFS::Config qw(DEVICE_SUMMARY_CACHE_TIMEOUT);
  21         52  
  21         1432  
9 21     21   141 use constant REAP_INTERVAL => 5;
  21         50  
  21         1570  
10 21     21   145 use constant REAP_BACKOFF_MIN => 60;
  21         48  
  21         1090  
11              
12             # completely forget about devices we've reaped after 2 hours of idleness
13 21     21   126 use constant REAP_BACKOFF_MAX => 7200;
  21         54  
  21         28971  
14              
15             sub new {
16 0     0 0   my ($class, $psock) = @_;
17 0           my $self = fields::new($class);
18 0           $self->SUPER::new($psock);
19              
20 0           return $self;
21             }
22              
23             sub watchdog_timeout {
24 0     0 0   return 240;
25             }
26              
27             # order is important here:
28             #
29             # first, add fid to file_to_replicate table. it
30             # shouldn't matter if the replicator gets to this
31             # before the subsequent 'forget_about' method, as the
32             # replicator will treat dead file_on devices as
33             # non-existent anyway. however, it is important that
34             # we enqueue it for replication first, before we
35             # forget about that file_on row, otherwise a failure
36             # after/during 'forget_about' could leave a stranded
37             # file on a dead device and we'd never fix it.
38             sub reap_fid {
39 0     0 0   my ($self, $fid, $dev) = @_;
40              
41 0           $fid->enqueue_for_replication(in => 1);
42 0           $dev->forget_about($fid);
43             }
44              
45             # this returns 1000 by default
46             sub reaper_inject_limit {
47 0     0 0   my ($self) = @_;
48              
49 0           my $sto = Mgd::get_store();
50 0           my $max = MogileFS::Config->server_setting_cached('queue_size_for_reaper');
51 0   0       my $limit = MogileFS::Config->server_setting_cached('queue_rate_for_reaper') || 1000;
52              
53             # max defaults to zero, meaning we inject $limit every wakeup
54 0 0         if ($max) {
55             # if a queue size limit is configured for reaper, prevent too many
56             # files from entering the repl queue:
57 0           my $len = $sto->deferred_repl_queue_length;
58 0           my $space_left = $max - $len;
59              
60 0 0         $limit = $space_left if ($limit > $space_left);
61              
62             # limit may end up being negative here since other processes
63             # can inject into the deferred replication queue, reaper is
64             # the only one which can respect this queue size
65 0 0         $limit = 0 if $limit < 0;
66             }
67              
68 0           return $limit;
69             }
70              
71             # we pass the $devid here (instead of a Device object) to avoid
72             # potential memory leaks since this sub reschedules itself to run
73             # forever. $delay is the current delay we were scheduled at
74             sub reap_dev {
75 0     0 0   my ($self, $devid, $delay) = @_;
76              
77             # ensure the master DB is up, retry in REAP_INTERVAL if down
78 0 0         unless ($self->validate_dbh) {
79 0           $delay = REAP_INTERVAL;
80 0     0     Danga::Socket->AddTimer($delay, sub { $self->reap_dev($devid, $delay) });
  0            
81 0           return;
82             }
83              
84 0           my $limit = $self->reaper_inject_limit;
85              
86             # just in case a user mistakenly nuked a devid from the device table:
87 0           my $dev = Mgd::device_factory()->get_by_id($devid);
88 0 0         unless ($dev) {
89 0           error("No device row for dev$devid, cannot reap");
90 0           $delay = undef;
91             }
92              
93             # limit == 0 if we hit the queue size limit, we'll just reschedule
94 0 0 0       if ($limit && $dev) {
95 0           my $sto = Mgd::get_store();
96 0           my $lock = "mgfs:reaper";
97 0           my $lock_timeout = $self->watchdog_timeout / 4;
98 0           my @fids;
99              
100 0 0         if ($sto->get_lock($lock, $lock_timeout)) {
101 0           @fids = $dev->fid_list(limit => $limit);
102 0 0         if (@fids) {
103 0           $self->still_alive;
104 0           foreach my $fid (@fids) {
105 0           $self->reap_fid($fid, $dev);
106             }
107             }
108 0           $sto->release_lock($lock);
109              
110             # if we've found any FIDs (perhaps even while backing off)
111             # ensure we try to find more soon:
112 0 0         if (@fids) {
113 0           $delay = REAP_INTERVAL;
114             } else {
115 0           $delay = $self->reap_dev_backoff_delay($delay);
116             }
117             } else {
118             # No lock after a long lock_timeout? Try again soon.
119             # We should never get here under MySQL, and rarely for other DBs.
120 0           debug("get_lock($lock, $lock_timeout) failed");
121 0           $delay = REAP_INTERVAL;
122             }
123             }
124              
125 0 0         return unless defined $delay;
126              
127             # schedule another update, delay could be REAP_BACKOFF_MAX
128 0     0     Danga::Socket->AddTimer($delay, sub { $self->reap_dev($devid, $delay) });
  0            
129             }
130              
131             # called when we're hopefully all done with a device, but reschedule
132             # into the future in case the replicator had an out-of-date cache and the
133             # "dead" device was actually writable.
134             sub reap_dev_backoff_delay {
135 0     0 0   my ($self, $delay) = @_;
136              
137 0 0         return REAP_BACKOFF_MIN if ($delay < REAP_BACKOFF_MIN);
138              
139 0           $delay *= 2;
140 0 0         return $delay > REAP_BACKOFF_MAX ? undef : $delay;
141             }
142              
143             # looks for dead devices
144             sub work {
145 0     0 0   my $self = shift;
146              
147             # ensure we get monitor updates
148 0     0     Danga::Socket->AddOtherFds($self->psock_fd, sub{ $self->read_from_parent });
  0            
149              
150 0           my %devid_seen;
151             my $reap_check;
152             $reap_check = sub {
153             # get db and note we're starting a run
154 0     0     $self->parent_ping;
155 0           debug("Reaper running; looking for dead devices");
156              
157 0           foreach my $dev (grep { $_->dstate->is_perm_dead }
  0            
158             Mgd::device_factory()->get_all)
159             {
160 0 0         next if $devid_seen{$dev->id};
161              
162             # delay the initial device reap in case any replicator cache
163             # thinks the device is still alive
164             Danga::Socket->AddTimer(DEVICE_SUMMARY_CACHE_TIMEOUT + 1, sub {
165 0           $self->reap_dev($dev->id, REAP_INTERVAL);
166 0           });
167              
168             # once we've seen a device, reap_dev will takeover scheduling
169             # reaping for the given device.
170 0           $devid_seen{$dev->id} = 1;
171             }
172              
173 0           Danga::Socket->AddTimer(REAP_INTERVAL, $reap_check);
174 0           };
175              
176             # kick off the reaper and loop forever
177 0           $reap_check->();
178 0           Danga::Socket->EventLoop;
179             }
180              
181             1;
182              
183             # Local Variables:
184             # mode: perl
185             # c-basic-indent: 4
186             # indent-tabs-mode: nil
187             # End: