# Copyright (C) 1999-2002, Internet Journals Corporation.
# Copyright (C) 2002 David Muir Sharnoff
# All rights reserved.  License hereby granted for anyone to use this
# module at their own risk.  Please feed useful changes back to
# David Muir Sharnoff.

package Object::Transaction;

# In-memory object cache, keyed by class name and then by object id.
my %cache;

$VERSION = 1.01;
my $lock_debugging = 0;     # print lock/unlock traces to STDERR
my $debug = 0;              # print load traces to STDERR
my $warnings = 0;           # print retry / DATACHANGE notices to STDERR
my $registered;             # true once modperl_register() has run

require File::Flock;
use Storable;
use POSIX qw(O_CREAT O_RDWR);
require File::Sync;
use Carp;
use Carp qw(verbose);
use vars qw($magic_cookie);

# Every object file starts with this marker; load() and _realsave()
# treat a file without it as corrupt.
$magic_cookie = "O:Ta";

require Exporter;
@ISA = qw(Exporter);
@EXPORT = qw(transaction transaction_pending commit abandon uncache);

use strict;

#
# Things to override.  initialize() and file() must be supplied by the
# subclass; the remaining hooks default to no-ops.
#

sub initialize { die "deferred" }
sub file { die "deferred" }
sub presave {}
sub postsave {}
sub postload {}
sub preload {}
sub preremove {}
sub postremove {}

# Default id accessor: the object's 'ID' member.
sub id
{
    my ($this) = @_;
    return $this->{'ID'};
}

sub precommit {}

#
# A few wrappers around File::Flock that also track, in %locks, which
# files this process currently has locked so _unlock_all() can drop them.
#

my %locks;

# Record $file in %locks and take the File::Flock lock on it.
sub _lock
{
    my ($file) = @_;
    if ($lock_debugging) {
        my ($package, $filename, $line) = caller;
        my ($package2, $filename2, $line2) = caller(1);
        print STDERR "\n{{{{ $file $line, $line2";
    }
    $locks{$file} = 1;
    File::Flock::lock($file);
}

# Forget $file in %locks and release the File::Flock lock on it.
sub _unlock
{
    my ($file) = @_;
    if ($lock_debugging) {
        my ($package, $filename, $line) = caller;
        my ($package2, $filename2, $line2) = caller(1);
        print STDERR "\n}}}} $file $line, $line2";
    }
    delete $locks{$file};
    File::Flock::unlock($file);
}

# Move the lock bookkeeping from $from to $to, both in %locks and in
# File::Flock (lock_rename).
sub _lockrename
{
    my ($from, $to) = @_;
    if ($lock_debugging) {
        my ($package, $filename, $line) = caller;
        my ($package2, $filename2, $line2) = caller(1);
        print STDERR "{$from->$to} $line, $line2";
    }
    $locks{$to} = $locks{$from};
    delete $locks{$from};
    File::Flock::lock_rename($from, $to);
}

# Release every lock recorded in %locks.
sub _unlock_all
{
    for my $f (keys %locks) {
        _unlock($f);
    }
}

# Return the entire contents of $file as one string; dies if the file
# cannot be opened.
sub _read_file
{
    my ($file) = @_;

    # Three-argument open: the file name is never interpreted as a mode.
    open(my $fh, '<', $file) || die "open $file: $!";
    my @r = <$fh>;
    close($fh);
    return join("", @r);
}

# Write the concatenation of @data to $f, fsync it, and close it.
# Dies on any open/write/fsync/close failure.  Returns 1.
sub _write_file
{
    my ($f, @data) = @_;

    $! = 0;

    my $d = join('', @data);

    open(my $fh, '>', $f) || die "open >$f: $!";

    # Turn on autoflush for the handle so the data reaches the
    # descriptor before fsync.
    my $prev = select($fh);
    $| = 1;
    select($prev);

    (print {$fh} $d) || die "write $f: $!";
    File::Sync::fsync_fd(fileno($fh)) || die "fsync $f: $!";
    close($fh) || die "close $f: $!";

    if ($d && ! -s $f) {
        # Houston, we have a problem!
        # Let's try this again!
        # this code may no longer be necessary.
        confess "cannot write $f: $!"
            if caller(50);    # prevent deep recursion
        print STDERR "Write to $f failed ($!), trying again\n"
            if $warnings;
        _write_file($f, $d);
    }
    return 1;
}

