package Data::Consumer;

use warnings;
use strict;
use Carp qw(confess cluck);
use vars qw/$Debug $VERSION $Fail $Cmd/;

# This code was formatted with the following perltidy options:
# -ple -ce -bbb -bbc -bbs -nolq -l=100 -noll -nola -nwls='=' -isbc -nolc -otr -kis
# If you patch it please use the same options for your patch.

=head1 NAME

Data::Consumer - Repeatedly consume a data resource in a robust way

=head1 VERSION

Version 0.10

=cut

$VERSION= '0.10';

=head1 SYNOPSIS

    use Data::Consumer;

    my $consumer = Data::Consumer->new(
        type          => $consumer_name,
        unprocessed   => $unprocessed,
        working       => $working,
        processed     => $processed,
        failed        => $failed,
        max_passes    => $num_or_undef,
        max_processed => $num_or_undef,
        max_elapsed   => $seconds_or_undef,
    );

    $consumer->consume( sub {
        my ( $consumer, $id ) = @_;
        print "processed $id\n";
    } );

=head1 DESCRIPTION

It is a common requirement to process a feed of items of some sort in a
robust manner. Such a feed might be records that are inserted into a table,
or files dropped in a delivery directory. Writing a script that handles all
the edge cases, like getting "stuck" on a failed item, and manages things
like locking so that the script can be parallelized, can be tricky and is
certainly repetitive.

The aim of L<Data::Consumer> is to provide a framework that makes writing
such consumer-type scripts as easy as writing a callback that processes each
item. The framework handles the rest.

The basic idea is that one need only use, or in the case of a feed type not
already supported, define a L<Data::Consumer> subclass which implements a few
reasonably well defined primitive methods which handle the required tasks,
and then the L<Data::Consumer> methods use those to provide a DWIMily
consistent interface to the end consumer.

Currently L<Data::Consumer> is distributed with two subclasses,
L<Data::Consumer::MySQL> for handling records in a MySQL db (using the MySQL
C<GET_LOCK()> function), and L<Data::Consumer::Dir> for handling a drop
directory scenario (like for FTP or a mail directory).

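The division of labour described above can be illustrated without loading
the module at all. The following is only a toy sketch under stated
assumptions: the in-memory C<%state> hash and the plain functions
C<acquire>, C<mark_as> and C<consume> are hypothetical stand-ins for what a
real subclass and the framework would provide, and no locking is attempted.

```perl
use strict;
use warnings;

# Hypothetical in-memory "feed": item id => current state.
my %state = ( a => 'unprocessed', b => 'unprocessed', c => 'unprocessed' );

# Primitives that a feed-specific subclass would supply.
sub acquire {
    for my $id ( sort keys %state ) {
        return $id if $state{$id} eq 'unprocessed';
    }
    return;    # queue exhausted
}

sub mark_as {
    my ( $id, $new_state ) = @_;
    $state{$id} = $new_state;
    return;
}

# Generic loop that a framework would supply; only $callback is user code.
sub consume {
    my ($callback) = @_;
    while ( defined( my $id = acquire() ) ) {
        mark_as( $id, 'working' );
        my $ok = eval { $callback->($id); 1 };
        mark_as( $id, $ok ? 'processed' : 'failed' );
    }
    return;
}

# The callback dies on item 'b', so that item should end up 'failed'.
consume( sub { my $id = shift; die "cannot handle $id\n" if $id eq 'b'; } );
print "$_ => $state{$_}\n" for sort keys %state;
```

The point of the sketch is that the user-written part shrinks to the one
anonymous sub passed to C<consume>; everything else is reusable.
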
Once a resource type has been defined as a L<Data::Consumer> subclass the use
pattern is to construct the subclass with the appropriate arguments, and then
call consume with a callback.

=head2 The Consumer Pattern

The consumer pattern is where code wants to consume an 'atomic' resource
piece by piece. The consuming code doesn't really want to worry much about
how it got the piece; that is a task that should be handled by the framework.
The consumer subclasses assume that the resource can be modeled as a queue
(that there is some ordering principle by which the items can be processed in
a predictable sequence). The consume pattern in full glory is something very
close to the following pseudo code. The items marked with asterisks are where
user callbacks may be invoked:

    DO
        RESET TO THE BEGINNING OF THE QUEUE
        WHILE QUEUE NOT EMPTY AND CAN *PROCEED*
            ACQUIRE NEXT ITEM TO PROCESS FROM QUEUE
            MARK AS 'WORKING'
            *PROCESS* ITEM
            IF PROCESSING FAILED
                MARK AS 'FAILED'
            OTHERWISE
                MARK AS 'PROCESSED'
        SWEEP UP ABANDONED 'WORKING' ITEMS AND MARK THEM AS 'FAILED'
    UNTIL WE CANNOT *PROCEED* OR NOTHING WAS PROCESSED
    RELEASE ANY LOCKS STILL HELD

This implies that each item potentially has four states: C<unprocessed>,
C<working>, C<processed> and C<failed>. In a database these might be values
in a field, in a drop directory scenario these would be different
directories, but in all cases they would normally be supplied as values to
the L<Data::Consumer> subclass being created.

=head2 Subclassing Data::Consumer

L<Data::Consumer> can be used with any resource type that can be modeled as a
queue, supports some form of advisory locking mechanism, and provides a way
to discriminate between at least the C<unprocessed> and C<processed> states.

The routines that must be defined for a new consumer type are C<new()>,
C<reset()>, C<acquire()>, C<release()>, C<_mark_as()> and C<_do_callback()>.

=over 4

=item new

A subclass will almost certainly need to override the default constructor.
All L<Data::Consumer> objects are blessed hashes, and in fact you should
always call the parent class's constructor first with:

    my $self= $class->SUPER::new();

=item reset

This routine is used to reset the object's internal state so the next call to
acquire will return the first available item in the queue.

=item acquire

This routine is to find and in some way lock the next item in the queue. It
should ensure that it calls is_ignored() on each item to verify the item has
not been requested to be ignored.

=item release

This routine is to release any held locks in the object.

=item _mark_as

This routine is called to "mark" an item as a particular state. It should be
able to handle user supplied values. For instance L<Data::Consumer::MySQL>
implements this as an update statement that maps user supplied values to the
consumer state names.

Possible states are: C<unprocessed>, C<working>, C<processed>, C<failed>.

=item _do_callback

This routine is used to call the user supplied callback with the correct
arguments. Which arguments are appropriate for the callback depends on the
type of class. For instance L<Data::Consumer::MySQL> calls the callback with
the arguments C<($consumer, $id, $dbh)> whereas L<Data::Consumer::Dir> calls
the callback with the arguments
C<($consumer, $filespec, $filehandle, $filename)>. The point is that the end
user should be passed the arguments that make sense, not necessarily the same
thing for each consumer type.

=back

Every well-behaved L<Data::Consumer> subclass should include the functional
equivalent of the following code in its .pm file:

    use base 'Data::Consumer';
    __PACKAGE__->register();

This will ensure that it can be properly loaded by
C<< Data::Consumer->new(type=>$shortname) >>.

It is also normal for a L<Data::Consumer> subclass to provide special methods
as needed. For instance C<< Data::Consumer::Dir->fh() >> and
C<< Data::Consumer::MySQL->dbh() >>.

=head1 METHODS

=head2 CLASS->new(%opts)

Constructor. Normally L<Data::Consumer>'s constructor is not called directly,
instead the constructor of a subclass is used. However to make it easier to
have a data driven load process L<Data::Consumer> accepts the C<type>
argument which should specify the short name of the subclass (the part after
C<Data::Consumer::>) or the full name of the subclass.

Thus

    Data::Consumer->new(type=>'MySQL',%args);

is exactly equivalent to calling

    Data::Consumer::MySQL->new(%args);

except that the former will automatically require or use the appropriate
module and the latter necessitates that you do so yourself.

Every L<Data::Consumer> subclass constructor supports the following arguments
on top of any that are subclass specific. Additionally some arguments are
universally used, but have different meaning depending on the subclass.

=over 4

=item unprocessed

How to tell if the item is unprocessed.

How this argument is interpreted depends on the L<Data::Consumer> subclass
involved.

=item working

How to tell if the item is currently being worked on.

How this argument is interpreted depends on the L<Data::Consumer> subclass
involved.

=item processed

How to tell if the item has already been worked on.

How this argument is interpreted depends on the L<Data::Consumer> subclass
involved.

=item failed

How to tell if processing failed while handling the item.

How this argument is interpreted depends on the L<Data::Consumer> subclass
involved.

=item max_passes => $num_or_undef

Normally C<consume()> will loop through the data set until it is exhausted.
By setting this parameter you can control the maximum number of iterations,
for instance setting it to C<1> will result in a single pass through the data
per invocation. C<0> (or any other false value) is treated as meaning "loop
until exhausted".

=item max_processed => $num_or_undef

Maximum number of items to process per invocation.

If set to a false value there is no limit.

=item max_failed => $num_or_undef

Maximum number of failed process attempts that may occur before consume will
stop. If set to a false value there is no limit. Setting this to 1 will cause
processing to stop after the first failure.

=item max_elapsed => $seconds_or_undef

Maximum amount of time that may have elapsed when starting a new process. If
more than this value has elapsed then no further processing occurs. If C<0>
(or any false value) then there is no time limit.

=item proceed => $code_ref

This is a callback that may be used to control the looping process in consume
via the C<proceed()> method. See the documentation of C<consume()> and
C<proceed()>.

=item sweep => $bool

*** NOTE CURRENTLY THIS OPTION IS DISABLED ***

If this parameter is true, and there are four modes defined (C<unprocessed>,
C<working>, C<processed>, C<failed>) then consume will perform a "sweep up"
after every pass, which is responsible for moving "abandoned" files from the
working directory (such as from a previous process that segfaulted during
processing). Generally this should not be necessary.

=back

=head2 CLASS->register(@alias)

Used by subclasses to register themselves as a L<Data::Consumer> subclass and
register any additional aliases that the class may be identified as.

Will throw an exception if any of the aliases are already associated with a
different class.

When called on a subclass in list context returns a list of the subclass's
registered aliases. If called on L<Data::Consumer> in list context returns a
list of all alias class mappings.

=cut

=head2 $class_or_object->debug_warn_hook()

Specify a callback to use to capture diagnostics data produced by a
Data::Consumer object.

If called as a class method sets the default hook for all Data::Consumer
objects that have not explicitly set a hook. If called as an object method
sets the hook to use for that object alone.

Returns the current effective hook. Defaults to the default_debug_warn()
method of the object, so the default can be overridden by a subclass if
necessary.

The hook will be called with the arguments ($consumer,$level,@lines) and is
not expected to return anything.

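As an illustration of the hook signature described above, here is a minimal
sketch of a hook that collects diagnostics in memory instead of printing
them. The names C<@captured> and C<$min_level> are made up for the example.
Note that, going by the code below, the level check lives in
default_debug_warn(), so a replacement hook is assumed to do its own
filtering.

```perl
use strict;
use warnings;

# Collect diagnostics in memory instead of emitting them with warn().
my @captured;
my $min_level = 3;    # hypothetical threshold chosen for this example

# The hook receives ($consumer, $level, @lines), as documented above.
my $hook = sub {
    my ( $consumer, $level, @lines ) = @_;
    return if $level < $min_level;    # a custom hook filters for itself
    my $who = ref($consumer) || $consumer;
    push @captured, map { "[$who] level=$level: $_" } @lines;
    return;
};

# With the module loaded, the hook would be installed with one of:
#   Data::Consumer->debug_warn_hook($hook);    # default for all objects
#   $consumer->debug_warn_hook($hook);         # one object only

# Stand-alone demonstration: call the hook the way debug_warn() would.
$hook->( 'Data::Consumer', 5, "registered 'Dir' as an alias" );
$hook->( 'Data::Consumer', 1, "too quiet to be captured" );
print "$_\n" for @captured;
```

Only the first message passes the threshold, so C<@captured> ends up with a
single entry.
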
=cut

my $debug_warn_hook;

sub debug_warn_hook {
    my $self= shift;
    if (@_) {
        if ( ref $self ) {
            $self->{debug_warn_hook}= shift;
        } else {
            $debug_warn_hook= shift;
        }
    }
    if ( ref $self and defined $self->{debug_warn_hook} ) {
        return $self->{debug_warn_hook};
    }
    return $debug_warn_hook || $self->can('default_debug_warn');
}

=head2 $class_or_object->default_debug_warn($level,@debug_lines)

Use warn to output diagnostics. The messages are only emitted if the current
debug level is above C<$level>. Each message includes the process id and the
class name.

=cut

sub default_debug_warn {
    my $self= shift;
    my $level= shift;
    cluck($level) if $level =~ /\D/;
    my $debug_level= $self->debug_level;
    if ( $debug_level > $level ) {
        warn ref($self) || $self, "\t$$\t>>> $_\n" for @_;
    }
}

=head2 $class_or_object->debug_level($level)

Set the minimum debug level.

When called as an object method sets the value of that object alone. undef is
distinct from 0 in that undef results in the global debug level being used
for that object.

When called as a class method sets the value for all objects which do not
have a defined debug level.

Returns the current effective debug level for the object or class.

=cut

sub debug_level {
    my $self= shift;
    if (@_) {
        if ( ref $self ) {
            $self->{debug_level}= shift;
        } else {
            $Debug= shift;
        }
    }
    if ( ref $self and defined $self->{debug_level} ) {
        return $self->{debug_level};
    }
    return $Debug || 0;
}

=head2 $class_or_object->debug_warn($level,@debug_lines)

Calls the current debug_warn_hook() to output a set of diagnostic messages.
The default hook only emits them if the current debugging level is above
C<$level>.

=cut

sub debug_warn {
    my $self= shift;
    my $level= shift;
    my $hook= $self->debug_warn_hook;
    my $pfx= ref $self ? $self->{debug_pfx} || '' : '';
    $hook->( $self, $level, map { $pfx . $_ } @_ );
}

BEGIN {
    my %alias2class;
    my %class2alias;
    $Debug and $Debug >= 5 and warn "\n";

    sub register {
        my $class= shift;

        ref $class
          and confess "register() is a class method and cannot be called on an object\n";
        my $pack= __PACKAGE__;

        if ( $class eq $pack ) {
            return wantarray ? %alias2class : 0 + keys %alias2class;
        }

        ( my $std_name= $class ) =~ s/^\Q$pack\E:://;
        $std_name =~ s/::/-/g;

        my @failed;
        for my $name ( $class, $std_name, @_ ) {
            if ( $alias2class{$name} and $alias2class{$name} ne $class ) {
                push @failed, $name;
                next;
            }
            __PACKAGE__->debug_warn( 5, "registered '$name' as an alias of '$class'" );
            $alias2class{$name}= $class;
            $class2alias{$class}{$name}= $class;
        }
        @failed
          and confess "Failed to register aliases for '$class' as they are already used\n",
          join( "\n", map { "\t'$_' is already assigned to '$alias2class{$_}'" } @failed ),
          "\n";
        return wantarray ? %{ $class2alias{$class} } : 0 + keys %{ $class2alias{$class} };
    }

    sub new {
        my ( $class, %opts )= @_;
        ref $class
          and confess "new() is a class method and cannot be called on an object\n";

        if ( $class eq __PACKAGE__ ) {
            my $type= $opts{type}
              or confess "'type' is a mandatory named parameter for $class->new()\n";
            my $full= $type;
            if ( !$alias2class{$full} ) {
                if ( $full !~ /::/ ) {
                    $full =~ s/-/::/g;
                    $full= join '::', $class, $full;
                }
                eval "require $full; 1"
                  or confess "'type' parameter '$type' could not be loaded properly: $@\n";
            }
            $class= $alias2class{$full}
              or confess
              "'type' parameter '$type' mapped to '$full' which does not seem to exist\n";
        }
        my $object= bless {}, $class;
        $class->debug_warn( 5, "created new object '$object'" );
        return $object;
    }
}

=head2 $object->last_id()

Returns the identifier for the last item acquired.

Returns undef if acquire has never been called or if the last attempt to
acquire data failed because none was available.

=cut

sub last_id {
    my $self= shift;
    return $self->{last_id};
}

# Until I figure out how to make gedit handle begin/end directives this has to
# stay commented out
#=begin dev
#
#=head2 $object->_mark_as($type,$id)
#
#** Must be overridden **
#
#Mark an item as a particular type if the object defines that type.
#
#This is wrapped by mark_as() for error checking, so you are guaranteed
#that $type will be one of
#
#    'unprocessed', 'working', 'processed', 'failed'
#
#and that $object->{$type} will be a true value, and that $id will be from
#the currently acquired item.
#
#=end dev

=head2 $object->mark_as($type)

Mark an item as a particular type if the object defines that type.

Allowed types are C<unprocessed>, C<working>, C<processed>, C<failed>.

=cut

sub _mark_as { confess "must be overridden" }

BEGIN {
    my ( %valid, @valid );
    @valid= qw ( unprocessed working processed failed );
    @valid{@valid}= ( 1 .. @valid );

    sub mark_as {
        my $self= shift @_;
        my $key= shift @_;

        $valid{$key}
          or confess "Unknown type in mark_as(), valid options are ",
          join( ", ", map { "'$_'" } @valid ),
          "\n";

        my $id= @_ ? shift @_ : $self->last_id;
        defined $id
          or confess "Nothing acquired to be marked as '$key' in mark_as.\n";

        return unless $self->{$key};
        return $self->_mark_as( $key, $id );
    }
}

=head2 $object->process($callback)

Marks the current item as C<working> and processes it using the C<$callback>.
If the C<$callback> dies then the item is marked as C<failed>, otherwise the
item is marked as C<processed> once the C<$callback> returns. The return
value of the C<$callback> is ignored.

C<$callback> will be called with at least two arguments, the first being the
$consumer object itself, and the second being an identifier for the current
record. Normally additional, likely to be useful, arguments are provided as
well, on a per subclass basis. For example L<Data::Consumer::MySQL> will pass
in the consumer object, the id of the record to be processed, and a copy of
the consumer's database handle as well for convenience. On the other hand
L<Data::Consumer::Dir> will pass in the consumer object, followed by a file
specification for the file to be processed, an open filehandle to the file,
and the filename itself (with no path).

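To make the calling convention concrete, here is a small sketch of a
callback written for a directory-style consumer. It assumes the argument
order given above, C<($consumer, $filespec, $filehandle, $filename)>; the
path and file name are made up, and the demonstration calls the callback
directly with an in-memory filehandle rather than through a real consumer.

```perl
use strict;
use warnings;

# Callback for a directory-style consumer. Per the text above it is assumed
# to receive ($consumer, $filespec, $filehandle, $filename).
my $count_lines = sub {
    my ( $consumer, $filespec, $fh, $filename ) = @_;
    my $lines = 0;
    while ( defined( my $line = <$fh> ) ) {
        $lines++;
    }

    # Dying is how a callback reports failure: the item ends up 'failed'.
    die "empty file: $filename\n" unless $lines;
    return $lines;    # process() ignores this; returned for the demo only
};

# Stand-alone demonstration with an in-memory filehandle and a made-up path.
my $data = "first\nsecond\nthird\n";
open my $fh, '<', \$data or die "cannot open in-memory file: $!";
my $count = $count_lines->( undef, '/drop/unprocessed/demo.txt', $fh, 'demo.txt' );
print "demo.txt has $count lines\n";
```

In real use the same sub would simply be handed to consume() or process(),
and the consumer would supply the arguments.
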
The callback may call the methods 'leave', 'ignore', 'fail', and 'halt' on
the consumer object before returning, typically by doing something like

    return $consumer->ignore;

This allows the callback to send specific signals to consume, specifically:

    leave  : return the item to the unprocessed state after the callback
             returns.

    ignore : return the item to the unprocessed state after the callback
             returns and never attempt to process it again with this
             consumer object.

    fail   : same result as dying in a callback, except without throwing an
             exception, for situations where there might be $SIG{__DIE__}
             hooks to worry about.

    halt   : stop the consume() process after this has been executed

For further details always consult the relevant subclass's documentation for
C<process()>.

=cut

sub process {
    my $self= shift;
    my $callback= shift;

    my $id= $self->last_id;
    defined $id
      or $self->error("Undefined last_id. Nothing acquired yet?");

    $self->mark_as('working');
    local $Cmd;
    delete $self->{defer_leave};
    my $error= $self->_do_callback($callback);
    $error ||= $self->{fail};
    if ($error) {
        $self->mark_as('failed');
        $self->error($error);
    } else {
        if ( $self->{defer_leave} ) {
            $self->mark_as('unprocessed');
        } else {
            $self->mark_as('processed');
        }
    }
    return 1;
}

=head2 $consumer->leave()

Sometimes it's useful to defer processing. This method, when called from
within a consume/process callback, will result in the item being marked as
'unprocessed' after the callback returns (so long as it does not die).

Typically this is invoked as

    return $consumer->leave;

from within a consume/process callback.

Returns $consumer. Will die if no 'unprocessed' state is defined.

=cut

sub leave {
    my $self= shift;
    confess("Can't leave as 'unprocessed' is undefined!")
      if !$self->{unprocessed};
    $self->{defer_leave}++;
    return $self;
}

=head2 $consumer->ignore(@list)

This can be used to cause acquire to ignore each item in @list. If @list is
empty then it is assumed it is being called from within consume/process; it
then marks the currently acquired item as ignored and calls
$consumer->leave().

Returns $consumer. Will die if no 'unprocessed' state is defined.

=cut

sub ignore {
    my $self= shift;
    if (@_) {
        for my $id (@_) {
            $self->{ignore}{$id}++;
        }
    } else {
        my $id= $self->last_id;
        $self->{ignore}{$id}++;
        $self->leave;
    }
    return $self;
}

=head2 $consumer->fail($message)

Same as doing die($message) from within a consume/process callback except
that no exception is thrown (no $SIG{__DIE__} callbacks are invoked) and the
error is deferred until the callback actually returns.

Typically used as

    return $consumer->fail;

from within a consume() callback.

Returns the $consumer object.

=cut

sub fail {
    my $self= shift;
    $self->{fail}= shift;
    return $self;
}

=head2 $consumer->halt()

Causes consume() to halt processing and exit once the callback returns.

Typically invoked like

    return $consumer->halt;

or

    return $consumer->fail->halt;

Returns the consumer object.

=cut

sub halt {
    my $self= shift;
    $self->{halt}++;
    return $self;
}

=head2 $object->is_ignored($id)

Returns true if an item has been set to be ignored. If $id is omitted it
defaults to last_id.

=cut

sub is_ignored {
    my $self= shift;
    my $id= @_ ? shift @_ : $self->last_id;
    return if !defined $id;
    return $self->{ignore}{$id} ? 1 : 0;
}

=head2 $object->reset()

Reset the state of the object.

=head2 $object->acquire()

Acquire an item to be processed.

Returns an identifier to be used to identify the item acquired.

=head2 $object->release()

Release any locks on the currently held item.

Normally there is no need to call this directly.

=cut

sub reset   { confess "abstract method must be overridden by subclass\n"; }
sub acquire { confess "abstract method must be overridden by subclass\n"; }
sub release { confess "abstract method must be overridden by subclass\n"; }

=head2 $object->error()

Calls the C<error> callback if the user has provided one, otherwise calls
C<confess()>. Probably not all that useful for an end user.

=cut

sub error {
    my $self= shift;
    if ( $self->{error} ) {
        $self->{error}->(@_);
    } else {
        confess @_;
    }
}

=head2 $object->consume($callback)

Consumes a data resource until it is exhausted using C<acquire()>,
C<process()>, and C<release()> as appropriate. Normally this is the main
method used by external processes.

Before each attempt to acquire a new resource, and once at the end of each
pass, consume will call C<proceed()> to determine if it can do so. The user
may hook into this by specifying a callback in the constructor. This callback
will be executed without a pass count when it is in the inner loop (per
item), and with the number of passes at the end of each pass (starting with
1).

=head2 $object->proceed($passes)

Returns true if the conditions specified at construction time are satisfied
and processing may proceed. Returns false otherwise.

If the user has specified a C<proceed> callback in the constructor then this
will be executed before any other rules are applied, with a reference to the
current C<$object>, a reference to the runstats, and, if being called at the
end of a pass, the number of passes.

If this callback returns true then the other rules will be applied, and only
if all other conditions from the constructor are satisfied will C<proceed()>
itself return true.

=head2 $object->runstats()

Returns a reference to a hash of statistics about the last (or currently
running) execution of consume. Example:

    {
        'passes'              => 2,
        'processed_this_pass' => 0,
        'processed'           => 3,
        'start_time'          => 1209750962,
        'failed'              => 0,
        'elapsed'             => 0,
        'end_time'            => 1209750962,
        'failed_this_pass'    => 0
    }

Note that start_time and end_time are unix timestamps.

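As a sketch of how a C<proceed> callback might use the statistics described
above, the following stops processing once too many attempts have failed.
The policy and the name C<$min_attempts> are made up for the example; the
callback is assumed to receive C<($consumer, $runstats, $passes)>, with
C<$passes> present only at the end of a pass. The demonstration calls it
directly with hand-built hashes instead of a running consumer.

```perl
use strict;
use warnings;

# Hypothetical policy: once at least $min_attempts items have been tried,
# stop if more than half of them failed.
my $min_attempts = 4;

my $proceed = sub {
    my ( $consumer, $runstats, $passes ) = @_;    # $passes only at end of a pass
    my $attempts = $runstats->{processed} + $runstats->{failed};
    return 1 if $attempts < $min_attempts;        # too few samples to judge
    return $runstats->{failed} * 2 <= $attempts ? 1 : 0;
};

# Stand-alone demonstration with hand-built statistics hashes.
my $healthy = $proceed->( undef, { processed => 9, failed => 1 } );
my $broken  = $proceed->( undef, { processed => 1, failed => 5 }, 1 );
print "healthy: $healthy, broken: $broken\n";
```

Such a sub would be supplied as C<< proceed => $proceed >> when constructing
the consumer. Since a false return stops the loop before the other limits
are consulted, the callback can only make processing stop earlier, never
later.
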
=cut

sub runstats { $_[0]->{runstats} }

sub proceed {
    my $self= shift;
    my $runstats= $self->{runstats};
    $runstats->{end_time}= time;
    $runstats->{elapsed}= $runstats->{end_time} - $runstats->{start_time};

    if ( my $cb= $self->{proceed} ) {
        $cb->( $self, $self->{runstats}, @_ )    # pass on the $passes argument if it's there
          or return;
    }
    for my $key (qw(elapsed passes processed failed)) {
        my $max= "max_$key";
        return if $self->{$max} && $runstats->{$key} >= $self->{$max};
    }
    return if $self->{halt};
    return 1;
}

sub consume {
    my $self= shift;
    my $callback= shift;

    my $passes= 0;

    unless ( $self->{runstats} ) {
        $self->{runstats}= {};
        $self->{runstats}{$_}= 0
          for qw(passes processed failed processed_this_pass failed_this_pass);
    }
    my $runstats= $self->{runstats};
    $runstats->{start_time}= time;

    $self->reset();
    do {
        ++$runstats->{passes};
        $runstats->{processed_this_pass}= $runstats->{failed_this_pass}= 0;
        while ( $self->proceed && defined( my $item= $self->acquire ) ) {
            eval {
                $self->process($callback);
                $runstats->{processed_this_pass}++;
                $runstats->{processed}++;
                1;
            } or do {
                $runstats->{failed_this_pass}++;
                $runstats->{failed}++;

                # quotes force string copy
                $self->debug_warn( 5, "Failed during \$self->process(\$callback): $@" );
              }
        }
    } while $self->proceed( $runstats->{passes} ) && $runstats->{processed_this_pass};

    # if we still hold a lock let it go.
    delete $self->{halt};
    $self->release;
    return $runstats;
}

=head1 AUTHOR

Yves Orton, C<< >>

=head1 BUGS

Please report any bugs or feature requests to C, or through the web interface
at L.

I will be notified, and then you'll automatically be notified of progress on
your bug as I make changes.

=head1 SUPPORT

You can find documentation for this module with the perldoc command.

    perldoc Data::Consumer

You can also look for information at:

=over 4

=item * RT: CPAN's request tracker

L

=item * AnnoCPAN: Annotated CPAN documentation

L

=item * CPAN Ratings

L

=item * Search CPAN

L

=back

=head1 ACKNOWLEDGEMENTS

Igor Sutton for ideas, testing and support

=head1 COPYRIGHT & LICENSE

Copyright 2008 Yves Orton, all rights reserved.

This program is free software; you can redistribute it and/or modify it
under the same terms as Perl itself.

=cut

1;    # End of Data::Consumer