File Coverage

blib/lib/MogileFS/Store.pm
Criterion Covered Total %
statement 13 15 86.6
branch n/a
condition n/a
subroutine 5 5 100.0
pod n/a
total 18 20 90.0


line stmt bran cond sub pod time code
1             package MogileFS::Store;
2 7     7   42 use strict;
  7         18  
  7         273  
3 7     7   44 use warnings;
  7         18  
  7         253  
4 7     7   40 use Carp qw(croak);
  7         15  
  7         395  
5 7     7   43 use MogileFS::Util qw(throw max);
  7         13  
  7         389  
6 7     7   14330 use DBI; # no reason a Store has to be DBI-based, but for now they all are.
  0            
  0            
7             use List::Util ();
8              
# Schema version this code expects. It is incremented whenever the schema
# changes; the server will refuse to start up with an older schema version.
#
#   8: adds fsck_log table
#   9: adds 'drain' state to enum in device table
use constant { SCHEMA_VERSION => 9 };
15              
# Construct a store from the server configuration: reads db_dsn, db_user
# and db_pass via MogileFS->config and delegates to new_from_dsn_user_pass.
sub new {
    my ($class) = @_;
    my @connect_args = map { MogileFS->config($_) } qw(db_dsn db_user db_pass);
    return $class->new_from_dsn_user_pass(@connect_args);
}
20              
# Pick the store subclass matching the DBI driver named in $dsn, load it,
# and return a new instance of it holding the connection credentials.
# init() is called on the instance before it is returned.
#
# Dies with "Unknown database type: ..." for an unrecognised DSN prefix,
# or "Error loading <subclass>: ..." if the subclass fails to compile/load.
sub new_from_dsn_user_pass {
    my ($class, $dsn, $user, $pass) = @_;

    # DSN prefix (case-insensitive) => implementing subclass.
    my @driver_table = (
        [ qr/^DBI:mysql:/i,  "MogileFS::Store::MySQL"    ],
        [ qr/^DBI:SQLite:/i, "MogileFS::Store::SQLite"   ],
        [ qr/^DBI:Oracle:/i, "MogileFS::Store::Oracle"   ],
        [ qr/^DBI:Pg:/i,     "MogileFS::Store::Postgres" ],
    );

    my $subclass;
    for my $entry (@driver_table) {
        my ($pattern, $candidate) = @$entry;
        next unless $dsn =~ $pattern;
        $subclass = $candidate;
        last;
    }
    die "Unknown database type: $dsn" unless defined $subclass;

    eval "use $subclass; 1"
        or die "Error loading $subclass: $@\n";

    my $self = bless {
        dsn                  => $dsn,
        user                 => $user,
        pass                 => $pass,
        raise_errors         => $subclass->want_raise_errors,
        slave_list_cachetime => 0,
        slave_list_cache     => [],
        # generation counter bumped each time a dbh recheck is requested
        recheck_req_gen      => 0,
        # request generation that was current when the last recheck finished
        recheck_done_gen     => 0,
    }, $subclass;

    $self->init;
    return $self;
}
51              
# Whether new stores of this class should enable DBI's RaiseError.
# The base class answers false (the original note says this "will default
# to true later"); subclasses may override.
sub want_raise_errors {
    return 0;
}
56              
# Build a store for database setup. If the mogilefs user cannot connect
# yet, create the database and grant that user's privileges as the root
# user, then try again.
#
# %args keys: dbhost dbname dbrootuser dbrootpass dbuser dbpass.
#
# Returns a store with raise_errors enabled. Dies if the root connection
# fails, if the operator declines a confirm() prompt, or if the regular
# user still cannot connect after setup.
sub new_from_mogdbsetup {
    my ($class, %args) = @_;
    my $dsn = $class->dsn_of_dbhost($args{dbname}, $args{dbhost});

    # Probe with a throwaway handle. The store opens its own handle lazily
    # through dbh(), so the probe is closed explicitly instead of lingering
    # until it is garbage-collected.
    my $try_make_sto = sub {
        my $probe = DBI->connect($dsn, $args{dbuser}, $args{dbpass}, {
            PrintError => 0,
        }) or return;
        $probe->disconnect;
        my $sto = $class->new_from_dsn_user_pass($dsn, $args{dbuser}, $args{dbpass});
        $sto->raise_errors;
        return $sto;
    };

    # upgrading, apparently, as this database already exists.
    my $sto = $try_make_sto->();
    return $sto if $sto;

    # otherwise, we need to make the requested database, setup permissions, etc
    $class->status("couldn't connect to database as mogilefs user. trying root...");
    my $rootdsn = $class->dsn_of_root($args{dbname}, $args{dbhost});
    my $rdbh = DBI->connect($rootdsn, $args{dbrootuser}, $args{dbrootpass}, {
        PrintError => 0,
    }) or
        # report the DSN that was actually attempted (the root one)
        die "Failed to connect to $rootdsn as specified root user: " . DBI->errstr . "\n";
    $class->status("connected to database as root user.");

    $class->confirm("Create database name '$args{dbname}'?");
    $class->create_db_if_not_exists($rdbh, $args{dbname});
    $class->confirm("Grant all privileges to user '$args{dbuser}', connecting from anywhere, to the mogilefs database '$args{dbname}'?");
    $class->grant_privileges($rdbh, $args{dbname}, $args{dbuser}, $args{dbpass});
    $rdbh->disconnect;

    # should be ready now:
    $sto = $try_make_sto->();
    return $sto if $sto;

    die "Failed to connect to database as regular user, even after creating it and setting up permissions as the root user.";
}
95              
# Using the root DBI handle $rdbh, create database $dbname unless it
# already exists. Returns the do() result on success; dies with the
# driver's error string otherwise.
sub create_db_if_not_exists {
    my ($pkg, $rdbh, $dbname) = @_;
    my $created = $rdbh->do("CREATE DATABASE IF NOT EXISTS $dbname");
    die "Failed to create database '$dbname': " . $rdbh->errstr . "\n"
        unless $created;
    return $created;
}
103              
# Grant $user (password $pass) all privileges on $dbname, once for
# connections from any host ('%') and once for 'localhost'.
# Returns the result of the last grant; dies on the first failure.
sub grant_privileges {
    my ($pkg, $rdbh, $dbname, $user, $pass) = @_;
    my $result;
    for my $host ('%', 'localhost') {
        $result = $rdbh->do("GRANT ALL PRIVILEGES ON $dbname.* TO $user\@'$host' IDENTIFIED BY ?",
                            undef, $pass)
            or die "Failed to grant privileges: " . $rdbh->errstr . "\n";
    }
    return $result;
}
113              
# Capability flags for SQL dialect features; the base store claims none.
# Subclasses override the ones their database supports.
sub can_replace      { return 0; }
sub can_insertignore { return 0; }
sub can_insert_multi { return 0; }

# Subclasses must supply an SQL expression for the database's unix time.
sub unix_timestamp {
    my ($self) = @_;
    die "No function in $self to return DB's unixtime.";
}
119              
# Return the INSERT prefix that tolerates existing rows: "INSERT IGNORE "
# when supported, otherwise "REPLACE ". Dies if neither is available.
sub ignore_replace {
    my $self = shift;
    if ($self->can_insertignore) {
        return "INSERT IGNORE ";
    }
    if ($self->can_replace) {
        return "REPLACE ";
    }
    die "Can't INSERT IGNORE or REPLACE?";
}
126              
# Callbacks used to report progress (status) and ask for confirmation
# (confirm) during database setup. By default status messages are dropped
# and every confirmation is accepted; on_status/on_confirm replace them.
my $on_status  = sub { };
my $on_confirm = sub { 1 };

sub on_status {
    my ($pkg, $code) = @_;
    return $on_status = $code;
}

sub on_confirm {
    my ($pkg, $code) = @_;
    return $on_confirm = $code;
}

sub status {
    my ($pkg, $msg) = @_;
    return $on_status->($msg);
}

# Dies with "Aborted.\n" when the confirm callback returns false.
sub confirm {
    my ($pkg, $msg) = @_;
    my $answer = $on_confirm->($msg);
    die "Aborted.\n" unless $answer;
    return $answer;
}
133              
# The schema version this code base expects (the SCHEMA_VERSION constant).
sub latest_schema_version {
    return SCHEMA_VERSION;
}
135              
# Switch this store to RaiseError mode: remember the choice (so later
# connections made by dbh() inherit it) and enable it on the current handle.
sub raise_errors {
    my ($self) = @_;
    $self->{raise_errors} = 1;
    my $handle = $self->dbh;
    return $handle->{RaiseError} = 1;
}
141              
# Read-only accessors for the connection credentials.
sub dsn  { my $self = shift; return $self->{dsn};  }
sub user { my $self = shift; return $self->{user}; }
sub pass { my $self = shift; return $self->{pass}; }

# Subclass hooks: init runs at construction time, post_dbi_connect after
# each new DBI connection. The base versions do nothing and return 1.
sub init             { return 1; }
sub post_dbi_connect { return 1; }
148              
# Whether this store type supports read slaves; the base class does not.
sub can_do_slaves { return 0; }

