# $Id: /mirror/coderepos/lang/perl/Queue-Q4M/trunk/lib/Queue/Q4M.pm 67347 2008-07-28T10:10:40.696016Z daisuke $
#
# Copyright (c) 2008 Daisuke Maki
# All rights reserved.

package Queue::Q4M;
use Moose;

# Loaded explicitly and always called fully qualified: the "no Moose" below
# is executed before any of the subs in this file are compiled, and current
# Moose releases remove their confess()/blessed() re-exports on unimport.
use Carp ();
use Scalar::Util ();
use DBI;
use SQL::Abstract;

# When true, dbh() transparently reconnects if the handle is missing or dead.
has 'auto_reconnect' => (
    is       => 'rw',
    isa      => 'Bool',
    required => 1,
    default  => 1,
);

# Argument list handed verbatim to DBI->connect().
has 'connect_info' => (
    is       => 'rw',
    isa      => 'ArrayRef',
    required => 1,
);

# SQL generator used by fetch_*() and insert().
has 'sql_maker' => (
    is       => 'rw',
    isa      => 'SQL::Abstract',
    required => 1,
    default  => sub { SQL::Abstract->new },
);

# Current database handle; use dbh() to get a checked/reconnected one.
has '_dbh' => (
    is  => 'rw',
    isa => 'Maybe[DBI::db]',
);

# Name of the table selected by the most recent successful next().
has '__table' => (
    is  => 'rw',
    isa => 'Maybe[Str]',
);

# Result object returned by the most recent next().
has '__res' => (
    is  => 'rw',
    isa => 'Maybe[Queue::Q4M::Result]',
);

__PACKAGE__->meta->make_immutable;

no Moose;

our $VERSION = '0.00011';

use constant Q4M_MINIMUM_VERSION => '0.1';

# connect(%args) / $obj->connect
#
# Class or instance method. As a class method it first builds a new object
# from %args. Any existing handle is disconnected, a new connection is made,
# and the server is checked for a Q4M plugin of at least
# Q4M_MINIMUM_VERSION. Returns the object; confesses if the check fails.
sub connect {
    my $self = shift;
    if (! ref $self) {
        $self = $self->new(@_);
    }

    if (my $old = $self->_dbh()) {
        $old->disconnect();
    }

    my $dbh = $self->_connect();
    $self->_dbh( $dbh );

    # Make sure we have the minimum supported API version
    # (or, a Q4M enabled mysql, for that matter)
    my $version;
    eval {
        my $sth = $dbh->prepare(<<'EOSQL');
SELECT PLUGIN_VERSION from
    information_schema.plugins
    WHERE plugin_name = ?
EOSQL
        $sth->execute('QUEUE');
        $sth->bind_columns(\$version);
        $sth->fetchrow_arrayref;
        $sth->finish;
    };
    # A bare warn reports the pending $@; the failure itself is turned into
    # an exception by the version check below ($version stays undef).
    warn if $@;

    if (! $version || $version < Q4M_MINIMUM_VERSION) {
        Carp::confess(
            "Connected database does not meet the minimum required q4m version ("
            . Q4M_MINIMUM_VERSION
            . "). Got version "
            . (defined $version ? $version : '(undef)')
        );
    }

    return $self;
}

# Opens a raw DBI connection using connect_info. Returns whatever
# DBI->connect returns.
sub _connect {
    my $self = shift;
    return DBI->connect(@{ $self->connect_info });
}

# Returns the database handle, reconnecting first if there is no handle or
# ping() fails. Dies with "not connect" when a reconnect is needed but
# auto_reconnect is off.
sub dbh {
    my $self = shift;
    my $dbh  = $self->_dbh;

    if (! $dbh || ! $dbh->ping) {
        $self->auto_reconnect or die "not connect";
        $dbh = $self->_connect();
        $self->_dbh( $dbh );
    }
    return $dbh;
}

