The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#! /usr/bin/perl

use strict;
use warnings;

use Test::More tests => 17;
use File::Temp qw/tempdir/;
use Digest::MD5 qw/md5_hex/;

use_ok("File::Tail::Dir");

my $parallel = 5;
my $debug = 0;
my @pids;

for (qw/INT KILL TERM QUIT/) {
    $SIG{$_} = \&death_handler;
}

END {
    death_handler();
}

my $dir = tempdir( CLEANUP => ($debug ? 0: 1));
diag("Using temporary directory $dir\n") if $debug;
chdir($dir);
my $written_t;
my $autostate_delay = 2;

# Add some symlinks, including a circular reference
eval {
    symlink("input1.txt", "input-symlink");
# File::ChangeNotify does not handle circular references
#    symlink("..", "top");
#    symlink("input-circular1", "input-circular2");
#    symlink("input-circular2", "input-circular1");
};

# Do several runs; 
# second run tests state persistence and moving files, 
# third tests no state on existing dir; output.txt is removed and whole contents should be resent 
# fourth tests no_init; nothing should be sent
# In each case line count and checksums of output.txt should match sum of input files
for my $phase (qw/initial subsequent no_state no_init autostate/) {
# start watcher
    unlink(".filetaildirstate") if ($phase eq 'no_state');
    unlink("output.txt") if ($phase eq 'no_state');
    my $lw_pid = background(
	sub {
	    File::Tail::Dir->install_signal_handlers();
	    open my $fh, '>>', 'output.txt' or die "Failed to open output.txt: $!";
	    my $lw = File::Tail::Dir->new(  filter => qr/^input/,
					    processor => sub { my ($name, $lines) = @_; print $fh join("", @$lines); },
					    follow_symlinks => 1,
					    no_init => ($phase eq 'no_init'),
                                            autostate => ($phase eq 'autostate'),
                                            autostate_delay => $autostate_delay,
		) or die "Failed to create File::Tail::Dir instance";
	    $lw->watch_files();
	});

    sleep(2);

    if ($phase eq 'initial' || $phase eq 'subsequent') {
# start set of writers, except on no_init phase where we just need initial contents
	my @src_pids;
	for my $i (1 .. $parallel) {
	    rand();		# ensures different seed for each subprocess
	    push(@src_pids, background(
		     sub {
			 my $fname = "input$i.txt";
			 my $curname = $fname;
			 open my $fh, '>>', $fname or die "Failed to open $fname: $!";
			 select($fh);
			 $| = 1;
			 my @chartable = ( 'A' .. 'Z' );
			 # random number of writes, up to 10
			 my @lines;
			 for my $iter (0 .. int(rand(10))) {
			     # random block of lines up to 50 at a time
			     my $line_count = int(rand(50)) + 1;
			     # random lines up to 1000 chars in length
			     for (1 .. $line_count) {
				 push(@lines, join('', map { $chartable[int(rand(scalar @chartable))] } ( 0 .. int(rand(1000)) )));
			     }
			     if ($phase eq 'subsequent') {
				 my $nextname = $fname . ".$iter";
				 rename $curname, $nextname;
				 $curname = $nextname;
			     }
			     # prefix lines with checksum to verify integrity later
			     print $fh join('', map { md5_hex($_) . " " . $_ . "\n" } @lines);
			 }
			 close($fh);
		     }
		 ));
	}
# wait for writers to finish
	waitpids(30, @src_pids);
    }
    elsif ($phase eq 'autostate') {
        # write to one file to make the state dirty.
        my $fname = 'input1.txt';
        open my $fh, '>>', $fname or die "Failed to open $fname: $!";
        print $fh join('', map { md5_hex($_) . " " . $_ . "\n" } ( 'blah' ));
        close($fh);
        $written_t = time();
        sleep($autostate_delay + 1);
        my ($dev,$ino,$mode,$nlink,$uid,$gid,$rdev,$size,$atime,$mtime) = stat('.filetaildirstate');
        ok($mtime > $written_t, "state file automatically updated");
    }

# stop watcher
    sleep(2);
    kill 1, $lw_pid;
    waitpids(30, $lw_pid);

# evaluate content
# check that each line matches its checksum, and that total number of bytes/lines tally
    my $char_count = 0;
    my $line_count = 0;
    for my $i (1 .. $parallel) {
	my ($chars, $lines) = count_file("input$i.txt*");
	$char_count += $chars;
	$line_count += $lines;
    }

    my ($lw_chars, $lw_lines) = count_file("output.txt");

    is($lw_chars, $char_count, "character count, $phase run");
    is($lw_lines, $line_count, "line_count, $phase run");

    my ($test_count, $fail_count) = checksum_file("output.txt");
    diag("Checksummed $test_count lines, $fail_count failures");
    ok($fail_count == 0, "checksums, $phase run");
}


sub count_file {
    my $fname_glob = shift;
    my $line_count = 0;
    my $char_count = 0;
    for my $fname (glob $fname_glob) {
	open my $fh, '<', $fname or die "Failed to open $fname for reading: $!";
	while (<$fh>) {
	    $line_count++;
	    $char_count += length($_);
	}
	close($fh);
    }
    return ($char_count, $line_count);
}

sub checksum_file {
    my $fname = shift;

    my $test_count = 0;
    my $fail_count = 0;
    open my $fh, '<', $fname or die "Failed to open $fname for reading: $!";
    while (my $line = <$fh>) {
	my ($cs, $text) = split(/\s+/, $line);
	my $md5 = md5_hex($text);
	$test_count++;
	$fail_count++ unless $md5 eq $cs;
	if ($debug && $md5 ne $cs) {
	    diag("Bad checksum $md5: $line\n");
	}
    }
    close($fh);
    return ($test_count, $fail_count);
}
    

sub death_handler {
    chdir("..");
    kill 1, @pids;
    exit();
}

sub background {
    my $sub = shift;

    my $pid = fork();
    die "Fork failed: $!" unless defined $pid;

    if ($pid == 0) {
	@pids = ();
	exit($sub->());
    }
    push(@pids, $pid);
    return $pid;
}

sub waitpids {
    my ($timeout, @src_pids) = @_;

    eval {
	local $SIG{ALRM} = sub { die "alarm\n" };
	alarm($timeout);
	for my $pid (@src_pids) {
	    waitpid($pid,0);
	}
	alarm(0);
    };
    if ($@) {
	if ($@ eq "alarm\n") {
	    die "Test timed out";
	}
	else {
	    die;
	}
    }
}