#  The London Perl and Raku Workshop takes place on 26th Oct 2024. If your company depends on Perl, please consider sponsoring and/or attending.
#  You may distribute under the terms of either the GNU General Public License
#  or the Artistic License (the same terms as Perl itself)
#
#  (C) Paul Evans, 2013 -- leonerd@leonerd.org.uk

package Protocol::CassandraCQL::Client;

use strict;
use warnings;

our $VERSION = '0.08';

use base qw( IO::Socket::IP );

use Carp;

use Protocol::CassandraCQL qw(
   :opcodes :results
   send_frame recv_frame FLAG_COMPRESS
);
use Protocol::CassandraCQL::Frame;
use Protocol::CassandraCQL::Result;

use Compress::Snappy qw( compress decompress );

use constant DEFAULT_CQL_PORT => 9042;

=head1 NAME

C<Protocol::CassandraCQL::Client> - a minimal Cassandra CQL client

=head1 SYNOPSIS

 use Protocol::CassandraCQL::Client;
 use Protocol::CassandraCQL qw( CONSISTENCY_QUORUM );

 my $cass = Protocol::CassandraCQL::Client->new(
    PeerHost => "localhost",
    Keyspace => "my-keyspace",
 );

 my ( undef, $result ) = $cass->query( "SELECT v FROM numbers", CONSISTENCY_QUORUM );

 foreach my $row ( $result->rows_hash ) {
    say "We have a number $row->{v}";
 }

=head1 DESCRIPTION

This subclass of L<IO::Socket::IP> implements a client that can execute
queries on a Cassandra CQL database. It is not intended as a complete client;
it simply provides enough functionality to test that the protocol handling is
working, and is used to implement the bundled F<examples/cqlsh> utility.

For a more complete client, see instead L<Net::Async::CassandraCQL>.

=cut

=head1 CONSTRUCTOR

=cut

=head2 $cass = Protocol::CassandraCQL::Client->new( %args )

Takes the following arguments in addition to those accepted by
L<IO::Socket::IP>:

=over 8

=item Username => STRING

=item Password => STRING

Authentication credentials if required by the server.

=item Keyspace => STRING

If defined, selects the keyspace to C<USE> after connection.

=back

=cut

sub new
{
   my ( $class, @params ) = @_;

   # A single argument is shorthand for the host to connect to; otherwise the
   # arguments are taken as a key/value list
   my %args = ( @params == 1 ) ? ( PeerHost => $params[0] ) : @params;

   # Fall back to the standard CQL port when the caller did not pick a service
   $args{PeerService} = DEFAULT_CQL_PORT unless $args{PeerService};

   my $self = $class->SUPER::new( %args );
   return unless $self;

   # Perform the protocol handshake, then optionally select a keyspace
   $self->startup( %args );

   if( defined $args{Keyspace} ) {
      $self->use_keyspace( $args{Keyspace} );
   }

   return $self;
}

=head1 METHODS

=cut

=head2 ( $result_op, $result_frame ) = $cass->send_message( $opcode, $frame )

Sends a message with the given opcode and L<Protocol::CassandraCQL::Frame> for
the message body. Waits for a response to be received, and returns it.

If the response opcode is C<OPCODE_ERROR> then the error message string is
thrown directly as an exception; this method will only return in non-error
cases.

=cut

sub send_message
{
   my $self = shift;
   my ( $opcode, $frame ) = @_;

   {
      my $flags = 0;
      my $body = $frame->bytes;

      # Only send the compressed form when compression succeeded and the
      # result is actually smaller than the plain body
      my $body_compressed = compress( $body );
      if( defined $body_compressed and
          length $body_compressed < length $body ) {
         $body = $body_compressed;
         $flags |= FLAG_COMPRESS;
      }

      # Requests are sent as version 0x01 on stream 0
      send_frame( $self, 0x01, $flags, 0, $opcode, $body );
   }

   my ( $version, $flags, $streamid, $result_op, $body ) = recv_frame( $self ) or croak "Unable to ->recv: $!";

   # Responses must carry version 0x81
   $version == 0x81 or
      croak sprintf "Unexpected message version %#02x", $version;

   if( $flags & FLAG_COMPRESS ) {
      $body = decompress( $body );
      # A failed decompression must not be passed on as an undefined body
      defined $body or
         croak "Unable to decompress message body";
      $flags &= ~FLAG_COMPRESS;
   }
   # Any flag other than compression is not understood by this client
   $flags == 0 or
      croak sprintf "Unexpected flags 0x%02x", $flags;

   # Only stream 0 is ever used for requests, so the reply must match
   $streamid == 0 or
      croak "Unexpected stream ID $streamid";

   my $response = Protocol::CassandraCQL::Frame->new( $body );

   if( $result_op == OPCODE_ERROR ) {
      # Skip the leading int of the error body; the string that follows is
      # used as the exception message
      $response->unpack_int;
      croak "OPCODE_ERROR: " . $response->unpack_string;
   }

   return ( $result_op, $response );
}

# Plain function (not a method). Reads the leading int from an OPCODE_RESULT
# frame and returns a list describing the result:
#   RESULT_VOID          => empty list
#   RESULT_ROWS          => ( rows => Protocol::CassandraCQL::Result )
#   RESULT_SET_KEYSPACE  => ( keyspace => STRING )
#   RESULT_SCHEMA_CHANGE => ( schema_change => ARRAY ref of 3 strings )
#   anything else        => ( "??" => the frame's bytes )
sub _decode_result
{
   my ( $frame ) = @_;

   my $kind = $frame->unpack_int;

   return if $kind == RESULT_VOID;

   return ( rows => Protocol::CassandraCQL::Result->from_frame( $frame ) )
      if $kind == RESULT_ROWS;

   return ( keyspace => $frame->unpack_string )
      if $kind == RESULT_SET_KEYSPACE;

   return ( schema_change => [ map { $frame->unpack_string } 1 .. 3 ] )
      if $kind == RESULT_SCHEMA_CHANGE;

   # Unrecognised result kind; hand back the frame's bytes
   return ( "??" => $frame->bytes );
}

# Performs the connection handshake. Sends OPCODE_STARTUP and, if the server
# answers with OPCODE_AUTHENTICATE, replies with the Username and Password
# taken from %args. Croaks unless the exchange ends with OPCODE_READY.
sub startup
{
   my ( $self, %args ) = @_;

   my $startup_frame = Protocol::CassandraCQL::Frame->new->pack_string_map( {
      CQL_VERSION => "3.0.5",
      COMPRESSION => "Snappy",
   } );

   my ( $op, $response ) = $self->send_message( OPCODE_STARTUP, $startup_frame );

   if( $op == OPCODE_AUTHENTICATE ) {
      my $authenticator = $response->unpack_string;

      # Only the password authenticator is supported
      croak "Unrecognised authenticator $authenticator"
         unless $authenticator eq "org.apache.cassandra.auth.PasswordAuthenticator";

      croak "Cannot authenticate without a username/password"
         unless defined $args{Username} and defined $args{Password};

      my $credentials_frame = Protocol::CassandraCQL::Frame->new->pack_string_map( {
         username => $args{Username},
         password => $args{Password},
      } );

      ( $op, $response ) = $self->send_message( OPCODE_CREDENTIALS, $credentials_frame );
   }

   $op == OPCODE_READY or croak "Expected OPCODE_READY";
}

=head2 ( $type, $result ) = $cass->query( $cql, $consistency )

Performs a CQL query. The returned values will depend on the type of query:

For C<USE> queries, the type is C<keyspace> and C<$result> is a string giving
the name of the new keyspace.

For C<CREATE>, C<ALTER> and C<DROP> queries, the type is C<schema_change> and
C<$result> is a 3-element ARRAY reference containing the type of change, the
keyspace and the table name.

For C<SELECT> queries, the type is C<rows> and C<$result> is an instance of
L<Protocol::CassandraCQL::Result> containing the returned row data.

For other queries, such as C<INSERT>, C<UPDATE> and C<DELETE>, the method
returns nothing.

=cut

sub query
{
   my $self = shift;
   my ( $cql, $consistency ) = @_;

   # Callers may omit the consistency level; default it explicitly to 0 (the
   # same value use_keyspace passes) rather than handing undef to pack_short
   $consistency = 0 unless defined $consistency;

   # The request body is the CQL text followed by the consistency level
   my ( $op, $response ) = $self->send_message( OPCODE_QUERY,
      Protocol::CassandraCQL::Frame->new
         ->pack_lstring( $cql )
         ->pack_short( $consistency )
   );

   # Error responses have already been thrown by send_message, so anything
   # other than a result here is unexpected
   $op == OPCODE_RESULT or croak "Expected OPCODE_RESULT";
   return _decode_result( $response );
}

=head2 ( $type, $result ) = $cass->use_keyspace( $keyspace )

A convenient shortcut to the C<USE $keyspace> query which escapes the keyspace
name.

=cut

sub use_keyspace
{
   my ( $self, $keyspace ) = @_;

   # Inside a double-quoted CQL identifier the only character needing
   # attention is the quote mark itself, which is escaped by doubling it
   ( my $escaped = $keyspace ) =~ s/"/""/g;

   return $self->query( qq(USE "$escaped"), 0 );
}

=head1 SPONSORS

This code was paid for by

=over 2

=item *

Perceptyx L<http://www.perceptyx.com/>

=item *

Shadowcat Systems L<http://www.shadow.cat>

=back

=head1 AUTHOR

Paul Evans <leonerd@leonerd.org.uk>

=cut

0x55AA;