package POE::Component::EasyDBI::SubProcess;
use strict;
use warnings FATAL => 'all';
# Initialize our version
our $VERSION = '1.23';
# Use Error.pm's try/catch semantics
use Error qw( :try );
# We pass in data to POE::Filter::Reference
use POE::Filter::Reference;
# We run the actual DB connection here
use DBI;
sub new {
my ($class, $opts) = @_;
my $obj = bless($opts, $class);
$obj->{queue} = [];
$obj->{ping_timeout} = $obj->{ping_timeout} || 0;
return $obj;
}
# This is the subroutine that will get executed upon the fork() call by our parent
sub main {
if ( $^O eq 'MSWin32' ) {
binmode(STDIN); binmode(STDOUT);
}
# Autoflush to avoid weirdness
#select((select(STDOUT), $| = 1)[0]);
select(STDOUT); $|++;
select(STDERR); $|++;
$SIG{__WARN__} = 'DEFAULT';
$SIG{__DIE__} = 'DEFAULT';
my $self;
# check for alternate fork
if ($_[0] == 1) {
# we need to read in the first
my $filter = POE::Filter::Reference->new();
my $opts;
# get our first option hashref
while ( sysread( STDIN, my $buffer = '', 1024 ) ) {
$opts = $filter->get( [ $buffer ] );
last if (defined $opts);
}
$self = __PACKAGE__->new(splice(@{$opts},0,1));
$self->{filter} = $filter;
if (@{$opts}) {
push(@{$self->{queue}},@{$opts});
}
undef $filter;
} else {
$self = __PACKAGE__->new(shift);
$self->{filter} = POE::Filter::Reference->new();
}
$self->{0} = $0 = "$0 ".__PACKAGE__;
$self->{lastpingtime} = time();
unless (defined($self->{sig_ignore_off})) {
$SIG{INT} = $SIG{TERM} = $SIG{HUP} = 'IGNORE';
}
# if (defined($self->{use_cancel})) {
# Signal INT causes query cancel
# XXX disabled for now
#$SIG{INT} = sub { if ($sth) { $sth->cancel; } };
# }
while (!$self->connect()) { }
$self->pt("connected at ".localtime());
return if ($self->{done});
# check for data in queue first
$self->process();
if ($self->{done}) {
$self->pt("disconnected at ".localtime());
if ($self->{dbh}) {
$self->{dbh}->disconnect();
}
return;
}
# listen for commands from our parent
READ: while ( sysread( STDIN, my $buffer = '', 1024 ) ) {
# Feed the line into the filter
# and put the data in the queue
my $d = $self->{filter}->get( [ $buffer ] );
push(@{$self->{queue}},@$d) if ($d);
# INPUT STRUCTURE IS:
# $d->{action} = SCALAR -> WHAT WE SHOULD DO
# $d->{sql} = SCALAR -> THE ACTUAL SQL
# $d->{placeholders} = ARRAY -> PLACEHOLDERS WE WILL USE
# $d->{id} = SCALAR -> THE QUERY ID ( FOR PARENT TO KEEP TRACK OF WHAT IS WHAT )
# $d->{primary_key} = SCALAR -> PRIMARY KEY FOR A HASH OF HASHES
# $d->{last_insert_id} = SCALAR|HASH -> HASH REF OF TABLE AND FIELD OR SCALAR OF A QUERY TO RUN AFTER
# and others..
# process all in the queue until a problem occurs or done
REDO:
unless ($self->process()) {
last READ if ($self->{done});
# oops problem...
if ($self->{reconnect}) {
# need to reconnect
delete $self->{reconnect};
# keep trying to connect
while (!$self->connect()) { }
# and bail when we are told
last READ if ($self->{done});
goto REDO;
}
}
}
# Arrived here due to error in sysread/etc
if ($self->{dbh}) {
$self->{dbh}->disconnect();
delete $self->{dbh};
}
# debug
# require POE::API::Peek;
# my $p = POE::API::Peek->new();
# my @sessions = $p->session_list();
# require Data::Dumper;
# open(FH,">db.txt");
# print FH Data::Dumper->Dump([\@sessions]);
# close(FH);
}
sub pt {
$0 = shift->{0}.' '.shift;
}
sub connect {
my $self = shift;
$self->{output} = undef;
$self->{error} = undef;
# Actually make the connection
try {
$self->{dbh} = DBI->connect(
# The DSN we just set up
(map { $self->{$_} } qw( dsn username password )),
# We set some configuration stuff here
{
((ref($self->{options}) eq 'HASH') ? %{$self->{options}} : ()),
# quiet!!
'PrintError' => 0,
'PrintWarn' => 0,
# Automatically raise errors so we can catch them with try/catch
'RaiseError' => 1,
# Disable the DBI tracing
'TraceLevel' => 0,
},
);
# Check for undefined-ness
if (!defined($self->{dbh})) {
die "Error Connecting to Database: $DBI::errstr";
}
} catch Error with {
$self->output( $self->make_error( 'DBI', shift ) );
};
# Catch errors!
if ($self->{error} && $self->{no_connect_failures}) {
sleep($self->{reconnect_wait}) if ($self->{reconnect_wait});
return 0;
} elsif ($self->{error}) {
# QUIT
$self->{done} = 1;
return 1;
}
# if ($self->{dsn} =~ m/SQLite/ && $self->{options}
# && ref($self->{options}) eq 'HASH' && $self->{options}->{AutoCommit}) {
# # TODO error checking
# $self->db_do({ sql => 'BEGIN', id => -1 });
# delete $self->{output};
# }
# send connect notice
$self->output({ id => 'DBI-CONNECTED' });
return 1;
}
sub process {
my $self = shift;
return 0 unless (@{$self->{queue}});
# Process each data structure
foreach my $input (shift(@{$self->{queue}})) {
$input->{action} = lc($input->{action});
# Now, we do the actual work depending on what kind of query it was
if ($input->{action} eq 'exit') {
# Disconnect!
$self->{done} = 1;
return 0;
}
my $now = time();
my $needping = (($self->{ping_timeout} == 0 or $self->{ping_timeout} > 0)
and (($now - $self->{lastpingtime}) >= $self->{ping_timeout})) ? 1 : 0;
if ($self->{dbh}) {
# Don't work:
# unless ($self->{dbh}->{Active}) {
# # put the query back on the stack
# unshift(@{$self->{queue}},$input);
# # and reconnect
# $self->{dbh}->disconnect();
# $self->{reconnect} = 1;
# return 0;
# }
if ($needping) {
if (eval{ $self->{dbh}->ping(); }) {
$self->pt("pinged at ".localtime());
$self->{lastpingtime} = $now;
} else {
# put the query back on the stack
unshift(@{$self->{queue}},$input);
# and reconnect
$self->{dbh}->disconnect();
$self->{reconnect} = 1;
return 0;
}
}
#} elsif (!$self->{dbh}) {
} else {
#die "Database gone? : $DBI::errstr";
# put the query back on the stack
unshift(@{$self->{queue}},$input);
# and reconnect
eval { $self->{dbh}->disconnect(); };
$self->{reconnect} = 1;
return 0;
}
if (defined($self->{no_cache}) && !defined($input->{no_cache})) {
$input->{no_cache} = $self->{no_cache};
}
if (defined($input->{sql})) {
# remove beginning whitespace
$input->{sql} =~ s/^\s*//;
}
if ( $input->{action} =~ m/^(func|commit|rollback|begin_work)$/ ) {
$input->{method} = $input->{action};
$self->do_method( $input );
} elsif ( $input->{action} eq 'method') {
# Special command to do $dbh->$method->()
$self->do_method( $input );
} elsif ( $input->{action} eq 'insert' ) {
# Fire off the SQL and return success/failure + rows affected and insert id
$self->db_insert( $input );
} elsif ( $input->{action} eq 'do' ) {
# Fire off the SQL and return success/failure + rows affected
$self->db_do( $input );
} elsif ( $input->{action} eq 'single' ) {
# Return a single result
$self->db_single( $input );
} elsif ( $input->{action} eq 'quote' ) {
$self->db_quote( $input );
} elsif ( $input->{action} eq 'arrayhash' ) {
# Get many results, then return them all at the same time in a array of hashes
$self->db_arrayhash( $input );
} elsif ( $input->{action} eq 'hashhash' ) {
# Get many results, then return them all at the same time in a hash of hashes
# on a primary key of course. the columns are returned in the cols key
$self->db_hashhash( $input );
} elsif ( $input->{action} eq 'hasharray' ) {
# Get many results, then return them all at the same time in a hash of arrays
# on a primary key of course. the columns are returned in the cols key
$self->db_hasharray( $input );
} elsif ( $input->{action} eq 'array' ) {
# Get many results, then return them all at the same time in an array of comma seperated values
$self->db_array( $input );
} elsif ( $input->{action} eq 'arrayarray' ) {
# Get many results, then return them all at the same time in an array of arrays
$self->db_arrayarray( $input );
} elsif ( $input->{action} eq 'hash' ) {
# Get many results, then return them all at the same time in a hash keyed off the
# on a primary key
$self->db_hash( $input );
} elsif ( $input->{action} eq 'keyvalhash' ) {
# Get many results, then return them all at the same time in a hash with
# the first column being the key and the second being the value
$self->db_keyvalhash( $input );
} else {
# Unrecognized action!
$self->{output} = $self->make_error( $input->{id}, "Unknown action sent '$input->{id}'" );
}
# XXX another way?
if ($input->{id} eq 'DBI' || ($self->{output}->{error}
&& ($self->{output}->{error} =~ m/no connection to the server/i
|| $self->{output}->{error} =~ m/server has gone away/i
|| $self->{output}->{error} =~ m/server closed the connection/i
|| $self->{output}->{error} =~ m/connect failed/i))) {
unshift(@{$self->{queue}},$input);
eval { $self->{dbh}->disconnect(); };
$self->{reconnect} = 1;
return 0;
}
$self->output;
}
return 1;
}
sub commit {
my $self = shift;
my $id = shift->{id};
try {
$self->{dbh}->commit;
} catch Error with {
$self->{output} = $self->make_error( $id, shift );
};
return ($self->{output}) ? 0 : 1;
}
sub begin_work {
my $self = shift;
my $id = shift->{id};
try {
$self->{dbh}->begin_work;
} catch Error with {
$self->{output} = $self->make_error( $id, shift );
};
return ($self->{output}) ? 0 : 1;
}
# This subroutine makes a generic error structure
sub make_error {
my $self = shift;
# Make the structure
my $data = { id => shift };
# Get the error, and stringify it in case of Error::Simple objects
my $error = shift;
if (ref($error) && ref($error) eq 'Error::Simple') {
$data->{error} = $error->text;
} else {
$data->{error} = $error;
}
if ($data->{error} =~ m/has gone away/i || $data->{error} =~ m/lost connection/i) {
$data->{id} = 'DBI';
}
$self->{error} = $data;
# All done!
return $data;
}
# This subroute is for supporting any type of $dbh->$method->() calls
sub do_method {
# Get the dbi handle
my $self = shift;
# Get the input structure
my $data = shift;
# The result
my $result = undef;
my $method = $data->{method};
my $dbh = $self->{dbh};
SWITCH: {
if ($data->{begin_work}) {
$self->begin_work($data) or last SWITCH;
}
# Catch any errors
try {
if ($data->{args} && ref($data->{args}) eq 'ARRAY') {
$result = $dbh->$method(@{$data->{args}});
} else {
$result = $dbh->$method();
}
} catch Error with {
$self->{output} = $self->make_error( $data->{id}, shift );
};
}
# Check if we got any errors
if (!defined($self->{output})) {
# Make output include the results
$self->{output} = { result => $result, id => $data->{id} };
}
return;
}
# This subroutine does a DB QUOTE
sub db_quote {
my $self = shift;
# Get the input structure
my $data = shift;
# The result
my $quoted = undef;
# Quote it!
try {
$quoted = $self->{dbh}->quote( $data->{sql} );
} catch Error with {
$self->{output} = $self->make_error( $data->{id}, shift );
};
# Check for errors
if (!defined($self->{output})) {
# Make output include the results
$self->{output} = { result => $quoted, id => $data->{id} };
}
return;
}
# This subroutine runs a 'SELECT ... LIMIT 1' style query on the db
sub db_single {
# Get the dbi handle
my $self = shift;
# Get the input structure
my $data = shift;
# Variables we use
my $sth = undef;
my $result = undef;
# Check if this is a non-select statement
# if ( $data->{sql} !~ /^SELECT/i ) {
# $self->{output} = $self->make_error( $data->{id}, "SINGLE is for SELECT queries only! ( $data->{sql} )" );
# return;
# }
SWITCH: {
if ($data->{begin_work}) {
$self->begin_work($data) or last SWITCH;
}
# Catch any errors
try {
# Make a new statement handler and prepare the query
if ($data->{no_cache}) {
$sth = $self->{dbh}->prepare( $data->{sql} );
} else {
# We use the prepare_cached method in hopes of hitting a cached one...
$sth = $self->{dbh}->prepare_cached( $data->{sql} );
}
# Check for undef'ness
if (!defined($sth)) {
die 'Did not get a statement handler';
} else {
# Execute the query
try {
$sth->execute( @{ $data->{placeholders} } );
} catch Error with {
die ( defined($sth->errstr) ? $sth->errstr : $@ );
};
die $self->{dbh}->errstr if ( $self->{dbh}->errstr );
}
# Actually do the query!
try {
# There are warnings when joining a NULL field, which is undef
no warnings;
if (exists($data->{separator})) {
$result = join($data->{separator},$sth->fetchrow_array());
} else {
$result = $sth->fetchrow_array();
}
} catch Error with {
die $sth->errstr;
};
die $self->{dbh}->errstr if ( $self->{dbh}->errstr );
} catch Error with {
$self->{output} = $self->make_error( $data->{id}, shift );
};
}
# Check if we got any errors
if (!defined($self->{output})) {
# Make output include the results
$self->{output} = { result => $result, id => $data->{id} };
}
# Finally, we clean up this statement handle
if (defined($sth)) {
$sth->finish();
}
return;
}
# This subroutine does an insert into the db
sub db_insert {
# Get the dbi handle
my $self = shift;
# Get the input structure
my $data = shift;
my $dsn = $self->{dsn} || '';
# Variables we use
my $sth = undef;
my $rows = undef;
my @queries;
my @placeholders;
# XXX depricate hash for insert
if (defined($data->{hash}) && !defined($data->{insert})) {
$data->{insert} = delete $data->{hash};
}
if (defined($data->{insert}) && ref($data->{insert}) eq 'HASH') {
$data->{insert} = [$data->{insert}];
}
# Check if this is a non-insert statement
if (defined($data->{insert}) && ref($data->{insert}) eq 'ARRAY') {
delete $data->{placeholders};
delete $data->{sql};
foreach my $hash (@{$data->{insert}}) {
# sort so we always get a consistant list of fields in the errors and placeholders
my @fields = sort keys %{$hash};
# adjust the placeholders, they should know that placeholders passed in are irrelevant
# XXX subtypes when a hash value is a HASH or ARRAY?
push(@placeholders,[ map { $hash->{$_} } @fields ]);
push(@queries,"INSERT INTO $data->{table} ("
.join(',',@fields).") VALUES (".join(',',(map { '?' } @fields)).")");
}
} elsif (!defined($data->{sql}) || $data->{sql} !~ /^INSERT/i ) {
$self->{output} = $self->make_error( $data->{id}, "INSERT is for INSERTS only! ( $data->{sql} )" );
return;
} else {
push(@queries,$data->{sql});
push(@placeholders,$data->{placeholders});
}
for my $i ( 0 .. $#queries ) {
$data->{sql} = $queries[$i];
$data->{placeholders} = $placeholders[$i];
my $do_last = 0;
if ($data->{begin_work} && $i == 0) {
$self->begin_work($data) or last;
}
# Catch any errors
try {
# Make a new statement handler and prepare the query
if ($data->{no_cache}) {
$sth = $self->{dbh}->prepare( $data->{sql} );
} else {
# We use the prepare_cached method in hopes of hitting a cached one...
$sth = $self->{dbh}->prepare_cached( $data->{sql} );
}
# Check for undef'ness
if (!defined($sth)) {
die 'Did not get a statement handler';
} else {
# Execute the query
try {
$rows += $sth->execute( @{ $data->{placeholders} } );
} catch Error with {
if (defined($sth->errstr)) {
die $sth->errstr;
} else {
die "error when trying to execute bind of placeholders in insert: $_[0]";
}
};
die $self->{dbh}->errstr if ( $self->{dbh}->errstr );
}
} catch Error with {
my $e = shift;
$self->{output} = $self->make_error( $data->{id}, "failed at query #$i : $e" );
$do_last = 1; # can't use last here
};
last if ($do_last);
}
if ($data->{commit} && defined($rows) && !defined($self->{output})) {
$self->commit($data);
}
# If rows is not undef, that means we were successful
if (defined($rows) && !defined($self->{output})) {
# Make the data structure
$self->{output} = { rows => $rows, result => $rows, id => $data->{id} };
unless ($data->{last_insert_id}) {
if (defined($sth)) {
$sth->finish();
}
return;
}
# get the last insert id
try {
my $qry = '';
if (ref($data->{last_insert_id}) eq 'HASH') {
my $l = $data->{last_insert_id};
# checks for different database types
if ($dsn =~ m/dbi:pg/i) {
$qry = "SELECT $l->{field} FROM $l->{table} WHERE oid='".$sth->{'pg_oid_status'}."'";
} elsif ($dsn =~ m/dbi:mysql/i) {
if (defined($self->{dbh}->{'mysql_insertid'})) {
$self->{output}->{insert_id} = $self->{dbh}->{'mysql_insertid'};
} else {
$qry = 'SELECT LAST_INSERT_ID()';
}
} elsif ($dsn =~ m/dbi:oracle/i) {
$qry = "SELECT $l->{field} FROM $l->{table}";
} elsif ($dsn =~ /dbi:sqlite/i) {
$self->{output}->{insert_id} = $self->{dbh}->func('last_insert_rowid');
} else {
die "EasyDBI doesn't know how to handle a last_insert_id with your dbi, contact the author.";
}
} else {
# they are supplying thier own query
$qry = $data->{last_insert_id};
}
if (defined($sth)) {
$sth->finish();
}
if ($qry) {
try {
$self->{output}->{insert_id} = $self->{dbh}->selectrow_array($qry);
} catch Error with {
die $sth->error;
};
die $self->{dbh}->errstr if ( $self->{dbh}->errstr );
}
} catch Error with {
# special case, insert was ok, but last_insert_id errored
$self->{output}->{error} = shift;
};
} elsif (!defined($rows) && !defined($self->{output})) {
# Internal error...
$self->{output} = $self->make_error( $data->{id}, 'Internal Error in db_do of EasyDBI Subprocess' );
#die 'Internal Error in db_do';
}
# Finally, we clean up this statement handle
if (defined($sth)) {
$sth->finish();
}
return;
}
# This subroutine runs a 'DO' style query on the db
sub db_do {
# Get the dbi handle
my $self = shift;
# Get the input structure
my $data = shift;
# Variables we use
my $sth = undef;
my $rows = undef;
# Check if this is a non-select statement
# if ( $data->{sql} =~ /^SELECT/i ) {
# $self->{output} = $self->make_error( $data->{id}, "DO is for non-SELECT queries only! ( $data->{sql} )" );
# return;
# }
SWITCH: {
if ($data->{begin_work}) {
$self->begin_work($data) or last SWITCH;
}
# Catch any errors
try {
# Make a new statement handler and prepare the query
if ($data->{no_cache}) {
$sth = $self->{dbh}->prepare( $data->{sql} );
} else {
# We use the prepare_cached method in hopes of hitting a cached one...
$sth = $self->{dbh}->prepare_cached( $data->{sql} );
}
# Check for undef'ness
if (!defined($sth)) {
die 'Did not get a statement handler';
} else {
# Execute the query
try {
$rows += $sth->execute( @{ $data->{placeholders} } );
} catch Error with {
die ( defined($sth->errstr) ? $sth->errstr : $@ );
};
die $self->{dbh}->errstr if ( $self->{dbh}->errstr );
}
} catch Error with {
$self->{output} = $self->make_error( $data->{id}, shift );
};
}
if ($data->{commit} && defined($rows) && !defined($self->{output})) {
$self->commit($data);
}
# If rows is not undef, that means we were successful
if (defined($rows) && !defined($self->{output})) {
# Make the data structure
$self->{output} = { rows => $rows, result => $rows, id => $data->{id} };
} elsif (!defined($rows) && !defined($self->{output})) {
# Internal error...
$self->{output} = $self->make_error( $data->{id}, 'Internal Error in db_do of EasyDBI Subprocess' );
#die 'Internal Error in db_do';
}
# Finally, we clean up this statement handle
if (defined($sth)) {
$sth->finish();
}
return;
}
sub db_arrayhash {
# Get the dbi handle
my $self = shift;
# Get the input structure
my $data = shift;
# Variables we use
my $sth = undef;
my $result = [];
my $rows = 0;
# Check if this is a non-select statement
# if ( $data->{sql} !~ /^SELECT/i ) {
# $self->{output} = $self->make_error( $data->{id}, "ARRAYHASH is for SELECT queries only! ( $data->{sql} )" );
# return;
# }
SWITCH: {
if ($data->{begin_work}) {
$self->begin_work($data) or last SWITCH;
}
# Catch any errors
try {
# Make a new statement handler and prepare the query
if ($data->{no_cache}) {
$sth = $self->{dbh}->prepare( $data->{sql} );
} else {
# We use the prepare_cached method in hopes of hitting a cached one...
$sth = $self->{dbh}->prepare_cached( $data->{sql} );
}
# Check for undef'ness
if (!defined($sth)) {
die 'Did not get a statement handler';
} else {
# Execute the query
try {
$sth->execute( @{ $data->{placeholders} } );
} catch Error with {
die ( defined($sth->errstr) ? $sth->errstr : $@ );
};
die $self->{dbh}->errstr if ( $self->{dbh}->errstr );
}
# my $newdata;
#
# # Bind the columns
# try {
# $sth->bind_columns( \( @$newdata{ @{ $sth->{'NAME_lc'} } } ) );
# } catch Error with {
# die $sth->errstr;
# };
# Actually do the query!
try {
while ( my $hash = $sth->fetchrow_hashref() ) {
if (exists($data->{chunked}) && defined($self->{output})) {
# chunk results ready to send
$self->output();
$result = [];
$rows = 0;
}
$rows++;
# Copy the data, and push it into the array
push( @{ $result }, { %{ $hash } } );
if (exists($data->{chunked}) && $data->{chunked} == $rows) {
# Make output include the results
$self->{output} = { rows => $rows, id => $data->{id}, result => $result, chunked => $data->{chunked} };
}
}
# in the case that our rows == chunk
$self->{output} = undef;
} catch Error with {
die $sth->errstr;
};
# XXX is dbh->err the same as sth->err?
die $self->{dbh}->errstr if ( $self->{dbh}->errstr );
# Check for any errors that might have terminated the loop early
if ( $sth->err() ) {
# Premature termination!
die $sth->errstr;
}
} catch Error with {
$self->{output} = $self->make_error( $data->{id}, shift );
};
}
# Check if we got any errors
if (!defined($self->{output})) {
# Make output include the results
$self->{output} = { rows => $rows, id => $data->{id}, result => $result };
if (exists($data->{chunked})) {
$self->{output}->{last_chunk} = 1;
$self->{output}->{chunked} = $data->{chunked};
}
}
# Finally, we clean up this statement handle
if (defined($sth)) {
$sth->finish();
}
return;
}
sub db_hashhash {
# Get the dbi handle
my $self = shift;
# Get the input structure
my $data = shift;
# Variables we use
my $sth = undef;
my $result = {};
my $rows = 0;
# Check if this is a non-select statement
# if ( $data->{sql} !~ /^SELECT/i ) {
# $self->{output} = $self->make_error( $data->{id}, "HASHHASH is for SELECT queries only! ( $data->{sql} )" );
# return;
# }
my (@cols, %col);
SWITCH: {
if ($data->{begin_work}) {
$self->begin_work($data) or last SWITCH;
}
# Catch any errors
try {
# Make a new statement handler and prepare the query
if ($data->{no_cache}) {
$sth = $self->{dbh}->prepare( $data->{sql} );
} else {
# We use the prepare_cached method in hopes of hitting a cached one...
$sth = $self->{dbh}->prepare_cached( $data->{sql} );
}
# Check for undef'ness
if (!defined($sth)) {
die 'Did not get a statement handler';
} else {
# Execute the query
try {
$sth->execute( @{ $data->{placeholders} } );
} catch Error with {
die ( defined($sth->errstr) ? $sth->errstr : $@ );
};
die $self->{dbh}->errstr if ( $self->{dbh}->errstr );
}
# The result hash
my $newdata = {};
# Check the primary key
my $foundprimary = 0;
# default to the first one
unless (defined($data->{primary_key})) {
$data->{primary_key} = 1;
}
if ($data->{primary_key} =~ m/^\d+$/) {
# primary_key can be a 1 based index
if ($data->{primary_key} > $sth->{NUM_OF_FIELDS}) {
# die "primary_key ($data->{primary_key}) is out of bounds (".$sth->{NUM_OF_FIELDS}.")";
die "primary_key ($data->{primary_key}) is out of bounds";
}
$data->{primary_key} = $sth->{NAME}->[($data->{primary_key}-1)];
}
# Find the column names
for my $i ( 0 .. $sth->{NUM_OF_FIELDS}-1 ) {
$col{$sth->{NAME}->[$i]} = $i;
push(@cols, $sth->{NAME}->[$i]);
$foundprimary = 1 if ($sth->{NAME}->[$i] eq $data->{primary_key});
}
unless ($foundprimary == 1) {
die "primary key ($data->{primary_key}) not found";
}
# Actually do the query!
try {
while ( my @row = $sth->fetchrow_array() ) {
if (exists($data->{chunked}) && defined($self->{output})) {
# chunk results ready to send
$self->output();
$result = {};
$rows = 0;
}
$rows++;
foreach (@cols) {
$result->{$row[$col{$data->{primary_key}}]}{$_} = $row[$col{$_}];
}
if (exists($data->{chunked}) && $data->{chunked} == $rows) {
# Make output include the results
$self->{output} = {
rows => $rows,
result => $result,
id => $data->{id},
cols => [ @cols ],
chunked => $data->{chunked},
primary_key => $data->{primary_key}
};
}
}
# in the case that our rows == chunk
$self->{output} = undef;
} catch Error with {
die $sth->errstr;
};
die $self->{dbh}->errstr if ( $self->{dbh}->errstr );
# Check for any errors that might have terminated the loop early
if ( $sth->err() ) {
# Premature termination!
die $sth->errstr;
}
} catch Error with {
$self->{output} = $self->make_error( $data->{id}, shift );
};
}
# Check if we got any errors
if (!defined($self->{output})) {
# Make output include the results
$self->{output} = { rows => $rows, id => $data->{id}, result => $result, cols => [ @cols ], primary_key => $data->{primary_key} };
if (exists($data->{chunked})) {
$self->{output}->{last_chunk} = 1;
$self->{output}->{chunked} = $data->{chunked};
}
}
# Finally, we clean up this statement handle
if (defined($sth)) {
$sth->finish();
}
return;
}
sub db_hasharray {
# Get the dbi handle
my $self = shift;
# Get the input structure
my $data = shift;
# Variables we use
my $sth = undef;
my $result = {};
my $rows = 0;
# Check if this is a non-select statement
# if ( $data->{sql} !~ /^SELECT/i ) {
# $self->{output} = $self->make_error( $data->{id}, "HASHARRAY is for SELECT queries only! ( $data->{sql} )" );
# return;
# }
my (@cols, %col);
SWITCH: {
if ($data->{begin_work}) {
$self->begin_work($data) or last SWITCH;
}
# Catch any errors
try {
# Make a new statement handler and prepare the query
if ($data->{no_cache}) {
$sth = $self->{dbh}->prepare( $data->{sql} );
} else {
# We use the prepare_cached method in hopes of hitting a cached one...
$sth = $self->{dbh}->prepare_cached( $data->{sql} );
}
# Check for undef'ness
if (!defined($sth)) {
die 'Did not get a statement handler';
} else {
# Execute the query
try {
$sth->execute( @{ $data->{placeholders} } );
} catch Error with {
die ( defined($sth->errstr) ? $sth->errstr : $@ );
};
die $self->{dbh}->errstr if ( $self->{dbh}->errstr );
}
# The result hash
my $newdata = {};
# Check the primary key
my $foundprimary = 0;
if ($data->{primary_key} =~ m/^\d+$/) {
# primary_key can be a 1 based index
if ($data->{primary_key} > $sth->{NUM_OF_FIELDS}) {
# die "primary_key ($data->{primary_key}) is out of bounds (".$sth->{NUM_OF_FIELDS}.")";
die "primary_key ($data->{primary_key}) is out of bounds";
}
$data->{primary_key} = $sth->{NAME}->[($data->{primary_key}-1)];
}
# Find the column names
for my $i ( 0 .. $sth->{NUM_OF_FIELDS}-1 ) {
$col{$sth->{NAME}->[$i]} = $i;
push(@cols, $sth->{NAME}->[$i]);
$foundprimary = 1 if ($sth->{NAME}->[$i] eq $data->{primary_key});
}
unless ($foundprimary == 1) {
die "primary key ($data->{primary_key}) not found";
}
# Actually do the query!
try {
while ( my @row = $sth->fetchrow_array() ) {
if (exists($data->{chunked}) && defined($self->{output})) {
# chunk results ready to send
$self->output();
$result = {};
$rows = 0;
}
$rows++;
push(@{ $result->{$row[$col{$data->{primary_key}}]} }, @row);
if (exists($data->{chunked}) && $data->{chunked} == $rows) {
# Make output include the results
$self->{output} = { rows => $rows, result => $result, id => $data->{id}, cols => [ @cols ], chunked => $data->{chunked}, primary_key => $data->{primary_key} };
}
}
# in the case that our rows == chunk
$self->{output} = undef;
} catch Error with {
die $sth->errstr;
};
die $self->{dbh}->errstr if ( $self->{dbh}->errstr );
# Check for any errors that might have terminated the loop early
if ( $sth->err() ) {
# Premature termination!
die $sth->errstr;
}
} catch Error with {
$self->{output} = $self->make_error( $data->{id}, shift );
};
}
# Check if we got any errors
if (!defined($self->{output})) {
# Make output include the results
$self->{output} = { rows => $rows, result => $result, id => $data->{id}, cols => [ @cols ], primary_key => $data->{primary_key} };
if (exists($data->{chunked})) {
$self->{output}->{last_chunk} = 1;
$self->{output}->{chunked} = $data->{chunked};
}
}
# Finally, we clean up this statement handle
if (defined($sth)) {
$sth->finish();
}
return;
}
sub db_array {
# Get the dbi handle
my $self = shift;
# Get the input structure
my $data = shift;
# Variables we use
my $sth = undef;
my $result = [];
my $rows = 0;
# Check if this is a non-select statement
# if ( $data->{sql} !~ /^SELECT/i ) {
# $self->{output} = $self->make_error( $data->{id}, "ARRAY is for SELECT queries only! ( $data->{sql} )" );
# return;
# }
SWITCH: {
if ($data->{begin_work}) {
$self->begin_work($data) or last SWITCH;
}
# Catch any errors
try {
# Make a new statement handler and prepare the query
if ($data->{no_cache}) {
$sth = $self->{dbh}->prepare( $data->{sql} );
} else {
# We use the prepare_cached method in hopes of hitting a cached one...
$sth = $self->{dbh}->prepare_cached( $data->{sql} );
}
# Check for undef'ness
if (!defined($sth)) {
die 'Did not get a statement handler';
} else {
# Execute the query
try {
$sth->execute( @{ $data->{placeholders} } );
} catch Error with {
die ( defined($sth->errstr) ? $sth->errstr : $@ );
};
die $self->{dbh}->errstr if ( $self->{dbh}->errstr );
}
# The result hash
my $newdata = {};
# Actually do the query!
try {
# There are warnings when joining a NULL field, which is undef
no warnings;
while ( my @row = $sth->fetchrow_array() ) {
if (exists($data->{chunked}) && defined($self->{output})) {
# chunk results ready to send
$self->output();
$result = [];
$rows = 0;
}
$rows++;
if (exists($data->{separator})) {
push(@{$result},join($data->{separator},@row));
} else {
push(@{$result},join(',',@row));
}
if (exists($data->{chunked}) && $data->{chunked} == $rows) {
# Make output include the results
$self->{output} = { rows => $rows, result => $result, id => $data->{id}, chunked => $data->{chunked} };
}
}
# in the case that our rows == chunk
$self->{output} = undef;
} catch Error with {
die $!;
#die $sth->errstr;
};
die $self->{dbh}->errstr if ( $self->{dbh}->errstr );
# Check for any errors that might have terminated the loop early
if ( $sth->err() ) {
# Premature termination!
die $sth->errstr;
}
} catch Error with {
$self->{output} = $self->make_error( $data->{id}, shift );
};
}
# Check if we got any errors
if (!defined($self->{output})) {
# Make output include the results
$self->{output} = { rows => $rows, result => $result, id => $data->{id} };
if (exists($data->{chunked})) {
$self->{output}->{last_chunk} = 1;
$self->{output}->{chunked} = $data->{chunked};
}
}
# Finally, we clean up this statement handle
if (defined($sth)) {
$sth->finish();
}
return;
}
sub db_arrayarray {
# Get the dbi handle
my $self = shift;
# Get the input structure
my $data = shift;
# Variables we use
my $sth = undef;
my $result = [];
my $rows = 0;
# Check if this is a non-select statement
# if ( $data->{sql} !~ /^SELECT/i ) {
# $self->{output} = $self->make_error( $data->{id}, "ARRAYARRAY is for SELECT queries only! ( $data->{sql} )" );
# return;
# }
SWITCH: {
if ($data->{begin_work}) {
$self->begin_work($data) or last SWITCH;
}
# Catch any errors
try {
# Make a new statement handler and prepare the query
if ($data->{no_cache}) {
$sth = $self->{dbh}->prepare( $data->{sql} );
} else {
# We use the prepare_cached method in hopes of hitting a cached one...
$sth = $self->{dbh}->prepare_cached( $data->{sql} );
}
# Check for undef'ness
if (!defined($sth)) {
die 'Did not get a statement handler';
} else {
# Execute the query
try {
$sth->execute( @{ $data->{placeholders} } );
} catch Error with {
die ( defined($sth->errstr) ? $sth->errstr : $@ );
};
die $self->{dbh}->errstr if ( $self->{dbh}->errstr );
}
# The result hash
my $newdata = {};
# Actually do the query!
try {
while ( my @row = $sth->fetchrow_array() ) {
if (exists($data->{chunked}) && defined($self->{output})) {
# chunk results ready to send
$self->output();
$result = [];
$rows = 0;
}
$rows++;
# There are warnings when joining a NULL field, which is undef
push(@{$result},\@row);
if (exists($data->{chunked}) && $data->{chunked} == $rows) {
# Make output include the results
$self->{output} = { rows => $rows, result => $result, id => $data->{id}, chunked => $data->{chunked} };
}
}
# in the case that our rows == chunk
$self->{output} = undef;
} catch Error with {
die $!;
#die $sth->errstr;
};
die $self->{dbh}->errstr if ( $self->{dbh}->errstr );
# Check for any errors that might have terminated the loop early
if ( $sth->err() ) {
# Premature termination!
die $sth->errstr;
}
} catch Error with {
$self->{output} = $self->make_error( $data->{id}, shift );
};
}
# Check if we got any errors
if (!defined($self->{output})) {
# Make output include the results
$self->{output} = { rows => $rows, result => $result, id => $data->{id} };
if (exists($data->{chunked})) {
$self->{output}->{last_chunk} = 1;
$self->{output}->{chunked} = $data->{chunked};
}
}
# Finally, we clean up this statement handle
if (defined($sth)) {
$sth->finish();
}
return;
}
sub db_hash {
# Get the dbi handle
my $self = shift;
# Get the input structure
my $data = shift;
# Variables we use
my $sth = undef;
my $result = {};
my $rows = 0;
# Check if this is a non-select statement
# if ( $data->{sql} !~ /^SELECT/i ) {
# $self->{output} = $self->make_error( $data->{id}, "HASH is for SELECT queries only! ( $data->{sql} )" );
# return;
# }
SWITCH: {
if ($data->{begin_work}) {
$self->begin_work($data) or last SWITCH;
}
# Catch any errors
try {
# Make a new statement handler and prepare the query
if ($data->{no_cache}) {
$sth = $self->{dbh}->prepare( $data->{sql} );
} else {
# We use the prepare_cached method in hopes of hitting a cached one...
$sth = $self->{dbh}->prepare_cached( $data->{sql} );
}
# Check for undef'ness
if (!defined($sth)) {
die 'Did not get a statement handler';
} else {
# Execute the query
try {
$sth->execute( @{ $data->{placeholders} } );
} catch Error with {
die ( defined($sth->errstr) ? $sth->errstr : $@ );
};
die $self->{dbh}->errstr if ( $self->{dbh}->errstr );
}
# The result hash
my $newdata = {};
# Actually do the query!
try {
my @row = $sth->fetchrow_array();
if (@row) {
$rows = @row;
for my $i ( 0 .. $sth->{NUM_OF_FIELDS}-1 ) {
$result->{$sth->{NAME}->[$i]} = $row[$i];
}
}
} catch Error with {
die $sth->errstr;
};
die $self->{dbh}->errstr if ( $self->{dbh}->errstr );
# Check for any errors that might have terminated the loop early
if ( $sth->err() ) {
# Premature termination!
die $sth->errstr;
}
} catch Error with {
$self->{output} = $self->make_error( $data->{id}, shift );
};
}
# Check if we got any errors
if (!defined($self->{output})) {
# Make output include the results
$self->{output} = { rows => $rows, result => $result, id => $data->{id} };
}
# Finally, we clean up this statement handle
if (defined($sth)) {
$sth->finish();
}
return;
}
sub db_keyvalhash {
# Get the dbi handle
my $self = shift;
# Get the input structure
my $data = shift;
# Variables we use
my $sth = undef;
my $result = {};
my $rows = 0;
# Check if this is a non-select statement
# if ( $data->{sql} !~ /^SELECT/i ) {
# $self->{output} = $self->make_error( $data->{id}, "KEYVALHASH is for SELECT queries only! ( $data->{sql} )" );
# return;
# }
SWITCH: {
if ($data->{begin_work}) {
$self->begin_work($data) or last SWITCH;
}
# Catch any errors
try {
# Make a new statement handler and prepare the query
if ($data->{no_cache}) {
$sth = $self->{dbh}->prepare( $data->{sql} );
} else {
# We use the prepare_cached method in hopes of hitting a cached one...
$sth = $self->{dbh}->prepare_cached( $data->{sql} );
}
# Check for undef'ness
if (!defined($sth)) {
die 'Did not get a statement handler';
} else {
# Execute the query
try {
$sth->execute( @{ $data->{placeholders} } );
} catch Error with {
die ( defined($sth->errstr) ? $sth->errstr : $@ );
};
die $self->{dbh}->errstr if ( $self->{dbh}->errstr );
}
# Actually do the query!
try {
while (my @row = $sth->fetchrow_array()) {
if ($#row < 1) {
die 'You need at least 2 columns selected for a keyvalhash query';
}
if (exists($data->{chunked}) && defined($self->{output})) {
# chunk results ready to send
$self->output();
$result = {};
$rows = 0;
}
$rows++;
$result->{$row[0]} = $row[1];
if (exists($data->{chunked}) && $data->{chunked} == $rows) {
# Make output include the results
$self->{output} = { rows => $rows, result => $result, id => $data->{id}, chunked => $data->{chunked} };
}
}
# in the case that our rows == chunk
$self->{output} = undef;
} catch Error with {
die $sth->errstr;
};
die $self->{dbh}->errstr if ( $self->{dbh}->errstr );
# Check for any errors that might have terminated the loop early
if ( $sth->err() ) {
# Premature termination!
die $sth->errstr;
}
} catch Error with {
$self->{output} = $self->make_error( $data->{id}, shift);
};
}
# Check if we got any errors
if (!defined($self->{output})) {
# Make output include the results
$self->{output} = { rows => $rows, result => $result, id => $data->{id} };
if (exists($data->{chunked})) {
$self->{output}->{last_chunk} = 1;
$self->{output}->{chunked} = $data->{chunked};
}
}
# Finally, we clean up this statement handle
if (defined($sth)) {
$sth->finish();
}
return;
}
# Prints any output to STDOUT
sub output {
my $self = shift;
# Get the data
my $data = shift || undef;
unless (defined($data)) {
$data = $self->{output};
$self->{output} = undef;
# TODO use this at some point
$self->{error} = undef;
}
# Freeze it!
my $outdata = $self->{filter}->put( [ $data ] );
# Print it!
print STDOUT @$outdata;
return;
}
1;
__END__
=head1 NAME
POE::Component::EasyDBI::SubProcess - Backend of POE::Component::EasyDBI
=head1 ABSTRACT
This module is responsible for implementing the guts of POE::Component::EasyDBI.
The fork and the connection to the DBI.
=head2 EXPORT
Nothing.
=head1 SEE ALSO
L<POE::Component::EasyDBI>
L<DBI>
L<POE>
L<POE::Wheel::Run>
L<POE::Filter::Reference>
L<POE::Component::DBIAgent>
L<POE::Component::LaDBI>
L<POE::Component::SimpleDBI>
=head1 AUTHOR
David Davis E<lt>xantus@cpan.orgE<gt>
=head1 CREDITS
Apocalypse E<lt>apocal@cpan.orgE<gt>
=head1 COPYRIGHT AND LICENSE
Copyright 2003-2004 by David Davis and Teknikill Software
This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.
=cut