package Parallel::Fork::BossWorker;

use 5.008008;
use strict;
use warnings;
use Carp;
use Data::Dumper qw(Dumper);
use IO::Handle;

# Perl module variables
our @ISA     = qw();
our $VERSION = '0.03';

# new(%options)
#
# Builds a boss/worker pool description; no process is forked until
# process() is called.
#
# Options:
#   work_handler   - (required) code ref run in a worker for each work item
#   result_handler - code ref run in the boss for each true value returned
#                    by work_handler
#   worker_count   - number of workers to fork (default 10)
#   global_timeout - seconds before a work_handler call is interrupted by
#                    SIGALRM; 0 means no limit (default 0)
#   msg_delimiter  - string terminating each serialized message on the pipes
#                    (default "\0\0\0")
#
# Croaks when work_handler is missing.
sub new {
    my $class  = shift;
    my %values = @_;

    # An undefined or empty delimiter falls back to the default: an empty
    # $/ would switch readline() into paragraph mode and break the framing.
    my $delimiter = $values{msg_delimiter};
    if (!defined $delimiter || !length $delimiter) {
        $delimiter = "\0\0\0";
    }

    my $self = {
        result_handler => $values{result_handler} || undef,    # Method for handling output of the workers
        worker_count   => $values{worker_count}   || 10,       # Number of workers
        global_timeout => $values{global_timeout} || 0,        # Seconds before the worker terminates the job, 0 for unlimited
        work_handler   => $values{work_handler},               # Handler which will process the data from the boss
        work_queue     => [],
        msg_delimiter  => $delimiter,
    };
    bless $self, ref($class) || $class;

    # The work handler is required
    if (not defined $self->{work_handler}) {
        croak("Parameters \`work_handler' is required.");
    }

    return $self;
}

# add_work($work)
#
# Appends one work item to the queue. Returns the new queue length (the
# value of push). Items are tested for truth when they reach a worker, so a
# false item makes the receiving worker stop reading; pass references.
sub add_work {
    my ($self, $work) = @_;
    return push(@{ $self->{work_queue} }, $work);
}

# process()
#
# Forks the workers, hands out the queued work and feeds results to
# result_handler until every worker has been shut down. Croaks if a worker
# that has not been shut down exits, or on any other error raised while
# dispatching.
sub process {
    my $self = shift;

    eval {

        # If a worker dies while the boss still expects to talk to it,
        # there's a problem. Workers that were shut down on purpose have
        # already been removed from {workers}.
        local $SIG{CHLD} = sub {
            my $pid = wait();
            if (defined $self->{workers}->{$pid}) {
                confess("Worker $pid died.");
            }
        };

        # Start the workers
        $self->start();

        # Handy pipe reference
        my $from_workers = $self->{from_workers};

        # Every message from a worker is both a result (possibly empty, for
        # the initial hello) and a request for more work. The loop ends at
        # EOF, i.e. once every process holding the write end has closed it.
        while (my $result = $self->receive($from_workers)) {

            # Process the result handler
            if ($result->{data} && defined $self->{result_handler}) {
                $self->{result_handler}->($result->{data});
            }

            my $to_worker = $self->{workers}->{ $result->{pid} };

            if (@{ $self->{work_queue} }) {

                # Still work to be done: send it to the worker. Items are
                # taken from the end of the queue (last added, first out).
                $self->send($to_worker, pop(@{ $self->{work_queue} }));
            }
            else {

                # Nothing left: closing the pipe is the shutdown signal.
                # Forget the worker first so its exit is not reported as a
                # failure by the CHLD handler.
                delete($self->{workers}->{ $result->{pid} });
                close($to_worker);
            }
        }

        # The result pipe is no longer needed once it has reached EOF.
        close($from_workers);
        delete($self->{from_workers});

        # Collect workers that exit after this point so they are not left
        # behind as zombies once the CHLD handler above goes out of scope.
        # waitpid simply fails for pids the handler has already reaped.
        {
            local $SIG{CHLD} = 'DEFAULT';
            local $?;
            my $pids = delete($self->{worker_pids}) || {};
            waitpid($_, 0) for keys %{$pids};
        }
    };
    if ($@) {
        croak($@);
    }
}

# start()
#
# Internal: creates the shared worker->boss pipe and forks worker_count
# workers, each with its own boss->worker pipe. In the boss this returns
# after all forks; in a worker it never returns (the process exits).
sub start {
    my $self = shift();

    # Create a pipe for the workers to communicate to the boss
    pipe($self->{from_workers}, $self->{to_boss})
        or confess("Failed to create pipe: $!");

    # Create the workers
    foreach (1 .. $self->{worker_count}) {

        # Open a pipe for the worker
        pipe(my $from_boss, my $to_worker)
            or confess("Failed to create pipe: $!");

        # Fork off a worker. fork() returns undef on failure; that must be
        # tested first because undef compares numerically equal to 0 and
        # would otherwise be mistaken for "we are the child".
        my $pid = fork();
        if (!defined $pid) {
            confess("Failed to fork: $!");
        }
        elsif ($pid > 0) {

            # Boss
            $self->{workers}->{$pid}     = $to_worker;
            $self->{worker_pids}->{$pid} = 1;
        }
        else {

            # Worker

            # Close unused pipes, including the inherited write ends that
            # lead to previously forked siblings: while any copy stays open
            # the sibling cannot see EOF when the boss shuts it down.
            close($self->{from_workers});
            close($to_worker);
            close($_) for values %{ $self->{workers} || {} };
            $self->{workers} = {};
            delete($self->{worker_pids});

            # Best effort: detach the worker from the boss's STDIN.
            open(STDIN, '<', '/dev/null')
                or carp("Worker $$ could not reopen STDIN from /dev/null: $!");

            # Setup communication pipes
            my $to_boss = $self->{to_boss};

            # An error must end this process here. Letting it propagate
            # would unwind into the eval in process() and from there into
            # the caller's code, running inside the child.
            my $ok = eval {

                # Send the initial request
                $self->send($to_boss, { pid => $$ });

                # Start processing
                $self->worker($from_boss);
                1;
            };
            if (!$ok) {
                warn $@;
                exit(255);
            }

            # When the worker subroutine completes, exit
            exit(0);
        }
    }

    # Close unused pipes
    close($self->{to_boss});
    delete($self->{to_boss});
}

# worker($from_boss)
#
# Internal: worker main loop. Reads work items from $from_boss until EOF (or
# a false item), runs work_handler on each under the optional global_timeout
# alarm, and sends {pid, data} back to the boss. Croaks if the handler dies
# or times out.
sub worker {
    my ($self, $from_boss) = @_;

    # Read instructions from the server
    while (my $instructions = $self->receive($from_boss)) {

        # If the handler's children die, that's not our business
        $SIG{CHLD} = 'IGNORE';

        # Execute the handler with the given instructions
        my $result;
        eval {

            # Handle alarms
            local $SIG{ALRM} = sub {
                die "Work handler timed out.";
            };

            # Set alarm (0 leaves it disabled)
            alarm($self->{global_timeout});

            # Execute the handler and get its result
            if (defined $self->{work_handler}) {
                $result = $self->{work_handler}->($instructions);
            }

            # Disable alarm
            alarm(0);
        };
        my $error = $@;

        # The alarm is still pending if the handler died before the
        # alarm(0) above was reached.
        alarm(0);

        # Errors are fatal for this worker
        if ($error) {
            croak("Worker $$ error: $error");
        }

        # Send the result to the server
        $self->send($self->{to_boss}, { pid => $$, data => $result });
    }
}

# receive($fh)
#
# Internal: reads one delimiter-terminated message from $fh and returns the
# deserialized value; returns undef (empty list in list context) at EOF.
# Confesses if the message cannot be evaluated.
#
# NOTE(review): messages are Data::Dumper output and are deserialized with
# string eval, so whatever arrives on the pipe is executed as Perl code. The
# pipes are only ever connected to this module's own boss and workers; do
# not point this at a handle fed by anything else.
sub receive {
    my ($self, $fh) = @_;

    # Get a value from the file handle
    local $/ = $self->{msg_delimiter};
    my $value = <$fh>;

    # End of stream
    return if not defined $value;

    # Strip the trailing delimiter ($/ is the delimiter here)
    chomp($value);

    # Deserialize the data
    no strict;
    no warnings;
    my $data = eval($value);
    if ($@) {
        confess("Failed to deserialize data: $@");
    }

    return $data;
}

# send($fh, $value)
#
# Internal: serializes $value with Data::Dumper, writes it to $fh followed
# by the message delimiter, and flushes the handle. Returns the result of
# the flush.
sub send {
    my ($self, $fh, $value) = @_;

    # Single-line, self-contained, eval-able output
    local $Data::Dumper::Deepcopy = 1;
    local $Data::Dumper::Indent   = 0;
    local $Data::Dumper::Purity   = 1;

    # Print the value to the file handle
    print {$fh} Dumper($value) . $self->{msg_delimiter};

    # Force the file handle to flush
    return $fh->flush();
}

1;

__END__

=head1 NAME

Parallel::Fork::BossWorker - Perl extension for easily creating forking queue processing applications.

=head1 SYNOPSIS

The minimal usage of Parallel::Fork::BossWorker requires you supply
the work_handler argument which returns a hash reference.

    use Parallel::Fork::BossWorker;

    # Create new BossWorker instance
    my $bw = Parallel::Fork::BossWorker->new(
        work_handler => sub {
            my $work = shift;
            ... do work here ...
            return {};
        }
    );

    $bw->add_work({key=>"value"});
    $bw->process();

Additionally, you could specify the result_handler argument, which
is passed the hash reference returned from your work_handler.

    use Parallel::Fork::BossWorker;

    # Create new BossWorker instance
    my $bw = Parallel::Fork::BossWorker->new(
        work_handler => sub {
            my $work = shift;
            ... do work here ...
            return {result => "Looks good"};
        },
        result_handler => sub {
            my $result = shift;
            print "$result->{result}\n";
        }
    );

=head1 DESCRIPTION

Parallel::Fork::BossWorker makes creating multiprocess applications easy.

The module is designed to work in a queue style of setup, with the worker
processes requesting 'work' from the boss process. The boss process
transparently serializes and sends the work data to your work handler, to be
consumed and worked. The worker process then transparently serializes and sends
optional data back to the boss process to be handled in your result handler.

This process repeats until the work queue is empty.

=head1 METHODS

=head2 new(...)

Creates and returns a new Parallel::Fork::BossWorker object.

    my $bw = Parallel::Fork::BossWorker->new(work_handler => \&routine)

Parallel::Fork::BossWorker has options which allow you to customize
how exactly the queue is handled and what is done with the data.

=over 4

=item * C<< work_handler => \&routine >>

The work_handler argument is required; the sub is called with its first
and only argument being one of the values in the work queue. Each worker calls
this sub each time it receives work from the boss process. The handler may trap
$SIG{ALRM}, which may be called if global_timeout is specified.

The work_handler should clean up after itself, as the workers may call the
work_handler more than once.

=item * C<< result_handler => \&routine >>

The result_handler argument is optional; the sub is called with its first
and only argument being the return value of work_handler. The boss process
calls this sub each time a worker returns data. This subroutine is not affected
by the value of global_timeout.

=item * C<< global_timeout => $seconds >>

By default, a handler can execute forever. If global_timeout is specified, an
alarm is set up to terminate the work_handler so processing can continue.

=item * C<< worker_count => $count >>

By default, 10 workers are started to process the data queue.
Specifying worker_count can scale the worker count to any number of workers you
wish. Take care though, as too many workers can adversely impact performance,
though the optimal number of workers will depend on what your handlers do.

=item * C<< msg_delimiter => $delimiter >>

Sending messages to and from the child processes is accomplished using
Data::Dumper. When transmitting data, a delimiter must be used to identify the
breaks in messages. By default, this delimiter is "\0\0\0"; this delimiter may
not appear in your data.

=back

=head2 add_work(\%work)

Adds work to the instance's queue.

    $bw->add_work({data => "my data"});

=head2 process()

Forks off the child processes and begins processing data.

    $bw->process();

=head1 REQUIREMENTS

This module depends on the following modules:

    Carp
    Data::Dumper
    IO::Handle

=head1 BUGS

None I'm aware of yet, but I'm sure I'll receive reports :)

=head1 SEE ALSO

=head1 AUTHOR

Jeff Rodriguez, E<lt>jeff@jeffrodriguez.comE<gt>

=head1 COPYRIGHT AND LICENSE

Copyright (c) 2007, Jeff Rodriguez
All rights reserved.

Redistribution and use in source and binary forms, with or without
modification, are permitted provided that the following conditions are met:

    * Redistributions of source code must retain the above copyright notice,
      this list of conditions and the following disclaimer.

    * Redistributions in binary form must reproduce the above copyright notice,
      this list of conditions and the following disclaimer in the documentation
      and/or other materials provided with the distribution.

    * Neither the name of the nor the names of its contributors may be used to
      endorse or promote products derived from this software without specific
      prior written permission.

THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND
ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE DISCLAIMED.
IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT,
INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF
LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE
OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.

=cut