package DJabberd::Queue; use strict; use warnings; use base 'Exporter'; our @EXPORT_OK = qw(NO_CONN RESOLVING CONNECTING CONNECTED); use DJabberd::Log; our $logger = DJabberd::Log->get_logger; use fields ( 'vhost', 'endpoints', 'to_deliver', 'last_connect_fail', 'state', 'connection', ); use constant NO_CONN => \ "no connection"; use constant RESOLVING => \ "resolving"; use constant CONNECTING => \ "connecting"; use constant CONNECTED => \ "connected"; sub new { my $self = shift; my %opts = @_; $self = fields::new($self) unless ref $self; $self->{vhost} = delete $opts{vhost} or die "vhost required"; Carp::croak("Not a vhost: $self->{vhost}") unless $self->vhost->isa("DJabberd::VHost"); if (my $endpoints = delete $opts{endpoints}) { Carp::croak("endpoints must be an arrayref") unless (ref $endpoints eq 'ARRAY'); $self->{endpoints} = $endpoints; } else { $self->{endpoints} = []; } die "too many opts" if %opts; $self->{to_deliver} = []; # DJabberd::QueueItem? $self->{last_connect_fail} = 0; # unixtime of last connection failure $self->{state} = NO_CONN; # see states above return $self; } sub endpoints { my $self = shift; my $endpoints = $self->{endpoints}; if (@_) { @$endpoints = @_; } return @$endpoints; } # called by Connection::ServerOut constructor sub set_connection { my ($self, $conn) = @_; $logger->debug("Set connection for queue to '$self->{domain}' to connection '$conn->{id}'"); $self->{connection} = $conn; } sub vhost { my $self = shift; return $self->{vhost}; } sub enqueue { my ($self, $stanza, $cb) = @_; $logger->debug("Queuing stanza (" . $stanza . ") for"); if ($self->{state} == NO_CONN) { $logger->debug(" .. starting to connect to"); $self->start_connecting; } if ($self->{state} == CONNECTED) { $logger->debug(" .. already connected, writing stanza."); $self->{connection}->send_stanza($stanza); $cb->delivered; } else { $logger->debug(" .. pushing queue item."); push @{ $self->{to_deliver} }, DJabberd::QueueItem->new($stanza, $cb); } } sub failed_to_connect { my $self = shift; $self->{state} = NO_CONN; $self->{last_connect_fail} = time(); $logger->debug("Failed to connect queue"); while (my $qi = shift @{ $self->{to_deliver} }) { $qi->callback->error; } } # called by our connection when it's connected sub on_connection_connected { my ($self, $conn) = @_; $logger->debug("connection $conn->{id} connected! conn=$conn->{id}, selfcon=$self->{connection}->{id}"); # TODO why are we this checking here? return unless $conn == $self->{connection}; $logger->debug(" ... unloading queue items"); $self->{state} = CONNECTED; while (my $qi = shift @{ $self->{to_deliver} }) { $conn->send_stanza($qi->stanza); $qi->callback->delivered; # TODO: the connection might need to handle marking things as delivered # otherwise we could run into a problem if the connection dies mid-stanza. } } sub on_connection_failed { my ($self, $conn) = @_; $logger->debug("connection failed for queue"); return unless $conn == $self->{connection}; $logger->debug(" .. match"); return $self->failed_to_connect; } sub on_connection_error { my ($self, $conn) = @_; $logger->debug("connection error for queue"); return unless $conn == $self->{connection}; $logger->debug(" .. match"); my $pre_state = $self->{state}; $self->{state} = NO_CONN; $self->{connection} = undef; if ($pre_state == CONNECTING) { # died while connecting: no more luck $self->give_up_connecting; $self->on_final_error; } else { # died during an active connection, let's try again if (@{ $self->{to_deliver} }) { $logger->warn("Reconnecting to '$self->{domain}'"); $self->start_connecting; } } } sub on_final_error { my $self = shift; while (my $qi = shift @{ $self->{to_deliver} }) { $qi->callback->error("connection failure"); } } sub start_connecting { my $self = shift; $logger->debug("Starting connection"); die unless $self->{state} == NO_CONN; my $endpoints = $self->{endpoints}; unless (@$endpoints) { $self->failed_to_connect; return; } $self->{state} = CONNECTING; my $endpt = $endpoints->[0]; my $conn = $self->new_connection(endpoint => $endpt, queue => $self); $self->set_connection($conn); $conn->start_connecting; } sub new_connection { die "Sorry, this method needs to be overridden in your subclass of " . ref( $_[0] ) . "."; } package DJabberd::QueueItem; sub new { my ($class, $stanza, $cb) = @_; return bless [ $stanza, $cb ], $class; } sub stanza { return $_[0][0] } sub callback { return $_[0][1] } 1;