use strict;
package Event::tcpsession;
use Carp;
use Symbol;
use Socket;
use Ioctl qw(FIONBIO);
use Errno qw(EAGAIN);
use Event 0.61;
use Event::Watcher qw(R W T);
require Event::io;
use base 'Event::io';
use vars qw($VERSION);
$VERSION = '0.14';
use constant DEBUG_SHOW_RPCS => 0;
use constant DEBUG_BYTES => 0;
use constant PROTOCOL_VERSION => 2;
use constant RECONNECT_TM => 3;
use constant HEADER_FORMAT => 'Nn';
# special message IDs
use constant NOREPLY_ID => 0;
use constant APIMAP_ID => 1;
use constant RESERVED_IDS => 2;
'Event::Watcher'->register;
# API is an ordered array:
# { name => 'opname', code => sub {}, req => 'nn' }
# { name => 'opname', code => sub {}, req => 'nn', reply => 'nn' }
sub new {
my ($class, %p) = @_;
my @passthru;
push @passthru, desc => $p{desc} if
exists $p{desc};
my $o = $class->SUPER::new(parked => 1, reentrant => 0, @passthru);
$o->{status_cb} = $p{cb} || sub {};
$o->{api} = $p{api} || [];
$o->{delayed} = [];
$o->{q} = []; # message queue
$o->{pend} = {}; # pending transactions
$o->{next_txn} = $$;
$o->set_peer(can_ignore => 1, %p);
$o;
}
sub is_server_side { # make function call XXX
my ($o) = @_;
!exists $o->{iaddr}
}
# Transaction IDs are for keeping track of roundtrip messaging.
# They are also used for special messages. Special messages
# only use low-order IDs. The special range from
# [0x8000, 0x8000 + RESERVEDIDS) is unused.
#
# use 1 bit to distinguish short/long messages? XXX
#
sub get_next_transaction_id {
my ($o) = @_;
$o->{next_txn} = ($o->{next_txn}+1) & 0x7fff;
$o->{next_txn} = RESERVED_IDS if $o->{next_txn} < RESERVED_IDS;
$o->{next_txn} | ($o->is_server_side ? 0x8000 : 0);
}
#########################################################################
sub fd {
if (@_ == 1) {
shift->SUPER::fd;
} else {
my ($o, $fd) = @_;
if (caller eq __PACKAGE__) {
if ($fd) {
ioctl $fd, FIONBIO, pack('l', 1)
or die "ioctl FIONBIO: $!";
#setsockopt($c->{e_fd}, IPPROTO_TCP, TCP_NODELAY, pack('l',1))
# or die "setsockopt: $!";
}
$o->SUPER::fd($fd)
} else {
if (!defined $fd) {
# This is a special case for regression testing.
# Who knows, maybe it is generally useful too.
close $o->fd;
$o->SUPER::fd(undef)
} else {
$o->set_peer(fd => $fd);
}
}
}
}
sub cb {
if (caller eq __PACKAGE__) {
shift->SUPER::cb(@_);
} else {
my $o = shift;
if (@_ == 0) {
$o->{status_cb}
} else {
$o->{status_cb} = shift;
}
}
}
#########################################################################
sub set_peer {
my ($o,%p) = @_;
croak "set_peer: '".$o->desc."' already connected"
if $o->{peer_set};
if (exists $p{port}) {
#client side
my $iaddr;
if (exists $p{host}) {
my $host = $p{host};
$iaddr = inet_aton($host) || die "Lookup of host '$host' failed";
} elsif (exists $p{iaddr}) {
$iaddr = $p{iaddr};
warn "Both iaddr & host given; host ignored" if exists $p{host};
} else {
$iaddr = inet_aton('localhost');
}
my $port = $p{port};
$o->{iaddr} = $iaddr;
$o->{port} = $port;
$o->{status_cb}->($o, 'not available')
if !$o->connect_to_server;
} elsif (exists $p{fd}) {
#server side
$o->fd($p{fd});
$o->reconnected;
} else {
return
if $p{can_ignore};
croak("connect to what?");
}
$o->{peer_set} = 1;
}
sub disconnect {
my ($o, $why) = @_;
if ($o->is_server_side) {
# recovery is always client's responsibility
$o->cancel;
return 1;
}
$o->{status_cb}->($o, 'disconnect', $why);
$o->connect_to_server;
}
sub connect_to_server {
my ($o) = @_;
$o->fd(undef);
my $fd = gensym;
socket($fd, PF_INET, SOCK_STREAM, getprotobyname('tcp'))
or die "socket: $!";
if (!connect($fd, sockaddr_in($o->{port}, $o->{iaddr}))) {
$o->{status_cb}->($o, 'connect', $!);
$o->timeout(RECONNECT_TM);
$o->cb([$o,'connect_to_server']);
$o->start;
return
}
$o->fd($fd);
$o->{status_cb}->($o, 'connect');
$o->reconnected;
1
}
sub reconnected {
my ($o) = @_;
$o->timeout(undef);
delete $o->{pend};
delete $o->{peer_version};
delete $o->{peer_api};
delete $o->{peer_opname};
$o->{ibuf} = '';
$o->{obuf} = pack 'n', PROTOCOL_VERSION;
append_obuf($o, APIMAP_ID, join("\n", map {
my @z = ($_->{name}, $_->{req} || '');
push @z, $_->{reply} || '' if exists $_->{reply};
join($;, @z);
} @{$o->{api}}));
# reload pending transactions
# (anything not requiring acknowledgement gets/got ignored)
while (my ($tx,$i) = each %{$o->{pend}}) {
# warn "pend $i->[0]{name}";
append_obuf($o, $tx, $i->[2]);
}
$o->poll(R|W);
$o->cb([$o,'service']);
$o->start;
}
#########################################################################
sub append_obuf { # function call
my ($o, $tx, $m) = @_;
# length is inclusive
my $mlen = length $m;
$o->{obuf} .= pack(HEADER_FORMAT, 6+$mlen, $tx) . $m;
$o->poll($o->poll | W);
}
sub pack_args {
my $template = shift;
if ($template) {
pack $template, @_;
} elsif (@_ == 0) {
''
} elsif (@_ == 1) {
$_[0]
} else {
undef
}
}
sub unpack_args {
my ($template, $bytes) = @_;
if ($template) {
unpack $template, $bytes
} elsif (length $bytes) {
$bytes
} else {
()
}
}
sub service {
my ($o, $e) = @_;
my $w = $e->w;
return $o->disconnect("inactivity")
if $e->got & T;
return $o->disconnect("fd closed")
if !defined $w->fd;
if ($e->got & R) {
my $buf = $o->{ibuf};
while (1) {
my $ret = sysread $w->fd, $buf, 8192, length($buf);
next if $ret;
last if $!{EAGAIN};
return $o->disconnect("sysread ret=$ret, $!");
}
#warn "$$:R:".unpack('h*', $buf).":";
# decode $buf
if (!exists $o->{peer_version} and length $buf >= 2) {
# check PROTOCOL_VERSION ...
$o->{peer_version} = unpack 'n', substr($buf, 0, 2);
warn "$$:peer_version=$o->{peer_version}"
if DEBUG_SHOW_RPCS;
$buf = substr $buf, 2;
$o->disconnect("peer version mismatch $o->{peer_version} != ".
PROTOCOL_VERSION)
if $o->{peer_version} != PROTOCOL_VERSION;
}
while (length $buf >= 6) {
my ($len, $tx) = unpack HEADER_FORMAT, $buf;
last if length $buf < $len; # got a complete message?
my $m = substr $buf, 6, $len-6;
$buf = substr $buf, $len; # snip
if ($tx == NOREPLY_ID) {
my $opid = unpack 'n', $m;
$m = substr $m, 2;
my $api = $o->{api}[$opid];
if (!$api) {
warn "API $opid not found (ignored)";
next
}
# EVAL
my @args = unpack_args($api->{req}, $m);
warn "$$:Run($opid)(".join(', ', @args).")"
if DEBUG_SHOW_RPCS;
$api->{code}->($o, @args);
} elsif ($tx < RESERVED_IDS) {
if ($tx == APIMAP_ID) {
my @api;
for my $packedspec (split /\n/, $m) {
my @spec = split /$;/, $packedspec, -1;
if (@spec == 2 or @spec == 3) {
my @p=( name => $spec[0], req => $spec[1]);
push @p, reply => $spec[2]
if @spec == 3;
push @api, { @p };
} else {
warn "got strange API spec: ".join(', ',@spec);
}
}
warn "$$: ".(0+@api)." APIs"
if DEBUG_SHOW_RPCS;
$o->{peer_api} = \@api;
my %peer_opname;
for (my $x=0; $x < @api; $x++) {
$peer_opname{$api[$x]{name}} = $x;
}
$o->{peer_opname} = \%peer_opname;
for my $rpc (@{$o->{delayed}}) {
$o->rpc(@$rpc);
}
$o->{delayed} = [];
} else {
die "Unknown TX $tx?";
}
} else {
if ($tx >= 0x8000 xor $o->is_server_side) {
my $opid = unpack 'n', $m;
$m = substr $m, 2;
my $api = $o->{api}[$opid];
if (!$api) {
warn "API $opid not found (ignored)";
next
}
# EVAL
my @args = unpack_args($api->{req}, $m);
warn "$$:Run($opid)(".join(", ", @args).") returning..."
if DEBUG_SHOW_RPCS;
my @ret = $api->{code}->($o, @args);
# what if exception? XXX
warn "$$:Return($opid)(".join(", ", @ret).")"
if DEBUG_SHOW_RPCS;
my $packed_ret = pack_args($api->{reply}, @ret);
warn("'$api->{name}' returned (".join(', ',@ret).
" yet doesn't have a reply pack template")
if !defined $packed_ret;
append_obuf($o, $tx, pack('n',$opid).$packed_ret);
} else {
my $pend = $o->{pend}{$tx};
if (!$pend) {
warn "Got unexpected reply for TXN $tx (ignored)";
next;
}
my ($api,$cb) = @$pend;
my $opid = unpack 'n', $m; # can double check opid XXX
# EVAL
my @args= unpack_args($api->{reply}, substr($m, 2));
warn "$$:RunReply($opid)(".join(", ", @args).")"
if DEBUG_SHOW_RPCS;
$cb->($o, @args);
}
}
}
$o->{ibuf} = $buf;
}
if (length $o->{obuf}) {
my $buf = $o->{obuf};
my $sent = syswrite($w->fd, $buf, length($buf), 0);
if ($!{EAGAIN}) {
$sent ||= 0;
} elsif (!defined $sent) {
return $o->disconnect("syswrite: $!")
}
if ($sent) {
warn "$$:W:".unpack('h*', substr($buf, 0, $sent)).":"
if DEBUG_BYTES;
$buf = substr $buf, $sent;
$o->{obuf} = $buf;
}
}
if (length $o->{obuf}) {
$o->poll($o->poll | W);
} else {
$o->poll($o->poll & ~W);
if (keys %{$o->{pend}}) {
# close connection if a timeout is exceeded
}
}
}
sub rpc {
my $o = shift;
if (!defined $o->fd or !exists $o->{peer_opname}) {
my @copy = @_;
#my $fileno = $o->fd? fileno($o->fd) : 'undef';
#warn "$$: delay $copy[0] ($fileno, $o->{peer_opname})";
push @{$o->{delayed}}, \@copy;
return;
}
my $opname = shift;
confess "No opname?"
if !$opname;
my $id = $o->{peer_opname}{$opname};
croak "'$opname' not found on peer (".
join(' ', sort keys %{$o->{peer_opname}}).")"
if !defined $id;
my $api = $o->{peer_api}[$id];
# prepare for reply (if any)
my $tx;
my $save;
if (!exists $api->{reply}) {
$tx = NOREPLY_ID;
} else {
$tx = $o->get_next_transaction_id;
die "too many pending transactions"
if exists $o->{pend}{$tx};
$save = $o->{pend}{$tx} = [$api, shift];
}
warn "$$:Call($id)(".join(", ", @_).")"
if DEBUG_SHOW_RPCS;
my $packed_args = pack_args($api->{req}, @_);
croak("Attempt to invoke '$opname' with (".join(', ', @_).
") without pack template")
if !defined $packed_args;
my $packed_msg = pack('n', $id).$packed_args;
$save->[2] = $packed_msg
if $save;
append_obuf($o, $tx, $packed_msg);
}
1;
__END__
=head1 NAME
Event::tcpsession - reliable bidirectional RPC session layer
=head1 SYNOPSIS
my $api = [
{ name => 'my_rpc',
req => 'nN', # network short, network long
reply => '', # no translator for reply
code =>
sub { 'returned to caller' } # server-side code
},
...
];
Event->tcpsession(fd => $socket, api => $api);
=head1 DESCRIPTION
Automatic client-side recovery.
Embedded NULLs are OK.
What are the arbitrary limits?
=head1 SUPPORT
If you have insights or complaints then please subscribe to the
mailing list! Send email to:
majordomo@perl.org
The body of your message should read:
subscribe perl-loop
This list is archived at
http://www.xray.mpe.mpg.de/mailing-lists/perl-loop/
Thanks!
=head1 COPYRIGHT
Copyright © 1999 Joshua Nathaniel Pritikin. All rights reserved.
This program is free software; you can redistribute it and/or modify
it under the same terms as Perl itself.
=cut