The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Jonk;
use strict;
use warnings;
use Jonk::Job;
use Try::Tiny;
use Carp ();

our $VERSION = '0.10_02';

sub new {
    my ($class, $dbh, $opts) = @_;

    unless ($dbh) {
        Carp::croak('missing job queue database handle.');
    }

    my $functions  = _parse_functions($opts);
    my $table_name = $opts->{table_name} || 'job';
    my $driver     = _verify_driver($dbh);

    bless {
        dbh           => $dbh,
        table_name    => $table_name,
        functions     => $functions,
        driver        => $driver,
        has_func      => scalar(keys %{$functions}) ? 1 : 0,

        _errstr       => undef,

        insert_time_callback => ($opts->{insert_time_callback}||sub{
            my ( $sec, $min, $hour, $mday, $mon, $year, undef, undef, undef ) = localtime(time);
            return sprintf('%04d-%02d-%02d %02d:%02d:%02d', $year + 1900, $mon + 1, $mday, $hour, $min, $sec);
        }),

        insert_query => sprintf(
            'INSERT INTO %s (func, arg, enqueue_time, grabbed_until, run_after, retry_cnt, priority) VALUES (?,?,?,0,?,0,?)'
            ,$table_name
        ),

        grab_query => sprintf('UPDATE %s SET grabbed_until = ? WHERE id = ? AND grabbed_until = ?', $table_name),

        lookup_job_query => sprintf('SELECT * FROM %s WHERE id = ? AND grabbed_until <= ? AND run_after <= ?', $table_name),

        find_job_query => sprintf(
            'SELECT * FROM %s WHERE func IN (%s) AND grabbed_until <= ? AND run_after <= ? ORDER BY priority DESC LIMIT %s',
            $table_name, (join(', ', map { "'$_'" } keys %{$functions})), ($opts->{job_find_size} || 50)
        ),

        delete_query => sprintf('DELETE FROM %s WHERE id = ?', $table_name),

        failed_query => sprintf('UPDATE %s SET retry_cnt = retry_cnt + 1, run_after = ?, grabbed_until = 0, priority = ? WHERE id = ?', $table_name),

        unixtime_query => _settled_unixtime_query($driver),

    }, $class;
}

sub _parse_functions {
    my $opts = shift;

    my $functions = $opts->{functions} || [];
    my $default_grab_for = $opts->{default_grab_for} || (60*60);

    my $funcs = +{};
    for (my $i = 0; $i < @{$functions}; $i++) {
        my $func = $functions->[$i];

        my $value;
        if    (not defined $functions->[$i+1]) {$i++                       }
        elsif (ref $functions->[$i+1])         {$value = $functions->[++$i]}

        $value->{grab_for}     ||= $default_grab_for;
        $value->{serializer}   ||= ($opts->{default_serializer}   || sub {$_[0]});
        $value->{deserializer} ||= ($opts->{default_deserializer} || sub {$_[0]});

        $funcs->{$func} = $value;
    }
    $funcs;
}

sub _verify_driver {
    my $dbh = shift;
    my $driver = $dbh->{Driver}{Name};
    $driver =~ /(mysql|SQLite|Pg)/ ? $driver : Carp::croak('Jonk support only mysql,SQLite,Pg');
}

sub _settled_unixtime_query {
    my $driver = shift;

    if ($driver eq 'Pg') {
        return "SELECT TRUNC(EXTRACT('epoch' from NOW()))";
    } elsif ($driver eq 'mysql') {
        return 'SELECT UNIX_TIMESTAMP()';
    }
}

sub errstr {$_[0]->{_errstr}}

sub insert {
    my ($self, $func, $arg, $opt) = @_;

    my $job_id;
    try {
        $self->{_errstr} = undef;
        local $self->{dbh}->{RaiseError} = 1;
        local $self->{dbh}->{PrintError} = 0;

        my $serializer = $self->{functions}->{$func}->{serializer} || sub {$_[0]};
        my $sth = $self->{dbh}->prepare_cached($self->{insert_query});
        $sth->bind_param(1, $func);
        $sth->bind_param(2, $serializer->($arg), _bind_param_attr($self->{driver}));
        $sth->bind_param(3, $self->{insert_time_callback}->());
        $sth->bind_param(4, $opt->{run_after}||0);
        $sth->bind_param(5, $opt->{priority} ||0);
        $sth->execute();

        $job_id = $self->{dbh}->last_insert_id("","",$self->{table_name},"");
        $sth->finish;
    } catch {
        $self->{_errstr} = "can't insert for job queue database: $_"
    };

    $job_id;
}

sub _bind_param_attr {
    my $driver = shift;

    if ( $driver eq 'Pg' ) {
        return { pg_type => DBD::Pg::PG_BYTEA() };
    } elsif ( $driver eq 'SQLite' ) {
        return DBI::SQL_BLOB();
    }
    return;
}

sub _server_unixitime {
    my $self = shift;
    return time() if $self->{driver} eq 'SQLite';
    $self->{dbh}->selectrow_array($self->{unixtime_query});
}

sub _grab_job {
    my ($self, $callback, $opt) = @_;

    my $job;
    try {
        $self->{_errstr} = undef;
        local $self->{dbh}->{RaiseError} = 1;
        local $self->{dbh}->{PrintError} = 0;

        my $time = $self->_server_unixitime;
        my $sth = $callback->($time);

        while (my $row = $sth->fetchrow_hashref) {
            $job = $self->_grab_a_job($row, $time, $opt);
            last if $job;
        }

        $sth->finish;
    } catch {
        $self->{_errstr} = "can't grab job from job queue database: $_";
    };

    $job;

}

sub _grab_a_job {
    my ($self, $row, $time) = @_;

    my $sth = $self->{dbh}->prepare_cached($self->{grab_query});
    $sth->execute(
        ($time + ($self->{functions}->{$row->{func}}->{grab_for})),
        $row->{id},
        $row->{grabbed_until}
    );
    my $grabbed = $sth->rows;
    $sth->finish;
    $grabbed ? Jonk::Job->new($self => $row) : undef;
}

sub lookup_job {
    my ($self, $job_id) = @_;

    $self->_grab_job(
        sub {
            my $time = shift;
            my $sth = $self->{dbh}->prepare_cached($self->{lookup_job_query});
            $sth->execute($job_id, $time, $time);
            $sth;
        }
    );
}

sub find_job {
    my ($self, $opts) = @_;

    unless ($self->{has_func}) {
        Carp::croak('missin find_job functions.');
    }

    $self->_grab_job(
        sub {
            my $time = shift;
            my $sth = $self->{dbh}->prepare_cached($self->{find_job_query});
            $sth->execute($time, $time);
            $sth;
        }
    );
}

sub _delete {
    my ($self, $job_id) = @_;

    try {
        my $sth = $self->{dbh}->prepare_cached($self->{delete_query});
        $sth->execute($job_id);
        $sth->finish;
        return $sth->rows;
    } catch {
        $self->{_errstr} = "can't dequeue job from job queue database: $_";
        return;
    };
}

sub _failed {
    my ($self, $job_id, $opt) = @_;

    my $retry_delay = $self->_server_unixitime + (defined($opt->{retry_delay}) ? $opt->{retry_delay} : 60);
    my $priority    = (defined($opt->{priority}) ? $opt->{priority} : 0);

    try {
        my $sth = $self->{dbh}->prepare_cached($self->{failed_query});
        $sth->execute($retry_delay, $priority, $job_id);
        $sth->finish;
        return $sth->rows;
    } catch {
    warn 'ababaaaba';
        $self->{_errstr} = "can't update job from job queue database: $_";
        return;
    };
}

1;

__END__

=head1 NAME

Jonk - simple job tank manager.

=head1 SYNOPSIS

    use DBI; 
    use Jonk;
    my $dbh = DBI->connect(...);
    my $jonk = Jonk->new($dbh, {functions => [qw/MyWorker/]});
    # insert job
    {
        $jonk->insert('MyWorker', 'arg');
    }

    # execute job
    {
        my $job = $jonk->find_job;
        print $job->func; # MyWorker
        print $job->arg;  # arg
        $job->completed;
    }

=head1 DESCRIPTION

Jonk is simple job queue manager system

Job is saved and taken out. Besides, nothing is done.

