#/** # Thread-safe request/response queue with identifiable elements. # Provides methods for N threads to queue items to other threads, and # then wait only for responses to specific queued items. #

# Note: this object is derived from a threads::shared arrayref
# to optimize performance.
#

# Licensed under the Academic Free License version 2.1, as specified in the
# License.txt file included in this software package, or at
# OpenSource.org.
#
# @author D. Arnold
# @since 2005-12-01
# @self $obj
# @see Thread::Queue::Queueable
# @exports $tqd_global_lock global threads::shared variable for locking
# @exports TQD_Q object field index of threads::shared array used for the queue
# @exports TQD_MAP object field index of threads::shared hash mapping queue request IDs to requests/responses
# @exports TQD_IDGEN object field index of threads::shared scalar integer used to generate request IDs
# @exports TQD_LISTENERS object field index of threads::shared scalar integer count of current listeners
# @exports TQD_REQUIRE_LISTENER object field index of threads::shared scalar flag indicating if listeners are required before permitting an enqueue operation
# @exports TQD_MAX_PENDING object field index of threads::shared scalar integer max number of pending requests before an enqueue will block
# @exports TQD_URGENT_COUNT object field index of threads::shared scalar integer count of current urgent requests
# @exports TQD_MARKS object field index of threads::shared hash mapping request IDs of marked requests
#*/
package Thread::Queue::Duplex;
#
# Copyright (C) 2005,2006, Presicient Corp., USA
#
require 5.008;

use strict;
use warnings;

use threads;
use threads::shared;
use Thread::Queue::Queueable;
use Thread::Queue::TQDContainer;
use Exporter;
use base qw(Exporter Thread::Queue::Queueable Thread::Queue::TQDContainer);

BEGIN {
    #
    # indexes of the fields of the shared arrayref the object is built on
    #
    use constant TQD_Q                => 0;
    use constant TQD_MAP              => 1;
    use constant TQD_IDGEN            => 2;
    use constant TQD_LISTENERS        => 3;
    use constant TQD_REQUIRE_LISTENER => 4;
    use constant TQD_MAX_PENDING      => 5;
    use constant TQD_URGENT_COUNT     => 6;
    use constant TQD_MARKS            => 7;

    our @EXPORT    = ();    # we export nothing by default
    our @EXPORT_OK = qw($tqd_global_lock);

    our %EXPORT_TAGS = (
        tqd_codes => [
            qw/TQD_Q TQD_MAP TQD_IDGEN TQD_LISTENERS TQD_REQUIRE_LISTENER
               TQD_MAX_PENDING TQD_URGENT_COUNT TQD_MARKS/
        ],
    );

    Exporter::export_tags(keys %EXPORT_TAGS);
}

#
# global semaphore used for class-level wait()
# notification
#
our $tqd_global_lock : shared = 0;
our $tqd_debug : shared = 0;

our $VERSION = '0.92';

#/**
# Constructor. Creates a new empty queue, and associated mapping hash.
#
# @param ListenerRequired boolean value indicating if registered listener
#        required before enqueue is permitted.
# @param MaxPending positive integer maximum number of pending requests;
#        enqueue attempts will block until the pending count
#        drops below this value. The limit may be applied or modified later
#        via the set_max_pending() method.
#        A value of zero indicates no limit.
#
# @return Thread::Queue::Duplex object; undef (with $@ set to
#         'Invalid argument list') if the arguments are malformed
#*/
sub new {
    my $class = shift;

    #
    # arguments are name => value pairs, so an odd count is malformed
    #
    if (scalar(@_) & 1) {
        $@ = 'Invalid argument list';
        return undef;
    }

    my %args = @_;
    foreach my $name (keys %args) {
        unless (($name eq 'ListenerRequired') || ($name eq 'MaxPending')) {
            $@ = 'Invalid argument list';
            return undef;
        }
        #
        # MaxPending must consist of digits only; the pattern is anchored
        # at both ends so values such as "12abc" or "3.5" are rejected
        #
        if (($name eq 'MaxPending') && defined($args{$name}) &&
            ($args{$name} !~ /\A\d+\z/)) {
            $@ = 'Invalid argument list';
            return undef;
        }
    }

    my $idgen : shared = 1;
    my $listeners : shared = 0;
    my $max_pending : shared = $args{MaxPending} || 0;
    my $urgent_count : shared = 0;
    my %marks : shared = ();

    #
    # field order must match the TQD_* index constants;
    # &share() is called with a leading & to bypass its prototype so it
    # can be handed anonymous containers
    #
    my @obj : shared = (
        &share([]),
        &share({}),
        \$idgen,
        \$listeners,
        $args{ListenerRequired},
        \$max_pending,
        \$urgent_count,
        \%marks
    );

    return bless \@obj, $class;
}

#/**
# Register as a queue listener. Permits "ListenerRequired"
# queues to accept requests when at least one listener
# has registered.
#
# @return Thread::Queue::Duplex object
#*/
sub listen {
    my $obj = shift;
    my $listeners = $obj->[TQD_LISTENERS];

    lock($$listeners);
    $$listeners++;
    #
    # wake any thread blocked in wait_for_listener()
    #
    cond_broadcast($$listeners);
    return $obj;
}

#/**
# Deregister as a queue listener.
# When all listeners
# deregister, a "ListenerRequired" queue will no longer
# accept new requests until a listener registers via
# listen()
#
# @return Thread::Queue::Duplex object
#*/
sub ignore {
    my $obj = shift;
    my $listeners = $obj->[TQD_LISTENERS];

    lock($$listeners);
    #
    # never let the count go negative
    #
    $$listeners-- if $$listeners;
    return $obj;
}

#/**
# Wait until a listener has registered.
#
# @param $timeout (optional) number of seconds to wait for a listener.
#
# @return Thread::Queue::Duplex if a listener is registered; undef otherwise.
#*/
sub wait_for_listener {
    #
    # unpack from @_ (a bare shift would leave $timeout permanently
    # undefined and make the timed wait unreachable)
    #
    my ($obj, $timeout) = @_;
    my $listeners = $obj->[TQD_LISTENERS];

    lock($$listeners);

    return undef if ($timeout && ($timeout < 0));

    if ($timeout) {
        #
        # convert to an absolute deadline, as cond_timedwait() expects
        #
        $timeout += time();
        cond_timedwait($$listeners, $timeout)
            while (!$$listeners) && ($timeout > time());
        return $$listeners ? $obj : undef;
    }

    cond_wait($$listeners) while (!$$listeners);
    return $$listeners ? $obj : undef;
}

#
# common function for build enqueue list:
# returns a shared arrayref holding the request ID followed by one
# pair of slots per request value. For Thread::Queue::Queueable
# objects the pair is whatever onEnqueue() returns in list context
# (presumably class name and marshalled value - confirm against
# Thread::Queue::Queueable); for everything else it is (undef, value)
#
sub _filter_nq {
    my $id = shift;
    my @params : shared = ($id, (undef) x (scalar @_ << 1));
    #
    # marshall params, checking for Queueable objects
    #
    my $i = 1;
    foreach (@_) {
        @params[$i .. $i + 1] =
            (ref $_ &&
                (ref $_ ne 'ARRAY') &&
                (ref $_ ne 'HASH') &&
                (ref $_ ne 'SCALAR') &&
                $_->isa('Thread::Queue::Queueable'))
            ? $_->onEnqueue()
            : (undef, $_);
        $i += 2;
    }
    return \@params;
}

#
# add a marshalled request to the queue, blocking while the queue is
# at its MaxPending limit; urgent requests go to the head, all others
# to the tail
#
sub _lock_load {
    my ($obj, $params, $urgent) = @_;
    my $q = $obj->[TQD_Q];

    lock(@$q);
    #
    # check current length if we have a limit
    #
    while (${$obj->[TQD_MAX_PENDING]} &&
        (${$obj->[TQD_MAX_PENDING]} <= scalar @$q)) {
        cond_wait(@$q);
    }

    if ($urgent) {
        unshift @$q, $params;
        ${$obj->[TQD_URGENT_COUNT]}++;
    }
    else {
        push @$q, $params;
    }

    cond_signal(@$q);
    return 1;
}

#
# allocate the next request ID
#
sub _get_id {
    my $obj = shift;
    my $idgen = $obj->[TQD_IDGEN];

    lock($$idgen);
    my $id = $$idgen++;
    #
    # rollover, just in case...not perfect,
    # but good enough
    #
    $$idgen = 1 if ($$idgen > 2147483647);
    return $id;
}

#/**
# Enqueue a request to the tail of the queue.
#
# @param @args the request. Request values must be either scalars,
#        references to threads::shared variables, or Thread::Queue::Queueable
#        objects
#
# @return Request ID if successful; undef if ListenerRequired and no listeners
#         are registered
#*/
sub enqueue {
    my $obj = shift;

    return undef
        if ($obj->[TQD_REQUIRE_LISTENER] && (!${$obj->[TQD_LISTENERS]}));

    my $id = $obj->_get_id;
    my $params = _filter_nq($id, @_);
    _lock_load($obj, $params);

    #
    # notify any class-level waiters
    #
    lock($tqd_global_lock);
    cond_broadcast($tqd_global_lock);
    return $id;
}

#/**
# Enqueue a request to the head of the queue.
#
# @param @args the request. Request values must be either scalars,
#        references to threads::shared variables, or Thread::Queue::Queueable
#        objects
#
# @return Request ID if successful; undef if ListenerRequired and no listeners
#         are registered
#*/
sub enqueue_urgent {
    my $obj = shift;

    return undef
        if ($obj->[TQD_REQUIRE_LISTENER] && (!${$obj->[TQD_LISTENERS]}));

    my $id = $obj->_get_id;
    my $params = _filter_nq($id, @_);
    _lock_load($obj, $params, 1);

    #
    # notify any class-level waiters
    #
    lock($tqd_global_lock);
    cond_broadcast($tqd_global_lock);
    return $id;
}

#
# blocking versions of enqueue()
#
#/**
# Enqueue a request to the tail of the queue, and wait for the response.
#
# @param @args the request. Request values must be either scalars,
#        references to threads::shared variables, or Thread::Queue::Queueable
#        objects
#
# @return Response structure if successful; undef if ListenerRequired and no listeners
#         are registered
#*/
sub enqueue_and_wait {
    my $obj = shift;

    my $id = $obj->enqueue(@_);
    return undef unless defined($id);
    return $obj->wait($id);
}

#/**
# Enqueue a request to the tail of the queue, and wait up to $timeout seconds
# for the response.
#
# @param $timeout number of seconds to wait for a response
# @param @args the request. Request values must be either scalars,
#        references to threads::shared variables, or Thread::Queue::Queueable
#        objects
#
# @return Response structure if successful; undef if ListenerRequired and no listeners
#         are registered, or if no response is received within the specified $timeout
#*/
sub enqueue_and_wait_until {
    my $obj = shift;
    my $timeout = shift;

    my $id = $obj->enqueue(@_);
    return undef unless defined($id);
    return $obj->wait_until($id, $timeout);
}

#/**
# Enqueue a request to the head of the queue, and wait for the response.
#
# @param @args the request. Request values must be either scalars,
#        references to threads::shared variables, or Thread::Queue::Queueable
#        objects
#
# @return Response structure if successful; undef if ListenerRequired and no listeners
#         are registered
#*/
sub enqueue_urgent_and_wait {
    my $obj = shift;

    my $id = $obj->enqueue_urgent(@_);
    return undef unless defined($id);
    return $obj->wait($id);
}