# Flag this store instance as a slave. Dies unless slaves are supported.
sub mark_as_slave {
    my $self = shift;
    $self->can_do_slaves
        or die "Incapable of becoming slave.";
    return $self->{slave} = 1;
}

# True if mark_as_slave has been called on this instance.
sub is_slave {
    my ($self) = @_;
    return $self->{slave};
}
162              
# Returns a list of arrayrefs, each being [$dsn, $username, $password] for
# connecting to a slave DB.
#
# The list is built from the 'slave_keys' server setting (comma-separated
# keys); for each key the 'slave_<key>' setting holds "dsn|user|pass".
# Results are cached on the store and reloaded at most every 15 seconds.
# Blank keys and keys whose 'slave_<key>' setting is missing or empty are
# skipped rather than turned into an all-undef entry that would later make
# new_from_dsn_user_pass die with "Unknown database type".
sub _slaves_list {
    my $self = shift;
    my $now = time();

    # only reload every 15 seconds.
    if ($self->{slave_list_cachetime} > $now - 15) {
        return @{$self->{slave_list_cache}};
    }
    $self->{slave_list_cachetime} = $now;
    $self->{slave_list_cache} = [];

    my $sk = MogileFS::Config->server_setting('slave_keys')
        or return ();

    # trim the list as a whole so the first/last keys don't keep padding
    (my $keylist = $sk) =~ s/^\s+|\s+$//g;

    my @ret;
    foreach my $key (split /\s*,\s*/, $keylist) {
        next unless length $key;
        my $slave = MogileFS::Config->server_setting("slave_$key");
        next unless defined $slave && length $slave;
        my ($dsn, $user, $pass) = split /\|/, $slave;
        push @ret, [$dsn, $user, $pass];
    }

    $self->{slave_list_cache} = \@ret;
    return @ret;
}
188              
# Return a slave store to read from, or nothing when no slave is usable.
# The current slave ($self->{slave}) is reused while check_slave() (defined
# elsewhere) returns true. Otherwise each configured slave from
# _slaves_list() is built, marked as a slave and installed as
# $self->{slave} in turn; the first one check_slave() accepts is returned.
# Dies if this store type cannot have slaves.
sub get_slave {
    my $self = shift;
    die "Incapable of having slaves." unless $self->can_do_slaves;

    return $self->{slave} if $self->check_slave;

    my @candidates = $self->_slaves_list;

    # If we have no slaves, then return silently.
    return unless @candidates;

    for my $connect_args (@candidates) {
        my $candidate = $self->new_from_dsn_user_pass(@$connect_args);
        $self->{slave} = $candidate;
        # force check_slave to look at the new candidate right away
        $self->{slave_next_check} = 0;
        $candidate->mark_as_slave;
        return $candidate if $self->check_slave;
    }

    warn "Slave list exhausted, failing back to master.";
    return;
}
212              
# Return the store to use for reads. When slaves are supported, reads are
# marked slave-safe (see slaves_ok) and get_slave() yields a slave, that
# slave is returned with the master's recheck_req_gen copied onto it.
# Otherwise the master ($self) is returned.
sub read_store {
    my $self = shift;

    return $self unless $self->can_do_slaves && $self->{slave_ok};

    my $slave = $self->get_slave or return $self;
    $slave->{recheck_req_gen} = $self->{recheck_req_gen};
    return $slave;
}
227              
# Run $coderef with the remaining arguments while read_store() is allowed
# to hand out slaves (slave_ok is set only for the duration of the call).
# Returns the coderef's result; returns nothing if $coderef isn't a CODE ref.
sub slaves_ok {
    # splice keeps the remaining @_ elements aliased to the caller's values
    my ($self, $coderef) = splice @_, 0, 2;

    ref $coderef eq 'CODE'
        or return;

    local $self->{slave_ok} = 1;
    return $coderef->(@_);
}
238              
# Request that the next dbh() call ping the cached handle before reusing
# it, by bumping the request generation. Returns the previous generation.
sub recheck_dbh {
    my ($self) = @_;
    my $previous = $self->{recheck_req_gen}++;
    return $previous;
}
243              
# Return the store's DBI handle, connecting lazily.
#
# If recheck_dbh() has been called since the last check, the cached handle
# is pinged once and dropped if the ping fails. A new connection is made
# with PrintError off, AutoCommit on and RaiseError following the store's
# raise_errors flag, and post_dbi_connect() runs after it. Dies if the
# connection cannot be established.
sub dbh {
    my $self = shift;

    if ($self->{dbh}) {
        my $pending_recheck = $self->{recheck_done_gen} != $self->{recheck_req_gen};
        if ($pending_recheck) {
            $self->{dbh} = undef unless $self->{dbh}->ping;
            $self->{recheck_done_gen} = $self->{recheck_req_gen};
        }
        return $self->{dbh} if $self->{dbh};
    }

    my %attrs = (
        PrintError => 0,
        AutoCommit => 1,
        # FUTURE: will default to on (have to validate all callers first):
        RaiseError => ($self->{raise_errors} || 0),
    );
    $self->{dbh} = DBI->connect($self->{dsn}, $self->{user}, $self->{pass}, \%attrs)
        or die "Failed to connect to database: " . DBI->errstr;

    $self->post_dbi_connect;
    return $self->{dbh};
}
264              
# Ping the database through the store's handle (connecting if needed).
sub ping {
    my ($self) = @_;
    my $handle = $self->dbh;
    return $handle->ping;
}
269              
# If the store's handle holds an error, croak with the calling package,
# file and line plus the DBI error string (and $optmsg when given).
# Returns nothing when there is no pending error.
sub condthrow {
    my ($self, $optmsg) = @_;
    my $dbh = $self->dbh;
    $dbh->err or return;

    my ($caller_pkg, $caller_file, $caller_line) = caller;
    my $msg = "Database error from $caller_pkg/$caller_file/$caller_line: "
        . $dbh->errstr
        . ($optmsg ? ": $optmsg" : "");
    croak($msg);
}
279              
# Run $sql through DBI's do() with @do_params and return its result.
# On any failure (an exception or a DBI error) print the SQL via warn and
# confess with the error and a full stack trace.
sub dowell {
    my ($self, $sql, @do_params) = @_;
    my $rv = eval { $self->dbh->do($sql, @do_params) };
    my $exception = $@;

    if ($exception || $self->dbh->err) {
        warn "Error with SQL: $sql\n";
        Carp::confess($exception || $self->dbh->errstr);
    }
    return $rv;
}
287              
# Validate named arguments: copy every key listed in @$vlist from the
# key/value pairs into the returned hash (missing keys become undef).
# Croaks on an odd-length argument list or on any unlisted key.
sub _valid_params {
    croak("Odd number of parameters!") if @_ % 2;
    my ($self, $vlist, %uarg) = @_;

    my %ret;
    for my $name (@$vlist) {
        $ret{$name} = delete $uarg{$name};
    }

    if (my @bogus = keys %uarg) {
        croak("Bogus options: " . join(',', @bogus));
    }
    return %ret;
}
296              
# Report whether the last database error was a duplicate-key violation.
# Store subclasses are expected to override this; the base version always
# dies with "UNIMPLEMENTED". It no longer fetches $self->dbh first, which
# was unused and could open a database connection just before dying.
sub was_duplicate_error {
    die "UNIMPLEMENTED";
}
302              
# Run $code (presumably a database update) in an eval, because you expect
# it to maybe fail on a duplicate key error. Throws "dup" when
# was_duplicate_error() says so; otherwise returns the code's return value.
#
# Any other exception raised by $code is rethrown instead of being silently
# discarded (previously a non-duplicate failure just returned undef).
sub conddup {
    my ($self, $code) = @_;
    my $rv = eval { $code->(); };
    # capture before was_duplicate_error can clobber $@
    my $err = $@;
    throw("dup") if $self->was_duplicate_error;
    die $err if $err;
    return $rv;
}
312              
# Insert a row unless it already exists. $sql is everything after the
# INSERT keyword (e.g. "INTO t (a) VALUES (?)"); @params go to DBI's do().
#
# With INSERT IGNORE support the statement is run as-is and do()'s result
# returned. Otherwise a plain INSERT is tried, and a duplicate-key failure
# (per was_duplicate_error) yields 1; any other failure croaks.
#
# WARNING: This function is NOT transaction safe if the duplicate error
# causes your transaction to halt!
# WARNING: This function is NOT safe on multi-row inserts if
# can_insertignore is false! Rows before the duplicate will be inserted,
# but rows after the duplicate might not be, depending on your database.
sub insert_ignore {
    my ($self, $sql, @params) = @_;
    my $dbh = $self->dbh;

    return $dbh->do("INSERT IGNORE $sql", @params)
        if $self->can_insertignore;

    # TODO: Detect bad multi-row insert here.
    my $rv = eval { $dbh->do("INSERT $sql", @params); };
    return $rv unless $@ || $dbh->err;

    return 1 if $self->was_duplicate_error;

    # Same message format as condthrow(), built here so that caller()
    # names *our* caller rather than this function; we already know there
    # is an error.
    my ($pkg, $fn, $line) = caller;
    croak("Database error from $pkg/$fn/$line: " . $dbh->errstr);
}
339              
340             # --------------------------------------------------------------------------
341              
# Additional table names registered at runtime; setup_database creates
# them after the core TABLES list.
my @extra_tables;

