File Coverage

blib/lib/MogileFS/Store.pm
Criterion Covered Total %
statement 37 974 3.8
branch 3 332 0.9
condition 0 121 0.0
subroutine 12 220 5.4
pod 0 182 0.0
total 52 1829 2.8


line stmt bran cond sub pod time code
1             package MogileFS::Store;
2 21     21   140 use strict;
  21         46  
  21         1076  
3 21     21   131 use warnings;
  21         45  
  21         827  
4 21     21   126 use Carp qw(croak confess);
  21         47  
  21         1408  
5 21     21   142 use MogileFS::Util qw(throw max error);
  21         50  
  21         1141  
6 21     21   81717 use DBI; # no reason a Store has to be DBI-based, but for now they all are.
  21         788916  
  21         2311  
7 21     21   301 use List::Util qw(shuffle);
  21         45  
  21         3068  
8              
9             # this is incremented whenever the schema changes. server will refuse
10             # to start-up with an old schema version
11             #
12             # 6: adds file_to_replicate table
13             # 7: adds file_to_delete_later table
14             # 8: adds fsck_log table
15             # 9: adds 'drain' state to enum in device table
16             # 10: adds 'replpolicy' column to 'class' table
17             # 11: adds 'file_to_queue' table
18             # 12: adds 'file_to_delete2' table
19             # 13: modifies 'server_settings.value' to TEXT for wider values
20             # also adds a TEXT 'arg' column to file_to_queue for passing arguments
21             # 14: modifies 'device' mb_total, mb_used to INT for devs > 16TB
22             # 15: adds checksum table, adds 'hashtype' column to 'class' table
23 21     21   140 use constant SCHEMA_VERSION => 15;
  21         40  
  21         110435  
24              
25             sub new {
26 1     1 0 15 my ($class) = @_;
27 1         3 return $class->new_from_dsn_user_pass(map { MogileFS->config($_) } qw(db_dsn db_user db_pass max_handles));
  4         15  
28             }
29              
30             sub new_from_dsn_user_pass {
31 1     1 0 3 my ($class, $dsn, $user, $pass, $max_handles) = @_;
32 1         3 my $subclass;
33 1 50       12 if ($dsn =~ /^DBI:mysql:/i) {
    50          
    0          
    0          
34 0         0 $subclass = "MogileFS::Store::MySQL";
35             } elsif ($dsn =~ /^DBI:SQLite:/i) {
36 1         3 $subclass = "MogileFS::Store::SQLite";
37             } elsif ($dsn =~ /^DBI:Oracle:/i) {
38 0         0 $subclass = "MogileFS::Store::Oracle";
39             } elsif ($dsn =~ /^DBI:Pg:/i) {
40 0         0 $subclass = "MogileFS::Store::Postgres";
41             } else {
42 0         0 die "Unknown database type: $dsn";
43             }
44 1 50   1   334 unless (eval "use $subclass; 1") {
  1         2031  
  0            
  0            
45 1         15 die "Error loading $subclass: $@\n";
46             }
47 0         0 my $self = bless {
48             dsn => $dsn,
49             user => $user,
50             pass => $pass,
51             max_handles => $max_handles, # Max number of handles to allow
52             raise_errors => $subclass->want_raise_errors,
53             slave_list_version => 0,
54             slave_list_cache => [],
55             recheck_req_gen => 0, # incremented generation, of recheck of dbh being requested
56             recheck_done_gen => 0, # once recheck is done, copy of what the request generation was
57             handles_left => 0, # amount of times this handle can still be verified
58             connected_slaves => {},
59             dead_slaves => {},
60             dead_backoff => {}, # how many times in a row a slave has died
61             connect_timeout => 10, # High default.
62             }, $subclass;
63 0         0 $self->init;
64 0         0 return $self;
65             }
66              
67             # Defaults to true now.
68             sub want_raise_errors {
69 0     0 0 0 1;
70             }
71              
72             sub new_from_mogdbsetup {
73 0     0 0 0 my ($class, %args) = @_;
74             # where args is: dbhost dbport dbname dbrootuser dbrootpass dbuser dbpass
75 0         0 my $dsn = $class->dsn_of_dbhost($args{dbname}, $args{dbhost}, $args{dbport});
76              
77             my $try_make_sto = sub {
78 0 0   0   0 my $dbh = DBI->connect($dsn, $args{dbuser}, $args{dbpass}, {
79             PrintError => 0,
80             }) or return undef;
81 0         0 my $sto = $class->new_from_dsn_user_pass($dsn, $args{dbuser}, $args{dbpass});
82 0         0 $sto->raise_errors;
83 0         0 return $sto;
84 0         0 };
85              
86             # upgrading, apparently, as this database already exists.
87 0         0 my $sto = $try_make_sto->();
88 0 0       0 return $sto if $sto;
89              
90             # otherwise, we need to make the requested database, setup permissions, etc
91 0         0 $class->status("couldn't connect to database as mogilefs user. trying root...");
92 0         0 my $rootdsn = $class->dsn_of_root($args{dbname}, $args{dbhost}, $args{dbport});
93 0 0       0 my $rdbh = DBI->connect($rootdsn, $args{dbrootuser}, $args{dbrootpass}, {
94             PrintError => 0,
95             }) or
96             die "Failed to connect to $rootdsn as specified root user ($args{dbrootuser}): " . DBI->errstr . "\n";
97 0         0 $class->status("connected to database as root user.");
98              
99 0         0 $class->confirm("Create/Upgrade database name '$args{dbname}'?");
100 0         0 $class->create_db_if_not_exists($rdbh, $args{dbname});
101 0         0 $class->confirm("Grant all privileges to user '$args{dbuser}', connecting from anywhere, to the mogilefs database '$args{dbname}'?");
102 0         0 $class->grant_privileges($rdbh, $args{dbname}, $args{dbuser}, $args{dbpass});
103              
104             # should be ready now:
105 0         0 $sto = $try_make_sto->();
106 0 0       0 return $sto if $sto;
107              
108 0         0 die "Failed to connect to database as regular user, even after creating it and setting up permissions as the root user.";
109             }
110              
111             # given a root DBI connection, create the named database. succeed
112             # if it's made, or already exists. die otherwise.
113             sub create_db_if_not_exists {
114 0     0 0 0 my ($pkg, $rdbh, $dbname) = @_;
115 0 0       0 $rdbh->do("CREATE DATABASE IF NOT EXISTS $dbname")
116             or die "Failed to create database '$dbname': " . $rdbh->errstr . "\n";
117             }
118              
119             sub grant_privileges {
120 0     0 0 0 my ($pkg, $rdbh, $dbname, $user, $pass) = @_;
121 0 0       0 $rdbh->do("GRANT ALL PRIVILEGES ON $dbname.* TO $user\@'\%' IDENTIFIED BY ?",
122             undef, $pass)
123             or die "Failed to grant privileges: " . $rdbh->errstr . "\n";
124 0 0       0 $rdbh->do("GRANT ALL PRIVILEGES ON $dbname.* TO $user\@'localhost' IDENTIFIED BY ?",
125             undef, $pass)
126             or die "Failed to grant privileges: " . $rdbh->errstr . "\n";
127             }
128              
129 0     0 0 0 sub can_replace { 0 }
130 0     0 0 0 sub can_insertignore { 0 }
131 0     0 0 0 sub can_insert_multi { 0 }
132 0     0 0 0 sub can_for_update { 1 }
133              
134 0     0 0 0 sub unix_timestamp { die "No function in $_[0] to return DB's unixtime." }
135              
136             sub ignore_replace {
137 0     0 0 0 my $self = shift;
138 0 0       0 return "INSERT IGNORE " if $self->can_insertignore;
139 0 0       0 return "REPLACE " if $self->can_replace;
140 0         0 die "Can't INSERT IGNORE or REPLACE?";
141             }
142              
143             my $on_status = sub {};
144             my $on_confirm = sub { 1 };
145 0     0 0 0 sub on_status { my ($pkg, $code) = @_; $on_status = $code; };
  0         0  
146 0     0 0 0 sub on_confirm { my ($pkg, $code) = @_; $on_confirm = $code; };
  0         0  
147 0     0 0 0 sub status { my ($pkg, $msg) = @_; $on_status->($msg); };
  0         0  
148 0 0   0 0 0 sub confirm { my ($pkg, $msg) = @_; $on_confirm->($msg) or die "Aborted.\n"; };
  0         0  