#
# Now the meat.
#

# Constructor: the subclass's initialize() builds the object, which is
# then blessed into $pkg and entered into the cache.
sub new
{
    my ($pkg, @args) = @_;
    my $obj = $pkg->initialize(@args);
    bless $obj, $pkg;
    $obj->cache;
    return $obj;
}

use vars qw($commit_inprogress);
$commit_inprogress = 0;

my $firstload;

# Class method: return the object of class $package with id $baseid,
# from the cache if present, otherwise thawed from its file.  preload()
# may map $baseid to a different id.  Returns undef when there is no id,
# no file, or the object turns out to be flagged for removal.  Objects
# that carry transaction markers are resolved (rolled back or cleaned
# up) and re-saved as a side effect.
sub load
{
    my ($package, $baseid) = @_;

    print STDERR "LOAD $package $baseid\n" if $debug;

    if (exists $cache{$package}{$baseid}) {
        print STDERR "Returing cached $package $baseid\n" if $debug;
        return $cache{$package}{$baseid};
    }

    my $newid;
    eval {
        $newid = $package->preload($baseid);
    };
    confess $@ if $@;

    if ($newid && exists $cache{$package}{$newid}) {
        print STDERR "Returing cached $package $baseid\n" if $debug;
        return $cache{$package}{$newid};
    }

    $firstload = time unless $firstload;

    my $id = $newid || $baseid;
    return undef unless $id;
    my $file = $package->file($id);

    # all method invocations can have side-effects.
    if ($cache{$package}{$id}) {
        print STDERR "Returing recently-cached $package $id\n" if $debug;
        return $cache{$package}{$id};
    }

    #
    # No read-lock is required because files are only modified
    # through rename rather than rewrite.
    #
    # This does create the possibility of a program failure if you
    # try to read a file that is deleted at just the right time.
    #
    return undef unless -e $file;
    my $frozen = _read_file($file);
    {
        no re 'taint';
        substr($frozen, 0, length($magic_cookie)) eq $magic_cookie
            or die "corrupt file: $file";
        substr($frozen, 0, length($magic_cookie)) = '';
    }
    my $obj = Storable::thaw($frozen);
    print STDERR "Pulling fresh copy for $package $id from $file\n" if $debug;
    die "unable to thaw $file!" unless $obj;

    # Keep a pristine second copy plus the raw frozen bytes; _realsave()
    # compares the bytes against the file to detect concurrent changes.
    $obj->{'OLD'} = Storable::thaw($frozen);
    $obj->{'OLD'}{'__frozen'} = \$frozen;
    $obj->postload($id);
    $cache{$package}{$id} = $obj;
    modperl_register() unless $registered;

    if ($obj->{'__transfollowers'}) {
        # A leader that still lists followers: loading each follower
        # rolls it back, then the leader itself is rolled back.
        print STDERR "Transleader with followers\n" if $debug;
        for my $class (sort keys %{$obj->{'__transfollowers'}}) {
            for my $fid (sort keys %{$obj->{'__transfollowers'}{$class}}) {
                # will rollback as side-effect
                my $follower = _loadany($class, $fid);
            }
        }
        # NOTE(review): for objects that had no OLD copy at commit time,
        # commit() stores a plain frozen string (not a reference) holding
        # an unblessed hash as the rollback stub -- confirm this path
        # works for such objects.
        $obj = Storable::thaw(${$obj->{'__rollback'}});
        $cache{$package}{$id} = $obj;
        _lock($file);
        $obj->postload($id);
        _unlock($file);
        $obj->_realsave();
    } elsif ($obj->{'__transleader'}) {
        print STDERR "Transfollower\n" if $debug;
        my $leader = _loadany($obj->{'__transleader'}{'CLASS'},
            $obj->{'__transleader'}{'ID'});
        # The key is '__transfollowers' (plural), matching what commit()
        # writes and what the leader branch above reads.
        if (exists $leader->{'__transfollowers'}
            && exists $leader->{'__transfollowers'}{$package}
            && exists $leader->{'__transfollowers'}{$package}{$id})
        {
            # rollback time!
            $obj = Storable::thaw(${$obj->{'__rollback'}});
            $cache{$package}{$id} = $obj;
            _lock($file);
            $obj->postload($id);
            _unlock($file);
        } else {
            # The leader no longer lists this object: drop the markers.
            delete $obj->{'__transleader'};
            delete $obj->{'__rollback'};
        }
        eval {
            $obj->_realsave();
        };
        if ($@ =~ /^DATACHANGE: file/) {
            # The file changed underneath us; start over.
            return load($package, $baseid);
        }
        die $@ if $@;
    }

    if ($obj->{'__removenow'}) {
        $obj->_realremove();
        return undef;
    }

    return $obj;
}

# Return a lightweight Object::Transaction::Reference ([class, id]) that
# can later be turned back into the object with ->loadref.
sub objectref
{
    my ($this) = @_;
    my $id = $this->id();
    die "id function returned empty on $this" unless $id;
    return bless [ ref($this), $id ], 'Object::Transaction::Reference';
}

{
    package Object::Transaction::Reference;

    # Load the object that this [class, id] reference points at.
    sub loadref
    {
        my ($ref) = @_;
        my ($pkg, $id) = @$ref;
        return Object::Transaction::_loadany($pkg, $id);
    }
}

# Load object $id of class $pkg, first require-ing the class's module
# file if the class has no @ISA entries yet.
sub _loadany
{
    my ($pkg, $id) = @_;
    no strict qw(refs);
    # defined(@array) is a fatal error on modern perls; test the array
    # itself instead.
    unless (@{"${pkg}::ISA"}) {
        # require with a string wants a path, so Foo::Bar => Foo/Bar.pm
        (my $path = "$pkg.pm") =~ s{::}{/}g;
        require $path;
    }
    return $pkg->load($id);
}

# Objects queued for the next commit(), keyed by class and then id.
my %tosave;

# Forget everything queued for saving.
sub abandon
{
    %tosave = ();
}

# Enter the object into the cache under its class and id.  Confesses if
# the id is undefined or a different object is already cached there.
sub cache
{
    my ($this) = @_;
    my $pkg = ref($this);
    my $id = $this->id();
    confess unless defined $id;
    confess "id clash with $pkg $id\n"
        if $cache{$pkg}
            && defined $cache{$pkg}{$id}
            && $cache{$pkg}{$id} ne $this;
    $cache{$pkg}{$id} = $this;
    modperl_register() unless $registered;
}

# Called on an object: drop it from the cache and mark it '__uncached'.
# Called with no object (exported function form): empty the whole cache.
sub uncache
{
    my ($this) = @_;
    if (ref $this) {
        delete $cache{ref($this)}{$this->id()};
        $this->{'__uncached'} = 1;
    } else {
        %cache = ();
        undef $firstload;
    }
}

# Flag the object for removal and queue it for the next commit.
sub removelater
{
    my ($this) = @_;
    $this->{'__removenow'} = 1;
    $this->savelater();
}

# Flag the object (if one is given) for removal and commit immediately.
sub remove
{
    my ($this) = @_;
    $this->removelater() if $this;
    commit();
}