# Register extra table names. Returns the new number of extra tables.
sub add_extra_tables {
    my ($class, @tables) = @_;
    return push @extra_tables, @tables;
}
348              
# Core tables, in the order setup_database creates them. Each name needs a
# matching TABLE_<name> method returning its CREATE TABLE statement.
use constant TABLES => qw(
    domain
    class
    file
    tempfile
    file_to_delete
    unreachable_fids
    file_on
    file_on_corrupt
    host
    device
    server_settings
    file_to_replicate
    file_to_delete_later
    fsck_log
);
353              
# Install or upgrade the schema to SCHEMA_VERSION.
#
# Returns 1 right away (after a status message) if the schema is already
# current. Dies if the database reports a newer version than this code
# knows. When upgrading an existing schema (current version non-zero) the
# operator is asked to confirm. Every core table and every registered extra
# table is then created if missing, and each upgrade_* step is run in order.
#
# schema history:
#   9: adds 'drain' state to enum in device table
#   8: adds fsck_log table
#   7: adds file_to_delete_later table
#   6: adds file_to_replicate table
sub setup_database {
    my $sto = shift;

    my $curver    = $sto->schema_version;
    my $latestver = SCHEMA_VERSION;

    if ($curver == $latestver) {
        $sto->status("Schema already up-to-date at version $curver.");
        return 1;
    }

    die "Your current schema version is $curver, but this version of mogdbsetup only knows up to $latestver. Aborting to be safe.\n"
        if $curver > $latestver;

    $sto->confirm("Install/upgrade your schema from version $curver to version $latestver?")
        if $curver;

    for my $table (TABLES, @extra_tables) {
        $sto->create_table($table);
    }

    my @upgrade_steps = qw(
        upgrade_add_host_getport
        upgrade_add_host_altip
        upgrade_add_device_asof
        upgrade_add_device_weight
        upgrade_add_device_readonly
        upgrade_add_device_drain
    );
    for my $step (@upgrade_steps) {
        $sto->$step;
    }

    return 1;
}
390              
# Read the installed schema version from server_settings. Returns 0 when
# the value is missing or false, or when the query fails (e.g. before the
# server_settings table exists).
sub schema_version {
    my $self = shift;
    my $dbh = $self->dbh;
    my $version = eval {
        $dbh->selectrow_array("SELECT value FROM server_settings WHERE field='schema_version'") || 0;
    };
    return $version || 0;
}
398              
# Hook for subclasses to rewrite CREATE TABLE SQL for their dialect; the
# base version returns the SQL unchanged.
sub filter_create_sql {
    my $self = shift;
    my $sql  = shift;
    return $sql;
}
400              
# Create $table if it does not already exist, then create its indexes.
#
# The CREATE TABLE statement comes from the TABLE_<table> method and is
# passed through filter_create_sql(); index statements come from an
# optional INDEXES_<table> method returning a list of SQL statements.
# Each statement is reported via status() before it runs.
#
# Returns 1 both when the table already existed and after creating it;
# dies if any statement fails.
sub create_table {
    my ($self, $table) = @_;
    my $dbh = $self->dbh;
    return 1 if $self->table_exists($table);

    my $meth = "TABLE_$table";
    my $sql = $self->$meth;
    $sql = $self->filter_create_sql($sql);
    $self->status("Running SQL: $sql;");
    $dbh->do($sql) or
        die "Failed to create table $table: " . $dbh->errstr;

    # INDEXES_<table> is optional. Look it up with can() instead of wrapping
    # the call in eval, so an error raised inside an existing INDEXES_
    # method is no longer silently discarded.
    my $index_meth = $self->can("INDEXES_$table");
    my @indexes = $index_meth ? $self->$index_meth : ();
    foreach my $index_sql (@indexes) {
        $self->status("Running SQL: $index_sql;");
        $dbh->do($index_sql) or
            die "Failed to create indexes on $table: " . $dbh->errstr;
    }
    return 1;
}
419              
# Please try to keep all tables aligned nicely
# with '"CREATE TABLE' on the first line
# and ')"' alone on the last line.
#
# Each TABLE_<name> method returns the CREATE TABLE statement for one
# table; create_table() calls it by name.

# Classes are tied to domains. Domains can have classes of items with
# different mindevcounts.
#
# A minimum devcount is the number of copies the system tries to maintain
# for files in that class.
#
# An unspecified classname means classid=0 (implicit class), and that
# implies mindevcount=2.
sub TABLE_domain {
    return "CREATE TABLE domain (
dmid SMALLINT UNSIGNED NOT NULL PRIMARY KEY,
namespace VARCHAR(255),
UNIQUE (namespace)
)";
}

# Named classes within a domain, each with its minimum device count.
sub TABLE_class {
    return "CREATE TABLE class (
dmid SMALLINT UNSIGNED NOT NULL,
classid TINYINT UNSIGNED NOT NULL,
PRIMARY KEY (dmid,classid),
classname VARCHAR(50),
UNIQUE (dmid,classname),
mindevcount TINYINT UNSIGNED NOT NULL
)";
}

# The length field is only here for easy verification of content integrity
# when copying around. No sums, content types or other metadata here; the
# application can handle that.
#
# classid is what class of file this belongs to. For instance, on fotobilder
# there will be a class for original pictures (the ones the user uploaded)
# and a class for derived images (scaled down versions, thumbnails,
# greyscale, etc). Each domain can set up classes and assign the minimum
# redundancy level for each class. Fotobilder will use a 2 or 3 minimum copy
# redundancy for original photos and a 1 minimum for derived images (which
# means the sole device for a derived image can die, bringing devcount to 0
# for that file, but the application can recreate it from its original).
sub TABLE_file {
    return "CREATE TABLE file (
fid INT UNSIGNED NOT NULL,
PRIMARY KEY (fid),

dmid SMALLINT UNSIGNED NOT NULL,
dkey VARCHAR(255), # domain-defined
UNIQUE dkey (dmid, dkey),

length INT UNSIGNED, # 4GB limit

classid TINYINT UNSIGNED NOT NULL,
devcount TINYINT UNSIGNED NOT NULL,
INDEX devcount (dmid,classid,devcount)
)";
}

# Files being uploaded; fid is allocated by AUTO_INCREMENT.
sub TABLE_tempfile {
    return "CREATE TABLE tempfile (
fid INT UNSIGNED NOT NULL AUTO_INCREMENT,
PRIMARY KEY (fid),

createtime INT UNSIGNED NOT NULL,
classid TINYINT UNSIGNED NOT NULL,
dmid SMALLINT UNSIGNED NOT NULL,
dkey VARCHAR(255),
devids VARCHAR(60)
)";
}

# Files marked for death when their key is overwritten. They get a new fid,
# but since the old row (with the old fid) had to be deleted immediately, we
# need a place to store the fid so an async job can delete the file from
# all devices.
sub TABLE_file_to_delete {
    return "CREATE TABLE file_to_delete (
fid INT UNSIGNED NOT NULL,
PRIMARY KEY (fid)
)";
}

# If the replicator notices that a fid has no sources, that file gets
# inserted into the unreachable_fids table. It is up to the application to
# actually handle fids stored in this table.
sub TABLE_unreachable_fids {
    return "CREATE TABLE unreachable_fids (
fid INT UNSIGNED NOT NULL,
lastupdate INT UNSIGNED NOT NULL,
PRIMARY KEY (fid),
INDEX (lastupdate)
)";
}

# What files are on what devices? (Most likely physical devices, as logical
# devices of RAID arrays would be costly, and mogilefs already handles
# redundancy.)
#
# The devid index lets us answer "What files were on this now-dead disk?"
sub TABLE_file_on {
    return "CREATE TABLE file_on (
fid INT UNSIGNED NOT NULL,
devid MEDIUMINT UNSIGNED NOT NULL,
PRIMARY KEY (fid, devid),
INDEX (devid)
)";
}

# If the application or framework detects an error in one of the duplicate
# files for whatever reason, it can register its complaint and the framework
# will do some verifications and fix things up with an async job.
# MAYBE: let the application tell us the SHA1/MD5 of the file for us to
# check on the other devices?
sub TABLE_file_on_corrupt {
    return "CREATE TABLE file_on_corrupt (
fid INT UNSIGNED NOT NULL,
devid MEDIUMINT UNSIGNED NOT NULL,
PRIMARY KEY (fid, devid)
)";
}

# Hosts (which contain devices...).
sub TABLE_host {
    return "CREATE TABLE host (
hostid MEDIUMINT UNSIGNED NOT NULL PRIMARY KEY,

status ENUM('alive','dead','down'),
http_port MEDIUMINT UNSIGNED DEFAULT 7500,
http_get_port MEDIUMINT UNSIGNED,

hostname VARCHAR(40),
hostip VARCHAR(15),
altip VARCHAR(15),
altmask VARCHAR(18),
UNIQUE (hostname),
UNIQUE (hostip),
UNIQUE (altip)
)";
}

