# $Date: 2004/03/25 22:58:20 $
# $Revision: 1.14 $

package Net::Z3950::AsyncZ;

# Asynchronous Z39.50 searching.  new() forks one child process per server
# (at most maxpipes at a time); each child runs a Net::Z3950::AsyncZ::ZLoop
# query and hands its report back through IPC::ShareLite shared memory, and
# the parent collects the reports from an Event timer loop.  Servers whose
# error is retryable (ErrMsg::doRetry) are queried a second time ("cycle 2").

our $VERSION = '0.10';

use Net::Z3950::AsyncZ::Options::_params;
use Net::Z3950::AsyncZ::Errors;
use Net::Z3950::AsyncZ::ZLoop;
use Net::Z3950::AsyncZ::ErrMsg;
use Event;
use POSIX ":sys_wait_h";
use Symbol;
use Exporter;
use sigtrap qw (die untrapped normal-signals die error-signals);

@ISA = qw(Exporter);
@EXPORT_OK = qw(
    asyncZOptions
    isZ_MARC isZ_GRS isZ_RAW isZ_Error isZ_nonRetryable isZ_Info isZ_DEFAULT
    noZ_Response isZ_Header isZ_ServerName Z_serverName
    getZ_RecNum getZ_RecSize
    delZ_header delZ_pid delZ_serverName
    prep_Raw get_ZRawRec
);
%EXPORT_TAGS = (
    record => [qw(isZ_MARC isZ_GRS isZ_RAW isZ_DEFAULT getZ_RecNum getZ_RecSize)],
    errors => [qw(isZ_Error isZ_nonRetryable)],
    header => [qw(isZ_ServerName Z_serverName noZ_Response isZ_Header
                  delZ_header delZ_pid delZ_serverName isZ_Info)],
);

use IPC::ShareLite qw( LOCK_EX);
use strict;

# Pids of the forked child processes of the current cycle:
#   keys = pids, values = our indexes to the forked processes.
# An entry is deleted once that child's report has been collected.
# (The timer loop itself exits via allDone() or the timeout; see
# timerCallBack.)
my %forkedPID = ();

# Exit codes of the forked processes:
#   keys = pids, values = exit codes (-1 until the child has been reaped).
# DESTROY kills every still-running process whose value is not 0, to
# prevent zombies.
my %exitCode = ();

# Pids, hosts and report results of the child processes:
#   keys = pids, values = [ host, report_results, index, retry_index ]
#
#   SLOT 0  host server address
#   SLOT 1  report results: 1 if the child reported, 0 if not
#   SLOT 2  index of the process in its own cycle (original or retry)
#   SLOT 3  retry_index:
#             -1 for an original-cycle process that has not been retried,
#             -2 for every retry-cycle process,
#             or, for an original-cycle process that was retried, the
#             index (>= 0) of the retry-cycle process that re-ran its query.
my %resultTable = ();

my $__DBUG = 0;
my $_ERROR_VAL = Net::Z3950::AsyncZ::Errors::errorval();

$SIG{CHLD} = \&childhandler;

# Thin constructor for a per-server options object.
sub asyncZOptions {
    return Net::Z3950::AsyncZ::Options::_params->new(@_);
}

# Record/header classifiers: each matches $_[0] against a pattern supplied
# by Net::Z3950::AsyncZ::Report and returns the match result.
sub isZ_Header  { $_[0] =~ Net::Z3950::AsyncZ::Report::get_pats(); }
sub isZ_MARC    { $_[0] =~ Net::Z3950::AsyncZ::Report::get_MARC_pat(); }
sub isZ_GRS     { $_[0] =~ Net::Z3950::AsyncZ::Report::get_GRS_pat(); }
sub isZ_RAW     { $_[0] =~ Net::Z3950::AsyncZ::Report::get_RAW_pat(); }
sub isZ_DEFAULT { $_[0] =~ Net::Z3950::AsyncZ::Report::get_DEFAULT_pat(); }

# Returns the digits of the first " <digits>]" in $_[0], or undef.
# The capture goes into a lexical so a failed match cannot hand back a
# stale $1 left over from some earlier match.
sub getZ_RecNum {
    my ($rec_num) = $_[0] =~ /\s(\d+)\]/;
    return $rec_num;
}

