File Coverage

blib/lib/MogileFS/Store.pm
Criterion Covered Total %
statement 37 971 3.8
branch 3 330 0.9
condition 0 121 0.0
subroutine 12 218 5.5
pod 0 181 0.0
total 52 1821 2.8


line stmt bran cond sub pod time code
1             package MogileFS::Store;
2 21     21   103 use strict;
  21         40  
  21         810  
3 21     21   97 use warnings;
  21         40  
  21         748  
4 21     21   100 use Carp qw(croak confess);
  21         36  
  21         1277  
5 21     21   103 use MogileFS::Util qw(throw max error);
  21         34  
  21         921  
6 21     21   30584 use DBI; # no reason a Store has to be DBI-based, but for now they all are.
  21         300348  
  21         1543  
7 21     21   208 use List::Util qw(shuffle);
  21         34  
  21         2056  
8              
9             # this is incremented whenever the schema changes. server will refuse
10             # to start-up with an old schema version
11             #
12             # 6: adds file_to_replicate table
13             # 7: adds file_to_delete_later table
14             # 8: adds fsck_log table
15             # 9: adds 'drain' state to enum in device table
16             # 10: adds 'replpolicy' column to 'class' table
17             # 11: adds 'file_to_queue' table
18             # 12: adds 'file_to_delete2' table
19             # 13: modifies 'server_settings.value' to TEXT for wider values
20             # also adds a TEXT 'arg' column to file_to_queue for passing arguments
21             # 14: modifies 'device' mb_total, mb_used to INT for devs > 16TB
22             # 15: adds checksum table, adds 'hashtype' column to 'class' table
23             # 16: adds 'readonly' state to enum in host table
24 21     21   118 use constant SCHEMA_VERSION => 16;
  21         26  
  21         59294  
25              
26             sub new {
27 1     1 0 9 my ($class) = @_;
28 1         3 return $class->new_from_dsn_user_pass(map { MogileFS->config($_) } qw(db_dsn db_user db_pass max_handles));
  4         10  
29             }
30              
31             sub new_from_dsn_user_pass {
32 1     1 0 2 my ($class, $dsn, $user, $pass, $max_handles) = @_;
33 1         1 my $subclass;
34 1 50       9 if ($dsn =~ /^DBI:mysql:/i) {
    50          
    0          
    0          
35 0         0 $subclass = "MogileFS::Store::MySQL";
36             } elsif ($dsn =~ /^DBI:SQLite:/i) {
37 1         2 $subclass = "MogileFS::Store::SQLite";
38             } elsif ($dsn =~ /^DBI:Oracle:/i) {
39 0         0 $subclass = "MogileFS::Store::Oracle";
40             } elsif ($dsn =~ /^DBI:Pg:/i) {
41 0         0 $subclass = "MogileFS::Store::Postgres";
42             } else {
43 0         0 die "Unknown database type: $dsn";
44             }
45 1 50   1   80 unless (eval "use $subclass; 1") {
  1         514  
  0            
  0            
46 1         7 die "Error loading $subclass: $@\n";
47             }
48 0         0 my $self = bless {
49             dsn => $dsn,
50             user => $user,
51             pass => $pass,
52             max_handles => $max_handles, # Max number of handles to allow
53             raise_errors => $subclass->want_raise_errors,
54             slave_list_version => 0,
55             slave_list_cache => [],
56             recheck_req_gen => 0, # incremented generation, of recheck of dbh being requested
57             recheck_done_gen => 0, # once recheck is done, copy of what the request generation was
58             handles_left => 0, # amount of times this handle can still be verified
59             connected_slaves => {},
60             dead_slaves => {},
61             dead_backoff => {}, # how many times in a row a slave has died
62             connect_timeout => 10, # High default.
63             }, $subclass;
64 0         0 $self->init;
65 0         0 return $self;
66             }
67              
68             # Defaults to true now.
69             sub want_raise_errors {
70 0     0 0 0 1;
71             }
72              
73             sub new_from_mogdbsetup {
74 0     0 0 0 my ($class, %args) = @_;
75             # where args is: dbhost dbport dbname dbrootuser dbrootpass dbuser dbpass
76 0         0 my $dsn = $class->dsn_of_dbhost($args{dbname}, $args{dbhost}, $args{dbport});
77              
78             my $try_make_sto = sub {
79 0 0   0   0 my $dbh = DBI->connect($dsn, $args{dbuser}, $args{dbpass}, {
80             PrintError => 0,
81             }) or return undef;
82 0         0 my $sto = $class->new_from_dsn_user_pass($dsn, $args{dbuser}, $args{dbpass});
83 0         0 $sto->raise_errors;
84 0         0 return $sto;
85 0         0 };
86              
87             # upgrading, apparently, as this database already exists.
88 0         0 my $sto = $try_make_sto->();
89 0 0       0 return $sto if $sto;
90              
91             # otherwise, we need to make the requested database, setup permissions, etc
92 0         0 $class->status("couldn't connect to database as mogilefs user. trying root...");
93 0         0 my $rootdsn = $class->dsn_of_root($args{dbname}, $args{dbhost}, $args{dbport});
94 0 0       0 my $rdbh = DBI->connect($rootdsn, $args{dbrootuser}, $args{dbrootpass}, {
95             PrintError => 0,
96             }) or
97             die "Failed to connect to $rootdsn as specified root user ($args{dbrootuser}): " . DBI->errstr . "\n";
98 0         0 $class->status("connected to database as root user.");
99              
100 0         0 $class->confirm("Create/Upgrade database name '$args{dbname}'?");
101 0         0 $class->create_db_if_not_exists($rdbh, $args{dbname});
102 0         0 $class->confirm("Grant all privileges to user '$args{dbuser}', connecting from anywhere, to the mogilefs database '$args{dbname}'?");
103 0         0 $class->grant_privileges($rdbh, $args{dbname}, $args{dbuser}, $args{dbpass});
104              
105             # should be ready now:
106 0         0 $sto = $try_make_sto->();
107 0 0       0 return $sto if $sto;
108              
109 0         0 die "Failed to connect to database as regular user, even after creating it and setting up permissions as the root user.";
110             }
111              
112             # given a root DBI connection, create the named database. succeed
113             # if it it's made, or already exists. die otherwise.
114             sub create_db_if_not_exists {
115 0     0 0 0 my ($pkg, $rdbh, $dbname) = @_;
116 0 0       0 $rdbh->do("CREATE DATABASE IF NOT EXISTS $dbname")
117             or die "Failed to create database '$dbname': " . $rdbh->errstr . "\n";
118             }
119              
120             sub grant_privileges {
121 0     0 0 0 my ($pkg, $rdbh, $dbname, $user, $pass) = @_;
122 0 0       0 $rdbh->do("GRANT ALL PRIVILEGES ON $dbname.* TO $user\@'\%' IDENTIFIED BY ?",
123             undef, $pass)
124             or die "Failed to grant privileges: " . $rdbh->errstr . "\n";
125 0 0       0 $rdbh->do("GRANT ALL PRIVILEGES ON $dbname.* TO $user\@'localhost' IDENTIFIED BY ?",
126             undef, $pass)
127             or die "Failed to grant privileges: " . $rdbh->errstr . "\n";
128             }
129              
130 0     0 0 0 sub can_replace { 0 }
131 0     0 0 0 sub can_insertignore { 0 }
132 0     0 0 0 sub can_insert_multi { 0 }
133 0     0 0 0 sub can_for_update { 1 }
134              
135 0     0 0 0 sub unix_timestamp { die "No function in $_[0] to return DB's unixtime." }
136              
137             sub ignore_replace {
138 0     0 0 0 my $self = shift;
139 0 0       0 return "INSERT IGNORE " if $self->can_insertignore;
140 0 0       0 return "REPLACE " if $self->can_replace;
141 0         0 die "Can't INSERT IGNORE or REPLACE?";
142             }
143              
144             my $on_status = sub {};
145             my $on_confirm = sub { 1 };
146 0     0 0 0 sub on_status { my ($pkg, $code) = @_; $on_status = $code; };
  0         0  
147 0     0 0 0 sub on_confirm { my ($pkg, $code) = @_; $on_confirm = $code; };
  0         0  
148 0     0 0 0 sub status { my ($pkg, $msg) = @_; $on_status->($msg); };
  0         0  
149 0 0   0 0 0 sub confirm { my ($pkg, $msg) = @_; $on_confirm->($msg) or die "Aborted.\n"; };
  0         0  