#/**
# Enqueue a request to the head of the queue, and wait up to $timeout seconds
# for the response.
#
# @param $timeout number of seconds to wait for a response
# @param @args the request. Request values must be either scalars,
#        references to threads::shared variables, or Thread::Queue::Queueable
#        objects
#
# @return Response structure if successful; undef if ListenerRequired and no listeners
#         are registered, or if no response is received within the specified $timeout
#*/
sub enqueue_urgent_and_wait_until {
    my $obj = shift;
    my $timeout = shift;

    my $id = $obj->enqueue_urgent(@_);
    return undef unless defined($id);
    return $obj->wait_until($id, $timeout);
}

#
# Simplex versions
#
#/**
# Enqueue a simplex request to the tail of the queue. Simplex requests
# do not generate responses.
#
# @param @args the request. Request values must be either scalars,
#        references to threads::shared variables, or Thread::Queue::Queueable
#        objects
#
# @return Thread::Queue::Duplex object if successful; undef if ListenerRequired and
#         no listeners are registered
#*/
sub enqueue_simplex {
    my $obj = shift;

    return undef
        if ($obj->[TQD_REQUIRE_LISTENER] && (!${$obj->[TQD_LISTENERS]}));

    #
    # simplex requests carry no ID
    #
    my $params = _filter_nq(undef, @_);
    _lock_load($obj, $params);

    lock($tqd_global_lock);
    cond_broadcast($tqd_global_lock);
    return $obj;
}

#/**
# Enqueue a simplex request to the head of the queue. Simplex requests
# do not generate responses.
#
# @param @args the request. Request values must be either scalars,
#        references to threads::shared variables, or Thread::Queue::Queueable
#        objects
#
# @return Thread::Queue::Duplex object if successful; undef if ListenerRequired and
#         no listeners are registered
#*/
sub enqueue_simplex_urgent {
    my $obj = shift;

    return undef
        if ($obj->[TQD_REQUIRE_LISTENER] && (!${$obj->[TQD_LISTENERS]}));

    my $params = _filter_nq(undef, @_);
    _lock_load($obj, $params, 1);

    lock($tqd_global_lock);
    cond_broadcast($tqd_global_lock);
    return $obj;
}

#
# recover original param list, including reblessing Queueables.
# NOTE: consumes (empties) the array it is given
#
sub _filter_dq {
    my $result = shift;
    #
    # keep ID; collapse the rest
    #
    my @results = (shift @$result);
    while (@$result) {
        #
        # elements are stored as (class-or-undef, value) pairs
        #
        my $class = shift @$result;
        my $value = shift @$result;
        push @results, ($class ? $class->onDequeue($value) : $value);
    }
    return \@results;
}

#/**
# Dequeue the next request. Waits until a request is available before
# returning. Cancelled requests are discarded and the dequeue is retried.
#
# @return arrayref of request values. The request ID is the first element
#         in the returned array.
#*/
sub dequeue {
    my $obj = shift;
    my $q = $obj->[TQD_Q];
    my $request;

    #
    # the loop is labelled because the cancellation check runs inside a
    # nested block: an unlabelled next there would only leave that block
    # and the cancelled request would be handed to the caller
    #
  REQUEST:
    while (1) {
        #
        # lock order is important here: queue first, then map
        #
        lock(@$q);
        cond_wait(@$q) until scalar @$q;

        $request = shift @$q;
        ${$obj->[TQD_URGENT_COUNT]}-- if ${$obj->[TQD_URGENT_COUNT]};

        my $id = $request->[0];
        #
        # cancelled request ? (cancel_all() sets the ID to -1)
        #
        next REQUEST if ($id && ($id == -1));
        #
        # check for cancel: cancel() leaves an entry in the map for
        # requests that are still queued
        #
        if ($id) {
            lock(%{$obj->[TQD_MAP]});
            if (exists $obj->[TQD_MAP]{$id}) {
                delete $obj->[TQD_MAP]{$id};
                next REQUEST;
            }
        }
        #
        # signal any waiters
        #
        cond_broadcast(@$q);
        last REQUEST;
    }
    return _filter_dq($request);
}

#/**
# Dequeue the next request. Waits until a request is available, or up to
# $timeout seconds, before returning. Cancelled requests are discarded
# and the dequeue is retried.
#
# @param $timeout number of seconds to wait for a request
#
# @return undef if no request available within $timeout seconds. Otherwise,
#         arrayref of request values. The request ID is the first element
#         in the returned array.
#*/
sub dequeue_until {
    my ($obj, $timeout) = @_;

    return undef unless $timeout && ($timeout > 0);

    $timeout += time();
    my $q = $obj->[TQD_Q];
    my $request;

    #
    # labelled for the same reason as in dequeue()
    #
  REQUEST:
    while (1) {
        lock(@$q);

        print STDERR "dq_until...\n" if $tqd_debug;
        #
        # wake at least once a second to recheck the deadline
        #
        cond_timedwait(@$q, time() + 1)
            while (($timeout > time()) && (!scalar @$q));
        print STDERR "dq_until done...\n" if $tqd_debug;
        #
        # if none, then we must've timed out
        #
        return undef unless scalar @$q;

        $request = shift @$q;
        ${$obj->[TQD_URGENT_COUNT]}-- if ${$obj->[TQD_URGENT_COUNT]};

        my $id = $request->[0];
        #
        # cancelled request ?
        #
        next REQUEST if ($id && ($id == -1));
        #
        # check for cancel
        #
        if ($id) {
            lock(%{$obj->[TQD_MAP]});
            if (exists $obj->[TQD_MAP]{$id}) {
                delete $obj->[TQD_MAP]{$id};
                next REQUEST;
            }
        }
        #
        # signal any waiters
        #
        cond_broadcast(@$q);
        last REQUEST;
    }
    return _filter_dq($request);
}

