The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
#!/usr/bin/env perl

use strict;
use warnings;
use v5.10;

use Mango;
use MangoX::Queue;

use Test::More;

my $mango = Mango->new($ENV{MANGO_URI} // 'mongodb://localhost:27017');
my $collection = $mango->db('test')->collection('mangox_queue_test');
eval { $collection->drop };
$collection->create;

my $queue = MangoX::Queue->new(collection => $collection);

test_nonblocking_consume();
test_blocking_consume();
test_custom_consume();
test_job_finished_method();
test_concurrent_job_limit_disabled();
test_concurrent_job_limit_reached();

sub test_nonblocking_consume {
	enqueue $queue '82365';

	my $happened = 0;

	my $consumer_id;
	$consumer_id = consume $queue sub {
		my ($job) = @_;

		$happened++;
		if($happened == 1) {
			is($job->{data}, '82365', 'Found job 82365 in non-blocking consume');
			Mojo::IOLoop->timer(1 => sub {
				enqueue $queue '29345';
			});
		} elsif ($happened == 2) {
			is($job->{data}, '29345', 'Found job 29345 in non-blocking consume');
			release $queue $consumer_id;
			Mojo::IOLoop->stop;
		} else {
			use Data::Dumper; print Dumper $job;
			fail('Queue consumed too many items');
		}
	};

	is($happened, 0, 'Non-blocking consume successful');

	Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
}

sub test_blocking_consume {
	enqueue $queue 'test';

	while(my $item = consume $queue) {
		ok(1, 'Found job in blocking consume');
		last;
	}
}

sub test_custom_consume {
	$collection->remove;

	my $id = enqueue $queue 'custom consume test';

	my $happened = 0;

	my $consumer_id;
	$consumer_id = consume $queue status => 'Failed', sub {
		my ($job) = @_;
		isnt($job, undef, 'Found failed job in non-blocking custom consume');

		release $queue $consumer_id;
		Mojo::IOLoop->stop;
		return;
	};

	is($happened, 0, 'Non-blocking consume successful');

	Mojo::IOLoop->timer(1 => sub {
		my $job = get $queue $id;
		$job->{status} = 'Failed';
		update $queue $job;
	});

	Mojo::IOLoop->start unless Mojo::IOLoop->is_running;
}

sub test_concurrent_job_limit_disabled {
	my $queue_concurrent_job_limit_backup = $queue->concurrent_job_limit;
	my $jobs = [];
	my $consumed_job_count = 0;
	my $concurrent_job_limit_reached_flag;
	my $consumer_id;

	$queue->concurrent_job_limit(-1);
	is($queue->concurrent_job_limit, -1, 'concurrent_job_limit changed to -1');

	# Enqueue 10 dummy jobs
	$queue->enqueue($_) for (1..10);

	# Start consuming jobs
	$consumer_id = consume $queue sub {
		my ($job) = @_;

		$consumed_job_count++;

    # print "IN FIRST CONSUMER\n";

		# Push jobs to array so we can finish() them later
		push(@$jobs, $job);
	};

	# Subscribe to the 'concurrent_job_limit_reached' event so we know when consuming has paused
	my $handler = $queue->on(concurrent_job_limit_reached => sub {
    ok(0, 'concurrent_job_limit_reached should not have been emitted');

		# Finish the jobs previously stored in the array
		while (shift(@$jobs)) {};
	});

	# Start waiting for all jobs to finish
	Mojo::IOLoop->timer(0 => sub { _wait_test_concurrent_job_limit_reached($queue, $consumer_id, \$consumed_job_count, $jobs); });
	Mojo::IOLoop->start;

	ok($consumed_job_count == 10, 'consumed_job_count == 10');

  $queue->unsubscribe(concurrent_job_limit_reached => $handler);
	$queue->concurrent_job_limit($queue_concurrent_job_limit_backup);
}

sub test_job_finished_method {
  # TODO without this, somehow the old consumer gets called (from last test)
  $queue = MangoX::Queue->new(collection => $collection);

	my $queue_concurrent_job_limit_backup = $queue->concurrent_job_limit;
	my $jobs = [];
	my $consumed_job_count = 0;
	my $concurrent_job_limit_reached_flag;
	my $consumer_id;

	is($queue->concurrent_job_limit, 10, 'concurrent_job_limit is the default (10)');
	$queue->concurrent_job_limit(5);
	is($queue->concurrent_job_limit, 5, 'concurrent_job_limit changed to 5');

	# Enqueue 10 dummy jobs
	$queue->enqueue($_) for (1..10);

	# Start consuming jobs
	$consumer_id = consume $queue sub {
		my ($job) = @_;

		$consumed_job_count++;
    #print "CONSUMED JOB COUNT: $consumed_job_count\n";

		# Push jobs to array so we can finish() them later
		push(@$jobs, $job);
	};

	# Subscribe to the 'concurrent_job_limit_reached' event so we know when consuming has paused
	$queue->on(concurrent_job_limit_reached => sub {
		$concurrent_job_limit_reached_flag = 1;

    #print "Event fires\n";

		# Finish the jobs previously stored in the array
		# But keep them in the array so we have a reference, and manually call ->finished
		$_->finished for @$jobs;
	});

	# Start waiting for all jobs to finish
	Mojo::IOLoop->timer(0 => sub { _wait_test_concurrent_job_limit_reached($queue, $consumer_id, \$consumed_job_count, $jobs) });
	Mojo::IOLoop->start;

	ok($consumed_job_count == 10, 'consumed_job_count == 10');
	ok($concurrent_job_limit_reached_flag, 'concurrent_job_limit was reached');

	$queue->concurrent_job_limit($queue_concurrent_job_limit_backup);
}

sub test_concurrent_job_limit_reached {
  # TODO without this, somehow the old consumer gets called (from last test)
  $queue = MangoX::Queue->new(collection => $collection);

	my $queue_concurrent_job_limit_backup = $queue->concurrent_job_limit;
	my $jobs = [];
	my $consumed_job_count = 0;
	my $concurrent_job_limit_reached_flag;
	my $consumer_id;

	is($queue->concurrent_job_limit, 10, 'concurrent_job_limit is the default (10)');
	$queue->concurrent_job_limit(5);
	is($queue->concurrent_job_limit, 5, 'concurrent_job_limit changed to 5');

	# Enqueue 10 dummy jobs
	$queue->enqueue($_) for (1..10);

	# Start consuming jobs
	$consumer_id = consume $queue sub {
		my ($job) = @_;

		$consumed_job_count++;
    #print "CONSUMED JOB COUNT: $consumed_job_count\n";

		# Push jobs to array so we can finish() them later
		push(@$jobs, $job);
	};

	# Subscribe to the 'concurrent_job_limit_reached' event so we know when consuming has paused
	$queue->on(concurrent_job_limit_reached => sub {
		$concurrent_job_limit_reached_flag = 1;

    #print "Event fires\n";

		# Finish the jobs previously stored in the array
		while (shift(@$jobs)) {};
	});

	# Start waiting for all jobs to finish
	Mojo::IOLoop->timer(0 => sub { _wait_test_concurrent_job_limit_reached($queue, $consumer_id, \$consumed_job_count, $jobs) });
	Mojo::IOLoop->start;

	ok($consumed_job_count == 10, 'consumed_job_count == 10');
	ok($concurrent_job_limit_reached_flag, 'concurrent_job_limit was reached');

	$queue->concurrent_job_limit($queue_concurrent_job_limit_backup);
}

sub _wait_test_concurrent_job_limit_reached {
	my ($queue, $consumer_id, $consumed_job_count, $jobs) = @_;

	if ($$consumed_job_count == 10) {
		# Make sure there are no un-finished jobs
		while (shift(@$jobs)) {};

    # print "Finished jobs\n";

		release $queue $consumer_id;
		Mojo::IOLoop->stop;
	} else {
    # print "Waiting: $$consumed_job_count\n";
	  $queue->delay->wait(sub{
			_wait_test_concurrent_job_limit_reached($queue, $consumer_id, $consumed_job_count, $jobs);
		});
	}
}

done_testing;