# Loads MARC::Charset on first use.
# returns: 1 once loaded, 0 (with a warning) if it cannot be loaded,
#          nothing if it was already set up.
sub _setupUTF8 {
    return if is_utf8_init();
    {
        local $^W = 0;    # keep warnings quiet while MARC::Charset loads
        eval { require MARC::Charset; };
    }
    if ($@) {
        warn "UTF8 requires MARC::Charset\n";
        return 0;
    }
    set_uft8_init();
    return 1;
}

# FIXME(review): isZ_ServerName, Z_serverName, delZ_serverName,
# get_ZRawRec and the host lookup in _getResult all match against an EMPTY
# pattern ('' or //).  In Perl an empty pattern re-uses the last
# successfully matched pattern, so these do not test what their comments
# say.  Judging by the noZ_Response comment, the server-name patterns
# presumably matched the HTML-comment-like "<!-- ... -->" header; the record
# separator used by get_ZRawRec cannot be inferred from this file.  Restore
# the real patterns (likely kept alongside Report::get_pats) and confirm.

# Header-stripping helpers.
# params:  string or reference to string
#          boolean: if true, the substitution uses the 'g' modifier
#          substitution string (the empty string if undefined)
# returns: a reference to the result if a reference was passed in $_[0],
#          otherwise the resulting string
sub delZ_header {
    my ($str, $g, $subst) = @_;
    my $pat = Net::Z3950::AsyncZ::Report::get_pats();
    return _del_headers($str, $pat, $g, $subst);
}

# Removes <#--pid--> markers; see delZ_header.
sub delZ_pid {
    my ($str, $g, $subst) = @_;
    return _del_headers($str, '<#--\d+-->', $g, $subst);
}

# Removes server names; see delZ_header.
# FIXME(review): empty pattern -- see the note above delZ_header.
sub delZ_serverName {
    my ($str, $g, $subst) = @_;
    return _del_headers($str, '', $g, $subst);
}

# Shared implementation of the delZ_* helpers.
sub _del_headers {
    my ($in, $pat, $g, $subst) = @_;
    my $str = ref $in ? $$in : $in;
    $subst = "" if !defined $subst;
    if ($g) {
        $str =~ s/$pat/$subst/g;
    }
    else {
        $str =~ s/$pat/$subst/;
    }
    return ref $in ? \$str : $str;
}

# Joins an array of raw records into one string with headers, pid markers
# and server names removed.
# param:   reference to array of raw records
# returns: reference to the resulting string
sub prep_Raw {
    my $raw = shift;
    my $str = join "", @$raw;
    $raw = delZ_header(\$str);          # passing a ref gets a ref back
    $raw = delZ_pid($raw, 1);
    $raw = delZ_serverName($raw, 1);
    $raw = delZ_header($raw, 1, '');
    return $raw;
}

# Removes and returns the next record from a string built by prep_Raw.
# param:   reference to string of raw records
# returns: next record, or undef if no reference was given
# FIXME(review): both patterns below are empty -- see the note above
# delZ_header.
sub get_ZRawRec {
    my $raw = shift;
    return undef if !$raw;
    if ($$raw !~ //) {                  # presumed last record
        my $rec = $$raw;
        $$raw = "";
        return $rec;
    }
    $$raw =~ s/(.*?)//;
    return $1;
}

# Tests whether a line is our substitute for an absent report, e.g.
#   {!-- library.anu.edu.au --}
# It carries the previous server's name in curly braces, substituted for
# the angle brackets (like an HTML comment) that hold the server name in
# the header of each report item.
sub noZ_Response { $_[0] =~ /\{!--\s+.*\s+--\}/; }