150              
151 0     0 0 0 sub latest_schema_version { SCHEMA_VERSION }
152              
153             sub raise_errors {
154 0     0 0 0 my $self = shift;
155 0         0 $self->{raise_errors} = 1;
156 0         0 $self->dbh->{RaiseError} = 1;
157             }
158              
159 0     0 0 0 sub set_connect_timeout { $_[0]{connect_timeout} = $_[1]; }
160              
161 0     0 0 0 sub dsn { $_[0]{dsn} }
162 0     0 0 0 sub user { $_[0]{user} }
163 0     0 0 0 sub pass { $_[0]{pass} }
164              
165 0     0 0 0 sub connect_timeout { $_[0]{connect_timeout} }
166              
167 0     0 0 0 sub init { 1 }
168 0     0 0 0 sub post_dbi_connect { 1 }
169              
170 0     0 0 0 sub can_do_slaves { 0 }
171              
172             sub mark_as_slave {
173 0     0 0 0 my $self = shift;
174 0 0       0 die "Incapable of becoming slave." unless $self->can_do_slaves;
175              
176 0         0 $self->{is_slave} = 1;
177             }
178              
179             sub is_slave {
180 0     0 0 0 my $self = shift;
181 0         0 return $self->{is_slave};
182             }
183              
184             sub _slaves_list_changed {
185 0     0   0 my $self = shift;
186 0   0     0 my $ver = MogileFS::Config->server_setting_cached('slave_version') || 0;
187 0 0       0 if ($ver <= $self->{slave_list_version}) {
188 0         0 return 0;
189             }
190 0         0 $self->{slave_list_version} = $ver;
191             # Restart connections from scratch if the configuration changed.
192 0         0 $self->{connected_slaves} = {};
193 0         0 return 1;
194             }
195              
196             # Returns a list of arrayrefs, each being [$dsn, $username, $password] for connecting to a slave DB.
197             sub _slaves_list {
198 0     0   0 my $self = shift;
199 0         0 my $now = time();
200              
201 0 0       0 my $sk = MogileFS::Config->server_setting_cached('slave_keys')
202             or return ();
203              
204 0         0 my @ret;
205 0         0 foreach my $key (split /\s*,\s*/, $sk) {
206 0         0 my $slave = MogileFS::Config->server_setting_cached("slave_$key");
207              
208 0 0       0 if (!$slave) {
209 0         0 error("key for slave DB config: slave_$key not found in configuration");
210 0         0 next;
211             }
212              
213 0         0 my ($dsn, $user, $pass) = split /\|/, $slave;
214 0 0 0     0 if (!defined($dsn) or !defined($user) or !defined($pass)) {
      0        
215 0         0 error("key slave_$key contains $slave, which doesn't split in | into DSN|user|pass - ignoring");
216 0         0 next;
217             }
218 0         0 push @ret, [$dsn, $user, $pass]
219             }
220              
221 0         0 return @ret;
222             }
223              
224             sub _pick_slave {
225 0     0   0 my $self = shift;
226 0         0 my @temp = shuffle keys %{$self->{connected_slaves}};
  0         0  
227 0 0       0 return unless @temp;
228 0         0 return $self->{connected_slaves}->{$temp[0]};
229             }
230              
231             sub _connect_slave {
232 0     0   0 my $self = shift;
233 0         0 my $slave_fulldsn = shift;
234 0         0 my $now = time();
235              
236 0   0     0 my $dead_retry =
237             MogileFS::Config->server_setting_cached('slave_dead_retry_timeout') || 15;
238              
239 0   0     0 my $dead_backoff = $self->{dead_backoff}->{$slave_fulldsn->[0]} || 0;
240 0         0 my $dead_timeout = $self->{dead_slaves}->{$slave_fulldsn->[0]};
241 0 0 0     0 return if (defined $dead_timeout
242             && $dead_timeout + ($dead_retry * $dead_backoff) > $now);
243 0 0       0 return if ($self->{connected_slaves}->{$slave_fulldsn->[0]});
244              
245 0         0 my $newslave = $self->{slave} = $self->new_from_dsn_user_pass(@$slave_fulldsn);
246 0   0     0 $newslave->set_connect_timeout(
247             MogileFS::Config->server_setting_cached('slave_connect_timeout') || 1);
248 0         0 $self->{slave}->{next_check} = 0;
249 0         0 $newslave->mark_as_slave;
250 0 0       0 if ($self->check_slave) {
251 0         0 $self->{connected_slaves}->{$slave_fulldsn->[0]} = $newslave;
252 0         0 $self->{dead_backoff}->{$slave_fulldsn->[0]} = 0;
253             } else {
254             # Magic numbers are saddening...
255 0 0       0 $dead_backoff++ unless $dead_backoff > 20;
256 0         0 $self->{dead_slaves}->{$slave_fulldsn->[0]} = $now;
257 0         0 $self->{dead_backoff}->{$slave_fulldsn->[0]} = $dead_backoff;
258             }
259             }
260              
261             sub get_slave {
262 0     0 0 0 my $self = shift;
263              
264 0 0       0 die "Incapable of having slaves." unless $self->can_do_slaves;
265              
266 0         0 $self->{slave} = undef;
267 0         0 foreach my $slave (keys %{$self->{dead_slaves}}) {
  0         0  
268 0         0 my ($full_dsn) = grep { $slave eq $_->[0] } @{$self->{slave_list_cache}};
  0         0  
  0         0  
269 0 0       0 unless ($full_dsn) {
270 0         0 delete $self->{dead_slaves}->{$slave};
271 0         0 next;
272             }
273 0         0 $self->_connect_slave($full_dsn);
274             }
275              
276 0 0       0 unless ($self->_slaves_list_changed) {
277 0 0       0 if ($self->{slave} = $self->_pick_slave) {
278 0         0 $self->{slave}->{recheck_req_gen} = $self->{recheck_req_gen};
279 0 0       0 return $self->{slave} if $self->check_slave;
280             }
281             }
282              
283 0 0       0 if ($self->{slave}) {
284 0         0 my $dsn = $self->{slave}->{dsn};
285 0         0 $self->{dead_slaves}->{$dsn} = time();
286 0         0 $self->{dead_backoff}->{$dsn} = 0;
287 0         0 delete $self->{connected_slaves}->{$dsn};
288 0         0 error("Error talking to slave: $dsn");
289             }
290 0         0 my @slaves_list = $self->_slaves_list;
291              
292             # If we have no slaves, then return silently.
293 0 0       0 return unless @slaves_list;
294              
295 0         0 my $slave_skip_filtering = MogileFS::Config->server_setting_cached('slave_skip_filtering');
296              
297 0 0 0     0 unless (defined $slave_skip_filtering && $slave_skip_filtering eq 'on') {
298 0         0 MogileFS::run_global_hook('slave_list_filter', \@slaves_list);
299             }
300              
301 0         0 $self->{slave_list_cache} = \@slaves_list;
302              
303 0         0 foreach my $slave_fulldsn (@slaves_list) {
304 0         0 $self->_connect_slave($slave_fulldsn);
305             }
306              
307 0 0       0 if ($self->{slave} = $self->_pick_slave) {
308 0         0 return $self->{slave};
309             }
310 0         0 warn "Slave list exhausted, failing back to master.";
311 0         0 return;
312             }
313              
314             sub read_store {
315 0     0 0 0 my $self = shift;
316              
317 0 0       0 return $self unless $self->can_do_slaves;
318              
319 0 0       0 if ($self->{slave_ok}) {
320 0 0       0 if (my $slave = $self->get_slave) {
321 0         0 return $slave;
322             }
323             }
324              
325 0         0 return $self;
326             }
327              
328             sub slaves_ok {
329 0     0 0 0 my $self = shift;
330 0         0 my $coderef = shift;
331              
332 0 0       0 return unless ref $coderef eq 'CODE';
333              
334 0         0 local $self->{slave_ok} = 1;
335              
336 0         0 return $coderef->(@_);
337             }
338              
339             sub recheck_dbh {
340 0     0 0 0 my $self = shift;
341 0         0 $self->{recheck_req_gen}++;
342             }
343              
344             sub dbh {
345 0     0 0 0 my $self = shift;
346            
347 0 0       0 if ($self->{dbh}) {
348 0 0       0 if ($self->{recheck_done_gen} != $self->{recheck_req_gen}) {
349 0 0       0 $self->{dbh} = undef unless $self->{dbh}->ping;
350             # Handles a memory leak under Solaris/Postgres.
351             # We may leak a little extra memory if we're holding a lock,
352             # since dropping a connection mid-lock is fatal
353 0 0 0     0 $self->{dbh} = undef if ($self->{max_handles} &&
      0        
354             $self->{handles_left}-- < 0 && !$self->{lock_depth});
355 0         0 $self->{recheck_done_gen} = $self->{recheck_req_gen};
356             }
357 0 0       0 return $self->{dbh} if $self->{dbh};
358             }
359              
360             # Shortcut flag: if monitor thinks the master is down, avoid attempting to
361             # connect to it for now. If we already have a connection to the master,
362             # keep using it as above.
363 0 0       0 if (!$self->is_slave) {
364 0         0 my $flag = MogileFS::Config->server_setting_cached('_master_db_alive', 0);
365 0 0 0     0 return if (defined $flag && $flag == 0);;
366             }
367              
368             # auto-reconnect is unsafe if we're holding a lock
369 0 0       0 if ($self->{lock_depth}) {
370 0         0 die "DB connection recovery unsafe, lock held: $self->{last_lock}";
371             }
372              
373 0         0 eval {
374 0     0   0 local $SIG{ALRM} = sub { die "timeout\n" };
  0         0  
375 0         0 alarm($self->connect_timeout);
376 0   0     0 $self->{dbh} = DBI->connect($self->{dsn}, $self->{user}, $self->{pass}, {
377             PrintError => 0,
378             AutoCommit => 1,
379             # FUTURE: will default to on (have to validate all callers first):
380             RaiseError => ($self->{raise_errors} || 0),
381             sqlite_use_immediate_transaction => 1,
382             });
383             };
384 0         0 alarm(0);
385 0 0       0 if ($@ eq "timeout\n") {
    0          
386 0         0 die "Failed to connect to database: timeout";
387             } elsif ($@) {
388 0         0 die "Failed to connect to database: " . DBI->errstr;
389             }
390 0         0 $self->post_dbi_connect;
391 0 0       0 $self->{handles_left} = $self->{max_handles} if $self->{max_handles};
392 0         0 return $self->{dbh};
393             }
394              
395 0 0   0 0 0 sub have_dbh { return 1 if $_[0]->{dbh}; }
396              
397             sub ping {
398 0     0 0 0 my $self = shift;
399 0         0 return $self->dbh->ping;
400             }
401              
402             sub condthrow {
403 0     0 0 0 my ($self, $optmsg) = @_;
404 0         0 my $dbh = $self->dbh;
405 0 0       0 return 1 unless $dbh->err;
406 0         0 my ($pkg, $fn, $line) = caller;
407 0         0 my $msg = "Database error from $pkg/$fn/$line: " . $dbh->errstr;
408 0 0       0 $msg .= ": $optmsg" if $optmsg;
409             # Auto rollback failures around transactions.
410 0 0       0 if ($dbh->{AutoCommit} == 0) { eval { $dbh->rollback }; }
  0         0  
  0         0  
411 0         0 croak($msg);
412             }
413              
414             sub dowell {
415 0     0 0 0 my ($self, $sql, @do_params) = @_;
416 0         0 my $rv = eval { $self->dbh->do($sql, @do_params) };
  0         0  
417 0 0 0     0 return $rv unless $@ || $self->dbh->err;
418 0         0 warn "Error with SQL: $sql\n";
419 0   0     0 Carp::confess($@ || $self->dbh->errstr);
420             }
421              
422             sub _valid_params {
423 0 0   0   0 croak("Odd number of parameters!") if scalar(@_) % 2;
424 0         0 my ($self, $vlist, %uarg) = @_;
425 0         0 my %ret;
426 0         0 $ret{$_} = delete $uarg{$_} foreach @$vlist;
427 0 0       0 croak("Bogus options: ".join(',',keys %uarg)) if %uarg;
428 0         0 return %ret;
429             }
430              
431             sub was_deadlock_error {
432 0     0 0 0 my $self = shift;
433 0         0 my $dbh = $self->dbh;
434 0         0 die "UNIMPLEMENTED";
435             }
436              
437             sub was_duplicate_error {
438 0     0 0 0 my $self = shift;
439 0         0 my $dbh = $self->dbh;
440 0         0 die "UNIMPLEMENTED";
441             }
442              
443             # run a subref (presumably a database update) in an eval, because you expect it to
444             # maybe fail on duplicate key error, and throw a dup exception for you, else return
445             # its return value
446             sub conddup {
447 0     0 0 0 my ($self, $code) = @_;
448 0         0 my $rv = eval { $code->(); };
  0         0  
449 0 0       0 throw("dup") if $self->was_duplicate_error;
450 0 0       0 croak($@) if $@;
451 0         0 return $rv;
452             }
453              
454             # insert row if doesn't already exist
455             # WARNING: This function is NOT transaction safe if the duplicate errors causes
456             # your transaction to halt!
457             # WARNING: This function is NOT safe on multi-row inserts if can_insertignore
458             # is false! Rows before the duplicate will be inserted, but rows after the
459             # duplicate might not be, depending your database.
460             sub insert_ignore {
461 0     0 0 0 my ($self, $sql, @params) = @_;
462 0         0 my $dbh = $self->dbh;
463 0 0       0 if ($self->can_insertignore) {
464 0         0 return $dbh->do("INSERT IGNORE $sql", @params);
465             } else {
466             # TODO: Detect bad multi-row insert here.
467 0         0 my $rv = eval { $dbh->do("INSERT $sql", @params); };
  0         0  
468 0 0 0     0 if ($@ || $dbh->err) {
469 0 0       0 return 1 if $self->was_duplicate_error;
470             # This chunk is identical to condthrow, but we include it directly
471             # here as we know there is definitely an error, and we would like
472             # the caller of this function.
473 0         0 my ($pkg, $fn, $line) = caller;
474 0         0 my $msg = "Database error from $pkg/$fn/$line: " . $dbh->errstr;
475 0         0 croak($msg);
476             }
477 0         0 return $rv;
478             }
479             }
480              
481             sub retry_on_deadlock {
482 0     0 0 0 my $self = shift;
483 0         0 my $code = shift;
484 0   0     0 my $tries = shift || 3;
485 0 0       0 croak("deadlock retries must be positive") if $tries < 1;
486 0         0 my $rv;
487              
488 0         0 while ($tries-- > 0) {
489 0         0 $rv = eval { $code->(); };
  0         0  
490 0 0       0 next if ($self->was_deadlock_error);
491 0 0       0 croak($@) if $@;
492 0         0 last;
493             }
494 0         0 return $rv;
495             }
496              
497             # --------------------------------------------------------------------------
498              
499             my @extra_tables;
500              
501             sub add_extra_tables {
502 0     0 0 0 my $class = shift;
503 0         0 push @extra_tables, @_;
504             }
505              
506 21         162578 use constant TABLES => qw( domain class file tempfile file_to_delete
507             unreachable_fids file_on file_on_corrupt host
508             device server_settings file_to_replicate
509             file_to_delete_later fsck_log file_to_queue
510 21     21   175 file_to_delete2 checksum);
  21         41  
511              
512             sub setup_database {
513 0     0 0 0 my $sto = shift;
514              
515 0         0 my $curver = $sto->schema_version;
516              
517 0         0 my $latestver = SCHEMA_VERSION;
518 0 0       0 if ($curver == $latestver) {
519 0         0 $sto->status("Schema already up-to-date at version $curver.");
520 0         0 return 1;
521             }
522              
523 0 0       0 if ($curver > $latestver) {
524 0         0 die "Your current schema version is $curver, but this version of mogdbsetup only knows up to $latestver. Aborting to be safe.\n";
525             }
526              
527 0 0       0 if ($curver) {
528 0         0 $sto->confirm("Install/upgrade your schema from version $curver to version $latestver?");
529             }
530              
531 0         0 foreach my $t (TABLES, @extra_tables) {
532 0         0 $sto->create_table($t);
533             }
534              
535 0         0 $sto->upgrade_add_host_getport;
536 0         0 $sto->upgrade_add_host_altip;
537 0         0 $sto->upgrade_add_device_asof;
538 0         0 $sto->upgrade_add_device_weight;
539 0         0 $sto->upgrade_add_device_readonly;
540 0         0 $sto->upgrade_add_device_drain;
541 0         0 $sto->upgrade_add_class_replpolicy;
542 0         0 $sto->upgrade_modify_server_settings_value;
543 0         0 $sto->upgrade_add_file_to_queue_arg;
544 0         0 $sto->upgrade_modify_device_size;
545 0         0 $sto->upgrade_add_class_hashtype;
546              
547 0         0 return 1;
548             }
549              
550             sub cached_schema_version {
551 0     0 0 0 my $self = shift;
552 0   0     0 return $self->{_cached_schema_version} ||=
553             $self->schema_version;
554             }
555              
556             sub schema_version {
557 0     0 0 0 my $self = shift;
558 0         0 my $dbh = $self->dbh;
559 0   0     0 return eval {
560             $dbh->selectrow_array("SELECT value FROM server_settings WHERE field='schema_version'") || 0;
561             } || 0;
562             }
563              
564 0     0 0 0 sub filter_create_sql { my ($self, $sql) = @_; return $sql; }
  0         0  
565              
566             sub create_table {
567 0     0 0 0 my ($self, $table) = @_;
568 0         0 my $dbh = $self->dbh;
569 0 0       0 return 1 if $self->table_exists($table);
570 0         0 my $meth = "TABLE_$table";
571 0         0 my $sql = $self->$meth;
572 0         0 $sql = $self->filter_create_sql($sql);
573 0         0 $self->status("Running SQL: $sql;");
574 0 0       0 $dbh->do($sql) or
575             die "Failed to create table $table: " . $dbh->errstr;
576 0         0 my $imeth = "INDEXES_$table";
577 0         0 my @indexes = eval { $self->$imeth };
  0         0  
578 0         0 foreach $sql (@indexes) {
579 0         0 $self->status("Running SQL: $sql;");
580 0 0       0 $dbh->do($sql) or
581             die "Failed to create indexes on $table: " . $dbh->errstr;
582             }
583             }
584              
585             # Please try to keep all tables aligned nicely
586             # with '"CREATE TABLE' on the first line
587             # and ')"' alone on the last line.
588              
589             sub TABLE_domain {
590             # classes are tied to domains. domains can have classes of items
591             # with different mindevcounts.
592             #
593             # a minimum devcount is the number of copies the system tries to
594             # maintain for files in that class
595             #
596             # unspecified classname means classid=0 (implicit class), and that
597             # implies mindevcount=2
598 0     0 0 0 "CREATE TABLE domain (
599             dmid SMALLINT UNSIGNED NOT NULL PRIMARY KEY,
600             namespace VARCHAR(255),
601             UNIQUE (namespace)
602             )"
603             }
604              
605             sub TABLE_class {
606 0     0 0 0 "CREATE TABLE class (
607             dmid SMALLINT UNSIGNED NOT NULL,
608             classid TINYINT UNSIGNED NOT NULL,
609             PRIMARY KEY (dmid,classid),
610             classname VARCHAR(50),
611             UNIQUE (dmid,classname),
612             mindevcount TINYINT UNSIGNED NOT NULL,
613             hashtype TINYINT UNSIGNED
614             )"
615             }
616              
617             # the length field is only here for easy verifications of content
618             # integrity when copying around. no sums or content types or other
619             # metadata here. application can handle that.
620             #
621             # classid is what class of file this belongs to. for instance, on fotobilder
622             # there will be a class for original pictures (the ones the user uploaded)
623             # and a class for derived images (scaled down versions, thumbnails, greyscale, etc)
624             # each domain can setup classes and assign the minimum redundancy level for
625             # each class. fotobilder will use a 2 or 3 minimum copy redundancy for original
626             # photos and and a 1 minimum for derived images (which means the sole device
627             # for a derived image can die, bringing devcount to 0 for that file, but
628             # the application can recreate it from its original)
629             sub TABLE_file {
630 0     0 0 0 "CREATE TABLE file (
631             fid INT UNSIGNED NOT NULL,
632             PRIMARY KEY (fid),
633              
634             dmid SMALLINT UNSIGNED NOT NULL,
635             dkey VARCHAR(255), # domain-defined
636             UNIQUE dkey (dmid, dkey),
637              
638             length BIGINT UNSIGNED, # big limit
639              
640             classid TINYINT UNSIGNED NOT NULL,
641             devcount TINYINT UNSIGNED NOT NULL,
642             INDEX devcount (dmid,classid,devcount)
643             )"
644             }
645              
646             sub TABLE_tempfile {
647 0     0 0 0 "CREATE TABLE tempfile (
648             fid INT UNSIGNED NOT NULL AUTO_INCREMENT,
649             PRIMARY KEY (fid),
650              
651             createtime INT UNSIGNED NOT NULL,
652             classid TINYINT UNSIGNED NOT NULL,
653             dmid SMALLINT UNSIGNED NOT NULL,
654             dkey VARCHAR(255),
655             devids VARCHAR(60)
656             )"
657             }
658              
659             # files marked for death when their key is overwritten. then they get a new
660             # fid, but since the old row (with the old fid) had to be deleted immediately,
661             # we need a place to store the fid so an async job can delete the file from
662             # all devices.
663             sub TABLE_file_to_delete {
664 0     0 0 0 "CREATE TABLE file_to_delete (
665             fid INT UNSIGNED NOT NULL,
666             PRIMARY KEY (fid)
667             )"
668             }
669              
670             # if the replicator notices that a fid has no sources, that file gets inserted
671             # into the unreachable_fids table. it is up to the application to actually
672             # handle fids stored in this table.
673             sub TABLE_unreachable_fids {
674 0     0 0 0 "CREATE TABLE unreachable_fids (
675             fid INT UNSIGNED NOT NULL,
676             lastupdate INT UNSIGNED NOT NULL,
677             PRIMARY KEY (fid),
678             INDEX (lastupdate)
679             )"
680             }
681              
682             # what files are on what devices? (most likely physical devices,
683             # as logical devices of RAID arrays would be costly, and mogilefs
684             # already handles redundancy)
685             #
686             # the devid index lets us answer "What files were on this now-dead disk?"
687             sub TABLE_file_on {
688 0     0 0 0 "CREATE TABLE file_on (
689             fid INT UNSIGNED NOT NULL,
690             devid MEDIUMINT UNSIGNED NOT NULL,
691             PRIMARY KEY (fid, devid),
692             INDEX (devid)
693             )"
694             }
695              
696             # if application or framework detects an error in one of the duplicate files
697             # for whatever reason, it can register its complaint and the framework
698             # will do some verifications and fix things up w/ an async job
699             # MAYBE: let application tell us the SHA1/MD5 of the file for us to check
700             # on the other devices?
701             sub TABLE_file_on_corrupt {
702 0     0 0 0 "CREATE TABLE file_on_corrupt (
703             fid INT UNSIGNED NOT NULL,
704             devid MEDIUMINT UNSIGNED NOT NULL,
705             PRIMARY KEY (fid, devid)
706             )"
707             }
708              
709             # hosts (which contain devices...)
710             sub TABLE_host {
711 0     0 0 0 "CREATE TABLE host (
712             hostid MEDIUMINT UNSIGNED NOT NULL PRIMARY KEY,
713              
714             status ENUM('alive','dead','down'),
715             http_port MEDIUMINT UNSIGNED DEFAULT 7500,
716             http_get_port MEDIUMINT UNSIGNED,
717              
718             hostname VARCHAR(40),
719             hostip VARCHAR(15),
720             altip VARCHAR(15),
721             altmask VARCHAR(18),
722             UNIQUE (hostname),
723             UNIQUE (hostip),
724             UNIQUE (altip)
725             )"
726             }
727              
728             # disks...
729             sub TABLE_device {
730 0     0 0 0 "CREATE TABLE device (
731             devid MEDIUMINT UNSIGNED NOT NULL,
732             hostid MEDIUMINT UNSIGNED NOT NULL,
733              
734             status ENUM('alive','dead','down'),
735             weight MEDIUMINT DEFAULT 100,
736              
737             mb_total INT UNSIGNED,
738             mb_used INT UNSIGNED,
739             mb_asof INT UNSIGNED,
740             PRIMARY KEY (devid),
741             INDEX (status)
742             )"
743             }
744              
745             sub TABLE_server_settings {
746 0     0 0 0 "CREATE TABLE server_settings (
747             field VARCHAR(50) PRIMARY KEY,
748             value TEXT
749             )"
750             }
751              
752             sub TABLE_file_to_replicate {
753             # nexttry is time to try to replicate it next.
754             # 0 means immediate. it's only on one host.
755             # 1 means lower priority. it's on 2+ but isn't happy where it's at.
756             # unix timestamp means at/after that time. some previous error occurred.
757             # fromdevid, if not null, means which devid we should replicate from. perhaps it's the only non-corrupt one. otherwise, wherever.
758             # failcount. how many times we've failed, just for doing backoff of nexttry.
759             # flags. reserved for future use.
760 0     0 0 0 "CREATE TABLE file_to_replicate (
761             fid INT UNSIGNED NOT NULL PRIMARY KEY,
762             nexttry INT UNSIGNED NOT NULL,
763             INDEX (nexttry),
764             fromdevid INT UNSIGNED,
765             failcount TINYINT UNSIGNED NOT NULL DEFAULT 0,
766             flags SMALLINT UNSIGNED NOT NULL DEFAULT 0
767             )"
768             }
769              
770             sub TABLE_file_to_delete_later {
771 0     0 0 0 "CREATE TABLE file_to_delete_later (
772             fid INT UNSIGNED NOT NULL PRIMARY KEY,
773             delafter INT UNSIGNED NOT NULL,
774             INDEX (delafter)
775             )"
776             }
777              
778             sub TABLE_fsck_log {
779 0     0 0 0 "CREATE TABLE fsck_log (
780             logid INT UNSIGNED NOT NULL AUTO_INCREMENT,
781             PRIMARY KEY (logid),
782             utime INT UNSIGNED NOT NULL,
783             fid INT UNSIGNED NULL,
784             evcode CHAR(4),
785             devid MEDIUMINT UNSIGNED,
786             INDEX(utime)
787             )"
788             }
789              
790             # generic queue table, designed to be used for workers/jobs which aren't
791             # constantly in use, and are async to the user.
792             # ie; fsck, drain, rebalance.
793             sub TABLE_file_to_queue {
794 0     0 0 0 "CREATE TABLE file_to_queue (
795             fid INT UNSIGNED NOT NULL,
796             devid INT UNSIGNED,
797             type TINYINT UNSIGNED NOT NULL,
798             nexttry INT UNSIGNED NOT NULL,
799             failcount TINYINT UNSIGNED NOT NULL default '0',
800             flags SMALLINT UNSIGNED NOT NULL default '0',
801             arg TEXT,
802             PRIMARY KEY (fid, type),
803             INDEX type_nexttry (type,nexttry)
804             )"
805             }
806              
807             # new style async delete table.
808             # this is separate from file_to_queue since deletes are more actively used,
809             # and partitioning on 'type' doesn't always work so well.
810             sub TABLE_file_to_delete2 {
811 0     0 0 0 "CREATE TABLE file_to_delete2 (
812             fid INT UNSIGNED NOT NULL PRIMARY KEY,
813             nexttry INT UNSIGNED NOT NULL,
814             failcount TINYINT UNSIGNED NOT NULL default '0',
815             INDEX nexttry (nexttry)
816             )"
817             }
818              
819             sub TABLE_checksum {
820 0     0 0 0 "CREATE TABLE checksum (
821             fid INT UNSIGNED NOT NULL PRIMARY KEY,
822             hashtype TINYINT UNSIGNED NOT NULL,
823             checksum VARBINARY(64) NOT NULL
824             )"
825             }
826              
827             # these five only necessary for MySQL, since no other database existed
828             # before, so they can just create the tables correctly to begin with.
829             # in the future, there might be new alters that non-MySQL databases
830             # will have to implement.
831 0     0 0 0 sub upgrade_add_host_getport { 1 }
832 0     0 0 0 sub upgrade_add_host_altip { 1 }
833 0     0 0 0 sub upgrade_add_device_asof { 1 }
834 0     0 0 0 sub upgrade_add_device_weight { 1 }
835 0     0 0 0 sub upgrade_add_device_readonly { 1 }
836 0     0 0 0 sub upgrade_add_device_drain { die "Not implemented in $_[0]" }
837 0     0 0 0 sub upgrade_modify_server_settings_value { die "Not implemented in $_[0]" }
838 0     0 0 0 sub upgrade_add_file_to_queue_arg { die "Not implemented in $_[0]" }
839 0     0 0 0 sub upgrade_modify_device_size { die "Not implemented in $_[0]" }
840              
841             sub upgrade_add_class_replpolicy {
842 0     0 0 0 my ($self) = @_;
843 0 0       0 unless ($self->column_type("class", "replpolicy")) {
844 0         0 $self->dowell("ALTER TABLE class ADD COLUMN replpolicy VARCHAR(255)");
845             }
846             }
847              
848             sub upgrade_add_class_hashtype {
849 0     0 0 0 my ($self) = @_;
850 0 0       0 unless ($self->column_type("class", "hashtype")) {
851 0         0 $self->dowell("ALTER TABLE class ADD COLUMN hashtype TINYINT UNSIGNED");
852             }
853             }
854              
855             # return true if deleted, 0 if didn't exist, exception if error
856             sub delete_host {
857 0     0 0 0 my ($self, $hostid) = @_;
858 0         0 return $self->dbh->do("DELETE FROM host WHERE hostid = ?", undef, $hostid);
859             }
860              
861             # return true if deleted, 0 if didn't exist, exception if error
862             sub delete_domain {
863 0     0 0 0 my ($self, $dmid) = @_;
864 0         0 my ($err, $rv);
865 0         0 my $dbh = $self->dbh;
866 0         0 eval {
867 0         0 $dbh->begin_work;
868 0 0       0 if ($self->domain_has_files($dmid)) {
    0          
869 0         0 $err = "has_files";
870             } elsif ($self->domain_has_classes($dmid)) {
871 0         0 $err = "has_classes";
872             } else {
873 0         0 $rv = $dbh->do("DELETE FROM domain WHERE dmid = ?", undef, $dmid);
874              
875             # remove the "default" class if one was created (for mindevcount)
876             # this is currently the only way to delete the "default" class
877 0         0 $dbh->do("DELETE FROM class WHERE dmid = ? AND classid = 0", undef, $dmid);
878 0         0 $dbh->commit;
879             }
880 0 0       0 $dbh->rollback if $err;
881             };
882 0         0 $self->condthrow; # will rollback on errors
883 0 0       0 throw($err) if $err;
884 0         0 return $rv;
885             }
886              
887             sub domain_has_files {
888 0     0 0 0 my ($self, $dmid) = @_;
889 0         0 my $has_a_fid = $self->dbh->selectrow_array('SELECT fid FROM file WHERE dmid = ? LIMIT 1',
890             undef, $dmid);
891 0 0       0 return $has_a_fid ? 1 : 0;
892             }
893              
894             sub domain_has_classes {
895 0     0 0 0 my ($self, $dmid) = @_;
896             # queryworker does not permit removing default class, so domain_has_classes
897             # should not register the default class
898 0         0 my $has_a_class = $self->dbh->selectrow_array('SELECT classid FROM class WHERE dmid = ? AND classid != 0 LIMIT 1',
899             undef, $dmid);
900 0         0 return defined($has_a_class);
901             }
902              
903             sub class_has_files {
904 0     0 0 0 my ($self, $dmid, $clid) = @_;
905 0         0 my $has_a_fid = $self->dbh->selectrow_array('SELECT fid FROM file WHERE dmid = ? AND classid = ? LIMIT 1',
906             undef, $dmid, $clid);
907 0 0       0 return $has_a_fid ? 1 : 0;
908             }
909              
910             # return new classid on success (non-zero integer), die on failure
911             # throw 'dup' on duplicate name
912             sub create_class {
913 0     0 0 0 my ($self, $dmid, $classname) = @_;
914 0         0 my $dbh = $self->dbh;
915              
916 0         0 my ($clsid, $rv);
917              
918 0         0 eval {
919 0         0 $dbh->begin_work;
920 0 0       0 if ($classname eq 'default') {
921 0         0 $clsid = 0;
922             } else {
923             # get the max class id in this domain
924 0   0     0 my $maxid = $dbh->selectrow_array
925             ('SELECT MAX(classid) FROM class WHERE dmid = ?', undef, $dmid) || 0;
926 0         0 $clsid = $maxid + 1;
927             }
928             # now insert the new class
929 0         0 $rv = $dbh->do("INSERT INTO class (dmid, classid, classname, mindevcount) VALUES (?, ?, ?, ?)",
930             undef, $dmid, $clsid, $classname, 2);
931 0 0       0 $dbh->commit if $rv;
932             };
933 0 0 0     0 if ($@ || $dbh->err) {
934 0 0       0 if ($self->was_duplicate_error) {
935             # ensure we're not inside a transaction
936 0 0       0 if ($dbh->{AutoCommit} == 0) { eval { $dbh->rollback }; }
  0         0  
  0         0  
937 0         0 throw("dup");
938             }
939             }
940 0         0 $self->condthrow; # this will rollback on errors
941 0 0       0 return $clsid if $rv;
942 0         0 die;
943             }
944              
945             # return 1 on success, throw "dup" on duplicate name error, die otherwise
946             sub update_class_name {
947 0     0 0 0 my $self = shift;
948 0         0 my %arg = $self->_valid_params([qw(dmid classid classname)], @_);
949 0         0 my $rv = eval {
950 0         0 $self->dbh->do("UPDATE class SET classname=? WHERE dmid=? AND classid=?",
951             undef, $arg{classname}, $arg{dmid}, $arg{classid});
952             };
953 0 0       0 throw("dup") if $self->was_duplicate_error;
954 0         0 $self->condthrow;
955 0         0 return 1;
956             }
957              
958             # return 1 on success, die otherwise
959             sub update_class_mindevcount {
960 0     0 0 0 my $self = shift;
961 0         0 my %arg = $self->_valid_params([qw(dmid classid mindevcount)], @_);
962 0         0 eval {
963 0         0 $self->dbh->do("UPDATE class SET mindevcount=? WHERE dmid=? AND classid=?",
964             undef, $arg{mindevcount}, $arg{dmid}, $arg{classid});
965             };
966 0         0 $self->condthrow;
967 0         0 return 1;
968             }
969              
970             # return 1 on success, die otherwise
971             sub update_class_replpolicy {
972 0     0 0 0 my $self = shift;
973 0         0 my %arg = $self->_valid_params([qw(dmid classid replpolicy)], @_);
974 0         0 eval {
975 0         0 $self->dbh->do("UPDATE class SET replpolicy=? WHERE dmid=? AND classid=?",
976             undef, $arg{replpolicy}, $arg{dmid}, $arg{classid});
977             };
978 0         0 $self->condthrow;
979 0         0 return 1;
980             }
981              
982             # return 1 on success, die otherwise
983             sub update_class_hashtype {
984 0     0 0 0 my $self = shift;
985 0         0 my %arg = $self->_valid_params([qw(dmid classid hashtype)], @_);
986 0         0 eval {
987 0         0 $self->dbh->do("UPDATE class SET hashtype=? WHERE dmid=? AND classid=?",
988             undef, $arg{hashtype}, $arg{dmid}, $arg{classid});
989             };
990 0         0 $self->condthrow;
991             }
992              
993             sub nfiles_with_dmid_classid_devcount {
994 0     0 0 0 my ($self, $dmid, $classid, $devcount) = @_;
995 0         0 return $self->dbh->selectrow_array('SELECT COUNT(*) FROM file WHERE dmid = ? AND classid = ? AND devcount = ?',
996             undef, $dmid, $classid, $devcount);
997             }
998              
999             sub set_server_setting {
1000 0     0 0 0 my ($self, $key, $val) = @_;
1001 0         0 my $dbh = $self->dbh;
1002 0 0       0 die "Your database does not support REPLACE! Reimplement set_server_setting!" unless $self->can_replace;
1003              
1004 0         0 eval {
1005 0 0       0 if (defined $val) {
1006 0         0 $dbh->do("REPLACE INTO server_settings (field, value) VALUES (?, ?)", undef, $key, $val);
1007             } else {
1008 0         0 $dbh->do("DELETE FROM server_settings WHERE field=?", undef, $key);
1009             }
1010             };
1011              
1012 0 0       0 die "Error updating 'server_settings': " . $dbh->errstr if $dbh->err;
1013 0         0 return 1;
1014             }
1015              
1016             # FIXME: racy. currently the only caller doesn't matter, but should be fixed.
1017             sub incr_server_setting {
1018 0     0 0 0 my ($self, $key, $val) = @_;
1019 0 0       0 $val = 1 unless defined $val;
1020 0 0       0 return unless $val;
1021              
1022 0 0       0 return 1 if $self->dbh->do("UPDATE server_settings ".
1023             "SET value=value+? ".
1024             "WHERE field=?", undef,
1025             $val, $key) > 0;
1026 0         0 $self->set_server_setting($key, $val);
1027             }
1028              
1029             sub server_setting {
1030 0     0 0 0 my ($self, $key) = @_;
1031 0         0 return $self->dbh->selectrow_array("SELECT value FROM server_settings WHERE field=?",
1032             undef, $key);
1033             }
1034              
1035             sub server_settings {
1036 0     0 0 0 my ($self) = @_;
1037 0         0 my $ret = {};
1038 0         0 my $sth = $self->dbh->prepare("SELECT field, value FROM server_settings");
1039 0         0 $sth->execute;
1040 0         0 while (my ($k, $v) = $sth->fetchrow_array) {
1041 0         0 $ret->{$k} = $v;
1042             }
1043 0         0 return $ret;
1044             }
1045              
1046             # register a tempfile and return the fidid, which should be allocated
1047             # using autoincrement/sequences if the passed in fid is undef. however,
1048             # if fid is passed in, that value should be used and returned.
1049             #
1050             # return new/passed in fidid on success.
1051             # throw 'dup' if fid already in use
1052             # return 0/undef/die on failure
1053             #
1054             sub register_tempfile {
1055 0     0 0 0 my $self = shift;
1056 0         0 my %arg = $self->_valid_params([qw(fid dmid key classid devids)], @_);
1057              
1058 0         0 my $dbh = $self->dbh;
1059 0         0 my $fid = $arg{fid};
1060              
1061 0 0       0 my $explicit_fid_used = $fid ? 1 : 0;
1062              
1063             # setup the new mapping. we store the devices that we picked for
1064             # this file in here, knowing that they might not be used. create_close
1065             # is responsible for actually mapping in file_on. NOTE: fid is being
1066             # passed in, it's either some number they gave us, or it's going to be
1067             # 0/undef which translates into NULL which means to automatically create
1068             # one. that should be fine.
1069             my $ins_tempfile = sub {
1070 0     0   0 my $rv = eval {
1071             # We must only pass the correct number of bind parameters
1072             # Using 'NULL' for the AUTO_INCREMENT/SERIAL column will fail on
1073             # Postgres, where you are expected to leave it out or use DEFAULT
1074             # Leaving it out seems sanest and least likely to cause problems
1075             # with other databases.
1076 0         0 my @keys = ('dmid', 'dkey', 'classid', 'devids', 'createtime');
1077 0         0 my @vars = ('?' , '?' , '?' , '?' , $self->unix_timestamp);
1078 0   0     0 my @vals = ($arg{dmid}, $arg{key}, $arg{classid} || 0, $arg{devids});
1079             # Do not check for $explicit_fid_used, but rather $fid directly
1080             # as this anonymous sub is called from the loop later
1081 0 0       0 if($fid) {
1082 0         0 unshift @keys, 'fid';
1083 0         0 unshift @vars, '?';
1084 0         0 unshift @vals, $fid;
1085             }
1086 0         0 my $sql = "INSERT INTO tempfile (".join(',',@keys).") VALUES (".join(',',@vars).")";
1087 0         0 $dbh->do($sql, undef, @vals);
1088             };
1089 0 0       0 if (!$rv) {
1090 0 0       0 return undef if $self->was_duplicate_error;
1091 0         0 die "Unexpected db error into tempfile: " . $dbh->errstr;
1092             }
1093              
1094 0 0       0 unless (defined $fid) {
1095             # if they did not give us a fid, then we want to grab the one that was
1096             # theoretically automatically generated
1097 0 0       0 $fid = $dbh->last_insert_id(undef, undef, 'tempfile', 'fid')
1098             or die "No last_insert_id found";
1099             }
1100 0 0 0     0 return undef unless defined $fid && $fid > 0;
1101 0         0 return 1;
1102 0         0 };
1103              
1104 0 0       0 unless ($ins_tempfile->()) {
1105 0 0       0 throw("dup") if $explicit_fid_used;
1106 0         0 die "tempfile insert failed";
1107             }
1108              
1109             my $fid_in_use = sub {
1110 0     0   0 my $exists = $dbh->selectrow_array("SELECT COUNT(*) FROM file WHERE fid=?", undef, $fid);
1111 0 0       0 return $exists ? 1 : 0;
1112 0         0 };
1113              
1114             # See notes in MogileFS::Config->check_database
1115 0         0 my $min_fidid = MogileFS::Config->config('min_fidid');
1116              
1117             # if the fid is in use, do something
1118 0   0     0 while ($fid_in_use->($fid) || $fid <= $min_fidid) {
1119 0 0       0 throw("dup") if $explicit_fid_used;
1120              
1121             # be careful of databases which reset their
1122             # auto-increment/sequences when the table is empty (InnoDB
1123             # did/does this, for instance). So check if it's in use, and
1124             # re-seed the table with the highest known fid from the file
1125             # table.
1126              
1127             # get the highest fid from the filetable and insert a dummy row
1128 0         0 $fid = $dbh->selectrow_array("SELECT MAX(fid) FROM file");
1129 0         0 $ins_tempfile->(); # don't care about its result
1130              
1131             # then do a normal auto-increment
1132 0         0 $fid = undef;
1133 0 0       0 $ins_tempfile->() or die "register_tempfile failed after seeding";
1134             }
1135              
1136 0         0 return $fid;
1137             }
1138              
1139             # return hashref of row containing columns "fid, dmid, dkey, length,
1140             # classid, devcount" provided a $dmid and $key (dkey). or undef if no
1141             # row.
1142             sub file_row_from_dmid_key {
1143 0     0 0 0 my ($self, $dmid, $key) = @_;
1144 0         0 return $self->dbh->selectrow_hashref("SELECT fid, dmid, dkey, length, classid, devcount ".
1145             "FROM file WHERE dmid=? AND dkey=?",
1146             undef, $dmid, $key);
1147             }
1148              
1149             # return hashref of row containing columns "fid, dmid, dkey, length,
1150             # classid, devcount" provided a $fidid or undef if no row.
1151             sub file_row_from_fidid {
1152 0     0 0 0 my ($self, $fidid) = @_;
1153 0         0 return $self->dbh->selectrow_hashref("SELECT fid, dmid, dkey, length, classid, devcount ".
1154             "FROM file WHERE fid=?",
1155             undef, $fidid);
1156             }
1157              
1158             # return an arrayref of rows containing columns "fid, dmid, dkey, length,
1159             # classid, devcount" provided a pair of $fidid or undef if no rows.
1160             sub file_row_from_fidid_range {
1161 0     0 0 0 my ($self, $fromfid, $count) = @_;
1162 0         0 my $sth = $self->dbh->prepare("SELECT fid, dmid, dkey, length, classid, devcount ".
1163             "FROM file WHERE fid > ? LIMIT ?");
1164 0         0 $sth->execute($fromfid,$count);
1165 0         0 return $sth->fetchall_arrayref({});
1166             }
1167              
1168             # return array of devids that a fidid is on
1169             sub fid_devids {
1170 0     0 0 0 my ($self, $fidid) = @_;
1171 0 0       0 return @{ $self->dbh->selectcol_arrayref("SELECT devid FROM file_on WHERE fid=?",
  0         0  
1172             undef, $fidid) || [] };
1173             }
1174              
1175             # return hashref of { $fidid => [ $devid, $devid... ] } for a bunch of given @fidids
1176             sub fid_devids_multiple {
1177 0     0 0 0 my ($self, @fidids) = @_;
1178 0         0 my $in = join(",", map { $_+0 } @fidids);
  0         0  
1179 0         0 my $ret = {};
1180 0         0 my $sth = $self->dbh->prepare("SELECT fid, devid FROM file_on WHERE fid IN ($in)");
1181 0         0 $sth->execute;
1182 0         0 while (my ($fidid, $devid) = $sth->fetchrow_array) {
1183 0   0     0 push @{$ret->{$fidid} ||= []}, $devid;
  0         0  
1184             }
1185 0         0 return $ret;
1186             }
1187              
1188             # return hashref of columns classid, dmid, dkey, given a $fidid, or return undef
1189             sub tempfile_row_from_fid {
1190 0     0 0 0 my ($self, $fidid) = @_;
1191 0         0 return $self->dbh->selectrow_hashref("SELECT classid, dmid, dkey, devids ".
1192             "FROM tempfile WHERE fid=?",
1193             undef, $fidid);
1194             }
1195              
1196             # return 1 on success, throw "dup" on duplicate devid or throws other error on failure
1197             sub create_device {
1198 0     0 0 0 my ($self, $devid, $hostid, $status) = @_;
1199             my $rv = $self->conddup(sub {
1200 0     0   0 $self->dbh->do("INSERT INTO device (devid, hostid, status) VALUES (?,?,?)", undef,
1201             $devid, $hostid, $status);
1202 0         0 });
1203 0         0 $self->condthrow;
1204 0 0       0 die "error making device $devid\n" unless $rv > 0;
1205 0         0 return 1;
1206             }
1207              
1208             sub update_device {
1209 0     0 0 0 my ($self, $devid, $to_update) = @_;
1210 0         0 my @keys = sort keys %$to_update;
1211 0 0       0 return unless @keys;
1212             $self->conddup(sub {
1213 0         0 $self->dbh->do("UPDATE device SET " . join('=?, ', @keys)
1214 0     0   0 . "=? WHERE devid=?", undef, (map { $to_update->{$_} } @keys),
1215             $devid);
1216 0         0 });
1217 0         0 return 1;
1218             }
1219              
1220             sub update_device_usage {
1221 0     0 0 0 my $self = shift;
1222 0         0 my %arg = $self->_valid_params([qw(mb_total mb_used devid mb_asof)], @_);
1223 0         0 eval {
1224 0         0 $self->dbh->do("UPDATE device SET ".
1225             "mb_total = ?, mb_used = ?, mb_asof = ?" .
1226             " WHERE devid = ?",
1227             undef, $arg{mb_total}, $arg{mb_used}, $arg{mb_asof},
1228             $arg{devid});
1229             };
1230 0         0 $self->condthrow;
1231             }
1232              
1233             # MySQL has an optimized version
1234             sub update_device_usages {
1235 0     0 0 0 my ($self, $updates, $cb) = @_;
1236 0         0 foreach my $upd (@$updates) {
1237 0         0 $self->update_device_usage(%$upd);
1238 0         0 $cb->();
1239             }
1240             }
1241              
1242             # This is unimplemented at the moment as we must verify:
1243             # - no file_on rows exist
1244             # - nothing in file_to_queue is going to attempt to use it
1245             # - nothing in file_to_replicate is going to attempt to use it
1246             # - it's already been marked dead
1247             # - that all trackers are likely to know this :/
1248             # - ensure the devid can't be reused
1249             # IE; the user can't mark it dead then remove it all at once and cause their
1250             # cluster to implode.
1251             sub delete_device {
1252 0     0 0 0 die "Unimplemented; needs further testing";
1253             }
1254              
1255             sub set_device_weight {
1256 0     0 0 0 my ($self, $devid, $weight) = @_;
1257 0         0 eval {
1258 0         0 $self->dbh->do('UPDATE device SET weight = ? WHERE devid = ?', undef, $weight, $devid);
1259             };
1260 0         0 $self->condthrow;
1261             }
1262              
1263             sub set_device_state {
1264 0     0 0 0 my ($self, $devid, $state) = @_;
1265 0         0 eval {
1266 0         0 $self->dbh->do('UPDATE device SET status = ? WHERE devid = ?', undef, $state, $devid);
1267             };
1268 0         0 $self->condthrow;
1269             }
1270              
1271             sub delete_class {
1272 0     0 0 0 my ($self, $dmid, $cid) = @_;
1273 0 0       0 throw("has_files") if $self->class_has_files($dmid, $cid);
1274 0         0 eval {
1275 0         0 $self->dbh->do("DELETE FROM class WHERE dmid = ? AND classid = ?", undef, $dmid, $cid);
1276             };
1277 0         0 $self->condthrow;
1278             }
1279              
1280             # called from a queryworker process, will trigger delete_fidid_enqueued
1281             # in the delete worker
1282             sub delete_fidid {
1283 0     0 0 0 my ($self, $fidid) = @_;
1284 0         0 eval { $self->dbh->do("DELETE FROM file WHERE fid=?", undef, $fidid); };
  0         0  
1285 0         0 $self->condthrow;
1286 0         0 $self->enqueue_for_delete2($fidid, 0);
1287 0         0 $self->condthrow;
1288             }
1289              
1290             # Only called from delete workers (after delete_fidid),
1291             # this reduces client-visible latency from the queryworker
1292             sub delete_fidid_enqueued {
1293 0     0 0 0 my ($self, $fidid) = @_;
1294 0         0 eval { $self->delete_checksum($fidid); };
  0         0  
1295 0         0 $self->condthrow;
1296 0         0 eval { $self->dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fidid); };
  0         0  
