package Net::Async::PostgreSQL::Client; { $Net::Async::PostgreSQL::Client::VERSION = '0.007'; } use strict; use warnings; use Protocol::PostgreSQL::Client '0.008'; use parent qw{IO::Async::Protocol::Stream}; use Scalar::Util (); =head1 NAME Net::Async::PostgreSQL - support for the PostgreSQL wire protocol =head1 VERSION version 0.007 =head1 SYNOPSIS # Simple queries are performed similar to DBI: $dbh->do(q{insert into something (x,y,z) values (1,2,3)}); # These can also use bind variables: $dbh->do(q{insert into something (x,y,z) values (?,?,?)}, undef, 1,2,3); # Prepared statements work the same as DBI by default my $sth = $dbh->prepare(q{select * from table where name = ?}); $sth->bind_param(1, 'test'); $sth->execute; # ... but have async_ versions for passing handlers: my $sth = $dbh->async_prepare( sql => q{select * from table where name = ?}, on_error => sub { warn "failed" } ); $sth->async_execute( on_bind_request => sub { return @param; }, on_header => sub { ... }, on_row => sub { ... }, on_error => sub { ... }, on_complete => sub { ... }, ); # And there's a helper method for doing regular queries: $dbh->run_query( sql => q{select * from something where id = ?}, parameters => [1], on_row => sub { warn "Had " . $_[1]->{} }, on_error => sub { warn "Error encountered" }, on_complete => sub { warn "all done" } ); =head1 DESCRIPTION The interface is provided by L, which attempts to offer something close to L but with support for event-based request handling. See L for more details. =cut use Socket qw(SOCK_STREAM); BEGIN { foreach my $k (qw( send_message has_queued is_authenticated is_first_message initial_request queue send_next_in_queue message debug handle_message message_length simple_query copy_data copy_done backend_state active_statement prepare prepare_async row_description is_ready send_copy_data add_handler_for_event )) { no strict 'refs'; *{join '::', __PACKAGE__, $k} = sub { shift->pg->$k(@_) }; } } =head1 METHODS =cut sub new { my $class = shift; my %args = @_; # Clear any options that will cause the parent class to complain my $loop = delete $args{loop}; # Want the IO::Async::Protocol constructor, so SUPER is good enough for us here my $self = $class->SUPER::new; $self->pg->add_handler_for_event(send_request => $self->_capture_weakself(sub { my ($self) = splice @_, 0, 2; # ignore pg object $self->write(@_); return 1; })); $self->configure(%args); # Automatically add to the event loop if we were passed one $loop->add($self) if $loop; return $self; } sub sap { my ($self, $sub) = @_; Scalar::Util::weaken $self; return sub { $self->$sub(@_); }; } =head2 configure Apply callbacks and other parameters, preparing state for event loop start. =cut sub configure { my $self = shift; my %args = @_; # Debug flag is used to control the copious amounts of data that we dump out when tracing $self->{debug} = $args{debug} if exists $args{debug}; # %args = $self->pg->configure(%args); foreach (qw{host service}) { $self->{$_} = delete $args{$_} if exists $args{$_}; } %args = $self->pg->configure(%args); $self->SUPER::configure(%args); return $self; } sub pg { my $self = shift; if(@_) { $self->{pg} = shift; return $self; } $self->{pg} ||= Protocol::PostgreSQL::Client->new; return $self->{pg}; } =head2 on_connection_established Prepare and activate a new transport. =cut sub on_connection_established { my $self = shift; my $sock = shift; my $transport = IO::Async::Stream->new(handle => $sock) or die "No transport?"; $self->configure(transport => $transport); $self->debug("Have transport " . $self->transport); } =head2 on_starttls Upgrade the underlying stream to use TLS. =cut sub on_starttls { my $self = shift; $self->debug("Upgrading to TLS"); require IO::Async::SSLStream; $self->SSL_upgrade( on_upgraded => $self->_capture_weakself(sub { my ($self) = @_; $self->debug("TLS upgrade complete"); $self->{tls_enabled} = 1; $self->initial_request; }), on_error => sub { die "error @_"; } ); } =head2 connect =cut sub connect { my $self = shift; my %args = @_; my $on_connected = delete $args{on_connected}; my $host = exists $args{host} ? delete $args{host} : $self->{host}; $self->pg->add_handler_for_event(password => sub { my $self = shift; $self->send_message('PasswordMessage', password => $self->{pass}); return 0; # single-shot event }); $self->SUPER::connect( %args, service => $args{service} || $self->{service} || 5432, host => $host, socktype => SOCK_STREAM, on_resolve_error => sub { die "Resolution failed for $host"; }, on_connect_error => sub { die "Could not connect to $host"; }, on_connected => $self->sap(sub { my $self = shift; my ($pg, $sock) = @_; $self->pg->initial_request; $on_connected->($self) if $on_connected; }) ); } =head2 on_read Handle read requests by passing full packets back to the protocol handler. =cut sub on_read { my $self = shift; my ($buffref, $eof) = @_; return 0 unless length($$buffref) >= 5; my ($code, $size) = unpack('C1N1', $$buffref); if(length($$buffref) >= $size+1) { $self->pg->handle_message(substr $$buffref, 0, $size+1, ''); return 1; } return 0; } =head2 do =cut sub do { my $self = shift; my ($sql, $attrib, @param) = @_; $self->simple_query($sql); return $self; } sub on_password { my $self = shift; } =head2 terminate Sends the Terminate message to the database server and closes the connection for a clean shutdown. =cut sub terminate { my $self = shift; return unless $self->transport; my $transport = $self->transport; Scalar::Util::weaken(my $loop = $transport->get_loop); # TODO could just ->close_when_empty? $transport->configure(on_outgoing_empty => $self->_capture_weakself(sub { my $self = shift; $self->close; $loop->later($self->_capture_weakself(sub { $self->pg->invoke_event('closed'); })); })); $self->send_message('Terminate'); return $self; } 1; __END__ =head1 SEE ALSO =over 4 =item * L - the real database interface =item * L - proxy request support for DBI =back =head1 AUTHOR Tom Molesworth =head1 LICENSE Copyright Tom Molesworth 2011. Licensed under the same terms as Perl itself.