149              
150 0     0 0 0 sub latest_schema_version { SCHEMA_VERSION }
151              
152             sub raise_errors {
153 0     0 0 0 my $self = shift;
154 0         0 $self->{raise_errors} = 1;
155 0         0 $self->dbh->{RaiseError} = 1;
156             }
157              
158 0     0 0 0 sub set_connect_timeout { $_[0]{connect_timeout} = $_[1]; }
159              
160 0     0 0 0 sub dsn { $_[0]{dsn} }
161 0     0 0 0 sub user { $_[0]{user} }
162 0     0 0 0 sub pass { $_[0]{pass} }
163              
164 0     0 0 0 sub connect_timeout { $_[0]{connect_timeout} }
165              
166 0     0 0 0 sub init { 1 }
167 0     0 0 0 sub post_dbi_connect { 1 }
168              
169 0     0 0 0 sub can_do_slaves { 0 }
170              
171             sub mark_as_slave {
172 0     0 0 0 my $self = shift;
173 0 0       0 die "Incapable of becoming slave." unless $self->can_do_slaves;
174              
175 0         0 $self->{is_slave} = 1;
176             }
177              
178             sub is_slave {
179 0     0 0 0 my $self = shift;
180 0         0 return $self->{is_slave};
181             }
182              
183             sub _slaves_list_changed {
184 0     0   0 my $self = shift;
185 0   0     0 my $ver = MogileFS::Config->server_setting_cached('slave_version') || 0;
186 0 0       0 if ($ver <= $self->{slave_list_version}) {
187 0         0 return 0;
188             }
189 0         0 $self->{slave_list_version} = $ver;
190             # Restart connections from scratch if the configuration changed.
191 0         0 $self->{connected_slaves} = {};
192 0         0 return 1;
193             }
194              
195             # Returns a list of arrayrefs, each being [$dsn, $username, $password] for connecting to a slave DB.
196             sub _slaves_list {
197 0     0   0 my $self = shift;
198 0         0 my $now = time();
199              
200 0 0       0 my $sk = MogileFS::Config->server_setting_cached('slave_keys')
201             or return ();
202              
203 0         0 my @ret;
204 0         0 foreach my $key (split /\s*,\s*/, $sk) {
205 0         0 my $slave = MogileFS::Config->server_setting_cached("slave_$key");
206              
207 0 0       0 if (!$slave) {
208 0         0 error("key for slave DB config: slave_$key not found in configuration");
209 0         0 next;
210             }
211              
212 0         0 my ($dsn, $user, $pass) = split /\|/, $slave;
213 0 0 0     0 if (!defined($dsn) or !defined($user) or !defined($pass)) {
      0        
214 0         0 error("key slave_$key contains $slave, which doesn't split in | into DSN|user|pass - ignoring");
215 0         0 next;
216             }
217 0         0 push @ret, [$dsn, $user, $pass]
218             }
219              
220 0         0 return @ret;
221             }
222              
223             sub _pick_slave {
224 0     0   0 my $self = shift;
225 0         0 my @temp = shuffle keys %{$self->{connected_slaves}};
  0         0  
226 0 0       0 return unless @temp;
227 0         0 return $self->{connected_slaves}->{$temp[0]};
228             }
229              
230             sub _connect_slave {
231 0     0   0 my $self = shift;
232 0         0 my $slave_fulldsn = shift;
233 0         0 my $now = time();
234              
235 0   0     0 my $dead_retry =
236             MogileFS::Config->server_setting_cached('slave_dead_retry_timeout') || 15;
237              
238 0   0     0 my $dead_backoff = $self->{dead_backoff}->{$slave_fulldsn->[0]} || 0;
239 0         0 my $dead_timeout = $self->{dead_slaves}->{$slave_fulldsn->[0]};
240 0 0 0     0 return if (defined $dead_timeout
241             && $dead_timeout + ($dead_retry * $dead_backoff) > $now);
242 0 0       0 return if ($self->{connected_slaves}->{$slave_fulldsn->[0]});
243              
244 0         0 my $newslave = $self->{slave} = $self->new_from_dsn_user_pass(@$slave_fulldsn);
245 0   0     0 $newslave->set_connect_timeout(
246             MogileFS::Config->server_setting_cached('slave_connect_timeout') || 1);
247 0         0 $self->{slave}->{next_check} = 0;
248 0         0 $newslave->mark_as_slave;
249 0 0       0 if ($self->check_slave) {
250 0         0 $self->{connected_slaves}->{$slave_fulldsn->[0]} = $newslave;
251 0         0 $self->{dead_backoff}->{$slave_fulldsn->[0]} = 0;
252             } else {
253             # Magic numbers are saddening...
254 0 0       0 $dead_backoff++ unless $dead_backoff > 20;
255 0         0 $self->{dead_slaves}->{$slave_fulldsn->[0]} = $now;
256 0         0 $self->{dead_backoff}->{$slave_fulldsn->[0]} = $dead_backoff;
257             }
258             }
259              
260             sub get_slave {
261 0     0 0 0 my $self = shift;
262              
263 0 0       0 die "Incapable of having slaves." unless $self->can_do_slaves;
264              
265 0         0 $self->{slave} = undef;
266 0         0 foreach my $slave (keys %{$self->{dead_slaves}}) {
  0         0  
267 0         0 my ($full_dsn) = grep { $slave eq $_->[0] } @{$self->{slave_list_cache}};
  0         0  
  0         0  
268 0 0       0 unless ($full_dsn) {
269 0         0 delete $self->{dead_slaves}->{$slave};
270 0         0 next;
271             }
272 0         0 $self->_connect_slave($full_dsn);
273             }
274              
275 0 0       0 unless ($self->_slaves_list_changed) {
276 0 0       0 if ($self->{slave} = $self->_pick_slave) {
277 0         0 $self->{slave}->{recheck_req_gen} = $self->{recheck_req_gen};
278 0 0       0 return $self->{slave} if $self->check_slave;
279             }
280             }
281              
282 0 0       0 if ($self->{slave}) {
283 0         0 my $dsn = $self->{slave}->{dsn};
284 0         0 $self->{dead_slaves}->{$dsn} = time();
285 0         0 $self->{dead_backoff}->{$dsn} = 0;
286 0         0 delete $self->{connected_slaves}->{$dsn};
287 0         0 error("Error talking to slave: $dsn");
288             }
289 0         0 my @slaves_list = $self->_slaves_list;
290              
291             # If we have no slaves, then return silently.
292 0 0       0 return unless @slaves_list;
293              
294 0         0 my $slave_skip_filtering = MogileFS::Config->server_setting_cached('slave_skip_filtering');
295              
296 0 0 0     0 unless (defined $slave_skip_filtering && $slave_skip_filtering eq 'on') {
297 0         0 MogileFS::run_global_hook('slave_list_filter', \@slaves_list);
298             }
299              
300 0         0 $self->{slave_list_cache} = \@slaves_list;
301              
302 0         0 foreach my $slave_fulldsn (@slaves_list) {
303 0         0 $self->_connect_slave($slave_fulldsn);
304             }
305              
306 0 0       0 if ($self->{slave} = $self->_pick_slave) {
307 0         0 return $self->{slave};
308             }
309 0         0 warn "Slave list exhausted, failing back to master.";
310 0         0 return;
311             }
312              
313             sub read_store {
314 0     0 0 0 my $self = shift;
315              
316 0 0       0 return $self unless $self->can_do_slaves;
317              
318 0 0       0 if ($self->{slave_ok}) {
319 0 0       0 if (my $slave = $self->get_slave) {
320 0         0 return $slave;
321             }
322             }
323              
324 0         0 return $self;
325             }
326              
327             sub slaves_ok {
328 0     0 0 0 my $self = shift;
329 0         0 my $coderef = shift;
330              
331 0 0       0 return unless ref $coderef eq 'CODE';
332              
333 0         0 local $self->{slave_ok} = 1;
334              
335 0         0 return $coderef->(@_);
336             }
337              
338             sub recheck_dbh {
339 0     0 0 0 my $self = shift;
340 0         0 $self->{recheck_req_gen}++;
341             }
342              
343             sub dbh {
344 0     0 0 0 my $self = shift;
345            
346 0 0       0 if ($self->{dbh}) {
347 0 0       0 if ($self->{recheck_done_gen} != $self->{recheck_req_gen}) {
348 0 0       0 $self->{dbh} = undef unless $self->{dbh}->ping;
349             # Handles a memory leak under Solaris/Postgres.
350             # We may leak a little extra memory if we're holding a lock,
351             # since dropping a connection mid-lock is fatal
352 0 0 0     0 $self->{dbh} = undef if ($self->{max_handles} &&
      0        
353             $self->{handles_left}-- < 0 && !$self->{lock_depth});
354 0         0 $self->{recheck_done_gen} = $self->{recheck_req_gen};
355             }
356 0 0       0 return $self->{dbh} if $self->{dbh};
357             }
358              
359             # Shortcut flag: if monitor thinks the master is down, avoid attempting to
360             # connect to it for now. If we already have a connection to the master,
361             # keep using it as above.
362 0 0       0 if (!$self->is_slave) {
363 0         0 my $flag = MogileFS::Config->server_setting_cached('_master_db_alive', 0);
364 0 0 0     0 return if (defined $flag && $flag == 0);;
365             }
366              
367             # auto-reconnect is unsafe if we're holding a lock
368 0 0       0 if ($self->{lock_depth}) {
369 0         0 die "DB connection recovery unsafe, lock held: $self->{last_lock}";
370             }
371              
372 0         0 eval {
373 0     0   0 local $SIG{ALRM} = sub { die "timeout\n" };
  0         0  
374 0         0 alarm($self->connect_timeout);
375 0   0     0 $self->{dbh} = DBI->connect($self->{dsn}, $self->{user}, $self->{pass}, {
376             PrintError => 0,
377             AutoCommit => 1,
378             # FUTURE: will default to on (have to validate all callers first):
379             RaiseError => ($self->{raise_errors} || 0),
380             sqlite_use_immediate_transaction => 1,
381             });
382             };
383 0         0 alarm(0);
384 0 0       0 if ($@ eq "timeout\n") {
    0          
385 0         0 die "Failed to connect to database: timeout";
386             } elsif ($@) {
387 0         0 die "Failed to connect to database: " . DBI->errstr;
388             }
389 0         0 $self->post_dbi_connect;
390 0 0       0 $self->{handles_left} = $self->{max_handles} if $self->{max_handles};
391 0         0 return $self->{dbh};
392             }
393              
394 0 0   0 0 0 sub have_dbh { return 1 if $_[0]->{dbh}; }
395              
396             sub ping {
397 0     0 0 0 my $self = shift;
398 0         0 return $self->dbh->ping;
399             }
400              
401             sub condthrow {
402 0     0 0 0 my ($self, $optmsg) = @_;
403 0         0 my $dbh = $self->dbh;
404 0 0       0 return 1 unless $dbh->err;
405 0         0 my ($pkg, $fn, $line) = caller;
406 0         0 my $msg = "Database error from $pkg/$fn/$line: " . $dbh->errstr;
407 0 0       0 $msg .= ": $optmsg" if $optmsg;
408             # Auto rollback failures around transactions.
409 0 0       0 if ($dbh->{AutoCommit} == 0) { eval { $dbh->rollback }; }
  0         0  
  0         0  
410 0         0 croak($msg);
411             }
412              
413             sub dowell {
414 0     0 0 0 my ($self, $sql, @do_params) = @_;
415 0         0 my $rv = eval { $self->dbh->do($sql, @do_params) };
  0         0  
416 0 0 0     0 return $rv unless $@ || $self->dbh->err;
417 0         0 warn "Error with SQL: $sql\n";
418 0   0     0 Carp::confess($@ || $self->dbh->errstr);
419             }
420              
421             sub _valid_params {
422 0 0   0   0 croak("Odd number of parameters!") if scalar(@_) % 2;
423 0         0 my ($self, $vlist, %uarg) = @_;
424 0         0 my %ret;
425 0         0 $ret{$_} = delete $uarg{$_} foreach @$vlist;
426 0 0       0 croak("Bogus options: ".join(',',keys %uarg)) if %uarg;
427 0         0 return %ret;
428             }
429              
430             sub was_deadlock_error {
431 0     0 0 0 my $self = shift;
432 0         0 my $dbh = $self->dbh;
433 0         0 die "UNIMPLEMENTED";
434             }
435              
436             sub was_duplicate_error {
437 0     0 0 0 my $self = shift;
438 0         0 my $dbh = $self->dbh;
439 0         0 die "UNIMPLEMENTED";
440             }
441              
442             # run a subref (presumably a database update) in an eval, because you expect it to
443             # maybe fail on duplicate key error, and throw a dup exception for you, else return
444             # its return value
445             sub conddup {
446 0     0 0 0 my ($self, $code) = @_;
447 0         0 my $rv = eval { $code->(); };
  0         0  
448 0 0       0 throw("dup") if $self->was_duplicate_error;
449 0 0       0 croak($@) if $@;
450 0         0 return $rv;
451             }
452              
453             # insert row if doesn't already exist
454             # WARNING: This function is NOT transaction safe if the duplicate errors causes
455             # your transaction to halt!
456             # WARNING: This function is NOT safe on multi-row inserts if can_insertignore
457             # is false! Rows before the duplicate will be inserted, but rows after the
458             # duplicate might not be, depending on your database.
459             sub insert_ignore {
460 0     0 0 0 my ($self, $sql, @params) = @_;
461 0         0 my $dbh = $self->dbh;
462 0 0       0 if ($self->can_insertignore) {
463 0         0 return $dbh->do("INSERT IGNORE $sql", @params);
464             } else {
465             # TODO: Detect bad multi-row insert here.
466 0         0 my $rv = eval { $dbh->do("INSERT $sql", @params); };
  0         0  
467 0 0 0     0 if ($@ || $dbh->err) {
468 0 0       0 return 1 if $self->was_duplicate_error;
469             # This chunk is identical to condthrow, but we include it directly
470             # here as we know there is definitely an error, and we would like
471             # to report the caller of this function.
472 0         0 my ($pkg, $fn, $line) = caller;
473 0         0 my $msg = "Database error from $pkg/$fn/$line: " . $dbh->errstr;
474 0         0 croak($msg);
475             }
476 0         0 return $rv;
477             }
478             }
479              
480             sub retry_on_deadlock {
481 0     0 0 0 my $self = shift;
482 0         0 my $code = shift;
483 0   0     0 my $tries = shift || 3;
484 0 0       0 croak("deadlock retries must be positive") if $tries < 1;
485 0         0 my $rv;
486              
487 0         0 while ($tries-- > 0) {
488 0         0 $rv = eval { $code->(); };
  0         0  
489 0 0       0 next if ($self->was_deadlock_error);
490 0 0       0 croak($@) if $@;
491 0         0 last;
492             }
493 0         0 return $rv;
494             }
495              
496             # --------------------------------------------------------------------------
497              
498             my @extra_tables;
499              
500             sub add_extra_tables {
501 0     0 0 0 my $class = shift;
502 0         0 push @extra_tables, @_;
503             }
504              
505 21         305402 use constant TABLES => qw( domain class file tempfile file_to_delete
506             unreachable_fids file_on file_on_corrupt host
507             device server_settings file_to_replicate
508             file_to_delete_later fsck_log file_to_queue
509 21     21   287 file_to_delete2 checksum);
  21         55  
510              
511             sub setup_database {
512 0     0 0 0 my $sto = shift;
513              
514 0         0 my $curver = $sto->schema_version;
515              
516 0         0 my $latestver = SCHEMA_VERSION;
517 0 0       0 if ($curver == $latestver) {
518 0         0 $sto->status("Schema already up-to-date at version $curver.");
519 0         0 return 1;
520             }
521              
522 0 0       0 if ($curver > $latestver) {
523 0         0 die "Your current schema version is $curver, but this version of mogdbsetup only knows up to $latestver. Aborting to be safe.\n";
524             }
525              
526 0 0       0 if ($curver) {
527 0         0 $sto->confirm("Install/upgrade your schema from version $curver to version $latestver?");
528             }
529              
530 0         0 foreach my $t (TABLES, @extra_tables) {
531 0         0 $sto->create_table($t);
532             }
533              
534 0         0 $sto->upgrade_add_host_getport;
535 0         0 $sto->upgrade_add_host_altip;
536 0         0 $sto->upgrade_add_device_asof;
537 0         0 $sto->upgrade_add_device_weight;
538 0         0 $sto->upgrade_add_device_readonly;
539 0         0 $sto->upgrade_add_device_drain;
540 0         0 $sto->upgrade_add_class_replpolicy;
541 0         0 $sto->upgrade_modify_server_settings_value;
542 0         0 $sto->upgrade_add_file_to_queue_arg;
543 0         0 $sto->upgrade_modify_device_size;
544 0         0 $sto->upgrade_add_class_hashtype;
545              
546 0         0 return 1;
547             }
548              
549             sub cached_schema_version {
550 0     0 0 0 my $self = shift;
551 0   0     0 return $self->{_cached_schema_version} ||=
552             $self->schema_version;
553             }
554              
555             sub schema_version {
556 0     0 0 0 my $self = shift;
557 0         0 my $dbh = $self->dbh;
558 0   0     0 return eval {
559             $dbh->selectrow_array("SELECT value FROM server_settings WHERE field='schema_version'") || 0;
560             } || 0;
561             }
562              
563 0     0 0 0 sub filter_create_sql { my ($self, $sql) = @_; return $sql; }
  0         0  
