use strict;
use warnings FATAL => 'all';
use Test::TempDatabase;
use Test::More tests => 21;
BEGIN { use_ok( 'Queue::Worker'); }
Test::TempDatabase->become_postgres_user;
my $temp_db = Test::TempDatabase->create(dbname => 'queue_worker_db');
my $dbh = $temp_db->handle;
$dbh->do("set client_min_messages to warning");
package Q1;
use base 'Queue::Worker';
sub name { return 'q1'; }
sub process {
my ($self, $msg) = @_;
push @{ $self->{msgs} }, $msg;
}
package Q2;
use base 'Queue::Worker';
sub name { return 'q2'; }
sub process {
$dbh->do("insert into fork_res values (?)", undef, $_[1]);
my $v = $dbh->selectcol_arrayref("select v from value_res");
$v->[0]++;
$dbh->do("update value_res set v = ?", undef, $v->[0]);
}
package main;
Q1->create_table($dbh);
Q2->create_table($dbh);
ok($dbh->do("select * from queue_worker_q1"));
ok($dbh->do("select * from queue_worker_q2"));
Q1->enqueue($dbh, "h$_") for (1 .. 3);
is_deeply($dbh->selectcol_arrayref("select count(*) from queue_worker_q1")
, [ 3 ]);
my $q1 = Q1->new;
isa_ok($q1, 'Q1');
$q1->run($dbh);
is_deeply($q1->{msgs}, [ 'h1', 'h2', 'h3' ]);
my @pids;
sub do_fork {
if (my $pid = fork()) {
push @pids, $pid;
return;
}
$dbh->{InactiveDestroy} = 1;
undef $dbh;
$dbh = $temp_db->connect('queue_worker_db');
$temp_db->{db_handle} = $dbh;
sleep 1;
shift()->($dbh);
exit;
}
Q2->enqueue($dbh, "h$_") for (1 .. 5);
$dbh->do("create table fork_res (m text)");
$dbh->do("create table value_res (v integer)");
$dbh->do("insert into value_res values (0)");
do_fork(sub {
my $q = Q2->new;
$q->run(shift());
}) for (1 .. 7);
waitpid($_, 0) for @pids;
is_deeply($dbh->selectcol_arrayref("select count(*) from fork_res"), [ 5 ]);
is_deeply($dbh->selectcol_arrayref("select v from value_res"), [ 5 ]);
package Q21;
use base 'Q2';
sub process {
my $q2 = Q21->new;
Test::More::is($q2->run($dbh), 0);
}
package main;
my $q21 = Q21->new;
is($q21->run($dbh), 0);
Q21->enqueue($dbh, "h1");
is($q21->run($dbh), 1);
package Q22;
use base 'Q2';
sub process { die "hoho"; }
package main;
my $q22 = Q22->new;
Q22->enqueue($dbh, "h1");
eval { $q22->run($dbh); };
like($@, qr/hoho/);
is_deeply($dbh->selectcol_arrayref("select count(*) from queue_worker_q1")
, [ 0 ]);
Q21->enqueue($dbh, "h1");
is($q21->run($dbh), 1);
my $_waits = 0;
my $_posts = 0;
package Q23;
use base 'Q2';
sub process {}
package S;
sub trywait {
$_waits++;
shift()->{sem}->trywait;
}
sub post {
$_posts++;
Q23->enqueue($dbh, "race") if $_posts < 2;
shift()->{sem}->post;
}
package main;
my $q23 = Q23->new;
$q23->{semaphore} = bless({ sem => $q23->{semaphore} }, 'S');
Q23->enqueue($dbh, "h1");
is($q23->run($dbh), 2);
is($_posts, $_waits);
isa_ok(Q1->get_semaphore, 'POSIX::RT::Semaphore::Named');
Q1->unlink_semaphore;
unlike(`ls /dev/shm`, qr/q1/);
Q2->unlink_semaphore;
unlike(`ls /dev/shm`, qr/q2/);
Queue::Worker->create_table($dbh, 'ho');
ok($dbh->do("select * from queue_worker_ho"));