The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Thread::Pool::Simple;
use 5.008;
use strict;
use threads;
use threads::shared;
use warnings;
use Carp;
use Storable qw(nfreeze thaw);
use Thread::Queue;

our $VERSION = '0.03';

sub new {
    my $class = shift;
    my %arg = @_;
    my %config : shared = (min => 1,
                           max => 10,
                           load => 100,
                           lifespan => 1000,
                          );
    for ('min', 'max', 'load', 'lifespan') {
        $config{$_} = $arg{$_} if exists $arg{$_};
    }
    my %code_ref;
    $code_ref{pre} = $arg{pre} || [];
    $code_ref{do} = $arg{do} || [];
    $code_ref{post} = $arg{post} || [];

    my %obj : shared;
    my $self = \%obj;
    $self->{config} = \%config;
    $self->{pending} = Thread::Queue->new();
    $self->{done} = &share({});
    $self->{worker} = &share({});
    $self->{worker}{count} = 0;
    my $state = 1;
    $self->{state} = &share(\$state);
    bless $self, $class;
    $self->{thread} = &share({});
    async {
        $self->_run(\%code_ref);
    }->detach();
    return $self;
}

sub _run {
    my ($self, $code_ref) = @_;
    while (1) {
        {
            if (!$self->_state()) {
                sleep 1;
                next;
            }
        }
        last if $self->terminating();
        $self->increase($code_ref) if $self->busy();
        threads->yield();
    }
    my $worker = $self->{worker};
    lock %$worker;
    --$worker->{count};
    cond_signal %$worker;
}

sub _handle {
    my ($self, $code_ref) = @_;
    my $lifespan = do { lock %{$self->{config}}; $self->{config}{lifespan} };
    while (1) {
        {
            if (!$self->_state()) {
                sleep 1;
                redo;
            }
        }
        last unless $lifespan-- && !$self->terminating();
        my $do_code = $code_ref->{do};
        my $do_func = shift @$do_code;
        next unless 'CODE' eq ref $do_func;
        my ($id, $job) = unpack('Na*', $self->{pending}->dequeue());
        last if $id == 1;
        my $arg = thaw($job);
        my @ret;
        if ($id == 0) {
            eval { scalar $do_func->(@$do_code, @$arg) };
            next;
        }
        elsif ($id % 2) {
            $ret[0] = eval { $do_func->(@$do_code, @$arg) };
        }
        else {
            @ret = eval { $do_func->(@$do_code, @$arg) };
        }
        $ret[0] = $@ if $@;
        my $ret = nfreeze(\@ret);
        {
            lock %{$self->{done}};
            $self->{done}{$id} = $ret;
            cond_signal %{$self->{done}};
        }
        threads->yield();
    }
    my $post_code = $code_ref->{post};
    my $post_func = shift @$post_code;
    if ('CODE' eq ref $post_func) {
        eval { $post_func->(@$post_code) };
        carp $@ if $@;
    }
    my $worker = $self->{worker};
    lock %$worker;
    --$worker->{count};
    cond_signal %$worker;
}

sub join {
    my ($self) = @_;
    $self->_state(-1);
    my $worker = $self->{worker};
    lock %$worker;
    $self->{pending}->enqueue((pack('Na*', 1, '')) x $worker->{count});
    cond_wait %$worker while $worker->{count} >= 0;
}

sub detach {
    my ($self) = @_;
    $self->_state(-1);
    my $worker = $self->{worker};
    lock %$worker;
    $self->{pending}->enqueue((pack('Na*', 1, '')) x $worker->{count});
}

sub pause {
    my ($self) = @_;
    return unless $self->_state() > 0;
    $self->_state(0);
}

sub resume {
    my ($self) = @_;
    return if $self->_state();
    $self->_state(1);
}

sub running {
    my ($self) = @_;
    return if $self->_state() > 0;
}

sub terminating {
    my ($self) = @_;
    my $state = $self->_state();
    return unless $state < 0;
    my $pending = $self->{pending}->pending();
    return 1 if $state == -2 && !$pending;
    my $done = do { lock %{$self->{done}}; keys %{$self->{done}} };
    return 1 if $state == -1 && !$pending && !$done;
    return;
}