564              
565             sub create_table {
566 0     0 0 0 my ($self, $table) = @_;
567 0         0 my $dbh = $self->dbh;
568 0 0       0 return 1 if $self->table_exists($table);
569 0         0 my $meth = "TABLE_$table";
570 0         0 my $sql = $self->$meth;
571 0         0 $sql = $self->filter_create_sql($sql);
572 0         0 $self->status("Running SQL: $sql;");
573 0 0       0 $dbh->do($sql) or
574             die "Failed to create table $table: " . $dbh->errstr;
575 0         0 my $imeth = "INDEXES_$table";
576 0         0 my @indexes = eval { $self->$imeth };
  0         0  
577 0         0 foreach $sql (@indexes) {
578 0         0 $self->status("Running SQL: $sql;");
579 0 0       0 $dbh->do($sql) or
580             die "Failed to create indexes on $table: " . $dbh->errstr;
581             }
582             }
583              
584             # Please try to keep all tables aligned nicely
585             # with '"CREATE TABLE' on the first line
586             # and ')"' alone on the last line.
587              
588             sub TABLE_domain {
589             # classes are tied to domains. domains can have classes of items
590             # with different mindevcounts.
591             #
592             # a minimum devcount is the number of copies the system tries to
593             # maintain for files in that class
594             #
595             # unspecified classname means classid=0 (implicit class), and that
596             # implies mindevcount=2
597 0     0 0 0 "CREATE TABLE domain (
598             dmid SMALLINT UNSIGNED NOT NULL PRIMARY KEY,
599             namespace VARCHAR(255),
600             UNIQUE (namespace)
601             )"
602             }
603              
604             sub TABLE_class {
605 0     0 0 0 "CREATE TABLE class (
606             dmid SMALLINT UNSIGNED NOT NULL,
607             classid TINYINT UNSIGNED NOT NULL,
608             PRIMARY KEY (dmid,classid),
609             classname VARCHAR(50),
610             UNIQUE (dmid,classname),
611             mindevcount TINYINT UNSIGNED NOT NULL,
612             hashtype TINYINT UNSIGNED
613             )"
614             }
615              
616             # the length field is only here for easy verifications of content
617             # integrity when copying around. no sums or content types or other
618             # metadata here. application can handle that.
619             #
620             # classid is what class of file this belongs to. for instance, on fotobilder
621             # there will be a class for original pictures (the ones the user uploaded)
622             # and a class for derived images (scaled down versions, thumbnails, greyscale, etc)
623             # each domain can setup classes and assign the minimum redundancy level for
624             # each class. fotobilder will use a 2 or 3 minimum copy redundancy for original
625             # photos and a 1 minimum for derived images (which means the sole device
626             # for a derived image can die, bringing devcount to 0 for that file, but
627             # the application can recreate it from its original)
628             sub TABLE_file {
629 0     0 0 0 "CREATE TABLE file (
630             fid INT UNSIGNED NOT NULL,
631             PRIMARY KEY (fid),
632              
633             dmid SMALLINT UNSIGNED NOT NULL,
634             dkey VARCHAR(255), # domain-defined
635             UNIQUE dkey (dmid, dkey),
636              
637             length BIGINT UNSIGNED, # big limit
638              
639             classid TINYINT UNSIGNED NOT NULL,
640             devcount TINYINT UNSIGNED NOT NULL,
641             INDEX devcount (dmid,classid,devcount)
642             )"
643             }
644              
645             sub TABLE_tempfile {
646 0     0 0 0 "CREATE TABLE tempfile (
647             fid INT UNSIGNED NOT NULL AUTO_INCREMENT,
648             PRIMARY KEY (fid),
649              
650             createtime INT UNSIGNED NOT NULL,
651             classid TINYINT UNSIGNED NOT NULL,
652             dmid SMALLINT UNSIGNED NOT NULL,
653             dkey VARCHAR(255),
654             devids VARCHAR(60)
655             )"
656             }
657              
658             # files marked for death when their key is overwritten. then they get a new
659             # fid, but since the old row (with the old fid) had to be deleted immediately,
660             # we need a place to store the fid so an async job can delete the file from
661             # all devices.
662             sub TABLE_file_to_delete {
663 0     0 0 0 "CREATE TABLE file_to_delete (
664             fid INT UNSIGNED NOT NULL,
665             PRIMARY KEY (fid)
666             )"
667             }
668              
669             # if the replicator notices that a fid has no sources, that file gets inserted
670             # into the unreachable_fids table. it is up to the application to actually
671             # handle fids stored in this table.
672             sub TABLE_unreachable_fids {
673 0     0 0 0 "CREATE TABLE unreachable_fids (
674             fid INT UNSIGNED NOT NULL,
675             lastupdate INT UNSIGNED NOT NULL,
676             PRIMARY KEY (fid),
677             INDEX (lastupdate)
678             )"
679             }
680              
681             # what files are on what devices? (most likely physical devices,
682             # as logical devices of RAID arrays would be costly, and mogilefs
683             # already handles redundancy)
684             #
685             # the devid index lets us answer "What files were on this now-dead disk?"
686             sub TABLE_file_on {
687 0     0 0 0 "CREATE TABLE file_on (
688             fid INT UNSIGNED NOT NULL,
689             devid MEDIUMINT UNSIGNED NOT NULL,
690             PRIMARY KEY (fid, devid),
691             INDEX (devid)
692             )"
693             }
694              
# Complaint table: when the application or framework detects an error in one
# of the duplicate copies of a file, it registers the (fid, devid) pair here
# and an async job verifies and repairs it.
# MAYBE: let the application tell us the SHA1/MD5 of the file so we can check
# the copies on the other devices?
sub TABLE_file_on_corrupt {
    return q{CREATE TABLE file_on_corrupt (
fid INT UNSIGNED NOT NULL,
devid MEDIUMINT UNSIGNED NOT NULL,
PRIMARY KEY (fid, devid)
)};
}
707              
# Hosts (the machines that contain devices).
sub TABLE_host {
    return q{CREATE TABLE host (
hostid MEDIUMINT UNSIGNED NOT NULL PRIMARY KEY,

status ENUM('alive','dead','down'),
http_port MEDIUMINT UNSIGNED DEFAULT 7500,
http_get_port MEDIUMINT UNSIGNED,

hostname VARCHAR(40),
hostip VARCHAR(15),
altip VARCHAR(15),
altmask VARCHAR(18),
UNIQUE (hostname),
UNIQUE (hostip),
UNIQUE (altip)
)};
}
726              
# Devices (disks), each belonging to a host.
sub TABLE_device {
    return q{CREATE TABLE device (
devid MEDIUMINT UNSIGNED NOT NULL,
hostid MEDIUMINT UNSIGNED NOT NULL,

status ENUM('alive','dead','down'),
weight MEDIUMINT DEFAULT 100,

mb_total INT UNSIGNED,
mb_used INT UNSIGNED,
mb_asof INT UNSIGNED,
PRIMARY KEY (devid),
INDEX (status)
)};
}
743              
# Key/value table for server settings; value is TEXT so wide values fit.
sub TABLE_server_settings {
    return q{CREATE TABLE server_settings (
field VARCHAR(50) PRIMARY KEY,
value TEXT
)};
}
750              
# Replication queue.  Column notes:
#   nexttry   - when to try replicating next:
#                 0              immediate; the file is on only one host.
#                 1              lower priority; it is on 2+ but isn't happy
#                                where it's at.
#                 unix timestamp at/after that time; some previous error
#                                occurred.
#   fromdevid - if not null, the devid to replicate from (perhaps the only
#               non-corrupt copy); otherwise, wherever.
#   failcount - how many times we've failed, used only to back off nexttry.
#   flags     - reserved for future use.
sub TABLE_file_to_replicate {
    return q{CREATE TABLE file_to_replicate (
fid INT UNSIGNED NOT NULL PRIMARY KEY,
nexttry INT UNSIGNED NOT NULL,
INDEX (nexttry),
fromdevid INT UNSIGNED,
failcount TINYINT UNSIGNED NOT NULL DEFAULT 0,
flags SMALLINT UNSIGNED NOT NULL DEFAULT 0
)};
}
768              
# Deferred-delete queue: one row per fid with a delafter value, indexed on
# delafter.
sub TABLE_file_to_delete_later {
    return q{CREATE TABLE file_to_delete_later (
fid INT UNSIGNED NOT NULL PRIMARY KEY,
delafter INT UNSIGNED NOT NULL,
INDEX (delafter)
)};
}
776              
# Log table for fsck: auto-numbered entries carrying a utime, an optional
# fid, a 4-character event code and an optional devid; indexed on utime.
sub TABLE_fsck_log {
    return q{CREATE TABLE fsck_log (
logid INT UNSIGNED NOT NULL AUTO_INCREMENT,
PRIMARY KEY (logid),
utime INT UNSIGNED NOT NULL,
fid INT UNSIGNED NULL,
evcode CHAR(4),
devid MEDIUMINT UNSIGNED,
INDEX(utime)
)};
}
788              
# Generic queue table for workers/jobs that are not constantly in use and
# run asynchronously to the user (i.e. fsck, drain, rebalance).
sub TABLE_file_to_queue {
    return q{CREATE TABLE file_to_queue (
fid INT UNSIGNED NOT NULL,
devid INT UNSIGNED,
type TINYINT UNSIGNED NOT NULL,
nexttry INT UNSIGNED NOT NULL,
failcount TINYINT UNSIGNED NOT NULL default '0',
flags SMALLINT UNSIGNED NOT NULL default '0',
arg TEXT,
PRIMARY KEY (fid, type),
INDEX type_nexttry (type,nexttry)
)};
}
805              
# New-style async delete table.  Kept apart from file_to_queue because
# deletes are used more actively, and partitioning on 'type' doesn't always
# work so well.
sub TABLE_file_to_delete2 {
    return q{CREATE TABLE file_to_delete2 (
fid INT UNSIGNED NOT NULL PRIMARY KEY,
nexttry INT UNSIGNED NOT NULL,
failcount TINYINT UNSIGNED NOT NULL default '0',
INDEX nexttry (nexttry)
)};
}
817              
# Per-fid checksum storage: a hashtype code plus up to 64 bytes of checksum.
sub TABLE_checksum {
    return q{CREATE TABLE checksum (
fid INT UNSIGNED NOT NULL PRIMARY KEY,
hashtype TINYINT UNSIGNED NOT NULL,
checksum VARBINARY(64) NOT NULL
)};
}
825              
# The first five hooks are only needed for MySQL: no other database existed
# before, so other backends can just create the tables correctly to begin
# with.  Here they are no-ops that report success.
# In the future there might be new alters that non-MySQL databases will have
# to implement.
sub upgrade_add_host_getport {
    return 1;
}

sub upgrade_add_host_altip {
    return 1;
}

sub upgrade_add_device_asof {
    return 1;
}

sub upgrade_add_device_weight {
    return 1;
}

sub upgrade_add_device_readonly {
    return 1;
}

# The remaining hooks have no default implementation here: each one dies,
# naming the invocant it was called on.
sub upgrade_add_device_drain {
    die "Not implemented in $_[0]";
}

sub upgrade_modify_server_settings_value {
    die "Not implemented in $_[0]";
}

sub upgrade_add_file_to_queue_arg {
    die "Not implemented in $_[0]";
}