You may use Jonk to make original Job Queuing System.

=head1 METHODS

=head2 my $jonk = Jonk::Worker->new($dbh, [\%options]);

Creates a new Jonk object, and returns the object.

$options is an optional settings.

=over 4

=item * $dbh

$dbh is database handle.

=item * $options->{functions}

Key word of job which this Jonk instance looks for.

=over 4

=item * $options->{functions} = [qw/worker_key worker_key2/]

can set *worker_key* at arrayref.

=item * $options->{functions} = ['worker_key' => {grab_for => 5}],

can set worker_key's grab_for setting by hash-ref.

=item * $options->{functions} = ['worker_key' => {serializer => \&serialize_code, deserializer => \&deserialize_code}],

can set worker_key's (de)serializer code setting by hash-ref.

=item * $options->{functions} = ['worker_key' => {serializer => \&serialize_code, deserializer => \&deserialize_code}, 'worker_key2'],

can mix worker settings.

=back

=item * $options->{table_name}

specific job table name.

Default job table name is `job`.

=item * $options->{job_find_size}

specific lookup job record size.

Default 50.

=item * $options->{default_serializer}

global serializer setting.

=item * $options->{default_deserializer}

global deserializer setting.

=item * $options->{default_grab_for}

global grab_for setting.

=back

=head2 my $job_id = $jonk->insert($func, $arg);

enqueue a job to a database.
returns job.id.

=over 4

=item * $func

specific your worker funcname.

=item * $arg

job argument data.

=back

=head2 my $job = $jonk->lookup_job($job_id);

lookup a job from a database.

returns Jonk::Job object.

=over 4

=item * $job_id

lookup specific $job_id's job.

=back

=head2 my $job = $jonk->find_job(\%opts);

=over 4

=item * $opts->{job_find_size}

find job limit size.

=back

=head2 $jonk->errstr;

get most recent error infomation.

=head1 ERROR HANDLING

  my $job = $jonk->lookup;
  if ($jonk->errstr) {
      die $jonk->errstr;
  }

=head1 SCHEMA

=head2 MySQL

    CREATE TABLE job (
        id            int(10) UNSIGNED NOT NULL auto_increment,
        func          varchar(255)     NOT NULL,
        arg           MEDIUMBLOB,
        enqueue_time  DATETIME         NOT NULL,
        grabbed_until int(10) UNSIGNED NOT NULL,
        run_after     int(10) UNSIGNED NOT NULL DEFAULT 0,
        retry_cnt     int(10) UNSIGNED NOT NULL DEFAULT 0,
        priority      int(10) UNSIGNED NOT NULL DEFAULT 0,
        primary key ( id )
    ) ENGINE=InnoDB

=head2 SQLite

    CREATE TABLE job (
        id            INTEGER PRIMARY KEY ,
        func          text,
        arg           text,
        enqueue_time  text,
        grabbed_until INTEGER UNSIGNED NOT NULL,
        run_after     INTEGER UNSIGNED NOT NULL DEFAULT 0,
        retry_cnt     INTEGER UNSIGNED NOT NULL DEFAULT 0,
        priority      INTEGER UNSIGNED NOT NULL DEFAULT 0
    )

=head2 PostgreSQL

    CREATE TABLE job (
        id            SERIAL PRIMARY KEY,
        func          TEXT NOT NULL,
        arg           BYTEA,
        enqueue_time  TIMESTAMP NOT NULL,
        grabbed_until INTEGER NOT NULL,
        run_after     INTEGER NOT NULL DEFAULT 0,
        retry_cnt     INTEGER NOT NULL DEFAULT 0,
        priority      INTEGER NOT NULL DEFAULT 0
    )

=head1 SEE ALSO

L<Qudo>

L<TheSchwartz>

=head1 SUPPORT

  irc: #jonk@irc.perl.org

=head1 REPOSITORY

  git clone git://github.com/nekokak/p5-Jonk.git

=head1 CONTRIBUTORS

tokuhirom

kan_fushihara

fujiwara

=head1 AUTHOR

Atsushi Kobayashi E<lt>nekokak _at_ gmail _dot_ comE<gt>

=head1 LICENSE

This library is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.

=cut