#/**
# Dequeue the next request. Returns immediately if no request is available.
# Cancelled requests are discarded and the dequeue is retried.
#
# @return undef if no request available; otherwise,
#         arrayref of request values. The request ID is the first element
#         in the returned array.
#*/
sub dequeue_nb {
    my $obj = shift;
    my $q = $obj->[TQD_Q];
    my $request;

    #
    # labelled for the same reason as in dequeue()
    #
  REQUEST:
    while (1) {
        lock(@$q);
        return undef unless scalar @$q;

        $request = shift @$q;
        ${$obj->[TQD_URGENT_COUNT]}-- if ${$obj->[TQD_URGENT_COUNT]};

        my $id = $request->[0];
        #
        # cancelled request ?
        #
        next REQUEST if ($id && ($id == -1));
        #
        # check for cancel (ie, the request is already in the map)
        #
        if ($id) {
            lock(%{$obj->[TQD_MAP]});
            if (exists $obj->[TQD_MAP]{$id}) {
                delete $obj->[TQD_MAP]{$id};
                next REQUEST;
            }
        }
        #
        # signal any waiters
        #
        cond_broadcast(@$q);
        last REQUEST;
    }
    return _filter_dq($request);
}

#/**
# Dequeue the next urgent request. Returns immediately if no urgent
# request is available.
#
# @return undef if no urgent request is available; otherwise,
#         arrayref of request values. The request ID is the first element
#         in the returned array.
#*/
sub dequeue_urgent {
    my $obj = shift;
    my $q = $obj->[TQD_Q];
    my $request;

    #
    # the loop is labelled because the cancellation check runs inside a
    # nested block: an unlabelled next there would only leave that block
    # and the cancelled request would be handed to the caller
    #
  REQUEST:
    while (1) {
        lock(@$q);
        #
        # does not block: give up at once unless an urgent request is queued
        #
        return undef
            unless (scalar @$q) && ${$obj->[TQD_URGENT_COUNT]};

        $request = shift @$q;
        ${$obj->[TQD_URGENT_COUNT]}-- if ${$obj->[TQD_URGENT_COUNT]};

        my $id = $request->[0];
        #
        # cancelled request ? (cancel_all() sets the ID to -1)
        #
        next REQUEST if ($id && ($id == -1));
        #
        # check for cancel: cancel() leaves an entry in the map for
        # requests that are still queued
        #
        if ($id) {
            lock(%{$obj->[TQD_MAP]});
            if (exists $obj->[TQD_MAP]{$id}) {
                delete $obj->[TQD_MAP]{$id};
                next REQUEST;
            }
        }
        #
        # signal any waiters
        #
        cond_broadcast(@$q);
        last REQUEST;
    }
    return _filter_dq($request);
}

#/**
# Report the number of pending requests. Queued requests that have been
# cancelled (ID of -1, or an entry for the ID in the response map)
# are not counted.
#
# @return number of requests remaining in the queue.
#*/
sub pending {
    my $obj = shift;

    #
    # same lock order as cancel(): queue first, then map
    #
    lock(@{$obj->[TQD_Q]});
    lock(%{$obj->[TQD_MAP]});

    my $p = scalar @{$obj->[TQD_Q]};
    my $i;
    foreach (@{$obj->[TQD_Q]}) {
        #
        # simplex requests have no ID and so cannot have been cancelled
        # individually
        #
        next unless $_ && ref $_ && (ref $_ eq 'ARRAY') && $_->[0];
        #
        # NOTE: this intermediate assignment is required for no apparent
        # reason in order to keep from getting "Free to wrong pool" aborts
        #
        $i = $_->[0];
        $p-- if ($i == -1) || exists($obj->[TQD_MAP]{$i});
    }
    return $p;
}

#/**
# Set maximum number of pending requests permitted. Signals any
# threads which may currently be blocked waiting for the number
# of pending requests to drop below the maximum permitted.
#
# @param $limit positive integer maximum number of pending requests permitted.
# A value of zero indicates no limit.
#
# @return Thread::Queue::Duplex object; undef (with $@ set to
#         'Invalid limit.') if $limit is not a non-negative integer
#*/
sub set_max_pending {
    my ($obj, $limit) = @_;

    #
    # digits only; anchored at both ends so values such as "12abc"
    # or "3.5" are rejected
    #
    unless (defined($limit) && ($limit =~ /\A\d+\z/)) {
        $@ = 'Invalid limit.';
        return undef;
    }

    lock(@{$obj->[TQD_Q]});
    ${$obj->[TQD_MAX_PENDING]} = $limit;
    #
    # wake up anyone whos been waiting for queue to change
    #
    cond_broadcast(@{$obj->[TQD_Q]});
    return $obj;
}

#
# common function for building response list:
# returns a shared arrayref holding one (class-or-undef, value) pair
# per response value
#
sub _create_resp {
    my @params : shared = ((undef) x (scalar @_ << 1));
    #
    # marshall params, checking for Queueable objects
    #
    my $i = 0;
    foreach (@_) {
        if (ref $_ &&
            (ref $_ ne 'ARRAY') &&
            (ref $_ ne 'HASH') &&
            (ref $_ ne 'SCALAR') &&
            $_->isa('Thread::Queue::Queueable')) {
            #
            # invoke onEnqueue method (scalar context here)
            #
            $params[$i] = ref $_;
            $params[$i + 1] = $_->onEnqueue();
        }
        else {
            @params[$i .. $i + 1] = (undef, $_);
        }
        $i += 2;
    }
    return \@params;
}

#/**
# Post a response to a request. If the request has been cancelled,
# the response is discarded; otherwise, all threads blocked on the
# queue are signalled that a new response is available.
#
# @param $id the ID of the request being responded to.
# @param @response the response.
# Response values must be either scalars,
# references to threads::shared variables, or Thread::Queue::Queueable
# objects
#
# @return Thread::Queue::Duplex object
#*/
sub respond {
    my $obj = shift;
    my $id = shift;
    #
    # silently ignore response to a simplex request
    #
    return $obj unless defined($id);

    my $result = _create_resp(@_);
    {
        #
        # lock the map before the marks, the same order used by mark(),
        # cancel() and cancel_all(); taking them in the opposite order
        # here could deadlock against those methods
        #
        print STDERR "respond: locking for $id at ", time(), "\n"
            if $tqd_debug;
        lock(%{$obj->[TQD_MAP]});
        print STDERR "respond: locked for $id at ", time(), "\n"
            if $tqd_debug;
        lock(%{$obj->[TQD_MARKS]});

        delete $obj->[TQD_MARKS]{$id};
        #
        # check if its been canceled (an entry already exists for the ID)
        #
        if (exists $obj->[TQD_MAP]{$id}) {
            _cancel_resp($result);
            return $obj;
        }

        $obj->[TQD_MAP]{$id} = $result;
        cond_broadcast(%{$obj->[TQD_MAP]});
    }
    #
    # notify any class-level waiters
    #
    lock($tqd_global_lock);
    cond_broadcast($tqd_global_lock);
    return $obj;
}

#
# common function for filtering response list.
# NOTE: consumes (empties) the array it is given
#
sub _filter_resp {
    my ($obj, $result) = @_;
    #
    # collapse the response elements
    #
    my @results = ();
    while (@$result) {
        #
        # elements are stored as (class-or-undef, value) pairs
        #
        my $class = shift @$result;
        my $value = shift @$result;
        push @results, ($class ? $class->onDequeue($value) : $value);
    }
    return \@results;
}

#/**
# Wait for a response to a request. Also available as dequeue_response()
# alias.
#
# @param $id the request ID of the response for which to wait.
#
# @return the response as an arrayref.
#*/
sub wait {
    my $obj = shift;
    my $id = shift;

    my $reqmap = $obj->[TQD_MAP];
    my $result;
    {
        lock(%$reqmap);
        cond_wait(%$reqmap) until $$reqmap{$id};
        $result = delete $$reqmap{$id};
        #
        # pass the wakeup along if other responses remain
        #
        cond_signal(%$reqmap) if keys %$reqmap;
    }
    return $obj->_filter_resp($result);
}

*dequeue_response = \&wait;

#/**
# Test if a response is available for a specific request.
#
# @param $id the request ID of the response for which to test.
#
# @return Thread::Queue::Duplex object if response is available; undef otherwise.
#*/
sub ready {
    my $obj = shift;
    my $id = shift;
    #
    # no lock really needed here...
    #
    return defined($obj->[TQD_MAP]{$id}) ? $obj : undef;
}

#/**
# Test if a response is available for either any request,
# or for any of a set of requests.
#
# @param @ids (optional) list of request IDs of responses for which to test.
#
# @return first request ID of available responses, or undef if none available
# @returnlist list of request IDs of available responses, or undef if none available
#*/
sub available {
    my $obj = shift;

    my $reqmap = $obj->[TQD_MAP];
    lock(%$reqmap);

    #
    # with a list, report those with a response posted; without one,
    # report every ID currently in the map
    #
    my @ids = scalar(@_)
        ? grep { $$reqmap{$_} } @_
        : keys %$reqmap;

    return undef unless scalar @ids;
    return wantarray ? @ids : $ids[0];
}

#/**
# Wait up to $timeout seconds for a response to a request.
#
# @param $id the request ID of the response for which to wait.
# @param $timeout number of seconds to wait
#
# @return the response as an arrayref, or undef if none available within the timeout
#*/
sub wait_until {
    my ($obj, $id, $timeout) = @_;

    return undef unless $timeout && ($timeout > 0);

    #
    # convert to an absolute deadline, as cond_timedwait() expects
    #
    $timeout += time();
    my $result;
    my $reqmap = $obj->[TQD_MAP];
    my $tid = threads->self()->tid();

    while ($timeout > time()) {
        lock(%$reqmap);

        print STDERR "wait_until in $tid for $id at ", time(), "\n"
            if $tqd_debug;
        cond_timedwait(%$reqmap, $timeout) unless $$reqmap{$id};
        print STDERR "wait_until in $tid for $id signaled at ", time(), "\n"
            if $tqd_debug;

        #
        # woken for some other response, or timed out: recheck the deadline
        #
        next unless $$reqmap{$id};

        print STDERR "wait_until in $tid for $id done ", time(), "\n"
            if $tqd_debug;
        print STDERR "avail keys in $tid ", join(', ', keys %$reqmap), "\n"
            if $tqd_debug;

        $result = delete $$reqmap{$id};
        cond_broadcast(%$reqmap);
        last;
    }
    return $result ? $obj->_filter_resp($result) : undef;
}

#
# some grouped waits
# wait indefinitely for *any* of the
# supplied ids
#
#/**
# Wait for a response to any specified request. May be called as either
# an instance or class method.
#

# As an instance method, a list of request IDs is provided, and the method waits for # a response event on any of the specified requests. #

# As a class method, the caller provides a list of either Thread::Queue::TQDContainer
# objects (TQD is itself a TQDContainer),
# or arrayrefs with a Thread::Queue::TQDContainer object, and zero or more request
# IDs. For Thread::Queue::TQDContainer object arguments, and arrayref arguments
# with no identifiers, waits for any enqueue event on the contained queue.
# For arrayref arguments with IDS, waits for a response event for any
# of the specified IDs.
#
# @param @IDs_or_container_refs as instance method, a list of request IDs to wait for;
#        as class method, a list of either of Thread::Queue::TQDContainer objects,
#        or arrayrefs containing a Thread::Queue::TQDContainer object, followed by
#        zero or more request IDs for the queue object.
#
# @return as an instance method, returns a hashref of request IDs mapped to their response;
#         as a class method, returns a list of TQD containers which have events pending.
#*/
sub wait_any {
    my $obj = shift;

    #
    # invoked on the class rather than an instance: use the class-level wait
    #
    unless (ref $obj) {
        return _tqd_wait(undef, undef, @_);
    }

    my $reqmap = $obj->[TQD_MAP];
    my %got = ();
    {
        lock(%$reqmap);
        #
        # collect whatever has already arrived before waiting at all
        #
        foreach my $id (@_) {
            $got{$id} = delete $$reqmap{$id} if $$reqmap{$id};
        }

        while (!keys %got) {
            cond_wait(%$reqmap);

            foreach my $id (@_) {
                $got{$id} = delete $$reqmap{$id} if $$reqmap{$id};
            }
            #
            # pass the wakeup along; harmless when nobody else is waiting
            #
            cond_signal(%$reqmap);
        }
    }

    #
    # unmarshall the collected responses
    #
    foreach my $id (keys %got) {
        $got{$id} = $obj->_filter_resp($got{$id});
    }
    return \%got;
}

#
# wait up to timeout for any
#
#/**
# Wait up to $timeout seconds for a response to any specified request. May be called as either
# an instance or class method.
#

# As an instance method, a list of request IDs is provided, and the method waits for # a response event on any of the specified requests. #

# As a class method, the caller provides a list of either Thread::Queue::TQDContainer objects,
# or arrayrefs with a Thread::Queue::TQDContainer object, and zero or more request
# IDs. For Thread::Queue::TQDContainer object arguments, and arrayref arguments
# with no identifiers, waits for any enqueue event on the queue.
# For arrayref arguments with IDS, waits for a response event for any
# of the specified IDs.
#
# @param $timeout number of seconds to wait for a response event
# @param @IDs_or_container_refs as instance method, a list of request IDs to wait for;
#        as class method, a list of either of Thread::Queue::TQDContainer objects,
#        or arrayrefs containing a Thread::Queue::TQDContainer object, followed by
#        zero or more request IDs for the queue object.
#
# @return undef if no response events occured within $timeout seconds; otherwise,
#         as an instance method, returns a hashref of request IDs mapped to their response;
#         as a class method, returns a list of queues which have events pending.
#*/
sub wait_any_until {
    my $obj = shift;

    #
    # invoked on the class rather than an instance: use the class-level wait
    #
    unless (ref $obj) {
        return _tqd_wait(shift, undef, @_);
    }

    my $timeout = shift;
    return undef unless $timeout && ($timeout > 0);

    #
    # turn the interval into an absolute deadline
    #
    $timeout += time();

    my $reqmap = $obj->[TQD_MAP];
    my %got = ();
    {
        lock(%$reqmap);
        #
        # collect whatever has already arrived before waiting at all
        #
        foreach my $id (@_) {
            $got{$id} = delete $$reqmap{$id} if $$reqmap{$id};
        }

        while ((!keys %got) && ($timeout > time())) {
            cond_timedwait(%$reqmap, $timeout);

            foreach my $id (@_) {
                $got{$id} = delete $$reqmap{$id} if $$reqmap{$id};
            }
            #
            # pass the wakeup along; harmless when nobody else is waiting
            #
            cond_signal(%$reqmap);
        }
    }

    #
    # unmarshall the collected responses
    #
    foreach my $id (keys %got) {
        $got{$id} = $obj->_filter_resp($got{$id});
    }
    return keys %got ? \%got : undef;
}

#/**
# Wait for a response to all specified requests. May be called as either
# an instance or class method.
#

# As an instance method, a list of request IDs is provided, and the method waits for # a response event on all of the specified requests. #

# As a class method, the caller provides a list of either Thread::Queue::TQDContainer objects,
# or arrayrefs with a Thread::Queue::TQDContainer object, and zero or more request
# IDs. For Thread::Queue::TQDContainer object arguments, and arrayref arguments
# with no identifiers, waits for responses to all current requests on the queue.
# For arrayref arguments with IDS, waits for a response to all
# of the specified IDs.
#
# @param @IDs_or_container_refs as instance method, a list of request IDs to wait for;
#        as class method, a list of either of Thread::Queue::TQDContainer objects,
#        or arrayrefs containing a Thread::Queue::TQDContainer object, followed by
#        zero or more request IDs for the queue object.
#
# @return as an instance method, returns a hashref of request IDs mapped to their response;
#         as a class method, returns a list of queues which have events pending.
#*/
sub wait_all {
    my $obj = shift;

    #
    # invoked on the class rather than an instance: use the class-level wait
    #
    unless (ref $obj) {
        return _tqd_wait(undef, 1, @_);
    }

    my $reqmap = $obj->[TQD_MAP];
    my %got = ();
    {
        lock(%$reqmap);
        #
        # collect whatever has already arrived before waiting at all
        #
        foreach my $id (@_) {
            $got{$id} = delete $$reqmap{$id} if $$reqmap{$id};
        }

        #
        # keep waiting until one response per requested ID has been collected
        #
        while (scalar keys %got != scalar @_) {
            cond_wait(%$reqmap);

            foreach my $id (@_) {
                $got{$id} = delete $$reqmap{$id} if $$reqmap{$id};
            }
            #
            # pass the wakeup along; harmless when nobody else is waiting
            #
            cond_signal(%$reqmap);
        }
    }

    #
    # unmarshall the collected responses
    #
    foreach my $id (keys %got) {
        $got{$id} = $obj->_filter_resp($got{$id});
    }
    return \%got;
}

#/**
# Wait up to $timeout seconds for a response to all specified requests. May be called as either
# an instance or class method.
#

# As an instance method, a list of request IDs is provided, and the method waits for # a response event on all of the specified requests. #

