SUMMARY: CONSTR | METHOD DETAIL: CONSTR | METHOD

Class Thread::Queue::Duplex

Inherits from:
Thread::Queue::TQDContainer
Thread::Queue::Queueable
Exporter

Thread-safe request/response queue with identifiable elements. Provides methods for N threads to queue items to other threads, and then wait only for responses to specific queued items.

Note: this object is derived from an threads::shared arrayref to optimize performance.

Licensed under the Academic Free License version 2.1, as specified in the License.txt file included in this software package, or at OpenSource.org.

Author:
D. Arnold
Version:
0.92
Since:
2005-12-01
See Also:
Thread::Queue::Queueable

Unless otherwise noted, $obj is the object instance variable.

Exported Symbols
$tqd_global_lockglobal threads::shared variable for locking
TQD_Qobject field index of threads::shared array used for the queue
TQD_MAPobject field index of threads::shared hash mapping queue request IDs to requests/responses
TQD_IDGENobject field index of threads::shared scalar integer used to generate request IDs
TQD_LISTENERSobject field index of threads::shared scalar integer count of current listeners
TQD_REQUIRE_LISTENERobject field index of threads::shared scalar flag indicating if listeners are required before permitting an enqueue operation
TQD_MAX_PENDINGobject field index of threads::shared scalar integer max number of pending requests before an enqueue will block
TQD_URGENT_COUNTobject field index of threads::shared scalar integer count of current urgent requests
TQD_MARKSobject field index of threads::shared hash mapping request IDs of marked requests

Constructor Summary
new(ListenerRequired => value, MaxPending => value)
          Creates a new empty queue, and associated mapping hash

Method Summary
available(@ids)
          Test if a response is available for a either any request, or for any of a set of requests
cancel(@ids)
          Cancel one or more pending requests
cancel_all()
          Cancel all current requests and responses, using the cancel() algorithm above, plus cancels all simplex requests still in the queue
dequeue()
          Dequeue the next request
dequeue_nb()
          Dequeue the next request
dequeue_until($timeout)
          Dequeue the next request
dequeue_urgent()
          Dequeue the next urgent request
enqueue(@args)
          Enqueue a request to the tail of the queue
enqueue_and_wait(@args)
          Enqueue a request to the tail of the queue, and wait for the response
enqueue_and_wait_until($timeout, @args)
          Enqueue a request to the tail of the queue, and wait up to $timeout seconds for the response
enqueue_simplex(@args)
          Enqueue a simplex request to the tail of the queue
enqueue_simplex_urgent(@args)
          Enqueue a simplex request to the head of the queue
enqueue_urgent(@args)
          Enqueue a request to the head of the queue
enqueue_urgent_and_wait(@args)
          Enqueue a request to the head of the queue, and wait up to $timeout seconds for the response
enqueue_urgent_and_wait_until($timeout, @args)
          Enqueue a request to the head of the queue, and wait up to $timeout seconds for the response
get_mark($id)
          Returns any current mark on a specified request
ignore()
          Deregister as a queue listener
listen()
          Register as a queue listener
mark($id, $value)
          Mark a request with a value
marked($id, $value)
          Test if a request is marked, or if the mark is a specified value
pending()
          Report the number of pending requests
ready($id)
          Test if a response is available for a specific request
respond($id, @response)
          Post a response to a request
set_max_pending($limit)
          Set maximum number of pending requests permitted
unmark($id)
          Remove any marks from a request
wait($id)
          Wait for a response to a request
wait_all(@IDs_or_container_refs)
          Wait for a response to all specified requests
wait_all_until($timeout, @IDs_or_container_refs)
          Wait up to $timeout seconds for a response to all specified requests
wait_any(@IDs_or_container_refs)
          Wait for a response to any specified request
wait_any_until($timeout, @IDs_or_container_refs)
          Wait up to $timeout seconds for a response to any specified request
wait_for_listener($timeout)
          Wait until a listener has registered
wait_until($id, $timeout)
          Wait up to $timeout seconds for a response to a request

Constructor Details

new

new(ListenerRequired => value, MaxPending => value)

Creates a new empty queue, and associated mapping hash.

Parameters:
ListenerRequired => boolean value indicating if registered listener required before enqueue is permitted.
MaxPending => positive integer maximum number of pending requests; enqueue attempts will block until the pending count drops below this value. The limit may be applied or modified later via the set_max_pending() method. A value of zero indicates no limit.
Returns:
Thread::Queue::Duplex object

Method Details

available

available(@ids)

Test if a response is available for a either any request, or for any of a set of requests.

Parameters:
@ids - (optional) list of request IDs of responses for which to test.
In scalar context, returns:
first request ID of available responses, or undef if none available
In list context, returns:
(list of request IDs of available responses, or undef if none available )

cancel

cancel(@ids)

Cancel one or more pending requests.

If a response to a cancelled request has already been posted to the queue response map (i.e., the request has already been serviced), the response is removed from the map, the Thread::Queue::Queueable object in the response, and the response is discarded.

If a response to a cancelled request has not yet been posted to the queue response map, an empty entry is added to the queue response map. (Note: threads::shared doesn't permit splicing shared arrays yet, so we can't remove the request from the queue).

When a server thread attempts to dequeue[_nb|_until]() a cancelled request, the request is discarded and the dequeue operation is retried. If the cancelled request is already dequeued, the server thread will detect the cancellation when it attempts to respond to the request, and will invoke the Thread::Queue::Queueable objects in the response, and then discards the response.

Note that, as simplex requests do not have an identifier, there is no way to explicitly cancel a specific simplex request.

Parameters:
@ids - list of request IDs to be cancelled.
Returns:
Thread::Queue::Duplex object

cancel_all

cancel_all()

Cancel all current requests and responses, using the cancel() algorithm above, plus cancels all simplex requests still in the queue.

Note: In-progress requests (i.e., request which have been removed from the queue, but do not yet have an entry in the response map) will not be cancelled.

Returns:
Thread::Queue::Duplex object

dequeue

dequeue()

Dequeue the next request. Waits until a request is available before returning.

Returns:
arrayref of request values. The request ID is the first element in the returned array.

dequeue_nb

dequeue_nb()

Dequeue the next request. Returns immediately if no request is available.

Returns:
undef if no request available; otherwise, arrayref of request values. The request ID is the first element in the returned array.

dequeue_until

dequeue_until($timeout)

Dequeue the next request. Waits until a request is available, or up to $timeout seconds, before returning.

Parameters:
$timeout - number of seconds to wait for a request
Returns:
undef if no request available within $timeout seconds. Otherwise, arrayref of request values. The request ID is the first element in the returned array.

dequeue_urgent

dequeue_urgent()

Dequeue the next urgent request. Waits until an urgent request is available before returning.

Returns:
arrayref of request values. The request ID is the first element in the returned array.

enqueue

enqueue(@args)

Enqueue a request to the tail of the queue.

Parameters:
@args - the request. Request values must be either scalars, references to threads::shared variables, or Thread::Queue::Queueable objects
Returns:
Request ID if successful; undef if ListenerRequired and no listeners are registered

enqueue_and_wait

enqueue_and_wait(@args)

Enqueue a request to the tail of the queue, and wait for the response.

Parameters:
@args - the request. Request values must be either scalars, references to threads::shared variables, or Thread::Queue::Queueable objects
Returns:
Response structure if successful; undef if ListenerRequired and no listeners are registered

enqueue_and_wait_until

enqueue_and_wait_until($timeout, @args)

Enqueue a request to the tail of the queue, and wait up to $timeout seconds for the response.

Parameters:
$timeout - number of seconds to wait for a response
@args - the request. Request values must be either scalars, references to threads::shared variables, or Thread::Queue::Queueable objects
Returns:
Response structure if successful; undef if ListenerRequired and no listeners are registered, or if no response is received within the specified $timeout

enqueue_simplex

enqueue_simplex(@args)

Enqueue a simplex request to the tail of the queue. Simplex requests do not generate responses.

Parameters:
@args - the request. Request values must be either scalars, references to threads::shared variables, or Thread::Queue::Queueable objects
Returns:
Thread::Queue::Duplex object if successful; undef if ListenerRequired and no listeners are registered

enqueue_simplex_urgent

enqueue_simplex_urgent(@args)

Enqueue a simplex request to the head of the queue. Simplex requests do not generate responses.

Parameters:
@args - the request. Request values must be either scalars, references to threads::shared variables, or Thread::Queue::Queueable objects
Returns:
Thread::Queue::Duplex object if successful; undef if ListenerRequired and no listeners are registered

enqueue_urgent

enqueue_urgent(@args)

Enqueue a request to the head of the queue.

Parameters:
@args - the request. Request values must be either scalars, references to threads::shared variables, or Thread::Queue::Queueable objects
Returns:
Request ID if successful; undef if ListenerRequired and no listeners are registered

enqueue_urgent_and_wait

enqueue_urgent_and_wait(@args)

Enqueue a request to the head of the queue, and wait up to $timeout seconds for the response.

Parameters:
@args - the request. Request values must be either scalars, references to threads::shared variables, or Thread::Queue::Queueable objects
Returns:
Response structure if successful; undef if ListenerRequired and no listeners are registered

enqueue_urgent_and_wait_until

enqueue_urgent_and_wait_until($timeout, @args)

Enqueue a request to the head of the queue, and wait up to $timeout seconds for the response.

Parameters:
$timeout - number of seconds to wait for a response
@args - the request. Request values must be either scalars, references to threads::shared variables, or Thread::Queue::Queueable objects
Returns:
Response structure if successful; undef if ListenerRequired and no listeners are registered, or if no response is received within the specified $timeout

get_mark

get_mark($id)

Returns any current mark on a specified request.

Parameters:
$id - ID of request whose mark is to be returned.
Returns:
the mark value; undef if not marked

ignore

ignore()

Deregister as a queue listener. When all listeners deregister, a "ListenerRequired" queue will no longer accept new requests until a listener registers via listen()

Returns:
Thread::Queue::Duplex object

listen

listen()

Register as a queue listener. Permits "ListenerRequired" queues to accept requests when at least one listener has registered.

Returns:
Thread::Queue::Duplex object

mark

mark($id, $value)

Mark a request with a value. Provides a means to associate properties to a request after it has been queued, but before the response has been posted. The responder may test for marks via the marked() method, or retrieve the mark value via get_mark().

Parameters:
$id - ID of request to be marked
$value - (optional) value to be added as a mark; if not specified, a default value of 1 is used.
Returns:
Thread::Queue::Duplex object

marked

marked($id, $value)

Test if a request is marked, or if the mark is a specified value.

Parameters:
$id - ID of request to test for a mark
$value - (optional) value to test for
Returns:
1 if the request is marked and either no $value was specified, or, if a $value was specified, the mark value equals $value; undef otherwise.

pending

pending()

Report the number of pending requests.

Returns:
number of requests remaining in the queue.

ready

ready($id)

Test if a response is available for a specific request.

Parameters:
$id - the request ID of the response for which to test.
Returns:
Thread::Queue::Duplex object if response is available; undef otherwise.

respond

respond($id, @response)

Post a response to a request. If the request has been cancelled, the response is discarded; otherwise, all threads blocked on the queue are signalled that a new response is available.

Parameters:
$id - the ID of the request being responded to.
@response - the response. Response values must be either scalars, references to threads::shared variables, or Thread::Queue::Queueable objects
Returns:
Thread::Queue::Duplex object

set_max_pending

set_max_pending($limit)

Set maximum number of pending requests permitted. Signals any currently threads which may be blocked waiting for the number of pending requests to drop below the maximum permitted.

Parameters:
$limit - positive integer maximum number of pending requests permitted. A value of zero indicates no limit.
Returns:
Thread::Queue::Duplex object

unmark

unmark($id)

Remove any marks from a request.

Parameters:
$id - ID of request to be unmarked.
Returns:
Thread::Queue::Duplex object

wait

wait($id)

Wait for a response to a request. Also available as dequeue_response() alias.

Parameters:
$id - the request ID of the response for which to wait.
Returns:
the response as an arrayref.

wait_all

wait_all(@IDs_or_container_refs)

Wait for a response to all specified requests. May be called as either an instance or class method.

As an instance method, a list of request IDs is provided, and the method waits for a response event on all of the specified requests.

As a class method, the caller provides a list of either Thread::Queue::TQDContainer objects, or arrayrefs with a Thread::Queue::TQDContainer object, and zero or more request IDs. For Thread::Queue::TQDContainer object arguments, and arrayref arguments with no identifiers, waits for responses to all current requests on the queue. For arrayref arguments with IDS, waits for a response to all of the specified IDs.