# Disks...
sub TABLE_device {
    return "CREATE TABLE device (
devid MEDIUMINT UNSIGNED NOT NULL,
hostid MEDIUMINT UNSIGNED NOT NULL,

status ENUM('alive','dead','down'),
weight MEDIUMINT DEFAULT 100,

mb_total MEDIUMINT UNSIGNED,
mb_used MEDIUMINT UNSIGNED,
mb_asof INT UNSIGNED,
PRIMARY KEY (devid),
INDEX (status)
)";
}

# Key/value server settings (e.g. schema_version).
sub TABLE_server_settings {
    return "CREATE TABLE server_settings (
field VARCHAR(50) PRIMARY KEY,
value VARCHAR(255)
)";
}

# nexttry is the time to try to replicate it next:
#   0 means immediate. It's only on one host.
#   1 means lower priority. It's on 2+ but isn't happy where it's at.
#   a unix timestamp means at/after that time; some previous error occurred.
# fromdevid, if not null, means which devid we should replicate from.
#   Perhaps it's the only non-corrupt one. Otherwise, wherever.
# failcount: how many times we've failed, just for doing backoff of nexttry.
# flags: reserved for future use.
sub TABLE_file_to_replicate {
    return "CREATE TABLE file_to_replicate (
fid INT UNSIGNED NOT NULL PRIMARY KEY,
nexttry INT UNSIGNED NOT NULL,
INDEX (nexttry),
fromdevid INT UNSIGNED,
failcount TINYINT UNSIGNED NOT NULL DEFAULT 0,
flags SMALLINT UNSIGNED NOT NULL DEFAULT 0
)";
}

# Files to delete once the delafter time has passed.
sub TABLE_file_to_delete_later {
    return "CREATE TABLE file_to_delete_later (
fid INT UNSIGNED NOT NULL PRIMARY KEY,
delafter INT UNSIGNED NOT NULL,
INDEX (delafter)
)";
}

# fsck event log, indexed by time.
sub TABLE_fsck_log {
    return "CREATE TABLE fsck_log (
logid INT UNSIGNED NOT NULL AUTO_INCREMENT,
PRIMARY KEY (logid),
utime INT UNSIGNED NOT NULL,
fid INT UNSIGNED NULL,
evcode CHAR(4),
devid MEDIUMINT UNSIGNED,
INDEX(utime)
)";
}
623              
# Schema upgrade steps run by setup_database. The first five are no-ops
# here: they are only necessary for MySQL, since no other database existed
# before, so other stores can just create the tables correctly to begin
# with. In the future there might be new alters that non-MySQL databases
# will have to implement. The drain upgrade has no base implementation and
# must be provided by each store subclass.
sub upgrade_add_host_getport    { return 1; }
sub upgrade_add_host_altip      { return 1; }
sub upgrade_add_device_asof     { return 1; }
sub upgrade_add_device_weight   { return 1; }
sub upgrade_add_device_readonly { return 1; }

sub upgrade_add_device_drain {
    my ($self) = @_;
    die "Not implemented in $self";
}
634              
# Delete the host row with the given hostid. Returns DBI do()'s result:
# true if deleted, "0E0"/0 if it didn't exist; errors follow the handle's
# RaiseError setting.
sub delete_host {
    my ($self, $hostid) = @_;
    my $sql = "DELETE FROM host WHERE hostid = ?";
    return $self->dbh->do($sql, undef, $hostid);
}

# Delete the domain row with the given dmid; same return convention as
# delete_host.
sub delete_domain {
    my ($self, $dmid) = @_;
    my $sql = "DELETE FROM domain WHERE dmid = ?";
    return $self->dbh->do($sql, undef, $dmid);
}
646              
# 1 if any file row belongs to domain $dmid, else 0.
sub domain_has_files {
    my ($self, $dmid) = @_;
    my $sql = 'SELECT fid FROM file WHERE dmid = ? LIMIT 1';
    my $some_fid = $self->dbh->selectrow_array($sql, undef, $dmid);
    return $some_fid ? 1 : 0;
}

# 1 if any file row belongs to class $clid of domain $dmid, else 0.
sub class_has_files {
    my ($self, $dmid, $clid) = @_;
    my $sql = 'SELECT fid FROM file WHERE dmid = ? AND classid = ? LIMIT 1';
    my $some_fid = $self->dbh->selectrow_array($sql, undef, $dmid, $clid);
    return $some_fid ? 1 : 0;
}
660              
# Create a new class named $classname in domain $dmid with mindevcount 2,
# using one more than the domain's current highest classid.
#
# Returns the new classid (non-zero integer) on success. Throws 'dup' on a
# duplicate name (per was_duplicate_error). On any other failure,
# condthrow() reports a pending DBI error; otherwise it dies with the class,
# domain and the captured exception, instead of a bare, context-free "die".
# Racy (max+1 is read, then inserted); override this if you want a less
# racy version.
sub create_class {
    my ($self, $dmid, $classname) = @_;
    my $dbh = $self->dbh;

    # get the max class id in this domain
    my $maxid = $dbh->selectrow_array
        ('SELECT MAX(classid) FROM class WHERE dmid = ?', undef, $dmid) || 0;
    my $new_classid = $maxid + 1;

    # now insert the new class
    my $rv = eval {
        $dbh->do("INSERT INTO class (dmid, classid, classname, mindevcount) VALUES (?, ?, ?, ?)",
                 undef, $dmid, $new_classid, $classname, 2);
    };
    # capture before further method calls can clobber $@
    my $err = $@;
    if ($err || $dbh->err) {
        throw("dup") if $self->was_duplicate_error;
    }
    return $new_classid if $rv;

    $self->condthrow;
    die "Failed to create class '$classname' in domain $dmid" . ($err ? ": $err" : "\n");
}
686              
# Rename class (dmid, classid) to classname (named arguments).
# Returns 1 on success. Throws "dup" on a duplicate name error. Otherwise
# condthrow() reports a pending DBI error, and any other exception raised
# by the update is rethrown rather than swallowed (previously such a
# failure still returned 1).
sub update_class_name {
    my $self = shift;
    my %arg = $self->_valid_params([qw(dmid classid classname)], @_);
    eval {
        $self->dbh->do("UPDATE class SET classname=? WHERE dmid=? AND classid=?",
                       undef, $arg{classname}, $arg{dmid}, $arg{classid});
    };
    # capture before further method calls can clobber $@
    my $err = $@;
    throw("dup") if $self->was_duplicate_error;
    $self->condthrow;
    die $err if $err;
    return 1;
}
699              
# Set mindevcount for class (dmid, classid) using named arguments.
# Returns 1 on success; condthrow() croaks on a DBI error.
sub update_class_mindevcount {
    my ($self, @named) = @_;
    my %arg = $self->_valid_params([qw(dmid classid mindevcount)], @named);
    my @bind = @arg{qw(mindevcount dmid classid)};
    $self->dbh->do("UPDATE class SET mindevcount=? WHERE dmid=? AND classid=?",
                   undef, @bind);
    $self->condthrow;
    return 1;
}
709              
# Number of files in domain $dmid and class $classid that currently have
# exactly $devcount copies.
sub nfiles_with_dmid_classid_devcount {
    my ($self, $dmid, $classid, $devcount) = @_;
    my $sql = 'SELECT COUNT(*) FROM file WHERE dmid = ? AND classid = ? AND devcount = ?';
    return $self->dbh->selectrow_array($sql, undef, $dmid, $classid, $devcount);
}
715              
# Store $val under $key in server_settings (REPLACE), or delete the key
# when $val is undef. Returns 1; dies if the database lacks REPLACE support
# or the statement leaves a DBI error on the handle.
sub set_server_setting {
    my ($self, $key, $val) = @_;
    my $dbh = $self->dbh;
    die "Your database does not support REPLACE! Reimplement set_server_setting!" unless $self->can_replace;

    my ($sql, @bind) = defined $val
        ? ("REPLACE INTO server_settings (field, value) VALUES (?, ?)", $key, $val)
        : ("DELETE FROM server_settings WHERE field=?", $key);
    $dbh->do($sql, undef, @bind);

    die "Error updating 'server_settings': " . $dbh->errstr if $dbh->err;
    return 1;
}
730              
# Add $val (default 1) to a numeric server setting. If no row was updated,
# the setting is created with $val as its value. A zero increment is a
# no-op that returns nothing.
# FIXME: racy (update-then-set is not atomic). The only current caller
# doesn't mind, but it should be fixed.
sub incr_server_setting {
    my ($self, $key, $val) = @_;
    $val = 1 if !defined $val;
    return if !$val;

    my $updated = $self->dbh->do("UPDATE server_settings ".
                                 "SET value=value+? ".
                                 "WHERE field=?", undef,
                                 $val, $key);
    return 1 if $updated > 0;

    # Nothing was updated, so the row must be created.
    $self->set_server_setting($key, $val);
}
743              
# Fetch the value of a single server setting, or nothing if it is unset.
sub server_setting {
    my ($self, $key) = @_;
    my $dbh = $self->dbh;
    return $dbh->selectrow_array("SELECT value FROM server_settings WHERE field=?",
                                 undef, $key);
}
749              
# Return a hashref of all server settings: { field => value, ... }.
sub server_settings {
    my ($self) = @_;
    my $sth = $self->dbh->prepare("SELECT field, value FROM server_settings");
    $sth->execute;

    my %value_of;
    while (my ($field, $value) = $sth->fetchrow_array) {
        $value_of{$field} = $value;
    }
    return \%value_of;
}
760              
# Register a tempfile row and return its fidid.
#
# If a true fid is passed in, that value is used and returned. If fid is
# undef or 0, the fid column is left out of the INSERT so the database's
# auto-increment/sequence allocates one, which is read back with
# last_insert_id.
#
# Returns the new/passed-in fidid on success.
# Throws 'dup' if an explicitly passed fid is already in use.
# Dies on other failures.
sub register_tempfile {
    my $self = shift;
    my %arg = $self->_valid_params([qw(fid dmid key classid devids)], @_);

    my $dbh = $self->dbh;
    # Treat 0 exactly like undef ("allocate one for me"). Previously a fid of
    # 0 was "defined", so last_insert_id was never consulted and the call
    # always died with "tempfile insert failed".
    my $fid = $arg{fid} || undef;

    my $explicit_fid_used = $fid ? 1 : 0;

    # Insert a tempfile row for the current $fid (letting the database pick
    # one when $fid is false) and update $fid with the resulting id.
    # We store the devices picked for this file here, knowing that they might
    # not be used; create_close is responsible for actually mapping in
    # file_on. Returns 1 on success, false on a duplicate-key error, and dies
    # on any other error.
    my $ins_tempfile = sub {
        my $rv = eval {
            # Only bind the columns we actually supply. Passing NULL for the
            # AUTO_INCREMENT/SERIAL column fails on Postgres, which expects it
            # to be left out (or DEFAULT); leaving it out is the most portable.
            my @keys = ('dmid', 'dkey', 'classid', 'devids', 'createtime');
            my @vars = ('?' , '?' , '?' , '?' , $self->unix_timestamp);
            my @vals = ($arg{dmid}, $arg{key}, $arg{classid} || 0, $arg{devids});
            # Test $fid itself rather than $explicit_fid_used: this closure is
            # also called from the re-seeding loop below with a new $fid.
            if ($fid) {
                unshift @keys, 'fid';
                unshift @vars, '?';
                unshift @vals, $fid;
            }
            my $sql = "INSERT INTO tempfile (" . join(',', @keys) . ") VALUES (" . join(',', @vars) . ")";
            $dbh->do($sql, undef, @vals);
        };
        if (!$rv) {
            return if $self->was_duplicate_error;
            die "Unexpected db error into tempfile: " . ($dbh->errstr || $@ || 'unknown error');
        }

        unless ($fid) {
            # No fid was supplied, so grab the one the database generated.
            $fid = $dbh->last_insert_id(undef, undef, 'tempfile', 'fid')
                or die "No last_insert_id found";
        }
        return unless defined $fid && $fid > 0;
        return 1;
    };

    unless ($ins_tempfile->()) {
        throw("dup") if $explicit_fid_used;
        die "tempfile insert failed";
    }

    # True when the current $fid is already used by a row in the file table.
    my $fid_in_use = sub {
        my $exists = $dbh->selectrow_array("SELECT COUNT(*) FROM file WHERE fid=?", undef, $fid);
        return $exists ? 1 : 0;
    };

    while ($fid_in_use->()) {
        throw("dup") if $explicit_fid_used;

        # Be careful of databases which reset their auto-increment/sequences
        # when the table is empty (InnoDB did/does this, for instance). Re-seed
        # the tempfile sequence with the highest known fid from the file table
        # by inserting a dummy row at that fid...
        $fid = $dbh->selectrow_array("SELECT MAX(fid) FROM file");
        $ins_tempfile->(); # result deliberately ignored

        # ...then do a normal auto-increment insert.
        $fid = undef;
        $ins_tempfile->() or die "register_tempfile failed after seeding";
    }

    return $fid;
}
850              
# Look up a file by domain id and key. Returns a hashref with the columns
# fid, dmid, dkey, length, classid and devcount, or undef if no row matches.
sub file_row_from_dmid_key {
    my ($self, $dmid, $key) = @_;
    my $sql = "SELECT fid, dmid, dkey, length, classid, devcount "
            . "FROM file WHERE dmid=? AND dkey=?";
    return $self->dbh->selectrow_hashref($sql, undef, $dmid, $key);
}
860              
# Look up a file by fid. Returns a hashref with the columns fid, dmid, dkey,
# length, classid and devcount, or undef if no row matches.
sub file_row_from_fidid {
    my ($self, $fidid) = @_;
    my $sql = "SELECT fid, dmid, dkey, length, classid, devcount "
            . "FROM file WHERE fid=?";
    return $self->dbh->selectrow_hashref($sql, undef, $fidid);
}
869              
# Fetch every file row with $fromfid <= fid <= $tofid (SQL BETWEEN is
# inclusive). Returns an arrayref of hashrefs with the columns fid, dmid,
# dkey, length, classid and devcount; DBI's fetchall_arrayref returns an
# empty arrayref when nothing matches.
sub file_row_from_fidid_range {
    my ($self, $fromfid, $tofid) = @_;
    my $sql = "SELECT fid, dmid, dkey, length, classid, devcount "
            . "FROM file WHERE fid BETWEEN ? AND ?";
    my $sth = $self->dbh->prepare($sql);
    $sth->execute($fromfid, $tofid);
    return $sth->fetchall_arrayref({});
}
879              
# Return the list of devids that have a file_on row for $fidid
# (an empty list when there are none).
sub fid_devids {
    my ($self, $fidid) = @_;
    my $devids = $self->dbh->selectcol_arrayref("SELECT devid FROM file_on WHERE fid=?",
                                                undef, $fidid);
    return @{ $devids || [] };
}
886              
# Given a list of fidids, return a hashref mapping each fidid that has at
# least one file_on row to the devids it is on:
#   { $fidid => [ $devid, $devid, ... ], ... }
# fidids are numified before being inlined into the IN (...) list.
# An empty input list returns {} without touching the database, because an
# empty "IN ()" is a SQL syntax error.
sub fid_devids_multiple {
    my ($self, @fidids) = @_;
    my $ret = {};
    return $ret unless @fidids;

    my $in = join(",", map { $_ + 0 } @fidids);
    my $sth = $self->dbh->prepare("SELECT fid, devid FROM file_on WHERE fid IN ($in)");
    $sth->execute;
    while (my ($fidid, $devid) = $sth->fetchrow_array) {
        push @{ $ret->{$fidid} }, $devid;
    }
    return $ret;
}
899              
# Look up a tempfile by fid. Returns a hashref with the columns classid,
# dmid and dkey, or undef if there is no such tempfile.
sub tempfile_row_from_fid {
    my ($self, $fidid) = @_;
    my $sql = "SELECT classid, dmid, dkey "
            . "FROM tempfile WHERE fid=?";
    return $self->dbh->selectrow_hashref($sql, undef, $fidid);
}
907              
# Insert a new device row. Returns 1 on success. A duplicate devid is
# reported as "dup", presumably by conddup, which wraps the INSERT. Other
# failures die.
sub create_device {
    my ($self, $devid, $hostid, $status) = @_;
    my $inserted = $self->conddup(sub {
        return $self->dbh->do("INSERT INTO device (devid, hostid, status) VALUES (?,?,?)",
                              undef, $devid, $hostid, $status);
    });
    $self->condthrow;
    $inserted > 0 or die "error making device $devid\n";
    return 1;
}
919              
# Record a device's mb_total/mb_used and stamp mb_asof with the database's
# unix_timestamp expression. Dies (via condthrow) on database error.
sub update_device_usage {
    my $self = shift;
    my %arg  = $self->_valid_params([qw(mb_total mb_used devid)], @_);
    my $dbh  = $self->dbh;
    my $now  = $self->unix_timestamp;
    $dbh->do("UPDATE device SET mb_total = ?, mb_used = ?, mb_asof = $now WHERE devid = ?",
             undef, @arg{qw(mb_total mb_used devid)});
    $self->condthrow;
}
927              
# Record (or refresh) $fidid in unreachable_fids with the current database
# time. Requires REPLACE support. Returns the DBI do() result.
sub mark_fidid_unreachable {
    my ($self, $fidid) = @_;
    $self->can_replace
        or die "Your database does not support REPLACE! Reimplement mark_fidid_unreachable!";
    my $dbh = $self->dbh;
    my $now = $self->unix_timestamp;
    $dbh->do("REPLACE INTO unreachable_fids VALUES (?, $now)", undef, $fidid);
}
934              
# Set the weight column of a device; dies (via condthrow) on database error.
sub set_device_weight {
    my ($self, $devid, $weight) = @_;
    my $dbh = $self->dbh;
    $dbh->do('UPDATE device SET weight = ? WHERE devid = ?', undef, $weight, $devid);
    return $self->condthrow;
}
940              
# Set the status column of a device; dies (via condthrow) on database error.
sub set_device_state {
    my ($self, $devid, $state) = @_;
    my $dbh = $self->dbh;
    $dbh->do('UPDATE device SET status = ? WHERE devid = ?', undef, $state, $devid);
    return $self->condthrow;
}
946              
# Delete class $cid from domain $dmid; dies (via condthrow) on database error.
sub delete_class {
    my ($self, $dmid, $cid) = @_;
    my $dbh = $self->dbh;
    $dbh->do("DELETE FROM class WHERE dmid = ? AND classid = ?", undef, $dmid, $cid);
    return $self->condthrow;
}
952              
# Remove $fidid from the file and tempfile tables, then enqueue it in
# file_to_delete (using the store's ignore/replace verb so an existing
# entry is not an error). Each statement is checked via condthrow.
sub delete_fidid {
    my ($self, $fidid) = @_;
    for my $table (qw(file tempfile)) {
        $self->dbh->do("DELETE FROM $table WHERE fid=?", undef, $fidid);
        $self->condthrow;
    }
    $self->dbh->do($self->ignore_replace . " INTO file_to_delete (fid) VALUES (?)", undef, $fidid);
    return $self->condthrow;
}
962              
# Delete the tempfile row for $fidid; dies (via condthrow) on database error.
sub delete_tempfile_row {
    my ($self, $fidid) = @_;
    my $dbh = $self->dbh;
    $dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fidid);
    return $self->condthrow;
}
968              
# Insert or overwrite a file row (devcount reset to 0). Requires REPLACE
# support; dies (via condthrow) on database error.
sub replace_into_file {
    my $self = shift;
    my %arg  = $self->_valid_params([qw(fidid dmid key length classid)], @_);
    $self->can_replace
        or die "Your database does not support REPLACE! Reimplement replace_into_file!";
    my @bind = @arg{qw(fidid dmid key length classid)};
    $self->dbh->do("REPLACE INTO file (fid, dmid, dkey, length, classid, devcount) ".
                   "VALUES (?,?,?,?,?,0) ", undef, @bind);
    $self->condthrow;
}
978              
# Change the key (dkey) of file $fidid to $to_key.
# Returns 1 on success, 0 on a duplicate key error, and dies on any other
# error.
# TODO: need a test to hit the duplicate name error condition
# TODO: switch to using "dup" exception here?
sub rename_file {
    my ($self, $fidid, $to_key) = @_;
    my $dbh = $self->dbh;
    eval {
        $dbh->do('UPDATE file SET dkey = ? WHERE fid=?',
                 undef, $to_key, $fidid);
    };
    if ($@ || $dbh->err) {
        return 0 if $self->was_duplicate_error;
        # With RaiseError off there is no exception in $@, only $dbh->err.
        # The old code did `die $@` there and so died with an empty message.
        die $@ if $@;
        die "Error renaming fid $fidid: " . ($dbh->errstr || 'unknown database error');
    }
    $self->condthrow;
    return 1;
}
1000              
# Return a flat (namespace => dmid, ...) list, suitable for assigning to a
# hash. Returns an empty list when the query yields nothing.
sub get_all_domains {
    my ($self) = @_;
    my $rows = $self->dbh->selectall_arrayref('SELECT namespace, dmid FROM domain') || [];
    return map { @{$_}[0, 1] } @$rows;
}
1007              
# Return every row of the class table as a list of hashrefs with the keys
# dmid, classid, classname and mindevcount.
sub get_all_classes {
    my ($self) = @_;
    my $sth = $self->dbh->prepare("SELECT dmid, classid, classname, mindevcount FROM class");
    $sth->execute;

    my @classes;
    while (my $class_row = $sth->fetchrow_hashref) {
        push @classes, $class_row;
    }
    return @classes;
}
1017              
# Record that $fidid has a copy on $devid (a file_on row).
# Both ids must be true; otherwise croaks.
# Returns 1 if dowell reports more than zero affected rows, else 0.
sub add_fidid_to_devid {
    my ($self, $fidid, $devid) = @_;
    $fidid or croak("fidid not non-zero");
    $devid or croak("devid not non-zero");

    # TODO: This should possibly be insert_ignore instead: when adding an
    # extra file_on entry we do not want to replace the existing one.
    # Check REPLACE semantics.
    my $sql = $self->ignore_replace . " INTO file_on (fid, devid) VALUES (?,?)";
    my $rv  = $self->dowell($sql, undef, $fidid, $devid);
    return $rv > 0 ? 1 : 0;
}
1033              
# Remove the record of $fidid existing on $devid.
# Returns 1 if a row was deleted and 0 if there was none. DBI's do() returns
# the string "0E0" (true in boolean context) for zero rows, so it is
# normalized here. Dies (via condthrow) on database error.
sub remove_fidid_from_devid {
    my ($self, $fidid, $devid) = @_;
    my $rv = $self->dbh->do("DELETE FROM file_on WHERE fid=? AND devid=?",
                            undef, $fidid, $devid);
    $self->condthrow;
    return ($rv && $rv > 0) ? 1 : 0;
}
1043              
# Return every row of the host table as a list of hashrefs with the keys
# hostid, status, hostname, hostip, http_port, http_get_port, altip, altmask.
sub get_all_hosts {
    my ($self) = @_;
    my $sql = "SELECT /*!40000 SQL_CACHE */ hostid, status, hostname, " .
              "hostip, http_port, http_get_port, altip, altmask FROM host";
    my $sth = $self->dbh->prepare($sql);
    $sth->execute;

    my (@hosts, $host);
    push @hosts, $host while $host = $sth->fetchrow_hashref;
    return @hosts;
}
1056              
# Return every row of the device table as a list of hashrefs with the keys
# devid, hostid, mb_total, mb_used, mb_asof, status and weight.
sub get_all_devices {
    my ($self) = @_;
    my $sth = $self->dbh->prepare("SELECT /*!40000 SQL_CACHE */ devid, hostid, mb_total, " .
                                  "mb_used, mb_asof, status, weight FROM device");
    $self->condthrow;
    $sth->execute;

    my @devices;
    while (my $device = $sth->fetchrow_hashref) {
        push @devices, $device;
    }
    return @devices;
}
1070              
# Recompute file.devcount for $fidid from its number of file_on rows.
# Always returns 1; database errors are not checked here.
sub update_devcount {
    my ($self, $fidid) = @_;
    my $dbh = $self->dbh;
    my $copies = $dbh->selectrow_array("SELECT COUNT(*) FROM file_on WHERE fid=?",
                                       undef, $fidid);
    $dbh->do("UPDATE file SET devcount=? WHERE fid=?", undef, $copies, $fidid);
    return 1;
}
1083              
# Queue $fidid for replication from $from_devid (may be undef), due $in
# seconds from now (immediately, i.e. nexttry 0, when $in is false).
# Existing queue entries are left alone (insert_ignore).
sub enqueue_for_replication {
    my ($self, $fidid, $from_devid, $in) = @_;

    # nexttry is spliced into the SQL; int() keeps the offset numeric.
    my $nexttry = $in ? $self->unix_timestamp . " + " . int($in) : 0;

    $self->insert_ignore("INTO file_to_replicate (fid, fromdevid, nexttry) ".
                         "VALUES (?,?,$nexttry)", undef, $fidid, $from_devid);
}
1096              
# Pull every deferred replication (nexttry in the future) forward to now.
# Returns the DBI do() result, i.e. the number of rows rescheduled.
sub replicate_now {
    my ($self) = @_;
    my $dbh = $self->dbh;
    my $now = $self->unix_timestamp;
    return $dbh->do("UPDATE file_to_replicate SET nexttry = $now WHERE nexttry > $now");
}
1103              
# Return an arrayref of up to $limit fidids that have a file_on row for
# $devid. Both arguments are required. $limit is forced to an integer
# before it is interpolated into the LIMIT clause (placeholders are not
# portable there), as get_fids_above_id already does.
sub get_fidids_by_device {
    my ($self, $devid, $limit) = @_;
    $limit = int($limit);

    my $dbh = $self->dbh;
    return $dbh->selectcol_arrayref("SELECT fid FROM file_on WHERE devid = ? LIMIT $limit",
                                    undef, $devid);
}
1113              
# Return up to $limit (default 1,000) files with fid greater than $fidid,
# as MogileFS::FID objects built from their file rows, in ascending fid
# order.
sub get_fids_above_id {
    my ($self, $fidid, $limit) = @_;
    $limit = int($limit || 1000);

    my $sth = $self->dbh->prepare("SELECT fid, dmid, dkey, length, classid ".
                                  "FROM file ".
                                  "WHERE fid > ? ".
                                  "ORDER BY fid LIMIT $limit");
    $sth->execute($fidid);

    my @fids;
    while (my $row = $sth->fetchrow_hashref) {
        push @fids, MogileFS::FID->new_from_db_row($row);
    }
    return @fids;
}
1134              
# Create a domain with namespace $name. Returns the new dmid (MAX(dmid)+1)
# on success, throws 'dup' on a duplicate name, and dies otherwise.
# Override if you want a less racy version.
sub create_domain {
    my ($self, $name) = @_;
    my $dbh = $self->dbh;

    # FIXME: racy -- two concurrent callers can compute the same dmid.
    my $new_dmid = ($dbh->selectrow_array('SELECT MAX(dmid) FROM domain') || 0) + 1;
    my $inserted = eval {
        $dbh->do('INSERT INTO domain (dmid, namespace) VALUES (?, ?)',
                 undef, $new_dmid, $name);
    };
    throw("dup") if $self->was_duplicate_error;
    return $new_dmid if $inserted;
    die "failed to make domain";
}
1154              
# Set column $col of host $hostid to $val. The UPDATE runs inside conddup
# (presumably turning duplicate-key errors into a "dup" exception).
# Returns 1.
# Because a column name cannot be bound as a placeholder, $col is
# interpolated into the SQL. It is therefore required to be a plain
# identifier; anything else croaks before touching the database.
sub update_host_property {
    my ($self, $hostid, $col, $val) = @_;
    croak("Invalid host column name: " . (defined $col ? $col : 'undef'))
        unless defined $col && $col =~ /\A[A-Za-z0-9_]+\z/;
    $self->conddup(sub {
        $self->dbh->do("UPDATE host SET $col=? WHERE hostid=?", undef, $val, $hostid);
    });
    return 1;
}
1162              
# Create a host row in the initial 'down' state and return its new hostid
# (MAX(hostid)+1). The INSERT runs inside conddup; dies with "db failure"
# if it does not succeed.
sub create_host {
    my ($self, $hostname, $ip) = @_;
    my $dbh = $self->dbh;

    # racy! lazy. no, better: portable! how often does this happen? :)
    my $max_hostid = $dbh->selectrow_array('SELECT MAX(hostid) FROM host') || 0;
    my $hid = $max_hostid + 1;

    my $inserted = $self->conddup(sub {
        $dbh->do("INSERT INTO host (hostid, hostname, hostip, status) ".
                 "VALUES (?, ?, ?, 'down')",
                 undef, $hid, $hostname, $ip);
    });
    die "db failure" unless $inserted;
    return $hid;
}
1178              
# Return up to $limit due rows (nexttry <= now) from file_to_replicate, as
# a list of hashrefs with the keys fid, fromdevid, failcount, flags and
# nexttry, in ascending nexttry order. Returns an empty list on failure.
sub files_to_replicate {
    my ($self, $limit) = @_;
    # LIMIT is interpolated (placeholders are not portable there).
    $limit = int($limit);
    my $ut = $self->unix_timestamp;

    # selectall_arrayref with a hash Slice keeps the ORDER BY. The previous
    # selectall_hashref keyed by fid returned rows in hash order. Assumes fid
    # is unique in file_to_replicate, as it was the hash key before.
    my $rows = $self->dbh->selectall_arrayref(qq{
        SELECT fid, fromdevid, failcount, flags, nexttry
        FROM file_to_replicate
        WHERE nexttry <= $ut
        ORDER BY nexttry
        LIMIT $limit
    }, { Slice => {} }) or return ();
    return @$rows;
}
1193              
# Hook called by the replicator before it starts replicating $fidid.
# Multiple trackers/processes replicating the same file is safe but wasteful
# (CPU/time, and possibly disk when they pick different places), so stores
# can override this to say "not now". This base version warns that it is
# the inefficient default and always returns 1 (go ahead).
sub should_begin_replicating_fidid {
    my ($self, $fidid) = @_;
    warn("Inefficient implementation of should_begin_replicating_fidid() in $self!\n");
    return 1;
}
1205              
# Hook called when the replicator is done with $fidid, to clean up whatever
# should_begin_replicating_fidid did. The base implementation does nothing.
#
# NOTE: without real locking in this hook pair, there is a theoretical race
# in the rebalance code: independent replicators rebalancing in different
# ways could together delete every copy of a file. Stores will probably
# want to implement locking here.
sub note_done_replicating {
    # Argument unpacking only; intentionally no other work.
    my ($self, $fidid) = @_;
}
1218              
# Drop $fidid from the replication queue. Returns the DBI do() result.
sub delete_fid_from_file_to_replicate {
    my ($self, $fidid) = @_;
    my $dbh = $self->dbh;
    return $dbh->do("DELETE FROM file_to_replicate WHERE fid=?", undef, $fidid);
}
1223              
# Set nexttry of queued $fid to the absolute time $abstime and bump its
# failcount. Returns the DBI do() result.
sub reschedule_file_to_replicate_absolute {
    my ($self, $fid, $abstime) = @_;
    my $sql = "UPDATE file_to_replicate SET nexttry = ?, failcount = failcount + 1 WHERE fid = ?";
    return $self->dbh->do($sql, undef, $abstime, $fid);
}
1229              
# Set nexttry of queued $fid to now + $in_n_secs and bump its failcount.
# Returns the DBI do() result.
sub reschedule_file_to_replicate_relative {
    my ($self, $fid, $in_n_secs) = @_;
    my $dbh = $self->dbh;
    my $now = $self->unix_timestamp;
    return $dbh->do("UPDATE file_to_replicate SET nexttry = $now + ?, " .
                    "failcount = failcount + 1 WHERE fid = ?",
                    undef, $in_n_secs, $fid);
}
1236              
# Return an arrayref of up to $limit dkeys in domain $dmid that start with
# $prefix and sort strictly after $after, in ascending dkey order.
# An undef $prefix means "any key"; an undef $after means "from the start".
sub get_keys_like {
    my ($self, $dmid, $prefix, $after, $limit) = @_;
    # Only undef means "no prefix" / "from the start". The old `||=` also
    # discarded a legitimate value of "0".
    $prefix = '' unless defined $prefix;
    $after  = '' unless defined $after;
    # LIMIT is interpolated (placeholders are not portable there).
    $limit = int($limit);

    # Escape LIKE metacharacters so the prefix is matched literally (a prefix
    # of "a_b" must not match "axb"), then append the trailing wildcard.
    # Backslash is declared as the escape character in the query.
    (my $pattern = $prefix) =~ s/([\\%_])/\\$1/g;
    $pattern .= '%';

    return $self->dbh->selectcol_arrayref(
        'SELECT dkey FROM file WHERE dmid = ? AND dkey LIKE ? ESCAPE ? AND dkey > ? ' .
        "ORDER BY dkey LIMIT $limit", undef, $dmid, $pattern, '\\', $after);
}
1252              
# Return an arrayref of up to 50 tempfile rows (each an arrayref of
# [$fidid, $devids]) created more than $secs_old seconds ago.
# $secs_old is forced to an integer because it is interpolated into SQL.
sub old_tempfiles {
    my ($self, $secs_old) = @_;
    $secs_old = int($secs_old);
    return $self->dbh->selectall_arrayref("SELECT fid, devids FROM tempfile " .
        "WHERE createtime < " . $self->unix_timestamp . " - $secs_old LIMIT 50");
}
1260              
# Insert a file_on row for every MogileFS::DevFID in @devfids, using the
# store's ignore/replace verb so existing rows are not an error.
# Croaks on a false fidid or devid. Returns 1.
sub mass_insert_file_on {
    my ($self, @devfids) = @_;
    return 1 if !@devfids;

    # Without multi-row INSERT support, insert the rows one at a time.
    if (@devfids > 1 && !$self->can_insert_multi) {
        for my $devfid (@devfids) {
            $self->mass_insert_file_on($devfid);
        }
        return 1;
    }

    my @binds;
    for my $devfid (@devfids) {
        my $fidid = $devfid->fidid;
        my $devid = $devfid->devid;
        Carp::croak("got a false fidid") unless $fidid;
        Carp::croak("got a false devid") unless $devid;
        push @binds, $fidid, $devid;
    }
    my $placeholders = join(',', ('(?,?)') x @devfids);

    # TODO: This should possibly be insert_ignore instead: when adding an
    # extra file_on entry we do not want to replace the existing one.
    # Check REPLACE semantics.
    $self->dowell($self->ignore_replace . " INTO file_on (fid, devid) VALUES " . $placeholders,
                  undef, @binds);
    return 1;
}
1287              
# Store $ver (as an integer) in the schema_version server setting.
# NOTE: the sub name is misspelled ("vesion"); renaming it would break any
# caller that uses this name.
sub set_schema_vesion {
    my ($self, $ver) = @_;
    my $version = int($ver);
    return $self->set_server_setting("schema_version", $version);
}
1292              
# Return up to 500 fidids from file_to_delete_later whose delafter time has
# passed, so deletion can be retried. Returns an empty list if none.
sub fids_to_delete_again {
    my ($self) = @_;
    my $now  = $self->unix_timestamp;
    my $fids = $self->dbh->selectcol_arrayref(qq{
        SELECT fid
        FROM file_to_delete_later
        WHERE delafter < $now
        LIMIT 500
    });
    return $fids ? @$fids : ();
}
1304              
# Queue @fidids in file_to_delete using the store's ignore/replace verb, so
# already-queued fids are not an error.
# Returns a true value on success (1 when there is nothing to queue or when
# falling back to one insert per fid); dies otherwise.
sub enqueue_fids_to_delete {
    my ($self, @fidids) = @_;
    # Nothing to do; an empty VALUES list would be a SQL syntax error.
    return 1 unless @fidids;

    # A multi-row insert-ignore/replace CAN fail with the insert_ignore
    # emulation sub: when the first row causes the duplicate error, the
    # remaining rows are not processed. Only batch when the database supports
    # multi-row inserts plus REPLACE or INSERT IGNORE.
    if (@fidids > 1 && ! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
        $self->enqueue_fids_to_delete($_) foreach @fidids;
        return 1;
    }
    # TODO: convert to prepared statement? (Each fid is forced through int()
    # before being inlined.)
    $self->dbh->do($self->ignore_replace . " INTO file_to_delete (fid) VALUES " .
                   join(",", map { "(" . int($_) . ")" } @fidids))
        or die "file_to_delete insert failed";
}
1320              
# Remove every row from the fsck_log table. Returns 1; the do() result is
# not checked here.
sub clear_fsck_log {
    my ($self) = @_;
    $self->dbh->do("DELETE FROM fsck_log");
    return 1;
}
1328              
# How many fsck_log entries are written between evcode summaries.
sub fsck_log_summarize_every {
    return 100;
}
1330              
# Record an fsck event. Accepted options: fid, code (stored as evcode) and
# devid; any other option croaks *before* anything is written (previously
# the row was inserted first and only then rejected).
# Every fsck_log_summarize_every entries, the evcode counts of the latest
# window are added to server settings named fsck_sum_evcount_<evcode>.
# Returns 1; dies if no last_insert_id is available.
sub fsck_log {
    my ($self, %opts) = @_;
    my $fid   = delete $opts{fid};
    my $code  = delete $opts{code};
    my $devid = delete $opts{devid};
    croak("Unknown opts") if %opts;

    $self->dbh->do("INSERT INTO fsck_log (utime, fid, evcode, devid) ".
                   "VALUES (" . $self->unix_timestamp . ",?,?,?)",
                   undef, $fid, $code, $devid);

    my $logid = $self->dbh->last_insert_id(undef, undef, 'fsck_log', 'logid')
        or die "No last_insert_id found for fsck_log table";

    # Sum up evcode counts every so often to make fsck_status faster,
    # avoiding a potentially huge GROUP BY in the future.
    my $SUM_EVERY = $self->fsck_log_summarize_every;
    # Locking/races are largely disregarded because, in theory, only one
    # fsck process runs globally. Quick stop/starts can overlap by a few
    # seconds, so the window start is clamped to fsck_start_maxlogid.
    if ($logid % $SUM_EVERY == 0) {
        my $start_max_logid = $self->server_setting("fsck_start_maxlogid") || 0;
        # both bounds inclusive:
        my $min_logid = max($start_max_logid, $logid - $SUM_EVERY) + 1;
        my $cts = $self->fsck_evcode_counts(logid_range => [$min_logid, $logid]);
        while (my ($evcode, $ct) = each %$cts) {
            $self->incr_server_setting("fsck_sum_evcount_$evcode", $ct);
        }
    }

    return 1;
}
1363              
# Ask the database for its current time via the store's unix_timestamp
# SQL expression.
sub get_db_unixtime {
    my $self = shift;
    my $dbh = $self->dbh;
    my $now_expr = $self->unix_timestamp;
    return $dbh->selectrow_array("SELECT $now_expr");
}
1368              
# Return the highest fid in the file table (NULL/undef if it is empty).
sub max_fidid {
    my $self = shift;
    my $dbh = $self->dbh;
    return $dbh->selectrow_array("SELECT MAX(fid) FROM file");
}
1373              
# Return the highest logid in fsck_log, or 0 if the table is empty.
sub max_fsck_logid {
    my $self = shift;
    my $max_logid = $self->dbh->selectrow_array("SELECT MAX(logid) FROM fsck_log");
    return $max_logid || 0;
}
1378              
# Return up to $limit (default 100) fsck_log rows with logid greater than
# $after_logid (default 0), in logid order, as hashrefs with the keys logid,
# utime, fid, evcode and devid.
sub fsck_log_rows {
    my ($self, $after_logid, $limit) = @_;
    $limit       = int($limit || 100);
    $after_logid = int($after_logid || 0);

    my $sth = $self->dbh->prepare(qq{
        SELECT logid, utime, fid, evcode, devid
        FROM fsck_log
        WHERE logid > ?
        ORDER BY logid
        LIMIT $limit
    });
    $sth->execute($after_logid);

    my @rows;
    while (my $row = $sth->fetchrow_hashref) {
        push @rows, $row;
    }
    return @rows;
}
1398              
# Count fsck_log events grouped by evcode. Returns { evcode => count }.
# Exactly one filter is applied:
#   logid_range => [$min, $max]  inclusive logid range (wins if both given,
#                                as it did before), otherwise
#   time_gte    => $unixtime     events with utime >= $unixtime.
# Croaks on unknown options, or when neither filter is supplied (previously
# that crashed calling a method on an undefined statement handle).
sub fsck_evcode_counts {
    my ($self, %opts) = @_;
    my $timegte = delete $opts{time_gte};
    my $logr    = delete $opts{logid_range};
    croak("Unknown opts: " . join(', ', sort keys %opts)) if %opts;

    my $sth;
    if ($logr) {
        $sth = $self->dbh->prepare(qq{
            SELECT evcode, COUNT(*) FROM fsck_log
            WHERE logid >= ? AND logid <= ?
            GROUP BY evcode
        });
        $sth->execute($logr->[0], $logr->[1]);
    }
    elsif (defined $timegte) {
        $sth = $self->dbh->prepare(qq{
            SELECT evcode, COUNT(*) FROM fsck_log
            WHERE utime >= ?
            GROUP BY evcode
        });
        $sth->execute($timegte || 0);
    }
    else {
        croak("fsck_evcode_counts requires time_gte or logid_range");
    }

    my $ret = {};
    while (my ($ev, $ct) = $sth->fetchrow_array) {
        $ret->{$ev} = $ct;
    }
    return $ret;
}
1428              
# Hook run before the server daemonizes. An implementation may die if it
# detects something fatally wrong, or just emit warnings. The base version
# performs no checks and returns nothing.
sub pre_daemonize_checks {
    return;
}
1432              
1433              
# Try to acquire the lock named $lockname, giving up after $timeout seconds.
# Implementations are expected to return 1 on success and 0 on timeout.
# Nested locking is refused: this dies whenever $self->{lock_depth} is true,
# i.e. as soon as any lock is already held, naming it via $self->{last_lock}.
# The base class has no implementation and otherwise always dies.
sub get_lock {
    my ($self, $lockname, $timeout) = @_;
    if ($self->{lock_depth}) {
        die "Lock recursion detected (grabbing $lockname, had $self->{last_lock}). Bailing out.";
    }
    die "get_lock not implemented for $self";
}
1441              
# Release the lock named $lockname. Implementations are expected to return
# 1 on success and 0 if no lock held by this handle has that name.
# The base class has no implementation and always dies.
sub release_lock {
    my $self     = shift;
    my $lockname = shift;
    die "release_lock not implemented for $self";
}
1448              
# Return up to $limit fids (default 100) listed in file_on for $devid,
# in shuffled order.
#
# FIXME: this is not truly random. Only the first 5000 rows the database
# returns for the device are ever considered, so on large devices the same
# subset keeps being picked and fids outside it may eventually become
# un-rebalanceable.
sub random_fids_on_device {
    my ($self, $devid, $limit) = @_;
    # `|| 0` keeps int() from warning about an undefined $limit when the
    # caller omits it; 0/empty still falls back to the default of 100.
    $limit = int($limit || 0) || 100;

    my $candidates = $self->dbh->selectcol_arrayref(
        "SELECT fid FROM file_on WHERE devid=? LIMIT 5000",
        undef, $devid) || [];
    my @some_fids = List::Util::shuffle(@$candidates);

    @some_fids = @some_fids[0 .. $limit - 1] if $limit < @some_fids;
    return @some_fids;
}
1467              
# Summarise the file table by (dmid, classid, devcount).
# Returns a list of hashrefs, one per group, with keys
# dmid, classid, devcount and count (number of files in that group).
sub get_stats_files_per_devcount {
    my ($self) = @_;
    my $sth = $self->dbh->prepare('SELECT dmid, classid, devcount, COUNT(devcount) AS "count" FROM file GROUP BY 1, 2, 3');
    $sth->execute;

    my (@stats, $group);
    push @stats, $group while $group = $sth->fetchrow_hashref;
    return @stats;
}
1480              
1481             1;
1482              
1483             __END__