package MogileFS::ProcManager; use strict; use warnings; use POSIX qw(:sys_wait_h sigprocmask SIGINT SIG_BLOCK SIG_UNBLOCK); use Symbol; use Socket; use MogileFS::Connection::Client; use MogileFS::Connection::Worker; # This class handles keeping lists of workers and clients and # assigning them to eachother when things happen. You don't actually # instantiate a procmanager. the class itself holds all state. # Mappings: fd => [ clientref, jobstring, starttime ] # queues are just lists of Client class objects # ChildrenByJob: job => { pid => $client } # ErrorsTo: fid => Client # RecentQueries: [ string, string, string, ... ] # Stats: element => number our ($IsChild, @RecentQueries, %Mappings, %ChildrenByJob, %ErrorsTo, %Stats); our $starttime = time(); # time we got going sub server_starttime { return $starttime } my @IdleQueryWorkers; # workers that are idle, able to process commands (MogileFS::Worker::Query, ...) my @PendingQueries; # [ MogileFS::Connection::Client, "$ip $query" ] $IsChild = 0; # either false if we're the parent, or a MogileFS::Worker object # keep track of what all child pids are doing, and what jobs are being # satisifed. my %child = (); # pid -> MogileFS::Connection::Worker my %todie = (); # pid -> 1 (lists pids that we've asked to die) my %jobs = (); # jobname -> [ min, current ] our $allkidsup = 0; # if true, all our kids are running. set to 0 when a kid dies. my @prefork_cleanup; # subrefs to run to clean stuff up before we make a new child *error = \&Mgd::error; my %dev_util; # devid -> utilization my $last_util_spray = 0; # time we lost spread %dev_util to children my $nowish; # updated approximately once per second sub push_pre_fork_cleanup { my ($class, $code) = @_; push @prefork_cleanup, $code; } sub RecentQueries { return @RecentQueries; } sub write_pidfile { my $class = shift; my $pidfile = MogileFS->config("pidfile") or return 1; my $fh; unless (open($fh, ">$pidfile")) { Mgd::log('err', "couldn't create pidfile '$pidfile': $!"); return 0; } unless ((print $fh "$$\n") && close($fh)) { Mgd::log('err', "couldn't write into pidfile '$pidfile': $!"); remove_pidfile(); return 0; } return 1; } sub remove_pidfile { my $class = shift; my $pidfile = MogileFS->config("pidfile") or return; unlink $pidfile; return 1; } sub set_min_workers { my ($class, $job, $min) = @_; $jobs{$job} ||= [undef, 0]; # [min, current] $jobs{$job}->[0] = $min; # TODO: set allkipsup false, so spawner re-checks? } sub job_to_class_suffix { my ($class, $job) = @_; return { fsck => "Fsck", queryworker => "Query", delete => "Delete", replicate => "Replicate", reaper => "Reaper", monitor => "Monitor", }->{$job}; } sub job_to_class { my ($class, $job) = @_; my $suffix = $class->job_to_class_suffix($job) or return ""; return "MogileFS::Worker::$suffix"; } sub child_pids { return keys %child; } sub WatchDog { foreach my $pid (keys %child) { my MogileFS::Connection::Worker $child = $child{$pid}; my $healthy = $child->watchdog_check; next if $healthy; error("Watchdog killing worker $pid (" . $child->job . ")"); kill 9, $pid; } } # returns a sub that Danga::Socket calls after each event loop round. # the sub must return 1 for the program to continue running. sub PostEventLoopChecker { my $lastspawntime = 0; # time we last ran spawn_children sub return sub { # run only once per second $nowish = time(); return 1 unless $nowish > $lastspawntime; $lastspawntime = $nowish; MogileFS::ProcManager->WatchDog; # see if anybody has died, but don't hang up on doing so my $pid = waitpid -1, WNOHANG; return 1 if $pid <= 0 && $allkidsup; $allkidsup = 0; # know something died # when a child dies, figure out what it was doing # and note that job has one less worker my $jobconn; if ($pid > -1 && ($jobconn = delete $child{$pid})) { my $job = $jobconn->job; my $extra = $todie{$pid} ? "expected" : "UNEXPECTED"; error("Child $pid ($job) died: $? ($extra)"); MogileFS::ProcManager->NoteDeadChild($pid); $jobconn->close; if (my $jobstat = $jobs{$job}) { # if the pid is in %todie, then we have asked it to shut down # and have already decremented the jobstat counter and don't # want to do it again unless (my $true = delete $todie{$pid}) { # decrement the count of currently running jobs $jobstat->[1]--; } } } # foreach job, fork enough children while (my ($job, $jobstat) = each %jobs) { my $need = $jobstat->[0] - $jobstat->[1]; if ($need > 0) { error("Job $job has only $jobstat->[1], wants $jobstat->[0], making $need."); for (1..$need) { my $jobconn = make_new_child($job) or return 1; # basically bail: true value keeps event loop running $child{$jobconn->pid} = $jobconn; # now increase the count of processes currently doing this job $jobstat->[1]++; } } } # if we got this far, all jobs have been re-created. note that # so we avoid more CPU usage in this post-event-loop callback later $allkidsup = 1; # true value keeps us running: return 1; }; } sub make_new_child { my $job = shift; my $pid; my $sigset; # block signal for fork $sigset = POSIX::SigSet->new(SIGINT); sigprocmask(SIG_BLOCK, $sigset) or return error("Can't block SIGINT for fork: $!"); socketpair(my $parents_ipc, my $childs_ipc, AF_UNIX, SOCK_STREAM, PF_UNSPEC ) or die( "Sockpair failed" ); return error("fork failed creating $job: $!") unless defined ($pid = fork); # enable auto-flush, so it's not pipe-buffered between parent/child select((select( $parents_ipc ), $|++)[0]); select((select( $childs_ipc ), $|++)[0]); # if i'm the parent if ($pid) { sigprocmask(SIG_UNBLOCK, $sigset) or return error("Can't unblock SIGINT for fork: $!"); close($childs_ipc); # unnecessary but explicit IO::Handle::blocking($parents_ipc, 0); my $worker_conn = MogileFS::Connection::Worker->new($parents_ipc); $worker_conn->pid($pid); $worker_conn->job($job); MogileFS::ProcManager->RegisterWorkerConn($worker_conn); return $worker_conn; } # as a child, we want to close these and ignore them $_->() foreach @prefork_cleanup; close($parents_ipc); undef $parents_ipc; $SIG{INT} = 'DEFAULT'; $SIG{TERM} = 'DEFAULT'; $0 .= " [$job]"; # unblock signals sigprocmask(SIG_UNBLOCK, $sigset) or return error("Can't unblock SIGINT for fork: $!"); # now call our job function my $class = MogileFS::ProcManager->job_to_class($job) or die "No worker class defined for job '$job'\n"; my $worker = $class->new($childs_ipc); # set our frontend into child mode MogileFS::ProcManager->SetAsChild($worker); $worker->work; exit 0; } sub PendingQueryCount { return scalar @PendingQueries; } sub BoredQueryWorkerCount { return scalar @IdleQueryWorkers; } sub QueriesInProgressCount { return scalar keys %Mappings; } sub StatsHash { return \%Stats; } sub foreach_job { my ($class, $cb) = @_; foreach my $job (sort keys %ChildrenByJob) { my $ct = scalar(keys %{$ChildrenByJob{$job}}); $cb->($job, $ct, $jobs{$job}->[0], [ join(' ', sort { $a <=> $b } keys %{$ChildrenByJob{$job}}) ]); } } sub foreach_pending_query { my ($class, $cb) = @_; foreach my $clq (@PendingQueries) { $cb->($clq->[0], # client object, $clq->[1], # "$ip $query" ); } } sub is_valid_job { my ($class, $job) = @_; return defined $jobs{$job}; } sub valid_jobs { return sort keys %jobs; } sub request_job_process { my ($class, $job, $n) = @_; return 0 unless $class->is_valid_job($job); $jobs{$job}->[0] = $n; $allkidsup = 0; # try to clean out the queryworkers (if that's what we're doing?) MogileFS::ProcManager->CullQueryWorkers if $job eq 'queryworker'; } # when a child is spawned, they'll have copies of all the data from the # parent, but they don't need it. this method is called when you want # to indicate that this procmanager is running on a child and should clean. sub SetAsChild { my ($class, $worker) = @_; @IdleQueryWorkers = (); @PendingQueries = (); %Mappings = (); $IsChild = $worker; %ErrorsTo = (); # and now kill off our event loop so that we don't waste time Danga::Socket->SetPostLoopCallback(sub { return 0; }); } # called when a child has died. a child is someone doing a job for us, # but it might be a queryworker or any other type of job. we just want # to remove them from our list of children. they're actually respawned # by the make_new_child function elsewhere in Mgd. sub NoteDeadChild { my $pid = $_[1]; foreach my $job (keys %ChildrenByJob) { return if # bail out if we actually delete one delete $ChildrenByJob{$job}->{$pid}; } } # called when a client dies. clients are users, management or non. # we just want to remove them from the error reporting interface, if # they happen to be part of it. sub NoteDeadClient { my $client = $_[1]; delete $ErrorsTo{$client->{fd}}; } # called when the error function in Mgd is called and we're in the parent, # so it's pretty simple that basically we just spit it out to folks listening # to errors sub NoteError { return unless %ErrorsTo; my $msg = ":: ${$_[1]}\r\n"; foreach my $client (values %ErrorsTo) { $client->write(\$msg); } } sub RemoveErrorWatcher { my ($class, $client) = @_; return delete $ErrorsTo{$client->{fd}}; } sub AddErrorWatcher { my ($class, $client) = @_; $ErrorsTo{$client->{fd}} = $client; } # one-time initialization of a new worker connection sub RegisterWorkerConn { my MogileFS::Connection::Worker $worker = $_[1]; $worker->watch_read(1); #warn sprintf("Registering start-up of $worker (%s) [%d]\n", $worker->job, $worker->pid); # now do any special case startup if ($worker->job eq 'queryworker') { MogileFS::ProcManager->NoteIdleQueryWorker($worker); } # add to normal list $ChildrenByJob{$worker->job}->{$worker->pid} = $worker; } sub EnqueueCommandRequest { my ($class, $line, $client) = @_; push @PendingQueries, [ $client, ($client->peer_ip_string || '0.0.0.0') . " $line" ]; MogileFS::ProcManager->ProcessQueues; } # puts a worker back in the queue, deleting any outstanding jobs in # the mapping list for this fd. sub NoteIdleQueryWorker { # first arg is class, second is worker my MogileFS::Connection::Worker $worker = $_[1]; delete $Mappings{$worker->{fd}}; # see if we need to kill off some workers if (job_needs_reduction('queryworker')) { Mgd::error("Reducing queryworker headcount by 1."); MogileFS::ProcManager->AskWorkerToDie($worker); return; } # must be okay, so put it in the queue push @IdleQueryWorkers, $worker; MogileFS::ProcManager->ProcessQueues; } # if we need to kill off a worker, this function takes in the WorkerConn # object, tells it to die, marks us as having requested its death, and decrements # the count of running jobs. sub AskWorkerToDie { my MogileFS::Connection::Worker $worker = $_[1]; note_pending_death($worker->job, $worker->pid); $worker->write(":shutdown\r\n"); } # kill bored query workers so we can get down to the level requested. this # continues killing until we run out of folks to kill. sub CullQueryWorkers { while (@IdleQueryWorkers && job_needs_reduction('queryworker')) { my MogileFS::Connection::Worker $worker = shift @IdleQueryWorkers; MogileFS::ProcManager->AskWorkerToDie($worker); } } # called when we get a response from a worker. this reenqueues the # worker so it can handle another response as well as passes the answer # back on to the client. sub HandleQueryWorkerResponse { # got a response from a worker my MogileFS::Connection::Worker $worker; my $line; (undef, $worker, $line) = @_; return Mgd::error("ASSERT: ProcManager (Child) got worker response: $line") if $IsChild; return unless $worker && $Mappings{$worker->{fd}}; # get the client we're working with (if any) my ($client, $jobstr, $starttime) = @{ $Mappings{$worker->{fd}} }; # if we have no client, then we just got a standard message from # the queryworker and need to pass it up the line return MogileFS::ProcManager->HandleChildRequest($worker, $line) if !$client; # at this point it was a command response, but if the client has gone # away, just reenqueue this query worker return MogileFS::ProcManager->NoteIdleQueryWorker($worker) if $client->{closed}; # [client-side time to complete] my ($time, $id, $res); if ($line =~ /^(\d+-\d+)\s+(\-?\d+\.\d+)\s+(.+)$/) { # save time and response for use later # Note the optional negative sign in the regexp. Somebody # on the mailing list was getting a time of -0.0000, causing # broken connections. ($id, $time, $res) = ($1, $2, $3); } # now, if it doesn't match unless ($id && $id eq "$worker->{pid}-$worker->{reqid}") { $id = "" unless defined $id; $line = "" unless defined $line; $line =~ s/\n/\\n/g; $line =~ s/\r/\\r/g; Mgd::error("Worker responded with id $id (line: [$line]), but expected id $worker->{pid}-$worker->{reqid}, killing"); $client->close('worker_mismatch'); return MogileFS::ProcManager->AskWorkerToDie($worker); } # now time this interval and add to @RecentQueries my $tinterval = Time::HiRes::time() - $starttime; push @RecentQueries, sprintf("%s %.4f %s", $jobstr, $tinterval, $time); shift @RecentQueries if scalar(@RecentQueries) > 50; # send text to client, put worker back in queue $client->write("$res\r\n"); MogileFS::ProcManager->NoteIdleQueryWorker($worker); } # called from various spots to empty the queues of available pairs. sub ProcessQueues { return if $IsChild; # try to match up a client with a worker while (@IdleQueryWorkers && @PendingQueries) { # get client that isn't closed my $clref; while (!$clref && @PendingQueries) { $clref = shift @PendingQueries or next; if ($clref->[0]->{closed}) { $clref = undef; next; } } next unless $clref; # get worker and make sure it's not closed already my MogileFS::Connection::Worker $worker = shift @IdleQueryWorkers; if (!defined $worker || $worker->{closed}) { unshift @PendingQueries, $clref; next; } # put in mapping and send data to worker push @$clref, Time::HiRes::time(); $Mappings{$worker->{fd}} = $clref; $Stats{queries}++; # increment our counter so we know what request counter this is going out $worker->{reqid}++; # so we're writing a string of the form: # 123-455 10.2.3.123 get_paths foo=bar&blah=bar\r\n $worker->write("$worker->{pid}-$worker->{reqid} $clref->[1]\r\n"); } } # send short descriptions of commands we support to the user sub SendHelp { my $client = $_[1]; # send general purpose help $client->write(< !to Send to all workers of . Mostly used for debugging. !want Alter the level of workers of this class desired. Example: !want 20 queryworker, !want 3 replicate. See !jobs for what jobs are available. HELP } # a child has contacted us with some command/status/something. sub HandleChildRequest { if ($IsChild) { Mgd::fatal("ASSERT: child $_[2] shouldn't be getting requests from other children"); } # if they have no job set, then their first line is what job they are # and not a command. they also specify their pid, just so we know what # connection goes with what pid, in case it's ever useful information. my MogileFS::Connection::Worker $child = $_[1]; my $cmd = $_[2]; die "Child $child with no pid?" unless $child->job; # at this point we've got a command of some sort if ($cmd =~ /^error (.+)$/i) { # pass it on to our error handler, prefaced with the child's job Mgd::error("[" . $child->job . "(" . $child->pid . ")] $1"); } elsif ($cmd =~ /^debug (.+)$/i) { # pass it on to our error handler, prefaced with the child's job Mgd::debug("[" . $child->job . "(" . $child->pid . ")] $1"); } elsif ($cmd =~ /^:state_change (\w+) (\d+) (\w+)/) { my ($what, $whatid, $state) = ($1, $2, $3); state_change($what, $whatid, $state, $child); } elsif ($cmd =~ /^:repl_unreachable (\d+)/) { # announce to the other replicators that this fid can't be reached, but note # that we don't actually drain the queue to the requestor, as the replicator # isn't in a place where it can accept a queue drain right now. MogileFS::ProcManager->ImmediateSendToChildrenByJob('replicate', "repl_unreachable $1", $child); } elsif ($cmd =~ /^repl_i_did (\d+)/) { my $fidid = $1; # announce to the other replicators that this fid was done and then drain the # queue to this person. MogileFS::ProcManager->ImmediateSendToChildrenByJob('replicate', "repl_was_done $fidid", $child); } elsif ($cmd =~ /^repl_starting (\d+)/) { my $fidid = $1; # announce to the other replicators that this fid is starting to be replicated MogileFS::ProcManager->ImmediateSendToChildrenByJob('replicate', "repl_starting $fidid", $child); } elsif ($cmd eq ":ping") { # warn sprintf("Job '%s' with pid %d is still alive at %d\n", $child->job, $child->pid, time()); # this command expects a reply, either to die or stay alive. beginning of worker's loops if (job_needs_reduction($child->job)) { MogileFS::ProcManager->AskWorkerToDie($child); } else { $child->write(":stay_alive\r\n"); } } elsif ($cmd eq ":still_alive") { # a no-op } elsif ($cmd eq ":monitor_just_ran") { send_monitor_has_run($child); } elsif ($cmd =~ /^:wake_a (\w+)$/) { MogileFS::ProcManager->wake_a($1, $child); } elsif ($cmd =~ /^:invalidate_meta (\w+)/) { my $what = $1; MogileFS::ProcManager->send_to_all_children(":invalidate_meta_once $what", $child); } elsif ($cmd =~ /^:set_config_from_child (\S+) (.+)/) { # and this will rebroadcast it to all other children # (including the one that just set it to us, but eh) MogileFS::Config->set_config($1, $2); } elsif (my ($devid, $util) = $cmd =~ /^:set_dev_utilization (\d+) (.+)/) { $dev_util{$devid} = $util; # time to rebroadcast dev utilization messages to all children? if ($nowish > $last_util_spray + 3) { $last_util_spray = $nowish; MogileFS::ProcManager->send_to_all_children(":set_dev_utilization " . join(" ", %dev_util)); } } else { # unknown command my $show = $cmd; $show = substr($show, 0, 80) . "..." if length $cmd > 80; Mgd::error("Unknown command [$show] from child; job=" . $child->job); } } # Class method. # ProcManager->ImmediateSendToChildrenByJob($class, $message, [ $child ]) # given a job class, and a message, send it to all children of that job. returns # the number of children the message was sent to. # # if child is specified, the message will be sent to members of the job class that # aren't that child. so you can exclude the one that originated the message. # # doesn't add to queue of things child gets on next interactive command: writes immediately # (won't get in middle of partial write, though, as danga::socket queues things up) # # if $just_one is specified, only a single process is notified, then we stop. sub ImmediateSendToChildrenByJob { my ($pkg, $class, $msg, $exclude_child, $just_one) = @_; my $childref = $ChildrenByJob{$class}; return 0 unless defined $childref && %$childref; foreach my $child (values %$childref) { # ignore the child specified as the third arg if one is sent next if $exclude_child && $exclude_child == $child; # send the message to this child $child->write("$msg\r\n"); return 1 if $just_one; } return scalar(keys %$childref); } # called when we notice that a worker has bit it. we might have to restart a # job that they had been working on. sub NoteDeadWorkerConn { return if $IsChild; # get parms and error check my MogileFS::Connection::Worker $worker = $_[1]; return unless $worker; my $fd = $worker->{fd}; return unless defined($fd); # if there's a mapping for this worker's fd, they had a job that didn't get done if ($Mappings{$fd}) { # unshift, since this one already went through the queue once unshift @PendingQueries, $Mappings{$worker->{fd}}; delete $Mappings{$worker->{fd}}; # now try to get it processing again MogileFS::ProcManager->ProcessQueues; } } # given (job, pid), record that this worker is about to die sub note_pending_death { my ($job, $pid) = @_; die "$job not defined in call to note_pending_death.\n" unless defined $jobs{$job}; $todie{$pid} = 1; $jobs{$job}->[1]--; } # see if we should reduce the number of active children sub job_needs_reduction { my $job = shift; return $jobs{$job}->[0] < $jobs{$job}->[1]; } sub is_child { return $IsChild; } sub state_change { my ($what, $whatid, $state, $exclude) = @_; my $key = "$what-$whatid"; foreach my $child (values %child) { next if $exclude && $child == $exclude; my $old = $child->{known_state}{$key} || ""; if ($old ne $state) { $child->{known_state}{$key} = $state; $child->write(":state_change $what $whatid $state\r\n"); } } } sub wake_a { my ($pkg, $class, $fromchild) = @_; # from arg is optional (which child sent it) my $child = MogileFS::ProcManager->is_child; if ($child) { $child->wake_a($class); } else { MogileFS::ProcManager->ImmediateSendToChildrenByJob($class, ":wake_up", $fromchild, "just_one"); } } sub send_to_all_children { my ($pkg, $msg, $exclude) = @_; foreach my $child (values %child) { next if $exclude && $child == $exclude; $child->write("$msg\r\n"); } } sub send_monitor_has_run { my $child = shift; for my $type (qw(replicate fsck queryworker delete)) { MogileFS::ProcManager->ImmediateSendToChildrenByJob($type, ":monitor_has_run", $child); } } 1; # Local Variables: # mode: perl # c-basic-indent: 4 # indent-tabs-mode: nil # End: