File Coverage

blib/lib/MogileFS/Store.pm
Criterion Covered Total %
statement 37 972 3.8
branch 3 330 0.9
condition 0 121 0.0
subroutine 12 218 5.5
pod 0 181 0.0
total 52 1822 2.8


line stmt bran cond sub pod time code
1             package MogileFS::Store;
2 21     21   149 use strict;
  21         40  
  21         523  
3 21     21   87 use warnings;
  21         49  
  21         533  
4 21     21   89 use Carp qw(croak confess);
  21         53  
  21         904  
5 21     21   100 use MogileFS::Util qw(throw max error);
  21         31  
  21         719  
6 21     21   24312 use DBI; # no reason a Store has to be DBI-based, but for now they all are.
  21         286841  
  21         1032  
7 21     21   132 use List::Util qw(shuffle);
  21         37  
  21         1500  
8              
9             # this is incremented whenever the schema changes. server will refuse
10             # to start-up with an old schema version
11             #
12             # 6: adds file_to_replicate table
13             # 7: adds file_to_delete_later table
14             # 8: adds fsck_log table
15             # 9: adds 'drain' state to enum in device table
16             # 10: adds 'replpolicy' column to 'class' table
17             # 11: adds 'file_to_queue' table
18             # 12: adds 'file_to_delete2' table
19             # 13: modifies 'server_settings.value' to TEXT for wider values
20             # also adds a TEXT 'arg' column to file_to_queue for passing arguments
21             # 14: modifies 'device' mb_total, mb_used to INT for devs > 16TB
22             # 15: adds checksum table, adds 'hashtype' column to 'class' table
23             # 16: no-op, see 17
24             # 17: adds 'readonly' state to enum in host table
25 21     21   112 use constant SCHEMA_VERSION => 17;
  21         33  
  21         63105  
26              
27             sub new {
28 1     1 0 10 my ($class) = @_;
29 1         4 return $class->new_from_dsn_user_pass(map { MogileFS->config($_) } qw(db_dsn db_user db_pass max_handles));
  4         11  
30             }
31              
32             sub new_from_dsn_user_pass {
33 1     1 0 3 my ($class, $dsn, $user, $pass, $max_handles) = @_;
34 1         2 my $subclass;
35 1 50       8 if ($dsn =~ /^DBI:mysql:/i) {
    50          
    0          
    0          
36 0         0 $subclass = "MogileFS::Store::MySQL";
37             } elsif ($dsn =~ /^DBI:SQLite:/i) {
38 1         2 $subclass = "MogileFS::Store::SQLite";
39             } elsif ($dsn =~ /^DBI:Oracle:/i) {
40 0         0 $subclass = "MogileFS::Store::Oracle";
41             } elsif ($dsn =~ /^DBI:Pg:/i) {
42 0         0 $subclass = "MogileFS::Store::Postgres";
43             } else {
44 0         0 die "Unknown database type: $dsn";
45             }
46 1 50   1   52 unless (eval "use $subclass; 1") {
  1         422  
  0            
  0            
47 1         10 die "Error loading $subclass: $@\n";
48             }
49 0         0 my $self = bless {
50             dsn => $dsn,
51             user => $user,
52             pass => $pass,
53             max_handles => $max_handles, # Max number of handles to allow
54             raise_errors => $subclass->want_raise_errors,
55             slave_list_version => 0,
56             slave_list_cache => [],
57             recheck_req_gen => 0, # incremented generation, of recheck of dbh being requested
58             recheck_done_gen => 0, # once recheck is done, copy of what the request generation was
59             handles_left => 0, # amount of times this handle can still be verified
60             connected_slaves => {},
61             dead_slaves => {},
62             dead_backoff => {}, # how many times in a row a slave has died
63             connect_timeout => 10, # High default.
64             }, $subclass;
65 0         0 $self->init;
66 0         0 return $self;
67             }
68              
69             # Defaults to true now.
70             sub want_raise_errors {
71 0     0 0 0 1;
72             }
73              
74             sub new_from_mogdbsetup {
75 0     0 0 0 my ($class, %args) = @_;
76             # where args is: dbhost dbport dbname dbrootuser dbrootpass dbuser dbpass
77 0         0 my $dsn = $class->dsn_of_dbhost($args{dbname}, $args{dbhost}, $args{dbport});
78              
79             my $try_make_sto = sub {
80             my $dbh = DBI->connect($dsn, $args{dbuser}, $args{dbpass}, {
81 0 0   0   0 PrintError => 0,
82             }) or return undef;
83 0         0 my $sto = $class->new_from_dsn_user_pass($dsn, $args{dbuser}, $args{dbpass});
84 0         0 $sto->raise_errors;
85 0         0 return $sto;
86 0         0 };
87              
88             # upgrading, apparently, as this database already exists.
89 0         0 my $sto = $try_make_sto->();
90 0 0       0 return $sto if $sto;
91              
92             # otherwise, we need to make the requested database, setup permissions, etc
93 0         0 $class->status("couldn't connect to database as mogilefs user. trying root...");
94 0         0 my $rootdsn = $class->dsn_of_root($args{dbname}, $args{dbhost}, $args{dbport});
95             my $rdbh = DBI->connect($rootdsn, $args{dbrootuser}, $args{dbrootpass}, {
96 0 0       0 PrintError => 0,
97             }) or
98             die "Failed to connect to $rootdsn as specified root user ($args{dbrootuser}): " . DBI->errstr . "\n";
99 0         0 $class->status("connected to database as root user.");
100              
101 0         0 $class->confirm("Create/Upgrade database name '$args{dbname}'?");
102 0         0 $class->create_db_if_not_exists($rdbh, $args{dbname});
103 0         0 $class->confirm("Grant all privileges to user '$args{dbuser}', connecting from anywhere, to the mogilefs database '$args{dbname}'?");
104 0         0 $class->grant_privileges($rdbh, $args{dbname}, $args{dbuser}, $args{dbpass});
105              
106             # should be ready now:
107 0         0 $sto = $try_make_sto->();
108 0 0       0 return $sto if $sto;
109              
110 0         0 die "Failed to connect to database as regular user, even after creating it and setting up permissions as the root user.";
111             }
112              
113             # given a root DBI connection, create the named database. succeed
114             # if it it's made, or already exists. die otherwise.
115             sub create_db_if_not_exists {
116 0     0 0 0 my ($pkg, $rdbh, $dbname) = @_;
117 0 0       0 $rdbh->do("CREATE DATABASE IF NOT EXISTS $dbname")
118             or die "Failed to create database '$dbname': " . $rdbh->errstr . "\n";
119             }
120              
121             sub grant_privileges {
122 0     0 0 0 my ($pkg, $rdbh, $dbname, $user, $pass) = @_;
123 0 0       0 $rdbh->do("GRANT ALL PRIVILEGES ON $dbname.* TO $user\@'\%' IDENTIFIED BY ?",
124             undef, $pass)
125             or die "Failed to grant privileges: " . $rdbh->errstr . "\n";
126 0 0       0 $rdbh->do("GRANT ALL PRIVILEGES ON $dbname.* TO $user\@'localhost' IDENTIFIED BY ?",
127             undef, $pass)
128             or die "Failed to grant privileges: " . $rdbh->errstr . "\n";
129             }
130              
131 0     0 0 0 sub can_replace { 0 }
132 0     0 0 0 sub can_insertignore { 0 }
133 0     0 0 0 sub can_insert_multi { 0 }
134 0     0 0 0 sub can_for_update { 1 }
135              
136 0     0 0 0 sub unix_timestamp { die "No function in $_[0] to return DB's unixtime." }
137              
138             sub ignore_replace {
139 0     0 0 0 my $self = shift;
140 0 0       0 return "INSERT IGNORE " if $self->can_insertignore;
141 0 0       0 return "REPLACE " if $self->can_replace;
142 0         0 die "Can't INSERT IGNORE or REPLACE?";
143             }
144              
145             my $on_status = sub {};
146             my $on_confirm = sub { 1 };
147 0     0 0 0 sub on_status { my ($pkg, $code) = @_; $on_status = $code; };
  0         0  
148 0     0 0 0 sub on_confirm { my ($pkg, $code) = @_; $on_confirm = $code; };
  0         0  
149 0     0 0 0 sub status { my ($pkg, $msg) = @_; $on_status->($msg); };
  0         0  
150 0 0   0 0 0 sub confirm { my ($pkg, $msg) = @_; $on_confirm->($msg) or die "Aborted.\n"; };
  0         0  
151              
152 0     0 0 0 sub latest_schema_version { SCHEMA_VERSION }
153              
154             sub raise_errors {
155 0     0 0 0 my $self = shift;
156 0         0 $self->{raise_errors} = 1;
157 0         0 $self->dbh->{RaiseError} = 1;
158             }
159              
160 0     0 0 0 sub set_connect_timeout { $_[0]{connect_timeout} = $_[1]; }
161              
162 0     0 0 0 sub dsn { $_[0]{dsn} }
163 0     0 0 0 sub user { $_[0]{user} }
164 0     0 0 0 sub pass { $_[0]{pass} }
165              
166 0     0 0 0 sub connect_timeout { $_[0]{connect_timeout} }
167              
168 0     0 0 0 sub init { 1 }
169 0     0 0 0 sub post_dbi_connect { 1 }
170              
171 0     0 0 0 sub can_do_slaves { 0 }
172              
173             sub mark_as_slave {
174 0     0 0 0 my $self = shift;
175 0 0       0 die "Incapable of becoming slave." unless $self->can_do_slaves;
176              
177 0         0 $self->{is_slave} = 1;
178             }
179              
180             sub is_slave {
181 0     0 0 0 my $self = shift;
182 0         0 return $self->{is_slave};
183             }
184              
185             sub _slaves_list_changed {
186 0     0   0 my $self = shift;
187 0   0     0 my $ver = MogileFS::Config->server_setting_cached('slave_version') || 0;
188 0 0       0 if ($ver <= $self->{slave_list_version}) {
189 0         0 return 0;
190             }
191 0         0 $self->{slave_list_version} = $ver;
192             # Restart connections from scratch if the configuration changed.
193 0         0 $self->{connected_slaves} = {};
194 0         0 return 1;
195             }
196              
197             # Returns a list of arrayrefs, each being [$dsn, $username, $password] for connecting to a slave DB.
198             sub _slaves_list {
199 0     0   0 my $self = shift;
200 0         0 my $now = time();
201              
202 0 0       0 my $sk = MogileFS::Config->server_setting_cached('slave_keys')
203             or return ();
204              
205 0         0 my @ret;
206 0         0 foreach my $key (split /\s*,\s*/, $sk) {
207 0         0 my $slave = MogileFS::Config->server_setting_cached("slave_$key");
208              
209 0 0       0 if (!$slave) {
210 0         0 error("key for slave DB config: slave_$key not found in configuration");
211 0         0 next;
212             }
213              
214 0         0 my ($dsn, $user, $pass) = split /\|/, $slave;
215 0 0 0     0 if (!defined($dsn) or !defined($user) or !defined($pass)) {
      0        
216 0         0 error("key slave_$key contains $slave, which doesn't split in | into DSN|user|pass - ignoring");
217 0         0 next;
218             }
219 0         0 push @ret, [$dsn, $user, $pass]
220             }
221              
222 0         0 return @ret;
223             }
224              
225             sub _pick_slave {
226 0     0   0 my $self = shift;
227 0         0 my @temp = shuffle keys %{$self->{connected_slaves}};
  0         0  
228 0 0       0 return unless @temp;
229 0         0 return $self->{connected_slaves}->{$temp[0]};
230             }
231              
232             sub _connect_slave {
233 0     0   0 my $self = shift;
234 0         0 my $slave_fulldsn = shift;
235 0         0 my $now = time();
236              
237 0   0     0 my $dead_retry =
238             MogileFS::Config->server_setting_cached('slave_dead_retry_timeout') || 15;
239              
240 0   0     0 my $dead_backoff = $self->{dead_backoff}->{$slave_fulldsn->[0]} || 0;
241 0         0 my $dead_timeout = $self->{dead_slaves}->{$slave_fulldsn->[0]};
242 0 0 0     0 return if (defined $dead_timeout
243             && $dead_timeout + ($dead_retry * $dead_backoff) > $now);
244 0 0       0 return if ($self->{connected_slaves}->{$slave_fulldsn->[0]});
245              
246 0         0 my $newslave = $self->{slave} = $self->new_from_dsn_user_pass(@$slave_fulldsn);
247 0   0     0 $newslave->set_connect_timeout(
248             MogileFS::Config->server_setting_cached('slave_connect_timeout') || 1);
249 0         0 $self->{slave}->{next_check} = 0;
250 0         0 $newslave->mark_as_slave;
251 0 0       0 if ($self->check_slave) {
252 0         0 $self->{connected_slaves}->{$slave_fulldsn->[0]} = $newslave;
253 0         0 $self->{dead_backoff}->{$slave_fulldsn->[0]} = 0;
254             } else {
255             # Magic numbers are saddening...
256 0 0       0 $dead_backoff++ unless $dead_backoff > 20;
257 0         0 $self->{dead_slaves}->{$slave_fulldsn->[0]} = $now;
258 0         0 $self->{dead_backoff}->{$slave_fulldsn->[0]} = $dead_backoff;
259             }
260             }
261              
262             sub get_slave {
263 0     0 0 0 my $self = shift;
264              
265 0 0       0 die "Incapable of having slaves." unless $self->can_do_slaves;
266              
267 0         0 $self->{slave} = undef;
268 0         0 foreach my $slave (keys %{$self->{dead_slaves}}) {
  0         0  
269 0         0 my ($full_dsn) = grep { $slave eq $_->[0] } @{$self->{slave_list_cache}};
  0         0  
  0         0  
270 0 0       0 unless ($full_dsn) {
271 0         0 delete $self->{dead_slaves}->{$slave};
272 0         0 next;
273             }
274 0         0 $self->_connect_slave($full_dsn);
275             }
276              
277 0 0       0 unless ($self->_slaves_list_changed) {
278 0 0       0 if ($self->{slave} = $self->_pick_slave) {
279 0         0 $self->{slave}->{recheck_req_gen} = $self->{recheck_req_gen};
280 0 0       0 return $self->{slave} if $self->check_slave;
281             }
282             }
283              
284 0 0       0 if ($self->{slave}) {
285 0         0 my $dsn = $self->{slave}->{dsn};
286 0         0 $self->{dead_slaves}->{$dsn} = time();
287 0         0 $self->{dead_backoff}->{$dsn} = 0;
288 0         0 delete $self->{connected_slaves}->{$dsn};
289 0         0 error("Error talking to slave: $dsn");
290             }
291 0         0 my @slaves_list = $self->_slaves_list;
292              
293             # If we have no slaves, then return silently.
294 0 0       0 return unless @slaves_list;
295              
296 0         0 my $slave_skip_filtering = MogileFS::Config->server_setting_cached('slave_skip_filtering');
297              
298 0 0 0     0 unless (defined $slave_skip_filtering && $slave_skip_filtering eq 'on') {
299 0         0 MogileFS::run_global_hook('slave_list_filter', \@slaves_list);
300             }
301              
302 0         0 $self->{slave_list_cache} = \@slaves_list;
303              
304 0         0 foreach my $slave_fulldsn (@slaves_list) {
305 0         0 $self->_connect_slave($slave_fulldsn);
306             }
307              
308 0 0       0 if ($self->{slave} = $self->_pick_slave) {
309 0         0 return $self->{slave};
310             }
311 0         0 warn "Slave list exhausted, failing back to master.";
312 0         0 return;
313             }
314              
315             sub read_store {
316 0     0 0 0 my $self = shift;
317              
318 0 0       0 return $self unless $self->can_do_slaves;
319              
320 0 0       0 if ($self->{slave_ok}) {
321 0 0       0 if (my $slave = $self->get_slave) {
322 0         0 return $slave;
323             }
324             }
325              
326 0         0 return $self;
327             }
328              
329             sub slaves_ok {
330 0     0 0 0 my $self = shift;
331 0         0 my $coderef = shift;
332              
333 0 0       0 return unless ref $coderef eq 'CODE';
334              
335 0         0 local $self->{slave_ok} = 1;
336              
337 0         0 return $coderef->(@_);
338             }
339              
340             sub recheck_dbh {
341 0     0 0 0 my $self = shift;
342 0         0 $self->{recheck_req_gen}++;
343             }
344              
345             sub dbh {
346 0     0 0 0 my $self = shift;
347            
348 0 0       0 if ($self->{dbh}) {
349 0 0       0 if ($self->{recheck_done_gen} != $self->{recheck_req_gen}) {
350 0 0       0 $self->{dbh} = undef unless $self->{dbh}->ping;
351             # Handles a memory leak under Solaris/Postgres.
352             # We may leak a little extra memory if we're holding a lock,
353             # since dropping a connection mid-lock is fatal
354             $self->{dbh} = undef if ($self->{max_handles} &&
355 0 0 0     0 $self->{handles_left}-- < 0 && !$self->{lock_depth});
      0        
356 0         0 $self->{recheck_done_gen} = $self->{recheck_req_gen};
357             }
358 0 0       0 return $self->{dbh} if $self->{dbh};
359             }
360              
361             # Shortcut flag: if monitor thinks the master is down, avoid attempting to
362             # connect to it for now. If we already have a connection to the master,
363             # keep using it as above.
364 0 0       0 if (!$self->is_slave) {
365 0         0 my $flag = MogileFS::Config->server_setting_cached('_master_db_alive', 0);
366 0 0 0     0 return if (defined $flag && $flag == 0);;
367             }
368              
369             # auto-reconnect is unsafe if we're holding a lock
370 0 0       0 if ($self->{lock_depth}) {
371 0         0 die "DB connection recovery unsafe, lock held: $self->{last_lock}";
372             }
373              
374 0         0 eval {
375 0     0   0 local $SIG{ALRM} = sub { die "timeout\n" };
  0         0  
376 0         0 alarm($self->connect_timeout);
377             $self->{dbh} = DBI->connect($self->{dsn}, $self->{user}, $self->{pass}, {
378             PrintError => 0,
379             AutoCommit => 1,
380             # FUTURE: will default to on (have to validate all callers first):
381 0   0     0 RaiseError => ($self->{raise_errors} || 0),
382             sqlite_use_immediate_transaction => 1,
383             });
384             };
385 0         0 alarm(0);
386 0 0       0 if ($@ eq "timeout\n") {
    0          
387 0         0 die "Failed to connect to database: timeout";
388             } elsif ($@) {
389 0         0 die "Failed to connect to database: " . DBI->errstr;
390             }
391 0         0 $self->post_dbi_connect;
392 0 0       0 $self->{handles_left} = $self->{max_handles} if $self->{max_handles};
393 0         0 return $self->{dbh};
394             }
395              
396 0 0   0 0 0 sub have_dbh { return 1 if $_[0]->{dbh}; }
397              
398             sub ping {
399 0     0 0 0 my $self = shift;
400 0         0 return $self->dbh->ping;
401             }
402              
403             sub condthrow {
404 0     0 0 0 my ($self, $optmsg) = @_;
405 0         0 my $dbh = $self->dbh;
406 0 0       0 return 1 unless $dbh->err;
407 0         0 my ($pkg, $fn, $line) = caller;
408 0         0 my $msg = "Database error from $pkg/$fn/$line: " . $dbh->errstr;
409 0 0       0 $msg .= ": $optmsg" if $optmsg;
410             # Auto rollback failures around transactions.
411 0 0       0 if ($dbh->{AutoCommit} == 0) { eval { $dbh->rollback }; }
  0         0  
  0         0  
412 0         0 croak($msg);
413             }
414              
415             sub dowell {
416 0     0 0 0 my ($self, $sql, @do_params) = @_;
417 0         0 my $rv = eval { $self->dbh->do($sql, @do_params) };
  0         0  
418 0 0 0     0 return $rv unless $@ || $self->dbh->err;
419 0         0 warn "Error with SQL: $sql\n";
420 0   0     0 Carp::confess($@ || $self->dbh->errstr);
421             }
422              
423             sub _valid_params {
424 0 0   0   0 croak("Odd number of parameters!") if scalar(@_) % 2;
425 0         0 my ($self, $vlist, %uarg) = @_;
426 0         0 my %ret;
427 0         0 $ret{$_} = delete $uarg{$_} foreach @$vlist;
428 0 0       0 croak("Bogus options: ".join(',',keys %uarg)) if %uarg;
429 0         0 return %ret;
430             }
431              
432             sub was_deadlock_error {
433 0     0 0 0 my $self = shift;
434 0         0 my $dbh = $self->dbh;
435 0         0 die "UNIMPLEMENTED";
436             }
437              
438             sub was_duplicate_error {
439 0     0 0 0 my $self = shift;
440 0         0 my $dbh = $self->dbh;
441 0         0 die "UNIMPLEMENTED";
442             }
443              
444             # run a subref (presumably a database update) in an eval, because you expect it to
445             # maybe fail on duplicate key error, and throw a dup exception for you, else return
446             # its return value
447             sub conddup {
448 0     0 0 0 my ($self, $code) = @_;
449 0         0 my $rv = eval { $code->(); };
  0         0  
450 0 0       0 throw("dup") if $self->was_duplicate_error;
451 0 0       0 croak($@) if $@;
452 0         0 return $rv;
453             }
454              
455             # insert row if doesn't already exist
456             # WARNING: This function is NOT transaction safe if the duplicate errors causes
457             # your transaction to halt!
458             # WARNING: This function is NOT safe on multi-row inserts if can_insertignore
459             # is false! Rows before the duplicate will be inserted, but rows after the
460             # duplicate might not be, depending your database.
461             sub insert_ignore {
462 0     0 0 0 my ($self, $sql, @params) = @_;
463 0         0 my $dbh = $self->dbh;
464 0 0       0 if ($self->can_insertignore) {
465 0         0 return $dbh->do("INSERT IGNORE $sql", @params);
466             } else {
467             # TODO: Detect bad multi-row insert here.
468 0         0 my $rv = eval { $dbh->do("INSERT $sql", @params); };
  0         0  
469 0 0 0     0 if ($@ || $dbh->err) {
470 0 0       0 return 1 if $self->was_duplicate_error;
471             # This chunk is identical to condthrow, but we include it directly
472             # here as we know there is definitely an error, and we would like
473             # the caller of this function.
474 0         0 my ($pkg, $fn, $line) = caller;
475 0         0 my $msg = "Database error from $pkg/$fn/$line: " . $dbh->errstr;
476 0         0 croak($msg);
477             }
478 0         0 return $rv;
479             }
480             }
481              
482             sub retry_on_deadlock {
483 0     0 0 0 my $self = shift;
484 0         0 my $code = shift;
485 0   0     0 my $tries = shift || 3;
486 0 0       0 croak("deadlock retries must be positive") if $tries < 1;
487 0         0 my $rv;
488              
489 0         0 while ($tries-- > 0) {
490 0         0 $rv = eval { $code->(); };
  0         0  
491 0 0       0 next if ($self->was_deadlock_error);
492 0 0       0 croak($@) if $@;
493 0         0 last;
494             }
495 0         0 return $rv;
496             }
497              
498             # --------------------------------------------------------------------------
499              
500             my @extra_tables;
501              
502             sub add_extra_tables {
503 0     0 0 0 my $class = shift;
504 0         0 push @extra_tables, @_;
505             }
506              
507 21         162876 use constant TABLES => qw( domain class file tempfile file_to_delete
508             unreachable_fids file_on file_on_corrupt host
509             device server_settings file_to_replicate
510             file_to_delete_later fsck_log file_to_queue
511 21     21   168 file_to_delete2 checksum);
  21         59  
512              
513             sub setup_database {
514 0     0 0 0 my $sto = shift;
515              
516 0         0 my $curver = $sto->schema_version;
517              
518 0         0 my $latestver = SCHEMA_VERSION;
519 0 0       0 if ($curver == $latestver) {
520 0         0 $sto->status("Schema already up-to-date at version $curver.");
521 0         0 return 1;
522             }
523              
524 0 0       0 if ($curver > $latestver) {
525 0         0 die "Your current schema version is $curver, but this version of mogdbsetup only knows up to $latestver. Aborting to be safe.\n";
526             }
527              
528 0 0       0 if ($curver) {
529 0         0 $sto->confirm("Install/upgrade your schema from version $curver to version $latestver?");
530             }
531              
532 0         0 foreach my $t (TABLES, @extra_tables) {
533 0         0 $sto->create_table($t);
534             }
535              
536 0         0 $sto->upgrade_add_host_getport;
537 0         0 $sto->upgrade_add_host_altip;
538 0         0 $sto->upgrade_add_device_asof;
539 0         0 $sto->upgrade_add_device_weight;
540 0         0 $sto->upgrade_add_device_readonly;
541 0         0 $sto->upgrade_add_device_drain;
542 0         0 $sto->upgrade_add_class_replpolicy;
543 0         0 $sto->upgrade_modify_server_settings_value;
544 0         0 $sto->upgrade_add_file_to_queue_arg;
545 0         0 $sto->upgrade_modify_device_size;
546 0         0 $sto->upgrade_add_class_hashtype;
547 0         0 $sto->upgrade_add_host_readonly;
548              
549 0         0 return 1;
550             }
551              
552             sub cached_schema_version {
553 0     0 0 0 my $self = shift;
554             return $self->{_cached_schema_version} ||=
555 0   0     0 $self->schema_version;
556             }
557              
558             sub schema_version {
559 0     0 0 0 my $self = shift;
560 0         0 my $dbh = $self->dbh;
561 0   0     0 return eval {
562             $dbh->selectrow_array("SELECT value FROM server_settings WHERE field='schema_version'") || 0;
563             } || 0;
564             }
565              
566 0     0 0 0 sub filter_create_sql { my ($self, $sql) = @_; return $sql; }
  0         0  
567              
568             sub create_table {
569 0     0 0 0 my ($self, $table) = @_;
570 0         0 my $dbh = $self->dbh;
571 0 0       0 return 1 if $self->table_exists($table);
572 0         0 my $meth = "TABLE_$table";
573 0         0 my $sql = $self->$meth;
574 0         0 $sql = $self->filter_create_sql($sql);
575 0         0 $self->status("Running SQL: $sql;");
576 0 0       0 $dbh->do($sql) or
577             die "Failed to create table $table: " . $dbh->errstr;
578 0         0 my $imeth = "INDEXES_$table";
579 0         0 my @indexes = eval { $self->$imeth };
  0         0  
580 0         0 foreach $sql (@indexes) {
581 0         0 $self->status("Running SQL: $sql;");
582 0 0       0 $dbh->do($sql) or
583             die "Failed to create indexes on $table: " . $dbh->errstr;
584             }
585             }
586              
587             # Please try to keep all tables aligned nicely
588             # with '"CREATE TABLE' on the first line
589             # and ')"' alone on the last line.
590              
591             sub TABLE_domain {
592             # classes are tied to domains. domains can have classes of items
593             # with different mindevcounts.
594             #
595             # a minimum devcount is the number of copies the system tries to
596             # maintain for files in that class
597             #
598             # unspecified classname means classid=0 (implicit class), and that
599             # implies mindevcount=2
600 0     0 0 0 "CREATE TABLE domain (
601             dmid SMALLINT UNSIGNED NOT NULL PRIMARY KEY,
602             namespace VARCHAR(255),
603             UNIQUE (namespace)
604             )"
605             }
606              
607             sub TABLE_class {
608 0     0 0 0 "CREATE TABLE class (
609             dmid SMALLINT UNSIGNED NOT NULL,
610             classid TINYINT UNSIGNED NOT NULL,
611             PRIMARY KEY (dmid,classid),
612             classname VARCHAR(50),
613             UNIQUE (dmid,classname),
614             mindevcount TINYINT UNSIGNED NOT NULL,
615             hashtype TINYINT UNSIGNED
616             )"
617             }
618              
619             # the length field is only here for easy verifications of content
620             # integrity when copying around. no sums or content types or other
621             # metadata here. application can handle that.
622             #
623             # classid is what class of file this belongs to. for instance, on fotobilder
624             # there will be a class for original pictures (the ones the user uploaded)
625             # and a class for derived images (scaled down versions, thumbnails, greyscale, etc)
626             # each domain can setup classes and assign the minimum redundancy level for
627             # each class. fotobilder will use a 2 or 3 minimum copy redundancy for original
628             # photos and and a 1 minimum for derived images (which means the sole device
629             # for a derived image can die, bringing devcount to 0 for that file, but
630             # the application can recreate it from its original)
631             sub TABLE_file {
632 0     0 0 0 "CREATE TABLE file (
633             fid INT UNSIGNED NOT NULL,
634             PRIMARY KEY (fid),
635              
636             dmid SMALLINT UNSIGNED NOT NULL,
637             dkey VARCHAR(255), # domain-defined
638             UNIQUE dkey (dmid, dkey),
639              
640             length BIGINT UNSIGNED, # big limit
641              
642             classid TINYINT UNSIGNED NOT NULL,
643             devcount TINYINT UNSIGNED NOT NULL,
644             INDEX devcount (dmid,classid,devcount)
645             )"
646             }
647              
648             sub TABLE_tempfile {
649 0     0 0 0 "CREATE TABLE tempfile (
650             fid INT UNSIGNED NOT NULL AUTO_INCREMENT,
651             PRIMARY KEY (fid),
652              
653             createtime INT UNSIGNED NOT NULL,
654             classid TINYINT UNSIGNED NOT NULL,
655             dmid SMALLINT UNSIGNED NOT NULL,
656             dkey VARCHAR(255),
657             devids VARCHAR(60)
658             )"
659             }
660              
661             # files marked for death when their key is overwritten. then they get a new
662             # fid, but since the old row (with the old fid) had to be deleted immediately,
663             # we need a place to store the fid so an async job can delete the file from
664             # all devices.
665             sub TABLE_file_to_delete {
666 0     0 0 0 "CREATE TABLE file_to_delete (
667             fid INT UNSIGNED NOT NULL,
668             PRIMARY KEY (fid)
669             )"
670             }
671              
672             # if the replicator notices that a fid has no sources, that file gets inserted
673             # into the unreachable_fids table. it is up to the application to actually
674             # handle fids stored in this table.
675             sub TABLE_unreachable_fids {
676 0     0 0 0 "CREATE TABLE unreachable_fids (
677             fid INT UNSIGNED NOT NULL,
678             lastupdate INT UNSIGNED NOT NULL,
679             PRIMARY KEY (fid),
680             INDEX (lastupdate)
681             )"
682             }
683              
684             # what files are on what devices? (most likely physical devices,
685             # as logical devices of RAID arrays would be costly, and mogilefs
686             # already handles redundancy)
687             #
688             # the devid index lets us answer "What files were on this now-dead disk?"
689             sub TABLE_file_on {
690 0     0 0 0 "CREATE TABLE file_on (
691             fid INT UNSIGNED NOT NULL,
692             devid MEDIUMINT UNSIGNED NOT NULL,
693             PRIMARY KEY (fid, devid),
694             INDEX (devid)
695             )"
696             }
697              
698             # if application or framework detects an error in one of the duplicate files
699             # for whatever reason, it can register its complaint and the framework
700             # will do some verifications and fix things up w/ an async job
701             # MAYBE: let application tell us the SHA1/MD5 of the file for us to check
702             # on the other devices?
703             sub TABLE_file_on_corrupt {
704 0     0 0 0 "CREATE TABLE file_on_corrupt (
705             fid INT UNSIGNED NOT NULL,
706             devid MEDIUMINT UNSIGNED NOT NULL,
707             PRIMARY KEY (fid, devid)
708             )"
709             }
710              
711             # hosts (which contain devices...)
712             sub TABLE_host {
713 0     0 0 0 "CREATE TABLE host (
714             hostid MEDIUMINT UNSIGNED NOT NULL PRIMARY KEY,
715              
716             status ENUM('alive','dead','down'),
717             http_port MEDIUMINT UNSIGNED DEFAULT 7500,
718             http_get_port MEDIUMINT UNSIGNED,
719              
720             hostname VARCHAR(40),
721             hostip VARCHAR(15),
722             altip VARCHAR(15),
723             altmask VARCHAR(18),
724             UNIQUE (hostname),
725             UNIQUE (hostip),
726             UNIQUE (altip)
727             )"
728             }
729              
730             # disks...
731             sub TABLE_device {
732 0     0 0 0 "CREATE TABLE device (
733             devid MEDIUMINT UNSIGNED NOT NULL,
734             hostid MEDIUMINT UNSIGNED NOT NULL,
735              
736             status ENUM('alive','dead','down'),
737             weight MEDIUMINT DEFAULT 100,
738              
739             mb_total INT UNSIGNED,
740             mb_used INT UNSIGNED,
741             mb_asof INT UNSIGNED,
742             PRIMARY KEY (devid),
743             INDEX (status)
744             )"
745             }
746              
747             sub TABLE_server_settings {
748 0     0 0 0 "CREATE TABLE server_settings (
749             field VARCHAR(50) PRIMARY KEY,
750             value TEXT
751             )"
752             }
753              
754             sub TABLE_file_to_replicate {
755             # nexttry is time to try to replicate it next.
756             # 0 means immediate. it's only on one host.
757             # 1 means lower priority. it's on 2+ but isn't happy where it's at.
758             # unix timestamp means at/after that time. some previous error occurred.
759             # fromdevid, if not null, means which devid we should replicate from. perhaps it's the only non-corrupt one. otherwise, wherever.
760             # failcount. how many times we've failed, just for doing backoff of nexttry.
761             # flags. reserved for future use.
762 0     0 0 0 "CREATE TABLE file_to_replicate (
763             fid INT UNSIGNED NOT NULL PRIMARY KEY,
764             nexttry INT UNSIGNED NOT NULL,
765             INDEX (nexttry),
766             fromdevid INT UNSIGNED,
767             failcount TINYINT UNSIGNED NOT NULL DEFAULT 0,
768             flags SMALLINT UNSIGNED NOT NULL DEFAULT 0
769             )"
770             }
771              
772             sub TABLE_file_to_delete_later {
773 0     0 0 0 "CREATE TABLE file_to_delete_later (
774             fid INT UNSIGNED NOT NULL PRIMARY KEY,
775             delafter INT UNSIGNED NOT NULL,
776             INDEX (delafter)
777             )"
778             }
779              
780             sub TABLE_fsck_log {
781 0     0 0 0 "CREATE TABLE fsck_log (
782             logid INT UNSIGNED NOT NULL AUTO_INCREMENT,
783             PRIMARY KEY (logid),
784             utime INT UNSIGNED NOT NULL,
785             fid INT UNSIGNED NULL,
786             evcode CHAR(4),
787             devid MEDIUMINT UNSIGNED,
788             INDEX(utime)
789             )"
790             }
791              
792             # generic queue table, designed to be used for workers/jobs which aren't
793             # constantly in use, and are async to the user.
794             # ie; fsck, drain, rebalance.
795             sub TABLE_file_to_queue {
796 0     0 0 0 "CREATE TABLE file_to_queue (
797             fid INT UNSIGNED NOT NULL,
798             devid INT UNSIGNED,
799             type TINYINT UNSIGNED NOT NULL,
800             nexttry INT UNSIGNED NOT NULL,
801             failcount TINYINT UNSIGNED NOT NULL default '0',
802             flags SMALLINT UNSIGNED NOT NULL default '0',
803             arg TEXT,
804             PRIMARY KEY (fid, type),
805             INDEX type_nexttry (type,nexttry)
806             )"
807             }
808              
809             # new style async delete table.
810             # this is separate from file_to_queue since deletes are more actively used,
811             # and partitioning on 'type' doesn't always work so well.
812             sub TABLE_file_to_delete2 {
813 0     0 0 0 "CREATE TABLE file_to_delete2 (
814             fid INT UNSIGNED NOT NULL PRIMARY KEY,
815             nexttry INT UNSIGNED NOT NULL,
816             failcount TINYINT UNSIGNED NOT NULL default '0',
817             INDEX nexttry (nexttry)
818             )"
819             }
820              
821             sub TABLE_checksum {
822 0     0 0 0 "CREATE TABLE checksum (
823             fid INT UNSIGNED NOT NULL PRIMARY KEY,
824             hashtype TINYINT UNSIGNED NOT NULL,
825             checksum VARBINARY(64) NOT NULL
826             )"
827             }
828              
829             # these five only necessary for MySQL, since no other database existed
830             # before, so they can just create the tables correctly to begin with.
831             # in the future, there might be new alters that non-MySQL databases
832             # will have to implement.
833 0     0 0 0 sub upgrade_add_host_getport { 1 }
834 0     0 0 0 sub upgrade_add_host_altip { 1 }
835 0     0 0 0 sub upgrade_add_device_asof { 1 }
836 0     0 0 0 sub upgrade_add_device_weight { 1 }
837 0     0 0 0 sub upgrade_add_device_readonly { 1 }
838 0     0 0 0 sub upgrade_add_device_drain { die "Not implemented in $_[0]" }
839 0     0 0 0 sub upgrade_modify_server_settings_value { die "Not implemented in $_[0]" }
840 0     0 0 0 sub upgrade_add_file_to_queue_arg { die "Not implemented in $_[0]" }
841 0     0 0 0 sub upgrade_modify_device_size { die "Not implemented in $_[0]" }
842              
843             sub upgrade_add_class_replpolicy {
844 0     0 0 0 my ($self) = @_;
845 0 0       0 unless ($self->column_type("class", "replpolicy")) {
846 0         0 $self->dowell("ALTER TABLE class ADD COLUMN replpolicy VARCHAR(255)");
847             }
848             }
849              
850             sub upgrade_add_class_hashtype {
851 0     0 0 0 my ($self) = @_;
852 0 0       0 unless ($self->column_type("class", "hashtype")) {
853 0         0 $self->dowell("ALTER TABLE class ADD COLUMN hashtype TINYINT UNSIGNED");
854             }
855             }
856              
857             # return true if deleted, 0 if didn't exist, exception if error
858             sub delete_host {
859 0     0 0 0 my ($self, $hostid) = @_;
860 0         0 return $self->dbh->do("DELETE FROM host WHERE hostid = ?", undef, $hostid);
861             }
862              
863             # return true if deleted, 0 if didn't exist, exception if error
864             sub delete_domain {
865 0     0 0 0 my ($self, $dmid) = @_;
866 0         0 my ($err, $rv);
867 0         0 my $dbh = $self->dbh;
868 0         0 eval {
869 0         0 $dbh->begin_work;
870 0 0       0 if ($self->domain_has_files($dmid)) {
    0          
871 0         0 $err = "has_files";
872             } elsif ($self->domain_has_classes($dmid)) {
873 0         0 $err = "has_classes";
874             } else {
875 0         0 $rv = $dbh->do("DELETE FROM domain WHERE dmid = ?", undef, $dmid);
876              
877             # remove the "default" class if one was created (for mindevcount)
878             # this is currently the only way to delete the "default" class
879 0         0 $dbh->do("DELETE FROM class WHERE dmid = ? AND classid = 0", undef, $dmid);
880 0         0 $dbh->commit;
881             }
882 0 0       0 $dbh->rollback if $err;
883             };
884 0         0 $self->condthrow; # will rollback on errors
885 0 0       0 throw($err) if $err;
886 0         0 return $rv;
887             }
888              
889             sub domain_has_files {
890 0     0 0 0 my ($self, $dmid) = @_;
891 0         0 my $has_a_fid = $self->dbh->selectrow_array('SELECT fid FROM file WHERE dmid = ? LIMIT 1',
892             undef, $dmid);
893 0 0       0 return $has_a_fid ? 1 : 0;
894             }
895              
896             sub domain_has_classes {
897 0     0 0 0 my ($self, $dmid) = @_;
898             # queryworker does not permit removing default class, so domain_has_classes
899             # should not register the default class
900 0         0 my $has_a_class = $self->dbh->selectrow_array('SELECT classid FROM class WHERE dmid = ? AND classid != 0 LIMIT 1',
901             undef, $dmid);
902 0         0 return defined($has_a_class);
903             }
904              
905             sub class_has_files {
906 0     0 0 0 my ($self, $dmid, $clid) = @_;
907 0         0 my $has_a_fid = $self->dbh->selectrow_array('SELECT fid FROM file WHERE dmid = ? AND classid = ? LIMIT 1',
908             undef, $dmid, $clid);
909 0 0       0 return $has_a_fid ? 1 : 0;
910             }
911              
912             # return new classid on success (non-zero integer), die on failure
913             # throw 'dup' on duplicate name
914             sub create_class {
915 0     0 0 0 my ($self, $dmid, $classname) = @_;
916 0         0 my $dbh = $self->dbh;
917              
918 0         0 my ($clsid, $rv);
919              
920 0         0 eval {
921 0         0 $dbh->begin_work;
922 0 0       0 if ($classname eq 'default') {
923 0         0 $clsid = 0;
924             } else {
925             # get the max class id in this domain
926 0   0     0 my $maxid = $dbh->selectrow_array
927             ('SELECT MAX(classid) FROM class WHERE dmid = ?', undef, $dmid) || 0;
928 0         0 $clsid = $maxid + 1;
929             }
930             # now insert the new class
931 0         0 $rv = $dbh->do("INSERT INTO class (dmid, classid, classname, mindevcount) VALUES (?, ?, ?, ?)",
932             undef, $dmid, $clsid, $classname, 2);
933 0 0       0 $dbh->commit if $rv;
934             };
935 0 0 0     0 if ($@ || $dbh->err) {
936 0 0       0 if ($self->was_duplicate_error) {
937             # ensure we're not inside a transaction
938 0 0       0 if ($dbh->{AutoCommit} == 0) { eval { $dbh->rollback }; }
  0         0  
  0         0  
939 0         0 throw("dup");
940             }
941             }
942 0         0 $self->condthrow; # this will rollback on errors
943 0 0       0 return $clsid if $rv;
944 0         0 die;
945             }
946              
947             # return 1 on success, throw "dup" on duplicate name error, die otherwise
948             sub update_class_name {
949 0     0 0 0 my $self = shift;
950 0         0 my %arg = $self->_valid_params([qw(dmid classid classname)], @_);
951 0         0 my $rv = eval {
952             $self->dbh->do("UPDATE class SET classname=? WHERE dmid=? AND classid=?",
953 0         0 undef, $arg{classname}, $arg{dmid}, $arg{classid});
954             };
955 0 0       0 throw("dup") if $self->was_duplicate_error;
956 0         0 $self->condthrow;
957 0         0 return 1;
958             }
959              
960             # return 1 on success, die otherwise
961             sub update_class_mindevcount {
962 0     0 0 0 my $self = shift;
963 0         0 my %arg = $self->_valid_params([qw(dmid classid mindevcount)], @_);
964 0         0 eval {
965             $self->dbh->do("UPDATE class SET mindevcount=? WHERE dmid=? AND classid=?",
966 0         0 undef, $arg{mindevcount}, $arg{dmid}, $arg{classid});
967             };
968 0         0 $self->condthrow;
969 0         0 return 1;
970             }
971              
972             # return 1 on success, die otherwise
973             sub update_class_replpolicy {
974 0     0 0 0 my $self = shift;
975 0         0 my %arg = $self->_valid_params([qw(dmid classid replpolicy)], @_);
976 0         0 eval {
977             $self->dbh->do("UPDATE class SET replpolicy=? WHERE dmid=? AND classid=?",
978 0         0 undef, $arg{replpolicy}, $arg{dmid}, $arg{classid});
979             };
980 0         0 $self->condthrow;
981 0         0 return 1;
982             }
983              
984             # return 1 on success, die otherwise
985             sub update_class_hashtype {
986 0     0 0 0 my $self = shift;
987 0         0 my %arg = $self->_valid_params([qw(dmid classid hashtype)], @_);
988 0         0 eval {
989             $self->dbh->do("UPDATE class SET hashtype=? WHERE dmid=? AND classid=?",
990 0         0 undef, $arg{hashtype}, $arg{dmid}, $arg{classid});
991             };
992 0         0 $self->condthrow;
993             }
994              
995             sub nfiles_with_dmid_classid_devcount {
996 0     0 0 0 my ($self, $dmid, $classid, $devcount) = @_;
997 0         0 return $self->dbh->selectrow_array('SELECT COUNT(*) FROM file WHERE dmid = ? AND classid = ? AND devcount = ?',
998             undef, $dmid, $classid, $devcount);
999             }
1000              
1001             sub set_server_setting {
1002 0     0 0 0 my ($self, $key, $val) = @_;
1003 0         0 my $dbh = $self->dbh;
1004 0 0       0 die "Your database does not support REPLACE! Reimplement set_server_setting!" unless $self->can_replace;
1005              
1006 0         0 eval {
1007 0 0       0 if (defined $val) {
1008 0         0 $dbh->do("REPLACE INTO server_settings (field, value) VALUES (?, ?)", undef, $key, $val);
1009             } else {
1010 0         0 $dbh->do("DELETE FROM server_settings WHERE field=?", undef, $key);
1011             }
1012             };
1013              
1014 0 0       0 die "Error updating 'server_settings': " . $dbh->errstr if $dbh->err;
1015 0         0 return 1;
1016             }
1017              
1018             # FIXME: racy. currently the only caller doesn't matter, but should be fixed.
1019             sub incr_server_setting {
1020 0     0 0 0 my ($self, $key, $val) = @_;
1021 0 0       0 $val = 1 unless defined $val;
1022 0 0       0 return unless $val;
1023              
1024 0 0       0 return 1 if $self->dbh->do("UPDATE server_settings ".
1025             "SET value=value+? ".
1026             "WHERE field=?", undef,
1027             $val, $key) > 0;
1028 0         0 $self->set_server_setting($key, $val);
1029             }
1030              
1031             sub server_setting {
1032 0     0 0 0 my ($self, $key) = @_;
1033 0         0 return $self->dbh->selectrow_array("SELECT value FROM server_settings WHERE field=?",
1034             undef, $key);
1035             }
1036              
1037             sub server_settings {
1038 0     0 0 0 my ($self) = @_;
1039 0         0 my $ret = {};
1040 0         0 my $sth = $self->dbh->prepare("SELECT field, value FROM server_settings");
1041 0         0 $sth->execute;
1042 0         0 while (my ($k, $v) = $sth->fetchrow_array) {
1043 0         0 $ret->{$k} = $v;
1044             }
1045 0         0 return $ret;
1046             }
1047              
1048             # register a tempfile and return the fidid, which should be allocated
1049             # using autoincrement/sequences if the passed in fid is undef. however,
1050             # if fid is passed in, that value should be used and returned.
1051             #
1052             # return new/passed in fidid on success.
1053             # throw 'dup' if fid already in use
1054             # return 0/undef/die on failure
1055             #
1056             sub register_tempfile {
1057 0     0 0 0 my $self = shift;
1058 0         0 my %arg = $self->_valid_params([qw(fid dmid key classid devids)], @_);
1059              
1060 0         0 my $dbh = $self->dbh;
1061 0         0 my $fid = $arg{fid};
1062              
1063 0 0       0 my $explicit_fid_used = $fid ? 1 : 0;
1064              
1065             # setup the new mapping. we store the devices that we picked for
1066             # this file in here, knowing that they might not be used. create_close
1067             # is responsible for actually mapping in file_on. NOTE: fid is being
1068             # passed in, it's either some number they gave us, or it's going to be
1069             # 0/undef which translates into NULL which means to automatically create
1070             # one. that should be fine.
1071             my $ins_tempfile = sub {
1072 0     0   0 my $rv = eval {
1073             # We must only pass the correct number of bind parameters
1074             # Using 'NULL' for the AUTO_INCREMENT/SERIAL column will fail on
1075             # Postgres, where you are expected to leave it out or use DEFAULT
1076             # Leaving it out seems sanest and least likely to cause problems
1077             # with other databases.
1078 0         0 my @keys = ('dmid', 'dkey', 'classid', 'devids', 'createtime');
1079 0         0 my @vars = ('?' , '?' , '?' , '?' , $self->unix_timestamp);
1080 0   0     0 my @vals = ($arg{dmid}, $arg{key}, $arg{classid} || 0, $arg{devids});
1081             # Do not check for $explicit_fid_used, but rather $fid directly
1082             # as this anonymous sub is called from the loop later
1083 0 0       0 if($fid) {
1084 0         0 unshift @keys, 'fid';
1085 0         0 unshift @vars, '?';
1086 0         0 unshift @vals, $fid;
1087             }
1088 0         0 my $sql = "INSERT INTO tempfile (".join(',',@keys).") VALUES (".join(',',@vars).")";
1089 0         0 $dbh->do($sql, undef, @vals);
1090             };
1091 0 0       0 if (!$rv) {
1092 0 0       0 return undef if $self->was_duplicate_error;
1093 0         0 die "Unexpected db error into tempfile: " . $dbh->errstr;
1094             }
1095              
1096 0 0       0 unless (defined $fid) {
1097             # if they did not give us a fid, then we want to grab the one that was
1098             # theoretically automatically generated
1099 0 0       0 $fid = $dbh->last_insert_id(undef, undef, 'tempfile', 'fid')
1100             or die "No last_insert_id found";
1101             }
1102 0 0 0     0 return undef unless defined $fid && $fid > 0;
1103 0         0 return 1;
1104 0         0 };
1105              
1106 0 0       0 unless ($ins_tempfile->()) {
1107 0 0       0 throw("dup") if $explicit_fid_used;
1108 0         0 die "tempfile insert failed";
1109             }
1110              
1111             my $fid_in_use = sub {
1112 0     0   0 my $exists = $dbh->selectrow_array("SELECT COUNT(*) FROM file WHERE fid=?", undef, $fid);
1113 0 0       0 return $exists ? 1 : 0;
1114 0         0 };
1115              
1116             # See notes in MogileFS::Config->check_database
1117 0         0 my $min_fidid = MogileFS::Config->config('min_fidid');
1118              
1119             # if the fid is in use, do something
1120 0   0     0 while ($fid_in_use->($fid) || $fid <= $min_fidid) {
1121 0 0       0 throw("dup") if $explicit_fid_used;
1122              
1123             # be careful of databases which reset their
1124             # auto-increment/sequences when the table is empty (InnoDB
1125             # did/does this, for instance). So check if it's in use, and
1126             # re-seed the table with the highest known fid from the file
1127             # table.
1128              
1129             # get the highest fid from the filetable and insert a dummy row
1130 0         0 $fid = $dbh->selectrow_array("SELECT MAX(fid) FROM file");
1131 0         0 $ins_tempfile->(); # don't care about its result
1132              
1133             # then do a normal auto-increment
1134 0         0 $fid = undef;
1135 0 0       0 $ins_tempfile->() or die "register_tempfile failed after seeding";
1136             }
1137              
1138 0         0 return $fid;
1139             }
1140              
1141             # return hashref of row containing columns "fid, dmid, dkey, length,
1142             # classid, devcount" provided a $dmid and $key (dkey). or undef if no
1143             # row.
1144             sub file_row_from_dmid_key {
1145 0     0 0 0 my ($self, $dmid, $key) = @_;
1146 0         0 return $self->dbh->selectrow_hashref("SELECT fid, dmid, dkey, length, classid, devcount ".
1147             "FROM file WHERE dmid=? AND dkey=?",
1148             undef, $dmid, $key);
1149             }
1150              
1151             # return hashref of row containing columns "fid, dmid, dkey, length,
1152             # classid, devcount" provided a $fidid or undef if no row.
1153             sub file_row_from_fidid {
1154 0     0 0 0 my ($self, $fidid) = @_;
1155 0         0 return $self->dbh->selectrow_hashref("SELECT fid, dmid, dkey, length, classid, devcount ".
1156             "FROM file WHERE fid=?",
1157             undef, $fidid);
1158             }
1159              
1160             # return an arrayref of rows containing columns "fid, dmid, dkey, length,
1161             # classid, devcount" provided a pair of $fidid or undef if no rows.
1162             sub file_row_from_fidid_range {
1163 0     0 0 0 my ($self, $fromfid, $count) = @_;
1164 0         0 my $sth = $self->dbh->prepare("SELECT fid, dmid, dkey, length, classid, devcount ".
1165             "FROM file WHERE fid > ? LIMIT ?");
1166 0         0 $sth->execute($fromfid,$count);
1167 0         0 return $sth->fetchall_arrayref({});
1168             }
1169              
1170             # return array of devids that a fidid is on
1171             sub fid_devids {
1172 0     0 0 0 my ($self, $fidid) = @_;
1173 0 0       0 return @{ $self->dbh->selectcol_arrayref("SELECT devid FROM file_on WHERE fid=?",
  0         0  
1174             undef, $fidid) || [] };
1175             }
1176              
1177             # return hashref of { $fidid => [ $devid, $devid... ] } for a bunch of given @fidids
1178             sub fid_devids_multiple {
1179 0     0 0 0 my ($self, @fidids) = @_;
1180 0         0 my $in = join(",", map { $_+0 } @fidids);
  0         0  
1181 0         0 my $ret = {};
1182 0         0 my $sth = $self->dbh->prepare("SELECT fid, devid FROM file_on WHERE fid IN ($in)");
1183 0         0 $sth->execute;
1184 0         0 while (my ($fidid, $devid) = $sth->fetchrow_array) {
1185 0   0     0 push @{$ret->{$fidid} ||= []}, $devid;
  0         0  
1186             }
1187 0         0 return $ret;
1188             }
1189              
1190             # return hashref of columns classid, dmid, dkey, given a $fidid, or return undef
1191             sub tempfile_row_from_fid {
1192 0     0 0 0 my ($self, $fidid) = @_;
1193 0         0 return $self->dbh->selectrow_hashref("SELECT classid, dmid, dkey, devids ".
1194             "FROM tempfile WHERE fid=?",
1195             undef, $fidid);
1196             }
1197              
1198             # return 1 on success, throw "dup" on duplicate devid or throws other error on failure
1199             sub create_device {
1200 0     0 0 0 my ($self, $devid, $hostid, $status) = @_;
1201             my $rv = $self->conddup(sub {
1202 0     0   0 $self->dbh->do("INSERT INTO device (devid, hostid, status) VALUES (?,?,?)", undef,
1203             $devid, $hostid, $status);
1204 0         0 });
1205 0         0 $self->condthrow;
1206 0 0       0 die "error making device $devid\n" unless $rv > 0;
1207 0         0 return 1;
1208             }
1209              
1210             sub update_device {
1211 0     0 0 0 my ($self, $devid, $to_update) = @_;
1212 0         0 my @keys = sort keys %$to_update;
1213 0 0       0 return unless @keys;
1214             $self->conddup(sub {
1215             $self->dbh->do("UPDATE device SET " . join('=?, ', @keys)
1216 0     0   0 . "=? WHERE devid=?", undef, (map { $to_update->{$_} } @keys),
  0         0  
1217             $devid);
1218 0         0 });
1219 0         0 return 1;
1220             }
1221              
1222             sub update_device_usage {
1223 0     0 0 0 my $self = shift;
1224 0         0 my %arg = $self->_valid_params([qw(mb_total mb_used devid mb_asof)], @_);
1225 0         0 eval {
1226             $self->dbh->do("UPDATE device SET ".
1227             "mb_total = ?, mb_used = ?, mb_asof = ?" .
1228             " WHERE devid = ?",
1229             undef, $arg{mb_total}, $arg{mb_used}, $arg{mb_asof},
1230 0         0 $arg{devid});
1231             };
1232 0         0 $self->condthrow;
1233             }
1234              
1235             # MySQL has an optimized version
1236             sub update_device_usages {
1237 0     0 0 0 my ($self, $updates, $cb) = @_;
1238 0         0 foreach my $upd (@$updates) {
1239 0         0 $self->update_device_usage(%$upd);
1240 0         0 $cb->();
1241             }
1242             }
1243              
1244             # This is unimplemented at the moment as we must verify:
1245             # - no file_on rows exist
1246             # - nothing in file_to_queue is going to attempt to use it
1247             # - nothing in file_to_replicate is going to attempt to use it
1248             # - it's already been marked dead
1249             # - that all trackers are likely to know this :/
1250             # - ensure the devid can't be reused
1251             # IE; the user can't mark it dead then remove it all at once and cause their
1252             # cluster to implode.
1253             sub delete_device {
1254 0     0 0 0 die "Unimplemented; needs further testing";
1255             }
1256              
1257             sub set_device_weight {
1258 0     0 0 0 my ($self, $devid, $weight) = @_;
1259 0         0 eval {
1260 0         0 $self->dbh->do('UPDATE device SET weight = ? WHERE devid = ?', undef, $weight, $devid);
1261             };
1262 0         0 $self->condthrow;
1263             }
1264              
1265             sub set_device_state {
1266 0     0 0 0 my ($self, $devid, $state) = @_;
1267 0         0 eval {
1268 0         0 $self->dbh->do('UPDATE device SET status = ? WHERE devid = ?', undef, $state, $devid);
1269             };
1270 0         0 $self->condthrow;
1271             }
1272              
1273             sub delete_class {
1274 0     0 0 0 my ($self, $dmid, $cid) = @_;
1275 0 0       0 throw("has_files") if $self->class_has_files($dmid, $cid);
1276 0         0 eval {
1277 0         0 $self->dbh->do("DELETE FROM class WHERE dmid = ? AND classid = ?", undef, $dmid, $cid);
1278             };
1279 0         0 $self->condthrow;
1280             }
1281              
1282             # called from a queryworker process, will trigger delete_fidid_enqueued
1283             # in the delete worker
1284             sub delete_fidid {
1285 0     0 0 0 my ($self, $fidid) = @_;
1286 0         0 eval { $self->dbh->do("DELETE FROM file WHERE fid=?", undef, $fidid); };
  0         0  
1287 0         0 $self->condthrow;
1288 0         0 $self->enqueue_for_delete2($fidid, 0);
1289 0         0 $self->condthrow;
1290             }
1291              
1292             # Only called from delete workers (after delete_fidid),
1293             # this reduces client-visible latency from the queryworker
1294             sub delete_fidid_enqueued {
1295 0     0 0 0 my ($self, $fidid) = @_;
1296 0         0 eval { $self->delete_checksum($fidid); };
  0         0  
1297 0         0 $self->condthrow;
1298 0         0 eval { $self->dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fidid); };
  0         0  
1299 0         0 $self->condthrow;
1300             }
1301              
1302             sub delete_tempfile_row {
1303 0     0 0 0 my ($self, $fidid) = @_;
1304 0         0 my $rv = eval { $self->dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fidid); };
  0         0  
1305 0         0 $self->condthrow;
1306 0         0 return $rv;
1307             }
1308              
1309             # Load the specified tempfile, then delete it. If we succeed, we were
1310             # here first; otherwise, someone else beat us here (and we return undef)
1311             sub delete_and_return_tempfile_row {
1312 0     0 0 0 my ($self, $fidid) = @_;
1313 0         0 my $rv = $self->tempfile_row_from_fid($fidid);
1314 0         0 my $rows_deleted = $self->delete_tempfile_row($fidid);
1315 0 0       0 return $rv if ($rows_deleted > 0);
1316             }
1317              
1318             sub replace_into_file {
1319 0     0 0 0 my $self = shift;
1320 0         0 my %arg = $self->_valid_params([qw(fidid dmid key length classid devcount)], @_);
1321 0 0       0 die "Your database does not support REPLACE! Reimplement replace_into_file!" unless $self->can_replace;
1322 0         0 eval {
1323             $self->dbh->do("REPLACE INTO file (fid, dmid, dkey, length, classid, devcount) ".
1324             "VALUES (?,?,?,?,?,?) ", undef,
1325 0         0 @arg{'fidid', 'dmid', 'key', 'length', 'classid', 'devcount'});
1326             };
1327 0         0 $self->condthrow;
1328             }
1329              
1330             # returns 1 on success, 0 on duplicate key error, dies on exception
1331             # TODO: need a test to hit the duplicate name error condition
1332             # TODO: switch to using "dup" exception here?
1333             sub rename_file {
1334 0     0 0 0 my ($self, $fidid, $to_key) = @_;
1335 0         0 my $dbh = $self->dbh;
1336 0         0 eval {
1337 0         0 $dbh->do('UPDATE file SET dkey = ? WHERE fid=?',
1338             undef, $to_key, $fidid);
1339             };
1340 0 0 0     0 if ($@ || $dbh->err) {
1341             # first is MySQL's error code for duplicates
1342 0 0       0 if ($self->was_duplicate_error) {
1343 0         0 return 0;
1344             } else {
1345 0         0 die $@;
1346             }
1347             }
1348 0         0 $self->condthrow;
1349 0         0 return 1;
1350             }
1351              
1352             sub get_domainid_by_name {
1353 0     0 0 0 my $self = shift;
1354 0         0 my ($dmid) = $self->dbh->selectrow_array('SELECT dmid FROM domain WHERE namespace = ?',
1355             undef, $_[0]);
1356 0         0 return $dmid;
1357             }
1358              
1359             # returns a hash of domains. Key is namespace, value is dmid.
1360             sub get_all_domains {
1361 0     0 0 0 my ($self) = @_;
1362 0         0 my $domains = $self->dbh->selectall_arrayref('SELECT namespace, dmid FROM domain');
1363 0 0       0 return map { ($_->[0], $_->[1]) } @{$domains || []};
  0         0  
  0         0  
1364             }
1365              
1366             sub get_classid_by_name {
1367 0     0 0 0 my $self = shift;
1368 0         0 my ($classid) = $self->dbh->selectrow_array('SELECT classid FROM class WHERE dmid = ? AND classname = ?',
1369             undef, $_[0], $_[1]);
1370 0         0 return $classid;
1371             }
1372              
1373             # returns an array of hashrefs, one hashref per row in the 'class' table
1374             sub get_all_classes {
1375 0     0 0 0 my ($self) = @_;
1376 0         0 my (@ret, $row);
1377              
1378 0         0 my @cols = qw/dmid classid classname mindevcount/;
1379 0 0       0 if ($self->cached_schema_version >= 10) {
1380 0         0 push @cols, 'replpolicy';
1381 0 0       0 if ($self->cached_schema_version >= 15) {
1382 0         0 push @cols, 'hashtype';
1383             }
1384             }
1385 0         0 my $cols = join(', ', @cols);
1386 0         0 my $sth = $self->dbh->prepare("SELECT $cols FROM class");
1387 0         0 $sth->execute;
1388 0         0 push @ret, $row while $row = $sth->fetchrow_hashref;
1389 0         0 return @ret;
1390             }
1391              
1392             # add a record of fidid existing on devid
1393             # returns 1 on success, 0 on duplicate
1394             sub add_fidid_to_devid {
1395 0     0 0 0 my ($self, $fidid, $devid) = @_;
1396 0 0       0 croak("fidid not non-zero") unless $fidid;
1397 0 0       0 croak("devid not non-zero") unless $devid;
1398              
1399             # TODO: This should possibly be insert_ignore instead
1400             # As if we are adding an extra file_on entry, we do not want to replace the
1401             # exist one. Check REPLACE semantics.
1402 0         0 my $rv = $self->dowell($self->ignore_replace . " INTO file_on (fid, devid) VALUES (?,?)",
1403             undef, $fidid, $devid);
1404 0 0       0 return 1 if $rv > 0;
1405 0         0 return 0;
1406             }
1407              
1408             # remove a record of fidid existing on devid
1409             # returns 1 on success, 0 if not there anyway
1410             sub remove_fidid_from_devid {
1411 0     0 0 0 my ($self, $fidid, $devid) = @_;
1412 0         0 my $rv = eval { $self->dbh->do("DELETE FROM file_on WHERE fid=? AND devid=?",
  0         0  
1413             undef, $fidid, $devid); };
1414 0         0 $self->condthrow;
1415 0         0 return $rv;
1416             }
1417              
1418             # Test if host exists.
1419             sub get_hostid_by_id {
1420 0     0 0 0 my $self = shift;
1421 0         0 my ($hostid) = $self->dbh->selectrow_array('SELECT hostid FROM host WHERE hostid = ?',
1422             undef, $_[0]);
1423 0         0 return $hostid;
1424             }
1425              
1426             sub get_hostid_by_name {
1427 0     0 0 0 my $self = shift;
1428 0         0 my ($hostid) = $self->dbh->selectrow_array('SELECT hostid FROM host WHERE hostname = ?',
1429             undef, $_[0]);
1430 0         0 return $hostid;
1431             }
1432              
1433             # get all hosts from database, returns them as list of hashrefs, hashrefs being the row contents.
1434             sub get_all_hosts {
1435 0     0 0 0 my ($self) = @_;
1436 0         0 my $sth = $self->dbh->prepare("SELECT /*!40000 SQL_CACHE */ hostid, status, hostname, " .
1437             "hostip, http_port, http_get_port, altip, altmask FROM host");
1438 0         0 $sth->execute;
1439 0         0 my @ret;
1440 0         0 while (my $row = $sth->fetchrow_hashref) {
1441 0         0 push @ret, $row;
1442             }
1443 0         0 return @ret;
1444             }
1445              
1446             # get all devices from database, returns them as list of hashrefs, hashrefs being the row contents.
1447             sub get_all_devices {
1448 0     0 0 0 my ($self) = @_;
1449 0         0 my $sth = $self->dbh->prepare("SELECT /*!40000 SQL_CACHE */ devid, hostid, mb_total, " .
1450             "mb_used, mb_asof, status, weight FROM device");
1451 0         0 $self->condthrow;
1452 0         0 $sth->execute;
1453 0         0 my @return;
1454 0         0 while (my $row = $sth->fetchrow_hashref) {
1455 0         0 push @return, $row;
1456             }
1457 0         0 return @return;
1458             }
1459              
1460             # update the device count for a given fidid
1461             sub update_devcount {
1462 0     0 0 0 my ($self, $fidid) = @_;
1463 0         0 my $dbh = $self->dbh;
1464 0         0 my $ct = $dbh->selectrow_array("SELECT COUNT(*) FROM file_on WHERE fid=?",
1465             undef, $fidid);
1466              
1467 0         0 eval { $dbh->do("UPDATE file SET devcount=? WHERE fid=?", undef,
  0         0  
1468             $ct, $fidid); };
1469 0         0 $self->condthrow;
1470              
1471 0         0 return 1;
1472             }
1473              
1474             # update the classid for a given fidid
1475             sub update_classid {
1476 0     0 0 0 my ($self, $fidid, $classid) = @_;
1477 0         0 my $dbh = $self->dbh;
1478              
1479 0         0 $dbh->do("UPDATE file SET classid=? WHERE fid=?", undef,
1480             $classid, $fidid);
1481              
1482 0         0 $self->condthrow;
1483 0         0 return 1;
1484             }
1485              
1486             # enqueue a fidid for replication, from a specific deviceid (can be undef), in a given number of seconds.
1487             sub enqueue_for_replication {
1488 0     0 0 0 my ($self, $fidid, $from_devid, $in) = @_;
1489              
1490 0         0 my $nexttry = 0;
1491 0 0       0 if ($in) {
1492 0         0 $nexttry = $self->unix_timestamp . " + " . int($in);
1493             }
1494              
1495             $self->retry_on_deadlock(sub {
1496 0     0   0 $self->insert_ignore("INTO file_to_replicate (fid, fromdevid, nexttry) ".
1497             "VALUES (?,?,$nexttry)", undef, $fidid, $from_devid);
1498 0         0 });
1499             }
1500              
1501             # enqueue a fidid for delete
1502             # note: if we get one more "independent" queue like this, the
1503             # code should be collapsable? I tried once and it looked too ugly, so we have
1504             # some redundancy.
1505             sub enqueue_for_delete2 {
1506 0     0 0 0 my ($self, $fidid, $in) = @_;
1507              
1508 0 0       0 $in = 0 unless $in;
1509 0         0 my $nexttry = $self->unix_timestamp . " + " . int($in);
1510              
1511             $self->retry_on_deadlock(sub {
1512 0     0   0 $self->insert_ignore("INTO file_to_delete2 (fid, nexttry) ".
1513             "VALUES (?,$nexttry)", undef, $fidid);
1514 0         0 });
1515             }
1516              
1517             # enqueue a fidid for work
1518             sub enqueue_for_todo {
1519 0     0 0 0 my ($self, $fidid, $type, $in) = @_;
1520              
1521 0 0       0 $in = 0 unless $in;
1522 0         0 my $nexttry = $self->unix_timestamp . " + " . int($in);
1523              
1524             $self->retry_on_deadlock(sub {
1525 0 0   0   0 if (ref($fidid)) {
1526 0         0 $self->insert_ignore("INTO file_to_queue (fid, devid, arg, type, ".
1527             "nexttry) VALUES (?,?,?,?,$nexttry)", undef,
1528             $fidid->[0], $fidid->[1], $fidid->[2], $type);
1529             } else {
1530 0         0 $self->insert_ignore("INTO file_to_queue (fid, type, nexttry) ".
1531             "VALUES (?,?,$nexttry)", undef, $fidid, $type);
1532             }
1533 0         0 });
1534             }
1535              
1536             # return 1 on success. die otherwise.
1537             sub enqueue_many_for_todo {
1538 0     0 0 0 my ($self, $fidids, $type, $in) = @_;
1539 0 0 0     0 if (! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
      0        
1540 0         0 $self->enqueue_for_todo($_, $type, $in) foreach @$fidids;
1541 0         0 return 1;
1542             }
1543              
1544 0 0       0 $in = 0 unless $in;
1545 0         0 my $nexttry = $self->unix_timestamp . " + " . int($in);
1546              
1547             # TODO: convert to prepared statement?
1548             $self->retry_on_deadlock(sub {
1549 0 0   0   0 if (ref($fidids->[0]) eq 'ARRAY') {
1550 0         0 my $sql = $self->ignore_replace .
1551             "INTO file_to_queue (fid, devid, arg, type, nexttry) VALUES ".
1552             join(', ', ('(?,?,?,?,?)') x scalar @$fidids);
1553 0         0 $self->dbh->do($sql, undef, map { @$_, $type, $nexttry } @$fidids);
  0         0  
1554             } else {
1555             $self->dbh->do($self->ignore_replace . " INTO file_to_queue (fid, type,
1556             nexttry) VALUES " .
1557 0         0 join(",", map { "(" . int($_) . ", $type, $nexttry)" } @$fidids));
  0         0  
1558             }
1559 0         0 });
1560 0         0 $self->condthrow;
1561             }
1562              
1563             # For file_to_queue queues that should be kept small, find the size.
1564             # This isn't fast, but for small queues won't be slow, and is usually only ran
1565             # from a single tracker.
1566             sub file_queue_length {
1567 0     0 0 0 my $self = shift;
1568 0         0 my $type = shift;
1569              
1570 0         0 return $self->dbh->selectrow_array("SELECT COUNT(*) FROM file_to_queue " .
1571             "WHERE type = ?", undef, $type);
1572             }
1573              
1574             # reschedule all deferred replication, return number rescheduled
1575             sub replicate_now {
1576 0     0 0 0 my ($self) = @_;
1577              
1578             $self->retry_on_deadlock(sub {
1579 0     0   0 return $self->dbh->do("UPDATE file_to_replicate SET nexttry = " . $self->unix_timestamp .
1580             " WHERE nexttry > " . $self->unix_timestamp);
1581 0         0 });
1582             }
1583              
1584             # takes two arguments, devid and limit, both required. returns an arrayref of fidids.
1585             sub get_fidids_by_device {
1586 0     0 0 0 my ($self, $devid, $limit) = @_;
1587              
1588 0         0 my $dbh = $self->dbh;
1589 0         0 my $fidids = $dbh->selectcol_arrayref("SELECT fid FROM file_on WHERE devid = ? LIMIT $limit",
1590             undef, $devid);
1591 0         0 return $fidids;
1592             }
1593              
1594             # finds a chunk of fids given a set of constraints:
1595             # devid, fidid, age (new or old), limit
1596             # Note that if this function is very slow on your large DB, you're likely
1597             # sorting by "newfiles" and are missing a new index.
1598             # returns an arrayref of fidids
1599             sub get_fidid_chunks_by_device {
1600 0     0 0 0 my ($self, %o) = @_;
1601              
1602 0         0 my $dbh = $self->dbh;
1603 0         0 my $devid = delete $o{devid};
1604 0 0       0 croak("must supply at least a devid") unless $devid;
1605 0         0 my $age = delete $o{age};
1606 0         0 my $fidid = delete $o{fidid};
1607 0         0 my $limit = delete $o{limit};
1608 0 0       0 croak("invalid options: " . join(', ', keys %o)) if %o;
1609             # If supplied a "previous" fidid, we're paging through.
1610 0         0 my $fidsort = '';
1611 0         0 my $order = '';
1612 0   0     0 $age ||= 'old';
1613 0 0       0 if ($age eq 'old') {
    0          
1614 0 0       0 $fidsort = 'AND fid > ?' if $fidid;
1615 0         0 $order = 'ASC';
1616             } elsif ($age eq 'new') {
1617 0 0       0 $fidsort = 'AND fid < ?' if $fidid;
1618 0         0 $order = 'DESC';
1619             } else {
1620 0         0 croak("invalid age argument: " . $age);
1621             }
1622 0   0     0 $limit ||= 100;
1623 0         0 my @extra = ();
1624 0 0       0 push @extra, $fidid if $fidid;
1625              
1626 0         0 my $fidids = $dbh->selectcol_arrayref("SELECT fid FROM file_on WHERE devid = ? " .
1627             $fidsort . " ORDER BY fid $order LIMIT $limit", undef, $devid, @extra);
1628 0         0 return $fidids;
1629             }
1630              
1631             # gets fidids above fidid_low up to (and including) fidid_high
1632             sub get_fidids_between {
1633 0     0 0 0 my ($self, $fidid_low, $fidid_high, $limit) = @_;
1634 0   0     0 $limit ||= 1000;
1635 0         0 $limit = int($limit);
1636              
1637 0         0 my $dbh = $self->dbh;
1638 0         0 my $fidids = $dbh->selectcol_arrayref(qq{SELECT fid FROM file
1639             WHERE fid > ? and fid <= ?
1640             ORDER BY fid LIMIT $limit}, undef, $fidid_low, $fidid_high);
1641 0         0 return $fidids;
1642             }
1643              
1644             # creates a new domain, given a domain namespace string. return the dmid on success,
1645             # throw 'dup' on duplicate name.
1646             # override if you want a less racy version.
1647             sub create_domain {
1648 0     0 0 0 my ($self, $name) = @_;
1649 0         0 my $dbh = $self->dbh;
1650              
1651             # get the max domain id
1652 0   0     0 my $maxid = $dbh->selectrow_array('SELECT MAX(dmid) FROM domain') || 0;
1653 0         0 my $rv = eval {
1654 0         0 $dbh->do('INSERT INTO domain (dmid, namespace) VALUES (?, ?)',
1655             undef, $maxid + 1, $name);
1656             };
1657 0 0       0 if ($self->was_duplicate_error) {
1658 0         0 throw("dup");
1659             }
1660 0 0       0 return $maxid+1 if $rv;
1661 0         0 die "failed to make domain"; # FIXME: the above is racy.
1662             }
1663              
1664             sub update_host {
1665 0     0 0 0 my ($self, $hid, $to_update) = @_;
1666 0         0 my @keys = sort keys %$to_update;
1667 0 0       0 return unless @keys;
1668             $self->conddup(sub {
1669             $self->dbh->do("UPDATE host SET " . join('=?, ', @keys)
1670 0     0   0 . "=? WHERE hostid=?", undef, (map { $to_update->{$_} } @keys),
  0         0  
1671             $hid);
1672 0         0 });
1673 0         0 return 1;
1674             }
1675              
1676             # return ne hostid, or throw 'dup' on error.
1677             # NOTE: you need to put them into the initial 'down' state.
1678             sub create_host {
1679 0     0 0 0 my ($self, $hostname, $ip) = @_;
1680 0         0 my $dbh = $self->dbh;
1681             # racy! lazy. no, better: portable! how often does this happen? :)
1682 0   0     0 my $hid = ($dbh->selectrow_array('SELECT MAX(hostid) FROM host') || 0) + 1;
1683             my $rv = $self->conddup(sub {
1684 0     0   0 $dbh->do("INSERT INTO host (hostid, hostname, hostip, status) ".
1685             "VALUES (?, ?, ?, 'down')",
1686             undef, $hid, $hostname, $ip);
1687 0         0 });
1688 0 0       0 return $hid if $rv;
1689 0         0 die "db failure";
1690             }
1691              
1692             # return array of row hashrefs containing columns: (fid, fromdevid,
1693             # failcount, flags, nexttry)
1694             sub files_to_replicate {
1695 0     0 0 0 my ($self, $limit) = @_;
1696 0         0 my $ut = $self->unix_timestamp;
1697 0 0       0 my $to_repl_map = $self->dbh->selectall_hashref(qq{
1698             SELECT fid, fromdevid, failcount, flags, nexttry
1699             FROM file_to_replicate
1700             WHERE nexttry <= $ut
1701             ORDER BY nexttry
1702             LIMIT $limit
1703             }, "fid") or return ();
1704 0         0 return values %$to_repl_map;
1705             }
1706              
1707             # "new" style queue consumption code.
1708             # from within a transaction, fetch a limit of fids,
1709             # then update each fid's nexttry to be off in the future,
1710             # giving local workers some time to dequeue the items.
1711             # Note:
1712             # DBI (even with RaiseError) returns weird errors on
1713             # deadlocks from selectall_hashref. So we can't do that.
1714             # we also used to retry on deadlock within the routine,
1715             # but instead lets return undef and let job_master retry.
1716             sub grab_queue_chunk {
1717 0     0 0 0 my $self = shift;
1718 0         0 my $queue = shift;
1719 0         0 my $limit = shift;
1720 0         0 my $extfields = shift;
1721              
1722 0         0 my $dbh = $self->dbh;
1723 0         0 my $tries = 3;
1724 0         0 my $work;
1725              
1726 0 0       0 return 0 unless $self->lock_queue($queue);
1727              
1728 0   0     0 my $extwhere = shift || '';
1729 0         0 my $fields = 'fid, nexttry, failcount';
1730 0 0       0 $fields .= ', ' . $extfields if $extfields;
1731 0         0 eval {
1732 0         0 $dbh->begin_work;
1733 0         0 my $ut = $self->unix_timestamp;
1734 0         0 my $query = qq{
1735             SELECT $fields
1736             FROM $queue
1737             WHERE nexttry <= $ut
1738             $extwhere
1739             ORDER BY nexttry
1740             LIMIT $limit
1741             };
1742 0 0       0 $query .= "FOR UPDATE\n" if $self->can_for_update;
1743 0         0 my $sth = $dbh->prepare($query);
1744 0         0 $sth->execute;
1745 0         0 $work = $sth->fetchall_hashref('fid');
1746             # Nothing to work on.
1747             # Now claim the fids for a while.
1748             # TODO: Should be configurable... but not necessary.
1749 0         0 my $fidlist = join(',', keys %$work);
1750 0 0       0 unless ($fidlist) { $dbh->commit; return; }
  0         0  
  0         0  
1751 0         0 $dbh->do("UPDATE $queue SET nexttry = $ut + 1000 WHERE fid IN ($fidlist)");
1752 0         0 $dbh->commit;
1753             };
1754 0 0       0 if ($self->was_deadlock_error) {
1755 0         0 eval { $dbh->rollback };
  0         0  
1756 0         0 $work = undef;
1757             } else {
1758 0         0 $self->condthrow;
1759             }
1760             # FIXME: Super extra paranoia to prevent deadlocking.
1761             # Need to handle or die on all errors above, but $@ can get reset. For now
1762             # we'll just always ensure there's no transaction running at the end here.
1763             # A (near) release should figure the error detection correctly.
1764 0 0       0 if ($dbh->{AutoCommit} == 0) { eval { $dbh->rollback }; }
  0         0  
  0         0  
1765 0         0 $self->unlock_queue($queue);
1766              
1767 0 0       0 return defined $work ? values %$work : ();
1768             }
1769              
1770             sub grab_files_to_replicate {
1771 0     0 0 0 my ($self, $limit) = @_;
1772 0         0 return $self->grab_queue_chunk('file_to_replicate', $limit,
1773             'fromdevid, flags');
1774             }
1775              
1776             sub grab_files_to_delete2 {
1777 0     0 0 0 my ($self, $limit) = @_;
1778 0         0 return $self->grab_queue_chunk('file_to_delete2', $limit);
1779             }
1780              
1781             # $extwhere is ugly... but should be fine.
1782             sub grab_files_to_queued {
1783 0     0 0 0 my ($self, $type, $what, $limit) = @_;
1784 0   0     0 $what ||= 'type, flags';
1785 0         0 return $self->grab_queue_chunk('file_to_queue', $limit,
1786             $what, 'AND type = ' . $type);
1787             }
1788              
1789             # although it's safe to have multiple tracker hosts and/or processes
1790             # replicating the same file, around, it's inefficient CPU/time-wise,
1791             # and it's also possible they pick different places and waste disk.
1792             # so the replicator asks the store interface when it's about to start
1793             # and when it's done replicating a fidid, so you can do something smart
1794             # and tell it not to.
1795             sub should_begin_replicating_fidid {
1796 0     0 0 0 my ($self, $fidid) = @_;
1797 0         0 my $lockname = "mgfs:fid:$fidid:replicate";
1798 0 0       0 return 1 if $self->get_lock($lockname, 1);
1799 0         0 return 0;
1800             }
1801              
1802             # called when replicator is done replicating a fid, so you can cleanup
1803             # whatever you did in 'should_begin_replicating_fidid' above.
1804             #
1805             # NOTE: there's a theoretical race condition in the rebalance code,
1806             # where (without locking as provided by
1807             # should_begin_replicating_fidid/note_done_replicating), all copies of
1808             # a file can be deleted by independent replicators doing rebalancing
1809             # in different ways. so you'll probably want to implement some
1810             # locking in this pair of functions.
1811             sub note_done_replicating {
1812 0     0 0 0 my ($self, $fidid) = @_;
1813 0         0 my $lockname = "mgfs:fid:$fidid:replicate";
1814 0         0 $self->release_lock($lockname);
1815             }
1816              
1817             sub find_fid_from_file_to_replicate {
1818 0     0 0 0 my ($self, $fidid) = @_;
1819 0         0 return $self->dbh->selectrow_hashref("SELECT fid, nexttry, fromdevid, failcount, flags FROM file_to_replicate WHERE fid = ?",
1820             undef, $fidid);
1821             }
1822              
1823             sub find_fid_from_file_to_delete2 {
1824 0     0 0 0 my ($self, $fidid) = @_;
1825 0         0 return $self->dbh->selectrow_hashref("SELECT fid, nexttry, failcount FROM file_to_delete2 WHERE fid = ?",
1826             undef, $fidid);
1827             }
1828              
1829             sub find_fid_from_file_to_queue {
1830 0     0 0 0 my ($self, $fidid, $type) = @_;
1831 0         0 return $self->dbh->selectrow_hashref("SELECT fid, devid, type, nexttry, failcount, flags, arg FROM file_to_queue WHERE fid = ? AND type = ?",
1832             undef, $fidid, $type);
1833             }
1834              
1835             sub delete_fid_from_file_to_replicate {
1836 0     0 0 0 my ($self, $fidid) = @_;
1837             $self->retry_on_deadlock(sub {
1838 0     0   0 $self->dbh->do("DELETE FROM file_to_replicate WHERE fid=?", undef, $fidid);
1839 0         0 });
1840             }
1841              
1842             sub delete_fid_from_file_to_queue {
1843 0     0 0 0 my ($self, $fidid, $type) = @_;
1844             $self->retry_on_deadlock(sub {
1845 0     0   0 $self->dbh->do("DELETE FROM file_to_queue WHERE fid=? and type=?",
1846             undef, $fidid, $type);
1847 0         0 });
1848             }
1849              
1850             sub delete_fid_from_file_to_delete2 {
1851 0     0 0 0 my ($self, $fidid) = @_;
1852             $self->retry_on_deadlock(sub {
1853 0     0   0 $self->dbh->do("DELETE FROM file_to_delete2 WHERE fid=?", undef, $fidid);
1854 0         0 });
1855             }
1856              
1857             sub reschedule_file_to_replicate_absolute {
1858 0     0 0 0 my ($self, $fid, $abstime) = @_;
1859             $self->retry_on_deadlock(sub {
1860 0     0   0 $self->dbh->do("UPDATE file_to_replicate SET nexttry = ?, failcount = failcount + 1 WHERE fid = ?",
1861             undef, $abstime, $fid);
1862 0         0 });
1863             }
1864              
1865             sub reschedule_file_to_replicate_relative {
1866 0     0 0 0 my ($self, $fid, $in_n_secs) = @_;
1867             $self->retry_on_deadlock(sub {
1868 0     0   0 $self->dbh->do("UPDATE file_to_replicate SET nexttry = " . $self->unix_timestamp . " + ?, " .
1869             "failcount = failcount + 1 WHERE fid = ?",
1870             undef, $in_n_secs, $fid);
1871 0         0 });
1872             }
1873              
1874             sub reschedule_file_to_delete2_absolute {
1875 0     0 0 0 my ($self, $fid, $abstime) = @_;
1876             $self->retry_on_deadlock(sub {
1877 0     0   0 $self->dbh->do("UPDATE file_to_delete2 SET nexttry = ?, failcount = failcount + 1 WHERE fid = ?",
1878             undef, $abstime, $fid);
1879 0         0 });
1880             }
1881              
1882             sub reschedule_file_to_delete2_relative {
1883 0     0 0 0 my ($self, $fid, $in_n_secs) = @_;
1884             $self->retry_on_deadlock(sub {
1885 0     0   0 $self->dbh->do("UPDATE file_to_delete2 SET nexttry = " . $self->unix_timestamp . " + ?, " .
1886             "failcount = failcount + 1 WHERE fid = ?",
1887             undef, $in_n_secs, $fid);
1888 0         0 });
1889             }
1890              
1891             # Given a dmid prefix after and limit, return an arrayref of dkey from the file
1892             # table.
1893             sub get_keys_like {
1894 0     0 0 0 my ($self, $dmid, $prefix, $after, $limit) = @_;
1895             # fix the input... prefix always ends with a % so that it works
1896             # in a LIKE call, and after is either blank or something
1897 0 0       0 $prefix = '' unless defined $prefix;
1898              
1899             # escape underscores, % and \
1900 0         0 $prefix =~ s/([%\\_])/\\$1/g;
1901              
1902 0         0 $prefix .= '%';
1903 0 0       0 $after = '' unless defined $after;
1904              
1905 0         0 my $like = $self->get_keys_like_operator;
1906              
1907             # now select out our keys
1908 0         0 return $self->dbh->selectcol_arrayref
1909             ("SELECT dkey FROM file WHERE dmid = ? AND dkey $like ? ESCAPE ? AND dkey > ? " .
1910             "ORDER BY dkey LIMIT $limit", undef, $dmid, $prefix, "\\", $after);
1911             }
1912              
1913 0     0 0 0 sub get_keys_like_operator { return "LIKE"; }
1914              
1915             # return arrayref of all tempfile rows (themselves also arrayrefs, of [$fidid, $devids])
1916             # that were created $secs_ago seconds ago or older.
1917             sub old_tempfiles {
1918 0     0 0 0 my ($self, $secs_old) = @_;
1919 0         0 return $self->dbh->selectall_arrayref("SELECT fid, devids FROM tempfile " .
1920             "WHERE createtime < " . $self->unix_timestamp . " - $secs_old LIMIT 50");
1921             }
1922              
1923             # given an array of MogileFS::DevFID objects, mass-insert them all
1924             # into file_on (ignoring if they're already present)
1925             sub mass_insert_file_on {
1926 0     0 0 0 my ($self, @devfids) = @_;
1927 0 0       0 return 1 unless @devfids;
1928              
1929 0 0 0     0 if (@devfids > 1 && ! $self->can_insert_multi) {
1930 0         0 $self->mass_insert_file_on($_) foreach @devfids;
1931 0         0 return 1;
1932             }
1933              
1934 0         0 my (@qmarks, @binds);
1935 0         0 foreach my $df (@devfids) {
1936 0         0 my ($fidid, $devid) = ($df->fidid, $df->devid);
1937 0 0       0 Carp::croak("got a false fidid") unless $fidid;
1938 0 0       0 Carp::croak("got a false devid") unless $devid;
1939 0         0 push @binds, $fidid, $devid;
1940 0         0 push @qmarks, "(?,?)";
1941             }
1942              
1943             # TODO: This should possibly be insert_ignore instead
1944             # As if we are adding an extra file_on entry, we do not want to replace the
1945             # exist one. Check REPLACE semantics.
1946 0         0 $self->dowell($self->ignore_replace . " INTO file_on (fid, devid) VALUES " . join(',', @qmarks), undef, @binds);
1947 0         0 return 1;
1948             }
1949              
1950             sub set_schema_vesion {
1951 0     0 0 0 my ($self, $ver) = @_;
1952 0         0 $self->set_server_setting("schema_version", int($ver));
1953             }
1954              
1955             # returns array of fidids to try and delete again
1956             sub fids_to_delete_again {
1957 0     0 0 0 my $self = shift;
1958 0         0 my $ut = $self->unix_timestamp;
1959 0 0       0 return @{ $self->dbh->selectcol_arrayref(qq{
  0         0  
1960             SELECT fid
1961             FROM file_to_delete_later
1962             WHERE delafter < $ut
1963             LIMIT 500
1964             }) || [] };
1965             }
1966              
1967             # return 1 on success. die otherwise.
1968             sub enqueue_fids_to_delete {
1969 0     0 0 0 my ($self, @fidids) = @_;
1970             # multi-row insert-ignore/replace CAN fail with the insert_ignore emulation sub.
1971             # when the first row causes the duplicate error, and the remaining rows are
1972             # not processed.
1973 0 0 0     0 if (@fidids > 1 && ! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
      0        
1974 0         0 $self->enqueue_fids_to_delete($_) foreach @fidids;
1975 0         0 return 1;
1976             }
1977             # TODO: convert to prepared statement?
1978             $self->retry_on_deadlock(sub {
1979             $self->dbh->do($self->ignore_replace . " INTO file_to_delete (fid) VALUES " .
1980 0     0   0 join(",", map { "(" . int($_) . ")" } @fidids));
  0         0  
1981 0         0 });
1982 0         0 $self->condthrow;
1983             }
1984              
1985             sub enqueue_fids_to_delete2 {
1986 0     0 0 0 my ($self, @fidids) = @_;
1987             # multi-row insert-ignore/replace CAN fail with the insert_ignore emulation sub.
1988             # when the first row causes the duplicate error, and the remaining rows are
1989             # not processed.
1990 0 0 0     0 if (@fidids > 1 && ! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
      0        
1991 0         0 $self->enqueue_fids_to_delete2($_) foreach @fidids;
1992 0         0 return 1;
1993             }
1994              
1995 0         0 my $nexttry = $self->unix_timestamp;
1996              
1997             # TODO: convert to prepared statement?
1998             $self->retry_on_deadlock(sub {
1999             $self->dbh->do($self->ignore_replace . " INTO file_to_delete2 (fid,
2000             nexttry) VALUES " .
2001 0     0   0 join(",", map { "(" . int($_) . ", $nexttry)" } @fidids));
  0         0  
2002 0         0 });
2003 0         0 $self->condthrow;
2004             }
2005              
2006             # clears everything from the fsck_log table
2007             # return 1 on success. die otherwise.
2008             sub clear_fsck_log {
2009 0     0 0 0 my $self = shift;
2010 0         0 $self->dbh->do("DELETE FROM fsck_log");
2011 0         0 return 1;
2012             }
2013              
2014             # FIXME: Fsck log entries are processed a little out of order.
2015             # Once a fsck has completed, the log should be re-summarized.
2016             sub fsck_log_summarize {
2017 0     0 0 0 my $self = shift;
2018              
2019 0         0 my $lockname = 'mgfs:fscksum';
2020 0         0 my $lock = eval { $self->get_lock($lockname, 10) };
  0         0  
2021 0 0 0     0 return 0 if defined $lock && $lock == 0;
2022              
2023 0         0 my $logid = $self->max_fsck_logid;
2024              
2025             # sum-up evcode counts every so often, to make fsck_status faster,
2026             # avoiding a potentially-huge GROUP BY in the future..
2027 0   0     0 my $start_max_logid = $self->server_setting("fsck_start_maxlogid") || 0;
2028             # both inclusive:
2029 0   0     0 my $min_logid = $self->server_setting("fsck_logid_processed") || 0;
2030 0         0 $min_logid++;
2031 0         0 my $cts = $self->fsck_evcode_counts(logid_range => [$min_logid, $logid]); # inclusive notation :)
2032 0         0 while (my ($evcode, $ct) = each %$cts) {
2033 0         0 $self->incr_server_setting("fsck_sum_evcount_$evcode", $ct);
2034             }
2035 0         0 $self->set_server_setting("fsck_logid_processed", $logid);
2036              
2037 0 0       0 $self->release_lock($lockname) if $lock;
2038             }
2039              
2040             sub fsck_log {
2041 0     0 0 0 my ($self, %opts) = @_;
2042             $self->dbh->do("INSERT INTO fsck_log (utime, fid, evcode, devid) ".
2043             "VALUES (" . $self->unix_timestamp . ",?,?,?)",
2044             undef,
2045             delete $opts{fid},
2046             delete $opts{code},
2047 0         0 delete $opts{devid});
2048 0 0       0 croak("Unknown opts") if %opts;
2049 0         0 $self->condthrow;
2050              
2051 0         0 return 1;
2052             }
2053              
2054             sub get_db_unixtime {
2055 0     0 0 0 my $self = shift;
2056 0         0 return $self->dbh->selectrow_array("SELECT " . $self->unix_timestamp);
2057             }
2058              
2059             sub max_fidid {
2060 0     0 0 0 my $self = shift;
2061 0         0 return $self->dbh->selectrow_array("SELECT MAX(fid) FROM file");
2062             }
2063              
2064             sub max_fsck_logid {
2065 0     0 0 0 my $self = shift;
2066 0   0     0 return $self->dbh->selectrow_array("SELECT MAX(logid) FROM fsck_log") || 0;
2067             }
2068              
2069             # returns array of $row hashrefs, from fsck_log table
2070             sub fsck_log_rows {
2071 0     0 0 0 my ($self, $after_logid, $limit) = @_;
2072 0   0     0 $limit = int($limit || 100);
2073 0   0     0 $after_logid = int($after_logid || 0);
2074              
2075 0         0 my @rows;
2076 0         0 my $sth = $self->dbh->prepare(qq{
2077             SELECT logid, utime, fid, evcode, devid
2078             FROM fsck_log
2079             WHERE logid > ?
2080             ORDER BY logid
2081             LIMIT $limit
2082             });
2083 0         0 $sth->execute($after_logid);
2084 0         0 my $row;
2085 0         0 push @rows, $row while $row = $sth->fetchrow_hashref;
2086 0         0 return @rows;
2087             }
2088              
2089             sub fsck_evcode_counts {
2090 0     0 0 0 my ($self, %opts) = @_;
2091 0         0 my $timegte = delete $opts{time_gte};
2092 0         0 my $logr = delete $opts{logid_range};
2093 0 0       0 die if %opts;
2094              
2095 0         0 my $ret = {};
2096 0         0 my $sth;
2097 0 0       0 if ($timegte) {
2098 0         0 $sth = $self->dbh->prepare(qq{
2099             SELECT evcode, COUNT(*) FROM fsck_log
2100             WHERE utime >= ?
2101             GROUP BY evcode
2102             });
2103 0   0     0 $sth->execute($timegte||0);
2104             }
2105 0 0       0 if ($logr) {
2106 0         0 $sth = $self->dbh->prepare(qq{
2107             SELECT evcode, COUNT(*) FROM fsck_log
2108             WHERE logid >= ? AND logid <= ?
2109             GROUP BY evcode
2110             });
2111 0         0 $sth->execute($logr->[0], $logr->[1]);
2112             }
2113 0         0 while (my ($ev, $ct) = $sth->fetchrow_array) {
2114 0         0 $ret->{$ev} = $ct;
2115             }
2116 0         0 return $ret;
2117             }
2118              
2119             # run before daemonizing. you can die from here if you see something's amiss. or emit
2120             # warnings.
2121             sub pre_daemonize_checks {
2122 0     0 0 0 my $self = shift;
2123              
2124 0         0 $self->pre_daemonize_check_slaves;
2125             }
2126              
2127             sub pre_daemonize_check_slaves {
2128 0 0   0 0 0 my $sk = MogileFS::Config->server_setting('slave_keys')
2129             or return;
2130              
2131 0         0 my @slaves;
2132 0         0 foreach my $key (split /\s*,\s*/, $sk) {
2133 0         0 my $slave = MogileFS::Config->server_setting("slave_$key");
2134              
2135 0 0       0 if (!$slave) {
2136 0         0 error("key for slave DB config: slave_$key not found in configuration");
2137 0         0 next;
2138             }
2139              
2140 0         0 my ($dsn, $user, $pass) = split /\|/, $slave;
2141 0 0 0     0 if (!defined($dsn) or !defined($user) or !defined($pass)) {
      0        
2142 0         0 error("key slave_$key contains $slave, which doesn't split in | into DSN|user|pass - ignoring");
2143 0         0 next;
2144             }
2145 0         0 push @slaves, [$dsn, $user, $pass]
2146             }
2147              
2148 0 0       0 return unless @slaves; # Escape this block if we don't have a set of slaves anyways
2149              
2150 0         0 MogileFS::run_global_hook('slave_list_check', \@slaves);
2151             }
2152              
2153              
2154             # attempt to grab a lock of lockname, and timeout after timeout seconds.
2155             # returns 1 on success and 0 on timeout. dies if more than one lock is already outstanding.
2156             sub get_lock {
2157 0     0 0 0 my ($self, $lockname, $timeout) = @_;
2158 0 0       0 die "Lock recursion detected (grabbing $lockname, had $self->{last_lock}). Bailing out." if $self->{lock_depth};
2159 0         0 die "get_lock not implemented for $self";
2160             }
2161              
2162             # attempt to release a lock of lockname.
2163             # returns 1 on success and 0 if no lock we have has that name.
2164             sub release_lock {
2165 0     0 0 0 my ($self, $lockname) = @_;
2166 0         0 die "release_lock not implemented for $self";
2167             }
2168              
2169             # MySQL has an issue where you either get excessive deadlocks, or INSERT's
2170             # hang forever around some transactions. Use ghetto locking to cope.
2171 0     0 0 0 sub lock_queue { 1 }
2172 0     0 0 0 sub unlock_queue { 1 }
2173              
2174 0     0 0 0 sub BLOB_BIND_TYPE { undef; }
2175              
2176             sub set_checksum {
2177 0     0 0 0 my ($self, $fidid, $hashtype, $checksum) = @_;
2178 0         0 my $dbh = $self->dbh;
2179 0 0       0 die "Your database does not support REPLACE! Reimplement set_checksum!" unless $self->can_replace;
2180              
2181 0         0 eval {
2182 0         0 my $sth = $dbh->prepare("REPLACE INTO checksum " .
2183             "(fid, hashtype, checksum) " .
2184             "VALUES (?, ?, ?)");
2185 0         0 $sth->bind_param(1, $fidid);
2186 0         0 $sth->bind_param(2, $hashtype);
2187 0         0 $sth->bind_param(3, $checksum, BLOB_BIND_TYPE);
2188 0         0 $sth->execute;
2189             };
2190 0         0 $self->condthrow;
2191             }
2192              
2193             sub get_checksum {
2194 0     0 0 0 my ($self, $fidid) = @_;
2195              
2196 0         0 $self->dbh->selectrow_hashref("SELECT fid, hashtype, checksum " .
2197             "FROM checksum WHERE fid = ?",
2198             undef, $fidid);
2199             }
2200              
2201             sub delete_checksum {
2202 0     0 0 0 my ($self, $fidid) = @_;
2203              
2204 0         0 $self->dbh->do("DELETE FROM checksum WHERE fid = ?", undef, $fidid);
2205             }
2206              
2207             # setup the value used in a 'nexttry' field to indicate that this item will
2208             # never actually be tried again and require some sort of manual intervention.
2209 21     21   180 use constant ENDOFTIME => 2147483647;
  21         41  
  21         2178  
2210              
2211 0     0 0 0 sub end_of_time { ENDOFTIME; }
2212              
2213             # returns the size of the non-urgent replication queue
2214             # nexttry == 0 - the file is urgent
2215             # nexttry != 0 && nexttry < ENDOFTIME - the file is deferred
2216             sub deferred_repl_queue_length {
2217 0     0 0 0 my ($self) = @_;
2218              
2219 0         0 return $self->dbh->selectrow_array('SELECT COUNT(*) FROM file_to_replicate WHERE nexttry != 0 AND nexttry < ?', undef, $self->end_of_time);
2220             }
2221              
2222             1;
2223              
2224             __END__