package MogileFS::Store;
use strict;
use warnings;
use Carp qw(croak);
use MogileFS::Util qw(throw max);
use DBI;  # no reason a Store has to be DBI-based, but for now they all are.
use List::Util ();

# this is incremented whenever the schema changes.  server will refuse
# to start-up with an old schema version
#
# 8: adds fsck_log table
# 9: adds 'drain' state to enum in device table
use constant SCHEMA_VERSION => 9;

# Construct a store from the db_dsn/db_user/db_pass values held by
# MogileFS->config.
sub new {
    my ($class) = @_;
    return $class->new_from_dsn_user_pass(map { MogileFS->config($_) } qw(db_dsn db_user db_pass));
}

# Construct a store for an explicit DSN.  The DSN prefix selects the
# subclass, which is loaded on demand; dies on an unknown prefix or if the
# subclass fails to load.  No database connection is opened here.
sub new_from_dsn_user_pass {
    my ($class, $dsn, $user, $pass) = @_;
    my $subclass;
    if ($dsn =~ /^DBI:mysql:/i) {
        $subclass = "MogileFS::Store::MySQL";
    } elsif ($dsn =~ /^DBI:SQLite:/i) {
        $subclass = "MogileFS::Store::SQLite";
    } elsif ($dsn =~ /^DBI:Oracle:/i) {
        $subclass = "MogileFS::Store::Oracle";
    } elsif ($dsn =~ /^DBI:Pg:/i) {
        $subclass = "MogileFS::Store::Postgres";
    } else {
        die "Unknown database type: $dsn";
    }
    unless (eval "use $subclass; 1") {
        die "Error loading $subclass: $@\n";
    }
    my $self = bless {
        dsn    => $dsn,
        user   => $user,
        pass   => $pass,
        raise_errors => $subclass->want_raise_errors,
        slave_list_cachetime => 0,
        slave_list_cache     => [],
        recheck_req_gen      => 0,  # incremented generation, of recheck of dbh being requested
        recheck_done_gen     => 0,  # once recheck is done, copy of what the request generation was
    }, $subclass;
    $self->init;
    return $self;
}

sub want_raise_errors {
    # will default to true later
    0;
}

# Used by mogdbsetup: return a store connected as the regular user, creating
# the database and granting privileges through the root account first if the
# regular user cannot connect yet.  Dies if that still does not work.
sub new_from_mogdbsetup {
    my ($class, %args) = @_;
    # where args is:  dbhost dbname dbrootuser dbrootpass dbuser dbpass
    my $dsn = $class->dsn_of_dbhost($args{dbname}, $args{dbhost});

    my $try_make_sto = sub {
        # probe connection only: tells us whether the regular user can log in
        my $dbh = DBI->connect($dsn, $args{dbuser}, $args{dbpass}, {
            PrintError => 0,
        }) or return undef;
        # the store opens its own handle, so release the probe right away
        $dbh->disconnect;
        my $sto = $class->new_from_dsn_user_pass($dsn, $args{dbuser}, $args{dbpass});
        $sto->raise_errors;
        return $sto;
    };

    # upgrading, apparently, as this database already exists.
    my $sto = $try_make_sto->();
    return $sto if $sto;

    # otherwise, we need to make the requested database, setup permissions, etc
    $class->status("couldn't connect to database as mogilefs user.  trying root...");
    my $rootdsn = $class->dsn_of_root($args{dbname}, $args{dbhost});
    my $rdbh = DBI->connect($rootdsn, $args{dbrootuser}, $args{dbrootpass}, {
        PrintError => 0,
    }) or
        # report the DSN we actually tried (the root one, not the user one)
        die "Failed to connect to $rootdsn as specified root user: " . DBI->errstr . "\n";
    $class->status("connected to database as root user.");

    $class->confirm("Create database name '$args{dbname}'?");
    $class->create_db_if_not_exists($rdbh, $args{dbname});
    $class->confirm("Grant all privileges to user '$args{dbuser}', connecting from anywhere, to the mogilefs database '$args{dbname}'?");
    $class->grant_privileges($rdbh, $args{dbname}, $args{dbuser}, $args{dbpass});

    # should be ready now:
    $sto = $try_make_sto->();
    return $sto if $sto;

    die "Failed to connect to database as regular user, even after creating it and setting up permissions as the root user.";
}

# given a root DBI connection, create the named database.  succeed
# if it's made, or already exists.  die otherwise.
sub create_db_if_not_exists {
    my ($pkg, $rdbh, $dbname) = @_;
    $rdbh->do("CREATE DATABASE IF NOT EXISTS $dbname")
        or die "Failed to create database '$dbname': " . $rdbh->errstr . "\n";
}

# given a root DBI connection, grant $user all privileges on $dbname, both
# from any host ('%') and from localhost.  dies on failure.
sub grant_privileges {
    my ($pkg, $rdbh, $dbname, $user, $pass) = @_;
    $rdbh->do("GRANT ALL PRIVILEGES ON $dbname.* TO $user\@'\%' IDENTIFIED BY ?",
              undef, $pass)
        or die "Failed to grant privileges: " . $rdbh->errstr . "\n";
    $rdbh->do("GRANT ALL PRIVILEGES ON $dbname.* TO $user\@'localhost' IDENTIFIED BY ?",
              undef, $pass)
        or die "Failed to grant privileges: " . $rdbh->errstr . "\n";
}

# capability flags; all off in this base class
sub can_replace      { 0 }
sub can_insertignore { 0 }
sub can_insert_multi { 0 }

# SQL expression yielding the database's idea of the current unix time.
# Not provided by this base class.
sub unix_timestamp { die "No function in $_[0] to return DB's unixtime." }

# Returns the SQL verb prefix ("INSERT IGNORE " or "REPLACE ") supported by
# this store, preferring INSERT IGNORE.  Dies if neither is available.
sub ignore_replace {
    my $self = shift;
    return "INSERT IGNORE " if $self->can_insertignore;
    return "REPLACE " if $self->can_replace;
    die "Can't INSERT IGNORE or REPLACE?";
}

# package-wide callbacks used by status() and confirm()
my $on_status  = sub {};
my $on_confirm = sub { 1 };
sub on_status  { my ($pkg, $code) = @_; $on_status  = $code; };
sub on_confirm { my ($pkg, $code) = @_; $on_confirm = $code; };
sub status     { my ($pkg, $msg)  = @_; $on_status->($msg);  };
sub confirm    { my ($pkg, $msg)  = @_; $on_confirm->($msg) or die "Aborted.\n"; };

sub latest_schema_version { SCHEMA_VERSION }

# Turn on RaiseError, both for the current handle and for future reconnects.
sub raise_errors {
    my $self = shift;
    $self->{raise_errors} = 1;
    $self->dbh->{RaiseError} = 1;
}

sub dsn  { $_[0]{dsn}  }
sub user { $_[0]{user} }
sub pass { $_[0]{pass} }

# hooks, no-ops in this base class
sub init { 1 }
sub post_dbi_connect { 1 }

sub can_do_slaves { 0 }

sub mark_as_slave {
    my $self = shift;
    die "Incapable of becoming slave." unless $self->can_do_slaves;

    $self->{slave} = 1;
}

sub is_slave {
    my $self = shift;
    return $self->{slave};
}

# Returns a list of arrayrefs, each being [$dsn, $username, $password] for connecting to a slave DB.
sub _slaves_list {
    my $self = shift;
    my $now = time();

    # only reload every 15 seconds.
    if ($self->{slave_list_cachetime} > $now - 15) {
        return @{$self->{slave_list_cache}};
    }
    $self->{slave_list_cachetime} = $now;
    $self->{slave_list_cache}     = [];

    my $sk = MogileFS::Config->server_setting('slave_keys')
        or return ();

    my @ret;
    foreach my $key (split /\s*,\s*/, $sk) {
        my $slave = MogileFS::Config->server_setting("slave_$key");

        # a key listed in slave_keys without a matching slave_<key> setting
        # cannot be connected to; skip it rather than hand an undefined DSN
        # to the constructor
        unless (defined $slave && length $slave) {
            warn "No 'slave_$key' server setting found; skipping that slave.\n";
            next;
        }

        my ($dsn, $user, $pass) = split /\|/, $slave;
        push @ret, [$dsn, $user, $pass];
    }

    $self->{slave_list_cache} = \@ret;
    return @ret;
}

# Returns a usable slave store, or nothing if there are no slaves or none of
# them passes check_slave (which is not defined in this part of the file).
sub get_slave {
    my $self = shift;

    die "Incapable of having slaves." unless $self->can_do_slaves;

    return $self->{slave} if $self->check_slave;

    my @slaves_list = $self->_slaves_list;

    # If we have no slaves, then return silently.
    return unless @slaves_list;

    foreach my $slave_fulldsn (@slaves_list) {
        my $newslave = $self->{slave} = $self->new_from_dsn_user_pass(@$slave_fulldsn);
        $self->{slave_next_check} = 0;
        $newslave->mark_as_slave;
        return $newslave
            if $self->check_slave;
    }

    warn "Slave list exhausted, failing back to master.";
    return;
}

# Returns the store to read from: a slave when called inside slaves_ok() and
# one is available, otherwise this store itself.
sub read_store {
    my $self = shift;

    return $self unless $self->can_do_slaves;

    if ($self->{slave_ok}) {
        if (my $slave = $self->get_slave) {
            # propagate any pending handle-recheck request to the slave
            $slave->{recheck_req_gen} = $self->{recheck_req_gen};
            return $slave;
        }
    }

    return $self;
}

# Run $coderef (with any remaining arguments) with slave reads permitted for
# the duration of the call.  Returns its result; nothing if not a coderef.
sub slaves_ok {
    my $self = shift;
    my $coderef = shift;

    return unless ref $coderef eq 'CODE';

    local $self->{slave_ok} = 1;

    return $coderef->(@_);
}

# Request that the next dbh() call pings the handle before reusing it.
sub recheck_dbh {
    my $self = shift;
    $self->{recheck_req_gen}++;
}

# Returns the DBI handle, connecting on first use.  If a recheck was
# requested since the last one, the cached handle is pinged and replaced by a
# fresh connection when the ping fails.  Dies if connecting fails.
sub dbh {
    my $self = shift;
    if ($self->{dbh}) {
        if ($self->{recheck_done_gen} != $self->{recheck_req_gen}) {
            $self->{dbh} = undef unless $self->{dbh}->ping;
            $self->{recheck_done_gen} = $self->{recheck_req_gen};
        }
        return $self->{dbh} if $self->{dbh};
    }

    $self->{dbh} = DBI->connect($self->{dsn}, $self->{user}, $self->{pass}, {
        PrintError => 0,
        AutoCommit => 1,
        # FUTURE: will default to on (have to validate all callers first):
        RaiseError => ($self->{raise_errors} || 0),
    }) or
        die "Failed to connect to database: " . DBI->errstr;
    $self->post_dbi_connect;
    return $self->{dbh};
}

sub ping {
    my $self = shift;
    return $self->dbh->ping;
}

# croak (blaming our caller) if the handle has a pending error; $optmsg, when
# given, is appended to the message.  Returns quietly otherwise.
sub condthrow {
    my ($self, $optmsg) = @_;
    my $dbh = $self->dbh;
    return unless $dbh->err;
    my ($pkg, $fn, $line) = caller;
    my $msg = "Database error from $pkg/$fn/$line: " . $dbh->errstr;
    $msg .= ": $optmsg" if $optmsg;
    croak($msg);
}

# Run $dbh->do($sql, @do_params) and return its value; on any error, warn
# with the SQL and confess with a full stack trace.
sub dowell {
    my ($self, $sql, @do_params) = @_;
    my $rv = eval { $self->dbh->do($sql, @do_params) };
    return $rv unless $@ || $self->dbh->err;
    warn "Error with SQL: $sql\n";
    Carp::confess($@ || $self->dbh->errstr);
}

# Validate named arguments.  $vlist is an arrayref of allowed names; returns
# a hash with exactly those keys (undef for the ones not passed).  croaks on
# an odd-length list or on any name outside $vlist.
sub _valid_params {
    croak("Odd number of parameters!") if scalar(@_) % 2;
    my ($self, $vlist, %uarg) = @_;
    my %ret;
    $ret{$_} = delete $uarg{$_} foreach @$vlist;
    croak("Bogus options: ".join(',',keys %uarg)) if %uarg;
    return %ret;
}

# Was the last database error a duplicate-key error?  Not implemented in
# this base class.
sub was_duplicate_error {
    my $self = shift;
    my $dbh = $self->dbh;
    die "UNIMPLEMENTED";
}

# run a subref (presumably a database update) in an eval, because you expect it to
# maybe fail on duplicate key error, and throw a dup exception for you, else return
# its return value
sub conddup {
    my ($self, $code) = @_;
    my $rv = eval { $code->(); };
    throw("dup") if $self->was_duplicate_error;
    return $rv;
}

# insert row if doesn't already exist
# WARNING: This function is NOT transaction safe if the duplicate errors causes
# your transaction to halt!
# WARNING: This function is NOT safe on multi-row inserts if can_insertignore
# is false! Rows before the duplicate will be inserted, but rows after the
# duplicate might not be, depending your database.
sub insert_ignore {
    my ($self, $sql, @params) = @_;
    my $dbh = $self->dbh;
    if ($self->can_insertignore) {
        return $dbh->do("INSERT IGNORE $sql", @params);
    } else {
        # TODO: Detect bad multi-row insert here.
        my $rv = eval { $dbh->do("INSERT $sql", @params); };
        if ($@ || $dbh->err) {
            # capture the eval error first: was_duplicate_error may run code
            # that resets $@
            my $eval_err = $@;
            return 1 if $self->was_duplicate_error;
            # This chunk is identical to condthrow, but we include it directly
            # here as we know there is definitely an error, and we would like
            # the caller of this function.
            my ($pkg, $fn, $line) = caller;
            # the handle's errstr is undefined when the failure only came
            # through $@, so fall back to the eval error
            my $msg = "Database error from $pkg/$fn/$line: " . ($dbh->errstr || $eval_err);
            croak($msg);
        }
        return $rv;
    }
}

# --------------------------------------------------------------------------

# tables registered by other code, created after the core TABLES
my @extra_tables;

sub add_extra_tables {
    my $class = shift;
    push @extra_tables, @_;
}

use constant TABLES => qw( domain class file tempfile file_to_delete
                            unreachable_fids file_on file_on_corrupt host
                            device server_settings file_to_replicate
                            file_to_delete_later fsck_log);

# Create any missing tables and run the upgrade hooks.  Returns 1 on success
# (including when already up to date); dies if the database schema is newer
# than this code, or if the confirm callback declines.
sub setup_database {
    my $sto = shift;

    # schema history:
    #   9: adds 'drain' state to enum in device table
    #   8: adds fsck_log table
    #   7: adds file_to_delete_later table
    #   6: adds file_to_replicate table
    my $curver = $sto->schema_version;

    my $latestver = SCHEMA_VERSION;
    if ($curver == $latestver) {
        $sto->status("Schema already up-to-date at version $curver.");
        return 1;
    }

    if ($curver > $latestver) {
        die "Your current schema version is $curver, but this version of mogdbsetup only knows up to $latestver.  Aborting to be safe.\n";
    }

    # only ask when upgrading an existing schema (version 0 == fresh install)
    if ($curver) {
        $sto->confirm("Install/upgrade your schema from version $curver to version $latestver?");
    }

    foreach my $t (TABLES, @extra_tables) {
        $sto->create_table($t);
    }

    $sto->upgrade_add_host_getport;
    $sto->upgrade_add_host_altip;
    $sto->upgrade_add_device_asof;
    $sto->upgrade_add_device_weight;
    $sto->upgrade_add_device_readonly;
    $sto->upgrade_add_device_drain;

    return 1;
}

# Returns the schema_version recorded in server_settings, or 0 when it is
# missing or the query fails (e.g. the table does not exist yet).
sub schema_version {
    my $self = shift;
    my $dbh = $self->dbh;
    return eval {
        $dbh->selectrow_array("SELECT value FROM server_settings WHERE field='schema_version'") || 0;
    } || 0;
}

# hook: rewrite CREATE TABLE/INDEX SQL for a particular database.  identity here.
sub filter_create_sql { my ($self, $sql) = @_; return $sql; }

# Create $table (from the TABLE_$table method) unless it already exists, then
# run each statement returned by the optional INDEXES_$table method.
sub create_table {
    my ($self, $table) = @_;
    my $dbh = $self->dbh;
    return 1 if $self->table_exists($table);
    my $meth = "TABLE_$table";
    my $sql = $self->$meth;
    $sql = $self->filter_create_sql($sql);
    $self->status("Running SQL: $sql;");
    $dbh->do($sql) or
        die "Failed to create table $table: " . $dbh->errstr;
    my $imeth = "INDEXES_$table";
    # the eval makes a missing INDEXES_ method yield an empty list
    my @indexes = eval { $self->$imeth };
    foreach $sql (@indexes) {
        $self->status("Running SQL: $sql;");
        $dbh->do($sql) or
            die "Failed to create indexes on $table: " . $dbh->errstr;
    }
}

# Please try to keep all tables aligned nicely
# with '"CREATE TABLE' on the first line
# and ')"' alone on the last line.

sub TABLE_domain {
    # classes are tied to domains.  domains can have classes of items
    # with different mindevcounts.
    #
    # a minimum devcount is the number of copies the system tries to
    # maintain for files in that class
    #
    # unspecified classname means classid=0 (implicit class), and that
    # implies mindevcount=2
    "CREATE TABLE domain (
    dmid         SMALLINT UNSIGNED NOT NULL PRIMARY KEY,
    namespace    VARCHAR(255),
    UNIQUE (namespace)
)"
}

sub TABLE_class {
    "CREATE TABLE class (
    dmid          SMALLINT UNSIGNED NOT NULL,
    classid       TINYINT UNSIGNED NOT NULL,
    PRIMARY KEY (dmid,classid),
    classname     VARCHAR(50),
    UNIQUE      (dmid,classname),
    mindevcount   TINYINT UNSIGNED NOT NULL
)"
}

# the length field is only here for easy verifications of content
# integrity when copying around.  no sums or content types or other
# metadata here.  application can handle that.
#
# classid is what class of file this belongs to.  for instance, on fotobilder
# there will be a class for original pictures (the ones the user uploaded)
# and a class for derived images (scaled down versions, thumbnails, greyscale, etc)
# each domain can setup classes and assign the minimum redundancy level for
# each class.
# fotobilder will use a 2 or 3 minimum copy redundancy for original
# photos and a 1 minimum for derived images (which means the sole device
# for a derived image can die, bringing devcount to 0 for that file, but
# the application can recreate it from its original)
#
# NOTE: the '#' remarks inside the SQL below are part of the statement text.
# They are end-of-line comments as far as the database is concerned, so each
# one must stay on its own line: joined onto a single line, the first '#'
# would comment out the rest of the CREATE TABLE.
sub TABLE_file {
    "CREATE TABLE file (
    fid           INT UNSIGNED NOT NULL,
    PRIMARY KEY   (fid),

    dmid          SMALLINT UNSIGNED NOT NULL,
    dkey          VARCHAR(255),     # domain-defined
    UNIQUE dkey   (dmid, dkey),

    length        INT UNSIGNED,     # 4GB limit

    classid       TINYINT UNSIGNED NOT NULL,
    devcount      TINYINT UNSIGNED NOT NULL,
    INDEX devcount (dmid,classid,devcount)
)"
}

# files being uploaded: one row per fid that has been handed out but not yet
# recorded in the file table.  devids is stored as a string (VARCHAR(60)).
sub TABLE_tempfile {
    "CREATE TABLE tempfile (
    fid           INT UNSIGNED NOT NULL AUTO_INCREMENT,
    PRIMARY KEY   (fid),

    createtime    INT UNSIGNED NOT NULL,
    classid       TINYINT UNSIGNED NOT NULL,
    dmid          SMALLINT UNSIGNED NOT NULL,
    dkey          VARCHAR(255),
    devids        VARCHAR(60)
)"
}

# files marked for death when their key is overwritten.  then they get a new
# fid, but since the old row (with the old fid) had to be deleted immediately,
# we need a place to store the fid so an async job can delete the file from
# all devices.
sub TABLE_file_to_delete {
    "CREATE TABLE file_to_delete (
    fid  INT UNSIGNED NOT NULL,
    PRIMARY KEY (fid)
)"
}

# if the replicator notices that a fid has no sources, that file gets inserted
# into the unreachable_fids table.  it is up to the application to actually
# handle fids stored in this table.
sub TABLE_unreachable_fids {
    "CREATE TABLE unreachable_fids (
    fid        INT UNSIGNED NOT NULL,
    lastupdate INT UNSIGNED NOT NULL,
    PRIMARY KEY (fid),
    INDEX (lastupdate)
)"
}

# what files are on what devices?  (most likely physical devices,
# as logical devices of RAID arrays would be costly, and mogilefs
# already handles redundancy)
#
# the devid index lets us answer "What files were on this now-dead disk?"
sub TABLE_file_on {
    "CREATE TABLE file_on (
    fid          INT UNSIGNED NOT NULL,
    devid        MEDIUMINT UNSIGNED NOT NULL,
    PRIMARY KEY (fid, devid),
    INDEX (devid)
)"
}

# if application or framework detects an error in one of the duplicate files
# for whatever reason, it can register its complaint and the framework
# will do some verifications and fix things up w/ an async job
# MAYBE: let application tell us the SHA1/MD5 of the file for us to check
#        on the other devices?
sub TABLE_file_on_corrupt {
    "CREATE TABLE file_on_corrupt (
    fid          INT UNSIGNED NOT NULL,
    devid        MEDIUMINT UNSIGNED NOT NULL,
    PRIMARY KEY (fid, devid)
)"
}

# hosts (which contain devices...)
sub TABLE_host {
    "CREATE TABLE host (
    hostid        MEDIUMINT UNSIGNED NOT NULL PRIMARY KEY,

    status        ENUM('alive','dead','down'),
    http_port     MEDIUMINT UNSIGNED DEFAULT 7500,
    http_get_port MEDIUMINT UNSIGNED,

    hostname      VARCHAR(40),
    hostip        VARCHAR(15),
    altip         VARCHAR(15),
    altmask       VARCHAR(18),
    UNIQUE     (hostname),
    UNIQUE     (hostip),
    UNIQUE     (altip)
)"
}

# disks...
sub TABLE_device {
    "CREATE TABLE device (
    devid         MEDIUMINT UNSIGNED NOT NULL,
    hostid        MEDIUMINT UNSIGNED NOT NULL,

    status        ENUM('alive','dead','down'),
    weight        MEDIUMINT DEFAULT 100,

    mb_total      MEDIUMINT UNSIGNED,
    mb_used       MEDIUMINT UNSIGNED,
    mb_asof       INT UNSIGNED,
    PRIMARY KEY (devid),
    INDEX   (status)
)"
}