# As a class method, the caller provides a list of either Thread::Queue::TQDContainer objects,
# or arrayrefs with a Thread::Queue::TQDContainer object, and zero or more request
# IDs. For Thread::Queue::TQDContainer object arguments, and arrayref arguments
# with no identifiers, waits for responses to all current requests on the queue.
# For arrayref arguments with IDS, waits for a response to all
# of the specified IDs.
#
# When the timeout expires before every response has arrived, the
# responses already posted are left untouched in the response map, so
# they can still be collected by a later wait.
#
# @param $timeout number of seconds to wait for all response
# @param @IDs_or_container_refs as instance method, a list of request IDs to wait for;
#        as class method, a list of either of Thread::Queue::TQDContainer objects,
#        or arrayrefs containing a Thread::Queue::TQDContainer object, followed by
#        zero or more request IDs for the queue object.
#
# @return undef unless all response events occured within $timeout seconds; otherwise,
#         as an instance method, returns a hashref of request IDs mapped to their response;
#         as a class method, returns a list of queues which have events pending.
#*/
sub wait_all_until {
    my $obj = shift;

    return _tqd_wait(shift, 1, @_) unless ref $obj;

    my $timeout = shift;
    return undef unless $timeout && ($timeout > 0);

    #
    # convert to an absolute deadline, as cond_timedwait() expects
    #
    $timeout += time();

    my @ids = @_;
    my $reqmap = $obj->[TQD_MAP];
    my %responses = ();
    {
        lock(%$reqmap);
        #
        # number of requested IDs that have no response posted yet;
        # this only looks at the map, nothing is removed while waiting
        #
        my $missing = sub { scalar grep { !$$reqmap{$_} } @ids };

        while ($missing->() && ($timeout > time())) {
            cond_timedwait(%$reqmap, $timeout);
            #
            # go ahead and signal...if no one's waiting, no harm
            #
            cond_signal(%$reqmap);
        }
        #
        # timed out: leave the partial set of responses in the map intact
        #
        return undef if $missing->();
        #
        # we got all our responses, so remove them from the map
        # (an ID listed twice is only collected once)
        #
        foreach my $id (@ids) {
            $responses{$id} = delete $$reqmap{$id}
                unless exists $responses{$id};
        }
    }

    #
    # unmarshall only now, since _filter_resp() consumes the arrays it is given
    #
    $responses{$_} = $obj->_filter_resp($responses{$_})
        foreach (keys %responses);
    return \%responses;
}

#/**
# Mark a request with a value.
# Provides a means to
# associate properties to a request after it has been
# queued, but before the response has been posted. The
# responder may test for marks via the marked()
# method, or retrieve the mark value via get_mark().
#
# @param $id ID of request to be marked
# @param $value (optional) value to be added as a mark; if not specified,
#        a default value of 1 is used.
#
# @return Thread::Queue::Duplex object; undef if the request has
#         already been responded to or cancelled
#*/
sub mark {
    my ($obj, $id, $value) = @_;

    $value = 1 unless defined($value);

    #
    # same lock order as cancel(): map first, then marks
    #
    lock(%{$obj->[TQD_MAP]});
    lock(%{$obj->[TQD_MARKS]});
    #
    # already responded or cancelled
    #
    return undef if (exists $obj->[TQD_MAP]{$id});

    $obj->[TQD_MARKS]{$id} = $value;
    return $obj;
}

#/**
# Remove any marks from a request.
#
# @param $id ID of request to be unmarked.
#
# @return Thread::Queue::Duplex object
#*/
sub unmark {
    my ($obj, $id) = @_;

    lock(%{$obj->[TQD_MARKS]});
    delete $obj->[TQD_MARKS]{$id};
    return $obj;
}

#/**
# Returns any current mark on a specified request.
#
# @param $id ID of request whose mark is to be returned.
#
# @return the mark value; undef if not marked
#*/
sub get_mark {
    my ($obj, $id) = @_;

    lock(%{$obj->[TQD_MARKS]});
    return $obj->[TQD_MARKS]{$id};
}

#/**
# Test if a request is marked, or if the mark is a specified value.
#
# @param $id ID of request to test for a mark
# @param $value (optional) value to test for
#
# @return true if the request is marked and either no $value was specified,
#         or, if a $value was specified, the mark value equals $value
#         (string comparison); false otherwise.
#*/
sub marked {
    my ($obj, $id, $value) = @_;

    lock(%{$obj->[TQD_MARKS]});
    return (defined($obj->[TQD_MARKS]{$id}) &&
        ((defined($value) && ($obj->[TQD_MARKS]{$id} eq $value)) ||
            (!defined($value))));
}

#
# discard a response: collapse the response elements,
# and call onCancel to any that are Queueables.
# A missing response (e.g. the undef placeholder that cancel() stores
# in the response map) is accepted and ignored.
#
sub _cancel_resp {
    my $resp = shift;

    #
    # nothing to discard; dereferencing undef below would be fatal
    #
    return 1 unless defined($resp) && ref $resp;

    while (@$resp) {
        #
        # elements are stored as (class-or-undef, value) pairs
        #
        my $class = shift @$resp;
        my $value = shift @$resp;
        $class->onCancel($value) if $class;
    }
    return 1;
}

#/**
# Cancel one or more pending requests.
#

# If a response to a cancelled request has already been
# posted to the queue response map (i.e., the request has already
# been serviced), the response is removed from the map,
# onCancel() is invoked on each Thread::Queue::Queueable
# object in the response, and the response is discarded.
#

# If a response to a cancelled request has not yet been posted to # the queue response map, an empty entry is added to the queue response map. # (Note: threads::shared doesn't permit splicing shared arrays yet, # so we can't remove the request from the queue). #

# When a server thread attempts to dequeue[_nb|_until]() a cancelled
# request, the request is discarded and the dequeue operation is retried.
# If the cancelled request is already dequeued, the server thread will
# detect the cancellation when it attempts to respond to the request,
# and will invoke onCancel() on any Thread::Queue::Queueable
# objects in the response, and then discard the response.
#

# Note that, as simplex requests do not have an identifier, there
# is no way to explicitly cancel a specific simplex request.
#
# Cancelling a request that is already marked as cancelled has no
# further effect.
#
# @param @ids list of request IDs to be cancelled.
#
# @return Thread::Queue::Duplex object
#*/
sub cancel {
    my $obj = shift;
    #
    # when we lock both, *always* lock in this order to avoid
    # deadlock
    #
    lock(@{$obj->[TQD_Q]});
    lock(%{$obj->[TQD_MAP]});
    lock(%{$obj->[TQD_MARKS]});

    my $reqmap = $obj->[TQD_MAP];
    foreach my $id (@_) {
        delete $obj->[TQD_MARKS]{$id};

        unless (exists $$reqmap{$id}) {
            #
            # no response yet: leave an empty entry as the cancel marker
            #
            $$reqmap{$id} = undef;
            next;
        }
        #
        # an undefined entry is the marker from an earlier cancel();
        # keep it, there is no response to discard
        #
        next unless defined $$reqmap{$id};
        #
        # already responded to, call onCancel for any Queueuables
        #
        _cancel_resp(delete $$reqmap{$id});
    }
    return $obj;
}

#/**
# Cancel all current requests and responses, using the
# cancel() algorithm above, plus cancels
# all simplex requests still in the queue.
#

# Note: In-progress requests (i.e.,
# request which have been removed from the queue, but do not yet
# have an entry in the response map) will not be cancelled.
#
# @return Thread::Queue::Duplex object
#*/
sub cancel_all {
    my $obj = shift;
    #
    # when we lock both, *always* lock in this order to avoid
    # deadlock
    #
    lock(@{$obj->[TQD_Q]});
    lock(%{$obj->[TQD_MAP]});
    lock(%{$obj->[TQD_MARKS]});
    #
    # first cancel all pending responses; entries that are only
    # cancel markers (undef) have no response to discard
    #
    foreach my $id (keys %{$obj->[TQD_MAP]}) {
        my $resp = delete $obj->[TQD_MAP]{$id};
        _cancel_resp($resp) if defined $resp;
    }
    #
    # then cancel all the pending requests by
    # setting their IDs to -1
    #
    # UPDATED per bug reported on CPAN
    #
    foreach my $request (@{$obj->[TQD_Q]}) {
        delete $obj->[TQD_MARKS]{$request->[0]}
            if defined($request->[0]);
        $request->[0] = -1;
    }
    #
    # how will we cancel inprogress requests ??
    # need a map value, or alternate map...
    #
    return $obj;
}

##########################################################
#
# BEGIN CLASS LEVEL METHODS
#
##########################################################

#
# class-level wait shared by wait_any[_until]() and wait_all[_until]().
#
# $timeout  - seconds to wait; a false value waits indefinitely
# $wait_all - true to wait until every argument has an event pending
# remaining args - Thread::Queue::TQDContainer objects, or arrayrefs of
#                  a container followed by zero or more request IDs
#
# returns the list of containers with events pending; an empty list on
# timeout; undef if any argument is invalid
#
sub _tqd_wait {
    my $timeout = shift;
    my $wait_all = shift;

    require Scalar::Util;
    #
    # validate params; blessed() is checked first so that an unblessed
    # reference is rejected rather than dying on the isa() call
    #
    foreach my $arg (@_) {
        return undef unless $arg && ref $arg;

        my $candidate = $arg;
        if (ref $arg eq 'ARRAY') {
            return undef unless ($#$arg >= 0);
            $candidate = $arg->[0];
        }
        return undef
            unless Scalar::Util::blessed($candidate) &&
                $candidate->isa('Thread::Queue::TQDContainer');
    }

    my @avail = ();
    my @qs = ();
    my @containers = ();
    foreach my $arg (@_) {
        push @containers, ((ref $arg eq 'ARRAY') ? $arg->[0] : $arg);
        push @qs, $containers[-1]->get_queue();
    }

    #
    # number of queues still without a pending event
    #
    my $count = scalar @qs;

    #
    # convert to an absolute deadline, as cond_timedwait() expects
    #
    $timeout += time() if $timeout;

    while ($count) {
        lock($tqd_global_lock);

        foreach my $i (0 .. $#_) {
            last unless $count;
            next unless $qs[$i];

            my $q = $qs[$i];
            my $ids = $_[$i];
            #
            # if we've got ids, check for responses;
            # otherwise check for queued requests
            #
            my $has_event =
                ((ref $ids eq 'ARRAY') && (scalar @$ids > 1))
                ? $q->available(@{$ids}[1 .. $#$ids])
                : $q->pending();

            if ($has_event) {
                push @avail, $containers[$i];
                #
                # don't report the same queue twice
                #
                $qs[$i] = undef;
                $count--;
            }
        }   # end foreach queue

        last unless (($wait_all && $count) || (!scalar @avail));

        unless ($timeout) {
            print STDERR "TQD: locking...\n" if $tqd_debug;
            cond_wait($tqd_global_lock);
            print STDERR "TQD: locked\n" if $tqd_debug;
            next;
        }

        cond_timedwait($tqd_global_lock, $timeout);
        return () unless ($timeout > time());
    }
    return @avail;
}

##########################################################
#
# END CLASS LEVEL METHODS
#
##########################################################

###############################################
#
# All TQQ default methods can be used as is
#
###############################################

###############################################
#
# TQDContainer overrides: a queue is its own container
#
###############################################
sub set_queue { return $_[0]; }

sub get_queue { return $_[0]; }

1;