package Parallel::MapReduce::Sequential;

use base 'Parallel::MapReduce';

use strict;
use warnings;

use Carp;
use Data::Dumper;
use Cache::Memcached;
use Parallel::MapReduce::Utils;

# Set to a true value to have mapreduce() dump its intermediate data
# structures (slices, chunk keys, result) to STDERR via warn.
our $DEBUG = 0;

=pod

=head1 NAME

Parallel::MapReduce::Sequential - MapReduce Infrastructure, single-threaded

=head1 SYNOPSIS

  use Parallel::MapReduce::Sequential;
  my $mri = Parallel::MapReduce::Sequential->new (MemCacheds => [ '127.0.0.1:11211', .... ],
                                                  Workers    => [ '10.0.10.1', '10.0.10.2', ...]);

  # rest like in Parallel::MapReduce

=head1 DESCRIPTION

This subclass of L<Parallel::MapReduce> implements MapReduce as a single
thread. Like its superclass it uses a C<memcached> server pool to distribute
the data and the class can also be used in conjunction with local or remote
workers. But everything will happen sequentially.

Diagnostic dumps of the intermediate data structures are written to STDERR
when C<$Parallel::MapReduce::Sequential::DEBUG> is set to a true value.

=cut

# mapreduce ($map, $reduce, $h1 [, $job])
#
# Runs one MapReduce job sequentially, always using the first configured
# worker.
#
#   $map     the map function to be used
#   $reduce  the reduce function to be used
#   $h1      the incoming hash (reference)
#   $job     a job id, should be different for every job (default 'job1:');
#            it is used as the memcached namespace
#
# Returns whatever fetch_n_unchunk reconstructs from the chunks produced by
# the reduce phase (the result hash).
#
# Croaks if no worker is configured or if the map/reduce functions cannot be
# stored in the memcached pool.
sub mapreduce {
    my $self   = shift;
    my $map    = shift;
    my $reduce = shift;
    my $h1     = shift;
    my $job    = shift || 'job1:';

    # Fail early and clearly instead of calling a method on undef later on.
    my @workers = @{ $self->{_workers} || [] };
    croak "no workers configured, cannot run job '$job'" unless @workers;

    my $worker    = $workers[0];                 # take always the first, TODO: random?
    my $nr_slices = scalar @workers;             # as many slices as there are workers

    my $memd = Cache::Memcached->new({
        servers   => $self->{MemCacheds},
        namespace => $job,
    });

    # The workers only receive the server list and the job id, so the
    # functions have to be in the cloud (see $Storable::Deparse); continuing
    # after a failed store would only defer the failure.
    $memd->set('map', $map)
        or croak "cannot store map function for job '$job' in memcached";
    $memd->set('reduce', $reduce)
        or croak "cannot store reduce function for job '$job' in memcached";

    # slice the hash into equal parts
    my $h1_sliced = Hslice($h1, $nr_slices);
    warn "sliced " . Dumper($h1_sliced) if $DEBUG;

    # map phase: only the intermediate keys are collected here, the values
    # remain in the cloud
    my @rkeys;
    foreach my $k (keys %$h1_sliced) {
        # distribute this slice of the hash over the memcacheds
        my @chunks = chunk_n_store($memd, $h1_sliced->{$k}, $job, 1000);
        warn "master created chunks " . Dumper(\@chunks) if $DEBUG;

        # run the worker and keep the keys it returns
        push @rkeys, @{ $worker->map(\@chunks, "slice$k:", $self->{MemCacheds}, $job) };
    }
    warn "all keys after mappers " . Dumper(\@rkeys) if $DEBUG;

    # slice the keys into 'equal' groups
    my $Rs = balance_keys(\@rkeys, $job, $nr_slices);

    # reduce phase: run the reducer on every group and collect the keys of
    # the chunks which make up the result hash
    my @Rchunks;
    foreach my $r (keys %$Rs) {
        push @Rchunks, @{ $worker->reduce($Rs->{$r}, $self->{MemCacheds}, $job) };
    }
    warn "trying to reconstruct from " . Dumper(\@Rchunks) if $DEBUG;

    # collect together all these chunks
    my $h4 = fetch_n_unchunk($memd, \@Rchunks);
    warn "reconstructed result " . Dumper($h4) if $DEBUG;

    return $h4;
}

=pod

=head1 SEE ALSO

L<Parallel::MapReduce>

=head1 COPYRIGHT AND LICENSE

Copyright 200[8] by Robert Barta, E<lt>drrho@cpan.orgE<gt>

This library is free software; you can redistribute it and/or modify it under
the same terms as Perl itself.

=cut

our $VERSION = '0.03';

1;