# $Id$ package Mvalve::Reader; use Moose; use Mvalve; use Mvalve::Const; use Mvalve::Types; extends 'Mvalve::Base'; has 'timeout' => ( is => 'rw', isa => 'Num', required => 1, default => 60 ); has 'throttler' => ( is => 'rw', does => 'Mvalve::Throttler', required => 1, coerce => 1, handles => [ qw(try_push fill) ], ); has 'drop_on_emergency' => ( is => 'rw', isa => 'Bool', required => 1, default => 1, ); __PACKAGE__->meta->make_immutable; no Moose; sub next { my $self = shift; my $qs = $self->queue_set; my @names = $qs->as_q4m_args; Mvalve::trace("queue_wait with @names") if &Mvalve::Const::MVALVE_TRACE; my $table = $self->q_next( table_conds => \@names, timeout => $self->timeout + 0 ); if (! $table) { Mvalve::trace( "q_next did not return a table name, simply returning" ) if &Mvalve::Const::MVALVE_TRACE; return (); } Mvalve::trace( "issueing fetch on table '$table'") if &Mvalve::Const::MVALVE_TRACE; my $message = $self->q_fetch(table => $table); if (! $message) { Mvalve::trace( "q_fetch did not return a message, simply returning" ) if &Mvalve::Const::MVALVE_TRACE; return (); } my $destination = $message->header( &Mvalve::Const::DESTINATION_HEADER ); if (&Mvalve::Const::MVALVE_TRACE && $qs->is_timed( $table )) { Mvalve::trace( "we should have dispatched at " . scalar( localtime( $message->header( &Mvalve::Const::RETRY_HEADER ) /100_000 ) ) ); } # destination is an abstract symbol representing the endpoint # service name. this /could/ be used by the queue consumer, but it # is *not* a my $emerg_key = [ $destination, 'emergency' ]; if ( $qs->is_emergency( $table ) ) { if ($self->drop_on_emergency) { my $duration = $message->header(&Mvalve::Const::DURATION_HEADER) || $self->throttler->interval; $self->state_set($emerg_key, time + $duration); } else { $self->fill( key => $destination ) } goto RETURN_MESSAGE; } my $avail_time = $self->state_get($emerg_key); if ($avail_time && time < $avail_time) { return (); # drop message! } if ( $qs->is_timed( $table ) ) { # if this is from an emergency queue or a timed queue, we go ahead # and allow the message, but we also update the throttler's count # so the next message from a normal queue would be throttled correctly if ($message->header(&Mvalve::Const::RETRY_HEADER)) { $self->state_decr( [ $destination, 'retry' ] ); } $self->fill( key => $destination ); # XXX - This is bad practive, but oh well goto RETURN_MESSAGE; } # otherwise, we need to check if this message is going to be throttled my $is_pending = $self->is_pending( $destination ); my $is_throttled = ! $self->try_push( key => $destination ); Mvalve::trace( "checking if message to $destination should be throttled (pending: $is_pending, throttled: $is_throttled)" ) if &Mvalve::Const::MVALVE_TRACE; if ($is_throttled || $is_pending) { Mvalve::trace( "message", $message->id, "is being throttled") if &Mvalve::Const::MVALVE_TRACE; $self->defer( message => $message, interval => $self->throttler->interval, ); $self->log( action => "throttle", destination => $destination ); return (); # no data for you! } RETURN_MESSAGE: # if we got here, we can just return the data Mvalve::trace( "message", $message->id, "being returned") if &Mvalve::Const::MVALVE_TRACE; $self->log( action => "dequeue", destination => $destination, ); return $message; } sub is_pending { my( $self, $destination ) = @_; my $retry_key = [ $destination, 'retry' ]; my $count = $self->state_get($retry_key); return $count ? 1 : 0; } 1; __END__ =head1 NAME Mvalve::Reader - Mvalve Reader =head1 METHODS =head2 next Fetches the next available message. =head2 is_pending( $destination ) Checks whethere there are pending retries for that particular destination. =head2 throttler C holds the Data::Throttler instance that does the dirty work of determining if a message needs to be throttled or not $self->throttler( { module => "Data::Throttler::Memcached", } ); =head2 timeout C specifies the timeout value while we wait to read from the queue. =head2 drop_on_emergency C specifies if given a emergency message, other messages should be dropped while the emergency message is being displayed. By default this is C. =cut