# -*- perl -*- # Copyright (c) 2009 AdCopy # Author: Jeff Weisberg # Created: 2009-Apr-02 11:16 (EDT) # Function: track status of peers # # $Id: Status.pm,v 1.38 2010/10/30 19:46:16 jaw Exp $ package AC::Yenta::Status; use AC::Yenta::Kibitz::Status; use AC::Yenta::Debug 'status'; use AC::Yenta::Config; use AC::Yenta::MySelf; use AC::Misc; use Sys::Hostname; use JSON; use Socket; require 'AC/protobuf/yenta_status.pl'; use strict; my $KEEPDOWN = 1800; # keep data about down servers for how long? my $KEEPLOST = 600; # keep data about servers we have not heard about for how long? my $SAVEMAX = 1800; # do not save if older than my $PORT; our $DATA = bless { allpeer => {}, # yenta_status sceptical => {}, mappeer => {}, # {map} => { id => id } peermap => {}, # {id} => @map datacenter => {}, # {dc} => { id => id } peertype => {}, # {ss} => { id => id } }; sub init { my $port = shift; $PORT = $port; AC::DC::Sched->new( info => 'kibitz status', freq => (conf_value('time_status_kibitz') || 5), func => \&periodic, ); AC::DC::Sched->new( info => 'save status', freq => (conf_value('time_status_save') || 15), func => \&save_status, ); } # start up a client every so often sub periodic { # clean up down or lost peers for my $id ( keys %{$DATA->{allpeer}} ){ my $p = $DATA->{allpeer}{$id}; next unless $p; next if $p->{status} == 200 && $p->{timestamp} > $^T - $KEEPLOST; _maybe_remove( $id ); } # randomly pick a peer my($id, $ip, $port) = _random_peer(); return unless $id; # start a client debug("starting status kibitz client to $id"); my $c = AC::Yenta::Kibitz::Status::Client->new( $ip, $port, info => "status client: $id", status_peer => $id, ); __PACKAGE__->isdown($id) unless $c; $c->start(); } sub _random_peer { # sceptical my @peer = values %{$DATA->{sceptical}}; # known peer unless( @peer ){ @peer = map { $DATA->{allpeer}{$_} } keys %{$DATA->{peertype}{yenta}}; } if( @peer ){ my $p = $peer[ rand(@peer) ]; debug("using peer $p->{server_id}"); return ($p->{server_id}, $p->{ip}, undef) if int rand(@peer+1); # sometimes use the seed } # seed peer my $seed = conf_value('seedpeer'); my $p = $seed->[ rand(@$seed) ]; my ($ip, $port) = split /:/, $p; $port ||= my_port(); # don't talk to self. any of my addrs. my $ipinfo = my_network_info(); for my $i (@$ipinfo){ return if $ip eq $i->{ipa} && $port == $PORT; } return("seed/$ip:$port", $ip, $port); } # server list for save file sub server_list { my $type = shift; ($type, my $where) = split m|/|, $type; # where - no longer used $where ||= my_datacenter(); my @peer = keys %{ $DATA->{peertype}{$type} }; return unless @peer; # nothing too old @peer = grep { $DATA->{allpeer}{$_}{lastup} > $^T - $SAVEMAX } @peer; return unless @peer; return map { $DATA->{allpeer}{$_} } @peer; } # save a list of peers, in case I crash, and for others to use sub save_status { my $save = conf_value('savestatus'); my $here = my_datacenter(); # also save locally running services my @mon = AC::Yenta::Monitor::export(); for my $s ( @$save ){ my $file = $s->{file}; my $types = $s->{type}; my @peer; for my $type (@$types){ push @peer, server_list($type); for my $m (@mon){ push @peer, $m if $m->{subsystem} eq $type; } } next unless @peer; debug("saving peer status file"); unless( open(FILE, ">$file.tmp") ){ problem("cannot open save file '$file.tmp': $!"); return; } for my $pd (@peer){ # only save best addr in save file my($ip, $port) = AC::Yenta::IO::TCP::Client->use_addr_port( $pd->{ip} ); my $data = { id => $pd->{server_id}, addr => $ip, port => int($port), status => int($pd->{status}), subsystem => $pd->{subsystem}, environment => $pd->{environment}, sort_metric => int($pd->{sort_metric}), is_local => ($here eq $pd->{datacenter} ? 1 : 0), }; if( $pd->{subsystem} eq 'yenta' ){ $data->{map} = $pd->{map}; } print FILE encode_json( $data ), "\n"; } close FILE; unless( rename("$file.tmp", $file) ){ problem("cannot rename save file '$file': $!"); } } } ################################################################ # diagnostic reports sub report { my $res; for my $v (AC::Yenta::Kibitz::Status::_myself(), AC::Yenta::Monitor::export(), values %{$DATA->{allpeer}} ){ my $id = sprintf '%-28s', $v->{server_id}; my $metric = int( $v->{sort_metric} ); $res .= "$id $v->{hostname}\t$v->{datacenter}\t$v->{subsystem}\t$v->{environment}\t$v->{status}\t$metric\n"; } return $res; } sub report_long { my $res; # for my $v (AC::Yenta::Kibitz::Status::_myself(), AC::Yenta::Monitor::export(), values %{$DATA->{allpeer}} ){ # $res .= dumper( $v ) . "\n\n"; # } return $res; } ################################################################ sub my_port { $PORT } sub my_instance_id { my $class = shift; return my_server_id() . sprintf('%04x', $$); } sub peer { my $class = shift; my $id = shift; return $DATA->{allpeer}{$id}; } sub allpeers { my $class = shift; return values %{$DATA->{allpeer}}; } sub mappeers { my $class = shift; my $map = shift; return keys %{ $DATA->{mappeer}{$map} }; } sub datacenters { my $class = shift; return $DATA->{datacenter}; } ################################################################ sub _remove { my $id = shift; my $ss = $DATA->{allpeer}{$id}{subsystem}; delete $DATA->{peertype}{$ss}{$id} if $ss; my $dc = $DATA->{allpeer}{$id}{datacenter}; delete $DATA->{datacenter}{$dc}{$id} if $dc; verbose("deleting peer: $id"); delete $DATA->{allpeer}{$id}; # remove map info for my $map ( @{$DATA->{peermap}{$id}} ){ delete $DATA->{mappeer}{$map}{$id}; } delete $DATA->{peermap}{$id}; # delete its monitored items for my $p (keys %{$DATA->{allpeer}}){ next unless $DATA->{allpeer}{$p}{via} eq $id; _remove($p); } } sub _maybe_remove { my $id = shift; my $d = $DATA->{allpeer}{$id}; if( ($^T - $d->{lastup} > $KEEPDOWN) || ($^T - $d->{timestamp} > $KEEPLOST) ){ _remove($id); } } sub isdown { my $class = shift; my $id = shift; debug("marking peer '$id' as down"); if( ! $DATA->{allpeer}{$id} ){ return unless $DATA->{sceptical}{$id}; $DATA->{allpeer}{$id} = $DATA->{sceptical}{$id}; delete $DATA->{sceptical}{$id}; } if( $DATA->{allpeer}{$id} ){ $DATA->{allpeer}{$id}{timestamp} = $^T; $DATA->{allpeer}{$id}{status} = 0; $DATA->{allpeer}{$id}{path} = my_server_id(); } _maybe_remove( $id ); } ################################################################ sub _env_ok { my $class = shift; my $id = shift; my $up = shift; # if( $up->{environment} ne conf_value('environment') ){ # verbose("ignoring update from $id - wrong env: $up->{environment}"); # return; # } return 1; } sub update_sceptical { my $class = shift; my $id = shift; # ->server_id my $up = shift; return unless $class->_env_ok($id, $up); if( $DATA->{allpeer}{$id} ){ # already known delete $DATA->{sceptical}{$id}; return; } debug("rcvd update (sceptical) about $id"); $up->{id} = $id; $DATA->{sceptical}{$id} = $up; } sub update { my $class = shift; my $id = shift; # -> server_id my $up = shift; return unless $class->_env_ok($id, $up); # only keep it if it is relatively fresh return unless $up->{timestamp} > $^T - $KEEPLOST; $up->{id} = $id; my $previnfo = $DATA->{allpeer}{$id}; verbose("discovered new peer: $id ($up->{hostname})") unless $previnfo; # only keep it if it is newer than what we have return if $previnfo && $up->{timestamp} <= $previnfo->{timestamp}; $up->{path} .= ' ' . my_server_id(); # debug("updating $id => $up->{status} => " . dumper($up)); debug("updating $id => $up->{status}"); $DATA->{allpeer}{$id} = $up; if( $up->{status} != 200 ){ _maybe_remove( $id ); return ; } # update datacenter info unless( $DATA->{datacenter}{$up->{datacenter}}{$id} ){ my $pdc = $previnfo->{datacenter}; delete $DATA->{datacenter}{$pdc}{$id} if $pdc; $DATA->{datacenter}{$up->{datacenter}}{$id} = $id; } # update subsystem info unless( $DATA->{peertype}{$up->{subsystem}}{$id} ){ my $ss = $previnfo->{subsystem}; delete $DATA->{peertype}{$ss}{$id} if $ss; $DATA->{peertype}{$up->{subsystem}}{$id} = $id; } # update map info $DATA->{peermap}{$id} ||= []; $up->{map} ||= []; my @curmap = @{$DATA->{peermap}{$id}}; my @newmap = sort @{$up->{map}}; return if "@curmap" eq "@newmap"; # unchanged # what do we need to add/remove my (%remove, %add); @remove{@curmap} = @curmap; @add{@newmap} = @newmap; delete $remove{$_} for @newmap; delete $add{$_} for @curmap; for my $map (keys %remove){ debug("removing $map from $id"); delete $DATA->{mappeer}{$map}{$id}; } for my $map (keys %add){ debug("adding $map to $id"); $DATA->{mappeer}{$map}{$id} = $id; } $DATA->{peermap}{$id} = \@newmap; }