package GRID::Machine::Group;
use warnings;
use strict;

use List::Util qw(first);
use Scalar::Util qw(reftype);
use IO::Select;

use base qw{Exporter};
our @EXPORT_OK = qw{void};

# new(cluster => \@machines)
#
# Builds a group from the elements of 'cluster'. Elements that are references
# are used as they are; any other element is treated as a host name and turned
# into a machine with GRID::Machine->new(host => $_, survive => 1).
#
# Every group is blessed into its own class, named "<class>::<address>", so
# that the proxy methods installed by sub() and makemethod() belong to that
# single group. The per-group class inherits from the invoking class when it
# derives from GRID::Machine::Group (so subclasses keep their own methods),
# and from GRID::Machine::Group otherwise.
#
# Dies if 'cluster' is not a reference.
sub new {
    my $class = shift;
    my %args  = @_;

    $class = ref($class) || $class;

    my $cluster = $args{cluster};
    die "GRID::Machine::Group::new error. 'cluster' must be an array reference"
        unless ref $cluster;

    my @machines =
        map { ref($_) ? $_ : GRID::Machine->new(host => $_, survive => 1) }
        @{$cluster};

    my $s = IO::Select->new();
    my %rpipe2gm = map { (0 + $_->readpipe,  $_) } @machines;
    my %wpipe2gm = map { (0 + $_->writepipe, $_) } @machines;

    # NOTE(review): write pipes are registered too, although this file only
    # ever calls can_read on the set -- confirm whether other code needs them.
    for (@machines) {
        $s->add($_->readpipe);
        $s->add($_->writepipe);
    }

    my $self = {
        machines => [ @machines ],
        select   => $s,
        rpipe    => \%rpipe2gm,    # keys: read pipe addresses.  Values: GRID machines
        wpipe    => \%wpipe2gm,    # keys: write pipe addresses. Values: GRID machines
    };

    my $clusterclass = $class . '::' . (0 + $self);
    bless $self, $clusterclass;

    # Inherit from the invoking class so that subclass methods stay reachable.
    my $parent = UNIVERSAL::isa($class, __PACKAGE__) ? $class : __PACKAGE__;
    {
        no strict 'refs';
        my $misa = \@{"${clusterclass}::ISA"};
        unshift @{$misa}, $parent
            unless first { $_ eq $parent } @{$misa};
    }

    return $self;
}

# $group->call($name, args => ... | replicate => ...)
# Runs $name through the 'GRID::Machine::CALL' protocol; see calloreval.
sub call {
    calloreval('GRID::Machine::CALL', @_);
}

# $group->eval($name, args => ... | replicate => ...)
# Runs $name through the 'GRID::Machine::EVAL' protocol; see calloreval.
sub eval {
    calloreval('GRID::Machine::EVAL', @_);
}

# _replicated_args(\@machines, $replicate)
#
# Expands the 'replicate' option into one argument list per machine.
#   ARRAY ref - the same array reference is used for every machine.
#   CODE ref  - called once per machine (the machine is both the argument and
#               $_); a return value that is not an array is wrapped in one.
# Dies for anything else.
sub _replicated_args {
    my ($machines, $rep) = @_;

    my $rt = reftype($rep) || '';

    if ($rt eq 'ARRAY') {
        return [ map { $rep } @{$machines} ];
    }

    if ($rt eq 'CODE') {
        my @jobs;
        for (@{$machines}) {
            my $r = $rep->($_);
            $r = [ $r ] unless reftype($r) and (reftype($r) eq 'ARRAY');
            push @jobs, $r;
        }
        return \@jobs;
    }

    die "GRID::Machine::Group::call error. Unexpected arguments";
}

# _job_iterator($arg)
#
# Returns the three closures ($next, $thereareargs, $reset) that drive the
# scheduling loop.
#   ARRAY ref - iterates over a private copy of the list.
#   HASH ref  - uses the caller's 'next' and 'thereareargs' closures; 'reset'
#               is optional and defaults to a no-op.
# Dies for anything else, or when a required closure is missing.
sub _job_iterator {
    my ($arg) = @_;

    my $rt = reftype($arg) || '';

    if ($rt eq 'ARRAY') {
        my @pending = @{$arg};
        return (
            sub { shift @pending },
            sub { @pending ? 1 : 0 },
            sub { },
        );
    }

    if ($rt eq 'HASH') {
        for my $required (qw(next thereareargs)) {
            die "GRID::Machine::Group::call error. Iterator '$required' must be a code reference"
                unless ref $arg->{$required};
        }
        my $reset = $arg->{reset};
        $reset = sub { } unless defined $reset;
        return ($arg->{next}, $arg->{thereareargs}, $reset);
    }

    die "GRID::Machine::Group::call error. Unexpected arguments";
}

# calloreval($protocol, $group, $name, %options)
#
# Farms the jobs described by %options out to the machines of the group and
# collects one result per job.
#
# Options:
#   args      => ARRAY ref: one entry per job (an entry that is not an array
#                reference is wrapped in one), or
#                HASH ref with the closures 'next', 'thereareargs' and,
#                optionally, 'reset'.
#   replicate => used only when 'args' is not defined; see _replicated_args.
#
# Every machine first receives one job while jobs remain. After that, each
# time a machine's read pipe becomes readable its result is stored under the
# job's sequence number and, if jobs remain, the machine gets the next one.
# While refilling, 'next' is called with (\@results, $index_just_finished).
#
# Returns the results, in job order, blessed into
# GRID::Machine::Group::Result. Returns nothing, after a warning, when the
# group has no machines. An empty job list yields an empty result.
sub calloreval {
    my $protocol = shift;
    my $self     = shift;
    my $name     = shift;
    my %ARG      = @_;

    my $machines = $self->{machines};
    unless (@{$machines}) {
        warn "Warning! Attempt to execute '$name' in an empty cluster!";
        return;
    }

    # replicate is ignored if 'args' is defined
    my $arg = $ARG{args};
    $arg = _replicated_args($machines, $ARG{replicate}) unless defined($arg);

    my ($next, $thereareargs, $reset) = _job_iterator($arg);

    my %t;           # machine address => sequence number of the job it runs
    my $task = 0;    # number of jobs dispatched so far

    $reset->();
    for (@{$machines}) {
        # Stop when the number of jobs is less than the number of machines
        last unless $thereareargs->();

        my ($args) = $next->();
        $args = [ $args ] unless (ref($args) and (reftype($args) eq 'ARRAY'));
        $_->send_operation($protocol, $name, $args);
        $t{0 + $_} = $task++;
    }

    my @result;

    # Nothing was dispatched: waiting on the select set would block for ever.
    unless ($task) {
        $reset->();
        return bless \@result, 'GRID::Machine::Group::Result';
    }

    my $readset = $self->{select};
    my @ready;
    my $finished = 0;
    do {
        push @ready, $readset->can_read unless @ready;
        my $handle = shift @ready;
        die "GRID::Machine::Group::call error. No readable handle while waiting for results: $!"
            unless defined $handle;

        my $me = $self->{rpipe}{0 + $handle};
        die "GRID::Machine::Group::call error. Activity on a handle that is not a machine read pipe"
            unless defined $me;

        my $index = $t{0 + $me};
        $result[$index] = $me->_get_result();
        $finished++;

        if ($thereareargs->()) {
            my ($args) = $next->(\@result, $index);
            $args = [ $args ] unless (ref($args) and (reftype($args) eq 'ARRAY'));
            $t{0 + $me} = $task++;
            $me->send_operation($protocol, $name, $args);
        }
    } while ($thereareargs->() or ($finished < $task));

    $reset->();

    return bless \@result, 'GRID::Machine::Group::Result';
}

# _install_on_machines($group, $installer, $name, @rest)
#
# Shared implementation of sub() and makemethod(): calls the machine method
# named $installer with ($name, @rest) on every machine, then installs a proxy
# method $name in the group's own class that forwards to $group->call($name, ...).
# Returns the concatenated return values of the per-machine calls.
sub _install_on_machines {
    my ($self, $installer, @args) = @_;

    my $name = $args[0];
    warn "Warning!: Attempt to install $installer '$name' in an empty cluster"
        unless @{ $self->{machines} };

    my @r;
    push @r, $_->$installer(@args) for @{ $self->{machines} };

    # install the par method proxy
    my $proxy = sub { my $group = shift; $group->call($name, @_) };
    my $class = ref($self);
    no strict 'refs';
    *{ $class . '::' . $name } = $proxy;

    return @r;
}

# $group->sub($name, ...)
# Forwards to the 'sub' method of every machine and installs the group proxy.
sub sub {
    my $self = shift;
    return _install_on_machines($self, 'sub', @_);
}

# $group->makemethod($name, ...)
# Forwards to the 'makemethod' method of every machine and installs the
# group proxy.
sub makemethod {
    my $self = shift;
    return _install_on_machines($self, 'makemethod', @_);
}

# Option pair meaning "one job per machine, with an empty argument list".
sub void { return (replicate => []) }

package GRID::Machine::Group::Result;

# Returns the list made of ->result of every stored element.
sub Results {
    my $self = shift;

    return map { $_->result } @$self;
}

1;