Parameters:
@IDs_or_container_refs - as instance method, a list of request IDs to wait for; as class method, a list of either of Thread::Queue::TQDContainer objects, or arrayrefs containing a Thread::Queue::TQDContainer object, followed by zero or more request IDs for the queue object.
Returns:
as an instance method, returns a hashref of request IDs mapped to their response; as a class method, returns a list of queues which have events pending.

wait_all_until

wait_all_until($timeout, @IDs_or_container_refs)

Wait up to $timeout seconds for a response to all specified requests. May be called as either an instance or class method.

As an instance method, a list of request IDs is provided, and the method waits for a response event on all of the specified requests.

As a class method, the caller provides a list of either Thread::Queue::TQDContainer objects, or arrayrefs with a Thread::Queue::TQDContainer object, and zero or more request IDs. For Thread::Queue::TQDContainer object arguments, and arrayref arguments with no identifiers, waits for responses to all current requests on the queue. For arrayref arguments with IDS, waits for a response to all of the specified IDs.

Parameters:
$timeout - number of seconds to wait for all response
@IDs_or_container_refs - as instance method, a list of request IDs to wait for; as class method, a list of either of Thread::Queue::TQDContainer objects, or arrayrefs containing a Thread::Queue::TQDContainer object, followed by zero or more request IDs for the queue object.
Returns:
undef unless all response events occured within $timeout seconds; otherwise, as an instance method, returns a hashref of request IDs mapped to their response; as a class method, returns a list of queues which have events pending.

wait_any

wait_any(@IDs_or_container_refs)

Wait for a response to any specified request. May be called as either an instance or class method.

As an instance method, a list of request IDs is provided, and the method waits for a response event on any of the specified requests.

As a class method, the caller provides a list of either Thread::Queue::TQDContainer objects (TQD is itself a TQDContainer), or arrayrefs with a Thread::Queue::TQDContainer object, and zero or more request IDs. For Thread::Queue::TQDContainer object arguments, and arrayref arguments with no identifiers, waits for any enqueue event on the contained queue. For arrayref arguments with IDS, waits for a response event for any of the specified IDs.

Parameters:
@IDs_or_container_refs - as instance method, a list of request IDs to wait for; as class method, a list of either of Thread::Queue::TQDContainer objects, or arrayrefs containing a Thread::Queue::TQDContainer object, followed by zero or more request IDs for the queue object.
Returns:
as an instance method, returns a hashref of request IDs mapped to their response; as a class method, returns a list of TQD containers which have events pending.

wait_any_until

wait_any_until($timeout, @IDs_or_container_refs)

Wait up to $timeout seconds for a response to any specified request. May be called as either an instance or class method.

As an instance method, a list of request IDs is provided, and the method waits for a response event on any of the specified requests.

As a class method, the caller provides a list of either Thread::Queue::TQDContainer objects, or arrayrefs with a Thread::Queue::TQDContainer object, and zero or more request IDs. For Thread::Queue::TQDContainer object arguments, and arrayref arguments with no identifiers, waits for any enqueue event on the queue. For arrayref arguments with IDS, waits for a response event for any of the specified IDs.

Parameters:
$timeout - number of seconds to wait for a response event
@IDs_or_container_refs - as instance method, a list of request IDs to wait for; as class method, a list of either of Thread::Queue::TQDContainer objects, or arrayrefs containing a Thread::Queue::TQDContainer object, followed by zero or more request IDs for the queue object.
Returns:
undef if no response events occured within $timeout seconds; otherwise, as an instance method, returns a hashref of request IDs mapped to their response; as a class method, returns a list of queues which have events pending.

wait_for_listener

wait_for_listener($timeout)

Wait until a listener has registered.

Parameters:
$timeout - (optional) number of seconds to wait for a listener.
Returns:
Thread::Queue::Duplex if a listener is registered; undef otherwise.

wait_until

wait_until($id, $timeout)

Wait up to $timeout seconds for a response to a request.

Parameters:
$id - the request ID of the response for which to wait.
$timeout - number of seconds to wait
Returns:
the response as an arrayref, or undef if none available within the timeout

Generated by psichedoc on Sun Mar 5 11:01:53 2006