The London Perl and Raku Workshop takes place on 26th Oct 2024. If your company depends on Perl, please consider sponsoring and/or attending.
package DB::Evented;

use 5.006;
use strictures;
use AnyEvent::DBI;

=head1 NAME

DB::Evented - A pragmatic DBI like evented module.

=cut

our $VERSION = '0.06';
our $handlers = [];

=head1 SYNOPSIS

Doing selects in synchronise order is not always the most efficient way to interact with the 
Database. 

  use DB::Evented;

  my $evented = DB::Evented->new("DBI:SQLite2:dbname=$dname", "","");

  my $results;
  $evented->selectcol_arrayref(
    q{
      select
        test1,
        test2
      from
        test
    },
    { 
      Columns => [1,2],
      response => sub {
          $results->{result1} = shift;
      }		
    }
  );
  
  $evented->selectrow_hashref(
    q{
      select
        test1,
        test2
      from
        test
    },
    {
      response => sub {
        $results->{result2} = shift;
      }
    }
  );

  $evented->execute_in_parallel;

=head1 STATIC METHODS

=head2 new ($connection_str, $username, $pass, %dbi_args )

In order to initialize a DB::Evented object a connection_str is most likely required.
See AnyEvent::DBI for more information.

=cut

sub new {
  my $class = shift;
  $class ||= ref $class;
  my ($connection_str, $username, $pass, %dbi_args) = @_;
  return bless {
    connection_str => $connection_str,
    username => $username,
    pass => $pass,
    dbi_args => \%dbi_args,
    _queue => [],
  }, $class;
}

=head1 INSTANCE METHODS

=head2 any_event_handler

This will return an AnyEvent::DBI handler. The key difference between this handler and DBI is that it's using AnyEvent
under the hood. What does this mean? It means that if you use an AnyEvent::DBI method it will run asynchronously.

=cut
sub any_event_handler {
  my $self = shift;
  return AnyEvent::DBI->new($self->{connection_str}, $self->{username}, $self->{pass}, %{$self->{dbi_args}}, on_error => sub {
    $self->clear_queue;
    warn "DBI Error: $@ at $_[1]:$_[2]\n";
  });
}

=head2 clear_handlers

Clears all handlers

=cut

sub clear_handlers {
  $handlers = [];
}

=head2 clear_queue

Clears the queue of any db todos

=cut

sub clear_queue {
  $_[0]->{_queue} = undef;
}

=head2 execute_in_parallel

Will execute all of the queued statements in parallel. This will create a pool of handlers and cache them if necessary.

=cut

sub execute_in_parallel {
  my $self = shift;
  if ( scalar @{$self->{_queue}} ) {
    # Setup a pool of handlers
    # TODO: Make this more intelligent to shrink
    if ( ! scalar @{$handlers} || ( scalar @{$handlers} < scalar @{$self->{_queue}} )) {
      while ( scalar @{$handlers} < scalar @{$self->{_queue}} ) {
        push @{$handlers}, $self->any_event_handler;
      }
    }
    $self->{cv} = AnyEvent->condvar;
    my $count = 0;
    for my $item ( @{$self->{_queue}} ) {
      my $cb = pop @$item;
      my $callback_wrapper = sub { 
        my ($dbh, $result) = @_;
        $cb->($result, $dbh);
        $self->{cv}->end;
      };
      my $req_method = pop @$item;
      my $line = pop @$item;
      my $file = pop @$item;
      $self->{cv}->begin;
      $handlers->[$count]->_req($callback_wrapper, $line, $file, $req_method, @$item);
      $count++;
    }
    $self->{cv}->recv;
    delete $self->{cv};
    $self->clear_queue;
  }
  return;
}

sub _add_to_queue {
  my ( $self, $sql, $attr, $key_field, @args) = @_;

  my $cb = delete $attr->{response};
  my $item = [$sql, $attr, $key_field, @args, __PACKAGE__ . '::_req_dispatch', $cb]; 

  push @{$self->{_queue}}, $item;
}

sub _req_dispatch {
  my (undef, $st, $attr, $key_field, @args) = @{+shift};
  my $method_name = pop @args;
  my $result = $AnyEvent::DBI::DBH->$method_name($key_field ? ($st, $key_field, $attr, @args) : ($st, $attr, @args) );
  [1, $result ? $result : undef];
}

=head2 selectall_arrayref ($sql, \%attr, @binds )

This method functions in the same way as DBI::selectall_arrayref. The key difference
being it delays the execution until execute_in_parallel has been called. The results
can be accessed in the response attribute call back 

=cut

=head2 selectall_hashref ($sql, $key_field, \%attr, @binds )

This method functions in the same way as DBI::selectall_hashref. The key difference
being it delays the execution until execute_in_parallel has been called. The results
can be accessed in the response attribute call back 

=cut

=head2 selectrow_arrayref ($sql, \%attr, @binds )

This method functions in the same way as DBI::selectrow_arrayref. The key difference
being it delays the execution until execute_in_parallel has been called. The results
can be accessed in the response attribute call back 

=cut

=head2 selectrow_hashref ($sql, \%attr, @binds )

This method functions in the same way as DBI::selectrow_hashref. The key difference
being it delays the execution until execute_in_parallel has been called. The results
can be accessed in the response attribute call back 

=cut

for my $method_name ( qw(selectrow_hashref selectcol_arrayref selectall_hashref selectall_arrayref) ) {
  no strict 'refs';
  *{$method_name} = sub {
    my $self = shift;
    my ($sql, $key_field, $attr, @args) = (shift, ($method_name eq 'selectall_hashref' ? (shift) : (undef)), shift, @_);
    $self->_add_to_queue($sql, $attr, $key_field, @args, $method_name, (caller)[1,2]);
  };
}

# TODO: Investigate if this is the bet way to handle this.
# The child processes are technically held by AnyEvent::DBI
# by clearing the known handlers these children *should* be reaped
sub DESTROY {
  my $error = do {
    local $@;
    eval {
      DB::Evented->clear_handlers;
    };
    $@;
  };
  $? = 0 unless $error;
}

=head1 AUTHOR

Logan Bell, C<< <logie at cpan.org> >>

=head1 SUPPORT

You can find documentation for this module with the perldoc command.

    perldoc DB::Evented

You can also look for information at:

=head1 ACKNOWLEDGEMENTS

Aaron Cohen and Belden Lyman.

=head1 LICENSE

Copyright (c) 2013 Logan Bell and Shutterstock Inc (http://shutterstock.com).  All rights reserved.  This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself.

=cut

1; # End of DB::Evented