sub upgrade_modify_device_size {
    die "Not implemented in $_[0]";
}
839              
# Add the 'replpolicy' column to the class table, unless column_type()
# reports that it is already there.
sub upgrade_add_class_replpolicy {
    my $self = shift;
    return $self->column_type("class", "replpolicy")
        || $self->dowell("ALTER TABLE class ADD COLUMN replpolicy VARCHAR(255)");
}
846              
# Add the 'hashtype' column to the class table, unless column_type() reports
# that it is already there.
sub upgrade_add_class_hashtype {
    my $self = shift;
    return $self->column_type("class", "hashtype")
        || $self->dowell("ALTER TABLE class ADD COLUMN hashtype TINYINT UNSIGNED");
}
853              
# Delete the host row with the given hostid.
# Returns the result of the DELETE as reported by the database handle
# (intended contract: true if deleted, 0 if it didn't exist, exception on
# error).
sub delete_host {
    my $self   = shift;
    my $hostid = shift;
    my $sql    = "DELETE FROM host WHERE hostid = ?";
    return $self->dbh->do($sql, undef, $hostid);
}
859              
# Delete a domain inside a transaction.
# Throws "has_files" or "has_classes" (after rolling back) when the domain is
# still in use; otherwise removes the domain row plus its implicit default
# class and commits.
# Returns the result of the domain DELETE (intended contract: true if
# deleted, 0 if it didn't exist, exception on error).
sub delete_domain {
    my ($self, $dmid) = @_;
    my $dbh = $self->dbh;
    my ($rv, $err);
    eval {
        $dbh->begin_work;
        $err = $self->domain_has_files($dmid)   ? "has_files"
             : $self->domain_has_classes($dmid) ? "has_classes"
             :                                    undef;
        if ($err) {
            $dbh->rollback;
        } else {
            $rv = $dbh->do("DELETE FROM domain WHERE dmid = ?", undef, $dmid);

            # remove the "default" class if one was created (for mindevcount)
            # this is currently the only way to delete the "default" class
            $dbh->do("DELETE FROM class WHERE dmid = ? AND classid = 0", undef, $dmid);
            $dbh->commit;
        }
    };
    $self->condthrow; # will rollback on errors
    throw($err) if $err;
    return $rv;
}
885              
# Returns 1 if at least one file row belongs to the domain, else 0.
sub domain_has_files {
    my $self = shift;
    my ($dmid) = @_;
    my $sql = 'SELECT fid FROM file WHERE dmid = ? LIMIT 1';
    return $self->dbh->selectrow_array($sql, undef, $dmid) ? 1 : 0;
}
892              
# True if the domain owns any class other than the default (classid 0).
# The default class is left out on purpose: queryworker does not permit
# removing it, so it must not count here.
sub domain_has_classes {
    my $self = shift;
    my ($dmid) = @_;
    my $sql = 'SELECT classid FROM class WHERE dmid = ? AND classid != 0 LIMIT 1';
    my $classid = $self->dbh->selectrow_array($sql, undef, $dmid);
    return defined($classid);
}
901              
# Returns 1 if at least one file row exists for the (dmid, classid) pair,
# else 0.
sub class_has_files {
    my $self = shift;
    my ($dmid, $clid) = @_;
    my $sql = 'SELECT fid FROM file WHERE dmid = ? AND classid = ? LIMIT 1';
    return $self->dbh->selectrow_array($sql, undef, $dmid, $clid) ? 1 : 0;
}
908              
# Create a class in a domain.
#
# The class named 'default' always gets classid 0; any other name gets
# MAX(classid)+1 within the domain.  New classes start with mindevcount 2.
#
# Returns the new classid on success (0 for the 'default' class).
# Throws 'dup' on a duplicate-key error; dies on any other failure.
sub create_class {
    my ($self, $dmid, $classname) = @_;
    my $dbh = $self->dbh;

    my ($clsid, $rv);

    eval {
        $dbh->begin_work;
        if ($classname eq 'default') {
            $clsid = 0;
        } else {
            # get the max class id in this domain
            my $maxid = $dbh->selectrow_array
                ('SELECT MAX(classid) FROM class WHERE dmid = ?', undef, $dmid) || 0;
            $clsid = $maxid + 1;
        }
        # now insert the new class
        $rv = $dbh->do("INSERT INTO class (dmid, classid, classname, mindevcount) VALUES (?, ?, ?, ?)",
                       undef, $dmid, $clsid, $classname, 2);
        $dbh->commit if $rv;
    };
    if ($@ || $dbh->err) {
        if ($self->was_duplicate_error) {
            # ensure we're not inside a transaction
            if ($dbh->{AutoCommit} == 0) { eval { $dbh->rollback }; }
            throw("dup");
        }
    }
    $self->condthrow; # this will rollback on errors
    return $clsid if $rv;

    # The insert reported failure without condthrow raising anything; say
    # what failed instead of dying with an empty message.
    die "create_class: failed to insert class '$classname'";
}
943              
# Rename a class.  Takes named parameters dmid, classid and classname.
# Returns 1 on success, throws "dup" on a duplicate name error, dies
# otherwise.
sub update_class_name {
    my ($self, @params) = @_;
    my %arg  = $self->_valid_params([qw(dmid classid classname)], @params);
    my @bind = @arg{qw(classname dmid classid)};
    eval {
        $self->dbh->do("UPDATE class SET classname=? WHERE dmid=? AND classid=?",
                       undef, @bind);
    };
    if ($self->was_duplicate_error) {
        throw("dup");
    }
    $self->condthrow;
    return 1;
}
956              
# Set a class's mindevcount.  Takes named parameters dmid, classid and
# mindevcount.  Returns 1 on success, dies otherwise.
sub update_class_mindevcount {
    my ($self, @params) = @_;
    my %arg  = $self->_valid_params([qw(dmid classid mindevcount)], @params);
    my @bind = @arg{qw(mindevcount dmid classid)};
    eval {
        $self->dbh->do("UPDATE class SET mindevcount=? WHERE dmid=? AND classid=?",
                       undef, @bind);
    };
    $self->condthrow;
    return 1;
}
968              
# Set a class's replpolicy.  Takes named parameters dmid, classid and
# replpolicy.  Returns 1 on success, dies otherwise.
sub update_class_replpolicy {
    my ($self, @params) = @_;
    my %arg  = $self->_valid_params([qw(dmid classid replpolicy)], @params);
    my @bind = @arg{qw(replpolicy dmid classid)};
    eval {
        $self->dbh->do("UPDATE class SET replpolicy=? WHERE dmid=? AND classid=?",
                       undef, @bind);
    };
    $self->condthrow;
    return 1;
}
980              
# Set a class's hashtype.  Takes named parameters dmid, classid and
# hashtype.  Returns 1 on success, dies otherwise.
sub update_class_hashtype {
    my $self = shift;
    my %arg  = $self->_valid_params([qw(dmid classid hashtype)], @_);
    eval {
        $self->dbh->do("UPDATE class SET hashtype=? WHERE dmid=? AND classid=?",
                       undef, $arg{hashtype}, $arg{dmid}, $arg{classid});
    };
    $self->condthrow;
    # Explicit success value, matching the documented contract and the other
    # update_class_* methods, rather than whatever condthrow returned.
    return 1;
}
991              
# Count the file rows matching a given dmid, classid and devcount.
sub nfiles_with_dmid_classid_devcount {
    my $self = shift;
    my ($dmid, $classid, $devcount) = @_;
    my $sql = 'SELECT COUNT(*) FROM file WHERE dmid = ? AND classid = ? AND devcount = ?';
    return $self->dbh->selectrow_array($sql, undef, $dmid, $classid, $devcount);
}
997              
# Store a server setting, or remove it when $val is undef.
# Dies if the store cannot REPLACE, or if the database handle reports an
# error afterwards.  Returns 1 otherwise.
sub set_server_setting {
    my ($self, $key, $val) = @_;
    my $dbh = $self->dbh;
    $self->can_replace
        or die "Your database does not support REPLACE! Reimplement set_server_setting!";

    # pick the statement up front; an undefined value means "delete the row"
    my ($sql, @bind) = defined $val
        ? ("REPLACE INTO server_settings (field, value) VALUES (?, ?)", $key, $val)
        : ("DELETE FROM server_settings WHERE field=?", $key);

    eval { $dbh->do($sql, undef, @bind); };

    die "Error updating 'server_settings': " . $dbh->errstr if $dbh->err;
    return 1;
}
1014              
# Add $val (default 1) to a numeric server setting, creating the setting
# with that value when no existing row was updated.  A zero increment is a
# no-op.
# FIXME: racy. currently the only caller doesn't matter, but should be fixed.
sub incr_server_setting {
    my ($self, $key, $val) = @_;
    my $delta = defined $val ? $val : 1;
    return if !$delta;

    my $touched = $self->dbh->do("UPDATE server_settings SET value=value+? WHERE field=?",
                                 undef, $delta, $key);
    return 1 if $touched > 0;

    # nothing was updated: fall back to setting the value outright
    return $self->set_server_setting($key, $delta);
}
1027              
# Look up the value stored for one server_settings field.
sub server_setting {
    my $self  = shift;
    my ($key) = @_;
    my $sql   = "SELECT value FROM server_settings WHERE field=?";
    return $self->dbh->selectrow_array($sql, undef, $key);
}
1033              
# Return a hashref of every server_settings row, field => value.
sub server_settings {
    my $self = shift;
    my $sth  = $self->dbh->prepare("SELECT field, value FROM server_settings");
    $sth->execute;

    my %setting_of;
    while (my @pair = $sth->fetchrow_array) {
        my ($field, $value) = @pair;
        $setting_of{$field} = $value;
    }
    return \%setting_of;
}
1044              
# Register a tempfile and return its fidid.
#
# Named parameters: fid, dmid, key, classid, devids.
#
# If fid is not given (undef, 0 or '') the fid is allocated by the database
# via autoincrement/sequences; if a fid is passed in, that value is used and
# returned.
#
# Returns the new/passed-in fidid on success.
# Throws 'dup' if an explicitly requested fid is already in use.
# Dies on any other failure.
sub register_tempfile {
    my $self = shift;
    my %arg  = $self->_valid_params([qw(fid dmid key classid devids)], @_);

    my $dbh = $self->dbh;

    # Normalise "no fid" to undef.  The insert helper below only fetches the
    # auto-generated id when $fid is undefined, so a literal 0 would
    # otherwise skip that step and then fail the "$fid > 0" check.
    my $fid = $arg{fid} || undef;

    my $explicit_fid_used = $fid ? 1 : 0;

    # setup the new mapping.  we store the devices that we picked for
    # this file in here, knowing that they might not be used.  create_close
    # is responsible for actually mapping in file_on.
    #
    # The helper inserts one tempfile row using the current value of $fid
    # (explicit when true, auto-allocated otherwise), updates $fid from
    # last_insert_id when it was undefined, and returns 1 on success or
    # undef on a duplicate-key error.  It dies on any other database error.
    my $ins_tempfile = sub {
        my $rv = eval {
            # We must only pass the correct number of bind parameters
            # Using 'NULL' for the AUTO_INCREMENT/SERIAL column will fail on
            # Postgres, where you are expected to leave it out or use DEFAULT
            # Leaving it out seems sanest and least likely to cause problems
            # with other databases.
            my @keys = ('dmid', 'dkey', 'classid', 'devids', 'createtime');
            my @vars = ('?'   , '?'   , '?'      , '?'     , $self->unix_timestamp);
            my @vals = ($arg{dmid}, $arg{key}, $arg{classid} || 0, $arg{devids});
            # Do not check for $explicit_fid_used, but rather $fid directly
            # as this anonymous sub is called from the loop later
            if ($fid) {
                unshift @keys, 'fid';
                unshift @vars, '?';
                unshift @vals, $fid;
            }
            my $sql = "INSERT INTO tempfile (".join(',',@keys).") VALUES (".join(',',@vars).")";
            $dbh->do($sql, undef, @vals);
        };
        if (!$rv) {
            return undef if $self->was_duplicate_error;
            die "Unexpected db error into tempfile: " . $dbh->errstr;
        }

        unless (defined $fid) {
            # if they did not give us a fid, then we want to grab the one that was
            # theoretically automatically generated
            $fid = $dbh->last_insert_id(undef, undef, 'tempfile', 'fid')
                or die "No last_insert_id found";
        }
        return undef unless defined $fid && $fid > 0;
        return 1;
    };

    unless ($ins_tempfile->()) {
        throw("dup") if $explicit_fid_used;
        die "tempfile insert failed";
    }

    # reads the captured $fid, so it always tests the current candidate
    my $fid_in_use = sub {
        my $exists = $dbh->selectrow_array("SELECT COUNT(*) FROM file WHERE fid=?", undef, $fid);
        return $exists ? 1 : 0;
    };

    # See notes in MogileFS::Config->check_database
    my $min_fidid = MogileFS::Config->config('min_fidid');

    # if the fid is in use, do something
    while ($fid_in_use->($fid) || $fid <= $min_fidid) {
        throw("dup") if $explicit_fid_used;

        # be careful of databases which reset their
        # auto-increment/sequences when the table is empty (InnoDB
        # did/does this, for instance).  So check if it's in use, and
        # re-seed the table with the highest known fid from the file
        # table.

        # get the highest fid from the filetable and insert a dummy row
        $fid = $dbh->selectrow_array("SELECT MAX(fid) FROM file");
        $ins_tempfile->();  # don't care about its result

        # then do a normal auto-increment
        $fid = undef;
        $ins_tempfile->() or die "register_tempfile failed after seeding";
    }

    return $fid;
}
1137              
# Look up a file by domain id and key (dkey).
# Returns a hashref with the columns fid, dmid, dkey, length, classid and
# devcount, or undef if there is no such row.
sub file_row_from_dmid_key {
    my $self = shift;
    my ($dmid, $key) = @_;
    my $sql = "SELECT fid, dmid, dkey, length, classid, devcount "
            . "FROM file WHERE dmid=? AND dkey=?";
    return $self->dbh->selectrow_hashref($sql, undef, $dmid, $key);
}
1147              
# Look up a file by fid.
# Returns a hashref with the columns fid, dmid, dkey, length, classid and
# devcount, or undef if there is no such row.
sub file_row_from_fidid {
    my $self = shift;
    my ($fidid) = @_;
    my $sql = "SELECT fid, dmid, dkey, length, classid, devcount "
            . "FROM file WHERE fid=?";
    return $self->dbh->selectrow_hashref($sql, undef, $fidid);
}
1156              
# Fetch up to $count file rows whose fid is greater than $fromfid, in
# ascending fid order.
# Returns an arrayref of hashrefs, each with the columns fid, dmid, dkey,
# length, classid and devcount (an empty arrayref if nothing matches).
sub file_row_from_fidid_range {
    my ($self, $fromfid, $count) = @_;
    # ORDER BY makes the window well defined: without it SQL does not
    # guarantee which rows above $fromfid a LIMIT query returns.
    my $sth = $self->dbh->prepare("SELECT fid, dmid, dkey, length, classid, devcount ".
                                  "FROM file WHERE fid > ? ORDER BY fid LIMIT ?");
    $sth->execute($fromfid, $count);
    return $sth->fetchall_arrayref({});
}
1166              
# Return the list of devids that file_on records for a fidid.
sub fid_devids {
    my $self = shift;
    my ($fidid) = @_;
    my $devids = $self->dbh->selectcol_arrayref("SELECT devid FROM file_on WHERE fid=?",
                                                undef, $fidid);
    $devids ||= [];    # no result at all is treated as "on no devices"
    return @$devids;
}
1173              
# Return a hashref of { $fidid => [ $devid, $devid... ] } for a bunch of
# given @fidids.  Fids with no file_on rows have no entry in the result.
# An empty @fidids yields an empty hashref without querying the database.
sub fid_devids_multiple {
    my ($self, @fidids) = @_;
    my $ret = {};

    # "fid IN ()" is not valid SQL, so there is nothing to ask for.
    return $ret unless @fidids;

    # numify each fid so only numbers are interpolated into the statement
    my $in  = join(",", map { $_+0 } @fidids);
    my $sth = $self->dbh->prepare("SELECT fid, devid FROM file_on WHERE fid IN ($in)");
    $sth->execute;
    while (my ($fidid, $devid) = $sth->fetchrow_array) {
        push @{$ret->{$fidid} ||= []}, $devid;
    }
    return $ret;
}
1186              
# Look up a tempfile row by fid.
# Returns a hashref with the columns classid, dmid, dkey and devids, or
# undef if there is no such row.
sub tempfile_row_from_fid {
    my $self = shift;
    my ($fidid) = @_;
    my $sql = "SELECT classid, dmid, dkey, devids "
            . "FROM tempfile WHERE fid=?";
    return $self->dbh->selectrow_hashref($sql, undef, $fidid);
}
1194              
# Insert a device row.
# Returns 1 on success, throws "dup" on duplicate devid, or throws another
# error on failure.
sub create_device {
    my ($self, $devid, $hostid, $status) = @_;

    # the insert is run through conddup
    my $insert = sub {
        $self->dbh->do("INSERT INTO device (devid, hostid, status) VALUES (?,?,?)", undef,
                       $devid, $hostid, $status);
    };
    my $rv = $self->conddup($insert);
    $self->condthrow;
    die "error making device $devid\n" unless $rv > 0;
    return 1;
}
1206              
# Update columns of one device row.  $to_update is a hashref of
# column => new value; its keys are used as column names in the statement.
# Returns nothing when there is nothing to update, 1 otherwise.
sub update_device {
    my ($self, $devid, $to_update) = @_;
    my @cols = sort keys %$to_update;
    return unless @cols;

    my $set_clause = join(', ', map { "$_=?" } @cols);
    my @bind       = (@{$to_update}{@cols}, $devid);
    $self->conddup(sub {
        $self->dbh->do("UPDATE device SET $set_clause WHERE devid=?", undef, @bind);
    });
    return 1;
}
1218              
# Record a device's mb_total and mb_used, stamping mb_asof with the store's
# unix_timestamp expression.  Takes named parameters mb_total, mb_used and
# devid.
sub update_device_usage {
    my ($self, @params) = @_;
    my %arg = $self->_valid_params([qw(mb_total mb_used devid)], @params);
    eval {
        my $dbh = $self->dbh;
        my $sql = "UPDATE device SET mb_total = ?, mb_used = ?, mb_asof = "
                . $self->unix_timestamp
                . " WHERE devid = ?";
        $dbh->do($sql, undef, @arg{qw(mb_total mb_used devid)});
    };
    return $self->condthrow;
}
1228              
# Deliberately unimplemented.  Before a device may be deleted we must verify:
#   - no file_on rows exist
#   - nothing in file_to_queue is going to attempt to use it
#   - nothing in file_to_replicate is going to attempt to use it
#   - it's already been marked dead
#   - that all trackers are likely to know this :/
#   - ensure the devid can't be reused
# IE; the user can't mark it dead then remove it all at once and cause their
# cluster to implode.
sub delete_device {
    my $reason = "Unimplemented; needs further testing";
    die $reason;
}
1241              
# Record (or refresh) a fid in unreachable_fids, stamped with the store's
# unix_timestamp expression.  Dies if the store cannot REPLACE.
sub mark_fidid_unreachable {
    my ($self, $fidid) = @_;
    $self->can_replace
        or die "Your database does not support REPLACE! Reimplement mark_fidid_unreachable!";

    my $dbh = $self->dbh;
    my $sql = "REPLACE INTO unreachable_fids VALUES (?, " . $self->unix_timestamp . ")";
    return $dbh->do($sql, undef, $fidid);
}
1248              
# Set the weight column of one device; errors surface through condthrow.
sub set_device_weight {
    my ($self, $devid, $weight) = @_;
    my @bind = ($weight, $devid);
    eval { $self->dbh->do('UPDATE device SET weight = ? WHERE devid = ?', undef, @bind); };
    return $self->condthrow;
}
1256              
# Set the status column of one device; errors surface through condthrow.
sub set_device_state {
    my ($self, $devid, $state) = @_;
    my @bind = ($state, $devid);
    eval { $self->dbh->do('UPDATE device SET status = ? WHERE devid = ?', undef, @bind); };
    return $self->condthrow;
}
1264              
# Delete a class from a domain.  Throws "has_files" when files still use the
# class; database errors surface through condthrow.
sub delete_class {
    my ($self, $dmid, $cid) = @_;
    if ($self->class_has_files($dmid, $cid)) {
        throw("has_files");
    }
    my @bind = ($dmid, $cid);
    eval { $self->dbh->do("DELETE FROM class WHERE dmid = ? AND classid = ?", undef, @bind); };
    return $self->condthrow;
}
1273              
# Called from a queryworker process: removes the file row and queues the fid
# for asynchronous deletion, which will trigger delete_fidid_enqueued in the
# delete worker.
sub delete_fidid {
    my $self  = shift;
    my $fidid = shift;

    eval {
        $self->dbh->do("DELETE FROM file WHERE fid=?", undef, $fidid);
    };
    $self->condthrow;

    $self->enqueue_for_delete2($fidid, 0);
    return $self->condthrow;
}
1283              
# Only called from delete workers (after delete_fidid); doing this cleanup
# here reduces client-visible latency from the queryworker.
# Removes the fid's checksum and then its tempfile row.
sub delete_fidid_enqueued {
    my $self  = shift;
    my $fidid = shift;

    eval {
        $self->delete_checksum($fidid);
    };
    $self->condthrow;

    eval {
        $self->dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fidid);
    };
    return $self->condthrow;
}
1293              
# Delete the tempfile row for a fid and return the result of the DELETE
# (undef if the statement raised inside the eval and condthrow let it pass).
sub delete_tempfile_row {
    my $self  = shift;
    my $fidid = shift;
    my $deleted = eval {
        $self->dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fidid);
    };
    $self->condthrow;
    return $deleted;
}
1300              
# Load the specified tempfile row, then delete it.
# Returns the row (as returned by tempfile_row_from_fid) if our delete
# removed it, meaning we were here first.  Otherwise someone else beat us
# here and we return undef (an empty list in list context).
sub delete_and_return_tempfile_row {
    my ($self, $fidid) = @_;
    my $rv = $self->tempfile_row_from_fid($fidid);
    my $rows_deleted = $self->delete_tempfile_row($fidid);
    return $rv if $rows_deleted > 0;

    # Explicit "nothing": falling off the end of the sub would hand back the
    # false value of the comparison above rather than undef.
    return;
}
1309              
# REPLACE a row in the file table.  Takes named parameters fidid, dmid, key,
# length, classid and devcount.  Dies if the store cannot REPLACE; database
# errors surface through condthrow.
sub replace_into_file {
    my ($self, @params) = @_;
    my %arg = $self->_valid_params([qw(fidid dmid key length classid devcount)], @params);
    $self->can_replace
        or die "Your database does not support REPLACE! Reimplement replace_into_file!";

    my @bind = @arg{qw(fidid dmid key length classid devcount)};
    eval {
        $self->dbh->do("REPLACE INTO file (fid, dmid, dkey, length, classid, devcount) ".
                       "VALUES (?,?,?,?,?,?) ", undef, @bind);
    };
    return $self->condthrow;
}
1321              
# Change the key (dkey) of the file row with the given fid.
# Returns 1 on success, 0 on duplicate key error, dies on any other error.
# TODO: need a test to hit the duplicate name error condition
# TODO: switch to using "dup" exception here?
sub rename_file {
    my ($self, $fidid, $to_key) = @_;
    my $dbh = $self->dbh;
    eval {
        $dbh->do('UPDATE file SET dkey = ? WHERE fid=?',
                 undef, $to_key, $fidid);
    };
    # Keep the exception before any further call has a chance to reset $@.
    my $exception = $@;
    if ($exception || $dbh->err) {
        return 0 if $self->was_duplicate_error;

        # The handle can report an error without anything having been
        # thrown; fall back to its error string so the die is never empty.
        die($exception || "rename_file: database error: " . $dbh->errstr);
    }
    $self->condthrow;
    return 1;
}
1343              
# Return the dmid of the domain with the given namespace (undef if none).
sub get_domainid_by_name {
    my ($self, $namespace) = @_;
    my @row = $self->dbh->selectrow_array('SELECT dmid FROM domain WHERE namespace = ?',
                                          undef, $namespace);
    return $row[0];
}
1350              
# Returns a flat namespace => dmid list covering every domain (suitable for
# assigning to a hash).
sub get_all_domains {
    my $self = shift;
    my $rows = $self->dbh->selectall_arrayref('SELECT namespace, dmid FROM domain') || [];

    my @pairs;
    for my $row (@$rows) {
        push @pairs, $row->[0], $row->[1];
    }
    return @pairs;
}
1357              
# Return the classid of the named class within a domain (undef if none).
sub get_classid_by_name {
    my ($self, $dmid, $classname) = @_;
    my @row = $self->dbh->selectrow_array('SELECT classid FROM class WHERE dmid = ? AND classname = ?',
                                          undef, $dmid, $classname);
    return $row[0];
}
1364              
# Returns a list of hashrefs, one hashref per row in the 'class' table.
# Only columns that exist at the cached schema version are selected:
# replpolicy from version 10, hashtype from version 15.
sub get_all_classes {
    my $self = shift;

    my @cols = qw/dmid classid classname mindevcount/;
    if ($self->cached_schema_version >= 10) {
        push @cols, 'replpolicy';
        push @cols, 'hashtype' if $self->cached_schema_version >= 15;
    }

    my $sth = $self->dbh->prepare("SELECT " . join(', ', @cols) . " FROM class");
    $sth->execute;

    my @classes;
    while (my $row = $sth->fetchrow_hashref) {
        push @classes, $row;
    }
    return @classes;
}
1383              
# Add a record of fidid existing on devid.
# Croaks when either id is false.
# Returns 1 when the statement affected a row, 0 otherwise (duplicate).
sub add_fidid_to_devid {
    my ($self, $fidid, $devid) = @_;
    $fidid or croak("fidid not non-zero");
    $devid or croak("devid not non-zero");

    # TODO: This should possibly be insert_ignore instead
    # As if we are adding an extra file_on entry, we do not want to replace the
    # exist one. Check REPLACE semantics.
    my $sql = $self->ignore_replace . " INTO file_on (fid, devid) VALUES (?,?)";
    my $rv  = $self->dowell($sql, undef, $fidid, $devid);
    return $rv > 0 ? 1 : 0;
}
1399              
# remove a record of fidid existing on devid
# Returns whatever the DELETE's do() call returned (undef if it died inside
# the eval); condthrow is consulted afterwards to surface any database error.
sub remove_fidid_from_devid {
    my ($self, $fidid, $devid) = @_;
    my $sql     = "DELETE FROM file_on WHERE fid=? AND devid=?";
    my $deleted = eval { $self->dbh->do($sql, undef, $fidid, $devid) };
    $self->condthrow;
    return $deleted;
}
1409              
# Test if host exists.
# Takes a hostid after $self and returns that hostid when a matching row is
# present in the 'host' table, undef otherwise.
sub get_hostid_by_id {
    my ($self, $wanted) = @_;
    my @row = $self->dbh->selectrow_array(
        'SELECT hostid FROM host WHERE hostid = ?', undef, $wanted);
    return $row[0];
}
1417              
# Takes a hostname after $self and returns the hostid of the matching row in
# the 'host' table, or undef when there is none.
sub get_hostid_by_name {
    my ($self, $hostname) = @_;
    my @row = $self->dbh->selectrow_array(
        'SELECT hostid FROM host WHERE hostname = ?', undef, $hostname);
    return $row[0];
}
1424              
# get all hosts from database, returns them as list of hashrefs, hashrefs being the row contents.
sub get_all_hosts {
    my $self = shift;
    my $sql  = "SELECT /*!40000 SQL_CACHE */ hostid, status, hostname, "
             . "hostip, http_port, http_get_port, altip, altmask FROM host";
    my $sth = $self->dbh->prepare($sql);
    $sth->execute;

    my (@hosts, $row);
    push @hosts, $row while $row = $sth->fetchrow_hashref;
    return @hosts;
}
1437              
# get all devices from database, returns them as list of hashrefs, hashrefs being the row contents.
sub get_all_devices {
    my $self = shift;
    my $sql  = "SELECT /*!40000 SQL_CACHE */ devid, hostid, mb_total, "
             . "mb_used, mb_asof, status, weight FROM device";
    my $sth = $self->dbh->prepare($sql);
    # surface a failed prepare before trying to execute
    $self->condthrow;
    $sth->execute;

    my (@devices, $row);
    push @devices, $row while $row = $sth->fetchrow_hashref;
    return @devices;
}
1451              
# update the device count for a given fidid
# Recounts the file_on rows for the fid and stores the result in
# file.devcount.  Returns 1; database errors surface through condthrow.
sub update_devcount {
    my ($self, $fidid) = @_;
    my $dbh = $self->dbh;

    my $count_sql = "SELECT COUNT(*) FROM file_on WHERE fid=?";
    my $devcount  = $dbh->selectrow_array($count_sql, undef, $fidid);

    my $update_sql = "UPDATE file SET devcount=? WHERE fid=?";
    eval { $dbh->do($update_sql, undef, $devcount, $fidid) };
    $self->condthrow;

    return 1;
}
1465              
# update the classid for a given fidid
# Returns 1; database errors surface through condthrow.
sub update_classid {
    my ($self, $fidid, $classid) = @_;

    my $sql = "UPDATE file SET classid=? WHERE fid=?";
    $self->dbh->do($sql, undef, $classid, $fidid);
    $self->condthrow;

    return 1;
}
1477              
# enqueue a fidid for replication, from a specific deviceid (can be undef), in a given number of seconds.
# With a false $in the row's nexttry is the literal 0; otherwise it is the
# store's unix_timestamp expression plus int($in).
sub enqueue_for_replication {
    my ($self, $fidid, $from_devid, $in) = @_;

    my $nexttry = $in ? $self->unix_timestamp . " + " . int($in) : 0;
    my $sql = "INTO file_to_replicate (fid, fromdevid, nexttry) "
            . "VALUES (?,?,$nexttry)";

    return $self->retry_on_deadlock(sub {
        $self->insert_ignore($sql, undef, $fidid, $from_devid);
    });
}
1492              
# enqueue a fidid for delete
# note: if we get one more "independent" queue like this, the
# code should be collapsable? I tried once and it looked too ugly, so we have
# some redundancy.
# $in is an optional delay in seconds (a false value means no delay).
sub enqueue_for_delete2 {
    my ($self, $fidid, $in) = @_;

    my $delay   = int($in || 0);
    my $nexttry = $self->unix_timestamp . " + " . $delay;
    my $sql     = "INTO file_to_delete2 (fid, nexttry) VALUES (?,$nexttry)";

    return $self->retry_on_deadlock(sub {
        $self->insert_ignore($sql, undef, $fidid);
    });
}
1508              
# enqueue a fidid for work
# $fidid is either a plain fid or a reference whose first three elements are
# used as (fid, devid, arg).  $type is stored in the row's 'type' column and
# $in is an optional delay in seconds.
sub enqueue_for_todo {
    my ($self, $fidid, $type, $in) = @_;

    my $delay   = int($in || 0);
    my $nexttry = $self->unix_timestamp . " + " . $delay;

    my ($sql, @binds);
    if (ref($fidid)) {
        $sql = "INTO file_to_queue (fid, devid, arg, type, "
             . "nexttry) VALUES (?,?,?,?,$nexttry)";
        @binds = ($fidid->[0], $fidid->[1], $fidid->[2], $type);
    }
    else {
        $sql = "INTO file_to_queue (fid, type, nexttry) "
             . "VALUES (?,?,$nexttry)";
        @binds = ($fidid, $type);
    }

    return $self->retry_on_deadlock(sub {
        $self->insert_ignore($sql, undef, @binds);
    });
}
1527              
# Enqueue many fids into file_to_queue with a single multi-row statement.
# return 1 on success. die otherwise.
#
# $fidids is an arrayref of either plain fids or [fid, devid, arg] arrayrefs
# (the shape of the first element decides how the whole list is treated),
# $type is stored in the 'type' column and $in is an optional delay in
# seconds.  An empty list is a no-op.
#
# When the store cannot do multi-row insert-ignore/replace, every entry is
# handed to enqueue_for_todo individually.
sub enqueue_many_for_todo {
    my ($self, $fidids, $type, $in) = @_;
    return 1 unless $fidids && @$fidids;

    if (! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
        $self->enqueue_for_todo($_, $type, $in) foreach @$fidids;
        return 1;
    }

    # $nexttry is an SQL expression, not a value: it has to be spliced into
    # the statement text.  Binding it as a placeholder would hand the database
    # a string instead of "now + delay".
    my $nexttry = $self->unix_timestamp . " + " . int($in || 0);

    my ($cols, $tuple, @binds);
    if (ref($fidids->[0]) eq 'ARRAY') {
        $cols  = 'fid, devid, arg, type, nexttry';
        $tuple = "(?,?,?,?,$nexttry)";
        # exactly three values per entry so binds always match placeholders
        @binds = map { (@{$_}[0 .. 2], $type) } @$fidids;
    }
    else {
        $cols  = 'fid, type, nexttry';
        $tuple = "(?,?,$nexttry)";
        @binds = map { (int($_), $type) } @$fidids;
    }

    my $sql = $self->ignore_replace . " INTO file_to_queue ($cols) VALUES "
            . join(', ', ($tuple) x scalar @$fidids);

    $self->retry_on_deadlock(sub {
        $self->dbh->do($sql, undef, @binds);
    });
    $self->condthrow;
    return 1;
}
1554              
# For file_to_queue queues that should be kept small, find the size.
# This isn't fast, but for small queues won't be slow, and is usually only ran
# from a single tracker.
# Takes the queue type and returns the number of file_to_queue rows of it.
sub file_queue_length {
    my ($self, $type) = @_;
    my $sql = "SELECT COUNT(*) FROM file_to_queue WHERE type = ?";
    return $self->dbh->selectrow_array($sql, undef, $type);
}
1565              
# reschedule all deferred replication, return number rescheduled
# Every file_to_replicate row whose nexttry lies in the future is pulled
# forward to the current database time.  The result is whatever
# retry_on_deadlock hands back for the UPDATE.
sub replicate_now {
    my $self = shift;

    my $pull_forward = sub {
        my $now = $self->unix_timestamp;
        return $self->dbh->do(
            "UPDATE file_to_replicate SET nexttry = $now WHERE nexttry > $now");
    };
    return $self->retry_on_deadlock($pull_forward);
}
1575              
# takes two arguments, devid and limit, both required. returns an arrayref of fidids.
# The limit is coerced to an integer before it is spliced into the statement
# (LIMIT cannot portably be bound as a placeholder), so a malformed value can
# never alter the SQL.
sub get_fidids_by_device {
    my ($self, $devid, $limit) = @_;
    $limit = int($limit || 0);

    return $self->dbh->selectcol_arrayref(
        "SELECT fid FROM file_on WHERE devid = ? LIMIT $limit",
        undef, $devid);
}
1585              
# finds a chunk of fids given a set of constraints:
# devid, fidid, age (new or old), limit
# Note that if this function is very slow on your large DB, you're likely
# sorting by "newfiles" and are missing a new index.
# returns an arrayref of fidids
#
# Options (passed as a hash):
#   devid - required, the device to list.
#   age   - 'old' (default; ascending fid order) or 'new' (descending).
#   fidid - optional paging cursor: only fids past it, in the direction
#           selected by 'age', are returned.
#   limit - maximum number of rows; defaults to 100 and is coerced to an
#           integer before being spliced into the statement.
# croaks on a missing devid, on unknown options, or on an invalid age.
sub get_fidid_chunks_by_device {
    my ($self, %o) = @_;

    my $dbh   = $self->dbh;
    my $devid = delete $o{devid};
    croak("must supply at least a devid") unless $devid;
    my $age   = delete($o{age}) || 'old';
    my $fidid = delete $o{fidid};
    my $limit = int(delete($o{limit}) || 0) || 100;
    croak("invalid options: " . join(', ', keys %o)) if %o;

    # age => [comparison used when paging, sort direction]
    my %paging_for = (
        old => [ '>', 'ASC'  ],
        new => [ '<', 'DESC' ],
    );
    my $paging = $paging_for{$age}
        or croak("invalid age argument: " . $age);
    my ($cmp, $order) = @$paging;

    # If supplied a "previous" fidid, we're paging through.
    my $fidsort = $fidid ? "AND fid $cmp ?" : '';
    my @binds   = ($devid);
    push @binds, $fidid if $fidid;

    return $dbh->selectcol_arrayref(
        "SELECT fid FROM file_on WHERE devid = ? " .
        $fidsort . " ORDER BY fid $order LIMIT $limit", undef, @binds);
}
1622              
# gets fidids above fidid_low up to (and including) fidid_high
# Returns an arrayref of fids in ascending order; $limit defaults to 1000
# and is truncated to an integer.
sub get_fidids_between {
    my ($self, $fidid_low, $fidid_high, $limit) = @_;
    $limit = int($limit || 1000);

    my $sql = qq{SELECT fid FROM file
                 WHERE fid > ? and fid <= ?
                 ORDER BY fid LIMIT $limit};
    return $self->dbh->selectcol_arrayref($sql, undef, $fidid_low, $fidid_high);
}
1635              
# creates a new domain, given a domain namespace string. return the dmid on success,
# throw 'dup' on duplicate name.
# override if you want a less racy version.
sub create_domain {
    my ($self, $name) = @_;
    my $dbh = $self->dbh;

    # next id = current maximum + 1 (0 when the table is empty)
    my $dmid = ($dbh->selectrow_array('SELECT MAX(dmid) FROM domain') || 0) + 1;

    my $inserted = eval {
        $dbh->do('INSERT INTO domain (dmid, namespace) VALUES (?, ?)',
                 undef, $dmid, $name);
    };
    throw("dup") if $self->was_duplicate_error;

    die "failed to make domain" unless $inserted;  # FIXME: the above is racy.
    return $dmid;
}
1655              
# Updates columns of one 'host' row.  $to_update is a hashref of
# column => new value; columns are applied in sorted order.
# Returns nothing when there is nothing to update, 1 otherwise.  The UPDATE
# runs under conddup.
sub update_host {
    my ($self, $hid, $to_update) = @_;

    my @cols = sort keys %$to_update;
    return unless @cols;

    my $assignments = join(', ', map { "$_=?" } @cols);
    my @binds       = (@{$to_update}{@cols}, $hid);

    $self->conddup(sub {
        $self->dbh->do("UPDATE host SET $assignments WHERE hostid=?", undef, @binds);
    });
    return 1;
}
1667              
# Sets a single column ($col) of the 'host' row identified by $hostid to
# $val.  The UPDATE runs under conddup.  Returns 1.
# NOTE(review): $col is spliced into the statement text as-is; presumably
# callers only pass known column names - confirm.
sub update_host_property {
    my ($self, $hostid, $col, $val) = @_;

    my $set_column = sub {
        my $sql = "UPDATE host SET " . $col . "=? WHERE hostid=?";
        $self->dbh->do($sql, undef, $val, $hostid);
    };
    $self->conddup($set_column);

    return 1;
}
1675              
# return new hostid, or throw 'dup' on error.
# NOTE: you need to put them into the initial 'down' state.
# The INSERT runs under conddup; a false result from it ends in
# die "db failure".
sub create_host {
    my ($self, $hostname, $ip) = @_;
    my $dbh = $self->dbh;

    # racy! lazy. no, better: portable! how often does this happen? :)
    my $max_hostid = $dbh->selectrow_array('SELECT MAX(hostid) FROM host') || 0;
    my $hid        = $max_hostid + 1;

    my $inserted = $self->conddup(sub {
        $dbh->do("INSERT INTO host (hostid, hostname, hostip, status) ".
                 "VALUES (?, ?, ?, 'down')",
                 undef, $hid, $hostname, $ip);
    });
    die "db failure" unless $inserted;
    return $hid;
}
1691              
# return array of row hashrefs containing columns: (fid, fromdevid,
# failcount, flags, nexttry)
# Only rows whose nexttry has been reached are considered, at most $limit of
# them.  The rows pass through a hash keyed by fid, so the returned list is
# NOT in nexttry order.  $limit is coerced to an integer before being spliced
# into the statement.  Returns an empty list when the query yields nothing.
sub files_to_replicate {
    my ($self, $limit) = @_;
    $limit = int($limit || 0);

    my $ut = $self->unix_timestamp;
    my $to_repl_map = $self->dbh->selectall_hashref(qq{
        SELECT fid, fromdevid, failcount, flags, nexttry
        FROM file_to_replicate
        WHERE nexttry <= $ut
        ORDER BY nexttry
        LIMIT $limit
    }, "fid") or return ();
    return values %$to_repl_map;
}
1706              
# "new" style queue consumption code.
# from within a transaction, fetch a limit of fids,
# then update each fid's nexttry to be off in the future,
# giving local workers some time to dequeue the items.
# Note:
# DBI (even with RaiseError) returns weird errors on
# deadlocks from selectall_hashref. So we can't do that.
# we also used to retry on deadlock within the routine,
# but instead lets return undef and let job_master retry.
#
# Arguments: queue table name, row limit (coerced to an integer), optional
# extra column list to select, optional extra WHERE fragment (spliced in
# verbatim).
# Returns 0 when lock_queue refuses the queue, otherwise the list of claimed
# row hashrefs (empty when nothing is due or on a deadlock).
# A database error reported by condthrow is re-thrown, but only after the
# transaction has been rolled back and the queue lock released.
sub grab_queue_chunk {
    my ($self, $queue, $limit, $extfields, $extwhere) = @_;

    my $dbh = $self->dbh;
    return 0 unless $self->lock_queue($queue);

    $limit    = int($limit || 0);
    $extwhere = '' unless $extwhere;
    my $fields = 'fid, nexttry, failcount';
    $fields .= ', ' . $extfields if $extfields;

    my $work;
    eval {
        $dbh->begin_work;
        my $ut    = $self->unix_timestamp;
        my $query = qq{
            SELECT $fields
            FROM $queue
            WHERE nexttry <= $ut
            $extwhere
            ORDER BY nexttry
            LIMIT $limit
        };
        $query .= "FOR UPDATE\n" if $self->can_for_update;
        my $sth = $dbh->prepare($query);
        $sth->execute;
        $work = $sth->fetchall_hashref('fid');
        # Nothing to work on: this return only leaves the eval block.
        my $fidlist = join(',', keys %$work);
        unless ($fidlist) { $dbh->commit; return; }
        # Now claim the fids for a while.
        # TODO: Should be configurable... but not necessary.
        $dbh->do("UPDATE $queue SET nexttry = $ut + 1000 WHERE fid IN ($fidlist)");
        $dbh->commit;
    };

    my $db_error;
    if ($self->was_deadlock_error) {
        eval { $dbh->rollback };
        $work = undef;
    }
    else {
        # Hold the error back until the cleanup below has run; throwing right
        # here would skip the rollback and leave the queue locked.
        eval { $self->condthrow; 1 }
            or $db_error = $@ || "unknown database error";
    }

    # FIXME: Super extra paranoia to prevent deadlocking.
    # Need to handle or die on all errors above, but $@ can get reset. For now
    # we'll just always ensure there's no transaction running at the end here.
    # A (near) release should figure the error detection correctly.
    if ($dbh->{AutoCommit} == 0) { eval { $dbh->rollback }; }
    $self->unlock_queue($queue);

    die $db_error if defined $db_error;
    return defined $work ? values %$work : ();
}
1769              
# Claims up to $limit due rows from file_to_replicate via grab_queue_chunk,
# additionally selecting the fromdevid and flags columns.
sub grab_files_to_replicate {
    my ($self, $limit) = @_;
    my $extra_fields = 'fromdevid, flags';
    return $self->grab_queue_chunk('file_to_replicate', $limit, $extra_fields);
}
1775              
# Claims up to $limit due rows from file_to_delete2 via grab_queue_chunk.
sub grab_files_to_delete2 {
    my $self  = shift;
    my $limit = shift;
    return $self->grab_queue_chunk('file_to_delete2', $limit);
}
1780              
# $extwhere is ugly... but should be fine.
# Claims up to $limit due file_to_queue rows of the given $type via
# grab_queue_chunk.  $what is the extra column list to select and defaults
# to 'type, flags'.
sub grab_files_to_queued {
    my ($self, $type, $what, $limit) = @_;
    my $fields   = $what || 'type, flags';
    my $extwhere = 'AND type = ' . $type;
    return $self->grab_queue_chunk('file_to_queue', $limit, $fields, $extwhere);
}
1788              
# although it's safe to have multiple tracker hosts and/or processes
# replicating the same file, around, it's inefficient CPU/time-wise,
# and it's also possible they pick different places and waste disk.
# so the replicator asks the store interface when it's about to start
# and when it's done replicating a fidid, so you can do something smart
# and tell it not to.
# Returns 1 when the per-fid replicate lock could be taken, 0 otherwise.
sub should_begin_replicating_fidid {
    my ($self, $fidid) = @_;
    my $got_lock = $self->get_lock("mgfs:fid:$fidid:replicate", 1);
    return $got_lock ? 1 : 0;
}
1801              
# called when replicator is done replicating a fid, so you can cleanup
# whatever you did in 'should_begin_replicating_fidid' above.
#
# NOTE: there's a theoretical race condition in the rebalance code,
# where (without locking as provided by
# should_begin_replicating_fidid/note_done_replicating), all copies of
# a file can be deleted by independent replicators doing rebalancing
# in different ways. so you'll probably want to implement some
# locking in this pair of functions.
#
# Releases the per-fid replicate lock and returns release_lock's result.
sub note_done_replicating {
    my ($self, $fidid) = @_;
    return $self->release_lock("mgfs:fid:$fidid:replicate");
}
1816              
# Fetches the file_to_replicate row for $fidid as a hashref with the keys
# fid, nexttry, fromdevid, failcount and flags.
sub find_fid_from_file_to_replicate {
    my ($self, $fidid) = @_;
    my $sql = "SELECT fid, nexttry, fromdevid, failcount, flags FROM file_to_replicate WHERE fid = ?";
    return $self->dbh->selectrow_hashref($sql, undef, $fidid);
}
1822              
# Fetches the file_to_delete2 row for $fidid as a hashref with the keys
# fid, nexttry and failcount.
sub find_fid_from_file_to_delete2 {
    my ($self, $fidid) = @_;
    my $sql = "SELECT fid, nexttry, failcount FROM file_to_delete2 WHERE fid = ?";
    return $self->dbh->selectrow_hashref($sql, undef, $fidid);
}
1828              
# Fetches the file_to_queue row for ($fidid, $type) as a hashref with the
# keys fid, devid, type, nexttry, failcount, flags and arg.
sub find_fid_from_file_to_queue {
    my ($self, $fidid, $type) = @_;
    my $sql = "SELECT fid, devid, type, nexttry, failcount, flags, arg FROM file_to_queue WHERE fid = ? AND type = ?";
    return $self->dbh->selectrow_hashref($sql, undef, $fidid, $type);
}
1834              
# Removes $fidid from the file_to_replicate queue; the DELETE runs under
# retry_on_deadlock, whose result is returned.
sub delete_fid_from_file_to_replicate {
    my ($self, $fidid) = @_;
    my $sql    = "DELETE FROM file_to_replicate WHERE fid=?";
    my $delete = sub { $self->dbh->do($sql, undef, $fidid) };
    return $self->retry_on_deadlock($delete);
}
1841              
# Removes the ($fidid, $type) entry from file_to_queue; the DELETE runs
# under retry_on_deadlock, whose result is returned.
sub delete_fid_from_file_to_queue {
    my ($self, $fidid, $type) = @_;
    my $sql    = "DELETE FROM file_to_queue WHERE fid=? and type=?";
    my $delete = sub { $self->dbh->do($sql, undef, $fidid, $type) };
    return $self->retry_on_deadlock($delete);
}
1849              
# Removes $fidid from the file_to_delete2 queue; the DELETE runs under
# retry_on_deadlock, whose result is returned.
sub delete_fid_from_file_to_delete2 {
    my ($self, $fidid) = @_;
    my $sql    = "DELETE FROM file_to_delete2 WHERE fid=?";
    my $delete = sub { $self->dbh->do($sql, undef, $fidid) };
    return $self->retry_on_deadlock($delete);
}
1856              
# Sets nexttry of $fid's file_to_replicate row to the absolute time $abstime
# and bumps its failcount.  Runs under retry_on_deadlock.
sub reschedule_file_to_replicate_absolute {
    my ($self, $fid, $abstime) = @_;
    my $sql = "UPDATE file_to_replicate SET nexttry = ?, failcount = failcount + 1 WHERE fid = ?";
    my $reschedule = sub { $self->dbh->do($sql, undef, $abstime, $fid) };
    return $self->retry_on_deadlock($reschedule);
}
1864              
# Sets nexttry of $fid's file_to_replicate row to the store's unix_timestamp
# expression plus $in_n_secs and bumps its failcount.  Runs under
# retry_on_deadlock.
sub reschedule_file_to_replicate_relative {
    my ($self, $fid, $in_n_secs) = @_;
    my $reschedule = sub {
        my $now = $self->unix_timestamp;
        $self->dbh->do(
            "UPDATE file_to_replicate SET nexttry = $now + ?, " .
            "failcount = failcount + 1 WHERE fid = ?",
            undef, $in_n_secs, $fid);
    };
    return $self->retry_on_deadlock($reschedule);
}
1873              
# Sets nexttry of $fid's file_to_delete2 row to the absolute time $abstime
# and bumps its failcount.  Runs under retry_on_deadlock.
sub reschedule_file_to_delete2_absolute {
    my ($self, $fid, $abstime) = @_;
    my $sql = "UPDATE file_to_delete2 SET nexttry = ?, failcount = failcount + 1 WHERE fid = ?";
    my $reschedule = sub { $self->dbh->do($sql, undef, $abstime, $fid) };
    return $self->retry_on_deadlock($reschedule);
}
1881              
# Sets nexttry of $fid's file_to_delete2 row to the store's unix_timestamp
# expression plus $in_n_secs and bumps its failcount.  Runs under
# retry_on_deadlock.
sub reschedule_file_to_delete2_relative {
    my ($self, $fid, $in_n_secs) = @_;
    my $reschedule = sub {
        my $now = $self->unix_timestamp;
        $self->dbh->do(
            "UPDATE file_to_delete2 SET nexttry = $now + ?, " .
            "failcount = failcount + 1 WHERE fid = ?",
            undef, $in_n_secs, $fid);
    };
    return $self->retry_on_deadlock($reschedule);
}
1890              
# Given a dmid, prefix, after and limit, return an arrayref of dkey from the
# file table.
#   $prefix - literal key prefix (undef means no prefix).  LIKE wildcards in
#             it are escaped, so it only ever matches literally.
#   $after  - only keys sorting after this value are returned (undef means
#             start from the beginning).
#   $limit  - maximum number of keys; coerced to an integer before being
#             spliced into the statement.
sub get_keys_like {
    my ($self, $dmid, $prefix, $after, $limit) = @_;
    $prefix = '' unless defined $prefix;
    $after  = '' unless defined $after;
    $limit  = int($limit || 0);

    # escape underscores, % and \, then make it a prefix match
    (my $pattern = $prefix) =~ s/([%\\_])/\\$1/g;
    $pattern .= '%';

    my $like = $self->get_keys_like_operator;

    # now select out our keys
    return $self->dbh->selectcol_arrayref
        ("SELECT dkey FROM file WHERE dmid = ? AND dkey $like ? ESCAPE ? AND dkey > ? " .
         "ORDER BY dkey LIMIT $limit", undef, $dmid, $pattern, "\\", $after);
}
1912              
# SQL operator get_keys_like uses for its prefix match.
sub get_keys_like_operator {
    return "LIKE";
}
1914              
# return arrayref of all tempfile rows (themselves also arrayrefs, of [$fidid, $devids])
# that were created $secs_old seconds ago or older.  At most 50 rows are
# returned per call.
# $secs_old is required and is truncated to an integer before being spliced
# into the statement; croaks when it is undefined.
sub old_tempfiles {
    my ($self, $secs_old) = @_;
    croak("secs_old is required") unless defined $secs_old;
    my $age = int($secs_old);

    return $self->dbh->selectall_arrayref("SELECT fid, devids FROM tempfile " .
        "WHERE createtime < " . $self->unix_timestamp . " - $age LIMIT 50");
}
1922              
# given an array of MogileFS::DevFID objects, mass-insert them all
# into file_on (ignoring if they're already present)
# Returns 1.  croaks when any object yields a false fidid or devid.  Stores
# without multi-row insert support get one statement per object.
sub mass_insert_file_on {
    my ($self, @devfids) = @_;
    return 1 unless @devfids;

    unless (@devfids == 1 || $self->can_insert_multi) {
        for my $single (@devfids) {
            $self->mass_insert_file_on($single);
        }
        return 1;
    }

    my @binds;
    for my $df (@devfids) {
        my ($fidid, $devid) = ($df->fidid, $df->devid);
        $fidid or Carp::croak("got a false fidid");
        $devid or Carp::croak("got a false devid");
        push @binds, $fidid, $devid;
    }
    my $tuples = join(',', ("(?,?)") x scalar @devfids);

    # TODO: This should possibly be insert_ignore instead
    # As if we are adding an extra file_on entry, we do not want to replace the
    # exist one. Check REPLACE semantics.
    $self->dowell($self->ignore_replace . " INTO file_on (fid, devid) VALUES " . $tuples, undef, @binds);
    return 1;
}
1949              
# Stores int($ver) under the "schema_version" server setting.
# (The misspelled sub name is the public one callers use.)
sub set_schema_vesion {
    my ($self, $ver) = @_;
    my $version = int($ver);
    return $self->set_server_setting("schema_version", $version);
}
1954              
# returns array of fidids to try and delete again
# These are the file_to_delete_later rows whose delafter time has passed,
# at most 500 per call.
sub fids_to_delete_again {
    my ($self) = @_;
    my $ut   = $self->unix_timestamp;
    my $fids = $self->dbh->selectcol_arrayref(qq{
        SELECT fid
        FROM file_to_delete_later
        WHERE delafter < $ut
        LIMIT 500
    });
    return @{ $fids || [] };
}
1966              
# Queue the given fidids in file_to_delete.
# return 1 on success. die otherwise.
# An empty list is a no-op.  Each fid is truncated to an integer before it
# is spliced into the statement.
sub enqueue_fids_to_delete {
    my ($self, @fidids) = @_;
    # nothing to queue; an empty VALUES list would not even be valid SQL
    return 1 unless @fidids;

    # multi-row insert-ignore/replace CAN fail with the insert_ignore emulation sub.
    # when the first row causes the duplicate error, and the remaining rows are
    # not processed.
    if (@fidids > 1 && ! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
        $self->enqueue_fids_to_delete($_) foreach @fidids;
        return 1;
    }

    # TODO: convert to prepared statement?
    my $values = join(",", map { "(" . int($_) . ")" } @fidids);
    $self->retry_on_deadlock(sub {
        $self->dbh->do($self->ignore_replace . " INTO file_to_delete (fid) VALUES " . $values);
    });
    $self->condthrow;
    return 1;
}
1984              
# Queue the given fidids in file_to_delete2, due immediately (nexttry is the
# store's unix_timestamp expression).
# return 1 on success. die otherwise.
# An empty list is a no-op.  Each fid is truncated to an integer before it
# is spliced into the statement.
sub enqueue_fids_to_delete2 {
    my ($self, @fidids) = @_;
    # nothing to queue; an empty VALUES list would not even be valid SQL
    return 1 unless @fidids;

    # multi-row insert-ignore/replace CAN fail with the insert_ignore emulation sub.
    # when the first row causes the duplicate error, and the remaining rows are
    # not processed.
    if (@fidids > 1 && ! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
        $self->enqueue_fids_to_delete2($_) foreach @fidids;
        return 1;
    }

    my $nexttry = $self->unix_timestamp;

    # TODO: convert to prepared statement?
    my $values = join(",", map { "(" . int($_) . ", $nexttry)" } @fidids);
    $self->retry_on_deadlock(sub {
        $self->dbh->do($self->ignore_replace .
                       " INTO file_to_delete2 (fid, nexttry) VALUES " . $values);
    });
    $self->condthrow;
    return 1;
}
2005              
# clears everything from the fsck_log table
# return 1 on success. die otherwise.
sub clear_fsck_log {
    my ($self) = @_;
    my $dbh = $self->dbh;
    $dbh->do("DELETE FROM fsck_log");
    return 1;
}
2013              
# FIXME: Fsck log entries are processed a little out of order.
# Once a fsck has completed, the log should be re-summarized.
#
# Folds the not-yet-summarized fsck_log rows into the per-evcode
# "fsck_sum_evcount_<evcode>" server settings and records the highest logid
# handled in "fsck_logid_processed".
#
# Returns 0 without doing anything when get_lock reports 0 for the summary
# lock, and 1 after a completed pass.  An error raised while summarizing is
# re-thrown, but only after the lock has been released.
sub fsck_log_summarize {
    my $self = shift;

    my $lockname = 'mgfs:fscksum';
    # a get_lock that dies is tolerated: we then carry on without the lock
    my $lock = eval { $self->get_lock($lockname, 10) };
    return 0 if defined $lock && $lock == 0;

    my $done = eval {
        my $max_logid = $self->max_fsck_logid;

        # sum-up evcode counts every so often, to make fsck_status faster,
        # avoiding a potentially-huge GROUP BY in the future..
        # both ends of the range are inclusive:
        my $min_logid = ($self->server_setting("fsck_logid_processed") || 0) + 1;
        my $cts = $self->fsck_evcode_counts(logid_range => [$min_logid, $max_logid]);
        while (my ($evcode, $ct) = each %$cts) {
            $self->incr_server_setting("fsck_sum_evcount_$evcode", $ct);
        }
        $self->set_server_setting("fsck_logid_processed", $max_logid);
        1;
    };
    my $err = $@;

    # release before re-throwing so a failure cannot leave the lock held
    $self->release_lock($lockname) if $lock;
    die($err || "fsck_log_summarize failed\n") unless $done;

    return 1;
}
2039              
# Appends one row to fsck_log, stamped with the database's current time.
# Options: fid, code (stored in the evcode column), devid.
# croaks on any other option - note that this check runs after the INSERT
# has been issued.  Returns 1.
sub fsck_log {
    my ($self, %opts) = @_;

    my $sql = "INSERT INTO fsck_log (utime, fid, evcode, devid) ".
              "VALUES (" . $self->unix_timestamp . ",?,?,?)";
    my @binds = delete @opts{qw(fid code devid)};

    $self->dbh->do($sql, undef, @binds);
    croak("Unknown opts") if %opts;
    $self->condthrow;

    return 1;
}
2053              
# Asks the database to evaluate the store's unix_timestamp expression and
# returns the result.
sub get_db_unixtime {
    my ($self) = @_;
    my $sql = "SELECT " . $self->unix_timestamp;
    return $self->dbh->selectrow_array($sql);
}
2058              
# Returns the largest fid present in the 'file' table.
sub max_fidid {
    my ($self) = @_;
    my $sql = "SELECT MAX(fid) FROM file";
    return $self->dbh->selectrow_array($sql);
}
2063              
# Returns the largest logid in fsck_log, or 0 when the query yields a false
# value (e.g. the table is empty).
sub max_fsck_logid {
    my ($self) = @_;
    my $max = $self->dbh->selectrow_array("SELECT MAX(logid) FROM fsck_log");
    return $max || 0;
}
2068              
# returns array of $row hashrefs, from fsck_log table
# Only rows with a logid greater than $after_logid (default 0) are returned,
# in logid order, at most $limit (default 100) of them.  Both arguments are
# truncated to integers.
sub fsck_log_rows {
    my ($self, $after_logid, $limit) = @_;
    $limit       = int($limit || 100);
    $after_logid = int($after_logid || 0);

    my $sth = $self->dbh->prepare(qq{
        SELECT logid, utime, fid, evcode, devid
        FROM fsck_log
        WHERE logid > ?
        ORDER BY logid
        LIMIT $limit
    });
    $sth->execute($after_logid);

    my @rows;
    while (my $row = $sth->fetchrow_hashref) {
        push @rows, $row;
    }
    return @rows;
}
2088              
# Count fsck_log rows per evcode.
#
# Named options (exactly one selector is needed; anything else croaks):
#   logid_range => [$min, $max]  count rows with $min <= logid <= $max
#   time_gte    => $utime        count rows with utime >= $utime
#
# If both selectors are passed, logid_range wins and only that query is run.
# time_gte is tested for definedness, so a value of 0 is a valid selector.
#
# Returns a hashref mapping evcode => row count. Croaks when no selector is
# given, instead of failing later on an undefined statement handle.
sub fsck_evcode_counts {
    my ($self, %opts) = @_;
    my $timegte = delete $opts{time_gte};
    my $logr    = delete $opts{logid_range};
    croak("Unknown opts: " . join(', ', sort keys %opts)) if %opts;

    my ($sql, @binds);
    if ($logr) {
        $sql = qq{
            SELECT evcode, COUNT(*) FROM fsck_log
            WHERE logid >= ? AND logid <= ?
            GROUP BY evcode
        };
        @binds = ($logr->[0], $logr->[1]);
    }
    elsif (defined $timegte) {
        $sql = qq{
            SELECT evcode, COUNT(*) FROM fsck_log
            WHERE utime >= ?
            GROUP BY evcode
        };
        @binds = ($timegte);
    }
    else {
        croak("fsck_evcode_counts requires time_gte or logid_range");
    }

    my $sth = $self->dbh->prepare($sql);
    $sth->execute(@binds);

    my %count_for;
    while (my ($ev, $ct) = $sth->fetchrow_array) {
        $count_for{$ev} = $ct;
    }
    return \%count_for;
}
2118              
# Hook that runs before daemonizing. Implementations may die from here if
# something looks wrong, or just emit warnings. This version only runs the
# slave configuration check.
sub pre_daemonize_checks {
    my ($self) = @_;

    return $self->pre_daemonize_check_slaves;
}
2126              
# Sanity-check the slave database configuration held in server settings.
#
# Reads the comma-separated 'slave_keys' setting; for each key it looks up
# "slave_$key" and expects a DSN|user|pass string. Keys whose setting is
# missing, or which do not split into three parts, are reported through
# error() and skipped. Any surviving [dsn, user, pass] triples are handed,
# as an arrayref, to the 'slave_list_check' global hook.
#
# Returns early (nothing) when no slave_keys are configured or no usable
# slave remains; otherwise returns whatever the hook call returns.
# Takes no arguments that it uses.
sub pre_daemonize_check_slaves {
    my $slave_keys = MogileFS::Config->server_setting('slave_keys');
    return unless $slave_keys;

    my @candidates;
    KEY:
    for my $key (split /\s*,\s*/, $slave_keys) {
        my $slave = MogileFS::Config->server_setting("slave_$key");

        unless ($slave) {
            error("key for slave DB config: slave_$key not found in configuration");
            next KEY;
        }

        my ($dsn, $user, $pass) = split /\|/, $slave;
        if (grep { !defined $_ } ($dsn, $user, $pass)) {
            error("key slave_$key contains $slave, which doesn't split in | into DSN|user|pass - ignoring");
            next KEY;
        }

        push @candidates, [$dsn, $user, $pass];
    }

    # nothing usable configured, so there is nothing to hand to the hook
    return unless @candidates;

    return MogileFS::run_global_hook('slave_list_check', \@candidates);
}
2152              
2153              
# Attempt to grab the lock named $lockname, giving up after $timeout
# seconds. The documented contract is 1 on success and 0 on timeout.
#
# This base version never acquires anything: it dies with a recursion error
# if $self->{lock_depth} is true (a lock is already held), and otherwise
# dies saying get_lock is not implemented for this object.
sub get_lock {
    my ($self, $lockname, $timeout) = @_;

    if ($self->{lock_depth}) {
        die "Lock recursion detected (grabbing $lockname, had $self->{last_lock}). Bailing out.";
    }

    die "get_lock not implemented for $self";
}
2161              
# Attempt to release the lock named $lockname. The documented contract is
# 1 on success and 0 if no lock of that name is held.
#
# This base version always dies, reporting that release_lock is not
# implemented for this object.
sub release_lock {
    my $self     = shift;
    my $lockname = shift;

    die "release_lock not implemented for $self";
}
2168              
# MySQL can suffer either excessive deadlocks or INSERTs that hang forever
# around some transactions; crude application-level locking is the
# workaround. These two are the hooks for it: here they do no locking at
# all and simply report success.
sub lock_queue   { return 1 }
sub unlock_queue { return 1 }
2173              
# Bind type handed to bind_param for checksum values; undef here.
# Deliberately "return undef" rather than a bare return, so that a call in
# list context still yields exactly one (undefined) element.
sub BLOB_BIND_TYPE { return undef }
2175              
# Store (or overwrite) the checksum row for a fid using REPLACE INTO.
#
#   $fidid    - value for the fid column
#   $hashtype - value for the hashtype column
#   $checksum - value for the checksum column, bound with the type returned
#               by $self->BLOB_BIND_TYPE
#
# Dies if $self->can_replace is false. The bind type is looked up as a
# method on $self rather than as a plain function call, so a store class
# that overrides BLOB_BIND_TYPE has its value used here; a bare function
# call would always resolve to this package's undef stub.
#
# Failures inside the eval are not rethrown directly; $self->condthrow is
# called afterwards and its return value is what this sub returns.
# NOTE(review): presumably condthrow raises based on the handle's error
# state - confirm.
sub set_checksum {
    my ($self, $fidid, $hashtype, $checksum) = @_;
    my $dbh = $self->dbh;
    die "Your database does not support REPLACE! Reimplement set_checksum!" unless $self->can_replace;

    # scalar assignment keeps the bind_param argument count fixed no matter
    # what an override returns in list context
    my $blob_type = $self->BLOB_BIND_TYPE;

    eval {
        my $sth = $dbh->prepare("REPLACE INTO checksum " .
                                "(fid, hashtype, checksum) " .
                                "VALUES (?, ?, ?)");
        $sth->bind_param(1, $fidid);
        $sth->bind_param(2, $hashtype);
        $sth->bind_param(3, $checksum, $blob_type);
        $sth->execute;
    };
    $self->condthrow;
}
2192              
# Look up the checksum row for $fidid. Returns the result of
# selectrow_hashref: a hashref with the keys fid, hashtype and checksum
# when a row matches.
sub get_checksum {
    my ($self, $fidid) = @_;

    my $sql = "SELECT fid, hashtype, checksum FROM checksum WHERE fid = ?";
    return $self->dbh->selectrow_hashref($sql, undef, $fidid);
}
2200              
# Remove the checksum row for $fidid. Returns whatever the handle's do()
# returns for the DELETE.
sub delete_checksum {
    my ($self, $fidid) = @_;

    my $dbh = $self->dbh;
    return $dbh->do("DELETE FROM checksum WHERE fid = ?", undef, $fidid);
}
2206              
# Sentinel stored in a 'nexttry' field to mark an item that will never be
# retried automatically and needs manual intervention of some kind.
# The value is 2147483647, the largest signed 32-bit integer.
use constant ENDOFTIME => 0x7FFF_FFFF;

# Method-style accessor for the ENDOFTIME sentinel.
sub end_of_time { return ENDOFTIME }
2212              
# Size of the non-urgent (deferred) part of the replication queue.
#
# How file_to_replicate.nexttry is read here:
#   nexttry == 0                          - the file is urgent
#   nexttry != 0 && nexttry < end_of_time - the file is deferred
# Only the deferred rows are counted.
sub deferred_repl_queue_length {
    my ($self) = @_;

    my $sql = 'SELECT COUNT(*) FROM file_to_replicate WHERE nexttry != 0 AND nexttry < ?';
    return $self->dbh->selectrow_array($sql, undef, $self->end_of_time);
}
2221              
2222             1;
2223              
2224             __END__