# Tests whether a line contains a server name.
# FIXME(review): empty pattern -- see the note above delZ_header.
sub isZ_ServerName { $_[0] =~ //; }

# Tests whether a line contains a <#--pid--> marker.
sub isZ_PID { $_[0] =~ /<#--\d+-->/; }

# True for either kind of informational line (pid marker or no-response).
sub isZ_Info { isZ_PID(@_) || noZ_Response(@_); }

# Returns the server name from a header line, or undef.
# FIXME(review): empty pattern -- see the note above delZ_header.
sub Z_serverName {
    if ($_[0] =~ //) {
        return $1 if $1;
    }
    return undef;
}

# Classifies an error pair as returned by getErrors().
# returns: 0 if not an error
#          2 if there is a cycle 2 (retry) error
#          1 if there is a non-retryable cycle 1 error
sub isZ_Error {
    my $err = shift;
    return 0 if !$err;
    return 2 if $err->[0] && $err->[1];
    return 1 if $err->[0] && !$err->[0]->{retry};
    return 0;
}

# Tests the return value of isZ_Error(): true for a cycle 1 fatal error.
sub isZ_nonRetryable { $_[0] == 1; }

# Per-search result storage shared by the accessors below.
{
    my @results   = ();    # index => ref to array of report lines
    my @errors    = ();    # index => [ cycle 1 error, cycle 2 error ]
    my @recSize   = ();    # index => size parsed from the "*==N==*" trailer
    my $busy      = 0;
    my $utf8_init = 0;

    sub is_utf8_init  { $utf8_init; }
    sub set_uft8_init { $utf8_init = 1; }

    # Passes every stored line for $index through MARC::Charset->to_utf8,
    # in place.  Does nothing if MARC::Charset could not be loaded or
    # nothing is stored for $index.
    sub _utf8 {
        my $index = shift;
        _setupUTF8() if !$utf8_init;
        return if !$utf8_init;
        return if !$results[$index];
        my $cs = MARC::Charset->new();
        for my $line (@{ $results[$index] }) {
            $line = $cs->to_utf8($line);
        }
    }

    sub _saveResults {
        $busy = 1;
        my ($arr, $index) = @_;
        $results[$index] = $arr;
        $busy = 0;
    }

    sub _saveErrors { @errors = @_; }

    sub _isBusy { return $busy; }

    # returns: reference to the results array for $index
    sub getResult {
        my ($self, $index) = @_;
        _utf8($index) if $self->{options}[$index]->_getFieldValue('utf8');
        return $results[$index];
    }

    sub getZ_RecSize { $recSize[$_[0]]; }

    # returns: [ cycle 1 error, cycle 2 error ] for $index, or undef
    sub getErrors {
        my ($self, $index) = @_;
        return [ $errors[$index]->[0], $errors[$index]->[1] ] if $errors[$index];
        return undef;
    }

    sub getMaxErrors { return scalar @errors; }

    # Strips the "*==<size>==*" trailer that the child appends to its
    # report, records the size, and hands the results for $index to the
    # per-server callback (falling back to the search-wide one).
    sub _callback {
        $busy = 1;
        my ($self, $index) = @_;
        _utf8($index) if $self->{options}[$index]->_getFieldValue('utf8');
        my $cb = $self->{options}[$index]->_getFieldValue('cb') || $self->{cb};

        my $records = $results[$index];
        my $size;
        if (@$records) {
            ($size) = $records->[-1] =~ /\*==(\d+)==\*/;
            $records->[-1] =~ s/\*==(\d+)==\*//;
        }
        $recSize[$index] = $size ? $size : 0;

        $cb->($index, $records) if $cb;
        $busy = 0;
    }
}

#-------------------------------------------------------------------#
# private parameters:
#   start:        start time for timers (reset for each cycle)
#   zl:           array of ZLoop objects (filled in the child processes)
#   errors:       reference to Net::Z3950::AsyncZ::Errors object for main process
#   share:        reference to IPC::ShareLite
#   timer:        reference to timer watcher
#   unlooped:     notifies DESTROY when all pipes have been processed,
#                 because DESTROY is called for each closed pipe--hence
#                 makes it safe to do cleanup that applies to main process
#   monitor_pid:  pid of the monitor, for killing it
#--------------------------------------------------------------------#
sub new {
    my ($class, %args) = @_;
    my $self = {
        start         => time(),
        zl            => [],
        query         => $args{query},
        errors        => undef,
        log           => $args{log} || undef,
        cb            => $args{cb},
        timer         => undef,
        timeout       => $args{timeout} || 25,
        timeout_min   => $args{timeout_min} || 5,
        interval      => $args{interval} || 1,
        servers       => $args{servers},
        options       => $args{options},
        unlooped      => 0,
        maxpipes      => $args{maxpipes} || 4,
        share         => undef,
        monitor       => $args{monitor},
        monitor_pid   => undef,
        swap_check    => $args{swap_check} || 0,
        swap_attempts => $args{swap_attempts} || 5,
    };
    bless $self, $class;

    $self->{errors} = Net::Z3950::AsyncZ::Errors->new($self->{log});
    %forkedPID   = ();
    %exitCode    = ();
    %resultTable = ();

    $self->{share} = IPC::ShareLite->new(
        -key     => $$ + 5000,
        -create  => 'yes',
        -destroy => 'yes',
    );
    $self->{monitor_pid} = $self->_monitor() if $self->{monitor};

    # SIGHUP (sent by the monitor process when its alarm fires) aborts.
    $SIG{HUP} = sub {
        $self->{abort}    = 1;
        $self->{unlooped} = 1;  # notify DESTROY that it's safe to kill outstanding processes
        $! = 227;
        die "Aborting.";
    };

    # Cycle 1: query every server.
    $self->processHosts(-1, %args);

    # Cycle 2: retry the servers that failed with a retryable error.
    my @retries = $self->_getReTries();
    $args{'servers'}   = \@retries;
    $self->{'servers'} = $args{'servers'};
    $self->processHosts(-2, %args);

    $self->_showStats(\%resultTable) if $__DBUG;
    $self->_processErrors();
    kill KILL => $self->{monitor_pid} if $self->{monitor};
    $self->{share} = undef;
    return $self;
}

# Forks one child per server in $args{servers}, never more than maxpipes at
# a time, and waits in an Event timer loop for each batch to finish.
# $retry_marker becomes each child's retry index: -1 in the original cycle,
# -2 in the retry cycle.
sub processHosts {
    my ($self, $retry_marker, %args) = @_;
    my $index   = 0;
    my $started = 0;    # children started since the last wait

    $self->{unlooped} = 0;
    $self->{start}    = time();
    %forkedPID        = ();

    foreach my $server (@{ $args{servers} }) {
        $self->{server} = $server;
        $self->{options}[$index] = Net::Z3950::AsyncZ::Options::_params->new(
            format       => $args{format},
            num_to_fetch => $args{num_to_fetch},
        ) if !defined $self->{options}[$index];
        $self->{options}[$index]->option(_this_server => $server->[0]);
        $self->start($index, $retry_marker);
        $index++;
        $started++;

        # A full batch is running: wait for it before forking more.
        if ($started >= $self->{maxpipes}) {
            $self->_waitForMemory();
            $self->_runTimerLoop();
            $started = 0;
        }
    }

    # Wait for the final, partial batch.  Testing $started (rather than
    # @servers % maxpipes) also covers a server count that is an exact
    # multiple of maxpipes, whose last batch would otherwise go uncollected.
    $self->_runTimerLoop() if $started > 0;

    $self->{unlooped} = 1;
}

# When swap_check is set, blocks while is_mem_left() reports heavy
# swapping, re-polling every swap_check seconds; dies once more than
# swap_attempts polls have been made.
sub _waitForMemory {
    my $self = shift;
    return if !$self->{swap_check};

    my $mem_avail = 0;
    my $attempts  = 0;
    while (!$mem_avail) {
        $mem_avail = is_mem_left();
        if (!$mem_avail) {
            Event->timer(at => time + $self->{swap_check},
                         cb => sub { $_[0]->w->cancel; });
            Event::loop();
        }
        $attempts++;
        die "Memory resources appear to be too low to continue;\n",
            "try setting the swap_check to a higher value and/or ",
            "allowing for more than $self->{swap_attempts} swap_attempts\n"
            if $attempts > $self->{swap_attempts};
    }
}

# Runs the Event loop with a repeating timerCallBack watcher until
# timerCallBack unloops it.
sub _runTimerLoop {
    my $self = shift;
    $self->{timer} = Event->timer(
        interval => $self->{interval},
        hard     => 1,
        cb       => sub { $self->timerCallBack(); },
    );
    Event::loop();
}

# Collects the servers whose cycle 1 child did not report and whose exit
# code ErrMsg considers retryable, records each one's retry index in
# %resultTable, and moves its options to that index.
# returns: list of server entries to query in the retry cycle
sub _getReTries {
    my $self = shift;
    my @retries = ();
    # Snapshot: $self->{options}[$count] is overwritten below, and a later
    # iteration may still need the cycle 1 options stored at that index.
    my @cycle_1_options = @{ $self->{options} || [] };
    my $count = 0;

    foreach my $pid (keys %resultTable) {
        next if $resultTable{$pid}->[1] != 0;      # child reported
        my $err = Net::Z3950::AsyncZ::ErrMsg->new($exitCode{$pid});
        next if !$err->doRetry();

        my $index = $resultTable{$pid}->[2];
        push @retries, $self->{servers}[$index];
        $self->{options}[$count] = $cycle_1_options[$index];
        $resultTable{$pid}->[3] = $count;          # save retry index
        $count++;
    }
    return @retries;
}

# Forks the child for server $index.  The parent records the child in
# %forkedPID, %exitCode and %resultTable; the child runs the ZLoop query,
# stores its report (plus a "*==<size>==*" trailer) in shared memory and
# exits.
sub start {
    my $self = shift;
    return if defined $self->{abort};
    my ($index, $retry_marker) = @_;
    my $server = $self->{servers}[$index];

    my $pid = fork;
    die "Server cannot handle your request at this time" unless defined $pid;

    if ($pid) {
        $forkedPID{$pid} = $index;
        $exitCode{$pid}  = -1;
        $resultTable{$pid}->[0] = $server->[0];    # server name
        $resultTable{$pid}->[1] = 0;               # report = false
        $resultTable{$pid}->[2] = $index;          # current index
        $resultTable{$pid}->[3] = $retry_marker;   # retry index
        print "process $index: \$pid = $pid $server->[0] $server->[1] $server->[2]\n"
            if $__DBUG;
        return;
    }

    # --- child process ---
    $self->{share}->destroy(0);
    my $options = $self->{options}[$index];
    my $update  = $options->_updateObjectHash($self);
    my $query   = $update->{query} ? $update->{query} : $self->{query};
    my $log     = $update->{log}   ? $update->{log}   : $self->{log};
    $options->_setFieldValue('_this_pid', $$);

    my $zerrs = Net::Z3950::AsyncZ::Errors->new(
        $log, $self->{server}[0], $query,
        $options->get_preferredRecordSyntax(), $self->{server}[2]);

    my $zl = $self->{zl}[$index] =
        Net::Z3950::AsyncZ::ZLoop->new(@{ $self->{server} }, $query, $options);
    $zl->setTimer($self->{interval});

    if ($zl->{report} && $self->{share}) {
        push @{ $zl->{report} }, "*==" . $zl->{rsize} . "==*\n";
        $self->{share}->store(join '', @{ $zl->{report} });
    }
    elsif ($self->{share}) {
        $self->{share}->store("");
    }
    else {
        exit(Net::Z3950::AsyncZ::ErrMsg::_EINVAL());
    }
    exit 0;
}

{
    my $in_getResult = 0;

    sub _gettingResult { $in_getResult; }

    # Under an exclusive lock, fetches a finished child's report from shared
    # memory, leaves a "{!-- host --}" placeholder in its place, marks the
    # child as having reported, stores the lines under the server's original
    # index and runs the callback.
    sub _getResult {
        my ($self, $pid) = @_;
        exit(Net::Z3950::AsyncZ::ErrMsg::_EINVAL()) if !$self->{share};
        $in_getResult = 1;
        $self->{share}->lock(LOCK_EX);
        while (_isBusy()) { }

        my $data = $self->{share}->fetch();
        if (!$data) {
            # Presumably should never occur, but it has happened.  Release
            # the lock and the flag first: timerCallBack busy-waits on
            # _gettingResult() and would otherwise spin forever.
            $self->{share}->unlock;
            $in_getResult = 0;
            return;
        }

        my @data = split "\n", $data;
        # FIXME(review): empty pattern -- see the note above delZ_header.
        $data[0] =~ //;
        my $host = $1;
        $self->{share}->store("\{!\-\- $host \-\-\}") if $host;

        # The second line carries the child's <#--pid--> marker.
        my $this_pid;
        ($this_pid) = $data[1] =~ /<#--(\d+)-->/ if $data[1];
        $resultTable{$this_pid}->[1] = 1
            if $this_pid && exists $resultTable{$this_pid};
        splice(@data, 1, 1);

        $pid = $this_pid if $this_pid;
        my $index = _getIndex($pid);
        while (_isBusy()) { }
        _saveResults(\@data, $index);
        while (_isBusy()) { }
        $self->_callback($index);

        $self->{share}->unlock;
        $in_getResult = 0;
    }
}

# Maps a child's pid to the index its results and errors are stored under,
# which is the server's index in the original cycle:
#   - original-cycle children (retry slot -1, or a retry index >= 0) map to
#     their own index;
#   - retry-cycle children (retry slot -2) map to the index of the
#     original-cycle entry whose retry index equals their own index,
#     falling back to their own index if there is no such entry.
# returns: nothing for an unknown pid
sub _getIndex {
    my $pid = shift;
    my $entry = $resultTable{$pid};
    return if !$entry;
    return $entry->[2] if $entry->[3] != -2;

    my $retry_index = $entry->[2];
    foreach my $other (values %resultTable) {
        return $other->[2] if $other->[3] == $retry_index;
    }
    return $entry->[2];
}

# True once every child in %exitCode has been reaped (no -1 left).
sub allDone {
    foreach my $code (values %exitCode) {
        return 0 if $code == -1;
    }
    return 1;
}

# Event timer callback: collects the report of every child that exited with
# status 0, then ends the loop once all children are done or the timeout
# (measured from the start of the cycle) has passed.
sub timerCallBack {
    my $self = shift;
    my $now  = time();

    foreach my $pid (keys %forkedPID) {
        while (_gettingResult()) { }
        next if $exitCode{$pid} != 0;
        $self->_getResult($pid);
        delete $forkedPID{$pid};
    }

    if ($now - $self->{start} > $self->{timeout} || allDone()) {
        $self->{timer}->cancel();
        Event::unloop();
    }
}

# Builds the per-index error list from every child that did not report:
# original-cycle failures go in slot 0 and retry-cycle failures in slot 1
# of the entry at the index _getIndex() gives for that child; the list is
# then stored with _saveErrors() for getErrors().
sub _processErrors {
    my $self    = shift;
    my %cycle_1 = ();
    my %cycle_2 = ();
    my @errors  = ();
    $__DBUG = 0;    # debug output is forced off here
    print "\n\nProcessing Errors\n" if $__DBUG;

    foreach my $pid (keys %resultTable) {
        my $entry = $resultTable{$pid};
        next if $entry->[1] != 0;    # reported successfully
        if ($entry->[3] == -2) {
            $cycle_2{$pid} = $entry;
        }
        else {
            $cycle_1{$pid} = $entry;
        }
    }

    print "\nCycle 1\n" if $__DBUG;
    $self->_showStats(\%cycle_1) if $__DBUG;
    foreach my $pid_1 (keys %cycle_1) {
        my $err   = Net::Z3950::AsyncZ::ErrMsg->new($exitCode{$pid_1});
        my $index = _getIndex($pid_1);
        $errors[$index]->[0] = $err;
        print $pid_1, " " if $__DBUG;
        $self->_printError($err) if $__DBUG;
    }

    print "\nCycle 2\n" if $__DBUG;
    $self->_showStats(\%cycle_2) if $__DBUG;
    foreach my $pid_2 (keys %cycle_2) {
        my $err   = Net::Z3950::AsyncZ::ErrMsg->new($exitCode{$pid_2});
        my $index = _getIndex($pid_2);
        $errors[$index]->[1] = $err;
        print $pid_2, " " if $__DBUG;
        $self->_printError($err) if $__DBUG;
    }

    _saveErrors(@errors);
    $__DBUG = 0;
}

# Debug: prints one ErrMsg object's number, message and classification.
sub _printError {
    my $self = shift;
    my $err  = shift;
    my $num  = sprintf("[%3d]", $err->{errno});
    print "$num ";
    print $err->{msg} if $err->{msg};
    print " NET"            if $err->isNetwork();
    print " SYSTEM"         if $err->isSystem();
    print " TRY AGAIN"      if $err->isTryAgain();
    print " SUCCESS"        if $err->isSuccess();
    print " --Z3950 ERROR " if $err->isZ3950();
    print " --RETRY "       if $err->doRetry();
    print "\n";
}

# SIGCHLD handler: reaps every exited child and records its exit status;
# dies (with $? set to that status) when ErrMsg::_abort() returns true
# for it.
sub childhandler {
    while ((my $retv = waitpid(-1, WNOHANG)) > 0) {
        $exitCode{$retv} = $? >> 8;
        if (Net::Z3950::AsyncZ::ErrMsg::_abort($exitCode{$retv})) {
            $? = $exitCode{$retv};
            die;
        }
    }
    $SIG{CHLD} = \&childhandler;
}

use Carp;

sub DESTROY {
    my $self = shift;
    # Each process (children included) runs this DESTROY, so cleanup waits
    # until the main process has left its loops before killing any
    # potential zombie processes.
    return if !$self->{unlooped};
    print "DESTROY\n" if $__DBUG;

    foreach my $pid (keys %exitCode) {
        next if !kill 0 => $pid;         # no longer running
        next if $exitCode{$pid} == 0;
        kill 9 => $pid;
        print "killing $pid\n" if $__DBUG;
    }
    kill KILL => $self->{monitor_pid} if $self->{monitor};
    $self->{share} = undef if defined $self->{share};
    sleep(1);    # allow time for remaining killed processes to be reaped
}

# Forks a watchdog that, after $self->{monitor} seconds, sends SIGHUP to its
# parent (new() installs the HUP handler that aborts the search) and then
# kills itself.
# returns: the watchdog's pid (in the parent)
sub _monitor {
    my $self = shift;
    $SIG{ALRM} = sub {
        my $pid = getppid();
        kill HUP => $pid;
        kill KILL => $$;
    };

    my $pid = fork;
    die "Unable to fork" unless defined $pid;
    return $pid if $pid;

    alarm($self->{monitor});
    while (1) { sleep(10); }
}

# Runs vmstat for three one-second samples and reports whether swap
# activity looks low enough to keep forking.
# returns: 0 if swap-in or swap-out changed by 20 or more between the last
#          two samples, otherwise 1 (also 1, after a 3 second pause, when
#          fewer than three samples could be read)
sub is_mem_left {
    my @vmstat = ($^O =~ /linux/) ? ('vmstat', 1, 3) : ('vmstat', '-S', 1, 3);
    open my $vmstat_fh, '-|', @vmstat or die "can't open vmstat";

    my (@si, @so, $si_index, $so_index);
    my $count = 0;
    while (my $line = <$vmstat_fh>) {
        sleep(1);    # helps to ensure that vmstat produces 3 lines of output
        $line =~ s/^\s+//;
        $line =~ s/\s+$//;
        my @fields = split /\s+/, $line;
        if ($line =~ /si/i && $line =~ /so/i) {
            # column header line: locate the si/so columns
            for my $i (0 .. $#fields) {
                $si_index = $i if $fields[$i] =~ /^si$/i;
                $so_index = $i if $fields[$i] =~ /^so$/i;
            }
        }
        elsif ($line =~ /\d/ && defined $si_index && defined $so_index) {
            $si[$count] = $fields[$si_index];
            $so[$count] = $fields[$so_index];
            $count++;
        }
    }
    close $vmstat_fh;

    # Samples 1 and 2 are compared, so all three are needed (the first is
    # skipped -- presumably because it reports averages since boot).
    if ($count < 3) {
        sleep 3;
        return 1;
    }
    return 0 if abs($si[2] - $si[1]) >= 20;
    return 0 if abs($so[2] - $so[1]) >= 20;
    return 1;
}

# Debug: dumps a %resultTable-shaped hash plus each pid's exit code.
# (Must stay above __END__; code after __END__ is never compiled.)
sub _showStats {
    my $self  = shift;
    my $table = shift;
    print "\nStats\n";
    foreach my $pid (keys %$table) {
        my $entry = $table->{$pid};
        print "$pid:\t";
        print "$entry->[0]", " result: $entry->[1]", " index: $entry->[2]",
              " retry index: $entry->[3]";
        print "\texit code: $exitCode{$pid}\n" if exists $exitCode{$pid};
    }
}

1;
__END__