package Forks::Super::Sync::Win32; use strict; use warnings; use Carp; use Time::HiRes; use POSIX ':sys_wait_h'; use Win32::Semaphore; our @ISA = qw(Forks::Super::Sync); our $VERSION = '0.63'; our $NOWAIT_YIELD_DURATION = 250; my @RELEASE_ON_EXIT = (); # Something we have to watch out for is a process dying without # releasing the resources that it possessed. We have three # defences against this issue below. # # 1. call CORE::kill 0, ... to see if other proc is still alive # 2. check $!{EINVAL} (Win) and $!{ESRCH} (Cyg) to see if wait call failed # 3. release resources in a DESTROY block (and remove func, though that # probably doesn't help that much) $Forks::Super::Config::CONFIG{'Win32::Semaphore'} = 1; my $sem_id = 0; sub new { my ($pkg, $count, @initial) = @_; my $self = bless {}, $pkg; $self->{count} = $count; $self->{initial} = [ @initial ]; $self->{id} = $sem_id++; # initial value of 1 means that resource is available $self->{sem} = [ map { Win32::Semaphore->new(1,1) } 1..$count ]; # initial value of 0 means that resource is locked. # after fork (in &releaseAfterFork), parent will release {parent_sync} # and child will release {child_sync} $self->{parent_sync} = Win32::Semaphore->new(0,1,"$^T-$$-p"); $self->{child_sync} = Win32::Semaphore->new(0,1,"$^T-$$-c"); $self->{ppid} = $$; $self->{acquired} = []; $self->{invalid} = []; return $self; } sub releaseAfterFork { my ($self, $childPid) = @_; $self->{childPid} = $childPid; my $label = $$ == $self->{ppid} ? 'P' : 'C'; if ($label eq 'P') { $self->{parent_sync}->release(); $self->{child_sync}->wait(); } elsif ($label eq 'C') { $self->{child_sync}->release(); $self->{parent_sync}->wait(); } for my $i (0 .. $self->{count} - 1) { if ($self->{initial}[$i] ne $label) { $self->release($i); } else { $self->acquire($i,0); } } if ($label eq 'C') { $self->{parent_sync}->release(); $self->{child_sync}->wait(); $self->{child_sync}->release(); } elsif ($label eq 'P') { $self->{child_sync}->release(); $self->{parent_sync}->wait(); $self->{parent_sync}->release(); } return; } # more robust version of Win32::Semaphore->wait. # detects when partner process has died without releasing the semaphore # return true if successfully waited on lock sub _wait_on { my ($self, $n, $expire) = @_; return 1 if !$self->{sem}; my $partner = $$ == $self->{ppid} ? $self->{childPid} : $self->{ppid}; while (1) { local $! = 0; my $nk = CORE::kill 0, $partner; if (!$nk) { carp "sync::_wait_on thinks $partner is gone"; $self->{skip_wait_on} = 1; delete $self->{sem}; return 3; } my $z = $self->{sem} && $self->{sem}[$n]->wait($NOWAIT_YIELD_DURATION); if ($z) { return 1; } # $!{ERROR_BAD_COMMAND} is a Windows thing if ($!{EINVAL} || $!{ESRCH} || $!{ERROR_BAD_COMMAND}) { carp "sync::_wait_on: \$!=$!"; return 2; } elsif ($!) { carp "\$! is ",0+$!," $! ",0+$^E," $^E ", join(",", grep { $!{$_} } sort keys %!), "\n"; } if ($expire > 0 && Time::HiRes::time() >= $expire) { return 0; } waitpid -1, &WNOHANG; } } sub acquire { my ($self, $n, $timeout) = @_; if ($n < 0 || $n >= $self->{count}) { return; } if ($self->{acquired}[$n]) { return -1; } # XXX - need to handle the case where the partner process has died # without releasing a lock my $expire = -1; if (defined $timeout) { $expire = Time::HiRes::time() + $timeout; } my $z = $self->_wait_on($n, $expire); if ($z > 0) { $self->{acquired}[$n] = 1; return 1; } else { $self->{acquired}[$n] = 0; return 0; } } sub release { my ($self, $n) = @_; if ($n < 0 || $n >= $self->{count}) { return; } if (!$self->{acquired}[$n]) { return 0; } $self->{acquired}[$n] = 0; if ($self->{sem} && $self->{sem}[$n]) { local ($!,$^E) = (0,0); my $z = eval { $self->{sem}[$n]->release() }; if ($z) { return $z; } # does carp clear $! or $^E? that's inconvenient my ($e,$E) = (0+$!,0+$^E); carp "Forks::Super::Sync::Win32::release[$n] failed: $!/$^E/$@ // ", "$e/$E"; if ($E == 6 || $E == 126) { # The handle is invalid # The specified module could not be found # XXX - why does this happen? $self->{invalid}[$n] = 1; return -1; } print STDERR "sem[$n] exists but \$!,\$^E are $!,$^E,$e,$E\n"; return; } elsif ($self->{DESTROYING}) { return -1; } elsif ($self->{invalid}[$n]) { return -1; } else { carp "Forks::Super::Sync::Win32: ", "release [$n] called on undefined semaphore"; $self->{invalid}[$n] = 1; return -1; } # return $self->{sem} && $self->{sem}[$n] && $self->{sem}[$n]->release(); } sub remove { my $self = shift; $self->release($_) for 0 .. $self->{count} - 1; $self->{sem} = []; } sub DESTROY { my $self = shift; $self->{DESTROYING} = 1; $self->release($_) for 0 .. $self->{count}-1; $self->{sem} = []; } 1; =head1 NAME Forks::Super::Sync::Win32 - Forks::Super sync object using Win32::Semaphore =head1 SYNOPSIS $lock = Forks::Super::Sync->new(implementation => 'Win32', ...); $pid=fork(); $lock->releaseAfterFork(); if ($pid == 0) { # child code $lock->acquire(...); $lock->release(...); } else { $lock->acquire(...); $lock->release(...); } =head1 DESCRIPTION IPC synchronization object implemented with L. Advantages: fast, doesn't create files or use filehandles Disadvantages: Windows only. =head1 SEE ALSO L =cut