# -*- mode: perl -*-
#
# Copyright (c) 2008 Stephen R. Scaffidi
# All rights reserved.
#
# This program is free software; you may redistribute it and/or modify it
# under the same terms as Perl itself.
#
# Current RCS Info:
#
#       $Id: Asynch.pm 38 2008-03-11 16:48:32Z hercynium $
#  $HeadURL: https://rtg-utilities.svn.sourceforge.net/svnroot/rtg-utilities/SNMP-Asynch/lib/SNMP/Query/Asynch.pm $
#     $Date: 2008-03-11 12:48:32 -0400 (Tue, 11 Mar 2008) $
#   $Author: hercynium $
# $Revision: 38 $

package SNMP::Query::Asynch;

# Pragmas
use strict;
use warnings;

# Standard
use Carp;

# Loaded without imports on purpose: importing shuffle() would collide with
# this class's own shuffle() method, and imported helpers would otherwise be
# callable as methods on the object.
use List::Util   ();
use Scalar::Util ();

# Cpan
use SNMP;

# See chap 17, pg. 404, PBP (Conway 2005)
# use version; our $VERSION = qv('0.1_32');
use version; our $VERSION = qv(sprintf "0.1_%d", q$Revision: 38 $ =~ /: (\d+)/);

# This comes in handy so we don't pass bogus
# parameters to SNMP::Session->new()
our @valid_sess_params = qw(
    DestHost
    Community
    Version
    RemotePort
    Timeout
    Retries
    RetryNoSuch
    SecName
    SecLevel
    SecEngineId
    ContextEngineId
    Context
    AuthProto
    AuthPass
    PrivProto
    PrivPass
    AuthMasterKey
    PrivMasterKey
    AuthLocalizedKey
    PrivLocalizedKey
    VarFormats
    TypeFormats
    UseLongNames
    UseSprintValue
    UseEnums
    UseNumeric
    BestGuess
    NonIncreasing
);

#----------------------------------------------------------
# Constructor
#
# Returns a new query object with an empty queue, an empty results array,
# a default concurrency limit of 10 and all counters set to zero.
sub new {
    my $class = shift;

    my $self = bless {
        query_stack          => [],
        results              => [],
        max_in_flight        => 10,
        current_in_flight    => 0,
        this_run_issued      => 0,
        this_run_finished    => 0,
        grand_total_issued   => 0,
        grand_total_finished => 0,
    }, $class;

    return $self;
}

#---------------------------------------------------------
# Verifies that the named parameter is a subref
#
# Params:  $param_name - key to look up in $params
#          $params     - hashref of named parameters
# Returns: 1 when the key is absent or holds a CODE ref.
# Croaks:  when the key exists but its value is not a CODE ref.
sub _check_param_callback {
    my $self       = shift;
    my $param_name = shift;    # string, hash key
    my $params     = shift;    # hashref

    return 1 unless exists $params->{$param_name};

    croak "Bad parameter for [$param_name] - not a CODE ref"
        if ref $params->{$param_name} ne 'CODE';

    return 1;
}

#---------------------------------------------------------
# TODO Fill in the code and use in the add_XXX() or _make_XXX_query() methods
# Verifies that the named parameter is something the SNMP
# module can use as a VarBind or VarBindList.
#
# Currently a stub: it performs no validation and returns nothing.
sub _check_param_varbinds {
    my $self       = shift;
    my $param_name = shift;    # string, hash key
    my $params     = shift;    # hashref
    return;
}

#---------------------------------------------------------
# Adds a getbulk operation to the queue.
#
# Params:  $params - hashref holding VarBinds, NonRepeaters, MaxRepeaters,
#          optional PreCallback/PostCallback code refs, and any of the
#          session parameters named in @valid_sess_params.
#          QueryType is optional and defaults to 'getbulk'; any other
#          value is rejected.
# Returns: nothing.
# Croaks:  on a missing/false VarBinds, a false QueryType, a callback that
#          is not a CODE ref, or an unknown query type.
sub add_getbulk {
    my $self   = shift;
    my $params = shift;

    # Required for all query ops so make sure it is present.
    croak "Bad or missing parameter [VarBinds]"
        unless $params->{VarBinds};

    # The method name already says which operation is wanted, so a missing
    # QueryType means 'getbulk' (this is how the SYNOPSIS calls us). A
    # QueryType that is present but false is still an error.
    my $query_type
        = exists $params->{QueryType} ? $params->{QueryType} : 'getbulk';
    croak "Bad or missing parameter [QueryType]"
        unless $query_type;

    # Make sure our callback params are valid.
    $self->_check_param_callback( 'PreCallback',  $params );
    $self->_check_param_callback( 'PostCallback', $params );

    croak "Attempt to add using unknown query type: $query_type\n"
        if $query_type ne 'getbulk';

    push @{ $self->{query_stack} }, $self->_make_getbulk_query($params);

    return;
}

#---------------------------------------------------------
# Builds the closure that issues one asynchronous bulkwalk.
#
# Params:  $query_info - the hashref handed to add_getbulk().
# Returns: a code ref. Calling it runs the PreCallback, bumps the issued
#          counters, creates an SNMP::Session from the session parameters
#          and starts a bulkwalk whose completion callback stores the
#          results, runs the PostCallback, updates the counters, fires the
#          batch callback when due and then either issues the next queued
#          query or, when nothing is left in flight, calls SNMP::finish().
# Croaks:  when NonRepeaters or MaxRepeaters is missing.
#
# NOTE This method has gotten long and complex.
# TODO Refactor, but think carefully.
#
# The closures below deliberately hold only a *weak* reference to the
# query object. The object owns the queue, and the queue owns these
# closures, so a strong reference here would form a cycle and the object
# could never be freed.
sub _make_getbulk_query {
    my $self       = shift;
    my $query_info = shift;

    # These params are required for a getbulk query op.
    croak "Bad or missing parameter [NonRepeaters]"
        unless exists $query_info->{NonRepeaters};
    croak "Bad or missing parameter [MaxRepeaters]"
        unless exists $query_info->{MaxRepeaters};

    my $non_repeaters = $query_info->{NonRepeaters};
    my $max_repeaters = $query_info->{MaxRepeaters};

    # Currently, these are validated in the add() method, so no need here.
    my $preop_callback  = $query_info->{PreCallback};
    my $postop_callback = $query_info->{PostCallback};

    # TODO I may want to add a method to validate
    # and/or transform the VarBinds parameter
    my $varbinds = $query_info->{VarBinds};

    # Parse out the parameters for creating the session.
    # TODO validate them better, probably in a separate subroutine.
    my %sess_params
        = map  { $_ => $query_info->{$_} }
          grep { exists $query_info->{$_} } @valid_sess_params;

    my $weak_self = $self;
    Scalar::Util::weaken($weak_self);

    return sub {

        # Take a strong reference for the duration of this call only.
        my $obj = $weak_self or return;

        # I wonder if this should be before or after the counter increments?
        $preop_callback->() if $preop_callback;

        $obj->{current_in_flight}++;
        $obj->{this_run_issued}++;
        $obj->{grand_total_issued}++;

        my $callback = sub {
            my $bulkwalk_results = shift;

            # The query object may already be gone if a late response
            # arrives after it was destroyed; nothing left to update then.
            my $owner = $weak_self or return;

            # Store the results and info about the query for later...
            push @{ $owner->{results} }, $query_info, $bulkwalk_results;

            # NOTE should I do this callback here, or somewhere else?
            $postop_callback->() if $postop_callback;

            $owner->{current_in_flight}--;
            $owner->{this_run_finished}++;
            $owner->{grand_total_finished}++;

            # Fire the batch callback every BatchSize completions, but only
            # when one was actually installed by execute().
            my $batch_size = $owner->{BatchSize};
            if (   $batch_size
                && $owner->{BatchCallback}
                && $owner->{this_run_finished} % $batch_size == 0 )
            {
                $owner->{BatchCallback}->();
            }

            # Keep the pipeline full: issue the next queued query, if any.
            if ( @{ $owner->{query_stack} } ) {
                my $next_query = pop @{ $owner->{query_stack} };
                return $next_query->();
            }

            # Queue drained: end the main loop once the last op completes.
            return SNMP::finish() if $owner->{current_in_flight} <= 0;
            return 1;
        };

        my $sess = SNMP::Session->new(%sess_params);

        my $operation_id
            = $sess->bulkwalk( $non_repeaters, $max_repeaters, $varbinds,
            [$callback] );

        # $query_info is the same hash that the completion callback pushes
        # onto the results array, so anything stored in it here shows up
        # in the results as well.
        $query_info->{OperationId} = $operation_id;

        return;    # No need to return anything, AFAICT.
    };
}

#---------------------------------------------------------
# Read-only accessors for the operation counters.
sub current_in_flight    { return shift->{current_in_flight} }
sub this_run_issued      { return shift->{this_run_issued} }
sub this_run_finished    { return shift->{this_run_finished} }
sub grand_total_issued   { return shift->{grand_total_issued} }
sub grand_total_finished { return shift->{grand_total_finished} }

#---------------------------------------------------------
# Shuffles the queued operations in place so they run in random order.
# Returns nothing.
sub shuffle {
    my $self = shift;

    @{ $self->{query_stack} }
        = List::Util::shuffle( @{ $self->{query_stack} } );

    return;
}

#---------------------------------------------------------
# Executes every operation in the queue.
#
# Params:  $params - optional hashref with any of:
#            KeepLast      - true to keep results from earlier runs
#            BatchCallback - code ref run every BatchSize completions
#            BatchSize     - number of completions per batch
#            InFlight      - maximum concurrent operations for this run
#            MasterTimeout - timeout (seconds) handed to SNMP::MainLoop,
#                            after which the run is ended
# Returns: a reference to the results array.
# Croaks:  when BatchCallback is given but is not a CODE ref.
sub execute {
    my $self   = shift;
    my $params = shift || {};

    # The KeepLast option can come in handy if, for example, another
    # thread or process is working on the contents of the results
    # array from a previous execution and may not finish before the
    # next execution. (You can explicitly set KeepLast or it will use
    # the object's default.)
    my $keep_last
        = defined $params->{KeepLast}
        ? $params->{KeepLast}
        : $self->{KeepLast};
    @{ $self->{results} } = () unless $keep_last;

    # Install 'batch' callback if applicable...
    $self->_check_param_callback( 'BatchCallback', $params );
    $self->{BatchCallback} = $params->{BatchCallback};
    $self->{BatchSize}     = $params->{BatchSize};

    # Determine our maximum concurrency level for this run
    my $max_in_flight = $params->{InFlight} || $self->{max_in_flight};

    # Make a copy of the stack in case we want to run the same query
    my $query_stack      = $self->{query_stack};
    my @query_stack_copy = @{$query_stack};

    # Set some counters
    $self->{current_in_flight} = 0;
    $self->{this_run_issued}   = 0;
    $self->{this_run_finished} = 0;

    # Begin issuing operations. The completion callbacks pull the rest of
    # the queue as earlier operations finish.
    while ( @{$query_stack} ) {
        my $query = pop @{$query_stack};
        $query->();
        last if $self->{current_in_flight} >= $max_in_flight;
    }

    # Wait for the ops to complete, or time-out (if specified). With
    # nothing in flight there is no callback left to end the loop, so
    # entering it would block (forever, when no timeout is given).
    if ( $self->{current_in_flight} > 0 ) {
        if ( $params->{MasterTimeout} ) {

            # Pass a *reference* to SNMP::finish so the main loop can call
            # it on timeout; calling it here would only pass its result.
            SNMP::MainLoop( $params->{MasterTimeout}, \&SNMP::finish );
        }
        else {
            SNMP::MainLoop();
        }
    }

    # Reset the stack for the next run.
    $self->{query_stack} = \@query_stack_copy;

    return $self->get_results_ref();
}

#---------------------------------------------------------
# Returns a reference to the results array from executing the query.
sub get_results_ref {
    return shift->{results};
}

1;
__END__

=pod

=head1 NAME

SNMP::Query::Asynch - Fast asynchronous execution of batches of SNMP queries

=head1 VERSION

Version 0.01

=head1 SYNOPSIS

    use SNMP::Query::Asynch;

    my @varbinds = qw(
        ifDescr ifInOctets ifOutOctets ifAlias ifType
        ifName ifInErrors ifOutErrors ifSpeed
        ifAdminStatus ifOperStatus
    );

    my $query = SNMP::Query::Asynch->new();

    # You should create and populate @hosts to make this synopsis code work.
    # It's an AoH, fairly simple. For example...
    my @hosts = create_hosts_array('snmp_hosts.csv');

    foreach my $host (@hosts) {

        # Add a getbulk operation to the queue.
        $query->add_getbulk({

            # Params passed directly to SNMP::Session->new()
            DestHost  => $host->{HOSTIP},
            Community => $host->{COMMUNITY},
            Version   => $host->{SNMPVER},    # getbulk only supports 2 or 3.

            # Params concerning the type of query operation
            # See POD for SNMP::Session->getbulk() in this case.
            MaxRepeaters => 20,
            NonRepeaters => 0,

            # The varbinds to be operated on - can be a reference to anything
            # supported by the corresponding query operation in SNMP::Session.
            VarBinds => \@varbinds,
        });
    }

    # Execute the queries that were added, get a reference to the results array.
    my $results = $query->execute({
        InFlight      => 50,    # Simultaneous operations
        MasterTimeout => 60,    # Seconds until unfinished operations are aborted.
    });

    # See what the results look like.
    use Data::Dumper;
    print Dumper $results;

=head1 DESCRIPTION

This module allows for a fairly simple, streamlined means of executing
large numbers of SNMP operations as fast as your systems can handle.
It extensively uses net-snmp's asynchronous operation interfaces and callbacks to keep as much data flowing as you need. Perl's support of closures and anonymous subroutines provide the means for sophisticated, elegant control of query operations before and after execution. There are also facilities to install callbacks that are run after pre-set numbers (batches) of operations are completed . These callbacks can be used to log progress, update the user, transfer results from memory to disk (or even another thread or process!) or anything you can think of! If there's some feature you desire, do not hesitate to ask me!!! Please be aware - my primary design concern is speed and flexibility. I have certain non-scientific, subjective benchmarks I use to decide if some modification is worth-while, but so far the design of the internals of this module has lent itself to feature additions and enhancements very well. =head1 SUBROUTINES/METHODS =head3 new Constructs a new query object with an empty queue. =head2 Query Operation Addition Methods In order to build a queue of query operations, you would repeatedly call one or more of these methods below to add an operation to the queue. In addition to the parameters described for each method, they all require parameters that are passed directly to SNMP::Session->new() at execution time. However, those parameters are somewhat validated when the methods are called, to try to make debugging easier for code using this module. See the discussion below on "Required Parameters SNMP::Session->new()" for more information. =head3 add_getbulk Adds a getbulk query operation to the queue. =head3 add_gettable - NOT YET IMPLEMENTED Adds a gettable query operation to the queue. =head3 add_get - NOT YET IMPLEMENTED Adds a get query operation to the queue. =head3 add_fget - NOT YET IMPLEMENTED Adds an fget query operation to the queue. =head3 add_bulkwalk - NOT YET IMPLEMENTED Adds a bulkwalk query operation to the queue. 
=head3 add_getnext - NOT YET IMPLEMENTED Adds a getnext query operation to the queue. =head3 add_fgetnext - NOT YET IMPLEMENTED Adds an fgetnext query operation to the queue. =head3 add_set - NOT YET IMPLEMENTED Adds a set query operation to the queue. =head3 Required Parameters SNMP::Session->new() Each query operation in the queue requires that a SNMP::Session object is constructed, but you can't pre-construct the SNMP::Session objects and pass those in because any more than 1023 of these in memory at a time will crash the program. We get around this limitation by constructing only as many sessions as are needed to support the in-flight operations, and no more. To do that at execution time requires that the user specify the SNMP::Session->new() parameters whenever they add a new query operation using the methods below. Therefore, I have listed below the parameters for SNMP::Session->new() that will be accepted by each of the query operation addition methods. Because of these limitations, this module is tightly coupled with the L<SNMP> module. There's really no other way to go about it, at least none that I can think of that doesn't unacceptably degrade the execution speed. =over 4 =item * DestHost =item * Community =item * Version =item * RemotePort =item * Timeout =item * Retries =item * RetryNoSuch =item * SecName =item * SecLevel =item * SecEngineId =item * ContextEngineId =item * Context =item * AuthProto =item * AuthPass =item * PrivProto =item * PrivPass =item * AuthMasterKey =item * PrivMasterKey =item * AuthLocalizedKey =item * PrivLocalizedKey =item * VarFormats =item * TypeFormats =item * UseLongNames =item * UseSprintValue =item * UseEnums =item * UseNumeric =item * BestGuess =item * NonIncreasing =back =head2 execute Executes all operations in the queue. =head2 shuffle Shuffles the operations in the queue so they are executed in random order. =head2 current_in_flight Returns the number of operations currently issued which have not yet been completed.
Please note: completed means that the operation was issued and either results data or an error condition were received. If the operation was interrupted before that happens, then it is not counted as completed. This typically only happens when the call to execute() was interrupted by a fatal error or the query-object's master timeout was exceeded. =head2 this_run_issued Returns the number of operations that have been issued during the current execute() call. If called after execute() has completed, returns the number issued during the most recent execute() call. Each call to execute() resets this value to zero. =head2 this_run_finished Returns the number of operations that have been completed during the current execute() call. If called after execute() has completed, returns the number completed during the most recent execute() call. Each call to execute() resets this value to zero. =head2 grand_total_issued Returns the number of operations that have been issued during all calls to execute() since the query object was created. This value is *never* reset. =head2 grand_total_finished Returns the number of operations that have been completed during all calls to execute() since the query object was created. This value is *never* reset. =head2 get_results_ref Returns a reference to the query object's internal results array. NOTE: Yes, I *know* that providing access to an object's internal data is poor OO design, but it gets the job done right now. I do plan on converting the results array into its own type of object with OO-kosher semantics, but only if that does not substantially impact overall speed. That said, use this method with caution because in the future it will likely be changed to return something completely different than an array-ref or maybe even be removed and replaced with a different method. I may also have a moment of insanity and make the results-object tied so as to look like an array. But I doubt it unless people ask for it enough.
=head1 EXPORTS Nothing. =head1 SEE ALSO SNMP SNMP::Effective SNMP::Multi MRTG RTG YATG =head1 AUTHOR Steve Scaffidi, C<< >> =head1 BUGS Please report any bugs or feature requests to C, or through the web interface at L. I will be notified, and then you'll automatically be notified of progress on your bug as I make changes. =head1 SUPPORT You can find documentation for this module with the perldoc command. perldoc SNMP::Query::Asynch You can also look for information at: =over 4 =item * AnnoCPAN: Annotated CPAN documentation L =item * CPAN Ratings L =item * RT: CPAN's request tracker L =item * Search CPAN L =back =head1 ACKNOWLEDGEMENTS =head1 DEPENDENCIES =head1 BUGS AND LIMITATIONS =head1 INCOMPATIBILITIES =head1 LICENSE AND COPYRIGHT Copyright 2008 Steve Scaffidi, all rights reserved. This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself. =cut