# next(@table_conds[, $timeout])
#
# Runs "SELECT queue_wait(...)" with the given arguments and returns a
# Queue::Q4M::Result that is true when a table became ready and stringifies
# to that table's name. The selected table is cached for fetch_*().
sub next {
    my ($self, @args) = @_;

    # First, undef any cached table name that we might have had
    $self->__table(undef);

    # Table names are the arguments with any ":condition" suffix removed;
    # purely numeric arguments (the timeout) are not tables.
    my @tables =
        grep { !/^\d+$/ }
        map  { (my $v = $_) =~ s/:.*$//; $v }
        @args;

    my $dbh = $self->dbh;
    my $sql = sprintf(
        "SELECT queue_wait(%s)",
        join(',', (('?') x scalar(@args)))
    );
    my ($index) = $dbh->selectrow_array($sql, undef, @args);

    # $index is the 1-based position of the ready table; it is treated as
    # "nothing ready" when it is 0 or when no row/value came back at all.
    my $table = (defined $index && $index > 0) ? $tables[$index - 1] : undef;

    # The result object is stored on $self below, so the callback must not
    # hold a strong reference back to $self: that cycle would keep the
    # object alive forever and DEMOLISH/disconnect would never run.
    my $weak_self = $self;
    Scalar::Util::weaken($weak_self);

    my $res = Queue::Q4M::Result->new(
        rv         => defined $table,
        table      => $table,
        on_release => sub { $weak_self->__table(undef) if $weak_self },
    );

    if (defined $table) {
        $self->__table($table);
    }
    $self->__res($res);
    return $res;
}

# Generates fetch_array, fetch_arrayref and fetch_hashref.
#
# Each fetch_<type>([$result_or_table,] @select_args) builds a SELECT with
# sql_maker->select($table, @select_args) and returns the value of the
# matching DBI selectrow_<type>() call. The table is taken from a leading
# Queue::Q4M::Result argument if one is given, otherwise from the table
# cached by next(). Dies with "no table" when neither is available.
BEGIN {
    foreach my $type (qw(array arrayref hashref)) {
        my $selector = "selectrow_$type";

        my $fetcher = sub {
            my $self = shift;

            my $table;
            if (   @_
                && Scalar::Util::blessed($_[0])
                && $_[0]->isa('Queue::Q4M::Result'))
            {
                $table = shift;
                $table or die "no table";
                # Hand SQL::Abstract a plain string, not the result object.
                $table = "$table";
            }
            else {
                $table = $self->__table;
                $table or die "no table";

                # Accept the documented fetch($table, \@fields) form: a
                # leading plain table name equal to the cached table is
                # dropped instead of being passed on as the column list.
                if (@_ && defined $_[0] && ! ref $_[0] && $_[0] eq $table) {
                    shift;
                }
            }

            my ($sql, @bind) = $self->sql_maker->select($table, @_);
            my $dbh = $self->dbh;
            return $dbh->$selector($sql, undef, @bind);
        };

        no strict 'refs';
        *{ __PACKAGE__ . "::fetch_$type" } = $fetcher;
    }
}

# fetch() is an alias for fetch_array().
*fetch = \&fetch_array;

# insert($table, \%fieldvals)
#
# Builds an INSERT with sql_maker and executes it. Returns the value of
# execute().
sub insert {
    my $self  = shift;
    my $table = shift;

    my ($sql, @bind) = $self->sql_maker->insert($table, @_);
    my $dbh = $self->dbh;
    my $sth = $dbh->prepare($sql);
    my $rv  = $sth->execute(@bind);
    $sth->finish;
    return $rv;
}

# Issues queue_end() and closes the current handle, if there is one.
sub disconnect {
    my $self = shift;
    my $dbh  = $self->_dbh;
    if ($dbh) {
        $dbh->do("select queue_end()");
        $dbh->disconnect;
        $self->_dbh(undef);
    }
}

# clear($table)
#
# Deletes every row of $table and returns the value of do().
# NOTE(review): $table is interpolated into the statement as-is, so it must
# never come from untrusted input.
sub clear {
    my ($self, $table) = @_;
    return $self->dbh->do("DELETE FROM $table");
}

# Moose destructor hook: make sure the connection is closed.
sub DEMOLISH {
    my $self = shift;
    $self->disconnect;
}

package
    Queue::Q4M::Result;

# Value returned by Queue::Q4M::next(): true in boolean context when a table
# was selected, and stringifies to that table's name.
use overload
    bool     => \&as_bool,
    '""'     => \&as_string,
    fallback => 1
;
use Scope::Guard;

# new(rv => $bool, table => $name, on_release => $coderef)
#
# Stored as an array: [ rv, table, Scope::Guard wrapping on_release ].
sub new {
    my $class = shift;
    my %args  = @_;
    return bless [
        $args{rv},
        $args{table},
        Scope::Guard->new( $args{on_release} ),
    ], $class;
}

sub as_bool   { $_[0]->[0] }
sub as_string { $_[0]->[1] }

# NOTE(review): the guard is dismissed rather than triggered here, so the
# on_release callback is presumably never run by the guard -- confirm that
# this is intended before changing it.
sub DESTROY { $_[0]->[2]->dismiss(1) if $_[0]->[2] }

1;

__END__

=head1 NAME

Queue::Q4M - Simple Interface To q4m

=head1 SYNOPSIS

  use Queue::Q4M;

  my $q = Queue::Q4M->connect(
    connect_info => [
      'dbi:mysql:dbname=mydb',
      $username,
      $password
    ],
  );

  for (1..10) {
    $q->insert($table, \%fieldvals);
  }

  while ($q->next($table)) {
    my ($col1, $col2, $col3) = $q->fetch($table, \@fields);
    print "col1 = $col1, col2 = $col2, col3 = $col3\n";
  }

  while ($q->next($table)) {
    my $cols = $q->fetch_arrayref($table, \@fields);
    print "col1 = $cols->[0], col2 = $cols->[1], col3 = $cols->[2]\n";
  }

  while ($q->next($table)) {
    my $cols = $q->fetch_hashref($table, \@fields);
    print "col1 = $cols->{col1}, col2 = $cols->{col2}, col3 = $cols->{col3}\n";
  }

  # to use queue_wait(table_cond1,table_cond2,timeout)
  while (my $which = $q->next(@table_conds)) {
    # $which contains the table name
  }

  $q->disconnect;

=head1 DESCRIPTION

Queue::Q4M is a simple wrapper to q4m, which is an implementation of a queue
using mysql.


=head1 METHODS

=head2 new

Creates a new Queue::Q4M instance. Normally you should use connect() instead.

=head2 connect

Connects to the target database.

  my $q = Queue::Q4M->connect(
    connect_info => [
      'dbi:mysql:dbname=q4m',
    ]
  );

=head2 next($table_cond1[, $table_cond2, $table_cond3, ..., $timeout])

Blocks until the next item is available. This is equivalent to calling
queue_wait() on the given table.

  my $which = $q->next( $table_cond1, $table_cond2, $table_cond3 );

=head2 fetch

=head2 fetch_array

Fetches the next available row. Takes the list of columns to be fetched.

  my ($col1, $col2, $col3) = $q->fetch( $table, [ qw(col1 col2 col3) ] );

=head2 fetch_arrayref

Same as fetch_array, but fetches using selectrow_arrayref()

  my $arrayref = $q->fetch_arrayref( $table, [ qw(col1 col2 col3) ] );

=head2 fetch_hashref

Same as fetch_array, but fetches using selectrow_hashref()

  my $hashref = $q->fetch_hashref( $table, [ qw(col1 col2 col3) ] );

=head2 insert($table, \%field)

Inserts into the queue. The first argument should be a scalar specifying
a table name. The second argument is a hashref that specifies the mapping
between column names and their respective values.

  $q->insert($table, { col1 => $val1, col2 => $val2, col3 => $val3 });

For backwards compatibility, you may omit $table if you specified $table
in the constructor.

=head2 clear($table)

Deletes everything in the specified queue. Be careful!

=head2 dbh

Returns the database handle after making sure that it's connected.

=head2 disconnect

Disconnects.

=head2 BUILD

=head2 DEMOLISH

These are defined as part of Moose infrastructure

=head1 AUTHOR

Copyright (c) 2008 Daisuke Maki E<lt>daisuke@endeworks.jpE<gt>

=head1 CONTRIBUTOR

Taro Funaki

=head1 LICENSE

This program is free software; you can redistribute it and/or modify it
under the same terms as Perl itself.

See http://www.perl.com/perl/misc/Artistic.html

=cut