#!/usr/bin/perl
use strict;
use warnings;
use IO::Async::Loop;
use Net::Async::CassandraCQL;
use Protocol::CassandraCQL qw( :consistencies );
use Getopt::Long;
GetOptions(
'host|h=s' => \(my $HOST = "localhost"),
'user|u=s' => \my $USERNAME,
'pass|p=s' => \my $PASSWORD,
'keyspace|k=s' => \my $KEYSPACE,
'interval|i=f' => \(my $INTERVAL = 1),
'recreate' => \my $RECREATE,
'primaries|P=i' => \(my $PRIMARIES = 1),
) or exit 1;
defined $KEYSPACE or die "Need a --keyspace\n";
my $loop = IO::Async::Loop->new;
my $cass = Net::Async::CassandraCQL->new(
host => $HOST,
username => $USERNAME,
password => $PASSWORD,
keyspace => $KEYSPACE,
primaries => $PRIMARIES,
);
$loop->add( $cass );
$cass->connect->get;
# We don't use counter columns for this test, because they don't appear to
# work properly when multiple cassandra are hosted on the same physical
# machine
if( $RECREATE ) {
# don't care if it succeeded; if it fails that's probably because it existed
$loop->await( $cass->query( "DROP TABLE pinger", CONSISTENCY_ALL ) );
$cass->query( "CREATE TABLE pinger (key text PRIMARY KEY, count int)", CONSISTENCY_ALL )->get;
sleep 10;
}
$cass->query( "INSERT INTO pinger (key, count) VALUES ('pingcount', 0)", CONSISTENCY_ALL )->get;
my ( $select_smt, $update_smt ) = Future->needs_all(
$cass->prepare( "SELECT count FROM pinger WHERE key = 'pingcount'" ),
$cass->prepare( "UPDATE pinger SET count = ? WHERE key = 'pingcount'" ),
)->get;
while(1) {
my $f;
$cass->_debug_print_nodes;
$f = $select_smt->execute( [], CONSISTENCY_QUORUM );
$f->await until $f->is_ready;
if( $f->failure ) {
warn scalar $f->failure;
}
else {
my ( $type, $result ) = $f->get;
my $count = $result->row_array(0)->[0];
print "COUNT $count\n";
$f = $update_smt->execute( [ $count+1 ], CONSISTENCY_QUORUM );
$f->await until $f->is_ready;
warn scalar $f->failure if $f->failure
}
$loop->delay_future( after => $INTERVAL )->get;
}