package AnyEvent::Gearman::Connection; use Any::Moose; use Scalar::Util 'weaken'; use AnyEvent::Socket; use AnyEvent::Handle; has hostspec => ( is => 'ro', isa => 'Str', required => 1, ); has _host => ( is => 'rw', isa => 'Str', ); has _port => ( is => 'rw', isa => 'Int | Str', ); has context => ( is => 'rw', isa => 'Object', weak_ref => 1, ); has handler => ( is => 'rw', isa => 'Maybe[AnyEvent::Handle]', clearer => 'clear_handler', ); has on_connect_callbacks => ( is => 'rw', isa => 'ArrayRef', default => sub { [] }, ); has dead_time => ( is => 'rw', isa => 'Int', default => 0, ); has _need_handle => ( is => 'rw', isa => 'ArrayRef', default => sub { [] }, ); has _job_handles => ( is => 'rw', isa => 'HashRef', default => sub { {} }, ); has _con_guard => ( is => 'rw', isa => 'Object', ); no Any::Moose; sub BUILD { my $self = shift; # parse hostspec my ($host, $service) = parse_hostport $self->hostspec; unless (defined $host) { $host = $self->hostspec; $service = 4730; } unless (defined($host) && defined($service)) { die sprintf('Failed to parse hostspec: "%s"', $self->hostspec); } $self->_host( $host ); $self->_port( $service ); } sub connect { my ($self) = @_; # already connected return if $self->handler; my $g = tcp_connect $self->_host, $self->_port, sub { my ($fh) = @_; if ($fh) { my $handle = AnyEvent::Handle->new( fh => $fh, on_read => sub { $self->process_packet }, on_error => sub { my @undone = @{ $self->_need_handle }, values %{ $self->_job_handles }; $_->event('on_fail') for @undone; $self->_need_handle([]); $self->_job_handles({}); $self->mark_dead; }, ); $self->handler( $handle ); $_->() for map { $_->[0] } @{ $self->on_connect_callbacks }; } else { warn sprintf("Connection failed: %s", $!); $self->mark_dead; $_->() for map { $_->[1] } @{ $self->on_connect_callbacks }; } $self->on_connect_callbacks( [] ); }; weaken $self; $self->_con_guard($g); $self; } sub connected { !!shift->handler; } sub add_on_ready { my ($self, $cb, $eb) = @_; if ($self->connected) { $cb->(); } else { push @{ $self->on_connect_callbacks }, [ $cb, $eb ]; $self->connect; } } sub mark_dead { my ($self) = @_; $self->dead_time( time + 10 ); $self->clear_handler; } sub alive { my ($self) = @_; $self->dead_time <= time; } sub process_packet { my $self = shift; my $handle = $self->handler; $handle->unshift_read(chunk => 4, sub { unless ($_[1] eq "\0RES") { die qq[invalid packet: $_[1]"]; } $_[0]->unshift_read( chunk => 8, sub { my ($type, $len) = unpack 'NN', $_[1]; my $packet_handler = $self->can("process_packet_$type"); unless ($packet_handler) { # Ignore unimplement packet $_[0]->unshift_read( chunk => $len, sub {} ) if $len; return; } $packet_handler->( $self, $len ); }); }); weaken $self; } __PACKAGE__->meta->make_immutable; __END__ =head1 NAME AnyEvent::Gearman::Connection - common base class to handle connection =head1 SEE ALSO L, L, L. =head1 METHODS =head2 connect =head2 connected =head2 add_on_ready =head2 mark_dead =head2 alive =head2 process_packet =head1 AUTHOR Daisuke Murase Pedro Melo =head1 COPYRIGHT AND LICENSE Copyright (c) 2009 by KAYAC Inc. This program is free software; you can redistribute it and/or modify it under the same terms as Perl itself. The full text of the license can be found in the LICENSE file included with this module. =cut