package MogileFS::Worker::Fsck; use strict; use base 'MogileFS::Worker'; use fields ( 'last_stop_check', # unixtime 'should_stop_running' last called 'last_maxcheck_write', # unixtime maxcheck written 'size_checker', # subref which, given a DevFID, returns size of file 'opt_nostat', # bool: do we trust mogstoreds? skipping size stats? ); use MogileFS::Util qw(every error debug); use List::Util (); use Time::HiRes (); use constant SUCCESS => 0; use constant TEMPORARY => 1; use constant PERMANENT => 2; use constant REPLICATE => 3; use constant EV_NO_PATHS => "NOPA"; use constant EV_POLICY_VIOLATION => "POVI"; use constant EV_FILE_MISSING => "MISS"; use constant EV_BAD_LENGTH => "BLEN"; use constant EV_CANT_FIX => "GONE"; use constant EV_START_SEARCH => "SRCH"; use constant EV_FOUND_FID => "FOND"; use constant EV_RE_REPLICATE => "REPL"; use POSIX (); my $nowish; # approximate unixtime, updated once per loop. sub watchdog_timeout { 30 } sub work { my $self = shift; my $run_count = 0; # this can be CPU-intensive. let's nice ourselves down. POSIX::nice(10); # my $running = 0; # start time my $n_check = 0; # items checked my $start = sub { return if $running; $running = $nowish = time(); }; my $stats = sub { return unless $running; my $elap = $nowish - $running; debug(sprintf("In %d secs, %d fids, %0.02f fids/sec\n", $elap, $n_check, ($n_check / ($elap || 1)))); }; my $last_beat = 0; my $beat = sub { return unless $nowish >= $last_beat + 5; $stats->(); $last_beat = $nowish; }; my $stop = sub { return unless $running; $stats->(); debug("done."); $running = 0; }; # my $sto = Mgd::get_store(); my $max_checked = 0; every(5.0, sub { my $sleep_set = shift; $self->parent_ping; $nowish = time(); local $Mgd::nowish = $nowish; # see if we're even enabled for this host. unless ($self->should_be_running) { $max_checked = 0; # uncache this return; } # checking doesn't go well if the monitor job hasn't actively started # marking things as being available unless ($self->monitor_has_run) { # only warn on runs after the first. gives the monitor job some time to work # before we throw a message. debug("waiting for monitor job to complete a cycle before beginning") if $run_count++ > 0; return; } $max_checked ||= MogileFS::Config->server_setting('fsck_highest_fid_checked') || 0; $self->{opt_nostat} = MogileFS::Config->server_setting('fsck_opt_policy_only') || 0; my @fids = $sto->get_fids_above_id($max_checked, 5000); unless (@fids) { $sto->set_server_setting("fsck_host", undef); $sto->set_server_setting("fsck_stop_time", $sto->get_db_unixtime); $self->set_max_checked($max_checked) if $max_checked; $stop->(); return; } $start->(); MogileFS::FID->mass_load_devids(@fids); $self->init_size_checker(\@fids); # don't sleep in loop, next round, since we found stuff to work on # this round... $sleep_set->(0); my $new_max; my $hit_problem = 0; foreach my $fid (@fids) { last if $self->should_stop_running; if (!$self->check_fid($fid)) { # some connectivity problem... abort checking more # for now. $hit_problem = 1; last; } $max_checked = $fid->id; $self->set_max_checked_lazy($max_checked); $n_check++; $beat->(); } # if we had connectivity problems, let's sleep a bit if ($hit_problem) { error("connectivity problems; stalling 5s"); sleep 5; } }); } # only write to server_settings table our position every 5 seconds sub set_max_checked_lazy { my ($self, $nmax) = @_; return 0 if $nowish < ($self->{last_maxcheck_write} || 0) + 5; $self->{last_maxcheck_write} = $nowish; $self->set_max_checked($nmax); } sub set_max_checked { my ($self, $nmax) = @_; MogileFS::Config->set_server_setting('fsck_highest_fid_checked', $nmax); } # this version is accurate, sub should_be_running { my $self = shift; my $fhost = MogileFS::Config->server_setting('fsck_host') or return; return $fhost eq MogileFS::Config->hostname; } # this version is sloppy, optimized for speed. only checks db every 5 seconds. sub should_stop_running { my $self = shift; return 0 if $nowish < ($self->{last_stop_check} || 0) + 5; $self->{last_stop_check} = $nowish; return ! $self->should_be_running; } # given a $fid (MogileFS::FID, with pre-populated ->devids data) # return 0 if reachability problems. # return 1 if fid was checked (regardless of there being problems or not) # if no problems, no action. # if problems, log & enqueue fixes use constant STALLED => 0; use constant HANDLED => 1; sub check_fid { my ($self, $fid) = @_; my $fix = sub { my $fixed = eval { $self->fix_fid($fid) }; if (! defined $fixed) { error("Fsck stalled: $@"); return STALLED; } $fid->fsck_log(EV_CANT_FIX) if ! $fixed; # that might've all taken awhile, let's update our approximate time $nowish = $self->still_alive; return HANDLED; }; # first obvious fucked-up case: no devids even presumed to exist. unless ($fid->devids) { # first, log this weird condition. $fid->fsck_log(EV_NO_PATHS); # weird, schedule a fix (which will do a search over all # devices as a last-ditch effort to locate it) return $fix->(); } # first, see if the assumed devids meet the replication policy for # the fid's class. unless ($fid->devids_meet_policy) { # log a policy violation $fid->fsck_log(EV_POLICY_VIOLATION); return $fix->(); } # in the fast case, do nothing else (don't check if assumed file # locations are actually there). in the fast case, all we do is # check the replication policy, which is already done, so finish. return HANDLED if $self->{opt_nostat}; # stat each device to see if it's still there. on first problem, # stop and go into the slow(er) fix function. my $err; my $rv = $self->parallel_check_sizes([ $fid->devfids ], sub { my ($dfid, $disk_size) = @_; if (! defined $disk_size) { my $dev = $dfid->device; error("Connectivity problem reaching device " . $dev->id . " on host " . $dev->host->ip . "\n"); $err = "stalled"; return 0; } return 1 if $disk_size == $fid->length; $err = "needfix"; # Note: not doing fsck_log, as fix_fid will log status for each device. return 0; }); if ($rv) { return HANDLED; } elsif ($err eq "stalled") { return STALLED; } elsif ($err eq "needfix") { return $fix->(); } else { die "Unknown error checking fid sizes in parallel.\n"; } } sub parallel_check_sizes { my ($self, $dflist, $cb) = @_; # serial, for now: (just prepping for future parallel future, # getting interface right) foreach my $df (@$dflist) { my $size = $self->size_on_disk($df); return 0 unless $cb->($df, $size); } return 1; } # this is the slow path. if something above in check_fid finds # something amiss in any way, we went the slow path on a fid and try # really hard to fix the situation. # # return true if situation handled, 0 if nothing could be done. # die on errors (like connectivity problems). use constant CANT_FIX => 0; sub fix_fid { my ($self, $fid) = @_; error(sprintf("Fixing FID %d\n", $fid->id)); # make devfid objects from the devids that this fid is on, my @dfids = map { MogileFS::DevFID->new($_, $fid) } $fid->devids; # track all known good copies (dev objects), as well as all bad # copies (places it should've been, but isn't) my @good_devs; my @bad_devs; my %already_checked; # devid -> 1. my $check_dfids = sub { my $is_desperate_mode = shift; # stat all devices. foreach my $dfid (@dfids) { my $dev = $dfid->device; next if $already_checked{$dev->id}++; my $disk_size = $self->size_on_disk($dfid); die "dev unreachable" unless defined $disk_size; if ($disk_size == $fid->length) { push @good_devs, $dfid->device; # if we were doing a desperate search, one is enough, we can stop now! return if $is_desperate_mode; next; } # don't log in desperate mode, as we'd have "file missing!" log entries # for every device in the normal case, which is expected. unless ($is_desperate_mode) { if (! $disk_size) { $fid->fsck_log(EV_FILE_MISSING, $dev); } else { $fid->fsck_log(EV_BAD_LENGTH, $dev); } } push @bad_devs, $dfid->device; } }; $check_dfids->(); # if we didn't find it anywhere, let's go do an exhaustive search over # all devices, looking for it... unless (@good_devs) { # replace @dfids with list of all (alive) devices. dups will be ignored by # check_dfids $fid->fsck_log(EV_START_SEARCH); @dfids = List::Util::shuffle( map { MogileFS::DevFID->new($_, $fid) } grep { $_->dstate->should_fsck_search_on } MogileFS::Device->devices ); $check_dfids->("desperate"); # still can't fix it? return CANT_FIX unless @good_devs; # wow, we actually found it! $fid->fsck_log(EV_FOUND_FID); $fid->note_on_device($good_devs[0]); # at least one good one. # fall through to check policy (which will most likely be # wrong, with only one file_on record...) and re-replicate } # remove the file_on mappings for devices that were bogus/missing. foreach my $bdev (@bad_devs) { error("removing file_on mapping for fid=" . $fid->id . ", dev=" . $bdev->id); $fid->forget_about_device($bdev); } # Note: this will reload devids, if they called 'note_on_device' # or 'forget_about_device' unless ($fid->devids_meet_policy) { $fid->enqueue_for_replication; $fid->fsck_log(EV_RE_REPLICATE); return HANDLED; } return HANDLED; } sub init_size_checker { my ($self, $fidlist) = @_; $self->still_alive; my $lo_fid = $fidlist->[0]->id; my $hi_fid = $fidlist->[-1]->id; my %size; # $devid -> { $fid -> $size } my %tried_bulkstat; # $devid -> 1 $self->{size_checker} = sub { my $dfid = shift; my $devid = $dfid->devid; if (my $map = $size{$devid}) { return $map->{$dfid->fidid} || 0; } unless ($tried_bulkstat{$devid}++) { my $mogconn = $dfid->device->host->mogstored_conn; my $sock = $mogconn->sock(5); my $good = 0; my $unknown_cmd = 0; if ($sock) { my $cmd = "fid_sizes $lo_fid-$hi_fid $devid\n"; print $sock $cmd; my $map = {}; while (my $line = <$sock>) { if ($line =~ /^\./) { $good = 1; last; } elsif ($line =~ /^(\d+)\s+(\d+)\s+(\d+)/) { my ($res_devid, $res_fid, $size) = ($1, $2, $3); last unless $res_devid == $devid; $map->{$res_fid} = $size; } elsif ($line =~ /^ERR/) { $unknown_cmd = 1; last; } else { last; } } # we only update our $nowish (approximate time) lazily, when we # know time might've advanced (like during potentially slow RPC call) $nowish = $self->still_alive; if ($good) { $size{$devid} = $map; return $map->{$dfid->fidid} || 0; } elsif (!$unknown_cmd) { # mogstored connection is unknown state... can't # trust it, so close it. $mogconn->mark_dead; } } error("fid_sizes mogstored cmd unavailable for dev $devid; using slower method"); } # slow case (not using new command) $nowish = $self->still_alive; return $dfid->size_on_disk; }; } # returns 0 on missing, # undef on connectivity error, # else size of file on disk (after HTTP HEAD or mogstored stat) sub size_on_disk { my ($self, $dfid) = @_; return $self->{size_checker}->($dfid); } 1; # Local Variables: # mode: perl # c-basic-indent: 4 # indent-tabs-mode: nil # End: