File Coverage

blib/lib/MogileFS/Worker/Reaper.pm
Criterion Covered Total %
statement 24 96 25.0
branch 0 28 0.0
condition 0 5 0.0
subroutine 8 19 42.1
pod 0 7 0.0
total 32 155 20.6


line stmt bran cond sub pod time code
1             package MogileFS::Worker::Reaper;
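2             # reaps fids from permanently dead devices: enqueues each fid for replication, then forgets its row on the dead device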
3              
4 21     21   120 use strict;
  21         39  
  21         576  
5 21     21   85 use base 'MogileFS::Worker';
  21         34  
  21         2267  
6 21     21   124 use MogileFS::Server;
  21         36  
  21         680  
7 21     21   105 use MogileFS::Util qw(error debug);
  21         44  
  21         1309  
8 21     21   125 use MogileFS::Config qw(DEVICE_SUMMARY_CACHE_TIMEOUT);
  21         32  
  21         929  
9 21     21   111 use constant REAP_INTERVAL => 5;
  21         31  
  21         1018  
10 21     21   105 use constant REAP_BACKOFF_MIN => 60;
  21         68  
  21         942  
11              
12             # completely forget about devices we've reaped after 2 hours of idleness
13 21     21   101 use constant REAP_BACKOFF_MAX => 7200;
  21         67  
  21         14902  
14              
15             sub new {
16 0     0 0   my ($class, $psock) = @_;
17 0           my $self = fields::new($class);
18 0           $self->SUPER::new($psock);
19              
20 0           return $self;
21             }
22              
23             sub watchdog_timeout {
24 0     0 0   return 240;
25             }
26              
27             # order is important here:
28             #
29             # first, add fid to file_to_replicate table. it
30             # shouldn't matter if the replicator gets to this
31             # before the subsequent 'forget_about' method, as the
32             # replicator will treat dead file_on devices as
33             # non-existent anyway. however, it is important that
34             # we enqueue it for replication first, before we
35             # forget about that file_on row, otherwise a failure
36             # after/during 'forget_about' could leave a stranded
37             # file on a dead device and we'd never fix it.
38             sub reap_fid {
39 0     0 0   my ($self, $fid, $dev) = @_;
40              
41 0           $fid->enqueue_for_replication(in => 1);
42 0           $dev->forget_about($fid);
43             }
44              
45             # this returns 1000 by default
46             sub reaper_inject_limit {
47 0     0 0   my ($self) = @_;
48              
49 0           my $sto = Mgd::get_store();
50 0           my $max = MogileFS::Config->server_setting_cached('queue_size_for_reaper');
51 0   0       my $limit = MogileFS::Config->server_setting_cached('queue_rate_for_reaper') || 1000;
52              
53             # max defaults to zero, meaning we inject $limit every wakeup
54 0 0         if ($max) {
55             # if a queue size limit is configured for reaper, prevent too many
56             # files from entering the repl queue:
57 0           my $len = $sto->deferred_repl_queue_length;
58 0           my $space_left = $max - $len;
59              
60 0 0         $limit = $space_left if ($limit > $space_left);
61              
62             # limit may end up being negative here since other processes
63             # can inject into the deferred replication queue, reaper is
64             # the only one which can respect this queue size
65 0 0         $limit = 0 if $limit < 0;
66             }
67              
68 0           return $limit;
69             }
70              
71             # we pass the $devid here (instead of a Device object) to avoid
72             # potential memory leaks since this sub reschedules itself to run
73             # forever. $delay is the current delay we were scheduled at
74             sub reap_dev {
75 0     0 0   my ($self, $devid, $delay) = @_;
76              
77             # ensure the master DB is up, retry in REAP_INTERVAL if down
78 0 0         unless ($self->validate_dbh) {
79 0           $delay = REAP_INTERVAL;
80 0     0     Danga::Socket->AddTimer($delay, sub { $self->reap_dev($devid, $delay) });
  0            
81 0           return;
82             }
83              
84 0           my $limit = $self->reaper_inject_limit;
85              
86             # just in case a user mistakenly nuked a devid from the device table:
87 0           my $dev = Mgd::device_factory()->get_by_id($devid);
88 0 0         unless ($dev) {
89 0           error("No device row for dev$devid, cannot reap");
90 0           $delay = undef;
91             }
92              
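93             # user resurrected a "dead" device, not supported, really... NOTE(review): $dev can still be undef here (the missing-row branch above does not return), so this method call would die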
94 0 0         if (!$dev->dstate->is_perm_dead) {
95 0           Mgd::log("dev$devid is no longer dead to reaper");
96 0           return;
97             }
98              
99             # limit == 0 if we hit the queue size limit, we'll just reschedule
100 0 0 0       if ($limit && $dev) {
101 0           my $sto = Mgd::get_store();
102 0           my $lock = "mgfs:reaper";
103 0           my $lock_timeout = $self->watchdog_timeout / 4;
104 0           my @fids;
105              
106 0 0         if ($sto->get_lock($lock, $lock_timeout)) {
107 0           @fids = $dev->fid_list(limit => $limit);
108 0 0         if (@fids) {
109 0           $self->still_alive;
110 0           foreach my $fid (@fids) {
111 0           $self->reap_fid($fid, $dev);
112             }
113             }
114 0           $sto->release_lock($lock);
115              
116             # if we've found any FIDs (perhaps even while backing off)
117             # ensure we try to find more soon:
118 0 0         if (@fids) {
119 0           $delay = REAP_INTERVAL;
120             } else {
121 0           $delay = $self->reap_dev_backoff_delay($delay);
122             }
123             } else {
124             # No lock after a long lock_timeout? Try again soon.
125             # We should never get here under MySQL, and rarely for other DBs.
126 0           debug("get_lock($lock, $lock_timeout) failed");
127 0           $delay = REAP_INTERVAL;
128             }
129             }
130              
131 0 0         return unless defined $delay;
132              
133             # schedule another update, delay could be REAP_BACKOFF_MAX
134 0     0     Danga::Socket->AddTimer($delay, sub { $self->reap_dev($devid, $delay) });
  0            
135             }
136              
137             # called when we're hopefully all done with a device, but reschedule
138             # into the future in case the replicator had an out-of-date cache and the
139             # "dead" device was actually writable.
140             sub reap_dev_backoff_delay {
141 0     0 0   my ($self, $delay) = @_;
142              
143 0 0         return REAP_BACKOFF_MIN if ($delay < REAP_BACKOFF_MIN);
144              
145 0           $delay *= 2;
146 0 0         return $delay > REAP_BACKOFF_MAX ? undef : $delay;
147             }
148              
149             # looks for dead devices
150             sub work {
151 0     0 0   my $self = shift;
152              
153             # ensure we get monitor updates
154 0     0     Danga::Socket->AddOtherFds($self->psock_fd, sub{ $self->read_from_parent });
  0            
155              
156 0           my %devid_seen;
157             my $reap_check;
158             $reap_check = sub {
159             # get db and note we're starting a run
160 0     0     $self->parent_ping;
161 0           debug("Reaper running; looking for dead devices");
162              
163 0           foreach my $dev (grep { $_->dstate->is_perm_dead }
  0            
164             Mgd::device_factory()->get_all)
165             {
166 0 0         next if $devid_seen{$dev->id};
167              
168             # delay the initial device reap in case any replicator cache
169             # thinks the device is still alive
170             Danga::Socket->AddTimer(DEVICE_SUMMARY_CACHE_TIMEOUT + 1, sub {
171 0           $self->reap_dev($dev->id, REAP_INTERVAL);
172 0           });
173              
174             # once we've seen a device, reap_dev will take over scheduling
175             # reaping for the given device.
176 0           $devid_seen{$dev->id} = 1;
177             }
178              
179 0           Danga::Socket->AddTimer(REAP_INTERVAL, $reap_check);
180 0           };
181              
182             # kick off the reaper and loop forever
183 0           $reap_check->();
184 0           Danga::Socket->EventLoop;
185             }
186              
187             1;
188              
189             # Local Variables:
190             # mode: perl
191             # c-basic-indent: 4
192             # indent-tabs-mode: nil
193             # End: