package MogileFS::Store;
use strict;
use warnings;
use Carp qw(croak);
use MogileFS::Util qw(throw max);
use DBI;  # no reason a Store has to be DBI-based, but for now they all are.
use List::Util ();

# this is incremented whenever the schema changes.  server will refuse
# to start-up with an old schema version
# 8: adds fsck_log table
# 9: adds 'drain' state to enum in device table
use constant SCHEMA_VERSION => 9;

sub new {
    my ($class) = @_;
    return $class->new_from_dsn_user_pass(map { MogileFS->config($_) } qw(db_dsn db_user db_pass));

sub new_from_dsn_user_pass {
    my ($class, $dsn, $user, $pass) = @_;
    my $subclass;
    if ($dsn =~ /^DBI:mysql:/i) {
        $subclass = "MogileFS::Store::MySQL";
    } elsif ($dsn =~ /^DBI:SQLite:/i) {
        $subclass = "MogileFS::Store::SQLite";
    } elsif ($dsn =~ /^DBI:Oracle:/i) {
        $subclass = "MogileFS::Store::Oracle";
    } elsif ($dsn =~ /^DBI:Pg:/i) {
        $subclass = "MogileFS::Store::Postgres";
    } else {
        die "Unknown database type: $dsn";
    unless (eval "use $subclass; 1") {
        die "Error loading $subclass: $@\n";
    my $self = bless {
        dsn    => $dsn,
        user   => $user,
        pass   => $pass,
        raise_errors => $subclass->want_raise_errors,
        slave_list_cachetime => 0,
        slave_list_cache     => [],
        recheck_req_gen  => 0,  # incremented generation, of recheck of dbh being requested
        recheck_done_gen => 0,  # once recheck is done, copy of what the request generation was
    }, $subclass;
    return $self;

sub want_raise_errors {
     # will default to true later

sub new_from_mogdbsetup {
    my ($class, %args) = @_;
    # where args is:  dbhost dbname dbrootuser dbrootpass dbuser dbpass
    my $dsn = $class->dsn_of_dbhost($args{dbname}, $args{dbhost});

    my $try_make_sto = sub {
        my $dbh = DBI->connect($dsn, $args{dbuser}, $args{dbpass}, {
            PrintError => 0,
        }) or return undef;
        my $sto = $class->new_from_dsn_user_pass($dsn, $args{dbuser}, $args{dbpass});
        return $sto;

    # upgrading, apparently, as this database already exists.
    my $sto = $try_make_sto->();
    return $sto if $sto;

    # otherwise, we need to make the requested database, setup permissions, etc
    $class->status("couldn't connect to database as mogilefs user.  trying root...");
    my $rootdsn = $class->dsn_of_root($args{dbname}, $args{dbhost});
    my $rdbh = DBI->connect($rootdsn, $args{dbrootuser}, $args{dbrootpass}, {
        PrintError => 0,
    }) or
        die "Failed to connect to $dsn as specified root user: " . DBI->errstr . "\n";
    $class->status("connected to database as root user.");

    $class->confirm("Create database name '$args{dbname}'?");
    $class->create_db_if_not_exists($rdbh, $args{dbname});
    $class->confirm("Grant all privileges to user '$args{dbuser}', connecting from anywhere, to the mogilefs database '$args{dbname}'?");
    $class->grant_privileges($rdbh, $args{dbname}, $args{dbuser}, $args{dbpass});

    # should be ready now:
    $sto = $try_make_sto->();
    return $sto if $sto;

    die "Failed to connect to database as regular user, even after creating it and setting up permissions as the root user.";

# given a root DBI connection, create the named database.  succeed
# if it it's made, or already exists.  die otherwise.
sub create_db_if_not_exists {
    my ($pkg, $rdbh, $dbname) = @_;
    $rdbh->do("CREATE DATABASE IF NOT EXISTS $dbname")
        or die "Failed to create database '$dbname': " . $rdbh->errstr . "\n";

sub grant_privileges {
    my ($pkg, $rdbh, $dbname, $user, $pass) = @_;
    $rdbh->do("GRANT ALL PRIVILEGES ON $dbname.* TO $user\@'\%' IDENTIFIED BY ?",
             undef, $pass)
        or die "Failed to grant privileges: " . $rdbh->errstr . "\n";
    $rdbh->do("GRANT ALL PRIVILEGES ON $dbname.* TO $user\@'localhost' IDENTIFIED BY ?",
             undef, $pass)
        or die "Failed to grant privileges: " . $rdbh->errstr . "\n";

sub can_replace      { 0 }
sub can_insertignore { 0 }
sub can_insert_multi { 0 }

sub unix_timestamp { die "No function in $_[0] to return DB's unixtime." }

sub ignore_replace {
    my $self = shift;
    return "INSERT IGNORE " if $self->can_insertignore;
    return "REPLACE " if $self->can_replace;
    die "Can't INSERT IGNORE or REPLACE?";

my $on_status = sub {};
my $on_confirm = sub { 1 };
sub on_status  { my ($pkg, $code) = @_; $on_status  = $code; };
sub on_confirm { my ($pkg, $code) = @_; $on_confirm = $code; };
sub status     { my ($pkg, $msg)  = @_; $on_status->($msg);  };
sub confirm    { my ($pkg, $msg)  = @_; $on_confirm->($msg) or die "Aborted.\n"; };

sub latest_schema_version { SCHEMA_VERSION }

sub raise_errors {
    my $self = shift;
    $self->{raise_errors} = 1;
    $self->dbh->{RaiseError} = 1;

sub dsn  { $_[0]{dsn}  }
sub user { $_[0]{user} }
sub pass { $_[0]{pass} }

sub init { 1 }
sub post_dbi_connect { 1 }

sub can_do_slaves { 0 }

sub mark_as_slave {
    my $self = shift;
    die "Incapable of becoming slave." unless $self->can_do_slaves;

    $self->{slave} = 1;

sub is_slave {
    my $self = shift;
    return $self->{slave};

# Returns a list of arrayrefs, each being [$dsn, $username, $password] for connecting to a slave DB.
sub _slaves_list {
    my $self = shift;
    my $now = time();

    # only reload every 15 seconds.
    if ($self->{slave_list_cachetime} > $now - 15) {
        return @{$self->{slave_list_cache}};
    $self->{slave_list_cachetime} = $now;
    $self->{slave_list_cache}     = [];

    my $sk = MogileFS::Config->server_setting('slave_keys')
        or return ();

    my @ret;
    foreach my $key (split /\s*,\s*/, $sk) {
        my $slave = MogileFS::Config->server_setting("slave_$key");
        my ($dsn, $user, $pass) = split /\|/, $slave;
        push @ret, [$dsn, $user, $pass];

    $self->{slave_list_cache}     = \@ret;
    return @ret;

sub get_slave {
    my $self = shift;

    die "Incapable of having slaves." unless $self->can_do_slaves;

    return $self->{slave} if $self->check_slave;

    my @slaves_list = $self->_slaves_list;

    # If we have no slaves, then return silently.
    return unless @slaves_list;

    foreach my $slave_fulldsn (@slaves_list) {
        my $newslave = $self->{slave} = $self->new_from_dsn_user_pass(@$slave_fulldsn);
        $self->{slave_next_check} = 0;
        return $newslave
            if $self->check_slave;

    warn "Slave list exhausted, failing back to master.";

sub read_store {
    my $self = shift;

    return $self unless $self->can_do_slaves;

    if ($self->{slave_ok}) {
        if (my $slave = $self->get_slave) {
            $slave->{recheck_req_gen} = $self->{recheck_req_gen};
            return $slave;

    return $self;

sub slaves_ok {
    my $self = shift;
    my $coderef = shift;

    return unless ref $coderef eq 'CODE';

    local $self->{slave_ok} = 1;

    return $coderef->(@_);

sub recheck_dbh {
    my $self = shift;

sub dbh {
    my $self = shift;
    if ($self->{dbh}) {
        if ($self->{recheck_done_gen} != $self->{recheck_req_gen}) {
            $self->{dbh} = undef unless $self->{dbh}->ping;
            $self->{recheck_done_gen} = $self->{recheck_req_gen};
        return $self->{dbh} if $self->{dbh};

    $self->{dbh} = DBI->connect($self->{dsn}, $self->{user}, $self->{pass}, {
        PrintError => 0,
        AutoCommit => 1,
        # FUTURE: will default to on (have to validate all callers first):
        RaiseError => ($self->{raise_errors} || 0),
    }) or
        die "Failed to connect to database: " . DBI->errstr;
    return $self->{dbh};

sub ping {
    my $self = shift;
    return $self->dbh->ping;

sub condthrow {
    my ($self, $optmsg) = @_;
    my $dbh = $self->dbh;
    return unless $dbh->err;
    my ($pkg, $fn, $line) = caller;
    my $msg = "Database error from $pkg/$fn/$line: " . $dbh->errstr;
    $msg .= ": $optmsg" if $optmsg;

sub dowell {
    my ($self, $sql, @do_params) = @_;
    my $rv = eval { $self->dbh->do($sql, @do_params) };
    return $rv unless $@ || $self->dbh->err;
    warn "Error with SQL: $sql\n";
    Carp::confess($@ || $self->dbh->errstr);

sub _valid_params {
    croak("Odd number of parameters!") if scalar(@_) % 2;
    my ($self, $vlist, %uarg) = @_;
    my %ret;
    $ret{$_} = delete $uarg{$_} foreach @$vlist;
    croak("Bogus options: ".join(',',keys %uarg)) if %uarg;
    return %ret;

sub was_duplicate_error {
    my $self = shift;
    my $dbh = $self->dbh;

# run a subref (presumably a database update) in an eval, because you expect it to
# maybe fail on duplicate key error, and throw a dup exception for you, else return
# its return value
sub conddup {
    my ($self, $code) = @_;
    my $rv = eval { $code->(); };
    throw("dup") if $self->was_duplicate_error;
    return $rv;

# insert row if doesn't already exist
# WARNING: This function is NOT transaction safe if the duplicate errors causes
# your transaction to halt!
# WARNING: This function is NOT safe on multi-row inserts if can_insertignore
# is false! Rows before the duplicate will be inserted, but rows after the
# duplicate might not be, depending your database.
sub insert_ignore {
    my ($self, $sql, @params) = @_;
    my $dbh = $self->dbh;
    if ($self->can_insertignore) {
        return $dbh->do("INSERT IGNORE $sql", @params);
    } else {
        # TODO: Detect bad multi-row insert here.
        my $rv = eval { $dbh->do("INSERT $sql", @params); };
        if ($@ || $dbh->err) {
            return 1 if $self->was_duplicate_error;
            # This chunk is identical to condthrow, but we include it directly
            # here as we know there is definetly an error, and we would like
            # the caller of this function.
            my ($pkg, $fn, $line) = caller;
            my $msg = "Database error from $pkg/$fn/$line: " . $dbh->errstr;
        return $rv;

# --------------------------------------------------------------------------

my @extra_tables;

sub add_extra_tables {
    my $class = shift;
    push @extra_tables, @_;

use constant TABLES => qw( domain class file tempfile file_to_delete
                            unreachable_fids file_on file_on_corrupt host
                            device server_settings file_to_replicate
                            file_to_delete_later fsck_log);

sub setup_database {
    my $sto = shift;

    # schema history:
    #   8: adds fsck_log table
    #   7: adds file_to_delete_later table
    #   6: adds file_to_replicate table
    my $curver = $sto->schema_version;

    my $latestver = SCHEMA_VERSION;
    if ($curver == $latestver) {
        $sto->status("Schema already up-to-date at version $curver.");
        return 1;

    if ($curver > $latestver) {
        die "Your current schema version is $curver, but this version of mogdbsetup only knows up to $latestver.  Aborting to be safe.\n";

    if ($curver) {
        $sto->confirm("Install/upgrade your schema from version $curver to version $latestver?");

    foreach my $t (TABLES, @extra_tables) {


    return 1;

sub schema_version {
    my $self = shift;
    my $dbh = $self->dbh;
    return eval {
        $dbh->selectrow_array("SELECT value FROM server_settings WHERE field='schema_version'") || 0;
    } || 0;

sub filter_create_sql { my ($self, $sql) = @_; return $sql; }

sub create_table {
    my ($self, $table) = @_;
    my $dbh = $self->dbh;
    return 1 if $self->table_exists($table);
    my $meth = "TABLE_$table";
    my $sql = $self->$meth;
    $sql = $self->filter_create_sql($sql);
    $self->status("Running SQL: $sql;");
    $dbh->do($sql) or
        die "Failed to create table $table: " . $dbh->errstr;
    my $imeth = "INDEXES_$table";
    my @indexes = eval { $self->$imeth };
    foreach $sql (@indexes) {
        $self->status("Running SQL: $sql;");
        $dbh->do($sql) or
            die "Failed to create indexes on $table: " . $dbh->errstr;

# Please try to keep all tables aligned nicely
# with '"CREATE TABLE' on the first line
# and ')"' alone on the last line.

sub TABLE_domain {
    # classes are tied to domains.  domains can have classes of items
    # with different mindevcounts.
    # a minimum devcount is the number of copies the system tries to
    # maintain for files in that class
    # unspecified classname means classid=0 (implicit class), and that
    # implies mindevcount=2
    "CREATE TABLE domain (
    namespace    VARCHAR(255),
    UNIQUE (namespace)

sub TABLE_class {
    "CREATE TABLE class (
    classid       TINYINT UNSIGNED NOT NULL,
    PRIMARY KEY (dmid,classid),
    classname     VARCHAR(50),
    UNIQUE      (dmid,classname),

# the length field is only here for easy verifications of content
# integrity when copying around.  no sums or content types or other
# metadata here.  application can handle that.
# classid is what class of file this belongs to.  for instance, on fotobilder
# there will be a class for original pictures (the ones the user uploaded)
# and a class for derived images (scaled down versions, thumbnails, greyscale, etc)
# each domain can setup classes and assign the minimum redundancy level for
# each class.  fotobilder will use a 2 or 3 minimum copy redundancy for original
# photos and and a 1 minimum for derived images (which means the sole device
# for a derived image can die, bringing devcount to 0 for that file, but
# the application can recreate it from its original)
sub TABLE_file {
    "CREATE TABLE file (
    fid          INT UNSIGNED NOT NULL,
    PRIMARY KEY  (fid),

    dkey           VARCHAR(255),     # domain-defined
    UNIQUE dkey  (dmid, dkey),

    length        INT UNSIGNED,        # 4GB limit

    classid       TINYINT UNSIGNED NOT NULL,
    devcount      TINYINT UNSIGNED NOT NULL,
    INDEX devcount (dmid,classid,devcount)

sub TABLE_tempfile {
    "CREATE TABLE tempfile (
    PRIMARY KEY  (fid),

    createtime   INT UNSIGNED NOT NULL,
    dkey           VARCHAR(255),
    devids       VARCHAR(60)

# files marked for death when their key is overwritten.  then they get a new
# fid, but since the old row (with the old fid) had to be deleted immediately,
# we need a place to store the fid so an async job can delete the file from
# all devices.
sub TABLE_file_to_delete {
    "CREATE TABLE file_to_delete (
    PRIMARY KEY (fid)

# if the replicator notices that a fid has no sources, that file gets inserted
# into the unreachable_fids table.  it is up to the application to actually
# handle fids stored in this table.
sub TABLE_unreachable_fids {
    "CREATE TABLE unreachable_fids (
    fid        INT UNSIGNED NOT NULL,
    lastupdate INT UNSIGNED NOT NULL,
    PRIMARY KEY (fid),
    INDEX (lastupdate)

# what files are on what devices?  (most likely physical devices,
# as logical devices of RAID arrays would be costly, and mogilefs
# already handles redundancy)
# the devid index lets us answer "What files were on this now-dead disk?"
sub TABLE_file_on {
    "CREATE TABLE file_on (
    fid          INT UNSIGNED NOT NULL,
    PRIMARY KEY (fid, devid),
    INDEX (devid)

# if application or framework detects an error in one of the duplicate files
# for whatever reason, it can register its complaint and the framework
# will do some verifications and fix things up w/ an async job
# MAYBE: let application tell us the SHA1/MD5 of the file for us to check
#        on the other devices?
sub TABLE_file_on_corrupt {
    "CREATE TABLE file_on_corrupt (
    fid          INT UNSIGNED NOT NULL,
    PRIMARY KEY (fid, devid)

# hosts (which contain devices...)
sub TABLE_host {
    "CREATE TABLE host (

    status     ENUM('alive','dead','down'),
    http_get_port MEDIUMINT UNSIGNED,

    hostname   VARCHAR(40),
    hostip     VARCHAR(15),
    altip      VARCHAR(15),
    altmask    VARCHAR(18),
    UNIQUE     (hostname),
    UNIQUE     (hostip),
    UNIQUE     (altip)

# disks...
sub TABLE_device {
    "CREATE TABLE device (

    status  ENUM('alive','dead','down'),
    weight  MEDIUMINT DEFAULT 100,

    mb_total   MEDIUMINT UNSIGNED,
    mb_used    MEDIUMINT UNSIGNED,
    mb_asof    INT UNSIGNED,
    PRIMARY KEY (devid),
    INDEX   (status)

sub TABLE_server_settings {
    "CREATE TABLE server_settings (
    field   VARCHAR(50) PRIMARY KEY,
    value   VARCHAR(255)

sub TABLE_file_to_replicate {
    # nexttry is time to try to replicate it next.
    #   0 means immediate.  it's only on one host.
    #   1 means lower priority.  it's on 2+ but isn't happy where it's at.
    #   unixtimestamp means at/after that time.  some previous error occurred.
    # fromdevid, if not null, means which devid we should replicate from.  perhaps it's the only non-corrupt one.  otherwise, wherever.
    # failcount.  how many times we've failed, just for doing backoff of nexttry.
    # flags.  reserved for future use.
    "CREATE TABLE file_to_replicate (
    nexttry    INT UNSIGNED NOT NULL,
    INDEX (nexttry),
    fromdevid  INT UNSIGNED,

sub TABLE_file_to_delete_later {
    "CREATE TABLE file_to_delete_later (
    INDEX (delafter)

sub TABLE_fsck_log {
    "CREATE TABLE fsck_log (
    PRIMARY KEY (logid),
    evcode CHAR(4),

# these five only necessary for MySQL, since no other database existed
# before, so they can just create the tables correctly to begin with.
# in the future, there might be new alters that non-MySQL databases
# will have to implement.
sub upgrade_add_host_getport { 1 }
sub upgrade_add_host_altip { 1 }
sub upgrade_add_device_asof { 1 }
sub upgrade_add_device_weight { 1 }
sub upgrade_add_device_readonly { 1 }
sub upgrade_add_device_drain { die "Not implemented in $_[0]" }

# return true if deleted, 0 if didn't exist, exception if error
sub delete_host {
    my ($self, $hostid) = @_;
    return $self->dbh->do("DELETE FROM host WHERE hostid = ?", undef, $hostid);

# return true if deleted, 0 if didn't exist, exception if error
sub delete_domain {
    my ($self, $dmid) = @_;
    return $self->dbh->do("DELETE FROM domain WHERE dmid = ?", undef, $dmid);

sub domain_has_files {
    my ($self, $dmid) = @_;
    my $has_a_fid = $self->dbh->selectrow_array('SELECT fid FROM file WHERE dmid = ? LIMIT 1',
                                                undef, $dmid);
    return $has_a_fid ? 1 : 0;

sub class_has_files {
    my ($self, $dmid, $clid) = @_;
    my $has_a_fid = $self->dbh->selectrow_array('SELECT fid FROM file WHERE dmid = ? AND classid = ? LIMIT 1',
                                                undef, $dmid, $clid);
    return $has_a_fid ? 1 : 0;

# return new classid on success (non-zero integer), die on failure
# throw 'dup' on duplicate name
# override this if you want a less racy version.
sub create_class {
    my ($self, $dmid, $classname) = @_;
    my $dbh = $self->dbh;

    # get the max class id in this domain
    my $maxid = $dbh->selectrow_array
        ('SELECT MAX(classid) FROM class WHERE dmid = ?', undef, $dmid) || 0;

    # now insert the new class
    my $rv = eval {
        $dbh->do("INSERT INTO class (dmid, classid, classname, mindevcount) VALUES (?, ?, ?, ?)",
                 undef, $dmid, $maxid + 1, $classname, 2);
    if ($@ || $dbh->err) {
        if ($self->was_duplicate_error) {
    return $maxid + 1 if $rv;

# return 1 on success, throw "dup" on duplicate name error, die otherwise
sub update_class_name {
    my $self = shift;
    my %arg  = $self->_valid_params([qw(dmid classid classname)], @_);
    my $rv = eval {
        $self->dbh->do("UPDATE class SET classname=? WHERE dmid=? AND classid=?",
                       undef, $arg{classname}, $arg{dmid}, $arg{classid});
    throw("dup") if $self->was_duplicate_error;
    return 1;

# return 1 on success, die otherwise
sub update_class_mindevcount {
    my $self = shift;
    my %arg  = $self->_valid_params([qw(dmid classid mindevcount)], @_);
    $self->dbh->do("UPDATE class SET mindevcount=? WHERE dmid=? AND classid=?",
                   undef, $arg{mindevcount}, $arg{dmid}, $arg{classid});
    return 1;

sub nfiles_with_dmid_classid_devcount {
    my ($self, $dmid, $classid, $devcount) = @_;
    return $self->dbh->selectrow_array('SELECT COUNT(*) FROM file WHERE dmid = ? AND classid = ? AND devcount = ?',
                                       undef, $dmid, $classid, $devcount);

sub set_server_setting {
    my ($self, $key, $val) = @_;
    my $dbh = $self->dbh;
    die "Your database does not support REPLACE! Reimplement set_server_setting!" unless $self->can_replace;

    if (defined $val) {
        $dbh->do("REPLACE INTO server_settings (field, value) VALUES (?, ?)", undef, $key, $val);
    } else {
        $dbh->do("DELETE FROM server_settings WHERE field=?", undef, $key);

    die "Error updating 'server_settings': " . $dbh->errstr if $dbh->err;
    return 1;

# FIXME: racy.  currently the only caller doesn't matter, but should be fixed.
sub incr_server_setting {
    my ($self, $key, $val) = @_;
    $val = 1 unless defined $val;
    return unless $val;

    return 1 if $self->dbh->do("UPDATE server_settings ".
                               "SET value=value+? ".
                               "WHERE field=?", undef,
                               $val, $key) > 0;
    $self->set_server_setting($key, $val);

sub server_setting {
    my ($self, $key) = @_;
    return $self->dbh->selectrow_array("SELECT value FROM server_settings WHERE field=?",
                                       undef, $key);

sub server_settings {
    my ($self) = @_;
    my $ret = {};
    my $sth = $self->dbh->prepare("SELECT field, value FROM server_settings");
    while (my ($k, $v) = $sth->fetchrow_array) {
        $ret->{$k} = $v;
    return $ret;

# register a tempfile and return the fidid, which should be allocated
# using autoincrement/sequences if the passed in fid is undef.  however,
# if fid is passed in, that value should be used and returned.
# return new/passed in fidid on success.
# throw 'dup' if fid already in use
# return 0/undef/die on failure
sub register_tempfile {
    my $self = shift;
    my %arg  = $self->_valid_params([qw(fid dmid key classid devids)], @_);

    my $dbh = $self->dbh;
    my $fid = $arg{fid};

    my $explicit_fid_used = $fid ? 1 : 0;

    # setup the new mapping.  we store the devices that we picked for
    # this file in here, knowing that they might not be used.  create_close
    # is responsible for actually mapping in file_on.  NOTE: fid is being
    # passed in, it's either some number they gave us, or it's going to be
    # 0/undef which translates into NULL which means to automatically create
    # one.  that should be fine.
    my $ins_tempfile = sub {
        my $rv = eval {
            # We must only pass the correct number of bind parameters
            # Using 'NULL' for the AUTO_INCREMENT/SERIAL column will fail on
            # Postgres, where you are expected to leave it out or use DEFAULT
            # Leaving it out seems sanest and least likely to cause problems
            # with other databases.
            my @keys = ('dmid', 'dkey', 'classid', 'devids', 'createtime');
            my @vars = ('?'   , '?'   , '?'      , '?'     , $self->unix_timestamp);
            my @vals = ($arg{dmid}, $arg{key}, $arg{classid} || 0, $arg{devids});
            # Do not check for $explicit_fid_used, but rather $fid directly
            # as this anonymous sub is called from the loop later
            if($fid) {
                unshift @keys, 'fid';
                unshift @vars, '?';
                unshift @vals, $fid;
            my $sql = "INSERT INTO tempfile (".join(',',@keys).") VALUES (".join(',',@vars).")";
            $dbh->do($sql, undef, @vals);
        if (!$rv) {
            return undef if $self->was_duplicate_error;
            die "Unexpected db error into tempfile: " . $dbh->errstr;

        unless (defined $fid) {
            # if they did not give us a fid, then we want to grab the one that was
            # theoretically automatically generated
            $fid = $dbh->last_insert_id(undef, undef, 'tempfile', 'fid')
                or die "No last_insert_id found";
        return undef unless defined $fid && $fid > 0;
        return 1;

    unless ($ins_tempfile->()) {
        throw("dup") if $explicit_fid_used;
        die "tempfile insert failed";

    my $fid_in_use = sub {
        my $exists = $dbh->selectrow_array("SELECT COUNT(*) FROM file WHERE fid=?", undef, $fid);
        return $exists ? 1 : 0;

    # if the fid is in use, do something
    while ($fid_in_use->($fid)) {
        throw("dup") if $explicit_fid_used;

        # be careful of databases which reset their
        # auto-increment/sequences when the table is empty (InnoDB
        # did/does this, for instance).  So check if it's in use, and
        # re-seed the table with the highest known fid from the file
        # table.

        # get the highest fid from the filetable and insert a dummy row
        $fid = $dbh->selectrow_array("SELECT MAX(fid) FROM file");
        $ins_tempfile->();  # don't care about its result

        # then do a normal auto-increment
        $fid = undef;
        $ins_tempfile->() or die "register_tempfile failed after seeding";

    return $fid;

# return hashref of row containing columns "fid, dmid, dkey, length,
# classid, devcount" provided a $dmid and $key (dkey).  or undef if no
# row.
sub file_row_from_dmid_key {
    my ($self, $dmid, $key) = @_;
    return $self->dbh->selectrow_hashref("SELECT fid, dmid, dkey, length, classid, devcount ".
                                         "FROM file WHERE dmid=? AND dkey=?",
                                         undef, $dmid, $key);

# return hashref of row containing columns "fid, dmid, dkey, length,
# classid, devcount" provided a $fidid or undef if no row.
sub file_row_from_fidid {
    my ($self, $fidid) = @_;
    return $self->dbh->selectrow_hashref("SELECT fid, dmid, dkey, length, classid, devcount ".
                                         "FROM file WHERE fid=?",
                                         undef, $fidid);

# return an arrayref of rows containing columns "fid, dmid, dkey, length,
# classid, devcount" provided a pair of $fidid or undef if no rows.
sub file_row_from_fidid_range {
    my ($self, $fromfid, $tofid) = @_;
	my $sth = $self->dbh->prepare("SELECT fid, dmid, dkey, length, classid, devcount ".
								  "FROM file WHERE fid BETWEEN ? AND ?");
    return $sth->fetchall_arrayref({});

# return array of devids that a fidid is on
sub fid_devids {
    my ($self, $fidid) = @_;
    return @{ $self->dbh->selectcol_arrayref("SELECT devid FROM file_on WHERE fid=?",
                                             undef, $fidid) || [] };

# return hashref of { $fidid => [ $devid, $devid... ] } for a bunch of given @fidids
sub fid_devids_multiple {
    my ($self, @fidids) = @_;
    my $in = join(",", map { $_+0 } @fidids);
    my $ret = {};
    my $sth = $self->dbh->prepare("SELECT fid, devid FROM file_on WHERE fid IN ($in)");
    while (my ($fidid, $devid) = $sth->fetchrow_array) {
        push @{$ret->{$fidid} ||= []}, $devid;
    return $ret;

# return hashref of columns classid, dmid, dkey, given a $fidid, or return undef
sub tempfile_row_from_fid {
    my ($self, $fidid) = @_;
    return $self->dbh->selectrow_hashref("SELECT classid, dmid, dkey ".
                                         "FROM tempfile WHERE fid=?",
                                         undef, $fidid);

# return 1 on success, throw "dup" on duplicate devid or throws other error on failure
sub create_device {
    my ($self, $devid, $hostid, $status) = @_;
    my $rv = $self->conddup(sub {
        $self->dbh->do("INSERT INTO device (devid, hostid, status) VALUES (?,?,?)", undef,
                       $devid, $hostid, $status);
    die "error making device $devid\n" unless $rv > 0;
    return 1;

sub update_device_usage {
    my $self = shift;
    my %arg  = $self->_valid_params([qw(mb_total mb_used devid)], @_);
    $self->dbh->do("UPDATE device SET mb_total = ?, mb_used = ?, mb_asof = " . $self->unix_timestamp .
                   " WHERE devid = ?", undef, $arg{mb_total}, $arg{mb_used}, $arg{devid});

sub mark_fidid_unreachable {
    my ($self, $fidid) = @_;
    die "Your database does not support REPLACE! Reimplement mark_fidid_unreachable!" unless $self->can_replace;
    $self->dbh->do("REPLACE INTO unreachable_fids VALUES (?, " . $self->unix_timestamp . ")",
                   undef, $fidid);

sub set_device_weight {
    my ($self, $devid, $weight) = @_;
    $self->dbh->do('UPDATE device SET weight = ? WHERE devid = ?', undef, $weight, $devid);

sub set_device_state {
    my ($self, $devid, $state) = @_;
    $self->dbh->do('UPDATE device SET status = ? WHERE devid = ?', undef, $state, $devid);

sub delete_class {
    my ($self, $dmid, $cid) = @_;
    $self->dbh->do("DELETE FROM class WHERE dmid = ? AND classid = ?", undef, $dmid, $cid);

sub delete_fidid {
    my ($self, $fidid) = @_;
    $self->dbh->do("DELETE FROM file WHERE fid=?", undef, $fidid);
    $self->dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fidid);
    $self->dbh->do($self->ignore_replace . " INTO file_to_delete (fid) VALUES (?)", undef, $fidid);

sub delete_tempfile_row {
    my ($self, $fidid) = @_;
    $self->dbh->do("DELETE FROM tempfile WHERE fid=?", undef, $fidid);

sub replace_into_file {
    my $self = shift;
    my %arg  = $self->_valid_params([qw(fidid dmid key length classid)], @_);
    die "Your database does not support REPLACE! Reimplement replace_into_file!" unless $self->can_replace;
    $self->dbh->do("REPLACE INTO file (fid, dmid, dkey, length, classid, devcount) ".
                   "VALUES (?,?,?,?,?,0) ", undef,
                   @arg{'fidid', 'dmid', 'key', 'length', 'classid'});

# returns 1 on success, 0 on duplicate key error, dies on exception
# TODO: need a test to hit the duplicate name error condition
# TODO: switch to using "dup" exception here?
sub rename_file {
    my ($self, $fidid, $to_key) = @_;
    my $dbh = $self->dbh;
    eval {
        $dbh->do('UPDATE file SET dkey = ? WHERE fid=?',
                 undef, $to_key, $fidid);
    if ($@ || $dbh->err) {
        # first is mysql's error code for duplicates
        if ($self->was_duplicate_error) {
            return 0;
        } else {
            die $@;
    return 1;

# returns a hash of domains. Key is namespace, value is dmid.
sub get_all_domains {
    my ($self) = @_;
    my $domains = $self->dbh->selectall_arrayref('SELECT namespace, dmid FROM domain');
    return map { ($_->[0], $_->[1]) } @{$domains || []};

# returns an array of hashrefs, one hashref per row in the 'class' table
sub get_all_classes {
    my ($self) = @_;
    my (@ret, $row);
    my $sth = $self->dbh->prepare("SELECT dmid, classid, classname, mindevcount FROM class");
    push @ret, $row while $row = $sth->fetchrow_hashref;
    return @ret;

# add a record of fidid existing on devid
# returns 1 on success, 0 on duplicate
sub add_fidid_to_devid {
    my ($self, $fidid, $devid) = @_;
    croak("fidid not non-zero") unless $fidid;
    croak("devid not non-zero") unless $devid;

    # TODO: This should possibly be insert_ignore instead
    # As if we are adding an extra file_on entry, we do not want to replace the
    # exist one. Check REPLACE semantics.
    my $rv = $self->dowell($self->ignore_replace . " INTO file_on (fid, devid) VALUES (?,?)",
                           undef, $fidid, $devid);
    return 1 if $rv > 0;
    return 0;

# remove a record of fidid existing on devid
# returns 1 on success, 0 if not there anyway
sub remove_fidid_from_devid {
    my ($self, $fidid, $devid) = @_;
    my $rv = $self->dbh->do("DELETE FROM file_on WHERE fid=? AND devid=?",
                            undef, $fidid, $devid);
    return $rv;

# get all hosts from database, returns them as list of hashrefs, hashrefs being the row contents.
sub get_all_hosts {
    my ($self) = @_;
    my $sth = $self->dbh->prepare("SELECT /*!40000 SQL_CACHE */ hostid, status, hostname, " .
                                  "hostip, http_port, http_get_port, altip, altmask FROM host");
    my @ret;
    while (my $row = $sth->fetchrow_hashref) {
        push @ret, $row;
    return @ret;

# get all devices from database, returns them as list of hashrefs, hashrefs being the row contents.
sub get_all_devices {
    my ($self) = @_;
    my $sth = $self->dbh->prepare("SELECT /*!40000 SQL_CACHE */ devid, hostid, mb_total, " .
                                  "mb_used, mb_asof, status, weight FROM device");
    my @return;
    while (my $row = $sth->fetchrow_hashref) {
        push @return, $row;
    return @return;

# update the device count for a given fidid
sub update_devcount {
    my ($self, $fidid) = @_;
    my $dbh = $self->dbh;
    my $ct = $dbh->selectrow_array("SELECT COUNT(*) FROM file_on WHERE fid=?",
                                   undef, $fidid);

    $dbh->do("UPDATE file SET devcount=? WHERE fid=?", undef,
              $ct, $fidid);

    return 1;

# enqueue a fidid for replication, from a specific deviceid (can be undef), in a given number of seconds.
sub enqueue_for_replication {
    my ($self, $fidid, $from_devid, $in) = @_;

    my $nexttry = 0;
    if ($in) {
        $nexttry = $self->unix_timestamp . " + " . int($in);

    $self->insert_ignore("INTO file_to_replicate (fid, fromdevid, nexttry) ".
                         "VALUES (?,?,$nexttry)", undef, $fidid, $from_devid);

# reschedule all deferred replication, return number rescheduled
sub replicate_now {
    my ($self) = @_;
    return $self->dbh->do("UPDATE file_to_replicate SET nexttry = " . $self->unix_timestamp .
                          " WHERE nexttry > " . $self->unix_timestamp);

# takes two arguments, devid and limit, both required. returns an arrayref of fidids.
sub get_fidids_by_device {
    my ($self, $devid, $limit) = @_;

    my $dbh = $self->dbh;
    my $fidids = $dbh->selectcol_arrayref("SELECT fid FROM file_on WHERE devid = ? LIMIT $limit",
                                          undef, $devid);
    return $fidids;

# takes two arguments, fidid to be above, and optional limit (default
# 1,000).  returns up to that that many fidids above the provided
# fidid.  returns array of MogileFS::FID objects, sorted by fid ids.
sub get_fids_above_id {
    my ($self, $fidid, $limit) = @_;
    $limit ||= 1000;
    $limit = int($limit);

    my @ret;
    my $dbh = $self->dbh;
    my $sth = $dbh->prepare("SELECT fid, dmid, dkey, length, classid ".
                            "FROM   file ".
                            "WHERE  fid > ? ".
                            "ORDER BY fid LIMIT $limit");
    while (my $row = $sth->fetchrow_hashref) {
        push @ret, MogileFS::FID->new_from_db_row($row);
    return @ret;

# creates a new domain, given a domain namespace string.  return the dmid on success,
# throw 'dup' on duplicate name.
# override if you want a less racy version.
sub create_domain {
    my ($self, $name) = @_;
    my $dbh = $self->dbh;

    # get the max domain id
    my $maxid = $dbh->selectrow_array('SELECT MAX(dmid) FROM domain') || 0;
    my $rv = eval {
        $dbh->do('INSERT INTO domain (dmid, namespace) VALUES (?, ?)',
                 undef, $maxid + 1, $name);
    if ($self->was_duplicate_error) {
    return $maxid+1 if $rv;
    die "failed to make domain";  # FIXME: the above is racy.

sub update_host_property {
    my ($self, $hostid, $col, $val) = @_;
    $self->conddup(sub {
        $self->dbh->do("UPDATE host SET $col=? WHERE hostid=?", undef, $val, $hostid);
    return 1;

# return ne hostid, or throw 'dup' on error.
# NOTE: you need to put them into the initial 'down' state.
sub create_host {
    my ($self, $hostname, $ip) = @_;
    my $dbh = $self->dbh;
    # racy! lazy. no, better: portable! how often does this happen? :)
    my $hid = ($dbh->selectrow_array('SELECT MAX(hostid) FROM host') || 0) + 1;
    my $rv = $self->conddup(sub {
        $dbh->do("INSERT INTO host (hostid, hostname, hostip, status) ".
                 "VALUES (?, ?, ?, 'down')",
                 undef, $hid, $hostname, $ip);
    return $hid if $rv;
    die "db failure";

# return array of row hashrefs containing columns: (fid, fromdevid,
# failcount, flags, nexttry)
sub files_to_replicate {
    my ($self, $limit) = @_;
    my $ut = $self->unix_timestamp;
    my $to_repl_map = $self->dbh->selectall_hashref(qq{
        SELECT fid, fromdevid, failcount, flags, nexttry
        FROM file_to_replicate
        WHERE nexttry <= $ut
        ORDER BY nexttry
        LIMIT $limit
    }, "fid") or return ();
    return values %$to_repl_map;

# although it's safe to have multiple tracker hosts and/or processes
# replicating the same file, around, it's inefficient CPU/time-wise,
# and it's also possible they pick different places and waste disk.
# so the replicator asks the store interface when it's about to start
# and when it's done replicating a fidid, so you can do something smart
# and tell it not to.
sub should_begin_replicating_fidid {
    my ($self, $fidid) = @_;
    warn("Inefficient implementation of should_begin_replicating_fidid() in $self!\n");

# called when replicator is done replicating a fid, so you can cleanup
# whatever you did in 'should_begin_replicating_fidid' above.
# NOTE: there's a theoretical race condition in the rebalance code,
# where (without locking as provided by
# should_begin_replicating_fidid/note_done_replicating), all copies of
# a file can be deleted by independent replicators doing rebalancing
# in different ways.  so you'll probably want to implement some
# locking in this pair of functions.
sub note_done_replicating {
    my ($self, $fidid) = @_;

sub delete_fid_from_file_to_replicate {
    my ($self, $fidid) = @_;
    $self->dbh->do("DELETE FROM file_to_replicate WHERE fid=?", undef, $fidid);

sub reschedule_file_to_replicate_absolute {
    my ($self, $fid, $abstime) = @_;
    $self->dbh->do("UPDATE file_to_replicate SET nexttry = ?, failcount = failcount + 1 WHERE fid = ?",
                   undef, $abstime, $fid);

sub reschedule_file_to_replicate_relative {
    my ($self, $fid, $in_n_secs) = @_;
    $self->dbh->do("UPDATE file_to_replicate SET nexttry = " . $self->unix_timestamp . " + ?, " .
                   "failcount = failcount + 1 WHERE fid = ?",
                   undef, $in_n_secs, $fid);

# Given a dmid prefix after and limit, return an arrayref of dkey from the file
# table.
sub get_keys_like {
    my ($self, $dmid, $prefix, $after, $limit) = @_;
    # fix the input... prefix always ends with a % so that it works
    # in a LIKE call, and after is either blank or something
    $prefix ||= '';
    $prefix .= '%';
    $after ||= '';

    # now select out our keys
    return $self->dbh->selectcol_arrayref
        ('SELECT dkey FROM file WHERE dmid = ? AND dkey LIKE ? AND dkey > ? ' .
         "ORDER BY dkey LIMIT $limit", undef, $dmid, $prefix, $after);

# return arrayref of all tempfile rows (themselves also arrayrefs, of [$fidid, $devids])
# that were created $secs_ago seconds ago or older.
sub old_tempfiles {
    my ($self, $secs_old) = @_;
    return $self->dbh->selectall_arrayref("SELECT fid, devids FROM tempfile " .
                                          "WHERE createtime < " . $self->unix_timestamp . " - $secs_old LIMIT 50");

# given an array of MogileFS::DevFID objects, mass-insert them all
# into file_on (ignoring if they're already present)
sub mass_insert_file_on {
    my ($self, @devfids) = @_;
    return 1 unless @devfids;

    if (@devfids > 1 && ! $self->can_insert_multi) {
        $self->mass_insert_file_on($_) foreach @devfids;
        return 1;

    my (@qmarks, @binds);
    foreach my $df (@devfids) {
        my ($fidid, $devid) = ($df->fidid, $df->devid);
        Carp::croak("got a false fidid") unless $fidid;
        Carp::croak("got a false devid") unless $devid;
        push @binds, $fidid, $devid;
        push @qmarks, "(?,?)";

    # TODO: This should possibly be insert_ignore instead
    # As if we are adding an extra file_on entry, we do not want to replace the
    # exist one. Check REPLACE semantics.
    $self->dowell($self->ignore_replace . " INTO file_on (fid, devid) VALUES " . join(',', @qmarks), undef, @binds);
    return 1;

sub set_schema_vesion {
    my ($self, $ver) = @_;
    $self->set_server_setting("schema_version", int($ver));

# returns array of fidids to try and delete again
sub fids_to_delete_again {
    my $self = shift;
    my $ut = $self->unix_timestamp;
    return @{ $self->dbh->selectcol_arrayref(qq{
        SELECT fid
         FROM file_to_delete_later
        WHERE delafter < $ut
        LIMIT 500
    }) || [] };

# return 1 on success.  die otherwise.
sub enqueue_fids_to_delete {
    my ($self, @fidids) = @_;
    # multi-row insert-ignore/replace CAN fail with the insert_ignore emulation sub.
    # when the first row causes the duplicate error, and the remaining rows are
    # not processed.
    if (@fidids > 1 && ! ($self->can_insert_multi && ($self->can_replace || $self->can_insertignore))) {
        $self->enqueue_fids_to_delete($_) foreach @fidids;
        return 1;
    # TODO: convert to prepared statement?
    $self->dbh->do($self->ignore_replace . " INTO file_to_delete (fid) VALUES " .
                   join(",", map { "(" . int($_) . ")" } @fidids))
        or die "file_to_delete insert failed";

# clears everything from the fsck_log table
# return 1 on success.  die otherwise.
sub clear_fsck_log {
    my $self = shift;
    $self->dbh->do("DELETE FROM fsck_log");
    return 1;

sub fsck_log_summarize_every { 100 }

sub fsck_log {
    my ($self, %opts) = @_;
    $self->dbh->do("INSERT INTO fsck_log (utime, fid, evcode, devid) ".
                   "VALUES (" . $self->unix_timestamp . ",?,?,?)",
                   delete $opts{fid},
                   delete $opts{code},
                   delete $opts{devid});
    croak("Unknown opts") if %opts;

    my $logid = $self->dbh->last_insert_id(undef, undef, 'fsck_log', 'logid')
        or die "No last_insert_id found for fsck_log table";

    # sum-up evcode counts every so often, to make fsck_status faster,
    # avoiding a potentially-huge GROUP BY in the future..
    my $SUM_EVERY = $self->fsck_log_summarize_every;
    # Note: totally disregards locking/races because there's only one
    # fsck process running globally (in theory-- there could be 5
    # second overlaps on quick stop/starts, so we take some regard for
    # races, but not much).
    if ($logid % $SUM_EVERY == 0) {
        my $start_max_logid = $self->server_setting("fsck_start_maxlogid") || 0;
        # both inclusive:
        my $min_logid = max($start_max_logid, $logid - $SUM_EVERY) + 1;
        my $cts = $self->fsck_evcode_counts(logid_range => [$min_logid, $logid]); # inclusive notation :)
        while (my ($evcode, $ct) = each %$cts) {
            $self->incr_server_setting("fsck_sum_evcount_$evcode", $ct);

    return 1;

sub get_db_unixtime {
    my $self = shift;
    return $self->dbh->selectrow_array("SELECT " . $self->unix_timestamp);

sub max_fidid {
    my $self = shift;
    return $self->dbh->selectrow_array("SELECT MAX(fid) FROM file");

sub max_fsck_logid {
    my $self = shift;
    return $self->dbh->selectrow_array("SELECT MAX(logid) FROM fsck_log") || 0;

# returns array of $row hashrefs, from fsck_log table
sub fsck_log_rows {
    my ($self, $after_logid, $limit) = @_;
    $limit       = int($limit || 100);
    $after_logid = int($after_logid || 0);

    my @rows;
    my $sth = $self->dbh->prepare(qq{
        SELECT logid, utime, fid, evcode, devid
        FROM fsck_log
        WHERE logid > ?
        ORDER BY logid
        LIMIT $limit
    my $row;
    push @rows, $row while $row = $sth->fetchrow_hashref;
    return @rows;

sub fsck_evcode_counts {
    my ($self, %opts) = @_;
    my $timegte = delete $opts{time_gte};
    my $logr    = delete $opts{logid_range};
    die if %opts;

    my $ret = {};
    my $sth;
    if ($timegte) {
        $sth = $self->dbh->prepare(qq{
            SELECT evcode, COUNT(*) FROM fsck_log
            WHERE utime >= ?
            GROUP BY evcode
    if ($logr) {
        $sth = $self->dbh->prepare(qq{
            SELECT evcode, COUNT(*) FROM fsck_log
            WHERE logid >= ? AND logid <= ?
            GROUP BY evcode
        $sth->execute($logr->[0], $logr->[1]);
    while (my ($ev, $ct) = $sth->fetchrow_array) {
        $ret->{$ev} = $ct;
    return $ret;

# run before daemonizing.  you can die from here if you see something's amiss.  or emit
# warnings.
sub pre_daemonize_checks { }

# attempt to grab a lock of lockname, and timeout after timeout seconds.
# returns 1 on success and 0 on timeout.  dies if more than one lock is already outstanding.
sub get_lock {
    my ($self, $lockname, $timeout) = @_;
    die "Lock recursion detected (grabbing $lockname, had $self->{last_lock}).  Bailing out." if $self->{lock_depth};
    die "get_lock not implemented for $self";

# attempt to release a lock of lockname.
# returns 1 on success and 0 if no lock we have has that name.
sub release_lock {
    my ($self, $lockname) = @_;
    die "release_lock not implemented for $self";

# returns up to $limit @fidids which are on provided $devid
sub random_fids_on_device {
    my ($self, $devid, $limit) = @_;
    $limit = int($limit) || 100;

    my $dbh = $self->dbh;

    # FIXME: this blows. not random.  and good chances these will
    # eventually get to point where they're un-rebalanacable, and we
    # never move on past the first 5000
    my @some_fids = List::Util::shuffle(@{
        $dbh->selectcol_arrayref("SELECT fid FROM file_on WHERE devid=? LIMIT 5000",
                                 undef, $devid) || []

    @some_fids = @some_fids[0..$limit-1] if $limit < @some_fids;
    return @some_fids;

# return array of { dmid => ..., classid => ..., devcount => ..., count => ... }
sub get_stats_files_per_devcount {
    my ($self) = @_;
    my $dbh = $self->dbh;
    my @ret;
    my $sth = $dbh->prepare('SELECT dmid, classid, devcount, COUNT(devcount) AS "count" FROM file GROUP BY 1, 2, 3');
    while (my $row = $sth->fetchrow_hashref) {
        push @ret, $row;
    return @ret;



=head1 NAME

MogileFS::Store - data storage provider.  base class.

=head1 ABOUT

MogileFS aims to be database-independent (though currently as of late
2006 only works with MySQL).  In the future, the server will create a
singleton instance of type "MogileFS::Store", like
L<MogileFS::Store::MySQL>, and all database interaction will be
through it.

=head1 SEE ALSO
