The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
use strict;
use warnings FATAL => 'all';

use Test::TempDatabase;
use Test::More tests => 21;

BEGIN { use_ok( 'Queue::Worker'); }

Test::TempDatabase->become_postgres_user;

my $temp_db = Test::TempDatabase->create(dbname => 'queue_worker_db');
my $dbh = $temp_db->handle;
$dbh->do("set client_min_messages to warning");

package Q1;
use base 'Queue::Worker';

sub name { return 'q1'; }

sub process {
	my ($self, $msg) = @_;
	push @{ $self->{msgs} }, $msg;
}

package Q2;
use base 'Queue::Worker';

sub name { return 'q2'; }

sub process {
	$dbh->do("insert into fork_res values (?)", undef, $_[1]);
	my $v = $dbh->selectcol_arrayref("select v from value_res");
	$v->[0]++;
	$dbh->do("update value_res set v = ?", undef, $v->[0]);
}

package main;

Q1->create_table($dbh);
Q2->create_table($dbh);

ok($dbh->do("select * from queue_worker_q1"));
ok($dbh->do("select * from queue_worker_q2"));

Q1->enqueue($dbh, "h$_") for (1 .. 3);
is_deeply($dbh->selectcol_arrayref("select count(*) from queue_worker_q1")
		, [ 3 ]);

my $q1 = Q1->new;
isa_ok($q1, 'Q1');

$q1->run($dbh);
is_deeply($q1->{msgs}, [ 'h1', 'h2', 'h3' ]);

my @pids;
sub do_fork {
	if (my $pid = fork()) {
		push @pids, $pid;
		return;
	}
	$dbh->{InactiveDestroy} = 1;
	undef $dbh;
	$dbh = $temp_db->connect('queue_worker_db');
	$temp_db->{db_handle} = $dbh;
	sleep 1;
	shift()->($dbh);
	exit;
}

Q2->enqueue($dbh, "h$_") for (1 .. 5);
$dbh->do("create table fork_res (m text)");
$dbh->do("create table value_res (v integer)");
$dbh->do("insert into value_res values (0)");

do_fork(sub {
	my $q = Q2->new;
	$q->run(shift());
}) for (1 .. 7);
waitpid($_, 0) for @pids;
is_deeply($dbh->selectcol_arrayref("select count(*) from fork_res"), [ 5 ]);
is_deeply($dbh->selectcol_arrayref("select v from value_res"), [ 5 ]);

package Q21;
use base 'Q2';

sub process {
	my $q2 = Q21->new;
	Test::More::is($q2->run($dbh), 0);
}

package main;

my $q21 = Q21->new;
is($q21->run($dbh), 0);

Q21->enqueue($dbh, "h1");
is($q21->run($dbh), 1);

package Q22;
use base 'Q2';

sub process { die "hoho"; }

package main;

my $q22 = Q22->new;

Q22->enqueue($dbh, "h1");
eval { $q22->run($dbh); };
like($@, qr/hoho/);

is_deeply($dbh->selectcol_arrayref("select count(*) from queue_worker_q1")
		, [ 0 ]);

Q21->enqueue($dbh, "h1");
is($q21->run($dbh), 1);

my $_waits = 0;
my $_posts = 0;
package Q23;
use base 'Q2';

sub process {}

package S;

sub trywait {
	$_waits++;
	shift()->{sem}->trywait;
}

sub post {
	$_posts++;
	Q23->enqueue($dbh, "race") if $_posts < 2;
	shift()->{sem}->post;
}

package main;

my $q23 = Q23->new;
$q23->{semaphore} = bless({ sem => $q23->{semaphore} }, 'S');
Q23->enqueue($dbh, "h1");

is($q23->run($dbh), 2);
is($_posts, $_waits);

isa_ok(Q1->get_semaphore, 'POSIX::RT::Semaphore::Named');

Q1->unlink_semaphore;
unlike(`ls /dev/shm`, qr/q1/);
Q2->unlink_semaphore;
unlike(`ls /dev/shm`, qr/q2/);

Queue::Worker->create_table($dbh, 'ho');
ok($dbh->do("select * from queue_worker_ho"));