use strict;
use warnings;
use Test::More;

# Integration test: messages written through Mvalve::Writer are read back
# through a throttled Mvalve::Reader (Data::Valve throttler, Memcached state).
#
# Environment:
#   MVALVE_Q4M_DSN          - DSN of the queue database (required, else skipped)
#   MVALVE_Q4M_USERNAME     - database user (optional)
#   MVALVE_Q4M_PASSWORD     - database password (optional)
#   MVALVE_MESSAGE_COUNT    - number of messages to send (default 32)
#   MVALVE_RECEIVE_TIMEOUT  - seconds to wait for the throttled messages
#                             before giving up (default scales with count)
#   MEMCACHED_SERVERS       - comma separated server list
#                             (default 127.0.0.1:11211)
#   MEMCACHED_NAMESPACE     - key namespace (default: unique per run)

my $messages;             # number of messages pushed through the queue
my @memcached_servers;    # MEMCACHED_SERVERS parsed into a list

BEGIN {
    # plan(skip_all => ...) exits the script, so nothing below runs
    # when a prerequisite is missing.
    if ( !eval { require Cache::Memcached; 1 } ) {
        plan( skip_all => "Cache::Memcached not installed" );
    }
    elsif ( !$ENV{MVALVE_Q4M_DSN} ) {
        plan( skip_all => "Define MVALVE_Q4M_DSN to run this test" );
    }
    else {
        $messages = $ENV{MVALVE_MESSAGE_COUNT} || 32;

        # 2 use_ok + N inserts + 1 unthrottled read + (N-1) throttled reads
        # + (N-1) delayed reads + 2 final checks == 3 + 3N
        plan( tests => 3 + 3 * $messages );
    }

    $ENV{MEMCACHED_SERVERS}   ||= '127.0.0.1:11211';
    $ENV{MEMCACHED_NAMESPACE} ||= join( '_', __FILE__, $$, {}, rand() );

    # Keep the parsed list in a lexical. Values stored in %ENV are
    # stringified (perl >= 5.18), so an array reference assigned to
    # $ENV{MEMCACHED_SERVERS} would come back as the string "ARRAY(0x...)".
    @memcached_servers = split( /\s*,\s*/, $ENV{MEMCACHED_SERVERS} );

    use_ok("Mvalve::Reader");
    use_ok("Mvalve::Writer");
}

{
    # Throttle interval (seconds) handed to Data::Valve; also used to size
    # the receive deadline below.
    my $interval = 1.4;

    my %q_config = (
        args => {
            connect_info => [
                $ENV{MVALVE_Q4M_DSN},
                $ENV{MVALVE_Q4M_USERNAME},
                $ENV{MVALVE_Q4M_PASSWORD},
                { RaiseError => 1, AutoCommit => 1 },
            ]
        }
    );

    my $writer = Mvalve::Writer->new( queue => \%q_config );
    my $reader = Mvalve::Reader->new(
        timeout   => 1,
        throttler => {
            module => 'Data::Valve',
            args   => {
                max_items => 1,
                interval  => $interval,
            }
        },
        state => {
            module => 'Memcached',
            args   => {
                memcached => {
                    servers   => \@memcached_servers,
                    namespace => $ENV{MEMCACHED_NAMESPACE},
                }
            }
        },
        queue => \%q_config
    );

    # Start from a clean slate.
    $reader->clear_all;

    my $count = $messages;
    diag("Generating $count messages....");

    # Every inserted message keyed by id; entries are removed as the reader
    # hands them back, so the hash must be empty at the end.
    my %messages;
    for my $i ( 1 .. $count ) {
        my $message = Mvalve::Message->new(
            headers => { 'X-Mvalve-Destination' => 'test' },
            content => $i,
        );

        ok( $writer->insert( message => $message ), "insert data $i" );
        $messages{ $message->id } = $message;
    }

    {
        # Only the very first read is expected to get through the throttler.
        my $message = $reader->next;
        ok( $message, 'first message should not be throttled' );
        if ($message) {
            delete $messages{ $message->id };
        }

        $count--;
        for my $i ( 1 .. $count ) {
            my $throttled = $reader->next;
            ok( !$throttled, "subsequent messages should be throttled" );
        }
    }

    diag("Going to receive messages as they are being throttled. \nThis may take a few moments...");
    {
        # Generous upper bound so that a lost message makes the test fail
        # (count/plan mismatch) instead of spinning forever.
        my $max_wait = $ENV{MVALVE_RECEIVE_TIMEOUT}
            || ( 60 + 10 * $interval * $count );
        my $deadline = time + $max_wait;

        my $received = 0;
        while ( $received < $count ) {
            if ( time > $deadline ) {
                diag("Timed out after ${max_wait}s waiting for throttled messages");
                last;
            }

            my $message = $reader->next;
            next unless $message;

            ok( delete $messages{ $message->id },
                "Deleting a proper (unhandled) message" );
            $received++;
        }

        is( $received, $count, "count matches" );
        is( scalar( keys %messages ), 0, "consumed all messages" );
    }
}