1297 0         0 $self->condthrow;
1298             }
1299              
1300             sub delete_tempfile_row {
1301 0     0 0 0 my ($self, $fidid) = @_;
1302 0         0 my $rv = eval { $self->dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fidid); };
  0         0  
1303 0         0 $self->condthrow;
1304 0         0 return $rv;
1305             }
1306              
1307             # Load the specified tempfile, then delete it. If we succeed, we were
1308             # here first; otherwise, someone else beat us here (and we return undef)
1309             sub delete_and_return_tempfile_row {
1310 0     0 0 0 my ($self, $fidid) = @_;
1311 0         0 my $rv = $self->tempfile_row_from_fid($fidid);
1312 0         0 my $rows_deleted = $self->delete_tempfile_row($fidid);
1313 0 0       0 return $rv if ($rows_deleted > 0);
1314             }
1315              
1316             sub replace_into_file {
1317 0     0 0 0 my $self = shift;
1318 0         0 my %arg = $self->_valid_params([qw(fidid dmid key length classid devcount)], @_);
1319 0 0       0 die "Your database does not support REPLACE! Reimplement replace_into_file!" unless $self->can_replace;
1320 0         0 eval {
1321 0         0 $self->dbh->do("REPLACE INTO file (fid, dmid, dkey, length, classid, devcount) ".
1322             "VALUES (?,?,?,?,?,?) ", undef,
1323             @arg{'fidid', 'dmid', 'key', 'length', 'classid', 'devcount'});
1324             };
1325 0         0 $self->condthrow;
1326             }
1327              
1328             # returns 1 on success, 0 on duplicate key error, dies on exception
1329             # TODO: need a test to hit the duplicate name error condition
1330             # TODO: switch to using "dup" exception here?
1331             sub rename_file {
1332 0     0 0 0 my ($self, $fidid, $to_key) = @_;
1333 0         0 my $dbh = $self->dbh;
1334 0         0 eval {
1335 0         0 $dbh->do('UPDATE file SET dkey = ? WHERE fid=?',
1336             undef, $to_key, $fidid);
1337             };
1338 0 0 0     0 if ($@ || $dbh->err) {
1339             # first is MySQL's error code for duplicates
1340 0 0       0 if ($self->was_duplicate_error) {
1341 0         0 return 0;
1342             } else {
1343 0         0 die $@;
1344             }
1345             }
1346 0         0 $self->condthrow;
1347 0         0 return 1;
1348             }
1349              
1350             sub get_domainid_by_name {
1351 0     0 0 0 my $self = shift;
1352 0         0 my ($dmid) = $self->dbh->selectrow_array('SELECT dmid FROM domain WHERE namespace = ?',
1353             undef, $_[0]);
1354 0         0 return $dmid;
1355             }
1356              
1357             # returns a hash of domains. Key is namespace, value is dmid.
1358             sub get_all_domains {
1359 0     0 0 0 my ($self) = @_;
1360 0         0 my $domains = $self->dbh->selectall_arrayref('SELECT namespace, dmid FROM domain');
1361 0 0       0 return map { ($_->[0], $_->[1]) } @{$domains || []};
  0         0  
  0         0  
1362             }
1363              
1364             sub get_classid_by_name {
1365 0     0 0 0 my $self = shift;
1366 0         0 my ($classid) = $self->dbh->selectrow_array('SELECT classid FROM class WHERE dmid = ? AND classname = ?',
1367             undef, $_[0], $_[1]);
1368 0         0 return $classid;
1369             }
1370              
1371             # returns an array of hashrefs, one hashref per row in the 'class' table
1372             sub get_all_classes {
1373 0     0 0 0 my ($self) = @_;
1374 0         0 my (@ret, $row);
1375              
1376 0         0 my @cols = qw/dmid classid classname mindevcount/;
1377 0 0       0 if ($self->cached_schema_version >= 10) {
1378 0         0 push @cols, 'replpolicy';
1379 0 0       0 if ($self->cached_schema_version >= 15) {
1380 0         0 push @cols, 'hashtype';
1381             }
1382             }
1383 0         0 my $cols = join(', ', @cols);
1384 0         0 my $sth = $self->dbh->prepare("SELECT $cols FROM class");
1385 0         0 $sth->execute;
1386 0         0 push @ret, $row while $row = $sth->fetchrow_hashref;
1387 0         0 return @ret;
1388             }
1389              
1390             # add a record of fidid existing on devid
1391             # returns 1 on success, 0 on duplicate
1392             sub add_fidid_to_devid {
1393 0     0 0 0 my ($self, $fidid, $devid) = @_;
1394 0 0       0 croak("fidid not non-zero") unless $fidid;
1395 0 0       0 croak("devid not non-zero") unless $devid;
1396              
1397             # TODO: This should possibly be insert_ignore instead
1398             # As if we are adding an extra file_on entry, we do not want to replace the
1399             # exist one. Check REPLACE semantics.
1400 0         0 my $rv = $self->dowell($self->ignore_replace . " INTO file_on (fid, devid) VALUES (?,?)",
1401             undef, $fidid, $devid);
1402 0 0       0 return 1 if $rv > 0;
1403 0         0 return 0;
1404             }
1405              
1406             # remove a record of fidid existing on devid
1407             # returns 1 on success, 0 if not there anyway
1408             sub remove_fidid_from_devid {
1409 0     0 0 0 my ($self, $fidid, $devid) = @_;
1410 0         0 my $rv = eval { $self->dbh->do("DELETE FROM file_on WHERE fid=? AND devid=?",
  0         0  
1411             undef, $fidid, $devid); };
1412 0         0 $self->condthrow;
1413 0         0 return $rv;
1414             }
1415              
1416             # Test if host exists.
1417             sub get_hostid_by_id {
1418 0     0 0 0 my $self = shift;
1419 0         0 my ($hostid) = $self->dbh->selectrow_array('SELECT hostid FROM host WHERE hostid = ?',
1420             undef, $_[0]);
1421 0         0 return $hostid;
1422             }
1423              
1424             sub get_hostid_by_name {
1425 0     0 0 0 my $self = shift;
1426 0         0 my ($hostid) = $self->dbh->selectrow_array('SELECT hostid FROM host WHERE hostname = ?',
1427             undef, $_[0]);
1428 0         0 return $hostid;
1429             }
1430              
1431             # get all hosts from database, returns them as list of hashrefs, hashrefs being the row contents.
1432             sub get_all_hosts {
1433 0     0 0 0 my ($self) = @_;
1434 0         0 my $sth = $self->dbh->prepare("SELECT /*!40000 SQL_CACHE */ hostid, status, hostname, " .
1435             "hostip, http_port, http_get_port, altip, altmask FROM host");
1436 0         0 $sth->execute;
1437 0         0 my @ret;
1438 0         0 while (my $row = $sth->fetchrow_hashref) {
1439 0         0 push @ret, $row;
1440             }
1441 0         0 return @ret;
1442             }
1443              
1444             # get all devices from database, returns them as list of hashrefs, hashrefs being the row contents.
1445             sub get_all_devices {
1446 0     0 0 0 my ($self) = @_;
1447 0         0 my $sth = $self->dbh->prepare("SELECT /*!40000 SQL_CACHE */ devid, hostid, mb_total, " .
1448             "mb_used, mb_asof, status, weight FROM device");
1449 0         0 $self->condthrow;
1450 0         0 $sth->execute;
1451 0         0 my @return;
1452 0         0 while (my $row = $sth->fetchrow_hashref) {
1453 0         0 push @return, $row;
1454             }
1455 0         0 return @return;
1456             }
1457              
1458             # update the device count for a given fidid
1459             sub update_devcount {
1460 0     0 0 0 my ($self, $fidid) = @_;
1461 0         0 my $dbh = $self->dbh;
1462 0         0 my $ct = $dbh->selectrow_array("SELECT COUNT(*) FROM file_on WHERE fid=?",
1463             undef, $fidid);
1464              
1465 0         0 eval { $dbh->do("UPDATE file SET devcount=? WHERE fid=?", undef,
  0         0  
1466             $ct, $fidid); };
1467 0         0 $self->condthrow;
1468              
1469 0         0 return 1;
1470             }
1471              
1472             # update the classid for a given fidid
1473             sub update_classid {
1474 0     0 0 0 my ($self, $fidid, $classid) = @_;
1475 0         0 my $dbh = $self->dbh;
1476              
1477 0         0 $dbh->do("UPDATE file SET classid=? WHERE fid=?", undef,
1478             $classid, $fidid);
1479              
1480 0         0 $self->condthrow;
1481 0         0 return 1;
1482             }
1483              
1484             # enqueue a fidid for replication, from a specific deviceid (can be undef), in a given number of seconds.
1485             sub enqueue_for_replication {
1486 0     0 0 0 my ($self, $fidid, $from_devid, $in) = @_;
1487              
1488 0         0 my $nexttry = 0;
1489 0 0       0 if ($in) {
1490 0         0 $nexttry = $self->unix_timestamp . " + " . int($in);
1491             }
1492              
1493             $self->retry_on_deadlock(sub {
1494 0     0   0 $self->insert_ignore("INTO file_to_replicate (fid, fromdevid, nexttry) ".
1495             "VALUES (?,?,$nexttry)", undef, $fidid, $from_devid);
1496 0         0 });
1497             }
1498              
1499             # enqueue a fidid for delete
1500             # note: if we get one more "independent" queue like this, the
1501             # code should be collapsable? I tried once and it looked too ugly, so we have
1502             # some redundancy.
1503             sub enqueue_for_delete2 {
1504 0     0 0 0 my ($self, $fidid, $in) = @_;
1505              
1506 0 0       0 $in = 0 unless $in;
1507 0         0 my $nexttry = $self->unix_timestamp . " + " . int($in);
1508              
1509             $self->retry_on_deadlock(sub {
1510 0     0   0 $self->insert_ignore("INTO file_to_delete2 (fid, nexttry) ".
1511             "VALUES (?,$nexttry)", undef, $fidid);
1512 0         0 });
1513             }
1514              
1515             # enqueue a fidid for work
1516             sub enqueue_for_todo {
1517 0     0 0 0 my ($self, $fidid, $type, $in) = @_;
1518              
1519 0 0       0 $in = 0 unless $in;
1520 0         0 my $nexttry = $self->unix_timestamp . " + " . int($in);
1521              
1522             $self->retry_on_deadlock(sub {
1523 0 0   0   0 if (ref($fidid)) {
1524 0         0 $self->insert_ignore("INTO file_to_queue (fid, devid, arg, type, ".
1525             "nexttry) VALUES (?,?,?,?,$nexttry)", undef,
1526             $fidid->[0], $fidid->[1], $fidid->[2], $type);
1527             } else {
1528 0         0 $self->insert_ignore("INTO file_to_queue (fid, type, nexttry) ".
1529             "VALUES (?,?,$nexttry)", undef, $fidid, $type);
1530             }
1531 0         0 });
1532             }
1533              
1534             # return 1 on success. die otherwise.
1535             sub enqueue_many_for_todo {
1536 0     0 0 0 my ($self, $fidids, $type, $in) = @_;
1537 0 0 0     0 if (! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
      0        
1538 0         0 $self->enqueue_for_todo($_, $type, $in) foreach @$fidids;
1539 0         0 return 1;
1540             }
1541              
1542 0 0       0 $in = 0 unless $in;
1543 0         0 my $nexttry = $self->unix_timestamp . " + " . int($in);
1544              
1545             # TODO: convert to prepared statement?
1546             $self->retry_on_deadlock(sub {
1547 0 0   0   0 if (ref($fidids->[0]) eq 'ARRAY') {
1548 0         0 my $sql = $self->ignore_replace .
1549             "INTO file_to_queue (fid, devid, arg, type, nexttry) VALUES ".
1550             join(', ', ('(?,?,?,?,?)') x scalar @$fidids);
1551 0         0 $self->dbh->do($sql, undef, map { @$_, $type, $nexttry } @$fidids);
  0         0  
1552             } else {
1553 0         0 $self->dbh->do($self->ignore_replace . " INTO file_to_queue (fid, type,
1554             nexttry) VALUES " .
1555 0         0 join(",", map { "(" . int($_) . ", $type, $nexttry)" } @$fidids));
1556             }
1557 0         0 });
1558 0         0 $self->condthrow;
1559             }
1560              
1561             # For file_to_queue queues that should be kept small, find the size.
1562             # This isn't fast, but for small queues won't be slow, and is usually only ran
1563             # from a single tracker.
1564             sub file_queue_length {
1565 0     0 0 0 my $self = shift;
1566 0         0 my $type = shift;
1567              
1568 0         0 return $self->dbh->selectrow_array("SELECT COUNT(*) FROM file_to_queue " .
1569             "WHERE type = ?", undef, $type);
1570             }
1571              
1572             # reschedule all deferred replication, return number rescheduled
1573             sub replicate_now {
1574 0     0 0 0 my ($self) = @_;
1575              
1576             $self->retry_on_deadlock(sub {
1577 0     0   0 return $self->dbh->do("UPDATE file_to_replicate SET nexttry = " . $self->unix_timestamp .
1578             " WHERE nexttry > " . $self->unix_timestamp);
1579 0         0 });
1580             }
1581              
1582             # takes two arguments, devid and limit, both required. returns an arrayref of fidids.
1583             sub get_fidids_by_device {
1584 0     0 0 0 my ($self, $devid, $limit) = @_;
1585              
1586 0         0 my $dbh = $self->dbh;
1587 0         0 my $fidids = $dbh->selectcol_arrayref("SELECT fid FROM file_on WHERE devid = ? LIMIT $limit",
1588             undef, $devid);
1589 0         0 return $fidids;
1590             }
1591              
1592             # finds a chunk of fids given a set of constraints:
1593             # devid, fidid, age (new or old), limit
1594             # Note that if this function is very slow on your large DB, you're likely
1595             # sorting by "newfiles" and are missing a new index.
1596             # returns an arrayref of fidids
1597             sub get_fidid_chunks_by_device {
1598 0     0 0 0 my ($self, %o) = @_;
1599              
1600 0         0 my $dbh = $self->dbh;
1601 0         0 my $devid = delete $o{devid};
1602 0 0       0 croak("must supply at least a devid") unless $devid;
1603 0         0 my $age = delete $o{age};
1604 0         0 my $fidid = delete $o{fidid};
1605 0         0 my $limit = delete $o{limit};
1606 0 0       0 croak("invalid options: " . join(', ', keys %o)) if %o;
1607             # If supplied a "previous" fidid, we're paging through.
1608 0         0 my $fidsort = '';
1609 0         0 my $order = '';
1610 0   0     0 $age ||= 'old';
1611 0 0       0 if ($age eq 'old') {
    0          
1612 0 0       0 $fidsort = 'AND fid > ?' if $fidid;
1613 0         0 $order = 'ASC';
1614             } elsif ($age eq 'new') {
1615 0 0       0 $fidsort = 'AND fid < ?' if $fidid;
1616 0         0 $order = 'DESC';
1617             } else {
1618 0         0 croak("invalid age argument: " . $age);
1619             }
1620 0   0     0 $limit ||= 100;
1621 0         0 my @extra = ();
1622 0 0       0 push @extra, $fidid if $fidid;
1623              
1624 0         0 my $fidids = $dbh->selectcol_arrayref("SELECT fid FROM file_on WHERE devid = ? " .
1625             $fidsort . " ORDER BY fid $order LIMIT $limit", undef, $devid, @extra);
1626 0         0 return $fidids;
1627             }
1628              
1629             # gets fidids above fidid_low up to (and including) fidid_high
1630             sub get_fidids_between {
1631 0     0 0 0 my ($self, $fidid_low, $fidid_high, $limit) = @_;
1632 0   0     0 $limit ||= 1000;
1633 0         0 $limit = int($limit);
1634              
1635 0         0 my $dbh = $self->dbh;
1636 0         0 my $fidids = $dbh->selectcol_arrayref(qq{SELECT fid FROM file
1637             WHERE fid > ? and fid <= ?
1638             ORDER BY fid LIMIT $limit}, undef, $fidid_low, $fidid_high);
1639 0         0 return $fidids;
1640             }
1641              
1642             # creates a new domain, given a domain namespace string. return the dmid on success,
1643             # throw 'dup' on duplicate name.
1644             # override if you want a less racy version.
1645             sub create_domain {
1646 0     0 0 0 my ($self, $name) = @_;
1647 0         0 my $dbh = $self->dbh;
1648              
1649             # get the max domain id
1650 0   0     0 my $maxid = $dbh->selectrow_array('SELECT MAX(dmid) FROM domain') || 0;
1651 0         0 my $rv = eval {
1652 0         0 $dbh->do('INSERT INTO domain (dmid, namespace) VALUES (?, ?)',
1653             undef, $maxid + 1, $name);
1654             };
1655 0 0       0 if ($self->was_duplicate_error) {
1656 0         0 throw("dup");
1657             }
1658 0 0       0 return $maxid+1 if $rv;
1659 0         0 die "failed to make domain"; # FIXME: the above is racy.
1660             }
1661              
1662             sub update_host {
1663 0     0 0 0 my ($self, $hid, $to_update) = @_;
1664 0         0 my @keys = sort keys %$to_update;
1665 0 0       0 return unless @keys;
1666             $self->conddup(sub {
1667 0         0 $self->dbh->do("UPDATE host SET " . join('=?, ', @keys)
1668 0     0   0 . "=? WHERE hostid=?", undef, (map { $to_update->{$_} } @keys),
1669             $hid);
1670 0         0 });
1671 0         0 return 1;
1672             }
1673              
1674             # return ne hostid, or throw 'dup' on error.
1675             # NOTE: you need to put them into the initial 'down' state.
1676             sub create_host {
1677 0     0 0 0 my ($self, $hostname, $ip) = @_;
1678 0         0 my $dbh = $self->dbh;
1679             # racy! lazy. no, better: portable! how often does this happen? :)
1680 0   0     0 my $hid = ($dbh->selectrow_array('SELECT MAX(hostid) FROM host') || 0) + 1;
1681             my $rv = $self->conddup(sub {
1682 0     0   0 $dbh->do("INSERT INTO host (hostid, hostname, hostip, status) ".
1683             "VALUES (?, ?, ?, 'down')",
1684             undef, $hid, $hostname, $ip);
1685 0         0 });
1686 0 0       0 return $hid if $rv;
1687 0         0 die "db failure";
1688             }
1689              
1690             # return array of row hashrefs containing columns: (fid, fromdevid,
1691             # failcount, flags, nexttry)
1692             sub files_to_replicate {
1693 0     0 0 0 my ($self, $limit) = @_;
1694 0         0 my $ut = $self->unix_timestamp;
1695 0 0       0 my $to_repl_map = $self->dbh->selectall_hashref(qq{
1696             SELECT fid, fromdevid, failcount, flags, nexttry
1697             FROM file_to_replicate
1698             WHERE nexttry <= $ut
1699             ORDER BY nexttry
1700             LIMIT $limit
1701             }, "fid") or return ();
1702 0         0 return values %$to_repl_map;
1703             }
1704              
1705             # "new" style queue consumption code.
1706             # from within a transaction, fetch a limit of fids,
1707             # then update each fid's nexttry to be off in the future,
1708             # giving local workers some time to dequeue the items.
1709             # Note:
1710             # DBI (even with RaiseError) returns weird errors on
1711             # deadlocks from selectall_hashref. So we can't do that.
1712             # we also used to retry on deadlock within the routine,
1713             # but instead lets return undef and let job_master retry.
1714             sub grab_queue_chunk {
1715 0     0 0 0 my $self = shift;
1716 0         0 my $queue = shift;
1717 0         0 my $limit = shift;
1718 0         0 my $extfields = shift;
1719              
1720 0         0 my $dbh = $self->dbh;
1721 0         0 my $tries = 3;
1722 0         0 my $work;
1723              
1724 0 0       0 return 0 unless $self->lock_queue($queue);
1725              
1726 0   0     0 my $extwhere = shift || '';
1727 0         0 my $fields = 'fid, nexttry, failcount';
1728 0 0       0 $fields .= ', ' . $extfields if $extfields;
1729 0         0 eval {
1730 0         0 $dbh->begin_work;
1731 0         0 my $ut = $self->unix_timestamp;
1732 0         0 my $query = qq{
1733             SELECT $fields
1734             FROM $queue
1735             WHERE nexttry <= $ut
1736             $extwhere
1737             ORDER BY nexttry
1738             LIMIT $limit
1739             };
1740 0 0       0 $query .= "FOR UPDATE\n" if $self->can_for_update;
1741 0         0 my $sth = $dbh->prepare($query);
1742 0         0 $sth->execute;
1743 0         0 $work = $sth->fetchall_hashref('fid');
1744             # Nothing to work on.
1745             # Now claim the fids for a while.
1746             # TODO: Should be configurable... but not necessary.
1747 0         0 my $fidlist = join(',', keys %$work);
1748 0 0       0 unless ($fidlist) { $dbh->commit; return; }
  0         0  
  0         0  
1749 0         0 $dbh->do("UPDATE $queue SET nexttry = $ut + 1000 WHERE fid IN ($fidlist)");
1750 0         0 $dbh->commit;
1751             };
1752 0 0       0 if ($self->was_deadlock_error) {
1753 0         0 eval { $dbh->rollback };
  0         0  
1754 0         0 $work = undef;
1755             } else {
1756 0         0 $self->condthrow;
1757             }
1758             # FIXME: Super extra paranoia to prevent deadlocking.
1759             # Need to handle or die on all errors above, but $@ can get reset. For now
1760             # we'll just always ensure there's no transaction running at the end here.
1761             # A (near) release should figure the error detection correctly.
1762 0 0       0 if ($dbh->{AutoCommit} == 0) { eval { $dbh->rollback }; }
  0         0  
  0         0  
1763 0         0 $self->unlock_queue($queue);
1764              
1765 0 0       0 return defined $work ? values %$work : ();
1766             }
1767              
1768             sub grab_files_to_replicate {
1769 0     0 0 0 my ($self, $limit) = @_;
1770 0         0 return $self->grab_queue_chunk('file_to_replicate', $limit,
1771             'fromdevid, flags');
1772             }
1773              
1774             sub grab_files_to_delete2 {
1775 0     0 0 0 my ($self, $limit) = @_;
1776 0         0 return $self->grab_queue_chunk('file_to_delete2', $limit);
1777             }
1778              
1779             # $extwhere is ugly... but should be fine.
1780             sub grab_files_to_queued {
1781 0     0 0 0 my ($self, $type, $what, $limit) = @_;
1782 0   0     0 $what ||= 'type, flags';
1783 0         0 return $self->grab_queue_chunk('file_to_queue', $limit,
1784             $what, 'AND type = ' . $type);
1785             }
1786              
1787             # although it's safe to have multiple tracker hosts and/or processes
1788             # replicating the same file, around, it's inefficient CPU/time-wise,
1789             # and it's also possible they pick different places and waste disk.
1790             # so the replicator asks the store interface when it's about to start
1791             # and when it's done replicating a fidid, so you can do something smart
1792             # and tell it not to.
1793             sub should_begin_replicating_fidid {
1794 0     0 0 0 my ($self, $fidid) = @_;
1795 0         0 my $lockname = "mgfs:fid:$fidid:replicate";
1796 0 0       0 return 1 if $self->get_lock($lockname, 1);
1797 0         0 return 0;
1798             }
1799              
1800             # called when replicator is done replicating a fid, so you can cleanup
1801             # whatever you did in 'should_begin_replicating_fidid' above.
1802             #
1803             # NOTE: there's a theoretical race condition in the rebalance code,
1804             # where (without locking as provided by
1805             # should_begin_replicating_fidid/note_done_replicating), all copies of
1806             # a file can be deleted by independent replicators doing rebalancing
1807             # in different ways. so you'll probably want to implement some
1808             # locking in this pair of functions.
1809             sub note_done_replicating {
1810 0     0 0 0 my ($self, $fidid) = @_;
1811 0         0 my $lockname = "mgfs:fid:$fidid:replicate";
1812 0         0 $self->release_lock($lockname);
1813             }
1814              
1815             sub find_fid_from_file_to_replicate {
1816 0     0 0 0 my ($self, $fidid) = @_;
1817 0         0 return $self->dbh->selectrow_hashref("SELECT fid, nexttry, fromdevid, failcount, flags FROM file_to_replicate WHERE fid = ?",
1818             undef, $fidid);
1819             }
1820              
1821             sub find_fid_from_file_to_delete2 {
1822 0     0 0 0 my ($self, $fidid) = @_;
1823 0         0 return $self->dbh->selectrow_hashref("SELECT fid, nexttry, failcount FROM file_to_delete2 WHERE fid = ?",
1824             undef, $fidid);
1825             }
1826              
1827             sub find_fid_from_file_to_queue {
1828 0     0 0 0 my ($self, $fidid, $type) = @_;
1829 0         0 return $self->dbh->selectrow_hashref("SELECT fid, devid, type, nexttry, failcount, flags, arg FROM file_to_queue WHERE fid = ? AND type = ?",
1830             undef, $fidid, $type);
1831             }
1832              
1833             sub delete_fid_from_file_to_replicate {
1834 0     0 0 0 my ($self, $fidid) = @_;
1835             $self->retry_on_deadlock(sub {
1836 0     0   0 $self->dbh->do("DELETE FROM file_to_replicate WHERE fid=?", undef, $fidid);
1837 0         0 });
1838             }
1839              
1840             sub delete_fid_from_file_to_queue {
1841 0     0 0 0 my ($self, $fidid, $type) = @_;
1842             $self->retry_on_deadlock(sub {
1843 0     0   0 $self->dbh->do("DELETE FROM file_to_queue WHERE fid=? and type=?",
1844             undef, $fidid, $type);
1845 0         0 });
1846             }
1847              
1848             sub delete_fid_from_file_to_delete2 {
1849 0     0 0 0 my ($self, $fidid) = @_;
1850             $self->retry_on_deadlock(sub {
1851 0     0   0 $self->dbh->do("DELETE FROM file_to_delete2 WHERE fid=?", undef, $fidid);
1852 0         0 });
1853             }
1854              
1855             sub reschedule_file_to_replicate_absolute {
1856 0     0 0 0 my ($self, $fid, $abstime) = @_;
1857             $self->retry_on_deadlock(sub {
1858 0     0   0 $self->dbh->do("UPDATE file_to_replicate SET nexttry = ?, failcount = failcount + 1 WHERE fid = ?",
1859             undef, $abstime, $fid);
1860 0         0 });
1861             }
1862              
1863             sub reschedule_file_to_replicate_relative {
1864 0     0 0 0 my ($self, $fid, $in_n_secs) = @_;
1865             $self->retry_on_deadlock(sub {
1866 0     0   0 $self->dbh->do("UPDATE file_to_replicate SET nexttry = " . $self->unix_timestamp . " + ?, " .
1867             "failcount = failcount + 1 WHERE fid = ?",
1868             undef, $in_n_secs, $fid);
1869 0         0 });
1870             }
1871              
1872             sub reschedule_file_to_delete2_absolute {
1873 0     0 0 0 my ($self, $fid, $abstime) = @_;
1874             $self->retry_on_deadlock(sub {
1875 0     0   0 $self->dbh->do("UPDATE file_to_delete2 SET nexttry = ?, failcount = failcount + 1 WHERE fid = ?",
1876             undef, $abstime, $fid);
1877 0         0 });
1878             }
1879              
1880             sub reschedule_file_to_delete2_relative {
1881 0     0 0 0 my ($self, $fid, $in_n_secs) = @_;
1882             $self->retry_on_deadlock(sub {
1883 0     0   0 $self->dbh->do("UPDATE file_to_delete2 SET nexttry = " . $self->unix_timestamp . " + ?, " .
1884             "failcount = failcount + 1 WHERE fid = ?",
1885             undef, $in_n_secs, $fid);
1886 0         0 });
1887             }
1888              
1889             # Given a dmid prefix after and limit, return an arrayref of dkey from the file
1890             # table.
1891             sub get_keys_like {
1892 0     0 0 0 my ($self, $dmid, $prefix, $after, $limit) = @_;
1893             # fix the input... prefix always ends with a % so that it works
1894             # in a LIKE call, and after is either blank or something
1895 0 0       0 $prefix = '' unless defined $prefix;
1896              
1897             # escape underscores, % and \
1898 0         0 $prefix =~ s/([%\\_])/\\$1/g;
1899              
1900 0         0 $prefix .= '%';
1901 0 0       0 $after = '' unless defined $after;
1902              
1903 0         0 my $like = $self->get_keys_like_operator;
1904              
1905             # now select out our keys
1906 0         0 return $self->dbh->selectcol_arrayref
1907             ("SELECT dkey FROM file WHERE dmid = ? AND dkey $like ? ESCAPE ? AND dkey > ? " .
1908             "ORDER BY dkey LIMIT $limit", undef, $dmid, $prefix, "\\", $after);
1909             }
1910              
1911 0     0 0 0 sub get_keys_like_operator { return "LIKE"; }
1912              
1913             # return arrayref of all tempfile rows (themselves also arrayrefs, of [$fidid, $devids])
1914             # that were created $secs_ago seconds ago or older.
1915             sub old_tempfiles {
1916 0     0 0 0 my ($self, $secs_old) = @_;
1917 0         0 return $self->dbh->selectall_arrayref("SELECT fid, devids FROM tempfile " .
1918             "WHERE createtime < " . $self->unix_timestamp . " - $secs_old LIMIT 50");
1919             }
1920              
1921             # given an array of MogileFS::DevFID objects, mass-insert them all
1922             # into file_on (ignoring if they're already present)
1923             sub mass_insert_file_on {
1924 0     0 0 0 my ($self, @devfids) = @_;
1925 0 0       0 return 1 unless @devfids;
1926              
1927 0 0 0     0 if (@devfids > 1 && ! $self->can_insert_multi) {
1928 0         0 $self->mass_insert_file_on($_) foreach @devfids;
1929 0         0 return 1;
1930             }
1931              
1932 0         0 my (@qmarks, @binds);
1933 0         0 foreach my $df (@devfids) {
1934 0         0 my ($fidid, $devid) = ($df->fidid, $df->devid);
1935 0 0       0 Carp::croak("got a false fidid") unless $fidid;
1936 0 0       0 Carp::croak("got a false devid") unless $devid;
1937 0         0 push @binds, $fidid, $devid;
1938 0         0 push @qmarks, "(?,?)";
1939             }
1940              
1941             # TODO: This should possibly be insert_ignore instead
1942             # As if we are adding an extra file_on entry, we do not want to replace the
1943             # exist one. Check REPLACE semantics.
1944 0         0 $self->dowell($self->ignore_replace . " INTO file_on (fid, devid) VALUES " . join(',', @qmarks), undef, @binds);
1945 0         0 return 1;
1946             }
1947              
1948             sub set_schema_vesion {
1949 0     0 0 0 my ($self, $ver) = @_;
1950 0         0 $self->set_server_setting("schema_version", int($ver));
1951             }
1952              
1953             # returns array of fidids to try and delete again
1954             sub fids_to_delete_again {
1955 0     0 0 0 my $self = shift;
1956 0         0 my $ut = $self->unix_timestamp;
1957 0 0       0 return @{ $self->dbh->selectcol_arrayref(qq{
  0         0  
1958             SELECT fid
1959             FROM file_to_delete_later
1960             WHERE delafter < $ut
1961             LIMIT 500
1962             }) || [] };
1963             }
1964              
1965             # return 1 on success. die otherwise.
1966             sub enqueue_fids_to_delete {
1967 0     0 0 0 my ($self, @fidids) = @_;
1968             # multi-row insert-ignore/replace CAN fail with the insert_ignore emulation sub.
1969             # when the first row causes the duplicate error, and the remaining rows are
1970             # not processed.
1971 0 0 0     0 if (@fidids > 1 && ! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
      0        
1972 0         0 $self->enqueue_fids_to_delete($_) foreach @fidids;
1973 0         0 return 1;
1974             }
1975             # TODO: convert to prepared statement?
1976             $self->retry_on_deadlock(sub {
1977 0         0 $self->dbh->do($self->ignore_replace . " INTO file_to_delete (fid) VALUES " .
1978 0     0   0 join(",", map { "(" . int($_) . ")" } @fidids));
1979 0         0 });
1980 0         0 $self->condthrow;
1981             }
1982              
1983             sub enqueue_fids_to_delete2 {
1984 0     0 0 0 my ($self, @fidids) = @_;
1985             # multi-row insert-ignore/replace CAN fail with the insert_ignore emulation sub.
1986             # when the first row causes the duplicate error, and the remaining rows are
1987             # not processed.
1988 0 0 0     0 if (@fidids > 1 && ! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
      0        
1989 0         0 $self->enqueue_fids_to_delete2($_) foreach @fidids;
1990 0         0 return 1;
1991             }
1992              
1993 0         0 my $nexttry = $self->unix_timestamp;
1994              
1995             # TODO: convert to prepared statement?
1996             $self->retry_on_deadlock(sub {
1997 0         0 $self->dbh->do($self->ignore_replace . " INTO file_to_delete2 (fid,
1998             nexttry) VALUES " .
1999 0     0   0 join(",", map { "(" . int($_) . ", $nexttry)" } @fidids));
2000 0         0 });
2001 0         0 $self->condthrow;
2002             }
2003              
2004             # clears everything from the fsck_log table
2005             # return 1 on success. die otherwise.
2006             sub clear_fsck_log {
2007 0     0 0 0 my $self = shift;
2008 0         0 $self->dbh->do("DELETE FROM fsck_log");
2009 0         0 return 1;
2010             }
2011              
2012             # FIXME: Fsck log entries are processed a little out of order.
2013             # Once a fsck has completed, the log should be re-summarized.
2014             sub fsck_log_summarize {
2015 0     0 0 0 my $self = shift;
2016              
2017 0         0 my $lockname = 'mgfs:fscksum';
2018 0         0 my $lock = eval { $self->get_lock($lockname, 10) };
  0         0  
2019 0 0 0     0 return 0 if defined $lock && $lock == 0;
2020              
2021 0         0 my $logid = $self->max_fsck_logid;
2022              
2023             # sum-up evcode counts every so often, to make fsck_status faster,
2024             # avoiding a potentially-huge GROUP BY in the future..
2025 0   0     0 my $start_max_logid = $self->server_setting("fsck_start_maxlogid") || 0;
2026             # both inclusive:
2027 0   0     0 my $min_logid = $self->server_setting("fsck_logid_processed") || 0;
2028 0         0 $min_logid++;
2029 0         0 my $cts = $self->fsck_evcode_counts(logid_range => [$min_logid, $logid]); # inclusive notation :)
2030 0         0 while (my ($evcode, $ct) = each %$cts) {
2031 0         0 $self->incr_server_setting("fsck_sum_evcount_$evcode", $ct);
2032             }
2033 0         0 $self->set_server_setting("fsck_logid_processed", $logid);
2034              
2035 0 0       0 $self->release_lock($lockname) if $lock;
2036             }
2037              
2038             sub fsck_log {
2039 0     0 0 0 my ($self, %opts) = @_;
2040 0         0 $self->dbh->do("INSERT INTO fsck_log (utime, fid, evcode, devid) ".
2041             "VALUES (" . $self->unix_timestamp . ",?,?,?)",
2042             undef,
2043             delete $opts{fid},
2044             delete $opts{code},
2045             delete $opts{devid});
2046 0 0       0 croak("Unknown opts") if %opts;
2047 0         0 $self->condthrow;
2048              
2049 0         0 return 1;
2050             }
2051              
2052             sub get_db_unixtime {
2053 0     0 0 0 my $self = shift;
2054 0         0 return $self->dbh->selectrow_array("SELECT " . $self->unix_timestamp);
2055             }
2056              
2057             sub max_fidid {
2058 0     0 0 0 my $self = shift;
2059 0         0 return $self->dbh->selectrow_array("SELECT MAX(fid) FROM file");
2060             }
2061              
2062             sub max_fsck_logid {
2063 0     0 0 0 my $self = shift;
2064 0   0     0 return $self->dbh->selectrow_array("SELECT MAX(logid) FROM fsck_log") || 0;
2065             }
2066              
2067             # returns array of $row hashrefs, from fsck_log table
2068             sub fsck_log_rows {
2069 0     0 0 0 my ($self, $after_logid, $limit) = @_;
2070 0   0     0 $limit = int($limit || 100);
2071 0   0     0 $after_logid = int($after_logid || 0);
2072              
2073 0         0 my @rows;
2074 0         0 my $sth = $self->dbh->prepare(qq{
2075             SELECT logid, utime, fid, evcode, devid
2076             FROM fsck_log
2077             WHERE logid > ?
2078             ORDER BY logid
2079             LIMIT $limit
2080             });
2081 0         0 $sth->execute($after_logid);
2082 0         0 my $row;
2083 0         0 push @rows, $row while $row = $sth->fetchrow_hashref;
2084 0         0 return @rows;
2085             }
2086              
2087             sub fsck_evcode_counts {
2088 0     0 0 0 my ($self, %opts) = @_;
2089 0         0 my $timegte = delete $opts{time_gte};
2090 0         0 my $logr = delete $opts{logid_range};
2091 0 0       0 die if %opts;
2092              
2093 0         0 my $ret = {};
2094 0         0 my $sth;
2095 0 0       0 if ($timegte) {
2096 0         0 $sth = $self->dbh->prepare(qq{
2097             SELECT evcode, COUNT(*) FROM fsck_log
2098             WHERE utime >= ?
2099             GROUP BY evcode
2100             });
2101 0   0     0 $sth->execute($timegte||0);
2102             }
2103 0 0       0 if ($logr) {
2104 0         0 $sth = $self->dbh->prepare(qq{
2105             SELECT evcode, COUNT(*) FROM fsck_log
2106             WHERE logid >= ? AND logid <= ?
2107             GROUP BY evcode
2108             });
2109 0         0 $sth->execute($logr->[0], $logr->[1]);
2110             }
2111 0         0 while (my ($ev, $ct) = $sth->fetchrow_array) {
2112 0         0 $ret->{$ev} = $ct;
2113             }
2114 0         0 return $ret;
2115             }
2116              
2117             # run before daemonizing. you can die from here if you see something's amiss. or emit
2118             # warnings.
2119             sub pre_daemonize_checks {
2120 0     0 0 0 my $self = shift;
2121              
2122 0         0 $self->pre_daemonize_check_slaves;
2123             }
2124              
2125             sub pre_daemonize_check_slaves {
2126 0 0   0 0 0 my $sk = MogileFS::Config->server_setting('slave_keys')
2127             or return;
2128              
2129 0         0 my @slaves;
2130 0         0 foreach my $key (split /\s*,\s*/, $sk) {
2131 0         0 my $slave = MogileFS::Config->server_setting("slave_$key");
2132              
2133 0 0       0 if (!$slave) {
2134 0         0 error("key for slave DB config: slave_$key not found in configuration");
2135 0         0 next;
2136             }
2137              
2138 0         0 my ($dsn, $user, $pass) = split /\|/, $slave;
2139 0 0 0     0 if (!defined($dsn) or !defined($user) or !defined($pass)) {
      0        
2140 0         0 error("key slave_$key contains $slave, which doesn't split in | into DSN|user|pass - ignoring");
2141 0         0 next;
2142             }
2143 0         0 push @slaves, [$dsn, $user, $pass]
2144             }
2145              
2146 0 0       0 return unless @slaves; # Escape this block if we don't have a set of slaves anyways
2147              
2148 0         0 MogileFS::run_global_hook('slave_list_check', \@slaves);
2149             }
2150              
2151              
2152             # attempt to grab a lock of lockname, and timeout after timeout seconds.
2153             # returns 1 on success and 0 on timeout. dies if more than one lock is already outstanding.
2154             sub get_lock {
2155 0     0 0 0 my ($self, $lockname, $timeout) = @_;
2156 0 0       0 die "Lock recursion detected (grabbing $lockname, had $self->{last_lock}). Bailing out." if $self->{lock_depth};
2157 0         0 die "get_lock not implemented for $self";
2158             }
2159              
2160             # attempt to release a lock of lockname.
2161             # returns 1 on success and 0 if no lock we have has that name.
2162             sub release_lock {
2163 0     0 0 0 my ($self, $lockname) = @_;
2164 0         0 die "release_lock not implemented for $self";
2165             }
2166              
2167             # MySQL has an issue where you either get excessive deadlocks, or INSERT's
2168             # hang forever around some transactions. Use ghetto locking to cope.
2169 0     0 0 0 sub lock_queue { 1 }
2170 0     0 0 0 sub unlock_queue { 1 }
2171              
2172 0     0 0 0 sub BLOB_BIND_TYPE { undef; }
2173              
2174             sub set_checksum {
2175 0     0 0 0 my ($self, $fidid, $hashtype, $checksum) = @_;
2176 0         0 my $dbh = $self->dbh;
2177 0 0       0 die "Your database does not support REPLACE! Reimplement set_checksum!" unless $self->can_replace;
2178              
2179 0         0 eval {
2180 0         0 my $sth = $dbh->prepare("REPLACE INTO checksum " .
2181             "(fid, hashtype, checksum) " .
2182             "VALUES (?, ?, ?)");
2183 0         0 $sth->bind_param(1, $fidid);
2184 0         0 $sth->bind_param(2, $hashtype);
2185 0         0 $sth->bind_param(3, $checksum, BLOB_BIND_TYPE);
2186 0         0 $sth->execute;
2187             };
2188 0         0 $self->condthrow;
2189             }
2190              
2191             sub get_checksum {
2192 0     0 0 0 my ($self, $fidid) = @_;
2193              
2194 0         0 $self->dbh->selectrow_hashref("SELECT fid, hashtype, checksum " .
2195             "FROM checksum WHERE fid = ?",
2196             undef, $fidid);
2197             }
2198              
2199             sub delete_checksum {
2200 0     0 0 0 my ($self, $fidid) = @_;
2201              
2202 0         0 $self->dbh->do("DELETE FROM checksum WHERE fid = ?", undef, $fidid);
2203             }
2204              
2205             # setup the value used in a 'nexttry' field to indicate that this item will
2206             # never actually be tried again and require some sort of manual intervention.
2207 21     21   249 use constant ENDOFTIME => 2147483647;
  21         49  
  21         2938  
2208              
2209 0     0 0 0 sub end_of_time { ENDOFTIME; }
2210              
2211             # returns the size of the non-urgent replication queue
2212             # nexttry == 0 - the file is urgent
2213             # nexttry != 0 && nexttry < ENDOFTIME - the file is deferred
2214             sub deferred_repl_queue_length {
2215 0     0 0 0 my ($self) = @_;
2216              
2217 0         0 return $self->dbh->selectrow_array('SELECT COUNT(*) FROM file_to_replicate WHERE nexttry != 0 AND nexttry < ?', undef, $self->end_of_time);
2218             }
2219              
2220             1;
2221              
2222             __END__