The London Perl and Raku Workshop takes place on 26th Oct 2024. If your company depends on Perl, please consider sponsoring and/or attending.
#  You may distribute under the terms of either the GNU General Public License
#  or the Artistic License (the same terms as Perl itself)
#  (C) Paul Evans, 2013 --

package Protocol::CassandraCQL::Client;

use strict;
use warnings;

our $VERSION = '0.08';

use base qw( IO::Socket::IP );

use Carp;

use Protocol::CassandraCQL qw(
   :opcodes :results
   send_frame recv_frame FLAG_COMPRESS
use Protocol::CassandraCQL::Frame;
use Protocol::CassandraCQL::Result;

use Compress::Snappy qw( compress decompress );

use constant DEFAULT_CQL_PORT => 9042;

=head1 NAME

C<Protocol::CassandraCQL::Client> - a minimal Cassandra CQL client


 use Protocol::CassandraCQL::Client;
 use Protocol::CassandraCQL qw( CONSISTENCY_QUORUM );

 my $cass = Protocol::CassandraCQL::Client->new(
    PeerHost => "localhost",
    Keyspace => "my-keyspace",

 my ( undef, $result ) = $cass->query( "SELECT v FROM numbers" );

 foreach my $row ( $result->rows_hash ) {
    say "We have a number $row->{v}";


This subclass of L<IO::Socket::IP> implements a client that can execute
queries on a Cassandra CQL database. It is not intended as a complete client,
is simply provides enough functionallity to test that the protocol handling is
working, and is used to implement the bundled F<examples/cqlsh> utility.

For a more complete client, see instead L<Net::Async::CassandraCQL>.




=head2 $cass = Protocol::CassandraCQL::Client->new( %args )

Takes the following arguments in addition to those accepted by

=over 8

=item Username => STRING

=item Password => STRING

Authentication credentials if required by the server.

=item Keyspace => STRING

If defined, selects the keyspace to C<USE> after connection.



sub new
   my $class = shift;
   my %args = @_ == 1 ? ( PeerHost => $_[0] ) : @_;

   $args{PeerService} ||= DEFAULT_CQL_PORT;

   my $self = $class->SUPER::new( %args ) or return;

   $self->startup( %args );
   $self->use_keyspace( $args{Keyspace} ) if defined $args{Keyspace};

   return $self;

=head1 METHODS


=head2 ( $result_op, $result_frame ) = $cass->send_message( $opcode, $frame )

Sends a message with the given opcode and L<Protocol::CassandraCQL::Frame> for
the message body. Waits for a response to be received, and returns it.

If the response opcode is C<OPCODE_ERROR> then the error message string is
thrown directly as an exception; this method will only return in non-error


sub send_message
   my $self = shift;
   my ( $opcode, $frame ) = @_;

      my $flags = 0;
      my $body = $frame->bytes;

      my $body_compressed = compress( $body );
      if( length $body_compressed < length $body ) {
         $body = $body_compressed;
         $flags |= FLAG_COMPRESS;

      send_frame( $self, 0x01, $flags, 0, $opcode, $body );

   my ( $version, $flags, $streamid, $result_op, $body ) = recv_frame( $self ) or croak "Unable to ->recv: $!";

   $version == 0x81 or
      croak sprintf "Unexpected message vrsion %#02x", $version;

   if( $flags & FLAG_COMPRESS ) {
      $body = decompress( $body );
      $flags &= ~FLAG_COMPRESS;
   $flags == 0 or
      croak sprintf "Unexpected flags 0x%02x", $flags;

   $streamid == 0 or
      croak "Unexpected stream ID $streamid";

   my $response = Protocol::CassandraCQL::Frame->new( $body );

   if( $result_op == OPCODE_ERROR ) {
      croak "OPCODE_ERROR: " . $response->unpack_string;

   return ( $result_op, $response );

# function
sub _decode_result
   my ( $response ) = @_;

   my $result = $response->unpack_int;

   if( $result == RESULT_VOID ) {
   elsif( $result == RESULT_ROWS ) {
      return rows => Protocol::CassandraCQL::Result->from_frame( $response );
   elsif( $result == RESULT_SET_KEYSPACE ) {
      return keyspace => $response->unpack_string;
   elsif( $result == RESULT_SCHEMA_CHANGE ) {
      return schema_change => [ map { $response->unpack_string } 1 .. 3 ];
   else {
      return "??" => $response->bytes;

sub startup
   my $self = shift;
   my %args = @_;

   my ( $op, $response ) = $self->send_message( OPCODE_STARTUP,
         ->pack_string_map( {
            CQL_VERSION => "3.0.5",
            COMPRESSION => "Snappy",
         } )

   if( $op == OPCODE_AUTHENTICATE ) {
      my $authenticator = $response->unpack_string;
      if( $authenticator eq "org.apache.cassandra.auth.PasswordAuthenticator" ) {
         defined $args{Username} and defined $args{Password} or
            croak "Cannot authenticate without a username/password";

         ( $op, $response ) = $self->send_message( OPCODE_CREDENTIALS,
               ->pack_string_map( {
                  username => $args{Username},
                  password => $args{Password},
               } )
      else {
         croak "Unrecognised authenticator $authenticator";

   $op == OPCODE_READY or croak "Expected OPCODE_READY";

=head2 ( $type, $result ) = $cass->query( $cql, $consistency )

Performs a CQL query. The returned values will depend on the type of query:

For C<USE> queries, the type is C<keyspace> and C<$result> is a string giving
the name of the new keyspace.

For C<CREATE>, C<ALTER> and C<DROP> queries, the type is C<schema_change> and
C<$result> is a 3-element ARRAY reference containing the type of change, the
keyspace and the table name.

For C<SELECT> queries, the type is C<rows> and C<$result> is an instance of
L<Protocol::CassandraCQL::Result> containing the returned row data.

For other queries, such as C<INSERT>, C<UPDATE> and C<DELETE>, the method
returns nothing.


sub query
   my $self = shift;
   my ( $cql, $consistency ) = @_;

   my ( $op, $response ) = $self->send_message( OPCODE_QUERY,
         ->pack_lstring( $cql )
         ->pack_short( $consistency )

   $op == OPCODE_RESULT or croak "Expected OPCODE_RESULT";
   return _decode_result( $response );

=head2 ( $type, $result ) = $cass->use_keyspace( $keyspace )

A convenient shortcut to the C<USE $keyspace> query which escapes the keyspace


sub use_keyspace
   my $self = shift;
   my ( $keyspace ) = @_;

   # CQL's "quoting" handles any character except quote marks, which have to
   # be doubled
   $keyspace =~ s/"/""/g;

   $self->query( qq(USE "$keyspace"), 0 );


This code was paid for by

=over 2

=item *

Perceptyx L<>

=item *

Shadowcat Systems L<>


=head1 AUTHOR

Paul Evans <>