sub TABLE_server_settings {
    "CREATE TABLE server_settings (
    field   VARCHAR(50) PRIMARY KEY,
    value   VARCHAR(255)
)"
}

sub TABLE_file_to_replicate {
    # nexttry is time to try to replicate it next.
    #   0 means immediate.  it's only on one host.
    #   1 means lower priority.  it's on 2+ but isn't happy where it's at.
    #   unixtimestamp means at/after that time.  some previous error occurred.
    # fromdevid, if not null, means which devid we should replicate from.  perhaps it's the only non-corrupt one.  otherwise, wherever.
    # failcount.  how many times we've failed, just for doing backoff of nexttry.
    # flags.  reserved for future use.
    "CREATE TABLE file_to_replicate (
    fid        INT UNSIGNED NOT NULL PRIMARY KEY,
    nexttry    INT UNSIGNED NOT NULL,
    INDEX (nexttry),
    fromdevid  INT UNSIGNED,
    failcount  TINYINT UNSIGNED NOT NULL DEFAULT 0,
    flags      SMALLINT UNSIGNED NOT NULL DEFAULT 0
)"
}

sub TABLE_file_to_delete_later {
    "CREATE TABLE file_to_delete_later (
    fid  INT UNSIGNED NOT NULL PRIMARY KEY,
    delafter INT UNSIGNED NOT NULL,
    INDEX (delafter)
)"
}

sub TABLE_fsck_log {
    "CREATE TABLE fsck_log (
    logid  INT UNSIGNED NOT NULL AUTO_INCREMENT,
    PRIMARY KEY (logid),
    utime  INT UNSIGNED NOT NULL,
    fid    INT UNSIGNED NULL,
    evcode CHAR(4),
    devid  MEDIUMINT UNSIGNED,
    INDEX(utime)
)"
}

# these upgrade hooks are only necessary for MySQL, since no other database
# existed before, so they can just create the tables correctly to begin with.
# in the future, there might be new alters that non-MySQL databases
# will have to implement.
sub upgrade_add_host_getport { 1 }
sub upgrade_add_host_altip { 1 }
sub upgrade_add_device_asof { 1 }
sub upgrade_add_device_weight { 1 }
sub upgrade_add_device_readonly { 1 }
# unlike the hooks above, this one has no default and dies in the base class
sub upgrade_add_device_drain { die "Not implemented in $_[0]" }

# return true if deleted, 0 if didn't exist, exception if error
sub delete_host {
    my ($self, $hostid) = @_;
    return $self->dbh->do("DELETE FROM host WHERE hostid = ?", undef, $hostid);
}

# return true if deleted, 0 if didn't exist, exception if error
sub delete_domain {
    my ($self, $dmid) = @_;
    return $self->dbh->do("DELETE FROM domain WHERE dmid = ?", undef, $dmid);
}

# return 1 if the domain has at least one row in the file table, else 0
sub domain_has_files {
    my ($self, $dmid) = @_;
    my $has_a_fid = $self->dbh->selectrow_array('SELECT fid FROM file WHERE dmid = ? LIMIT 1',
                                                undef, $dmid);
    return $has_a_fid ? 1 : 0;
}

# return 1 if the class (within the domain) has at least one file, else 0
sub class_has_files {
    my ($self, $dmid, $clid) = @_;
    my $has_a_fid = $self->dbh->selectrow_array('SELECT fid FROM file WHERE dmid = ? AND classid = ? LIMIT 1',
                                                undef, $dmid, $clid);
    return $has_a_fid ? 1 : 0;
}

# return new classid on success (non-zero integer), die on failure
# throw 'dup' on duplicate name
# override this if you want a less racy version.
sub create_class {
    my ($self, $dmid, $classname) = @_;
    my $dbh = $self->dbh;

    # get the max class id in this domain
    my $maxid = $dbh->selectrow_array
        ('SELECT MAX(classid) FROM class WHERE dmid = ?', undef, $dmid) || 0;

    # now insert the new class (new classes start with mindevcount 2)
    my $rv = eval {
        $dbh->do("INSERT INTO class (dmid, classid, classname, mindevcount) VALUES (?, ?, ?, ?)",
                 undef, $dmid, $maxid + 1, $classname, 2);
    };
    if ($@ || $dbh->err) {
        if ($self->was_duplicate_error) {
            throw("dup");
        }
    }
    return $maxid + 1 if $rv;
    $self->condthrow;
    die;
}

# return 1 on success, throw "dup" on duplicate name error, die otherwise
sub update_class_name {
    my $self = shift;
    my %arg  = $self->_valid_params([qw(dmid classid classname)], @_);
    # errors are inspected below, so the eval only has to contain them
    eval {
        $self->dbh->do("UPDATE class SET classname=? WHERE dmid=? AND classid=?",
                       undef, $arg{classname}, $arg{dmid}, $arg{classid});
    };
    throw("dup") if $self->was_duplicate_error;
    $self->condthrow;
    return 1;
}

# return 1 on success, die otherwise
sub update_class_mindevcount {
    my $self = shift;
    my %arg  = $self->_valid_params([qw(dmid classid mindevcount)], @_);
    $self->dbh->do("UPDATE class SET mindevcount=? WHERE dmid=? AND classid=?",
                   undef, $arg{mindevcount}, $arg{dmid}, $arg{classid});
    $self->condthrow;
    return 1;
}

# return the number of files in the given domain/class with exactly $devcount copies
sub nfiles_with_dmid_classid_devcount {
    my ($self, $dmid, $classid, $devcount) = @_;
    return $self->dbh->selectrow_array('SELECT COUNT(*) FROM file WHERE dmid = ? AND classid = ? AND devcount = ?',
                                       undef, $dmid, $classid, $devcount);
}

# set a server setting, or delete it when $val is undef.
# return 1 on success, die on error (or if the store cannot REPLACE).
sub set_server_setting {
    my ($self, $key, $val) = @_;
    my $dbh = $self->dbh;
    die "Your database does not support REPLACE! Reimplement set_server_setting!" unless $self->can_replace;

    if (defined $val) {
        $dbh->do("REPLACE INTO server_settings (field, value) VALUES (?, ?)", undef, $key, $val);
    } else {
        $dbh->do("DELETE FROM server_settings WHERE field=?", undef, $key);
    }

    die "Error updating 'server_settings': " . $dbh->errstr if $dbh->err;
    return 1;
}

# add $val (default 1) to a numeric server setting, creating it when absent.
# FIXME: racy.  currently the only caller doesn't matter, but should be fixed.
sub incr_server_setting {
    my ($self, $key, $val) = @_;
    $val = 1 unless defined $val;
    return unless $val;

    return 1 if $self->dbh->do("UPDATE server_settings ".
                               "SET value=value+? ".
                               "WHERE field=?", undef,
                               $val, $key) > 0;
    # no row was updated, so the setting does not exist yet
    $self->set_server_setting($key, $val);
}

# return the value of one server setting (undef if not set)
sub server_setting {
    my ($self, $key) = @_;
    return $self->dbh->selectrow_array("SELECT value FROM server_settings WHERE field=?",
                                       undef, $key);
}

# return a hashref of all server settings, field => value
sub server_settings {
    my ($self) = @_;
    my $ret = {};
    my $sth = $self->dbh->prepare("SELECT field, value FROM server_settings");
    $sth->execute;
    while (my ($k, $v) = $sth->fetchrow_array) {
        $ret->{$k} = $v;
    }
    return $ret;
}

# register a tempfile and return the fidid, which should be allocated
# using autoincrement/sequences if the passed in fid is undef.  however,
# if fid is passed in, that value should be used and returned.
#
# return new/passed in fidid on success.
# throw 'dup' if fid already in use
# return 0/undef/die on failure
#
sub register_tempfile {
    my $self = shift;
    my %arg  = $self->_valid_params([qw(fid dmid key classid devids)], @_);

    my $dbh = $self->dbh;

    # a false fid (0, '' or undef) means "allocate one for me".  normalize it
    # to undef: the insert below already omits the fid column for any false
    # value, but the last_insert_id lookup only runs when $fid is undefined,
    # so a literal 0 used to insert a row and then fail.
    my $fid = $arg{fid} || undef;

    my $explicit_fid_used = $fid ? 1 : 0;

    # setup the new mapping.  we store the devices that we picked for
    # this file in here, knowing that they might not be used.  create_close
    # is responsible for actually mapping in file_on.  NOTE: fid is being
    # passed in, it's either some number they gave us, or it's going to be
    # 0/undef which translates into NULL which means to automatically create
    # one.  that should be fine.
    my $ins_tempfile = sub {
        my $rv = eval {
            # We must only pass the correct number of bind parameters
            # Using 'NULL' for the AUTO_INCREMENT/SERIAL column will fail on
            # Postgres, where you are expected to leave it out or use DEFAULT
            # Leaving it out seems sanest and least likely to cause problems
            # with other databases.
            my @keys = ('dmid', 'dkey', 'classid', 'devids', 'createtime');
            my @vars = ('?'   , '?'   , '?'      , '?'     , $self->unix_timestamp);
            my @vals = ($arg{dmid}, $arg{key}, $arg{classid} || 0, $arg{devids});
            # Do not check for $explicit_fid_used, but rather $fid directly
            # as this anonymous sub is called from the loop later
            if($fid) {
                unshift @keys, 'fid';
                unshift @vars, '?';
                unshift @vals, $fid;
            }
            my $sql = "INSERT INTO tempfile (".join(',',@keys).") VALUES (".join(',',@vars).")";
            $dbh->do($sql, undef, @vals);
        };
        if (!$rv) {
            return undef if $self->was_duplicate_error;
            die "Unexpected db error into tempfile: " . $dbh->errstr;
        }

        unless (defined $fid) {
            # if they did not give us a fid, then we want to grab the one that was
            # theoretically automatically generated
            $fid = $dbh->last_insert_id(undef, undef, 'tempfile', 'fid')
                or die "No last_insert_id found";
        }
        return undef unless defined $fid && $fid > 0;
        return 1;
    };

    unless ($ins_tempfile->()) {
        throw("dup") if $explicit_fid_used;
        die "tempfile insert failed";
    }

    # reads the current $fid through the closure; arguments are ignored
    my $fid_in_use = sub {
        my $exists = $dbh->selectrow_array("SELECT COUNT(*) FROM file WHERE fid=?", undef, $fid);
        return $exists ? 1 : 0;
    };

    # if the fid is in use, do something
    while ($fid_in_use->($fid)) {
        throw("dup") if $explicit_fid_used;

        # be careful of databases which reset their
        # auto-increment/sequences when the table is empty (InnoDB
        # did/does this, for instance).  So check if it's in use, and
        # re-seed the table with the highest known fid from the file
        # table.

        # get the highest fid from the filetable and insert a dummy row
        $fid = $dbh->selectrow_array("SELECT MAX(fid) FROM file");
        $ins_tempfile->();  # don't care about its result

        # then do a normal auto-increment
        $fid = undef;
        $ins_tempfile->() or die "register_tempfile failed after seeding";
    }

    return $fid;
}

# return hashref of row containing columns "fid, dmid, dkey, length,
# classid, devcount" provided a $dmid and $key (dkey).  or undef if no
# row.
sub file_row_from_dmid_key {
    my ($self, $dmid, $key) = @_;
    return $self->dbh->selectrow_hashref("SELECT fid, dmid, dkey, length, classid, devcount ".
                                         "FROM file WHERE dmid=? AND dkey=?",
                                         undef, $dmid, $key);
}

# return hashref of row containing columns "fid, dmid, dkey, length,
# classid, devcount" provided a $fidid or undef if no row.
sub file_row_from_fidid {
    my ($self, $fidid) = @_;
    return $self->dbh->selectrow_hashref("SELECT fid, dmid, dkey, length, classid, devcount ".
                                         "FROM file WHERE fid=?",
                                         undef, $fidid);
}

# return an arrayref of rows containing columns "fid, dmid, dkey, length,
# classid, devcount" provided a pair of $fidid or undef if no rows.
# (the range is inclusive at both ends: the query uses BETWEEN.)
sub file_row_from_fidid_range {
    my ($self, $fromfid, $tofid) = @_;
    my $sth = $self->dbh->prepare("SELECT fid, dmid, dkey, length, classid, devcount ".
                                  "FROM file WHERE fid BETWEEN ? AND ?");
    $sth->execute($fromfid,$tofid);
    return $sth->fetchall_arrayref({});
}

# return array of devids that a fidid is on
sub fid_devids {
    my ($self, $fidid) = @_;
    return @{ $self->dbh->selectcol_arrayref("SELECT devid FROM file_on WHERE fid=?",
                                             undef, $fidid) || [] };
}

# return hashref of { $fidid => [ $devid, $devid...
# ] } for a bunch of given @fidids
#
# i.e. given a list of fidids, return a hashref mapping each fidid that has
# at least one file_on row to an arrayref of its devids.  fidids with no rows
# are absent from the result; an empty list yields an empty hashref.
sub fid_devids_multiple {
    my ($self, @fidids) = @_;
    my $ret = {};

    # "IN ()" is not valid SQL, so don't go to the database for nothing
    return $ret unless @fidids;

    # force every id to a number before interpolating it into the statement
    my $in = join(",", map { $_+0 } @fidids);
    my $sth = $self->dbh->prepare("SELECT fid, devid FROM file_on WHERE fid IN ($in)");
    $sth->execute;
    while (my ($fidid, $devid) = $sth->fetchrow_array) {
        push @{$ret->{$fidid} ||= []}, $devid;
    }
    return $ret;
}

# return hashref of columns classid, dmid, dkey, given a $fidid, or return undef
sub tempfile_row_from_fid {
    my ($self, $fidid) = @_;
    return $self->dbh->selectrow_hashref("SELECT classid, dmid, dkey ".
                                         "FROM tempfile WHERE fid=?",
                                         undef, $fidid);
}

# return 1 on success, throw "dup" on duplicate devid or throws other error on failure
sub create_device {
    my ($self, $devid, $hostid, $status) = @_;
    my $rv = $self->conddup(sub {
        $self->dbh->do("INSERT INTO device (devid, hostid, status) VALUES (?,?,?)", undef,
                       $devid, $hostid, $status);
    });
    $self->condthrow;
    die "error making device $devid\n" unless $rv > 0;
    return 1;
}

# record a device's total/used megabytes, stamping mb_asof with the
# database's current time.  takes named args mb_total, mb_used, devid.
sub update_device_usage {
    my $self = shift;
    my %arg  = $self->_valid_params([qw(mb_total mb_used devid)], @_);
    $self->dbh->do("UPDATE device SET mb_total = ?, mb_used = ?, mb_asof = " . $self->unix_timestamp .
                   " WHERE devid = ?", undef, $arg{mb_total}, $arg{mb_used}, $arg{devid});
    $self->condthrow;
}

# insert or refresh the unreachable_fids row for $fidid with the current time
sub mark_fidid_unreachable {
    my ($self, $fidid) = @_;
    die "Your database does not support REPLACE! Reimplement mark_fidid_unreachable!" unless $self->can_replace;
    $self->dbh->do("REPLACE INTO unreachable_fids VALUES (?, " . $self->unix_timestamp . ")",
                   undef, $fidid);
}

sub set_device_weight {
    my ($self, $devid, $weight) = @_;
    $self->dbh->do('UPDATE device SET weight = ? WHERE devid = ?', undef, $weight, $devid);
    $self->condthrow;
}

sub set_device_state {
    my ($self, $devid, $state) = @_;
    $self->dbh->do('UPDATE device SET status = ? WHERE devid = ?', undef, $state, $devid);
    $self->condthrow;
}

sub delete_class {
    my ($self, $dmid, $cid) = @_;
    $self->dbh->do("DELETE FROM class WHERE dmid = ? AND classid = ?", undef, $dmid, $cid);
    $self->condthrow;
}

# remove a fid from the file and tempfile tables and queue it in
# file_to_delete.  each step throws on database error.
sub delete_fidid {
    my ($self, $fidid) = @_;
    $self->dbh->do("DELETE FROM file WHERE fid=?", undef, $fidid);
    $self->condthrow;
    $self->dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fidid);
    $self->condthrow;
    $self->dbh->do($self->ignore_replace . " INTO file_to_delete (fid) VALUES (?)", undef, $fidid);
    $self->condthrow;
}

sub delete_tempfile_row {
    my ($self, $fidid) = @_;
    $self->dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fidid);
    $self->condthrow;
}

# insert or replace a row in the file table with devcount 0.  takes named
# args fidid, dmid, key, length, classid.
sub replace_into_file {
    my $self = shift;
    my %arg  = $self->_valid_params([qw(fidid dmid key length classid)], @_);
    die "Your database does not support REPLACE! Reimplement replace_into_file!" unless $self->can_replace;
    $self->dbh->do("REPLACE INTO file (fid, dmid, dkey, length, classid, devcount) ".
                   "VALUES (?,?,?,?,?,0) ", undef,
                   @arg{'fidid', 'dmid', 'key', 'length', 'classid'});
    $self->condthrow;
}

# returns 1 on success, 0 on duplicate key error, dies on exception
# TODO: need a test to hit the duplicate name error condition
# TODO: switch to using "dup" exception here?
sub rename_file {
    my ($self, $fidid, $to_key) = @_;
    my $dbh = $self->dbh;
    eval {
        $dbh->do('UPDATE file SET dkey = ? WHERE fid=?',
                 undef, $to_key, $fidid);
    };
    if ($@ || $dbh->err) {
        # keep the eval error: was_duplicate_error may run code that resets $@
        my $eval_err = $@;
        if ($self->was_duplicate_error) {
            return 0;
        } else {
            # without RaiseError the failure is only on the handle and $@ is
            # empty, so fall back to the handle's message
            die($eval_err || $dbh->errstr);
        }
    }
    $self->condthrow;
    return 1;
}

# returns a hash of domains.  Key is namespace, value is dmid.
sub get_all_domains {
    my ($self) = @_;
    my $domains = $self->dbh->selectall_arrayref('SELECT namespace, dmid FROM domain');
    return map { ($_->[0], $_->[1]) } @{$domains || []};
}

# returns an array of hashrefs, one hashref per row in the 'class' table
sub get_all_classes {
    my ($self) = @_;
    my (@ret, $row);
    my $sth = $self->dbh->prepare("SELECT dmid, classid, classname, mindevcount FROM class");
    $sth->execute;
    push @ret, $row while $row = $sth->fetchrow_hashref;
    return @ret;
}

# add a record of fidid existing on devid
# returns 1 on success, 0 on duplicate
sub add_fidid_to_devid {
    my ($self, $fidid, $devid) = @_;
    croak("fidid not non-zero") unless $fidid;
    croak("devid not non-zero") unless $devid;

    # TODO: This should possibly be insert_ignore instead
    # As if we are adding an extra file_on entry, we do not want to replace the
    # exist one. Check REPLACE semantics.
    my $rv = $self->dowell($self->ignore_replace . " INTO file_on (fid, devid) VALUES (?,?)",
                           undef, $fidid, $devid);
    return 1 if $rv > 0;
    return 0;
}

# remove a record of fidid existing on devid
# returns 1 on success, 0 if not there anyway
sub remove_fidid_from_devid {
    my ($self, $fidid, $devid) = @_;
    my $rv = $self->dbh->do("DELETE FROM file_on WHERE fid=? AND devid=?",
                            undef, $fidid, $devid);
    $self->condthrow;
    return $rv;
}

# get all hosts from database, returns them as list of hashrefs, hashrefs being the row contents.
sub get_all_hosts {
    my ($self) = @_;
    my $sth = $self->dbh->prepare("SELECT /*!40000 SQL_CACHE */ hostid, status, hostname, " .
                                  "hostip, http_port, http_get_port, altip, altmask FROM host");
    $sth->execute;
    my @ret;
    while (my $row = $sth->fetchrow_hashref) {
        push @ret, $row;
    }
    return @ret;
}

# get all devices from database, returns them as list of hashrefs, hashrefs being the row contents.
sub get_all_devices {
    my ($self) = @_;
    my $sth = $self->dbh->prepare("SELECT /*!40000 SQL_CACHE */ devid, hostid, mb_total, " .
                                  "mb_used, mb_asof, status, weight FROM device");
    $self->condthrow;
    $sth->execute;
    my @return;
    while (my $row = $sth->fetchrow_hashref) {
        push @return, $row;
    }
    return @return;
}

# update the device count for a given fidid
sub update_devcount {
    my ($self, $fidid) = @_;
    my $dbh = $self->dbh;
    my $ct = $dbh->selectrow_array("SELECT COUNT(*) FROM file_on WHERE fid=?",
                                   undef, $fidid);

    $dbh->do("UPDATE file SET devcount=? WHERE fid=?", undef,
             $ct, $fidid);

    return 1;
}

# enqueue a fidid for replication, from a specific deviceid (can be undef), in a given number of seconds.
sub enqueue_for_replication {
    my ($self, $fidid, $from_devid, $in) = @_;

    # nexttry is spliced into the SQL: either the literal 0 or
    # "<db time expression> + <integer seconds>"
    my $nexttry = 0;
    if ($in) {
        $nexttry = $self->unix_timestamp . " + " . int($in);
    }

    $self->insert_ignore("INTO file_to_replicate (fid, fromdevid, nexttry) ".
                         "VALUES (?,?,$nexttry)", undef, $fidid, $from_devid);
}

# reschedule all deferred replication, return number rescheduled
sub replicate_now {
    my ($self) = @_;

    return $self->dbh->do("UPDATE file_to_replicate SET nexttry = " . $self->unix_timestamp .
                          " WHERE nexttry > " . $self->unix_timestamp);
}

# takes two arguments, devid and limit, both required.  returns an arrayref of fidids.
sub get_fidids_by_device {
    my ($self, $devid, $limit) = @_;

    # LIMIT cannot be a bind parameter everywhere, so it is interpolated;
    # force it to an integer first (as get_fids_above_id does)
    $limit = int($limit);

    my $dbh = $self->dbh;
    my $fidids = $dbh->selectcol_arrayref("SELECT fid FROM file_on WHERE devid = ? LIMIT $limit",
                                          undef, $devid);
    return $fidids;
}

# takes two arguments, fidid to be above, and optional limit (default
# 1,000).  returns up to that many fidids above the provided
# fidid.  returns array of MogileFS::FID objects, sorted by fid ids.
sub get_fids_above_id {
    my ($self, $fidid, $limit) = @_;
    $limit ||= 1000;
    $limit = int($limit);

    my @ret;
    my $dbh = $self->dbh;
    my $sth = $dbh->prepare("SELECT fid, dmid, dkey, length, classid ".
                            "FROM   file ".
                            "WHERE  fid > ? ".
                            "ORDER BY fid LIMIT $limit");
    $sth->execute($fidid);
    while (my $row = $sth->fetchrow_hashref) {
        push @ret, MogileFS::FID->new_from_db_row($row);
    }
    return @ret;
}

# creates a new domain, given a domain namespace string.  return the dmid on success,
# throw 'dup' on duplicate name.
# override if you want a less racy version.
sub create_domain {
    my ($self, $name) = @_;
    my $dbh = $self->dbh;

    # get the max domain id
    my $maxid = $dbh->selectrow_array('SELECT MAX(dmid) FROM domain') || 0;
    my $rv = eval {
        $dbh->do('INSERT INTO domain (dmid, namespace) VALUES (?, ?)',
                 undef, $maxid + 1, $name);
    };
    if ($self->was_duplicate_error) {
        throw("dup");
    }
    return $maxid+1 if $rv;
    die "failed to make domain";  # FIXME: the above is racy.
}

# set one column of a host row.  throws 'dup' on a duplicate-key error,
# returns 1 otherwise.  NOTE: $col is interpolated into the SQL as a column
# name, so it must come from trusted code, never from user input.
sub update_host_property {
    my ($self, $hostid, $col, $val) = @_;
    $self->conddup(sub {
        $self->dbh->do("UPDATE host SET $col=? WHERE hostid=?", undef, $val, $hostid);
    });
    return 1;
}

# return new hostid, or throw 'dup' on error.
# NOTE: you need to put them into the initial 'down' state.
sub create_host {
    my ($self, $hostname, $ip) = @_;
    my $dbh = $self->dbh;
    # racy! lazy. no, better: portable! how often does this happen? :)
    my $hid = ($dbh->selectrow_array('SELECT MAX(hostid) FROM host') || 0) + 1;
    my $rv = $self->conddup(sub {
        $dbh->do("INSERT INTO host (hostid, hostname, hostip, status) ".
                 "VALUES (?, ?, ?, 'down')",
                 undef, $hid, $hostname, $ip);
    });
    return $hid if $rv;
    die "db failure";
}

# return array of row hashrefs containing columns: (fid, fromdevid,
# failcount, flags, nexttry)
sub files_to_replicate {
    my ($self, $limit) = @_;
    my $ut = $self->unix_timestamp;

    # interpolated below, so make sure it is nothing but an integer
    $limit = int($limit);

    my $to_repl_map = $self->dbh->selectall_hashref(qq{
        SELECT fid, fromdevid, failcount, flags, nexttry
        FROM file_to_replicate
        WHERE nexttry <= $ut
        ORDER BY nexttry
        LIMIT $limit
    }, "fid") or return ();
    return values %$to_repl_map;
}

# although it's safe to have multiple tracker hosts and/or processes
# replicating the same file, around, it's inefficient CPU/time-wise,
# and it's also possible they pick different places and waste disk.
# so the replicator asks the store interface when it's about to start
# and when it's done replicating a fidid, so you can do something smart
# and tell it not to.
sub should_begin_replicating_fidid {
    my ($self, $fidid) = @_;
    warn("Inefficient implementation of should_begin_replicating_fidid() in $self!\n");
    1;
}

# called when replicator is done replicating a fid, so you can cleanup
# whatever you did in 'should_begin_replicating_fidid' above.
#
# NOTE: there's a theoretical race condition in the rebalance code,
# where (without locking as provided by
# should_begin_replicating_fidid/note_done_replicating), all copies of
# a file can be deleted by independent replicators doing rebalancing
# in different ways.  so you'll probably want to implement some
# locking in this pair of functions.
sub note_done_replicating {
    my ($self, $fidid) = @_;
    # the base implementation does nothing.
}

# remove the given fid's row from the file_to_replicate queue.
sub delete_fid_from_file_to_replicate {
    my ($self, $fidid) = @_;
    $self->dbh->do("DELETE FROM file_to_replicate WHERE fid=?", undef, $fidid);
}

# set the next replication attempt of $fid to the absolute time $abstime,
# and bump its failcount.
sub reschedule_file_to_replicate_absolute {
    my ($self, $fid, $abstime) = @_;
    $self->dbh->do("UPDATE file_to_replicate SET nexttry = ?, failcount = failcount + 1 WHERE fid = ?",
                   undef, $abstime, $fid);
}

# set the next replication attempt of $fid to $in_n_secs seconds after the
# database's current unixtime, and bump its failcount.
sub reschedule_file_to_replicate_relative {
    my ($self, $fid, $in_n_secs) = @_;
    $self->dbh->do("UPDATE file_to_replicate SET nexttry = " . $self->unix_timestamp . " + ?, " .
                   "failcount = failcount + 1 WHERE fid = ?",
                   undef, $in_n_secs, $fid);
}

# Given a dmid prefix after and limit, return an arrayref of dkey from the file
# table.
sub get_keys_like {
    my ($self, $dmid, $prefix, $after, $limit) = @_;

    # fix the input... prefix always ends with a % so that it works
    # in a LIKE call, and after is either blank or something
    $prefix ||= '';
    $prefix .= '%';
    $after ||= '';

    # $limit is inlined into the SQL text, so force it to an integer first.
    $limit = int($limit);

    # now select out our keys
    return $self->dbh->selectcol_arrayref
        ('SELECT dkey FROM file WHERE dmid = ? AND dkey LIKE ? AND dkey > ? ' .
         "ORDER BY dkey LIMIT $limit", undef, $dmid, $prefix, $after);
}

# return arrayref of all tempfile rows (themselves also arrayrefs, of [$fidid, $devids])
# that were created $secs_old seconds ago or older.  at most 50 rows are returned.
sub old_tempfiles {
    my ($self, $secs_old) = @_;

    # $secs_old is inlined into the SQL text, so force it to an integer first.
    $secs_old = int($secs_old);

    return $self->dbh->selectall_arrayref("SELECT fid, devids FROM tempfile " .
                                          "WHERE createtime < " . $self->unix_timestamp . " - $secs_old LIMIT 50");
}

# given an array of MogileFS::DevFID objects, mass-insert them all
# into file_on (ignoring if they're already present)
# returns 1.  croaks if any object has a false fidid or devid.
sub mass_insert_file_on {
    my ($self, @devfids) = @_;
    return 1 unless @devfids;

    # stores without multi-row INSERT support get one statement per row.
    if (@devfids > 1 && ! $self->can_insert_multi) {
        $self->mass_insert_file_on($_) foreach @devfids;
        return 1;
    }

    my (@qmarks, @binds);
    foreach my $df (@devfids) {
        my ($fidid, $devid) = ($df->fidid, $df->devid);
        Carp::croak("got a false fidid") unless $fidid;
        Carp::croak("got a false devid") unless $devid;
        push @binds, $fidid, $devid;
        push @qmarks, "(?,?)";
    }

    # TODO: This should possibly be insert_ignore instead
    # As if we are adding an extra file_on entry, we do not want to replace the
    # exist one. Check REPLACE semantics.
    $self->dowell($self->ignore_replace . " INTO file_on (fid, devid) VALUES " . join(',', @qmarks), undef, @binds);
    return 1;
}

# record the schema version in the "schema_version" server setting.
# (the method name is misspelled, but it is public interface, so it stays.)
sub set_schema_vesion {
    my ($self, $ver) = @_;
    $self->set_server_setting("schema_version", int($ver));
}

# returns array of fidids to try and delete again
# (at most 500, those whose delafter is before the database's current unixtime)
sub fids_to_delete_again {
    my $self = shift;
    my $ut = $self->unix_timestamp;
    return @{ $self->dbh->selectcol_arrayref(qq{
        SELECT fid
        FROM file_to_delete_later
        WHERE delafter < $ut
        LIMIT 500
    }) || [] };
}

# return 1 on success.  die otherwise.
sub enqueue_fids_to_delete {
    my ($self, @fidids) = @_;

    # nothing to enqueue; an empty VALUES list would be invalid SQL.
    return 1 unless @fidids;

    # multi-row insert-ignore/replace CAN fail with the insert_ignore emulation sub.
    # when the first row causes the duplicate error, and the remaining rows are
    # not processed.
    if (@fidids > 1 && ! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
        $self->enqueue_fids_to_delete($_) foreach @fidids;
        return 1;
    }

    # TODO: convert to prepared statement?
    $self->dbh->do($self->ignore_replace . " INTO file_to_delete (fid) VALUES " .
                   join(",", map { "(" . int($_) . ")" } @fidids))
        or die "file_to_delete insert failed";
    return 1;
}

# clears everything from the fsck_log table
# returns 1.  this sub does not check the result of the DELETE itself.
sub clear_fsck_log {
    my $self = shift;
    $self->dbh->do("DELETE FROM fsck_log");
    return 1;
}

# how many fsck_log rows go by between roll-ups of the per-evcode counters
# (see fsck_log).
sub fsck_log_summarize_every { 100 }

# append one row to fsck_log.  recognized opts: fid, code, devid.
# croaks on any other option, before anything is written.
# dies if the new row's logid cannot be determined.  returns 1.
sub fsck_log {
    my ($self, %opts) = @_;

    # pull out the known options and validate BEFORE touching the database,
    # so a bad call cannot leave a half-specified row behind.
    my ($fid, $code, $devid) = delete @opts{qw(fid code devid)};
    croak("Unknown opts") if %opts;

    $self->dbh->do("INSERT INTO fsck_log (utime, fid, evcode, devid) ".
                   "VALUES (" . $self->unix_timestamp . ",?,?,?)",
                   undef, $fid, $code, $devid);

    my $logid = $self->dbh->last_insert_id(undef, undef, 'fsck_log', 'logid')
        or die "No last_insert_id found for fsck_log table";

    # sum-up evcode counts every so often, to make fsck_status faster,
    # avoiding a potentially-huge GROUP BY in the future..
    my $SUM_EVERY = $self->fsck_log_summarize_every;

    # Note: totally disregards locking/races because there's only one
    # fsck process running globally (in theory-- there could be 5
    # second overlaps on quick stop/starts, so we take some regard for
    # races, but not much).
    if ($logid % $SUM_EVERY == 0) {
        my $start_max_logid = $self->server_setting("fsck_start_maxlogid") || 0;

        # both inclusive:
        my $min_logid = max($start_max_logid, $logid - $SUM_EVERY) + 1;
        my $cts = $self->fsck_evcode_counts(logid_range => [$min_logid, $logid]); # inclusive notation :)
        while (my ($evcode, $ct) = each %$cts) {
            $self->incr_server_setting("fsck_sum_evcount_$evcode", $ct);
        }
    }

    return 1;
}

# returns the database server's idea of the current unixtime.
sub get_db_unixtime {
    my $self = shift;
    return $self->dbh->selectrow_array("SELECT " . $self->unix_timestamp);
}

# returns the highest fid in the file table (undef if the table is empty).
sub max_fidid {
    my $self = shift;
    return $self->dbh->selectrow_array("SELECT MAX(fid) FROM file");
}

# returns the highest logid in the fsck_log table, or 0 if there is none.
sub max_fsck_logid {
    my $self = shift;
    return $self->dbh->selectrow_array("SELECT MAX(logid) FROM fsck_log") || 0;
}

# returns array of $row hashrefs, from fsck_log table
# rows have logid > $after_logid (default 0), ordered by logid, at most
# $limit (default 100) of them.
sub fsck_log_rows {
    my ($self, $after_logid, $limit) = @_;
    $limit       = int($limit || 100);
    $after_logid = int($after_logid || 0);

    my @rows;
    my $sth = $self->dbh->prepare(qq{
        SELECT logid, utime, fid, evcode, devid
        FROM fsck_log
        WHERE logid > ?
        ORDER BY logid
        LIMIT $limit
    });
    $sth->execute($after_logid);
    my $row;
    push @rows, $row while $row = $sth->fetchrow_hashref;
    return @rows;
}

# returns a hashref of { evcode => count } from fsck_log.  opts, one required:
#    time_gte    => only count rows with utime >= this value
#    logid_range => [$min, $max], both inclusive
# if both are given, logid_range is the one used.  dies on unknown opts, or
# if no criterion was supplied.
sub fsck_evcode_counts {
    my ($self, %opts) = @_;
    my $timegte = delete $opts{time_gte};
    my $logr    = delete $opts{logid_range};
    die "Unknown opts to fsck_evcode_counts" if %opts;

    my $ret = {};
    my $sth;
    if ($logr) {
        $sth = $self->dbh->prepare(qq{
            SELECT evcode, COUNT(*) FROM fsck_log
            WHERE logid >= ? AND logid <= ?
            GROUP BY evcode
        });
        $sth->execute($logr->[0], $logr->[1]);
    }
    elsif ($timegte) {
        $sth = $self->dbh->prepare(qq{
            SELECT evcode, COUNT(*) FROM fsck_log
            WHERE utime >= ?
            GROUP BY evcode
        });
        $sth->execute($timegte||0);
    }

    # without a criterion there is no statement to read from; say so plainly
    # rather than failing on a method call against undef.
    die "fsck_evcode_counts requires time_gte or logid_range" unless $sth;

    while (my ($ev, $ct) = $sth->fetchrow_array) {
        $ret->{$ev} = $ct;
    }
    return $ret;
}

# run before daemonizing.  you can die from here if you see something's amiss.  or emit
# warnings.
sub pre_daemonize_checks { }

# attempt to grab a lock of lockname, and timeout after timeout seconds.
# returns 1 on success and 0 on timeout.  dies if more than one lock is already outstanding.
# (this base implementation always dies; it only describes the contract.)
sub get_lock {
    my ($self, $lockname, $timeout) = @_;
    die "Lock recursion detected (grabbing $lockname, had $self->{last_lock}).  Bailing out." if $self->{lock_depth};
    die "get_lock not implemented for $self";
}

# attempt to release a lock of lockname.
# returns 1 on success and 0 if no lock we have has that name.
# (this base implementation always dies; it only describes the contract.)
sub release_lock {
    my ($self, $lockname) = @_;
    die "release_lock not implemented for $self";
}

# returns up to $limit @fidids which are on provided $devid
# ($limit defaults to 100 when false or missing)
sub random_fids_on_device {
    my ($self, $devid, $limit) = @_;
    $limit = int($limit || 0) || 100;

    my $dbh = $self->dbh;

    # FIXME: this blows. not random.  and good chances these will
    # eventually get to point where they're un-rebalanacable, and we
    # never move on past the first 5000
    my @some_fids = List::Util::shuffle(@{
        $dbh->selectcol_arrayref("SELECT fid FROM file_on WHERE devid=? LIMIT 5000",
                                 undef, $devid) || []
                                 });

    @some_fids = @some_fids[0..$limit-1] if $limit < @some_fids;
    return @some_fids;
}

# return array of { dmid => ..., classid => ..., devcount => ..., count => ... }
sub get_stats_files_per_devcount {
    my ($self) = @_;
    my $dbh = $self->dbh;
    my @ret;
    my $sth = $dbh->prepare('SELECT dmid, classid, devcount, COUNT(devcount) AS "count" FROM file GROUP BY 1, 2, 3');
    $sth->execute;
    while (my $row = $sth->fetchrow_hashref) {
        push @ret, $row;
    }
    return @ret;
}

1;

__END__

=head1 NAME

MogileFS::Store - data storage provider.  base class.

=head1 ABOUT

MogileFS aims to be database-independent (though currently as of late
2006 only works with MySQL).  In the future, the server will create a
singleton instance of type "MogileFS::Store", like
L<MogileFS::Store::MySQL>, and all database interaction will be
through it.

=head1 SEE ALSO

L<MogileFS::Store::MySQL>