#!/usr/bin/perl -w -I../lib

our $debug;
BEGIN {
$debug = 0;
}

use strict;

use FindBin qw($Bin);
use lib "$Bin/lib";
use Proc::JobQueue::Testing;
use Object::Dependency;

use Test::More;
use Proc::JobQueue::BackgroundQueue;
use aliased 'Proc::JobQueue::Sort';
use aliased 'Proc::JobQueue::Move';
use aliased 'Proc::JobQueue::Sequence';
use aliased 'Proc::JobQueue::Command';
use Sys::Hostname;
use File::Temp qw(tempdir);
use Time::HiRes qw(time);
use File::Slurp;
use Proc::JobQueue::DependencyQueue;
use Proc::JobQueue::DependencyJob;
use Proc::JobQueue::DependencyTask;
use Proc::JobQueue::RemoteDependencyJob;
use IO::Event 'emulate_Event';

my $tmpdir = tempdir(CLEANUP => 1);

#if ($debug) {
#	open(STDOUT, "| tee $tmpdir/output")
#		or die "open STDOUT | tee: $!";
#} else {
#	open(STDOUT, ">$tmpdir/output")
#		or die "redirect STDOUT to $tmpdir/output: $!";
#}
#select(STDOUT);
#$| = 1;
#open(STDERR, ">&STDOUT") or die "dup STDOUT: $!";
#select(STDERR);
#$| = 1;
#
#my $shdebug = $debug ? "set -x; " : "";

plan tests => 28;

my $graph = Object::Dependency->new();
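# The dependency graph tracks which jobs depend on which; the queue below
# is created with hold_all so nothing starts running until the whole
# pyramid of jobs has been added and $queue->hold(0) is called.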

my $queue = Proc::JobQueue::DependencyQueue->new(
	dependency_graph => $graph,
	hold_all => 1,
);

$queue->addhost('localhost', jobs_per_host => 2);

my $timer;
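# reset_bomb() (re)arms a ten-second watchdog; if the test stops making
# progress before the run completes, the timer fires, reports a failure,
# and exits.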
sub reset_bomb
{
	$timer->cancel if $timer;
	$timer = IO::Event->timer(
		after	=> 10,
		cb	=> sub {
			ok(0, "bomb timer went off, something failed");
			exit 0;
		},
	);
}


# 
# We will construct a pyramid of jobs.  
#
# Layer	Mult	Desc				handler
#  1	1	generate random numbers		DependencyJob
#  2	1	write out numbers		DependencyTask
#  3	3	Combine random numbers		DependencyJob
#  4	2	Sort combined files		RemoteDependencyJob
#
# We will place the jobs into the queue in reverse order
#

my $nrandom = 1000;
my @outputs = ("A");
my %inputs;


my @final_jobs;
my @new_outputs;
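# Layer 4: one RemoteDependencyJob (run on localhost) reads A.1 and A.2,
# merges and numerically sorts them, and returns the result to when_done,
# which writes the final file A.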
for my $out (@outputs) {
	my @in = ("$out.1", "$out.2");
	push(@new_outputs, @in);
	my $job;
	$job = Proc::JobQueue::RemoteDependencyJob->create( 
		prefix			=> '# ',
		preload			=> [qw(File::Slurp)], 
		dependency_graph	=> $graph,
		host			=> 'localhost',

		chdir			=> $tmpdir,
		data			=> {
			output	=> $out,
			inputs	=> \@in,
		},
		desc			=> "combine and sort -> $out",
		when_done		=> sub {
			reset_bomb();
			write_file("$tmpdir/$out", join("\n", @_) . "\n");
			ok(1, "wrote $out");
			$job->finished(0);
		},
		on_start		=> sub {
			reset_bomb();
		},
		eval            	=> <<'END_REMOTEJOB',
			
			my ($data) = @_;
			print "reading $data->{inputs}[0]\n";
			print "reading $data->{inputs}[1]\n";
			my @in1 = split("\n", read_file($data->{inputs}[0]));
			my @in2 = split("\n", read_file($data->{inputs}[1]));
			return (sort { $a <=> $b } @in1, @in2);
			
END_REMOTEJOB
	);
	$inputs{$_} = $job for @in;
	push(@final_jobs, $job);
}
@outputs = @new_outputs;
is(scalar(@outputs), 2, "num outputs");
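# Once every top-level job has finished, this task verifies that the
# queue has drained and stops the event loop.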

my $finish = Proc::JobQueue::DependencyTask->new(
	desc	=> 'unloop',
	func	=> sub {
		reset_bomb();
		ok($queue->alldone, "queue empty");
		IO::Event::unloop_all();
		return 'done';
	},
);
$graph->add($finish, $_) for @final_jobs;

undef @new_outputs;
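# Layer 3: for each of A.1 and A.2, a DependencyJob concatenates its
# three pieces ($out.A, $out.B, $out.C) into a single file.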
for my $out (@outputs) {
	my @in = ("$out.A", "$out.B", "$out.C");
	push(@new_outputs, @in);
	my $job = Proc::JobQueue::DependencyJob->new($graph, 
		sub {
			my $data = '';
			for my $input (@in) {
				$data .= read_file("$tmpdir/$input");
			}
			write_file("$tmpdir/$out", $data);
			reset_bomb();
			ok(1, "wrote $out combining @in");
			return 'all-done';
		},
		desc	=> "read @in -> $out",
	);
	$inputs{$_} = $job for @in;
	$graph->add($inputs{$out}, $job);
}
@outputs = @new_outputs;
is(scalar(@outputs), 6, "num outputs");

my %membuf;

undef @new_outputs;
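# Layer 2: DependencyTasks that dump the in-memory random numbers from
# %membuf out to the files the layer above reads.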
for my $out (@outputs) {
	my $in = "$out.membuf";
	push(@new_outputs, $in);
	my $job = Proc::JobQueue::DependencyTask->new(
		desc	=> "write $out.membuf -> $out",
		func	=> sub {
			write_file("$tmpdir/$out", join("\n", @{$membuf{$in}}) . "\n");
			reset_bomb();
			ok(1, "wrote $out");
			return 'done';
		},
	);
	$inputs{$in} = $job;
	$graph->add($inputs{$out}, $job);
}
@outputs = @new_outputs;
is(scalar(@outputs), 6, "num outputs");
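# Layer 1: each membuf is filled by a DependencyJob that arms a short
# timer; when it fires, the callback generates $nrandom random numbers,
# stores them in %membuf, and marks the job finished.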


for my $out (@outputs) {
	my $job;
	$job = Proc::JobQueue::DependencyJob->new($graph, 
		sub {
			my $timer;
			$timer = IO::Event->timer(
				after => 0.01,
				cb => sub {
					my @r;
					for my $i (1..$nrandom) {
						push(@r, rand(1000));
					}
					$membuf{$out} = \@r;
					$timer->cancel();
					$job->finished(0);
					reset_bomb();
					ok(1, "job for $out finished");
				},
			);
			ok(1, "set up timer for $out");
			return 'all-keep';
		},
		desc	=> "set up timer to write generate membuf $out",
	);
	$graph->add($inputs{$out}, $job);
}
is(scalar(@outputs), 6, "num outputs");
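# Everything is queued; release the hold, start the scheduler, and run
# the event loop until the 'unloop' task fires (or the bomb goes off).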

$queue->hold(0);

$queue->startmore();

reset_bomb();
IO::Event::loop();
reset_bomb();
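# The top of the pyramid, A, should now hold all of the generated
# numbers: six membufs of $nrandom values each.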

ok(-e "$tmpdir/A", "A exists");
my @nums = read_file("$tmpdir/A");
is(scalar(@nums), scalar(@outputs)*$nrandom, "count of numbers");