package MogileFS::Worker::Monitor; use strict; use warnings; use base 'MogileFS::Worker'; use fields ( 'last_db_update', # devid -> time. update db less often than poll interval. 'last_test_write', # devid -> time. time we last tried writing to a device. 'skip_host', # hostid -> 1 if already noted dead (reset every loop) 'seen_hosts', # IP -> 1 (reset every loop) 'ua', # LWP::UserAgent for checking usage files 'iow', # MogileFS::IOStatWatcher object ); use Danga::Socket 1.56; use MogileFS::Util qw(error debug); use MogileFS::IOStatWatcher; use constant UPDATE_DB_EVERY => 15; sub new { my ($class, $psock) = @_; my $self = fields::new($class); $self->SUPER::new($psock); $self->{last_db_update} = {}; $self->{last_test_write} = {}; $self->{iow} = MogileFS::IOStatWatcher->new; return $self; } sub watchdog_timeout { 30; } sub work { my $self = shift; # we just forked from our parent process, also using Danga::Socket, # so we need to lose all that state and start afresh. Danga::Socket->Reset; my $iow = $self->{iow}; $iow->on_stats(sub { my ($hostname, $stats) = @_; while (my ($devid, $util) = each %$stats) { my $dev = MogileFS::Device->of_devid($devid) or die "Can't find that device"; $dev->set_observed_utilization($util); } }); my $main_monitor; $main_monitor = sub { Danga::Socket->AddTimer(2.5, $main_monitor); $self->parent_ping; # get db and note we're starting a run debug("Monitor running; scanning usage files"); $self->validate_dbh; $self->{skip_host} = {}; # hostid -> 1 if already noted dead. $self->{seen_hosts} = {}; # IP -> 1 # now iterate over devices MogileFS::Device->invalidate_cache; MogileFS::Host->invalidate_cache; foreach my $dev (MogileFS::Device->devices) { next unless $dev->dstate->should_monitor; next if $self->{skip_host}{$dev->hostid}; $self->check_device($dev); } $iow->set_hosts(keys %{$self->{seen_hosts}}); $self->send_to_parent(":monitor_just_ran"); }; $main_monitor->(); Danga::Socket->EventLoop; } # -------------------------------------------------------------------------- sub ua { my $self = shift; return $self->{ua} ||= LWP::UserAgent->new( timeout => 2, keep_alive => 20, ); } sub check_device { my ($self, $dev) = @_; my $devid = $dev->id; my $host = $dev->host; my $port = $host->http_port; my $get_port = $host->http_get_port; # || $port; my $hostip = $host->ip; my $url = $dev->usage_url; $self->{seen_hosts}{$hostip} = 1; # now try to get the data with a short timeout my $timeout = 2; my $start_time = Time::HiRes::time(); my $ua = $self->ua; my $response = $ua->get($url); my $res_time = Time::HiRes::time(); unless ($response->is_success) { my $failed_after = $res_time - $start_time; if ($failed_after < 0.5) { $self->broadcast_device_unreachable($dev->id); error("Port $get_port not listening on otherwise-alive machine $hostip? Error was: " . $response->status_line); } else { $failed_after = sprintf("%.02f", $failed_after); $self->broadcast_host_unreachable($dev->hostid); $self->{skip_host}{$dev->hostid} = 1; error("Timeout contacting machine $hostip for dev $devid: took $failed_after seconds out of $timeout allowed"); } return; } # at this point we can reach the host $self->broadcast_host_reachable($dev->hostid); $self->{iow}->restart_monitoring_if_needed($hostip); my %stats; my $data = $response->content; foreach (split(/\r?\n/, $data)) { next unless /^(\w+)\s*:\s*(.+)$/; $stats{$1} = $2; } my ($used, $total) = ($stats{used}, $stats{total}); unless ($used && $total) { $used = "" unless defined $used; $total = "" unless defined $total; my $clen = length($data || ""); error("dev$devid reports used = $used, total = $total, content-length: $clen, error?"); return; } # only update database every ~15 seconds per device my $last_update = $self->{last_db_update}{$dev->id} || 0; my $next_update = $last_update + UPDATE_DB_EVERY; my $now = time(); if ($now >= $next_update) { Mgd::get_store()->update_device_usage(mb_total => int($total / 1024), mb_used => int($used / 1024), devid => $devid); $self->{last_db_update}{$devid} = $now; } # next if we're not going to try this now return if ($self->{last_test_write}{$devid} || 0) + UPDATE_DB_EVERY > $now; $self->{last_test_write}{$devid} = $now; # now we want to check if this device is writeable # first, create the test-write directory. this will return # immediately after the first time, as the 'create_directory' # function caches what it's already created. $dev->create_directory("/dev$devid/test-write"); my $num = int(rand 10000); # this was "$$-$now" before, but we don't yet have a cleaner in mogstored for these files my $puturl = "http://$hostip:$port/dev$devid/test-write/test-write-$num"; my $content = "time=$now rand=$num"; my $req = HTTP::Request->new(PUT => $puturl); $req->content($content); # TODO: guard against race-conditions with double-check on failure # now, depending on what happens my $resp = $ua->request($req); if ($resp->is_success) { # now let's get it back to verify; note we use the get_port to verify that # the distinction works (if we have one) my $geturl = "http://$hostip:$get_port/dev$devid/test-write/test-write-$num"; my $testwrite = $ua->get($geturl); # if success and the content matches, mark it writeable if ($testwrite->is_success && $testwrite->content eq $content) { $self->broadcast_device_writeable($devid); debug("dev$devid: used = $used, total = $total, writeable = 1"); return; } } # if we fall through to here, then we know that something is not so good, so mark it readable # which is guaranteed given we even tested writeability $self->broadcast_device_readable($devid); debug("dev$devid: used = $used, total = $total, writeable = 0"); } 1; # Local Variables: # mode: perl # c-basic-indent: 4 # indent-tabs-mode: nil # End: