package Queue::Q::NaiveFIFO::Redis; use strict; use warnings; use Carp qw(croak); use Queue::Q::NaiveFIFO; use parent 'Queue::Q::NaiveFIFO'; use Redis; use Sereal::Encoder; use Sereal::Decoder; our $SerealEncoder; our $SerealDecoder; use Class::XSAccessor { getters => [qw(server port queue_name db _redis_conn)], }; sub new { my ($class, %params) = @_; for (qw(server port queue_name)) { croak("Need '$_' parameter") if not exists $params{$_}; } my $self = bless({ (map {$_ => $params{$_}} qw(server port queue_name) ), db => $params{db} || 0, _redis_conn => undef, } => $class); $self->{_redis_conn} = Redis->new( %{$params{redis_options} || {}}, encoding => undef, # force undef for binary data server => join(":", $self->server, $self->port), ); $self->_redis_conn->select($self->db) if $self->db; return $self; } sub enqueue_item { my $self = shift; croak("Need exactly one item to enqeue") if not @_ == 1; my ($blob) = $self->_serialize($_[0]); $self->_redis_conn->lpush($self->queue_name, $blob); } sub enqueue_items { my $self = shift; return if not @_; my $qn = $self->queue_name; my $conn = $self->_redis_conn; my @blobs = $self->_serialize(@_); $conn->lpush($qn, @blobs); } sub claim_item { my ($self) = @_; my ($rv) = $self->_deserialize( $self->_redis_conn->rpop($self->queue_name) ); return $rv; } sub claim_items { my ($self, $n) = @_; $n ||= 1; my $conn = $self->_redis_conn; my $qn = $self->queue_name; my @elem; $conn->rpop($qn, sub {push @elem, $_[0]}) for 1..$n; $conn->wait_all_responses; return $self->_deserialize( grep defined, @elem ); } sub flush_queue { my $self = shift; $self->_redis_conn->del($self->queue_name); } sub queue_length { my $self = shift; my ($len) = $self->_redis_conn->llen($self->queue_name); return $len; } sub _serialize { my $self = shift; $SerealEncoder ||= Sereal::Encoder->new({stringify_undef => 1, warn_undef => 1}); return map $SerealEncoder->encode($_), @_; } sub _deserialize { my $self = shift; $SerealDecoder ||= Sereal::Decoder->new(); return map defined($_) ? $SerealDecoder->decode($_) : $_, @_; } 1; __END__ =head1 NAME Queue::Q::NaiveFIFO::Redis - In-memory Redis implementation of the NaiveFIFO queue =head1 SYNOPSIS use Queue::Q::NaiveFIFO::Redis; my $q = Queue::Q::NaiveFIFO::Redis->new( server => 'myredisserver', port => 6379, queue_name => 'my_work_queue', ); $q->enqueue_item("foo"); $q->enqueue_item({ bar => "baz" }); # any Sereal-serializable data structure my $foo = $q->claim_item; my $bar = $q->claim_item; =head1 DESCRIPTION Implements interface defined in L: an implementation based on Redis lists. The data structures passed to C are serialized using Sereal (cf. L, L), so any data structures supported by that can be enqueued. =head1 METHODS All methods of L plus: =head2 new Constructor. Takes named parameters. Required parameters are the C hostname or address, the Redis C, and the name of the Redis key to use as the C. You may optionally specify a Redis C number to use. Since this module will establish the Redis connection, you may pass in a hash reference of options that are valid for the constructor of the L module. This can be passed in as the C parameter. =head2 claim_item($timeout_secs) The claim_item method has an optional parameter here, which is the timeout in seconds it will wait for a new item. Default wait time is one second. Using a timeout > 0 sec, no additional sleep() calls are needed and items will be available to the consumer without a delay. =head1 AUTHOR Steffen Mueller, Esmueller@cpan.orgE =head1 COPYRIGHT AND LICENSE Copyright (C) 2012 by Steffen Mueller This library is free software; you can redistribute it and/or modify it under the same terms as Perl itself, either Perl version 5.8.1 or, at your option, any later version of Perl 5 you may have available. =cut