sub increase {
    my ($self, $code_ref) = @_;
    eval {
        my $worker = $self->{worker};
        lock %$worker;
        my $max = do { lock %{$self->{config}}; $self->{config}{max} };
        return if $worker->{count} > $max;
        my $pre_code = $code_ref->{pre};
        my $pre_func = shift @$pre_code;
        if ('CODE' eq ref $pre_func) {
            eval { $pre_func->(@$pre_code) };
            carp $@ if $@;
        }
        threads->create(\&_handle, $self, $code_ref)->detach();
        ++$worker->{count};
    };
    carp "fail to add new thread: $@" if $@;
}

sub busy {
    my ($self) = @_;
    my $worker = do { lock %{$self->{worker}}; $self->{worker}{count} };
    my ($min, $load) = do { lock %{$self->{config}}; @{$self->{config}}{'min', 'load'} };
    return $worker < $min
      || $self->{pending}->pending() > $worker * $load;
}

sub _state {
    my $self = shift;
    my $state = $self->{state};
    lock $$state;
    return $$state unless @_;
    my $s = shift;
    $$state = $s;
    return $$state;
}

sub config {
    my $self = shift;
    my $config = $self->{config};
    lock %$config;
    return %$config unless @_;
    %$config = (%$config, @_);
    return %$config;
}

sub add {
    my $self = shift;
    my $context = wantarray;
    my $arg = nfreeze(\@_);
    my $id = 0;
    while (1) {
        $id = int(rand(time())) if defined $context;
        next if defined $context && $id < 10;
        ++$id if defined $context && $context == $id % 2;
        lock %{$self->{done}};
        last unless exists $self->{done}{$id};
    }
    $self->{pending}->enqueue(pack('Na*', $id, $arg));
    return $id;
}

sub remove {
    my ($self, $id) = @_;
    return unless $id;
    lock %{$self->{done}};
    cond_wait %{$self->{done}} until exists $self->{done}{$id};
    cond_signal %{$self->{done}} if 1 < keys %{$self->{done}};
    my $ret = delete $self->{done}{$id};
    return unless defined $ret;
    $ret = thaw($ret);
    return $ret->[0] if $id % 2;
    return @$ret;
}

sub remove_nb {
    my ($self, $id) = @_;
    return unless $id;
    lock %{$self->{done}};
    my $ret = delete $self->{done}{$id};
    return unless defined $ret;
    $ret = thaw($ret);
    return ($id, $ret->[0]) if $id % 2;
    return ($id, @$ret);
}

1;
__END__

=head1 NAME

Thread::Pool::Simple - A simple thread-pool implementation

=head1 SYNOPSIS

  use Thread::Pool::Simple;

  my $pool = Thread::Pool::Simple->new(
                 min => 3,           # at least 3 workers
                 max => 5,           # at most 5 workers
                 load => 10,         # increase worker if on average every worker has 10 jobs waiting
                 lifespan => 1000,   # work retires after 1000 jobs
                 pre => [\&pre_handle, $arg1, $arg2, ...]   # run before creating worker thread
                 do => [\&do_handle, $arg1, $arg2, ...]     # job handler for each worker
                 post => [\&post_handle, $arg1, $arg2, ...] # run after worker threads end
               );

  my ($id1) = $pool->add(@arg1); #call in list context
  my $id2) = $pool->add(@arg2);  #call in scalar conetxt
  $pool->add(@arg3)              #call in void context

  my @ret = $pool->remove($id1); #get result (block)
  my $ret = $pool->remove_nb($id2); #get result (no block)

  $pool->join();                 #wait till all jobs are done
  $pool->detach();               #don't wait.

=head1 DESCRIPTION

C<Thread::Pool::Simple> provides a simple thread-pool implementaion
without external dependencies outside core modules.

Jobs can be submitted to and handled by multi-threaded `workers'
managed by the pool.

=head1 AUTHOR

Jianyuan Wu, E<lt>jwu@cpan.orgE<gt>

=head1 COPYRIGHT AND LICENSE

Copyright 2007 by Jianyuan Wu

This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.

=cut