# Queue the object for the next commit.  A true $trivial sets the
# '__trivial' flag, which commit() uses to leave the object out of the
# leader/follower bookkeeping.
sub savelater
{
    my ($this, $trivial, $code) = @_;
    confess "attempt to call savelater() from within a presave() or postsave()"
        if $commit_inprogress == 2;
    my $id = $this->id();
    confess "id not defined" unless defined $id;
    $tosave{ref($this)}{$id} = $this;
    $this->{'__readonly'} = 0;
    if ($code) {
        # NOTE(review): only the list is created here; $code itself is
        # never stored in it within this file.
        $this->{'__doatsave'} = [] unless $this->{'__doatsave'};
    }
    if ($trivial) {
        $this->{'__trivial'} = 1;
    } else {
        delete $this->{'__trivial'};
    }
    $this->cache() unless $this->{'OLD'};
    check_hash($this);
}

# Queue the object for the next commit as read-only (unless a
# '__readonly' flag is already present), so that commit verifies it is
# unchanged on disk without rewriting it.
sub readlock
{
    my ($this) = @_;
    my $id = $this->id();
    confess unless defined $id;
    $tosave{ref($this)}{$id} = $this;
    $this->{'__readonly'} = 1 unless exists $this->{'__readonly'};
}

# Queue the object (if one is given) and commit immediately.
sub save
{
    my ($this) = @_;
    $this->savelater() if $this;
    commit();
}

# Return 1 if anything is queued for saving, otherwise 0.
sub transaction_pending
{
    return 1 if %tosave;
    return 0;
}

# Run $funcref->(@args), restoring the cache snapshot and re-running it
# whenever it dies with a "DATACHANGE: file" error.  A leading non-CODE
# argument (e.g. a class name) is discarded.  If
# $ObjTransLclCnfg::maxtries is set, gives up once the retry count
# exceeds it.  Any other error is re-thrown with croak.  Returns what
# the function returned, honouring list/scalar context.
sub transaction
{
    eval { require ObjTransLclCnfg; };
    shift if ref $_[0] ne 'CODE';
    my ($funcref, @args) = @_;
    my (%c) = (%cache);
    my $r;
    my @r;
    my $want = wantarray();
    my $die = 0;
    my $count = 0;
    for (;;) {
        die if $die;    # protect against 'next' et al inside eval
        $die = 1;
        eval {
            if ($want) {
                @r = $funcref->(@args);
            } else {
                $r = $funcref->(@args);
            }
        };
        if ($@ =~ /^DATACHANGE: file/) {
            %cache = %c;
            print STDERR "Restarting transaction: $@" if $warnings;
            $die = 0;
            die "Aborting Transaction -- Too many locking failures ($count): $@"
                if $ObjTransLclCnfg::maxtries
                    && $count++ > $ObjTransLclCnfg::maxtries;
            redo;
        }
        require Carp;
        Carp::croak($@) if $@;
        last;
    }
    return @r if $want;
    return $r;
}

#
# One of the changed objects becomes the transaction leader.  The state
# of the leader determines the state of the entire transaction.
#
# The leader gets modified twice: first to note the other participants
# in the transaction and then later to commit the transaction.
#
# The other participants also get written twice, but the second writing
# happens the next time the object gets loaded, rather than at the time
# of the transaction.
#

my $unlock;
my $datachangefailures;

# Write out everything queued in %tosave.  Returns 0 if nothing was
# queued, 1 otherwise.  Confesses when called re-entrantly from a
# precommit/presave/postsave hook.
sub commit
{
    confess "attemp to call commit() from within a precommit(), presave() or postsave()"
        if $commit_inprogress;
    local($commit_inprogress) = 1;

    return 0 unless %tosave;

    # Run precommit() once per object.  A hook that returns true may
    # have queued more objects, so rescan until a pass runs no hook
    # that returns true.
    my %precommitdone;
    my $done = 0;
    while (! $done) {
        $done = 1;
        for my $type (keys %tosave) {
            for my $obj (values %{$tosave{$type}}) {
                next if $precommitdone{$obj}++;
                if ($obj->precommit($obj->old)) {
                    $done = 0;
                }
            }
        }
    }

    # Fixed ordering (class, then id) for the objects to save.
    my @savelist;
    for my $cls (sort keys %tosave) {
        for my $id (sort keys %{$tosave{$cls}}) {
            push(@savelist, $tosave{$cls}{$id});
        }
    }

    $commit_inprogress = 2;

    # A single object needs no leader/follower protocol.
    if (@savelist == 1) {
        if ($savelist[0]->{'__removenow'}) {
            $savelist[0]->_realremove();
        } else {
            $savelist[0]->_realsave();
        }
        %tosave = ();
        $datachangefailures = 0;
        return 1;
    }

    my $leader = shift(@savelist);

    # NOTE(review): when there is no OLD copy the rollback value is a
    # plain frozen string rather than a scalar reference like
    # OLD->{__frozen}; load() dereferences '__rollback' -- confirm.
    $leader->{'__rollback'} = exists $leader->{'OLD'}
        ? $leader->{'OLD'}{'__frozen'}
        : Storable::nfreeze({ '__removenow' => 1 });

    for my $s (@savelist) {
        die "attemp to save an 'uncached' object"
            if exists $s->{'__uncached'};
        # NOTE(review): nothing in this file sets '__deletenow'
        # (removelater() sets '__removenow') -- confirm which is meant.
        $leader->{'__toremove'}{ref($s)}{$s->id()} = 1
            if $s->{'__deletenow'};
        next if $s->{'__trivial'};
        $leader->{'__transfollowers'}{ref($s)}{$s->id()} = 1;
        $s->{'__transleader'} = {
            'CLASS' => ref($leader),
            'ID'    => $leader->id(),
        };
        $s->{'__rollback'} = exists $s->{'OLD'}
            ? $s->{'OLD'}{'__frozen'}
            : Storable::nfreeze({ '__removenow' => 1 });
    }
    delete $leader->{'__readonly'};

    # Make sure the leader's file exists before locking it.
    if (! -e $leader->file()) {
        $leader->_realsave();
    }
    _lock($leader->file());
    $leader->_realsave(1);

    for my $s (@savelist) {
        $s->_realsave();
    }

    # Rewriting the leader without its follower list is what commits
    # the transaction.
    delete $leader->{'__transfollowers'};
    delete $leader->{'__rollback'};
    $leader->_realsave(1);

    if ($leader->{'__toremove'}) {
        $leader->_removeall();
        $leader->_realsave(1);
    }

    _unlock($leader->file());

    if (exists $leader->{'__removenow'}) {
        $leader->_realremove();
    }

    %tosave = ();
    $datachangefailures = 0;
    return 1;
}

my $srand;

# Write one object to its file.  With a true $keeplock the caller
# already holds the lock on the file and keeps it afterwards.
#
# If the object has an OLD copy, the file's current bytes are compared
# with the bytes it was loaded from; on a mismatch every lock is
# released, the pending save list and the cache are cleared, the
# process sleeps for a randomised interval and dies with a
# "DATACHANGE: file ..." message (which transaction() catches).
# Read-only objects stop after that check.  Otherwise the new image is
# written to "$file.tmp" and renamed into place.
sub _realsave
{
    my ($this, $keeplock) = @_;

    my $id = $this->id();
    my $file = $this->file($id);

    my $old = $this->old();

    my (@passby) = $this->presave($old);

    if (defined $old) {
        _lock($file) unless $keeplock;
        my $frozen = _read_file($file);
        substr($frozen, 0, length($magic_cookie)) eq $magic_cookie
            or die "corrupt file: $file";
        substr($frozen, 0, length($magic_cookie)) = '';
        if ($frozen ne ${$old->{'__frozen'}}) {
            _unlock_all();
            abandon();
            uncache();
            # Mix the pid (shifted) into the seed so that processes
            # started in the same second get different seeds.
            srand(time ^ ($$ << 5)) unless $srand;
            $srand = 1;
            require Time::HiRes;
            my $st = rand(0.5)*(1.3**$datachangefailures);
            $st = ($st % 200 + 100) if $st > 300;
            printf STDERR "DATACHANGE sleep %d for %.2f seconds\n", $$, $st if $warnings;
            Time::HiRes::sleep($st);
            printf STDERR "DATACHANGE sleep %d done\n", $$ if $warnings;
            $datachangefailures++;
            $firstload = undef;
            if ($this->{__poison}) {
                die "Cached object from previous transaction reused";
            }
            $this->{__poison} = 'DATACHANGE';
            warn "DATACHANGE: file $file changed out from under $$\n" if $warnings;
            die "DATACHANGE: file $file changed out from under $$, please retry";
        }
        if ($this->{'__readonly'}) {
            _unlock($file) unless $keeplock;
            return;
        }
    } else {
        _lock($file) unless $keeplock;
    }

    # Neither the OLD copy nor the read-only flag belongs in the
    # frozen image.
    delete $this->{'OLD'};
    delete $this->{'__readonly'};

    my $newfrozen = Storable::nfreeze($this);
    _write_file("$file.tmp", $magic_cookie, $newfrozen);
    _lock("$file.tmp");
    confess("write failed on $file.tmp") unless -s "$file.tmp";
    rename("$file.tmp", $file)
        or die "rename $file.tmp -> $file: $!";
    die unless -e $file;
    _lockrename("$file.tmp", $file);

    $this->postsave($old, @passby);

    if ($file ne $this->file($id)) {
        # can change sometimes
        my $new = $this->file($id);
        # Go through _lockrename so %locks follows the new name too.
        _lockrename($file, $new);
        $file = $new;
    }
    _unlock($file) unless $keeplock;

    $this->{'OLD'} = Storable::thaw($newfrozen);
    $this->{'OLD'}{'__frozen'} = \$newfrozen;
}

# Load every object listed under this object's '__toremove'.
sub _removeall
{
    my ($this) = @_;
    for my $class (sort keys %{$this->{'__toremove'}}) {
        for my $id (sort keys %{$this->{'__toremove'}{$class}}) {
            # will remove as side-effect
            my $follower = $class->load($id);
        }
    }
}

# Delete the object's file and drop the object from the cache, running
# preremove() under the file lock and postremove() after the unlink.
sub _realremove
{
    my ($this) = @_;
    _lock($this->file());
    $this->preremove();
    _unlock($this->file());
    unlink($this->file());
    $this->postremove();
    delete $cache{ref($this)}{$this->id()}
}

# Return the object's OLD copy, or undef if it has none.
sub old
{
    my ($this) = @_;
    return $this->{'OLD'} if exists $this->{'OLD'};
    return undef;
}

# Recursively walk a hash and confess if any key looks like a
# stringified hash reference.
sub check_hash
{
    # Look for references used as hash keys.
    # XXXX Turn this off in production.
    my ($hash_ref) = @_;
    for my $key (keys %{$hash_ref}) {
        if ($key =~ /HASH\(0x[0-9a-f]+\)/) {
            confess "Hash used as a key; class: " . ref($hash_ref) . "; value: $hash_ref->{$key}\n";
        } else {
            my $val = $hash_ref->{$key};
            if (ref($val) eq 'HASH') {
                check_hash($val);
            }
        }
    }
}

# Mark the module as registered and, when $ENV{MOD_PERL} is set, push
# modperl_cleanup() as a PerlCleanupHandler.
sub modperl_register
{
    $registered = 1;
    return unless $ENV{MOD_PERL};
    Apache->push_handlers("PerlCleanupHandler", \&modperl_cleanup);
}

# Cleanup handler: reset all per-request state.
sub modperl_cleanup
{
    $registered = 0;
    undef %locks;
    undef %tosave;
    $datachangefailures = 0;
    $commit_inprogress = 0;
    #
    # This next one is debateable.  If we don't clear
    # out the cache then the process will grow and grow.
    # If we don't clear out the cache we will have many
    # more aborted transactions.  An in-between setting
    # is probably necessary.
    #
    undef %cache;
}

1;