package MangoX::Queue;
use Mojo::Base 'Mojo::EventEmitter';
use Carp 'croak';
use DateTime;
use DateTime::Duration;
use Mango::BSON ':bson';
use MangoX::Queue::Delay;
use Mojo::IOLoop;
use Mojo::Log;
use Scalar::Util ();
no warnings 'experimental::smartmatch';
our $VERSION = '0.03';
# Logger for diagnostics; the default instance only reports errors
has log => sub {
    return Mojo::Log->new->level('error');
};

# The Mango::Collection that holds the queue documents (checked in new)
has 'collection';

# MangoX::Queue::Delay instance used to wait between polls of the collection
has delay => sub {
    return MangoX::Queue::Delay->new;
};

# Seconds a job may stay 'Retrieved' before get_options makes it eligible
# again; MANGOX_QUEUE_JOB_TIMEOUT overrides the default of 60
has timeout => sub {
    return $ENV{MANGOX_QUEUE_JOB_TIMEOUT} // 60;
};

# How many times to retry a job before giving up;
# MANGOX_QUEUE_JOB_RETRIES overrides the default of 5
has retries => sub {
    return $ENV{MANGOX_QUEUE_JOB_RETRIES} // 5;
};

# Maps consumer IDs (as returned by fetch/consume) to Mojo::IOLoop timer IDs
has consumers => sub {
    return {};
};

# Maps plugin class names to the plugin instances loaded via plugin()
has plugins => sub {
    return {};
};
# Construct a queue.
#
# Takes the same arguments as the parent constructor; a `collection`
# attribute holding a Mango::Collection (or an instance of a subclass of
# it) is mandatory.
#
# Croaks with "No Mango::Collection provided to constructor" otherwise.
sub new {
    my $class = shift;
    my $self  = $class->SUPER::new(@_);

    # Use blessed/isa rather than an exact ref() string comparison so that
    # subclasses of Mango::Collection are accepted as well.
    my $collection = $self->collection;
    croak qq{No Mango::Collection provided to constructor}
        unless Scalar::Util::blessed($collection)
        && $collection->isa('Mango::Collection');

    return $self;
}
# Load and register a plugin.
#
#   $name    - plugin class name; it is require'd unless the class already
#              provides a `new` method
#   $options - optional hash reference, flattened into the plugin constructor
#
# Returns the plugin instance stored under $name in the `plugins` attribute.
# Croaks if no name is given, if the plugin is already loaded, or if the
# plugin class cannot be loaded.
sub plugin {
    my ($self, $name, $options) = @_;

    croak qq{No plugin name provided} unless defined $name && length $name;
    croak qq{Plugin $name already loaded} if exists $self->plugins->{$name};

    unless ($name->can('new')) {
        # Translate Foo::Bar into Foo/Bar.pm and use the block form of
        # eval, so the plugin name is never compiled as Perl source.
        (my $file = "$name.pm") =~ s{::}{/}g;
        eval { require $file; 1 }
            or croak qq{Failed to load plugin $name: $@};
    }

    # Tolerate a missing options hash explicitly instead of relying on
    # autovivification of an undefined reference.
    $self->plugins->{$name} = $name->new(%{ $options // {} });
    $self->plugins->{$name}->register($self);

    return $self->plugins->{$name};
}
# Build the option hash handed to find_and_modify when claiming a job.
#
# The query selects jobs that are 'Pending', or that are 'Retrieved' with a
# `retrieved` time more than `timeout` seconds in the past. The update marks
# the selected job as 'Retrieved', stamps the current time and increments
# its attempt counter.
sub get_options {
    my ($self) = @_;

    # Jobs nobody has picked up yet
    my %pending = (
        status => { '$in' => ['Pending'] },
    );

    # Jobs picked up before the timeout cut-off
    my $cutoff = DateTime->now->subtract_duration(
        DateTime::Duration->new(seconds => $self->timeout)
    );
    my %timed_out = (
        status    => { '$in' => ['Retrieved'] },
        retrieved => { '$lt' => $cutoff },
    );

    my %claim = (
        '$set' => {
            status    => 'Retrieved',
            retrieved => DateTime->now,
        },
        '$inc' => { attempt => 1 },
    );

    return {
        query  => { '$or' => [ \%pending, \%timed_out ] },
        update => \%claim,
        # Sort by priority, then in order of creation
        sort   => bson_doc(
            'priority' => 1,
            'created'  => -1,
        ),
        # Get the original object (so we can see status etc)
        new    => 0,
    };
}
# Add a job to the queue.
#
# Accepted argument shapes:
#   - 'job_name'
#   - foo => bar, 'job_name'
#   - 'job_name', $callback
#   - foo => bar, 'job_name', $callback
#
# Recognised options are `priority` (default 1), `created` (default
# DateTime->now) and `status` (default 'Pending').
#
# Blocking mode (no callback): inserts the job, emits `enqueued` and returns
# the job document including its `_id`.
#
# Non-blocking mode: returns whatever the collection's insert returns and
# later invokes $callback with ($job, $error). When the insert fails the
# error is logged, no `enqueued` event is emitted and the job has no `_id`.
sub enqueue {
    my ($self, @args) = @_;

    my $callback = ref($args[-1]) eq 'CODE' ? pop @args : undef;
    my $job      = pop @args;

    my %args;
    %args = (@args) if scalar @args;

    my $db_job = {
        priority => $args{priority} // 1,
        created  => $args{created}  // DateTime->now,
        data     => $job,
        status   => $args{status}   // 'Pending',
        attempt  => 1,
    };

    if ($callback) {
        return $self->collection->insert($db_job => sub {
            my ($collection, $error, $oid) = @_;

            if ($error) {
                # Do not announce a job that never reached the collection
                $self->log->error("Failed to enqueue job: $error");
            }
            else {
                $db_job->{_id} = $oid;
                $self->emit_safe(enqueued => $db_job) if $self->has_subscribers('enqueued');
            }

            # The error is passed as an extra argument so existing
            # callbacks that only read the job keep working.
            $callback->($db_job, $error);
        });
    }

    my $id = $self->collection->insert($db_job);
    $db_job->{_id} = $id;
    $self->emit_safe(enqueued => $db_job) if $self->has_subscribers('enqueued');

    return $db_job;
}
# Wait for a job to reach a status.
#
#   watch $queue $id, 'Status' => $callback
#
# $id_or_job is either a job ID or a job document (its `_id` is used).
# $status defaults to 'Complete'. With a callback the polling is scheduled
# on Mojo::IOLoop and the timer ID is returned; without one the call blocks
# and returns the result of _watch_blocking.
sub watch {
    my ($self, $id_or_job, $status, $callback) = @_;

    $status //= 'Complete';

    my $id = $id_or_job;
    $id = $id_or_job->{_id} if ref $id_or_job;

    # Blocking
    unless ($callback) {
        $self->log->debug("Waiting for $id on status $status in blocking mode");
        return $self->_watch_blocking($id, $status);
    }

    # Non-blocking
    $self->log->debug("Waiting for $id on status $status in non-blocking mode");
    my $poll = sub { $self->_watch_nonblocking($id, $status, $callback) };
    return Mojo::IOLoop->timer(0 => $poll);
}
# Poll the collection until the job with the given ID reaches a wanted
# status, waiting on the delay object between polls.
#
#   $status - a single status string, or an array reference of statuses
#
# Returns 1 once the job's status matches. Never returns if it never does.
sub _watch_blocking {
    my ($self, $id, $status) = @_;

    # Normalise to a list so both forms share one comparison. Plain string
    # equality replaces the smartmatch operator, which is deprecated.
    my @wanted = !ref($status)           ? ($status)
               : ref($status) eq 'ARRAY' ? @$status
               :                           ();

    while (1) {
        my $doc = $self->collection->find_one({ '_id' => $id });
        $self->log->debug("Job found by Mango: " . ($doc ? 'Yes' : 'No'));

        # A document without a status can never match
        my $current = $doc ? $doc->{status} : undef;
        if (defined($current)) {
            return 1 if grep { defined($_) && $_ eq $current } @wanted;
        }

        $self->delay->wait;
    }
}
# Perform one non-blocking status check for a job and either report the
# match or schedule another check.
#
#   $status   - a single status string, or an array reference of statuses
#   $callback - invoked with the job document once its status matches
#
# On a match the delay is reset and $callback is called. Otherwise the delay
# object is asked to wait, after which another check is scheduled on
# Mojo::IOLoop provided the loop is still running.
sub _watch_nonblocking {
    my ($self, $id, $status, $callback) = @_;

    # Normalise to a list so both forms share one comparison. Plain string
    # equality replaces the smartmatch operator, which is deprecated.
    my @wanted = !ref($status)           ? ($status)
               : ref($status) eq 'ARRAY' ? @$status
               :                           ();

    return $self->collection->find_one({ '_id' => $id } => sub {
        my ($cursor, $err, $doc) = @_;

        # A failed lookup is retried like a miss, but must not go unnoticed
        $self->log->error("Error looking up job $id: $err") if $err;
        $self->log->debug("Job found by Mango: " . ($doc ? 'Yes' : 'No'));

        my $current = $doc ? $doc->{status} : undef;
        if (defined($current) && grep { defined($_) && $_ eq $current } @wanted) {
            # Log the status that matched; interpolating an array
            # reference would only print its address.
            $self->log->debug("Status is $current");
            $self->delay->reset;
            $callback->($doc);
        }
        else {
            $self->log->debug("Job not found or status doesn't match");
            $self->delay->wait(sub {
                return unless Mojo::IOLoop->is_running;
                Mojo::IOLoop->timer(0 => sub { $self->_watch_nonblocking($id, $status, $callback) });
            });
            return undef;
        }
    });
}
# Put a job back on the queue.
#
# Sets the status of the given job document to 'Pending' (modifying the
# caller's hash) and hands it to update, returning update's result.
sub requeue {
    my $self = shift;
    my ($job, $callback) = @_;

    # Only the local document changes here; update does the rest
    $job->{status} = 'Pending';

    return $self->update($job, $callback);
}
# Remove a job from the collection.
#
# $id_or_job is either a job ID or a job document (its `_id` is used).
# With a callback the removal is non-blocking: once it has finished the
# callback is invoked without arguments and `dequeued` is emitted. Without
# one the removal blocks and `dequeued` is emitted afterwards.
sub dequeue {
    my ($self, $id_or_job, $callback) = @_;

    # TODO option to not remove on dequeue?

    my $id = $id_or_job;
    $id = $id_or_job->{_id} if ref $id_or_job;
    my $query = { '_id' => $id };

    unless ($callback) {
        $self->collection->remove($query);
        return $self->has_subscribers('dequeued')
            && $self->emit_safe(dequeued => $id_or_job);
    }

    return $self->collection->remove($query => sub {
        $callback->();
        $self->emit_safe(dequeued => $id_or_job) if $self->has_subscribers('dequeued');
    });
}
# Look up a job by ID.
#
# $id_or_job is either a job ID or a job document (its `_id` is used).
# Without a callback the document found by find_one is returned. With one,
# the lookup is non-blocking and the callback receives the document.
sub get {
    my ($self, $id_or_job, $callback) = @_;

    my $query = { '_id' => ref($id_or_job) ? $id_or_job->{_id} : $id_or_job };

    return $self->collection->find_one($query) unless $callback;

    return $self->collection->find_one($query => sub {
        my ($collection, $error, $doc) = @_;
        $callback->($doc);
    });
}
# Write a job document back to the collection (upserting on its `_id`).
#
# Without a callback the update blocks and its result is returned. With a
# callback the update is non-blocking and the callback is invoked with
# ($doc, $error) as delivered by the collection's update.
sub update {
    my ($self, $job, $callback) = @_;

    my @update = ({ '_id' => $job->{_id} }, $job, { upsert => 1 });

    return $self->collection->update(@update) unless $callback;

    # The non-blocking branch must perform the same update as the blocking
    # one; a find_one here would leave the stored job untouched.
    return $self->collection->update(@update => sub {
        my ($collection, $error, $doc) = @_;
        $self->log->error("Failed to update job: $error") if $error;
        $callback->($doc, $error);
    });
}
# Fetch a single job.
#
#   fetch $queue status => 'Complete', sub { my $job = shift; }
#
# Any key/value arguments replace the default query from get_options.
#
# Blocking mode (no callback): returns the result of _consume_blocking in
# fetch mode. Non-blocking mode: schedules one poll on Mojo::IOLoop,
# records its timer in `consumers` and returns the consumer ID, which can
# be passed to release.
sub fetch {
    my ($self, @args) = @_;

    my $callback = ref($args[-1]) eq 'CODE' ? pop @args : undef;

    my %args;
    %args = (@args) if scalar @args;

    $self->log->debug("In fetch");

    unless ($callback) {
        $self->log->debug("Fetching in blocking mode");
        return $self->_consume_blocking(\%args, 1);
    }

    $self->log->debug("Fetching in non-blocking mode");

    # Start from the historical "count + 1" ID, but skip IDs still in use:
    # after a release the count drops, and reusing a live ID would
    # overwrite another consumer's timer.
    my $consumers   = $self->consumers;
    my $consumer_id = (scalar keys %$consumers) + 1;
    $consumer_id++ while exists $consumers->{$consumer_id};

    $consumers->{$consumer_id} = Mojo::IOLoop->timer(0 => sub {
        $self->_consume_nonblocking(\%args, $consumer_id, $callback, 1);
    });

    return $consumer_id;
}
# Consume jobs from the queue.
#
#   consume $queue status => 'Failed', sub { my $job = shift; }
#
# Any key/value arguments replace the default query from get_options.
#
# Blocking mode (no callback): returns the result of _consume_blocking,
# which waits until a job is available. Non-blocking mode: schedules
# polling on Mojo::IOLoop, records the timer in `consumers` and returns the
# consumer ID, which can be passed to release.
sub consume {
    my ($self, @args) = @_;

    my $callback = ref($args[-1]) eq 'CODE' ? pop @args : undef;

    my %args;
    %args = (@args) if scalar @args;

    $self->log->debug("In consume");

    unless ($callback) {
        $self->log->debug("consuming in blocking mode");
        return $self->_consume_blocking(\%args, 0);
    }

    $self->log->debug("consuming in non-blocking mode");

    # Start from the historical "count + 1" ID, but skip IDs still in use:
    # after a release the count drops, and reusing a live ID would
    # overwrite another consumer's timer.
    my $consumers   = $self->consumers;
    my $consumer_id = (scalar keys %$consumers) + 1;
    $consumer_id++ while exists $consumers->{$consumer_id};

    $consumers->{$consumer_id} = Mojo::IOLoop->timer(0 => sub {
        $self->_consume_nonblocking(\%args, $consumer_id, $callback, 0);
    });
    $self->log->debug("Timer scheduled, consumer_id $consumer_id has timer ID: " . $consumers->{$consumer_id});

    return $consumer_id;
}
# Stop a non-blocking consumer.
#
# Removes the Mojo::IOLoop timer recorded for $consumer_id and forgets the
# consumer. Returns nothing.
sub release {
    my ($self, $consumer_id) = @_;

    my $registry = $self->consumers;
    my $timer_id = $registry->{$consumer_id};

    $self->log->debug("Releasing consumer $consumer_id with timer ID: " . $timer_id);

    Mojo::IOLoop->remove($timer_id);
    delete $registry->{$consumer_id};

    return;
}
# Claim a job with find_and_modify, blocking until one is available.
#
#   $args  - hash reference; when non-empty it replaces the default query
#            from get_options
#   $fetch - true to give up after a single attempt
#
# Returns the claimed job document (after emitting `consumed`). In fetch
# mode, returns undef (an empty list in list context) if no job is found.
sub _consume_blocking {
    my ($self, $args, $fetch) = @_;

    while (1) {
        # Options are rebuilt on every attempt so their timestamps are fresh
        my $opts = $self->get_options;
        $opts->{query} = $args if scalar keys %$args;

        my $doc = $self->collection->find_and_modify($opts);
        $self->log->debug("Job found by Mango: " . ($doc ? 'Yes' : 'No'));

        if ($doc) {
            $self->emit_safe(consumed => $doc) if $self->has_subscribers('consumed');
            return $doc;
        }

        # Return explicitly: the value of a sub that falls off the end of a
        # loop is unspecified.
        return if $fetch;

        $self->delay->wait;
    }
}
# Perform one non-blocking claim attempt and, when consuming rather than
# fetching, arrange for the next one.
#
#   $args        - hash reference; when non-empty it replaces the default
#                  query from get_options
#   $consumer_id - key in `consumers` under which the next timer is stored
#   $callback    - invoked with each claimed job document
#   $fetch       - true for a one-shot fetch (no rescheduling)
#
# Rescheduling only happens while Mojo::IOLoop is running and while the
# consumer is still present in `consumers`.
sub _consume_nonblocking {
    my ($self, $args, $consumer_id, $callback, $fetch) = @_;

    my $opts = $self->get_options;
    $opts->{query} = $args if scalar keys %$args;

    return $self->collection->find_and_modify($opts => sub {
        my ($cursor, $err, $doc) = @_;
        $self->log->debug("Job found by Mango: " . ($doc ? 'Yes' : 'No'));

        # The next poll always runs in consume mode
        my $poll_again = sub { $self->_consume_nonblocking($args, $consumer_id, $callback, 0) };

        unless ($doc) {
            # Nothing to claim: back off through the delay object
            return unless Mojo::IOLoop->is_running;
            return if $fetch;
            $self->delay->wait(sub {
                return unless exists $self->consumers->{$consumer_id};
                $self->consumers->{$consumer_id} = Mojo::IOLoop->timer(0 => $poll_again);
                $self->log->debug("Timer rescheduled (recursive delayed), consumer_id $consumer_id has timer ID: " . $self->consumers->{$consumer_id});
            });
            return undef;
        }

        # A job was claimed: hand it over, then poll again immediately
        $self->delay->reset;
        $self->emit_safe(consumed => $doc) if $self->has_subscribers('consumed');
        $callback->($doc);

        return unless Mojo::IOLoop->is_running;
        return if $fetch;
        return unless exists $self->consumers->{$consumer_id};

        $self->consumers->{$consumer_id} = Mojo::IOLoop->timer(0 => $poll_again);
        $self->log->debug("Timer rescheduled (recursive immediate), consumer_id $consumer_id has timer ID: " . $self->consumers->{$consumer_id});
    });
}
1;
=encoding utf8
=head1 NAME
MangoX::Queue - A MongoDB queue implementation using Mango
=head1 DESCRIPTION
L<MangoX::Queue> is a MongoDB backed queue implementation using L<Mango> to support
blocking and non-blocking queues.
L<MangoX::Queue> makes no attempt to handle the L<Mango> connection, database or
collection - pass in a collection to the constructor and L<MangoX::Queue> will
use it. The collection can be plain, capped or sharded.
=head1 SYNOPSIS
use Mango;
use MangoX::Queue;
my $mango = Mango->new("mongodb://localhost:27017");
my $collection = $mango->db('my_db')->collection('my_queue');
my $queue = MangoX::Queue->new(collection => $collection);
# To add a job
my $id = enqueue $queue 'test'; # Blocking
enqueue $queue 'test' => sub { my $id = shift; }; # Non-blocking
# To set options
my $id = enqueue $queue priority => 1, created => DateTime->now, 'test'; # Blocking
enqueue $queue priority => 1, created => DateTime->now, 'test' => sub { my $id = shift; }; # Non-blocking
# To watch for a specific job status
watch $queue $id; # Blocking
watch $queue $id, 'Complete' => sub { # Non-blocking
# Job status is 'Complete'
};
# To fetch a job
my $job = fetch $queue; # Blocking
fetch $queue sub { # Non-blocking
my ($job) = @_;
# ...
};
# To get a job by id
my $job = get $queue $id; # Blocking
get $queue $id => sub { my $job = shift; }; # Non-blocking
# To requeue a job
my $id = requeue $queue $job; # Blocking
requeue $queue $job => sub { my $id = shift; }; # Non-blocking
# To dequeue a job
dequeue $queue $id; # Blocking
dequeue $queue $id => sub { }; # Non-blocking
# To consume a queue
while(my $job = consume $queue) { # Blocking
# ...
}
my $consumer = consume $queue sub { # Non-blocking
my ($job) = @_;
# ...
};
# To stop consuming a queue
release $queue $consumer;
# To listen for events
on $queue enqueued => sub { my ($queue, $job) = @_; };
on $queue dequeued => sub { my ($queue, $job) = @_; };
on $queue consumed => sub { my ($queue, $job) = @_; };
# To register a plugin
plugin $queue 'MangoX::Queue::Plugin::Statsd';
=head1 ATTRIBUTES
L<MangoX::Queue> implements the following attributes.
=head2 collection
my $collection = $queue->collection;
$queue->collection($mango->db('foo')->collection('bar'));
my $queue = MangoX::Queue->new(collection => $collection);
The L<Mango::Collection> representing the MongoDB queue collection.
=head2 delay
my $delay = $queue->delay;
$queue->delay(MangoX::Queue::Delay->new);
The L<MangoX::Queue::Delay> responsible for dynamically controlling the
delay between queue queries.
=head2 plugins
my $plugins = $queue->plugins;
Returns a hash containing the plugins registered with this queue.
=head2 retries
my $retries = $queue->retries;
$queue->retries(5);
The number of times a job will be picked up from the queue before it is
marked as failed.
=head2 timeout
my $timeout = $queue->timeout;
$queue->timeout(10);
The time (in seconds) a job is allowed to stay in Retrieved state before
it is released back into Pending state. Defaults to 60 seconds.
=head1 EVENTS
L<MangoX::Queue> inherits from L<Mojo::EventEmitter> and emits the following events
=head2 consumed
on $queue consumed => sub {
my ($queue, $job) = @_;
# ...
};
Emitted when an item is consumed (either via consume or fetch)
=head2 dequeued
on $queue dequeued => sub {
my ($queue, $job) = @_;
# ...
};
Emitted when an item is dequeued
=head2 enqueued
on $queue enqueued => sub {
my ($queue, $job) = @_;
# ...
};
Emitted when an item is enqueued
=head1 METHODS
L<MangoX::Queue> implements the following methods.
=head2 consume
# In blocking mode
while(my $job = consume $queue) {
# ...
}
while(my $job = $queue->consume) {
# ...
}
# In non-blocking mode
consume $queue sub {
my ($job) = @_;
# ...
};
$queue->consume(sub {
my ($job) = @_;
# ...
});
Waits for jobs to arrive on the queue, sleeping between queue checks using L<MangoX::Queue::Delay> or L<Mojo::IOLoop>.
Currently sets the status to 'Retrieved' before returning the job.
=head2 dequeue
my $job = fetch $queue;
dequeue $queue $job;
Dequeues a job. Currently removes it from the collection.
=head2 enqueue
enqueue $queue 'job name';
enqueue $queue [ 'some', 'data' ];
enqueue $queue +{ foo => 'bar' };
$queue->enqueue('job name');
$queue->enqueue([ 'some', 'data' ]);
$queue->enqueue({ foo => 'bar' });
Add an item to the queue.
Currently uses priority 1 with a job status of 'Pending'.
=head2 fetch
# In blocking mode
my $job = fetch $queue;
my $job = $queue->fetch;
# In non-blocking mode
fetch $queue sub {
my ($job) = @_;
# ...
};
$queue->fetch(sub {
my ($job) = @_;
# ...
});
Fetch a single job from the queue, returning undef if no jobs are available.
Currently sets job status to 'Retrieved'.
=head2 get
my $job = get $queue $id;
Gets a job from the queue by ID. Doesn't change the job status.
=head2 get_options
my $options = $queue->get_options;
Returns the L<Mango::Collection> options hash used by find_and_modify to
identify and update available queue items.
Wait for a job to enter a certain status.
=head2 release
my $consumer = consume $queue sub {
# ...
};
release $queue $consumer;
Releases a non-blocking consumer from watching a queue.
=head2 requeue
my $job = fetch $queue;
requeue $queue $job;
Requeues a job. Sets the job status to 'Pending'.
=head2 update
my $job = fetch $queue;
$job->{status} = 'Failed';
update $queue $job;
Updates a job in the queue.
=head2 watch
# In blocking mode
my $id = enqueue $queue 'test';
watch $queue $id, 'Complete'; # blocks until job is complete
# In non-blocking mode
my $id = enqueue $queue 'test';
watch $queue $id, 'Complete' => sub {
# ...
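};
Waits for a job to reach the given status, which defaults to 'Complete'. The status may be a single string or an array reference of acceptable statuses.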
=head1 SEE ALSO
L<Mojolicious>, L<Mango>
=cut