package GRID::Machine; use strict; use Data::Dumper; use List::Util qw(first); use Scalar::Util qw(blessed reftype); use Cwd qw{getcwd abs_path}; use IO::File; use File::Spec; use File::Path; $| = 1; sub send_error { my $server = shift; my $message = shift; $server->remotelog($message); $server->send_operation( "DIED", GRID::Machine::Result->new( errmsg => "$server->{host}: $message"), ); } sub send_result { my $server = shift; my %arg = @_; $arg{stdout} = '' unless defined($arg{stdout}); $arg{stderr} = '' unless defined($arg{stderr}); $arg{errmsg} = '' unless defined($arg{errmsg}); $arg{results} = [] unless defined($arg{results}); $server->send_operation( "RETURNED", GRID::Machine::Result->new( %arg )); } { # closure for getter-setters # List of legal attributes my @legal = qw( cleanfiles cleandirs cleanup clientpid command debug Deparse err errfile host Indent log logic_id logfile perl prefix pushinc readfunc report sendstdout startdir startenv stored_procedures tmpdir unshiftinc writefunc ); my %legal = map { $_ => 1 } @legal; GRID::Machine::MakeAccessors::make_accessors(@legal); my $SERVER; sub SERVER { return $SERVER; } sub set_tmpdir { my $workdir = File::Spec->tmpdir()."/rperl"; $workdir .= '1' while (-f $workdir) or (-d $workdir and !(-w $workdir)); return $workdir; } sub setloganderr { # and set report too my ($log, $args, ) = @_; my $workdir = $SERVER->{tmpdir}; my $logname = $args->{$log}; $logname = "$workdir/$args->{clientpid}_$$.$log" unless $logname; if (-d $logname) { $logname .= "/rperl$args->{clientpid}_$$.$log"; } my $logfile = IO::File->new("> $logname"); unless ($logfile) { $SERVER->send_error("Can't open $logname for writing. $@"); return $SERVER } $logfile->autoflush(1); $SERVER->{$log} = $logname; $SERVER->{$log."file"} = $logfile; } sub new { my $class = shift; my %args = @_; # Dummy variable $legal. Just to make the perl compiler conscious of the closure ... my $legal = \%legal; my $die = 0; my $a = first { !exists $legal{$_} } keys(%args); die("Error in remote new: Illegal arg $a. Legal args: @legal\n") if $a; # TRUE if remote stdout is sent back to local my $sendstdout = $args{sendstdout}; # TRUE if temporary files will be deleted my $cleanup = $args{cleanup}; $cleanup = 1 unless defined($cleanup); my $host = $args{host}; unless (defined($host)) { eval q{ require Sys::Hostname; }; $host = $@? '$unknown' : Sys::Hostname::hostname(); } my $startdir = $args{startdir}; if ($startdir) { mkdir $startdir unless -x $startdir; chdir($startdir) or die "$host: can't change to dir $startdir\n"; } # Set initial environment variables # No need to check is a HASH ref since it was already checked in the local side # (See GRID/Machine.pm) my %startenv = %{$args{startenv}}; for (keys %startenv) { $ENV{$_} = $startenv{$_}; } my $pushinc = $args{pushinc} || []; for (@$pushinc) { push @INC, $_; } my $unshiftinc = $args{unshiftinc} || []; for (@$unshiftinc) { unshift @INC, $_; } my $clientpid = $args{clientpid} || $$; my $prefix = $args{prefix} || "$ENV{HOME}/perl5lib"; unless (-r $prefix) { mkdir $prefix unless -r $prefix; die "$host: can't create directory $prefix\n" unless (-d $prefix) && (-r $prefix); } push @INC, $prefix; # create copy of STDOUT to use as command output (-dk-) open(my $CMDIN, "<&STDIN") or die "$host: Can't create copy of STDIN\n"; my $readfunc = sub { if( defined $_[1] ) { read( STDIN, $_[0], $_[1] ); } else { $_[0] = ; die "Premature EOF received" unless defined($_[0]); length $_[0]; } }; # create copy of STDOUT to use as command output (-dk-) open(my $CMDOUT, ">&STDOUT") or die "$host: Can't create copy of STDOUT\n"; $CMDOUT->autoflush(1); my $writefunc = sub { syswrite $CMDOUT, $_[0]; }; my $debug = $args{debug} || 0; # THIS IS NEW!!!!: Logic ID of the machine my $logic_id = $args{logic_id}; my $handler = { sendstdout => $sendstdout, readfunc => $readfunc, writefunc => $writefunc, host => $host, logic_id => $logic_id, cleanup => $cleanup, prefix => $prefix, clientpid => $clientpid, # Used to store remote files (see GRID::Machine::RIOHandle) FILES => [], # List of absolute names of temporary files cleanfiles => [], # List of absolute names of temporary directories cleandirs => [], # Used to store remote subroutines. A remote sub is a GRID::Machine::RemoteSub object stored_procedures => {}, report => $args{report}, tmpdir => $args{tmpdir}, debug => $debug, }; $SERVER = bless $handler, $class; $SERVER->{tmpdir} = set_tmpdir() unless $SERVER->{tmpdir}; mkdir $SERVER->{tmpdir} if !(-d $SERVER->{tmpdir}); setloganderr('log', \%args, ); # redirect STDOUT - once per remote session! (-dk-) open(STDOUT,"> $SERVER->{log}") or do { $SERVER->send_error("Can't redirect STDOUT. $@"); return $SERVER }; setloganderr('err', \%args, ); setloganderr('report', \%args, ); return $SERVER; } } # end closure # Methods in uppercase are PROTOCOL methods sub QUIT { my $server = shift; $server->DESTROY; $server->send_operation( "RETURNED"); # It is not being called upon termination! exit( 0 ) } sub read_stdfiles { my $server = shift; my $sendstdout = $server->{sendstdout}; my $log = $server->{log}; my $err = $server->{err}; my ($rstdout, $rstderr) = ("", ""); if ($sendstdout) { local $/ = undef; my $file; open($file, $log) or do { $server->remotelog(qq{Can't open stdout file '$log' $@ $!}); return (); }; $rstdout = <$file>; close($file) or $server->remotelog(qq{Closing stdout file '$log' $@ $!}); open($file, $err) or do { $server->remotelog(qq{Can't open stderr file '$err' $@ $!}); return (); }; $rstderr = <$file>; close($file) or $server->remotelog(qq{Can't close stderr file '$err' $@ $!}); } return ($rstdout, $rstderr); } sub eval_and_read_stdfiles { my $server = shift; my $subref = shift; my @results = eval { $subref->( @_ ) }; my $err = "$@$!"; my @output = $server->read_stdfiles; # handling 'GRID::Machine::QUIT' when waiting for 'RESULT' from callback (-dk-) if ($err =~ /^EXIT/) { undef $@; exit 0 } return GRID::Machine::Result->new( errmsg => "Can't recover stdout and stderr", results => \@results, ) unless @output; my ($rstdout, $rstderr) = @output; return GRID::Machine::Result->new( stdout => $rstdout, errmsg => $err, errcode => $?, stderr => $rstderr, results => \@results ); } sub EVAL { my ($server, $code, $args) = @_; my $subref = eval "use strict; sub { $code }"; if( $@ ) { # Error while compiling code # get rid off of the "#line" header $code =~ s{^#package\s\w+;\s+#line\s+\d+\s+"[\\/\w.-]+"\s*}{}; $server->send_error("Error while compiling eval '".substr($code,0,20)."...'\n$@"); return; } if ( defined($args) && (reftype($args) ne 'ARRAY')) { $server->send_error( "Error in eval. Args expected. Found instead: $args" ); return; } # -dk- look for anonymous callback(s) my $results = eval_and_read_stdfiles( $server, $subref, map( UNIVERSAL::isa($_, 'GRID::Machine::_RemoteStub') ? make_callback($server, $_->{id}) : $_, @$args ) ); if( $@ ) { $code =~ m{(.*)}; $server->send_error( "Error running eval code $1. $@" ); return; } $server->send_operation( "RETURNED", $results ); return; } sub STORE { my ($server, $name, $code) = splice @_,0,3; my %args = @_; my $politely = $args{politely} || 0; delete($args{politely}); my $subref = eval qq{ sub { use strict; $code } }; if( $@ ) { $server->send_error("Error while compiling '$name'. $@"); return; } if (($politely) && exists($server->{stored_procedures}{$name})) { $server->send_result( errmsg => "Warning! Attempt to overwrite sub '$name'. New version was not installed.", results => [ 0 ], ); } else { # Store a Remote Subroutine Object $server->{stored_procedures}{$name} = GRID::Machine::RemoteSub->new(sub => $subref, %args); { no strict 'refs'; *{'GRID::Machine::'.$name} = $subref unless GRID::Machine->can($name); } $server->send_result( results => [ 1 ], ); } #$DB::single = 1 if $server->{debug}; # warn! return; } sub MAKEMETHOD { # makes a remote method of an existing sub my ($server, $name, ) = splice @_,0,2; my %args = @_; my $politely = $args{politely} || 0; delete($args{politely}); # TODO: check that exists or that the user is willing to risk my $subref = do { no strict 'refs'; \&{$name} }; if (($politely) && exists($server->{stored_procedures}{$name})) { $server->send_result( errmsg => "Warning! Attempt to overwrite sub '$name'. New version was not installed.", results => [ 0 ], ); } else { # Store a Remote Subroutine Object $server->{stored_procedures}{$name} = GRID::Machine::RemoteSub->new(sub => $subref, %args); { no strict 'refs'; *{'GRID::Machine::'.$name} = $subref unless GRID::Machine->can($name); } $server->send_result( results => [ 1 ], ); } #$DB::single = 1 if $server->{debug}; # warn! return; } # -dk- sub make_callback { my ($server, $name) = splice @_,0,2; return sub { $server->send_operation('CALLBACK', $name, @_); while (1) { my ($op, @args) = $server->read_operation(); # what we actually expected - 'RESULT' if ($op eq 'RESULT') { return wantarray ? @args : $args[0]; } # special case - 'QUIT' if ($op eq 'GRID::Machine::QUIT') { $server->send_operation("RETURNED"); # we're inside eval so have to pass 'exit' up die "EXIT" } if ($server->can($op)) { $server->$op(@args); next } die "RESULT expected, '$op' received...\n" } } } # -dk- sub CALLBACK { my ($server, $name) = splice @_,0,2; # Don't overwrite existing methods my $class = ref($server); if ($class->can($name)) { $server->send_error("Machine $server->{host} already has a method $name"); return; }; my $callback = make_callback($server, $name); { no strict 'refs'; *{$class."::$name"} = $callback; } $server->send_result( results => [ 0+$callback ] ); } sub EXISTS { my $server = shift; my $name = shift; $server->send_operation("RETURNED", exists($server->{stored_procedures}{$name})); } sub CALL { my ($server, $name, $args) = @_; my $subref = $server->{stored_procedures}{$name}->{sub}; if( !defined $subref ) { $server->send_error( "Error in RPC. No such stored procedure '$name'" ); return; } if ( defined($args) && !UNIVERSAL::isa($args, 'ARRAY')) { $server->send_error( "Error in RPC. Args expected. Found instead: $args" ); return; } # -dk- look for anonymous callback(s) my $results = eval_and_read_stdfiles($server, $subref, map(UNIVERSAL::isa($_, 'GRID::Machine::_RemoteStub') ? make_callback($server, $_->{id}) : $_, @$args ) ); if( $@ ) { $server->send_error("Error running sub $name: $@"); return; } my $filter = $server->{stored_procedures}{$name}->filter; $results = $results->$filter if $filter; $server->send_operation( "RETURNED", $results ); return; } sub MODPUT { my $self = shift; my $pwd = getcwd(); while (my ($name, $code) = splice @_,0,2) { my @a = split "/", $name; my $module = pop @a; my $dir = File::Spec->catfile($self->prefix, @a); if (defined($dir)) { # Bug fixed by Alex White. Thanks Alex! mkpath($dir) unless -e $dir; chdir($dir) or $self->send_error("Error chdir $dir $@"), return; } open my $f, "> $module" or $self->send_error("Error can't create file $module $@"), return; binmode $f; syswrite($f, $code); #or $self->send_error("Error can't save file $module $@"), return; close($f) or $self->send_error("Error can't close file $module $@"), return; chdir($pwd); } $self->send_result( #DEBUGGING stdout=> "pwd: ".getcwd()." dir: $dir, module: $module\n", results => [1] ); return; } # Sends a message for inmediate print to the local machine # Goes in reverse: The remote makes the request the local # serves the request sub gprint { SERVER->send_operation('GRID::Machine::GPRINT', @_); return 1; } # Sends a message for inmediate printf to the local machine # Goes in reverse: The remote makes the request, the local # serves the request sub gprintf { SERVER->send_operation('GRID::Machine::GPRINTF', @_); return 1; } sub OPEN { my ($server, $descriptor) = splice @_,0,2; my $file = IO::File->new(); # Check if is an ouput pipe, i.e. s.t. like: my $f = $m->open('| sort -n'); # In such case, redirect STDOUT to null if ($descriptor =~ m{^\s*\|(.*)}) { my $command = $1; my $nulldevice = File::Spec->devnull; $descriptor = "| ($command) > $nulldevice"; } unless (defined($descriptor) and $file->open($descriptor)) { $server->send_error("Error while opening $descriptor. $@"); return; } my $files = $server->{FILES}; push @{$files}, $file; # Watch memory usage $server->send_result( results => [ $#$files ], ); return; } sub DESTROY { my $self = shift; close($self->{report}); unlink $self->{log} if -w $self->{log}; unlink $self->{err} if -w $self->{err}; unlink $self->{report} if -w $self->{report} and $self->{cleanup}; my $cleanfiles = $self->cleanfiles; if (reftype($cleanfiles) eq 'ARRAY') { while (@{$cleanfiles}) { $_ = shift @{$cleanfiles}; unlink $_; } } my $cleandirs = $self->cleandirs; if (reftype($cleandirs) eq 'ARRAY') { while (@{$cleandirs}) { $_ = shift @{$cleandirs}; rmtree $_; } } } sub remotelog { my $server = shift; my $msg = join '', @_; #my ($package, $filename, $line, $subroutine,) = caller(1); #$subroutine = "$filename:$line" if $subroutine =~/__ANON__$/; syswrite $server->{reportfile},"$$:".scalar(localtime)." => $msg\n"; } ######### Remote main() ######### sub main() { my $server = shift; # Create filter process # Check $server is a GRID::Machine { package main; while( 1 ) { my ( $operation, @args ) = $server->read_operation( ); if ($server->can($operation)) { $server->$operation(@args); # Outermost CALL Should Reset Redirects (-dk-) next unless ($operation eq 'GRID::Machine::CALL' || $operation eq 'GRID::Machine::EVAL'); if ($server->{log}) { close STDOUT; $server->{log} and open(STDOUT,"> $server->{log}") or do { $server->send_error("Can't redirect stdout. $@"); last } } if ($server->{err}) { close STDERR; $server->{err} and open(STDERR,"> $server->{err}") or do { $server->send_error("Can't redirect stderr. $@"); last } } next; } $server->send_error( "Unknown operation $operation\nARGS: @args\n" ); } } # package } # GRID::Machine::DEBUG_LOAD_FINISHED # signals that the initial bootstraping has finished sub DEBUG_LOAD_FINISHED { my $self = shift; no warnings 'once'; $DB::single = 1 if $self->{debug}; } # keys are PIDs values are (remote) GRID::Machine::Processes my %process; ###################################### Core methods to be build with makemethod ########### sub fork #gm (filter => 'result', #gm around => sub { my $self = shift; my $r = $self->call( 'fork', @_ ); $r->{machine} = $self; $r } #gm ) { my $childcode = shift; my %args = (stdin => '/dev/null', stdout => '', stderr => '', result => '', args => [], @_); my ($stdin, $stdout, $stderr, $result) = @args{'stdin', 'stdout', 'stderr', 'result'}; $| = 1; use File::Temp qw{tempfile}; unless ($stdout) { my $stdoutfile; ($stdoutfile, $stdout) = tempfile(SERVER()->{tmpdir}.'/GMFORKXXXXXX', UNLINK => 0, SUFFIX =>'.out'); close($stdoutfile); } unless ($stderr) { my $stderrfile; ($stderrfile, $stderr) = tempfile(SERVER()->{tmpdir}.'/GMFORKXXXXXX', UNLINK => 0, SUFFIX =>'.err'); close($stderrfile); } unless ($result) { my $resultfile; ($resultfile, $result) = tempfile(SERVER()->{tmpdir}.'/GMFORKXXXXXX', UNLINK => 0, SUFFIX =>'.result'); close($resultfile); } my $subref = eval <<"EOSUB"; use strict; sub { use POSIX qw{setsid}; setsid(); open(STDIN, "< $stdin"); open(STDOUT,"> $stdout"); open(STDERR,"> $stderr"); $childcode }; EOSUB if ($@) { my $context = substr($childcode,0,100); SERVER->send_error("Found errors before forking in code '$context'\n$@\n"); return; } my $pid = fork; unless (defined($pid)) { SERVER->remotelog("Process $$: Can't fork '".substr($childcode,0,10)."'\n$@ $!"); return; } if ($pid) { # father #gprint("Created child $pid\n"); my $p = bless { pid => $pid, stdin => $stdin, stdout => $stdout, stderr => $stderr, result => $result }, 'GRID::Machine::Process'; $process{$pid} = $p; return $p; } # child # admit a single argument of any kind my $ar = $args{args}; $ar = [ $ar ] unless reftype($ar) and (reftype($ar) eq 'ARRAY'); my @r = $subref->(@$ar); close(STDERR); close(STDOUT); use Data::Dumper; local $Data::Dumper::Indent = 0; local $Data::Dumper::Deparse = 1; local $Data::Dumper::Purity = 1; local $Data::Dumper::Terse = 0; open(my $resfile, ">", $result); print $resfile Dumper([\@r, \$@]); close($resfile); exit(0); } sub async { my $subname = shift; # warning potential error. It must be: # 'SERVER()->{stored_procedures}{'.$subname.'}{sub}->(@_)', GRID::Machine::fork( "$subname".'(@_)', args => [ @_ ] ); } sub waitpid #gm (filter => 'result') { my $process = shift; $process = $process->result if ($process->isa('GRID::Machine::Result')); SERVER->remotelog(Dumper($process)); #gprint("Synchronizing with $process->{pid} args = <@_>\n"); my ($status, $deceased);; do { $deceased = waitpid($process->{pid}, @_ ? @_ : 0); $status = $?; #gprint("Synchronized: Pid of the deceased process: $deceased\n"); #gprint("there are processes still running\n") if (!$deceased); #SERVER->remotelog("deceased = $deceased"); #if (kill 0, -$process->{pid}) { gprint("Not answer to 0 signal from process $process->{pid}\n") } #else { gprint("Strange: the process $process->{pid} is still alive\n"); } } while (kill 0, $process->{pid}); # if deceased is 0 # TODO: study flock, may be this way we can synchronize!! delete $process{$deceased} if $deceased > 0; local $/ = undef; open my $fo, $process->{stdout}; # check exists, die, etc. my $stdout = <$fo>; close($fo); CORE::unlink $process->{stdout}; #gprint("stderr file is: $process->{stderr}\n"); open my $fe, $process->{stderr}; # or do { SERVER->remotelog("can't open file <$process->{stderr}> $@") }; # check exists, die, etc. my $stderr = <$fe>; #gprint("stderr is: $stderr\n"); close($fe); CORE::unlink $process->{stderr}; open my $fr, $process->{result} or do { SERVER->remotelog("can't open file <$process->{stderr}> $@") }; # check exists, die, etc. #sleep 1; my $result = <$fr>; #gprint("result as read from file $process->{result} is <$result>\n");; close($fr); CORE::unlink $process->{result}; $result .= '$VAR1'; my $val = eval "no strict; $result"; #SERVER->remotelog("Errors: '$@'. Result from the asynchronous call: '@$val'"); return bless { stdout => $stdout, stderr => $stderr, results => $val->[0], status => $status, # as in $? waitpid => $deceased, # returned by waitpid descriptor => SERVER()->host().':'.$$.':'.$process->{pid}, machineID => SERVER()->logic_id, errmsg => ${$val->[1]}, # as $@ }, 'GRID::Machine::Process::Result'; } sub waitall #gm (filter => 'result') { my ($status, $deceased); do { $deceased = CORE::wait(); $status = $?; return $deceased if ($deceased <= 0); #gprint("Synchronized: Pid of the deceased process: $deceased\n"); #gprint("there are processes still running\n") if (!$deceased); #SERVER->remotelog("deceased = $deceased"); #if (kill 0, -$process->{pid}) { gprint("Not answer to 0 signal from process $process->{pid}\n") } #else { gprint("Strange: the process $process->{pid} is still alive\n"); } } while (kill 0, $deceased); # if deceased is 0 # TODO: study flock, may be this way we can synchronize!! my $process = $process{$deceased}; delete $process{$deceased}; return $deceased unless $process; local $/ = undef; open my $fo, $process->{stdout}; # check exists, die, etc. my $stdout = <$fo>; close($fo); CORE::unlink $process->{stdout}; #gprint("stderr file is: $process->{stderr}\n"); open my $fe, $process->{stderr}; # or do { SERVER->remotelog("can't open file <$process->{stderr}> $@") }; # check exists, die, etc. my $stderr = <$fe>; #gprint("stderr is: $stderr\n"); close($fe); CORE::unlink $process->{stderr}; open my $fr, $process->{result} or do { SERVER->remotelog("can't open file <$process->{stderr}> $@") }; # check exists, die, etc. #sleep 1; my $result = <$fr>; #gprint("result as read from file $process->{result} is <$result>\n");; close($fr); CORE::unlink $process->{result}; $result .= '$VAR1'; my $val = eval "no strict; $result"; #SERVER->remotelog("Errors: '$@'. Result from the asynchronous call: '@$val'"); return bless { stdout => $stdout, stderr => $stderr, results => $val->[0], status => $status, # as in $? waitpid => $deceased, # returned by waitpid descriptor => SERVER()->host().':'.$$.':'.$process->{pid}, machineID => SERVER()->logic_id, errmsg => ${$val->[1]}, # as $@ }, 'GRID::Machine::Process::Result'; } sub kill { #gm (filter => 'result') my $signal = shift; CORE::kill $signal, @_; } sub poll { #gm (filter => 'result') CORE::kill 0, @_; } 1; package GRID::Machine::RemoteSub; { my @legal = qw( sub filter ); # other attributes: source file, source package, source line, etc my %legal = map { $_ => 1 } @legal; GRID::Machine::MakeAccessors::make_accessors(@legal); sub new { my $class = shift; bless { @_ }, $class; } } 1;