#!/usr/bin/perl package KiokuDB::Backend::DBI; BEGIN { $KiokuDB::Backend::DBI::AUTHORITY = 'cpan:NUFFIN'; } { $KiokuDB::Backend::DBI::VERSION = '1.21'; } use Moose; use Moose::Util::TypeConstraints; use MooseX::Types -declare => [qw(ValidColumnName SchemaProto)]; use MooseX::Types::Moose qw(ArrayRef HashRef Str Defined); use Moose::Util::TypeConstraints qw(enum); use Try::Tiny; use Data::Stream::Bulk::DBI; use SQL::Abstract; use JSON; use Scalar::Util qw(weaken refaddr); use KiokuDB::Backend::DBI::Schema; use KiokuDB::TypeMap; use KiokuDB::TypeMap::Entry::DBIC::Row; use KiokuDB::TypeMap::Entry::DBIC::ResultSource; use KiokuDB::TypeMap::Entry::DBIC::ResultSet; use KiokuDB::TypeMap::Entry::DBIC::Schema; use namespace::clean -except => 'meta'; with qw( KiokuDB::Backend KiokuDB::Backend::Serialize::Delegate KiokuDB::Backend::Role::Clear KiokuDB::Backend::Role::TXN KiokuDB::Backend::Role::Scan KiokuDB::Backend::Role::Query::Simple KiokuDB::Backend::Role::Query::GIN KiokuDB::Backend::Role::Concurrency::POSIX KiokuDB::Backend::Role::GC Search::GIN::Extract::Delegate ); # KiokuDB::Backend::Role::TXN::Nested is not supported by many DBs # we don't really care though my @std_cols = qw(id class root tied); my @reserved_cols = ( @std_cols, 'data' ); my %reserved_cols = ( map { $_ => 1 } @reserved_cols ); subtype ValidColumnName, as Str, where { not exists $reserved_cols{$_} }; subtype SchemaProto, as Defined, where { Class::MOP::load_class($_) unless ref; !ref($_) || blessed($_) and $_->isa("DBIx::Class::Schema::KiokuDB"); }; sub new_from_dsn { my ( $self, $dsn, @args ) = @_; @args = %{ $args[0] } if @args == 1 and ref $args[0] eq 'HASH'; $self->new( dsn => "dbi:$dsn", @args ); } sub BUILD { my $self = shift; $self->schema; # connect early if ( $self->create ) { $self->create_tables; } } has '+serializer' => ( default => "json" ); # to make dumps readable has json => ( isa => "Object", is => "ro", default => sub { JSON->new }, ); has create => ( isa => "Bool", is => "ro", default => 0, ); has 'dsn' => ( isa => "Str|CodeRef", is => "ro", ); has [qw(user password)] => ( isa => "Str", is => "ro", ); has dbi_attrs => ( isa => HashRef, is => "ro", ); has mysql_strict => ( isa => "Bool", is => "ro", default => 1, ); has sqlite_sync_mode => ( isa => enum([qw(0 1 2 OFF NORMAL FULL off normal full)]), is => "ro", predicate => "has_sqlite_fsync_mode", ); has on_connect_call => ( isa => "ArrayRef", is => "ro", lazy_build => 1, ); sub _build_on_connect_call { my $self = shift; my @call; if ( $self->mysql_strict ) { push @call, sub { my $storage = shift; if ( $storage->can("connect_call_set_strict_mode") ) { $storage->connect_call_set_strict_mode; } }; }; if ( $self->has_sqlite_fsync_mode ) { push @call, sub { my $storage = shift; if ( $storage->sqlt_type eq 'SQLite' ) { $storage->dbh_do(sub { $_[1]->do("PRAGMA synchronous=" . $self->sqlite_sync_mode) }); } }; } return \@call; } has dbic_attrs => ( isa => "HashRef", is => "ro", lazy_build => 1, ); sub _build_dbic_attrs { my $self = shift; return { on_connect_call => $self->on_connect_call, }; } has connect_info => ( isa => ArrayRef, is => "ro", lazy_build => 1, ); sub _build_connect_info { my $self = shift; return [ $self->dsn, $self->user, $self->password, $self->dbi_attrs, $self->dbic_attrs ]; } has schema => ( isa => "DBIx::Class::Schema", is => "ro", lazy_build => 1, init_arg => "connected_schema", handles => [qw(deploy kiokudb_entries_source_name)], ); has _schema_proto => ( isa => SchemaProto, is => "ro", init_arg => "schema", default => "KiokuDB::Backend::DBI::Schema", ); has schema_hook => ( isa => "CodeRef|Str", is => "ro", predicate => "has_schema_hook", ); sub _build_schema { my $self = shift; my $schema = $self->_schema_proto->clone; unless ( $schema->kiokudb_entries_source_name ) { $schema->define_kiokudb_schema( extra_entries_columns => $self->columns ); } if ( $self->has_schema_hook ) { my $h = $self->schema_hook; $self->$h($schema); } $schema->connect(@{ $self->connect_info }); } has storage => ( isa => "DBIx::Class::Storage::DBI", is => "rw", lazy_build => 1, handles => [qw(dbh_do)], ); sub _build_storage { shift->schema->storage } has for_update => ( isa => "Bool", is => "ro", default => 1, ); has _for_update => ( isa => "Bool", is => "ro", lazy_build => 1, ); sub _build__for_update { my $self = shift; return ( $self->for_update and $self->storage->sqlt_type =~ /^(?:MySQL|Oracle|PostgreSQL)$/ ); } has columns => ( isa => ArrayRef[ValidColumnName|HashRef], is => "ro", default => sub { [] }, ); has _columns => ( isa => HashRef, is => "ro", lazy_build => 1, ); sub _build__columns { my $self = shift; my $rs = $self->schema->source( $self->kiokudb_entries_source_name ); my @user_cols = grep { not exists $reserved_cols{$_} } $rs->columns; return { map { $_ => $rs->column_info($_)->{extract} || undef } @user_cols }; } has _ordered_columns => ( isa => "ArrayRef", is => "ro", lazy_build => 1, ); sub _build__ordered_columns { my $self = shift; return [ @reserved_cols, sort keys %{ $self->_columns } ]; } has _column_order => ( isa => "HashRef", is => "ro", lazy_build => 1, ); sub _build__column_order { my $self = shift; my $cols = $self->_ordered_columns; return { map { $cols->[$_] => $_ } 0 .. $#$cols } } has '+extract' => ( required => 0, ); has sql_abstract => ( isa => "SQL::Abstract", is => "ro", lazy_build => 1, ); sub _build_sql_abstract { my $self = shift; SQL::Abstract->new; } # use a Maybe so we can force undef in the builder has batch_size => ( isa => "Maybe[Int]", is => "ro", lazy => 1, builder => '_build_batch_size', ); sub _build_batch_size { my $self = shift; if ($self->storage->sqlt_type eq 'SQLite') { return 999; } else { return undef; } } sub has_batch_size { defined shift->batch_size } sub register_handle { my ( $self, $kiokudb ) = @_; $self->schema->_kiokudb_handle($kiokudb); } sub default_typemap { KiokuDB::TypeMap->new( isa_entries => { # redirect to schema row 'DBIx::Class::Row' => KiokuDB::TypeMap::Entry::DBIC::Row->new, # actual serialization 'DBIx::Class::ResultSet' => KiokuDB::TypeMap::Entry::DBIC::ResultSet->new, # fake, the entries never get written to the db 'DBIx::Class::ResultSource' => KiokuDB::TypeMap::Entry::DBIC::ResultSource->new, 'DBIx::Class::Schema' => KiokuDB::TypeMap::Entry::DBIC::Schema->new, }, ); } sub insert { my ( $self, @entries ) = @_; return unless @entries; my $g = $self->schema->txn_scope_guard; $self->insert_rows( $self->entries_to_rows(@entries) ); # hopefully we're in a transaction, otherwise this totally sucks if ( $self->extract ) { my %gin_index; foreach my $entry ( @entries ) { my $id = $entry->id; if ( $entry->deleted || !$entry->has_object ) { $gin_index{$id} = []; } else { my $d = $entry->backend_data || $entry->backend_data({}); $gin_index{$id} = [ $self->extract_values( $entry->object, entry => $entry ) ]; } } $self->update_index(\%gin_index); } $g->commit; } sub entries_to_rows { my ( $self, @entries ) = @_; my ( %insert, %update, @dbic ); foreach my $t ( \%insert, \%update ) { foreach my $col ( @{ $self->_ordered_columns } ) { $t->{$col} = []; } } foreach my $entry ( @entries ) { my $id = $entry->id; if ( $id =~ /^dbic:schema/ ) { next; } elsif ( $id =~ /^dbic:row:/ ) { push @dbic, $entry->data; } else { my $targ = $entry->prev ? \%update : \%insert; my $row = $self->entry_to_row($entry, $targ); } } return \( %insert, %update, @dbic ); } sub entry_to_row { my ( $self, $entry, $collector ) = @_; for (qw(id class tied)) { push @{ $collector->{$_} }, $entry->$_; } push @{ $collector->{root} }, $entry->root ? 1 : 0; push @{ $collector->{data} }, $self->serialize($entry); my $cols = $self->_columns; foreach my $column ( keys %$cols ) { my $c = $collector->{$column}; if ( my $extract = $cols->{$column} ) { if ( my $obj = $entry->object ) { push @$c, $obj->$extract($column); next; } } elsif ( ref( my $data = $entry->data ) eq 'HASH' ) { if ( exists $data->{$column} and not ref( my $value = $data->{$column} ) ) { push @$c, $value; next; } } push @$c, undef; } } sub insert_rows { my ( $self, $insert, $update, $dbic ) = @_; my $g = $self->schema->txn_scope_guard; $self->dbh_do(sub { my ( $storage, $dbh ) = @_; if ( $self->extract ) { if ( my @ids = map { @{ $_->{id} || [] } } $insert, $update ) { my $batch_size = $self->batch_size || scalar(@ids); my @ids_copy = @ids; while ( my @batch_ids = splice @ids_copy, 0, $batch_size ) { my $del_gin_sth = $dbh->prepare_cached("DELETE FROM gin_index WHERE id IN (" . join(", ", ('?') x @batch_ids) . ")"); $del_gin_sth->execute(@batch_ids); $del_gin_sth->finish; } } } my $colinfo = $self->schema->source('entries')->columns_info; my %rows = ( insert => $insert, update => $update ); foreach my $op (qw(insert update)) { my $prepare = "prepare_$op"; my ( $sth, @cols ) = $self->$prepare($dbh); my $i = 1; foreach my $column_name (@cols) { my $attributes = {}; if ( exists $colinfo->{$column_name} ) { my $dt = $colinfo->{$column_name}{data_type}; $attributes = $self->storage->bind_attribute_by_data_type($dt); } $sth->bind_param_array( $i, $rows{$op}->{$column_name}, $attributes ); $i++; } $sth->execute_array({ArrayTupleStatus => []}) or die; $sth->finish; } $_->insert_or_update for @$dbic; }); $g->commit; } sub prepare_select { my ( $self, $dbh, $stmt ) = @_; $dbh->prepare_cached($stmt . ( $self->_for_update ? " FOR UPDATE" : "" ), {}, 3); # 3 = don't use if still Active } sub prepare_insert { my ( $self, $dbh ) = @_; my @cols = @{ $self->_ordered_columns }; my $ins = $dbh->prepare_cached("INSERT INTO entries (" . join(", ", @cols) . ") VALUES (" . join(", ", ('?') x @cols) . ")"); return ( $ins, @cols ); } sub prepare_update { my ( $self, $dbh ) = @_; my ( $id, @cols ) = @{ $self->_ordered_columns }; my $upd = $dbh->prepare_cached("UPDATE entries SET " . join(", ", map { "$_ = ?" } @cols) . " WHERE $id = ?"); return ( $upd, @cols, $id ); } sub update_index { my ( $self, $entries ) = @_; $self->dbh_do(sub { my ( $storage, $dbh ) = @_; my $i_sth = $dbh->prepare_cached("INSERT INTO gin_index (id, value) VALUES (?, ?)"); foreach my $id ( keys %$entries ) { my $rv = $i_sth->execute_array( {ArrayTupleStatus => []}, $id, $entries->{$id}, ); } $i_sth->finish; }); } sub _parse_dbic_key { my ( $self, $key ) = @_; @{ $self->json->decode(substr($key,length('dbic:row:'))) }; } sub _part_rows_and_ids { my ( $self, $rows_and_ids ) = @_; my ( @rows, @ids, @special ); for ( @$rows_and_ids ) { if ( /^dbic:schema/ ) { push @special, $_; } elsif ( /^dbic:row:/ ) { push @rows, $_; } else { push @ids, $_; } } return \( @rows, @ids, @special ); } sub _group_dbic_keys { my ( $self, $keys, $mkey_handler ) = @_; my ( %keys, %ids ); foreach my $id ( @$keys ) { my ( $rs_name, @key ) = $self->_parse_dbic_key($id); if ( @key > 1 ) { $mkey_handler->($id, $rs_name, @key); } else { # for other objects we queue up IDs for a single SELECT push @{ $keys{$rs_name} ||= [] }, $key[0]; push @{ $ids{$rs_name} ||= [] }, $id; } } return \( %keys, %ids ); } sub get { my ( $self, @rows_and_ids ) = @_; return unless @rows_and_ids; my %entries; my ( $rows, $ids, $special ) = $self->_part_rows_and_ids(\@rows_and_ids); if ( @$ids ) { $self->dbh_do(sub { my ( $storage, $dbh ) = @_; my @ids_copy = @$ids; my $batch_size = $self->batch_size || scalar(@$ids); while ( my @batch_ids = splice(@ids_copy, 0, $batch_size) ) { my $sth = $self->prepare_select($dbh, "SELECT id, data FROM entries WHERE id IN (" . join(", ", ('?') x @batch_ids) . ")"); $sth->execute(@batch_ids); $sth->bind_columns( \my ( $id, $data ) ); # not actually necessary but i'm keeping it around for reference: #my ( $id, $data ); #use DBD::Pg qw(PG_BYTEA); #$sth->bind_col(1, \$id); #$sth->bind_col(2, \$data, { pg_type => PG_BYTEA }); while ( $sth->fetch ) { $entries{$id} = $data; } } }); } if ( @$rows ) { my $schema = $self->schema; my $err = \"foo"; my ( $rs_keys, $rs_ids ) = try { $self->_group_dbic_keys( $rows, sub { my ( $id, $rs_name, @key ) = @_; # multi column primary keys need 'find' my $obj = $schema->resultset($rs_name)->find(@key) or die $err; # die to stop search $entries{$id} = KiokuDB::Entry->new( id => $id, class => ref($obj), data => $obj, ); }); } catch { die $_ if ref $_ and refaddr($_) == refaddr($err); } or return; foreach my $rs_name ( keys %$rs_keys ) { my $rs = $schema->resultset($rs_name); my $ids = $rs_ids->{$rs_name}; my @objs; if ( @$ids == 1 ) { my $id = $ids->[0]; my $obj = $rs->find($rs_keys->{$rs_name}[0]) or return; $entries{$id} = KiokuDB::Entry->new( id => $id, class => ref($obj), data => $obj, ); } else { my ($pk) = $rs->result_source->primary_columns; my $keys = $rs_keys->{$rs_name}; my @objs = $rs->search({ $pk => $keys })->all; return if @objs != @$ids; # this key lookup is because it's not returned in the same order my %pk_to_id; @pk_to_id{@$keys} = @$ids; foreach my $obj ( @objs ) { my $id = $pk_to_id{$obj->id}; $entries{$id} = KiokuDB::Entry->new( id => $id, class => ref($obj), data => $obj, ); } } } } for ( @$special ) { $entries{$_} = KiokuDB::Entry->new( id => $_, $_ eq 'dbic:schema' ? ( data => $self->schema, class => "DBIx::Class::Schema" ) : ( data => undef, class => "DBIx::Class::ResultSource" ) ); } return if @rows_and_ids != keys %entries; # ->rows only works after we're done map { ref($_) ? $_ : $self->deserialize($_) } @entries{@rows_and_ids}; } sub delete { my ( $self, @ids_or_entries ) = @_; # FIXME special DBIC rows my @ids = map { ref($_) ? $_->id : $_ } @ids_or_entries; $self->dbh_do(sub { my ( $storage, $dbh ) = @_; my $g = $self->schema->txn_scope_guard; my $batch_size = $self->batch_size || scalar(@ids); my @ids_copy = @ids; while ( my @batch_ids = splice @ids_copy, 0, $batch_size ) { if ( $self->extract ) { # FIXME rely on cascade delete? my $sth = $dbh->prepare_cached("DELETE FROM gin_index WHERE id IN (" . join(", ", ('?') x @batch_ids) . ")"); $sth->execute(@batch_ids); $sth->finish; } my $sth = $dbh->prepare_cached("DELETE FROM entries WHERE id IN (" . join(", ", ('?') x @batch_ids) . ")"); $sth->execute(@batch_ids); $sth->finish; } $g->commit; }); return; } sub exists { my ( $self, @rows_and_ids ) = @_; return unless @rows_and_ids; my $schema = $self->schema; my %entries; my ( $rows, $ids, $special ) = $self->_part_rows_and_ids(\@rows_and_ids); if ( @$ids ) { $self->dbh_do(sub { my ( $storage, $dbh ) = @_; my $batch_size = $self->batch_size || scalar(@$ids); my @ids_copy = @$ids; while ( my @batch_ids = splice @ids_copy, 0, $batch_size ) { my $sth = $self-> prepare_select ( $dbh, "SELECT id FROM entries WHERE id IN (" . join(", ", ('?') x @batch_ids) . ")"); $sth->execute(@batch_ids); $sth->bind_columns( \( my $id ) ); $entries{$id} = 1 while $sth->fetch; } }); } if ( @$rows ) { my ( $rs_keys, $rs_ids ) = $self->_group_dbic_keys( $rows, sub { my ( $id, $rs_name, @key ) = @_; $entries{$id} = defined $schema->resultset($rs_name)->find(@key); # FIXME slow }); foreach my $rs_name ( keys %$rs_keys ) { my $rs = $schema->resultset($rs_name); my $ids = $rs_ids->{$rs_name}; my $keys = $rs_keys->{$rs_name}; my ( $pk ) = $rs->result_source->primary_columns; my @exists = $rs->search({ $pk => $keys })->get_column($pk)->all; my %pk_to_id; @pk_to_id{@$keys} = @$ids; @entries{@pk_to_id{@exists}} = ( (1) x @exists ); } } for ( @$special ) { if ( $_ eq 'dbic:schema' ) { $entries{$_} = 1; } elsif ( /^dbic:schema:(.*)/ ) { $entries{$_} = defined try { $schema->source($1) }; } } return @entries{@rows_and_ids}; } sub txn_begin { shift->storage->txn_begin(@_) } sub txn_commit { shift->storage->txn_commit(@_) } sub txn_rollback { shift->storage->txn_rollback(@_) } sub clear { my $self = shift; $self->dbh_do(sub { my ( $storage, $dbh ) = @_; $dbh->do("DELETE FROM gin_index"); $dbh->do("DELETE FROM entries"); }); } sub _sth_stream { my ( $self, $sql, @bind ) = @_; $self->dbh_do(sub { my ( $storage, $dbh ) = @_; my $sth = $self->prepare_select($dbh, $sql); $sth->execute(@bind); Data::Stream::Bulk::DBI->new( sth => $sth ); }); } sub _select_entry_stream { my ( $self, @args ) = @_; my $stream = $self->_sth_stream(@args); return $stream->filter(sub { [ map { $self->deserialize($_->[0]) } @$_ ] }); } sub all_entries { my $self = shift; $self->_select_entry_stream("SELECT data FROM entries"); } sub root_entries { my $self = shift; $self->_select_entry_stream("SELECT data FROM entries WHERE root"); } sub child_entries { my $self = shift; $self->_select_entry_stream("SELECT data FROM entries WHERE not root"); } sub _select_id_stream { my ( $self, @args ) = @_; my $stream = $self->_sth_stream(@args); return $stream->filter(sub {[ map { $_->[0] } @$_ ]}); } sub all_entry_ids { my $self = shift; $self->_select_id_stream("SELECT id FROM entries"); } sub root_entry_ids { my $self = shift; $self->_select_id_stream("SELECT id FROM entries WHERE root"); } sub child_entry_ids { my $self = shift; $self->_select_id_stream("SELECT id FROM entries WHERE not root"); } sub simple_search { my ( $self, $proto ) = @_; my ( $where_clause, @bind ) = $self->sql_abstract->where($proto); $self->_select_entry_stream("SELECT data FROM entries $where_clause", @bind); } sub search { my ( $self, $query, @args ) = @_; my %args = ( distinct => $self->distinct, @args, ); my %spec = $query->extract_values($self); my @binds; my $inner_sql = $self->_search_gin_subquery(\%spec, \@binds); return $self->_select_entry_stream("SELECT data FROM entries WHERE id IN (".$inner_sql.")",@binds); } sub _search_gin_subquery { my ($self, $spec, $binds) = @_; my @v = ref $spec->{values} eq 'ARRAY' ? @{ $spec->{values} } : (); if ( $spec->{method} eq 'set' ) { my $op = $spec->{operation}; die 'gin set query received bad operation' unless $op =~ /^(UNION|INTERSECT|EXCEPT)$/i; die 'gin set query missing subqueries' unless ref $spec->{subqueries} eq 'ARRAY' && scalar @{ $spec->{subqueries} }; return "(". ( join ' '.$op.' ', map { $self->_search_gin_subquery($_, $binds) } @{ $spec->{subqueries} } ).")"; } elsif ( $spec->{method} eq 'all' and @v > 1) { # for some reason count(id) = ? doesn't work push @$binds, @v; return "SELECT id FROM gin_index WHERE value IN ". "(" . join(", ", ('?') x @v) . ")" . "GROUP BY id HAVING COUNT(id) = " . scalar(@v); } else { push @$binds, @v; return "SELECT DISTINCT id FROM gin_index WHERE value IN ". "(" . join(", ", ('?') x @v) . ")"; } } sub fetch_entry { die "TODO" } sub remove_ids { my ( $self, @ids ) = @_; die "Deletion the GIN index is handled implicitly"; } sub insert_entry { my ( $self, $id, @keys ) = @_; die "Insertion to the GIN index is handled implicitly"; } sub _table_info { my ( $self, $catalog, $schema, $table ) = @_; $self->dbh_do(sub { my ( $storage, $dbh ) = @_; my $filter = ( $self->storage->sqlt_type eq 'SQLite' ? '%' : '' ); foreach my $arg ( $catalog, $schema, $table ) { $arg = $filter unless defined $arg; } $dbh->table_info($catalog, $schema, $table, 'TABLE')->fetchall_arrayref; }); } sub tables_exist { my $self = shift; return ( @{ $self->_table_info(undef, undef, 'entries') } > 0 ); } sub create_tables { my $self = shift; $self->dbh_do(sub { my ( $storage, $dbh ) = @_; unless ( $self->tables_exist ) { $self->deploy({ producer_args => { mysql_version => 4.1 } }); } }); } sub drop_tables { my $self = shift; $self->dbh_do(sub { my ( $storage, $dbh ) = @_; $dbh->do("DROP TABLE gin_index"); $dbh->do("DROP TABLE entries"); }); } sub DEMOLISH { my $self = shift; return if $_[0]; if ( $self->has_storage ) { $self->storage->disconnect; } } sub new_garbage_collector { my ( $self, %args ) = @_; if ( grep { $_ !~ /^(?:entries|gin_index)/ } map { $_->[2] } @{ $self->_table_info } ) { die "\nRefusing to GC a database with additional tables.\n\nThis is ecause the root set and referencing scheme might be ambiguous (it's not yet clear what garbage collection should actually do on a mixed schema).\n"; } else { my $cmd = $args{command}; my $class = $args{class} || $cmd ? $cmd->class : "KiokuDB::GC::Naive"; Class::MOP::load_class($class); return $class->new( %args, backend => $self, ( $cmd ? ( verbose => $cmd->verbose ) : $cmd ), ); } } __PACKAGE__->meta->make_immutable; __PACKAGE__ __END__ =pod =head1 NAME KiokuDB::Backend::DBI - L backend for L =head1 SYNOPSIS my $dir = KiokuDB->connect( "dbi:mysql:foo", user => "blah", password => "moo', columns => [ # specify extra columns for the 'entries' table # in the same format you pass to DBIC's add_columns name => { data_type => "varchar", is_nullable => 1, # probably important }, ], ); $dir->search({ name => "foo" }); # SQL::Abstract =head1 DESCRIPTION This backend for L leverages existing L accessible databases. The schema is based on two tables, C and C (the latter is only used if a L extractor is specified). The C table has two main columns, C and C (currently in JSPON format, in the future the format will be pluggable), and additional user specified columns. The user specified columns are extracted from inserted objects using a callback (or just copied for simple scalars), allowing SQL where clauses to be used for searching. =head1 COLUMN EXTRACTIONS The columns are specified using a L instance. One additional column info parameter is used, C, which is called as a method on the inserted object with the column name as the only argument. The return value from this callback will be used to populate the column. If the column extractor is omitted then the column will contain a copy of the entry data key by the same name, if it is a plain scalar. Otherwise the column will be C. These columns are only used for lookup purposes, only C is consulted when loading entries. =head1 DBIC INTEGRATION This backend is layered on top of L and reused L for DDL. Because of this objects from a L can refer to objects in the KiokuDB entries table, and vice versa. For more details see L. =head1 SUPPORTED DATABASES This driver has been tested with MySQL 5 (4.1 should be the minimal supported version), SQLite 3, and PostgresSQL 8.3. The SQL code is reasonably portable and should work with most databases. Binary column support is required when using the L serializer. =head2 Transactions For reasons of performance and ease of use database vendors ship with read committed transaction isolation by default. This means that read locks are B acquired when data is fetched from the database, allowing it to be updated by another writer. If the current transaction then updates the value it will be silently overwritten. IMHO this is a much bigger problem when the data is unstructured. This is because data is loaded and fetched in potentially smaller chunks, increasing the risk of phantom reads. Unfortunately enabling truly isolated transaction semantics means that C may fail due to a lock contention, forcing you to repeat your transaction. Arguably this is more correct "read comitted", which can lead to race conditions. Enabling repeatable read or serializable transaction isolation prevents transactions from interfering with eachother, by ensuring all data reads are performed with a shared lock. For more information on isolation see L =head3 SQLite SQLite provides serializable isolation by default. L =head3 MySQL MySQL provides read committed isolation by default. Serializable level isolation can be enabled by by default by changing the C global variable, L =head3 PostgreSQL PostgreSQL provides read committed isolation by default. Repeatable read or serializable isolation can be enabled by setting the default transaction isolation level, or using the C SQL statement. L, L =head1 ATTRIBUTES =over 4 =item schema Created automatically. This is L object that is used for schema deployment, connectivity and transaction handling. =item connect_info An array reference whose contents are passed to L. If omitted will be created from the attrs C, C, C and C. =item dsn =item user =item password =item dbi_attrs Convenience attrs for connecting using L. User in C's builder. =item columns Additional columns, see L. =item serializer L. Coerces from a string, too: KiokuDB->connect("dbi:...", serializer => "storable"); Defaults to L. =item create If true the existence of the tables will be checked for and the DB will be deployed if not. Defaults to false. =item extract An optional L used to create the C entries. Usually L. =item schema_hook A hook that is called on the backend object as a method with the schema as the argument just before connecting. If you need to modify the schema in some way (adding indexes or constraints) this is where it should be done. =item for_update If true (the defaults), will cause all select statement to be issued with a C modifier on MySQL, Postgres and Oracle. This is highly reccomended because these database provide low isolation guarantees as configured out the box, and highly interlinked graph databases are much more susceptible to corruption because of lack of transcational isolation than normalized relational databases. =item sqlite_sync_mode If this attribute is set and the underlying database is SQLite, then C will be issued with this value. Can be C, C or C (SQLite's default), or 0, 1, or 2. See L. =item mysql_strict If true (the default), sets MySQL's strict mode. This is B reccomended, or you may enjoy some of MySQL's more interesting features, like automatic data loss when the columns are too narrow. See L and L for more details. =item on_connect_call See L. This attribute is constructed based on the values of C and C, but may be overridden if you need more control. =item dbic_attrs See L. Defaults to { on_connect_call => $self->on_connect_call } =item batch_size SQL that deals with entries run in batches of the amount provided in C. If it is not provided, the statements will run in a single batch. This solves the issue with SQLite where lists can only handle 999 elements at a time. C will be set to 999 by default if the driver in use is SQLite. =back =head1 METHODS See L and the various roles for more info. =over 4 =item deploy Calls L. Deployment to MySQL requires that you specify something like: $dir->backend->deploy({ producer_args => { mysql_version => 4 } }); because MySQL versions before 4 did not have support for boolean types, and the schema emitted by L will not work with the queries used. =item drop_tables Drops the C and C tables. =back =head1 TROUBLESHOOTING =head2 I get C You are problably using MySQL, which comes with a helpful data compression feature: when your serialized objects are larger than the maximum size of a C column MySQL will simply shorten it for you. Why C defaults to 64k, and how on earth someone would consider silent data truncation a sane default I could never fathom, but nevertheless MySQL does allow you to disable this by setting the "strict" SQL mode in the configuration. To resolve the actual problem (though this obviously won't repair your lost data), alter the entries table so that the C column uses the nonstandard C datatype. =head1 VERSION CONTROL L =head1 AUTHOR Yuval Kogman Enothingmuch@woobling.orgE =head1 COPYRIGHT Copyright (c) 2008, 2009 Yuval Kogman, Infinity Interactive. All rights reserved This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself. =begin Pod::Coverage BUILD DEMOLISH all_entries all_entry_ids child_entries child_entry_ids clear create_tables default_typemap delete entries_to_rows entry_to_row exists fetch_entry has_batch_size insert insert_entry insert_rows new_from_dsn new_garbage_collector prepare_insert prepare_select prepare_update register_handle remove_ids root_entries root_entry_ids search simple_search tables_exist txn_begin txn_commit txn_rollback update_index =end Pod::Coverage =cut