#!/usr/bin/perl # # $Header: /Users/claude/fuzz/lib/Genezzo/Row/RCS/RSTab.pm,v 7.5 2006/10/19 09:04:13 claude Exp claude $ # # copyright (c) 2003,2004,2005 Jeffrey I Cohen, all rights reserved, worldwide # # package Genezzo::Row::RSTab; use strict; use warnings; use Carp; use Genezzo::Util; use Genezzo::PushHash::HPHRowBlk; use Genezzo::PushHash::PushHash; use Genezzo::PushHash::PHFixed; use Data::Dumper; use Genezzo::BufCa::BCFile; use Genezzo::SpaceMan::SMFile; use Genezzo::Row::RSFile; use warnings::register; our @ISA = qw(Genezzo::PushHash::HPHRowBlk); our $GZERR = sub { my %args = (@_); return unless (exists($args{msg})); if (exists($args{self})) { my $self = $args{self}; if (defined($self) && exists($self->{GZERR})) { my $err_cb = $self->{GZERR}; return &$err_cb(%args); } } my $warn = 0; if (exists($args{severity})) { my $sev = uc($args{severity}); $sev = 'WARNING' if ($sev =~ m/warn/i); # don't print 'INFO' prefix if ($args{severity} !~ m/info/i) { printf ("%s: ", $sev); $warn = 1; } } # XXX XXX XXX print __PACKAGE__, ": ", $args{msg}; # print $args{msg}; # carp $args{msg} # if (warnings::enabled() && $warn); }; sub make_fac1 { my $tclass = shift; my %args = ( @_); if (exists($args{hashref})) { carp "cannot supply hashref to factory method - deleting !\n" if warnings::enabled(); delete $args{hashref}; } my %td_hash1 = (); my $newfunc = sub { # whoami @_; my $tiehash1 = tie %td_hash1, $tclass, %args; return $tiehash1; }; return $newfunc; } # private sub _init { #whoami; #greet @_; my $self = shift; my %required = ( tablename => "no tablename !", object_id => "no object id !", tso => "no tso", bufcache => "no buffer cache", object_type => "no object type" ); my %optional = ( dbh_ctx => {} # XXX XXX XXX: is this used? ); my %args = (@_); return 0 unless (Validate(\%args, \%required)); $self->{tablename} = $args{tablename}; $self->{tso} = $args{tso}; $self->{bc} = $args{bufcache}; $self->{object_id} = $args{object_id}; $self->{object_type} = $args{object_type}; $self->{fac1} = make_fac1('Genezzo::PushHash::PushHash'); $self->{get_filenum} = {}; # get the filenum based upon chunkno $self->{get_chunkno} = {}; # get the chunkno based upon filenum # preload so 0/0 works for nextkey $self->{get_filenum}->{0} = 0; $self->{get_chunkno}->{0} = 0; # XXX XXX XXX : self->splitrow - if no split rows then simplify store/fetch # get the href info from TSTableAFU... my $s2 = $self->{tso}->TSTableAFU (tablename => $args{tablename}, object_id => $args{object_id}); if (defined($s2)) { my $sth = shift @{$s2}; if (defined($sth)) { # XXX XXX XXX XXX # greet $sth->SQLFetch(); # greet $s2; } else { greet "no afu for $args{tablename}"; # greet $s2; # greet $href; } } my $href = shift @{$s2}; foreach my $i (@{ $href->{filesused} }) { # XXX: use magic loadme flag return 0 unless (defined(_make_new_chunk($self, $i))); } # the "small row" threshold is only a small fraction of the # default block size in order to avoid wasting space. For # example, if the current block has 500 bytes of freespace, and # the packed row is 1000 bytes, it would be more efficient to # split the row over two blocks versus leaving 500 bytes free and # allocating a new block. $self->{small_row} = 200; # use HSuck on rows > small_row bytes return 1; } sub TIEHASH { #sub new # greet @_; my $invocant = shift; my $class = ref($invocant) || $invocant ; my $self = $class->SUPER::TIEHASH(@_); my %args = (@_); return undef unless (_init($self,%args)); if ((exists($args{GZERR})) && (defined($args{GZERR})) && (length($args{GZERR}))) { # NOTE: don't supply our GZERR here - will get # recursive failure... $self->{GZERR} = $args{GZERR}; } return bless $self, $class; } # end new # private routines sub _make_new_chunk # override the hph method { # whoami @_; my $self = shift; # XXX: rstab specific my $loadme = shift; my $harr = $self->{ "Genezzo::PushHash::hph" . ":H_ARRAY" }; my $hcount = scalar(@{$harr}); my $tso = $self->{tso}; my $fileinfo; if (defined ($loadme)) { $fileinfo = $tso->TSFileInfo (fileno => $loadme); } else { # if we have a current file number then get the next one, else # don't advance my $neednext = ($hcount > 0); my $currfileno = $tso->TS_get_fileno (nextfile => $neednext); $fileinfo = $tso->TSFileInfo (fileno => $currfileno); } return (undef) unless (defined($fileinfo)); my ($filename, $filesize, $fileblks, $filenumber) = @{$fileinfo}; my %realhash = (); my %args = ( hashref => \%realhash , factory => $self->{fac1}, tablename => $self->{tablename}, object_id => $self->{object_id}, object_type => $self->{object_type}, filename => $filename, numbytes => $filesize, numblocks => $fileblks, filenumber => $filenumber, bufcache => $self->{bc}, tso => $self->{tso} ); if (exists($self->{GZERR}) && defined($self->{GZERR})) { $args{GZERR} = $self->{GZERR}; } # if ($MAXCOUNT > 1) # { # return undef # unless ($hcount < $MAXCOUNT); #} my $tiehash = $self->{ "Genezzo::PushHash::hph" . ":PushHash_Factory" }->(%args); unless (defined ($tiehash)) { carp "factory could not allocate pushhash" if warnings::enabled(); return undef; # factory out of hashes } # can only work with pushhash - else die croak "not a pushhash" unless ($tiehash->isa("Genezzo::PushHash::PushHash")); return undef # push failed unless( (push @{$harr}, $tiehash) > $hcount); # get file number as defined by initial bc file registration my $absolute_fileno = $self->{bc}->FileReg(FileName => $filename, FileNumber => $filenumber); $self->{get_filenum}->{($hcount+1)} = $absolute_fileno; $self->{get_chunkno}->{$absolute_fileno} = ($hcount + 1); { return undef unless $tso->TSTableUseFile ( tablename => $self->{tablename}, object_id => $self->{object_id}, filenumber => $absolute_fileno, # href => $href ); # push (@{$href->{filesused}}, $absolute_fileno); } # NOTE: treat array as 1-based (versus 0 based) return $harr->[$hcount]; } sub _splitrid # override the hph method { my $self = $_[0]; # no shift # split into 2 parts - chunkno and sliceno unless ($_[1] =~ m/$Genezzo::PushHash::hph::RIDSEPRX/) { my $msg = "could not split key: $_[1] \n"; my %earg = (self => $self, msg => $msg, severity => 'warn'); &$GZERR(%earg) if (defined($GZERR)); return undef; # no rid separator } my @splitval = split(/$Genezzo::PushHash::hph::RIDSEPRX/,($_[1]), 2); # given an input rid based upon the absolute file number, convert # to chunk number my $absfileno = $splitval[0]; # whisper "chunkno $_[0]->{get_chunkno}->{$absfileno}"; $splitval[0] = $_[0]->{get_chunkno}->{$absfileno}; return @splitval; } sub _joinrid # override the hph method { my $self = shift; my @args = @_; my $chunkno = $args[0]; # whisper "fileno $self->{get_filenum}->{$chunkno}"; # convert the internal chunknumber to the absolute file number $args[0] = $self->{get_filenum}->{$chunkno}; return (join ($Genezzo::PushHash::hph::RIDSEP, @args)); } # Constraint Methods # get/set the constraint list sub _constraint_check { # whoami; my $self = shift; # greet $self->{tablename}; $self->{constraint_list} = shift if @_ ; return $self->{constraint_list}; } # check constraint when inserting new row sub check_insert { my $self = shift; return 0 unless (exists($self->{constraint_list})); my $cons_list = $self->{constraint_list}; return 0 unless (defined($cons_list) && exists($cons_list->{check_insert})); my $cci_check = $cons_list->{check_insert}; my $val; # be very paranoid - filter might be invalid perl eval {$val = &$cci_check(@_) }; # XXX XXX: figure out name of defective constraint if ($@) { whisper "check constraint blew up: $@"; greet $cci_check; my $msg = "bad check constraint: $@\n"; my %earg = (self => $self, msg => $msg, severity => 'warn'); &$GZERR(%earg) if (defined($GZERR)); return 1; #### undef; } return ($val); } # end check insert # insert new values into index when inserting new row sub index_insert { my $self = shift; return 0 unless (exists($self->{constraint_list})); my $cons_list = $self->{constraint_list}; return 0 unless (defined($cons_list) && exists($cons_list->{index_insert})); my $cci = $cons_list->{index_insert}; return (&$cci(@_)); } # end index insert sub HSuck { # whoami; my $self = shift; my %args = ( @_); # CONSTRAINT # greet %args; return undef unless (defined($args{value})); my $val = $args{value}; my $off = (defined($args{offset})) ? $args{offset} : scalar(@{$val}); # offsets are 1 based, not 0 my $next = $args{next}; my $headless = $args{headless}; # set if not a true row head piece $next = ':' . $next if (defined($next)); my ($place, $frag, $blk_place, $newoff); my $firsttry = 1; L_whileoff: while (defined($off)) { if ($firsttry) { # try to fit row piece in current block $blk_place = $self->_get_current_block(); } else { $blk_place = $self->_make_new_block(); } last unless (defined($blk_place)); my ($blktie, $blocknum, $bceref, $href_tie) = $self->_get_block_and_bce($blk_place); # Note: we don't pack the value here like # HPush/STORE -- packing is performed at the block level. # XXX XXX: need a way to specify a packing method for the # underlying HSuck ($place, $newoff, $frag) = $blktie->HSuck ( value => $val, next => $next, offset => $off, headless => $headless ); unless (defined($place)) { # allow failure on first try - current block might be too # small to hold any part of split row, but new block # should always be able to take a piece. if ($firsttry) { $firsttry = 0; next L_whileoff; # try again in new block } # XXX XXX: should store list of inserts so can delete # pieces if run out of space return undef; } $firsttry = 0; if (defined($frag)) { $next = $frag . ":"; } else { $next = ":"; } # build next pointer as fully qualified rid -- set block_place # final sliceno to $place my $sloty = '(0$)'; # XXX : replace trailing zero $next .= $blk_place; $next =~ s/$sloty/$place/; # offset is now new offset $off = $newoff; } # end while my @outi; if (defined($blk_place) && defined($place)) { my $slotyy = '(0$)'; # XXX : replace trailing zero $next = $blk_place; $next =~ s/$slotyy/$place/; } else { # XXX XXX: is this bad? Probably not. greet "bad:", $firsttry, $blk_place, $place, $off, $next; } push @outi, $next; if (defined($off)) { push @outi, $off; } if (defined($frag)) { push @outi, $frag; # column was fragmented } return @outi; } # end HSuck # HPush public method (not part of standard hash) sub HPush { my ($self, $value) = @_; my $toobig = 0; my $maxsize = 2 * $self->{small_row}; # XXX XXX: need a "mutating" constraint to support default values # for null columns, numeric precision, column uppercasing, etc. # CONSTRAINT - "check constraints" first if ($self->check_insert($value, undef, $self->{tablename})) { # Note: $place is undefined, because we haven't pushed # yet. But might need it for check constraints that # require the rid (very very improbable - should probably # be illegal) { # Check constraint FAILED return undef; } } my $packstr = PackRowCheck($value, $maxsize); if (defined($packstr)) { # fit maybe } else { $toobig = 1; } my $place = $self->_localHPush($packstr, $value, $toobig); return undef unless (defined($place)); # CONSTRAINT - index_inserts, like primary/unique key. Need to do # after the push because need a rid for the index. maybe # restructure so don't have to push huge rows that would violate # constraint. bt->insert_maybe with localhpush callback to # generate rid # # val TBD callback needs a place_ref so subsequent cons_lists can # get correct place, and so HPush can return place to caller # # cons_list ($value, null, val_TBD_cb, place_ref) if ($self->index_insert($value, $place)) { { whisper "undo insert!!"; # and don't specify the constraint check. Why? # Because if the insert failed with a duplicate key, # then delete will remove it from the index. $self->_localDELETE($place); return undef; } } return $place; } # count estimation sub FirstCount { # whoami; my $self = shift; my $key = $self->FIRSTKEY(); my @outi; push @outi, $key; return $self->NextCount(@outi); } # FirstCount # count estimation sub NextCount { # whoami; my ($self, $prevkey, $esttot, $sum, $sumsq, $chunkcount, $totchunk) = @_; return undef unless (defined($prevkey)); my ($chunkno, $prevsliceno) = $self->_splitrid($prevkey); my $chunk; unless (defined($esttot)) { # greet "first first"; $prevsliceno = (); ($esttot, $sum, $sumsq, $chunkcount, $totchunk) = (0,0,0,0,0); $chunkno = $self->_First_Chunkno(); while (defined($chunkno)) { $chunk = $self->_get_a_chunk($chunkno); my @foo = $chunk->FirstCount(); # greet @foo; # XXX XXX: why not defined? if ((scalar(@foo) > 4) && defined($foo[-1])) { $totchunk += $foo[-1]; } $chunkno = $self->_Next_Chunkno($chunkno); } $chunkno = $self->_First_Chunkno(); } my @outi; # push @outi, $prevkey, $esttot, $sum, $sumsq, $chunkcnt, $totchunk; my $quitLoop = 1; # XXX XXX my $loopCnt = 0; while (defined($chunkno)) { $chunk = $self->_get_a_chunk($chunkno); $loopCnt++; if (defined ($prevsliceno)) { @outi = $chunk->NextCount($prevsliceno, $esttot, $sum, $sumsq, $chunkcount, $totchunk); } else { my @oldouti = @outi; @outi = $chunk->FirstCount(); if (scalar(@outi)) { $outi[2] += $sum; $outi[3] += $sumsq; $outi[4] += $chunkcount; $outi[5] = $totchunk; # $esttot = $sum * ($totchunk/$chunkcount) # if (($sum > 0) && ($chunkcount > 0) && ($totchunk > 0)); $outi[1] = $outi[2] * ($totchunk/$outi[4]) if (($outi[2] > 0) && ($outi[4] > 0) && ($totchunk > 0)); # current sum + (current avg * remaining chunks) # $outi[1] = $outi[2] + (($outi[2]/$outi[4])*($outi[5]-$outi[4])) } else { @outi = @oldouti; } } last if ($quitLoop && scalar(@outi) && defined($outi[0])); $prevsliceno = (); $chunkno = $self->_Next_Chunkno($chunkno); # XXX XXX: add logic here $quitLoop = 1 if $loopCnt > 10; } # end while chunkno return @outi unless (scalar(@outi) && defined($chunkno) && defined($outi[0])); my $sliceno = shift @outi; unshift @outi, $self->_joinrid($chunkno, $sliceno); return @outi; } # nextcount sub _localHPush { my ($self, $packstr, $value, $toobig) = @_; # NOTE: space management busted here -- pushing a huge row # (packstr >= blocksize) bounces off the new block, which then # allocates a new file. Avoid the issue by only pushing small # strings if (!($toobig) && (length($packstr) < $self->{small_row})) { my $place = $self->SUPER::HPush($packstr); return $place if (defined($place)); } # if string was too big, or push failed, try HSuck my @stat = $self->HSuck (value => $value); # null stat for failure, or extra cols indicates partial pack -- # should see a single column for complete pack. return undef unless (scalar(@stat) == 1); return $stat[0]; # rid for first rowpiece } sub STORE { my ($self, $place, $value) = @_; # CONSTRAINT my $cons_list = $self->{constraint_list}; my ($cci_index, $ccu, $ccd); my @cc_op_list; # delete/insert operations if (defined($cons_list)) { $ccu = $cons_list->{update}; $ccd = $cons_list->{delete}; $cci_index = $cons_list->{index_insert}; } # CONSTRAINT - do "check constraints" before update if ($self->check_insert($value, $place, $self->{tablename})) { # Check constraint FAILED return undef; } my $oldvalue; if ($place !~ m/^PUSH$/) { if (defined($cci_index)) { # clear out the insert callback -- ccu will generate an # op_list of delete/inserts if necessary. $cci_index = (); } if (defined($ccu)) { $oldvalue = $self->FETCH($place); if (&$ccu($value, $oldvalue, $place, \@cc_op_list)) { whisper "updated key - delete from index"; greet @cc_op_list; greet $place, $value, $oldvalue; # greet &$ccd($oldvalue, $place); } else { whisper "keys match - index not updated"; $oldvalue = (); # keys match - don't keep old value } } } # end !push # 2 cases: # either PUSHing a new key *or* # the old keys didn't match so we deleted them from the index. # # NOTE that we do the "index_insert" before the update, because the # rid exists already. my $maxj; if (defined($cci_index)) { return undef if (&$cci_index($value, $place)); } elsif (scalar(@cc_op_list)) { my $maxi = scalar(@cc_op_list); for my $i (0..($maxi - 1)) { my $opv = $cc_op_list[$i]; greet $opv; my $cnam = $opv->[0]; my $del1 = $opv->[1]; my $ins1 = $opv->[2]; if (defined($del1) && defined($oldvalue)) { greet &$del1($oldvalue, $place); } if (defined($ins1)) { if (&$ins1($value, $place)) { # Index insert FAILED - revert indexes to old state $maxj = $i; if (defined($oldvalue)) # had an old key { whisper " attempt to restore old value"; &$ins1($oldvalue, $place); } return undef unless ($i); # we're done if only a single index last; } } } } if (defined($maxj)) { for my $j (0..($maxj - 1)) { my $opv = $cc_op_list[$j]; greet $opv; my $cnam = $opv->[0]; my $del1 = $opv->[1]; my $ins1 = $opv->[2]; if (defined($del1)) { # delete the new value greet &$del1($value, $place); } if (defined($ins1) && defined($oldvalue)) { # restore the old value if possible if (&$ins1($oldvalue, $place)) { greet "really screwed up, sorry!"; my $msg = "Serious error during update!!\n"; my %earg = (self => $self, msg => $msg, severity => 'warn'); &$GZERR(%earg) if (defined($GZERR)); return undef; } } } # end for return undef; } # end if maxj my $stat = $self->_localStore($place, $value); if (!defined($stat)) { if (defined($cci_index)) { # localStore FAILED - revert indexes to old state whisper "delete new key from index"; &$ccd($value, $place); if (defined($oldvalue)) { whisper " attempt to restore old value"; &$cci_index($oldvalue, $place); } } elsif (scalar(@cc_op_list)) { my $maxi = scalar(@cc_op_list); for my $i (0..($maxi - 1)) { my $opv = $cc_op_list[$i]; greet $opv; my $cnam = $opv->[0]; my $del1 = $opv->[1]; my $ins1 = $opv->[2]; if (defined($del1)) { greet &$del1($value, $place); } if (defined($ins1) && defined($oldvalue)) { if (&$ins1($oldvalue, $place)) { whisper "oh gosh!"; my $msg = "Really serious error during update!!\n"; my %earg = (self => $self, msg => $msg, severity => 'warn'); &$GZERR(%earg) if (defined($GZERR)); } } } # end for } } # end if not defined stat return ($stat); } # if setfwdptr (set Forwarding pointer), then only storing an empty # header with pointer to the first rowpiece. sub _localStore { # whoami; my ($self, $place, $value) = @_; my $toobig = 0; my $oldsize = 0; my @estat = $self->_exists2($place); # HPHRowBlk method if ( (scalar(@estat) > 2) && Genezzo::Block::RDBlock::_isheadrow($estat[0]) && Genezzo::Block::RDBlock::_istailrow($estat[0]) ) { # if the row already exists as a single, contiguous buffer, # set maxsize for PackRowCheck to determine if can update in # place. $oldsize = $estat[2]; } my $maxsize = 2 * $self->{small_row}; # greet "max: $maxsize, old: $oldsize"; $maxsize = $oldsize if ($oldsize > $maxsize); my $packstr = PackRowCheck($value, $maxsize); # greet "packstr: ", length($packstr) if (defined($packstr)); unless ($oldsize) { # keep maxsize a bit small unless space is already allocated $maxsize = $self->{small_row}; } if (defined($packstr)) { # greet "fit maybe"; } else { # greet "toobig"; $toobig = 1; } if ($place =~ m/^PUSH$/) { $place = $self->_localHPush($packstr, $value, $toobig); return undef unless (defined($place)); return $value; } # XXX XXX: race condition here -- would need to lock row to ensure # that rowstat doesn't change. Probably need a new STORE with the # specification to only update if both head and tail, and the # _exists2 is merely a hint. # estat = ($rowstat, $rowposn, $rowlen) @estat = $self->_exists2($place); # HPHRowBlk method if ( (scalar(@estat) < 3) # no such row or bad rid (so fail on STORE) # or the row fits in a block || ( (scalar(@estat) > 2) && Genezzo::Block::RDBlock::_isheadrow($estat[0]) && Genezzo::Block::RDBlock::_istailrow($estat[0]) && !($toobig) # toobig check && $estat[2] >= length($packstr) # see if still fits ) ) { if (!($toobig) && (length($packstr) <= $maxsize)) # avoid long rows { # greet "should fit"; my $stat = $self->SUPER::STORE($place, $packstr); return $value if (defined($stat)); return undef; } } # set up arguments for HSuck. HSuck new value as "headless", # since it gets stored as a continuation of the first piece of the # old row my %nargs = (value => $value, headless => 1); # XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX # if the original row is split over multiple blocks, then push the # new value and replace the first piece of the original row with a # forwarding pointer to the new value # # It's not an optimal or efficient replacement strategy, but it is # straightforward. Need to figure out how to: # 1. re-use freed space from old row # 2. figure out how to update individual columns more # efficiently. One possible variation is do current replacement # strategy starting at column N, i.e., if updating column N, # truncate the row from column N onward and stick the forwarding # pointer at col N. # # XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX XXX my $split_row = !(Genezzo::Block::RDBlock::_istailrow($estat[0])); # suck new value first, so we still have the old row if it fails... # sukk = (newplace, offset, frag flag) my @sukk = $self->HSuck (%nargs); # XXX XXX: HSuck could fail here - that would be bad... greet "sukk:", @sukk; return undef # XXX XXX: need to test this... unless (scalar(@sukk) && (defined($sukk[0]))); # if the old value is split over multiple blocks if ($split_row) { whisper "long row"; # use localFetchDelete to delete the existing row pieces, but # keep the first piece and mark it as complete (both head and # tail bits get set). Note: First piece *must* be large # enough to hold the forwarding pointer. my $val = $self->_localFetchDelete($place, 1, 1); # delete trailing # pieces, # but keep head # now only the first piece is left, # greet "oldval: ", $val; } { # create a forwarding pointer to the row. It needs to an # empty col1 fragment that points to col1 in the new row. my @fakerow; push @fakerow, ""; # blank col1 push @fakerow, "F:".$sukk[0]; my $packfake = PackRow(\@fakerow); my $sstat = $self->SUPER::STORE($place, $packfake); # XXX XXX: STORE could fail here too - that would be bad # XXX: don't call localstore recursively... #### my $sstat = $self->_localStore($place, \@fakerow, 1); # greet "stat:", $sstat; # greet $self->_exists2($sukk[0]); # HPHRowBlk method # greet "e1:", @estat; # clear the tail flag of the first piece, since it now points to # the new value $estat[0] &= ~($Genezzo::Block::RDBlock::RowStats{tail}); @estat = $self->_exists2($place, $estat[0]); # HPHRowBlk method # greet "e2:", @estat; } return $value; } #sub FETCH #{ # my ($self, $place) = @_; # # my $value = ($self->SUPER::FETCH($place)); # # return (undef) # unless (defined($value)); # # my @outarr = UnPackRow($value); # # return (\@outarr); # #} sub FETCH { my ($self, $place) = @_; # CONSTRAINT return $self->_localFetchDelete($place); } # if doDelete is set then perform recursive delete on chained rows sub _localFetchDelete { my ($self, $place, $doDelete, $keepHead) = @_; # XXX : should be able to do FETCH directly if know have # information that no split rows... # XXX XXX: maybe ok to piggy-back fetch2 into normal FETCH? # fetcha = (value, rowstat) my @fetcha = $self->_fetch2($place); # HPHRowBlk method # XXX: check status? my $deleteStatus; if (defined($doDelete)) { # greet "a", $keepHead, $place, @fetcha; unless (defined($keepHead)) { $deleteStatus = $self->SUPER::DELETE($place); } } return undef unless ( (scalar(@fetcha) > 1) && defined($fetcha[0]) # value && defined($fetcha[1]) # rowstat && Genezzo::Block::RDBlock::_isheadrow($fetcha[1])); my @rowpiece = UnPackRow($fetcha[0], $Genezzo::Util::UNPACK_TEMPL_ARR); # first row piece # Note: just return if row was not split. Avoid the extra push in # the while loop return (\@rowpiece) if (Genezzo::Block::RDBlock::_istailrow($fetcha[1])); if (defined($doDelete) && defined($keepHead)) { # keep the head, but define it as a completed piece # greet "del, keephead", @fetcha; my @estat = $self->_exists2($place, ( $fetcha[1] | ($Genezzo::Block::RDBlock::RowStats{tail}) ) ); # HPHRowBlk method # Note: fetcha[1] ("rowstat") is checked to terminate while loop, # so don't update it here! # greet @estat; } my $gotFrag = 0; my @outarr; # current rowpiece loaded into outarr in while loop # Fetch the remaining row pieces, and re-assemble the row. If the # piece isn't the tail (end) of the row, the last column is a # "next pointer", a pointer to the next piece, with a flag which # indicates whether the last column (the real last column, not the # aforementioned next pointer) was split. my $piececount = 0; L_rowpiece: while (1) { if ($gotFrag) { # column was fragmented - merge the next column piece my $h1 = shift @rowpiece; $outarr[-1] .= $h1; # append remainder to end of last column } # append next set of columns to existing row push @outarr, @rowpiece; # greet "count:", $piececount, @outarr; last L_rowpiece # done when last piece of row is fetched if (Genezzo::Block::RDBlock::_istailrow($fetcha[1])); my $nextp = pop @outarr; # last column was pointer to next piece, # so remove it from output # check next pointer to see if column was fragmented (split) my ($frag, $pieceplace) = split(':', $nextp); # XXX XXX: clean this up - centralize knowledge of frag flag somewhere $gotFrag = (defined($frag)) && ($frag =~ m/F/); # get the next piece @fetcha = $self->_fetch2($pieceplace); $piececount++; unless ( (scalar(@fetcha) > 1) && defined($fetcha[0]) && defined($fetcha[1]) ) { # ERROR: remainder of row not found if (scalar(@outarr)) { # greet "b", $pieceplace, @fetcha; # greet @outarr; my $tname = $self->{tablename}; whisper "table $tname: malformed row $place at $pieceplace"; my $msg = "table $tname: malformed row $place at " . "$pieceplace\n"; my %earg = (self => $self, msg => $msg, severity => 'warn'); &$GZERR(%earg) if (defined($GZERR)); } return undef; } if (defined($doDelete)) { # XXX: should check delete status here # greet "c", $pieceplace, @fetcha; whisper "delete piece $pieceplace"; $deleteStatus = $self->SUPER::DELETE($pieceplace); # greet $deleteStatus; } @rowpiece = UnPackRow($fetcha[0], $Genezzo::Util::UNPACK_TEMPL_ARR); } # end while l_rowpiece return (\@outarr); } # end localFetchDelete sub DELETE { # whoami; my ($self, $place) = @_; # CONSTRAINT my $cons_list = $self->{constraint_list}; return $self->_localDELETE($place, $cons_list); } sub _localDELETE { # whoami; my ($self, $place, $cons_list) = @_; # CONSTRAINT # XXX XXX XXX XXX: need to support foreign key constraint, also # DELETE CASCADE my $ccd; if (defined($cons_list)) { $ccd = $cons_list->{delete}; } my @estat = $self->_exists2($place); # HPHRowBlk method if ( (scalar(@estat) < 3) # no such row or bad rid # or the row fits in a block || ( (scalar(@estat) > 2) && Genezzo::Block::RDBlock::_isheadrow($estat[0]) && Genezzo::Block::RDBlock::_istailrow($estat[0]) ) ) { my $val = $self->SUPER::DELETE($place); return undef unless (defined($val)); if (defined($ccd)) { my @outarr = UnPackRow($val, $Genezzo::Util::UNPACK_TEMPL_ARR); &$ccd(\@outarr, $place); # delete from index return (\@outarr); } else { return (UnPackRow($val, $Genezzo::Util::UNPACK_TEMPL_ARR)); } } # fetch the pieces and delete them if (defined($ccd)) { my @outarr = $self->_localFetchDelete($place, 1); &$ccd(\@outarr, $place); return (\@outarr); } else { return $self->_localFetchDelete($place, 1); } } # end DELETE sub SQLPrepare # get a DBI-style statement handle { my $self = shift; my %args = @_; $args{pushhash} = $self; $args{tablename} = $self->{tablename}; if ((exists($self->{GZERR})) && (defined($self->{GZERR}))) { $args{GZERR} = $self->{GZERR}; } my $sth = Genezzo::Row::SQL_RSTab->new(%args); return $sth; } package Genezzo::Row::SQL_RSTab; use strict; use warnings; use Genezzo::Util; sub _init { my $self = shift; my %args = (@_); return 0 unless (defined($args{pushhash})); $self->{pushhash} = $args{pushhash}; $self->{tablename} = $args{tablename}; if (defined($args{alias})) { $self->{tablename} = $args{alias}; } if (defined($args{filter})) { $self->{SQLFilter} = $args{filter}; my $ff = $args{filter}; # greet $ff; my $cons_list = $self->{pushhash}->_constraint_check(); if (defined($cons_list)) { my @both_keys = Genezzo::Util::GetIndexKeys($ff); if ((scalar(@both_keys) > 1) && (exists($cons_list->{SQLPrepare}))) { # greet @both_keys; # keys in table colidx order my $get_search = $cons_list->{SQLPrepare}; # prepare an index search if have startkey/stopkey my $searchhandle = &$get_search(@both_keys); $self->{IndexSth} = $searchhandle if (defined($searchhandle)); } } } $self->{rownum} = 0; return 1; } sub new { # whoami; my $invocant = shift; my $class = ref($invocant) || $invocant ; my $self = { }; my %args = (@_); if ((exists($args{GZERR})) && (defined($args{GZERR})) && (length($args{GZERR}))) { # NOTE: don't supply our GZERR here - will get # recursive failure... $self->{GZERR} = $args{GZERR}; } return undef unless (_init($self,%args)); return bless $self, $class; } # end new # SQL-style execute and fetch functions sub SQLExecute { my ($self, $filter) = @_; # $self->{SQLFilter} = $filter; # check this if (exists($self->{IndexSth})) { greet "index execute"; $self->{SQLFetchKey} = 1; return $self->{IndexSth}->SQLExecute(); } $self->{SQLFetchKey} = $self->FIRSTKEY(); # XXX: define filters and fetchcols return (1); } # XXX XXX XXX XXX: create a separate dynamic package to # hold the fetch state, vs keeping the fetch state in the base # pushhash. Then can maintain multiple independent SQLFetches open # on same RSTab object. # combine NEXTKEY and FETCH in a single operation sub SQLFetch { my ($self, $key) = @_; my $fullfilter = $self->{SQLFilter}; my $filter = (defined($fullfilter)) ? $fullfilter->{filter} : undef; # use explicit key if necessary # $self->{SQLFetchKey} = $key # if (defined($key)); while (defined($self->{SQLFetchKey})) { my $currkey; if (exists($self->{IndexSth})) { # greet "index fetch"; my @idx_row = $self->{IndexSth}->SQLFetch(); # greet @idx_row; unless (scalar(@idx_row) > 1) { $self->{SQLFetchKey} = undef; return undef; } pop @idx_row; # remove extra search cols pop @idx_row; $currkey = pop @idx_row; # val is rowid for table } else { $currkey = $self->{SQLFetchKey}; } my $outarr = $self->FETCH($currkey); my $tablename = $self->{tablename}; my $get_col_alias = {$tablename => $outarr}; # save the value of the key because we pre-advance to the next one $self->{SQLFetchKey} = $self->NEXTKEY($currkey) unless (exists($self->{IndexSth})); # Note: always return the rid return ($currkey, $outarr) unless (defined($filter)); # filter is defined my $val; my $rownum = $self->{rownum} + 1; # be very paranoid - filter might be invalid perl eval {$val = &$filter($self, $currkey, $outarr, $get_col_alias, $rownum) }; if ($@) { whisper "filter blew up: $@"; greet $fullfilter; my $msg = "bad filter: $@\n" ; # $msg .= Dumper($fullfilter) # if (defined($fullfilter)); my %earg = (self => $self, msg => $msg, severity => 'warn'); &$GZERR(%earg) if (defined($GZERR)); return undef; } unless (!$val) { $self->{rownum} += 1; return ($currkey, $outarr); } } return undef; } sub AUTOLOAD { my $self = shift; my $ph = $self->{pushhash}; our $AUTOLOAD; my $newfunc = $AUTOLOAD; $newfunc =~ s/.*:://; return if $newfunc eq 'DESTROY'; # greet $newfunc; return ($ph->$newfunc(@_)); } END { } 1; __END__ # Below is stub documentation for your module. You better edit it! =head1 NAME Genezzo::Row::RSTab.pm - Row Source TABle tied hash class. =head1 SYNOPSIS use Genezzo::Row::RSTab; # see Tablespace.pm -- implementation and usage is tightly tied # to genezzo engine... # make a factory for rsfile my $fac2 = make_fac2('Genezzo::Row::RSFile'); my %args = ( factory => $fac2, # need tablename, bufcache, etc... tablename => ... tso => ... bufcache => ... ); my %td_hash; $tie_val = tie %td_hash, 'Genezzo::Row::RSTab', %args; # pushhash style my @rowarr = ("this is a test", "and this is too"); my $newkey = $tie_val->HPush(\@rowarr); @rowarr = ("update this entry", "and this is too"); $tied_hash{$newkey} = \@rowarr; my $getcount = $tie_val->HCount(); =head1 DESCRIPTION RSTab is a hierarchical pushhash (see L) class that stores perl arrays as rows in a table, writing them into a block (byte buffer) via B and B. =head1 ARGUMENTS =over 4 =item tablename (Required) - the name of the table =item tso (Required) - tablespace object from B =item bufcache (Required) - buffer cache object from B =back =head1 CONCEPTS Logically, a table is made of rows, and rows are vectors of columns. Physically (at least from an OS implementation viewpoint), a table is made up of blocks stored in files. The RSTab hierarchical pushhash (hph) uses an RSFile factory, though it could be constructed as an hph of arbitrary depth. The basic HPush mechanism takes an array, flattens it into a string, and pushes the string into one of the underlying blocks. While the RSTab api is primarily intended as a row-based interface, it has some extensions to directly manipulate the underlying blocks. These extensions are useful for building specialized index mechanisms (see L) like B-trees, or for supporting rows that span multiple blocks. =head2 Basic PushHash You can use RSTab as a persistent hash of arrays of scalars if you like. The arrays and scalars can be of arbitrary length (as long as they fit in your datafiles). =head2 SQL DBI-style interface RSTab is designed to efficiently support prepare/execute/fetch operations against tables. What distinguishes this API from a standard hash is that the "prepare" operation generates a custom, stateful iterator that understands filters and range selection. A filter is simply a predicate which is applied to every row -- rows which pass are returned to the caller, and rows which fail are "filtered out". Range selection is somewhat similar, with the notion of start and stop keys -- the iterator only returns the rows which are restricted to a certain range of values. In general, range selection is driven off a separate indexing mechanism that positions the fetch to specifically retrieve the range in an efficient manner, versus fetching all rows and filtering rows outside the range. =head2 HPHRowBlk - Row and Block operations HPHRowBlk is a special pushhash subclass with certain direct block manipulation methods. One very useful function is HSuck, which provides support for rows that span multiple blocks. While the standard HPush fails if a row exceeds the space in a single block, the HSuck api lets the underlying blocks consume the rows in pieces -- each block "sucks up" as much of the row as it can. The RSTab HPush is re-implemented on top of HSuck to support large rows. =head2 Counting, Estimation, Approximation RSTab has some support for count estimation, inspired by some of Peter Haas' work (Sequential Sampling Procedures for Query Size Estimation, ACM SIGMOD 1992, Online Aggregation (with J. Hellerstein and H. Wang), ACM SIGMOD 1997 Ripple Joins for Online Aggregation (with J. Hellerstein) ACM SIGMOD 1999). It could use support for confidence intervals, so drop me a line if you understand Central Limit Theorem, Hoeffding and Chebyshev inequalites. Knowledge of change-points and time-series is also a plus. =head1 FUNCTIONS RSTab support all standard hph hierarchical pushhash operations, with the extension that it manipulates arrays of scalars, not individual scalars. =head2 EXPORT =head1 LIMITATIONS various =head1 TODO =over 4 =item rownum filter support to move to separate package =item $href: remove - need a dict function to return allfileused via tso =item HSuck: need a way to specify packing method =item HSuck: fix trailing zero replacement =item NextCount: fix quitloop =item localPush/Store: qualify length packstr as percentage of blocksize (1/3?) =item localStore: race condition on rowstat =item localFetchDelete: frag flag info, delete status. Could express this function as a generalized "RowSplice" (as distinct from RDBlkA::HSplice, which is a block splice operator). Would need be able to splice based upon column number/array offset, as well as substring byte offset -- the inverse functionality of PackRow2/HSuck =item DBI - support Bind and projection (returning only certain specified columns, versus all columns) =item _init: change to use TSTableAFU support versus href->{filesused} =item need support for constraints that "mutate" supplied values, e.g. manipulate numeric precision or supply default values for columns. Also need support for foreign keys in delete. =back =head1 AUTHOR Jeffrey I. Cohen, jcohen@genezzo.com =head1 SEE ALSO L, L, L, L, L, L, L, L, L, L. Copyright (c) 2003, 2004, 2005 Jeffrey I Cohen. All rights reserved. This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by the Free Software Foundation; either version 2 of the License, or any later version. This program is distributed in the hope that it will be useful, but WITHOUT ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. You should have received a copy of the GNU General Public License along with this program; if not, write to the Free Software Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA Address bug reports and comments to: jcohen@genezzo.com For more information, please visit the Genezzo homepage at L =cut