package POE::Component::SNMP::Dispatcher;

use strict;

use base qw/Net::SNMP::Dispatcher/;

use POE::Kernel;
use POE::Session;

use Time::HiRes qw/time/;

our $VERSION = '1.29';

our $INSTANCE;              # the singleton dispatcher object
our $MESSAGE_PROCESSING;    # the single message-processing (MP) object

# sub VERBOSE() { 1 }
use constant VERBOSE => 1;    # debugging, that is

# sub DEBUG_INFO(){ }
# Installed as a no-op first, then pointed at the parent class's
# implementation; the second assignment is the one that sticks.
*DEBUG_INFO = sub {};
*DEBUG_INFO = \&Net::SNMP::Dispatcher::DEBUG_INFO;

# Slot layout of the event array built by schedule().
use constant {
    _ACTIVE   => 0,    # State of the event (not used)
    _TIME     => 1,    # Execution time
    _CALLBACK => 2,    # Callback reference
    _DELAY    => 3,    # Delay, in seconds
};

# Compile-time switches, both off.  When _SINGLE is true, register() and
# deregister() skip the per-request socket bookkeeping; when _PAUSE_FD is
# true, the read watcher is paused/resumed between requests.
use constant {
    _SINGLE   => 0,
    _PAUSE_FD => 0,
};

# {{{ SUBCLASSED METHODS

# Every subclassed method returns the same values as the base-class
# version it replaces.

# {{{ instance methods and constructor

# Return the singleton dispatcher, building it on first use.
sub instance {
    $INSTANCE = POE::Component::SNMP::Dispatcher->_new unless $INSTANCE;
    return $INSTANCE;
}

# In Net::SNMP::Dispatcher this method runs the event dispatch loop.
# POE drives the loop for us, so the override does nothing.
sub activate { }

# Construct the base-class object, then attach a POE session to it.
sub _new {
    my $class = shift;

    my $dispatcher = $class->SUPER::_new(@_);
    return $dispatcher->_new_session();
}

# Mark the dispatcher active, capture the base class's message-processing
# object, and create the POE session whose states are methods of this
# object.  Returns the dispatcher itself.
sub _new_session {
    my $self = shift;

    # $self->{_active} = Net::SNMP::Message::TRUE;
    $self->{_active} = 1;

    $MESSAGE_PROCESSING = $Net::SNMP::Dispatcher::MESSAGE_PROCESSING;

    my @states = qw/
        _start
        _stop
        __schedule_event
        __invoke_callback
        __socket_callback
        __listen
        __dispatch_pdu
        __clear_pending
    /;

    POE::Session->create( object_states => [ $self => \@states ] );

    return $self;
}

# }}} instance methods and constructor
# {{{ send_pdu and _send_pdu

# Net::SNMP::Dispatcher::send_pdu() takes a reference to &_send_pdu in
# its own package, which bypasses inheritance.  Here we temporarily
# replace that reference to point to our own local copy before
# continuing.
#
# This is the first method in the chain of calls to
# Net::SNMP::Dispatcher that gets the action going.
sub send_pdu {
    my ($self, $pdu, $delay) = @_;

    DEBUG_INFO('%s', dump_args( [ $pdu, $delay ] ));

    # Point the base package's _send_pdu at ours for the duration of the
    # SUPER call only; local() restores it when this sub returns.
    local *Net::SNMP::Dispatcher::_send_pdu = \&_send_pdu;

    if (VERBOSE) { DEBUG_INFO('{-------- SUPER::send_pdu()') }
    my $result = $self->SUPER::send_pdu($pdu, $delay);
    if (VERBOSE) { DEBUG_INFO(' --------} SUPER::send_pdu()') }

    return $result;
}

# _send_pdu() hands the request to POE at the __dispatch_pdu state.
# That state either invokes SUPER::_send_pdu() or queues the request
# for later, as appropriate.  Always returns 1.
sub _send_pdu {
    my ($self, $pdu, $timeout, $retries) = @_;

    my $detail = VERBOSE ? dump_args( [ $pdu, $timeout, $retries ] ) : '';
    DEBUG_INFO('dispatching request [%d] %s', $pdu->transport->fileno, $detail);

    # post() is required here: yield() or call() breaks things.
    POE::Kernel->post(
        _poe_component_snmp_dispatcher => __dispatch_pdu =>
        $pdu, $timeout, $retries
    );

    return 1;
}

# }}} send_pdu and _send_pdu
# {{{ schedule and cancel

# Net::SNMP v5.x

# In Net::SNMP::Dispatcher, the critical methods to intercept are:
#  - register()  : listen for data on a socket
#  - schedule()  : schedule a timeout action if no response is received
#  - deregister(): stop listening on a socket
#  - cancel()    : cancel a pending event

# Our versions hand the appropriate actions to POE.
#

# Build an event record for $callback, due $when seconds from now.  An
# event that is already due has its callback executed inline; anything
# else is handed to the __schedule_event state.  Returns the event
# (an array reference), which is what cancel() expects to be given.
sub schedule {
    my ($self, $when, $callback) = @_;

    my $now = time;

    # cook the args like Net::SNMP::schedule() does for _event_insert()
    my $event = [
        $self->{_active},
        $now + $when,
        $self->_callback_create($callback),
        $when,
    ];

    my $fileno = $self->_get_fileno($event);
    my $detail = VERBOSE ? dump_args( $event->[_CALLBACK] ) : '';

    if ($event->[_TIME] <= $now) {
        # Due right now: run the callback here rather than going through
        # __invoke_callback, which saves a POE call().
        DEBUG_INFO('{-------- invoking callback [%d] %s', $fileno, $detail);
        $self->_callback_execute($event->[_CALLBACK]);    # no parameter cooking needed!
        DEBUG_INFO(' --------} callback complete [%d]', $fileno);

        return $event;
    }

    DEBUG_INFO("%0.1f seconds [%d] %s", $event->[_DELAY], $fileno, $detail);

    # This call breaks down to $kernel->alarm_set($event).  It has to be
    # call(): post() breaks, and yield() is not an option here either.
    POE::Kernel->call(_poe_component_snmp_dispatcher => __schedule_event => $event);

    return $event;
}

# Cancel a previously scheduled event.  Returns nothing if the event was
# never handed to POE; otherwise removes the alarm and returns a boolean
# telling whether requests are still queued for the event's socket.
sub cancel {
    my ($self, $event) = @_;

    # This catches a stray shutdown case where __schedule_event has been
    # queued but not yet dispatched.  In that case $event->[_TIME] is
    # still an epoch time in the future: it was never replaced with a POE
    # alarm id, so there is no POE event to cancel.
    return if $event->[_TIME] > time;

    # $event->[_TIME] is the POE alarm id stashed by __schedule_event.
    my $alarm_id = $event->[_TIME];
    DEBUG_INFO('remove alarm id %d', $alarm_id);
    POE::Kernel->alarm_remove($alarm_id);

    my $remaining = $self->_pending_pdu_count( $self->_get_fileno($event) );
    return ! ! $remaining;    # boolean: are there entries left?
}

# }}} schedule and cancel
# {{{ register and deregister

## version support
# see the notes on Net::SNMP v4.x support
our $SUPER_register   = 'SUPER::register';
our $SUPER_deregister = 'SUPER::deregister';

## coding notes
#
# register() says POE::Kernel->call(dispatcher => '__listen'), which
# does select_read() *within a POE::Session* and returns, instead of
# simply invoking select_read() directly.  That guarantees the
# select_read() happens inside the 'dispatcher' session (rather than
# possibly the parent 'snmp' session).  Otherwise, when we reach
# _unlisten(), we could get a (silent) failure because the "session
# doesn't own handle".

# This was a *GIGANTIC* hassle to debug.  While it was being traced
# down, Rocco added a diagnostic message to POE to indicate this problem
# (see the Changes file for POE 0.29), so the next person to hit it
# should not need to spend the same hours debugging it.
sub register {
    my ($self, $transport, $callback) = @_;

    DEBUG_INFO('register on [%d] %s', $transport->fileno,
               VERBOSE ? dump_args([ $callback ]) : '');

    # The base class returns the transport object on success.
    my $registered = $self->$SUPER_register($transport, $callback);

    if (ref $registered and not _SINGLE) {
        # call() rather than post(), so the socket is being watched by
        # the dispatcher session before we return.
        POE::Kernel->call(_poe_component_snmp_dispatcher => __listen => $registered);

        # If the callback were sent along with the "got data" event we
        # would pass it here as well:
        #
        #   POE::Kernel->post(_poe_component_snmp_dispatcher => __listen => $transport,
        #                     [ $this->_callback_create($callback), $transport ]);
        #
        # but in fact it is retrieved directly from the SNMP object.
        # Which of the two is cleaner in terms of encapsulation is an
        # open question.
    }

    return $registered;
}

# There is an optimization here in not having an __unlisten state
# corresponding to __listen (avoiding call() overhead): we just tell the
# kernel directly to stop watching the handle.  __listen only needs to
# exist because a socket has to be watched from the right session...
# deregister() is always called in the same session as __listen.

sub deregister {
    my ($self, $transport) = @_;

    # Remember the descriptor now; $transport is not consulted again
    # after the base class has deregistered it.
    my $fileno = $transport->fileno;

    DEBUG_INFO('deregister on [%d] %s', $transport->fileno,
               VERBOSE ? dump_args([ $transport ]) : '');

    my $released = $self->$SUPER_deregister($transport);

    if (ref $released and not _SINGLE) {
        $self->_unwatch_socket($released->socket);
    }

    # no more current.
    $self->_clear_current_pdu($fileno);

    my $queued = $self->_pending_pdu_count($fileno);
    if ($queued) {
        # run next pending
        DEBUG_INFO('dispatching (queued) request on [%d] %d remaining',
                   $fileno, $queued - 1);

        # POE::Kernel->yield(__dispatch_pending_pdu => $fileno);
        POE::Kernel->yield(__dispatch_pdu => $self->_get_next_pending_pdu_args($fileno));
    }

    return $released;
}

# }}} register and deregister
# {{{ Net::SNMP v4.x

# Net::SNMP 5.x changed some of the method names of methods I was
# overriding.  I decided to support both versions.
# The two variables $SUPER_register and $SUPER_deregister are kind of a
# hack around the syntax that I *want* to work, but is not valid perl:
# $self->SUPER::$method()
#
# (caller(0))[3] can't always be trusted to find the value of $method,
# especially when the function being called started its life being
# called &register, but is now being invoked as &_listen.
#
# So here we manually list our SUPER::$method names, and in case it
# turns out we're working with Net::SNMP v4.x, we change the names
# (and symbol table entries) below.

# Detect the old API by capability rather than by comparing
# Net::SNMP->VERSION() numerically: a numeric comparison silently treats
# an undefined or non-numeric (dotted) version string as 0, which would
# select the v4.x names on a newer release.  The parent class is asked
# directly, because this package always defines register() itself.
if (not Net::SNMP::Dispatcher->can('register')) {

    # In our SUPER class (Net::SNMP::Dispatcher), the critical methods
    # to intercept are:
    #
    #   _listen:   listen for data on a socket
    #   _schedule: schedule a timeout action if no response is received
    #   _unlisten: stop listening on a socket
    #   _cancel:   cancel a timeout if a response is received
    #
    # Here, we play games with the symbol table so that these
    # functions, which were renamed from 4.x to 5.x, are subclassed
    # appropriately.

    *_schedule = \&schedule;
    *_cancel   = \&cancel;
    *_listen   = \&register;
    *_unlisten = \&deregister;

    $SUPER_register   = 'SUPER::_listen';
    $SUPER_deregister = 'SUPER::_unlisten';
}

# }}} Net::SNMP v4.x

# }}} SUBCLASSED METHODS
# {{{ PRIVATE METHODS

##### socket methods
#
## These two methods are the only place in this module where the
## socket refcounting is done, so it's all self-contained.
#
# {{{ _watch_socket

# Socket listen with refcount.  If the socket already has a refcount,
# increment it.  Otherwise set the refcount and start listening on the
# socket.  Returns the resulting refcount.
#
# Accesses the global kernel.

sub _watch_socket {
    my ($this, $socket) = @_;
    my $fileno = $socket->fileno;

    # A count of zero, a missing entry, or a stale negative count all
    # mean "not currently watched".  Treating a negative count as
    # "watched" would leave the handle unselected.
    if (($this->{_refcount}{$fileno} || 0) <= 0) {
        # Reference counting starts at 1 for the controlling
        # *session*, and 1 for this *request*.
        #
        # The refcount will fluctuate between 1 and 2 until the owning
        # snmp session is stopped, then it will drop to 0 and we'll
        # stop watching that handle.
        $this->{_refcount}{$fileno} = 1 + 1;
        _SINGLE and $this->{_refcount}{$fileno}--;

        DEBUG_INFO('[%d] refcount %d, select', $fileno, $this->{_refcount}{$fileno});

        POE::Kernel->select_read($socket, '__socket_callback');
    }
    else {
        _SINGLE and return $this->{_refcount}{$fileno};
        $this->{_refcount}{$fileno}++;

        DEBUG_INFO('[%d] refcount %d, resume', $fileno, $this->{_refcount}{$fileno});

        _PAUSE_FD and POE::Kernel->select_resume_read($socket);
    }

    return $this->{_refcount}{$fileno};
}

# }}} _watch_socket
# {{{ _unwatch_socket

# Decrement the socket refcount and stop listening once it reaches 0.
# Returns the resulting refcount, which is never negative.
#
# Accesses the global kernel.
sub _unwatch_socket {
    my ($this, $socket) = @_;
    my $fileno = $socket->fileno;

    if (--$this->{_refcount}{$fileno} <= 0) {
        DEBUG_INFO('[%d] refcount %d, unselect', $fileno, $this->{_refcount}{$fileno});

        # stop listening on this socket
        POE::Kernel->select_read($socket, undef);

        # Clamp at zero, so that an unbalanced extra call cannot leave a
        # negative count behind for the next _watch_socket().
        $this->{_refcount}{$fileno} = 0;
    }
    else {
        # A plain ternary instead of list repetition: the format always
        # receives an argument for its %s.
        my $deferred = defined $this->_current_pdu($fileno) ? '(deferred)' : '';

        DEBUG_INFO('[%d] refcount %d, pause %s',
                   $fileno, $this->{_refcount}{$fileno}, $deferred);

        # Only pause while no request is outstanding on this socket.
        _PAUSE_FD and POE::Kernel->select_pause_read($socket)
            unless $this->_current_pdu($fileno);
    }

    return $this->{_refcount}{$fileno};
}

# }}} _unwatch_socket
#####

##### current and pending PDU methods

# {{{ _current_pdu

# If called with one argument, a fileno, returns the current pdu.
#
# If called with two arguments, a fileno and a pdu, makes that pdu the
# current pdu (and returns it).
sub _current_pdu {
    my ($this, $fileno, $pdu) = @_;

    # Test the argument count rather than definedness of $pdu, so that
    # an explicit undef still counts as a "set".
    if (@_ == 3) {
        $this->{_current_pdu}{$fileno} = $pdu;
    }

    return $this->{_current_pdu}{$fileno};
}

# remove the current pdu. return it.
sub _clear_current_pdu {
    my ($self, $fileno) = @_;

    # delete hands back whatever was stored under the key.
    return delete $self->{_current_pdu}{$fileno};
}

# }}} _current_pdu
# {{{ (_enqueue_pending|_get_next_pending|_clear_pending)_pdu

# Append one argument list (an array reference) to the queue kept for
# $fileno.  Returns the new queue length.
sub _enqueue_pending_pdu {
    my ($self, $fileno, $arg) = @_;

    return push @{ $self->{_pending_pdu}{$fileno} }, $arg;
}

# Take the oldest queued array reference for $fileno off the queue and
# return its contents as a list.
sub _get_next_pending_pdu_args {
    my ($self, $fileno) = @_;

    my $next = shift @{ $self->{_pending_pdu}{$fileno} };
    return @{$next};
}

# Throw away the whole pending queue for $fileno, returning it.
sub _clear_pending_pdu {
    my ($self, $fileno) = @_;

    return delete $self->{_pending_pdu}{$fileno};
}

# }}} (_enqueue_pending|_get_next_pending|_clear_pending)_pdu
# {{{ _pending_pdu_count

# Number of requests queued for $fileno; 0 when there is no queue.
sub _pending_pdu_count {
    my ($self, $fileno) = @_;

    my $queue = $self->{_pending_pdu}{$fileno};

    return 0 unless ref($queue) eq 'ARRAY';
    return scalar @{$queue};
}

# }}} _pending_pdu_count
#####

# {{{ _current_callback

# Fetch the "current" callback for the fileno corresponding to the
# socket we just saw a response on, out of the descriptor table that
# Net::SNMP::Dispatcher maintains.
sub _current_callback {
    my ($self, $fileno) = @_;

    return $self->{_descriptors}{$fileno};
}

# }}} _current_callback
# {{{ _get_fileno

# The calls to schedule($when, $callback) look like this:
#   $this->schedule($delay,   [\&_send_pdu, $pdu, $pdu->timeout, $pdu->retries]);
#   $this->schedule($timeout, [\&_transport_timeout, $pdu, $timeout, $retries])
# so _CALLBACK is: [ CODE, PDU, TIMEOUT, RETRIES ];

# Return the file descriptor number of the transport used by the pdu
# carried in $event.
sub _get_fileno {
    my ($self, $event) = @_;

    # element 1 of the callback is the $pdu object
    my $pdu = $event->[_CALLBACK]->[1];

    return $pdu->transport->fileno;
}

# }}} _get_fileno

# }}} PRIVATE METHODS
# {{{ POE EVENTS

# By convention, all POE states, except _start and _stop, have
# two leading underscores.
# {{{ _start and _stop

# Session start: claim the alias every other part of this module uses to
# address the dispatcher session.
sub _start {
    my $kernel = $_[KERNEL];

    $kernel->alias_set('_poe_component_snmp_dispatcher');
}

# Session stop: give up the alias and forget the singleton, so that the
# next instance() call builds a fresh dispatcher.
sub _stop {
    my $kernel = $_[KERNEL];

    $kernel->alias_remove('_poe_component_snmp_dispatcher');
    undef $INSTANCE;
}

# }}} _start and _stop
# {{{ __dispatch_pdu

# We want to prevent conflicts between listening sockets and pending
# requests, because POE can't listen to two at a time on the same
# handle.  If that socket is currently listening for a reply to a
# different request (eg $this->_current_pdu() is TRUE), the request is
# queued, otherwise it is dispatched immediately.
#
# (which again additionally POE-izes Net::SNMP)
#
# this event is invoked by _send_pdu()
sub __dispatch_pdu {
    # The state is invoked with ( $pdu, $timeout, $retries ).
    my ($self, @request) = @_[OBJECT, ARG0 .. $#_];

    my $pdu    = $request[0];
    my $fileno = $pdu->transport->fileno;

    # enqueue or execute
    if ($self->_current_pdu($fileno)) {
        # this socket is busy.  enqueue.
        $self->_enqueue_pending_pdu($fileno => \@request);

        DEBUG_INFO('queued request for [%d] %d requests pending',
                   $fileno, $self->_pending_pdu_count($fileno));

        return;
    }

    # this socket is free.  execute.
    DEBUG_INFO('sending request for [%d]', $fileno);

    $self->_current_pdu($fileno => $pdu);

    if (VERBOSE) { DEBUG_INFO('{-------- SUPER::_send_pdu() for [%d]', $fileno) }
    $self->SUPER::_send_pdu(@request);
    if (VERBOSE) { DEBUG_INFO(' --------} SUPER::_send_pdu() for [%d]', $fileno ) }

    return;
}

# }}} __dispatch_pdu
# {{{ __schedule_event

# this event is invoked by schedule() / _event_insert()
sub __schedule_event {
    my ($self, $kernel, $event) = @_[ OBJECT, KERNEL, ARG0 ];

    # $event->[_ACTIVE] is always true for us, and we ignore it.
    #
    # $event->[_TIME] is the epoch time this event should fire.  We
    # use that value for scheduling the POE event, then replace it
    # with POE's alarm id.
    #
    # $event->[_CALLBACK] is an opaque callback reference.
    #
    # $event->[_DELAY] is how long from the time of scheduling to
    # fire the event, in seconds.
    #
    # We get this same $event back in cancel(), where we reference
    # $event->[_TIME] as the alarm id to deactivate.

    # Already due by the time we got here: run it now, no alarm needed.
    if ($event->[_TIME] <= time) {
        $self->_callback_execute($event->[_CALLBACK]);    # no parameter cooking needed!
        return;
    }

    my $alarm_id = $kernel->alarm_set(
        __invoke_callback => $event->[_TIME], $event->[_CALLBACK]
    );

    # Stash the alarm id.  $event is a reference shared with the caller,
    # so the assignment is visible to whoever holds the event.
    $event->[_TIME] = $alarm_id;

    # $event->[_DELAY] is only used for debugging.
    DEBUG_INFO("alarm id %d, %0.1f seconds [%d] %s",
               $alarm_id,
               $event->[_DELAY],
               $self->_get_fileno($event),
               VERBOSE ? dump_args([ $event->[_CALLBACK] ]) : ''
              );
}

# }}} __schedule_event
# {{{ __invoke_callback

# Invokes a callback immediately.
#
# this event is invoked when a delay has fired.
sub __invoke_callback {
    my ($self, $callback) = @_[OBJECT, ARG0];

    # element 1 of the callback is the pdu; see _get_fileno()
    my $fileno = $callback->[1]->transport->fileno;
    my $detail = VERBOSE ? dump_args([ $callback ]) : '';

    DEBUG_INFO('{-------- invoking scheduled callback for [%d] %s', $fileno, $detail);

    $self->_callback_execute($callback);

    DEBUG_INFO(' --------} callback complete for [%d]', $fileno );
}

# }}} __invoke_callback
# {{{ __listen

# Tell POE to watch the $transport's socket.
#
# this event is invoked by register()
sub __listen {
    my ($self, $transport) = @_[OBJECT, ARG0];

    my $fileno = $transport->fileno;

    # The callback is not passed in: __socket_callback fetches it
    # directly from the dispatcher object.  Later versions of POE allow
    # for sending the callback with the request, but we only strive for
    # a "relatively recent" version.  Actually, we've tested all the way
    # back to 0.22, released 03-Jul-2002.

    DEBUG_INFO('listening on [%d]', $fileno);

    $self->_watch_socket($transport->socket);
}

# }}} __listen
# {{{ __socket_callback

# fetch the stashed callback and execute it.
#
# this event is invoked when a watched socket becomes ready to read
# data.
sub __socket_callback {
    my ($this, $socket) = @_[OBJECT, ARG0];

    my $fileno = $socket->fileno;

    # Nothing is registered for this descriptor: ignore the event.
    my $callback = $this->_current_callback($fileno);
    return unless $callback;

    DEBUG_INFO('{-------- invoking callback for [%d] %s',
               $fileno, dump_args($callback));

    # the extra argument contained in the callback is harmless
    $this->_callback_execute( @{$callback} );

    DEBUG_INFO(' --------} callback complete for [%d]', $fileno);
}

# }}} __socket_callback
# {{{ __clear_pending

# Account for a 'finish' request to a parent snmp session.  Cancels
# any *pending* requests for the specified session.  However, if
# 'finish' is called on a session while the Dispatcher is currently
# listening for a reply to that session, that reply *will* be
# delivered when it arrives.
#
# this event is invoked from P::C::S::close_snmp_session(), to help us
# keep in sync.
sub __clear_pending {
    my ($this, $session) = @_[OBJECT, ARG0];

    DEBUG_INFO('start');

    # Prefer the session's own transport; fall back to the one cached on
    # its pdu.
    my $socket =
          $session->transport          ? $session->transport->socket
        : $session->{_pdu}{_transport} ? $session->{_pdu}{_transport}->socket
        :                                undef;
    my $fileno = $socket ? $socket->fileno : undef;

    # Without a usable descriptor there is nothing keyed on it to clean
    # up, and _unwatch_socket() cannot be called without a socket.
    unless (defined $fileno) {
        DEBUG_INFO('no open socket for this session, nothing to clear');
        return;
    }

    DEBUG_INFO('clearing %d pending requests', $this->_pending_pdu_count($fileno));
    $this->_clear_pending_pdu($fileno);

    # We purposely do NOT delete $this->_current_pdu($fileno) until
    # *AFTER* the select() stuff, so that it doesn't bother doing
    # socket ops, because next we will stop listening all the way.

    # drop reference count
    # $this->_unwatch_socket($session->transport->socket);
    $this->_unwatch_socket($socket);

    if (defined (my $pdu = $this->_clear_current_pdu($fileno))) {
        DEBUG_INFO('cancelling current request');

        # stop listening
        $this->deregister($pdu->transport);

        # Cancel the pending timeout: fetch the last cached reference to
        # our request (and its postback) held outside our own
        # codespace...
        if (defined (my $request = $MESSAGE_PROCESSING->msg_handle_delete($pdu->request_id))) {
            # ... which returns enough information to cancel anything
            # we had pending.  cancel() indexes into the event array, so
            # only hand it a reference.
            my $timeout_event = $request->timeout_id;
            $this->cancel($timeout_event) if ref $timeout_event;
        }
    }

    DEBUG_INFO('done');
}

# }}} __clear_pending

# }}} POE EVENTS
# {{{ method call tracing

# This section used to hold a disabled (`if (0)`) generator that built
# overload stubs for EVERY method in class SUPER, each warning its name
# and args before calling SUPER:: whatever.
#
# NOTE(review): the text of that generator did not survive in the copy
# of the file this revision was made from (everything between `q!<` and
# `->import('sub_fullname')` was lost).  Because it was unconditionally
# disabled, it is left out here rather than guessed at; restore it from
# version control if it is still wanted.

# {{{ dump_args

# dump_args() names CODE references via sub_fullname().
#
# NOTE(review): the surviving fragment imported 'sub_fullname', which is
# presumably Sub::Identify's export -- confirm against the pristine
# file.  It is loaded only if available; otherwise an equivalent is
# built on the core B module, so no extra dependency is required.
BEGIN {
    if (eval { require Sub::Identify; 1 }) {
        Sub::Identify->import('sub_fullname');
    }
    else {
        require B;
        *sub_fullname = sub {
            my ($code) = @_;
            my $gv = B::svref_2object($code)->GV;
            return join '::', $gv->STASH->NAME, $gv->NAME;
        };
    }
}

# Render a list of arguments compactly for debug output.
#
# Takes one array reference.  Elements that are dispatcher objects are
# skipped.  An array reference element is shown as a bracketed list of
# its members (CODE references by sub name, other references by type);
# any other reference is shown by type, and plain values as themselves.
# Undefined values are shown as 'undef'.
#
# Returns the rendering wrapped in braces, or '' when VERBOSE is off.
sub dump_args {
    return '' unless VERBOSE;

    my ($args) = @_;

    my @rendered;
    for my $arg (@{ $args || [] }) {
        next if ref($arg) eq __PACKAGE__;

        my $text;
        if (ref($arg) eq 'ARRAY') {
            $text = '[' . join(' ', map {
                !defined $_        ? 'undef'
              : !ref $_            ? $_
              : ref $_ eq 'CODE'   ? sub_fullname($_)
              :                      ref $_
            } @{$arg}) . ']';
        }
        elsif (ref $arg) {
            $text = ref $arg;
        }
        else {
            # Test definedness, not truth, so that 0 and '' print as
            # themselves.
            $text = defined $arg ? $arg : 'undef';
        }

        push @rendered, $text;
    }

    return '{' . join(" ", @rendered) . '}';
}

# }}} dump_args

# }}} method call tracing

1;

__END__

# vi:foldmethod=marker: