# Copyright 2000 by Stem Systems, Inc. All rights reserved. # If you have this software as part of the prototype release, you are # not allowed to distribute any copies to anyone. This software is not # to shown to anyone else without prior permission from Stem Systems. use strict ; package Stem::Portal ; use Data::Dumper ; use Carp ; use Stem::AsyncIO ; use Stem::Debug ; my %name_to_portal ; my %portal_to_names ; my $default_portal ; my %ssh_procs ; Stem::Route::register_class( __PACKAGE__, 'port' ) ; my $attr_spec_portal = [ { 'name' => 'reg_name', 'help' => < 'server', 'env' => 'server', 'help' => < 'port', 'default' => 10_000, 'env' => 'port', 'help' => < 'host', 'default' => 'localhost', 'env' => 'host', 'help' => < 'use_ssh', 'env' => 'use_ssh', 'help' => < 'ssh_port', 'env' => 'ssh_port', 'help' => < 'ssh_path', 'env' => 'ssh_path', 'default' => '/usr/local/bin/ssh', 'help' => <{ 'reg_name' } || $Stem::Vars::Hub_name ; #print "portal name is '$name'\n" ; # this is for a forked hub and socketpair if ( $self->{ fh } ) { $self->{'fh'} = $self->{ fh } ; $self->_activate() ; return ; } if ( $self->{ 'use_ssh' } ) { $self->_ssh1() ; } elsif ( $self->{ 'use_ssh2' } ) { return $self->_ssh2() ; } if ( $self->{'server'} ) { $self->{'type'} = 'listener' ; $self->{'server_name'} = $name ; } else { $self->{'type'} = 'client' ; $self->{'name'} = $name ; } my $sock_obj = Stem::Socket->new( 'object' => $self, 'host' => $self->{'host'}, 'port' => $self->{'port'}, 'server' => $self->{'server'}, ) ; ref $sock_obj or return $sock_obj ; $self->{'sock_obj'} = $sock_obj ; return ; } sub _ssh1 { my( $self ) = @_ ; my $ssh_port = $self->{'ssh_port'} ; $ssh_port or return "Missing ssh_port in Portal '$self->{'reg_name'}" ; my $remote_port = $self->{'port'} ; my $remote_host = $self->{'host'}, $self->{'port'} = $ssh_port ; $self->{'host'} = 'localhost' ; $self->{'remote_port'} = $remote_port ; my $remote_interface = 'localhost' ; require Stem::Proc ; my $proc = Stem::Proc->new( 'path' => $self->{'ssh_path'}, 'proc_args' => [ '-L', "$ssh_port:$remote_interface:$remote_port", $remote_host, qw( while true ; do sleep 3600 ; done ), ], 'no_io' => 1, 'no_clone' => 1, # 'use_pty' => 1, ) ; return $proc unless ref $proc ; $self->{'proc'} = $proc ; $ssh_procs{ $proc } = $proc ; sleep 3 ; } END { foreach my $proc ( values %ssh_procs ) { print "killing ssh proc\n" ; $proc->shut_down() ; } } sub _ssh2 { my( $self ) = @_ ; require Stem::Proc ; # old style ssh calling a tty2sock program which may be ressurected # qw( -q -e none mail), # "(cd /wrk/stem/src/stem ; ./run_stem ttysock tty_port=$self->{'port'})", my $proc = Stem::Proc->new( 'path' => '/usr/local/bin/ssh', 'proc_args' => [ '-f', '-L', "sleep","1000", ], 'no_io' => 1, 'no_clone' => 1, # 'use_pty' => 1, ) ; return $proc unless ref $proc ; $self->{'proc'} = $proc ; # $self->{'write_fh'} = $proc->write_fh() ; # $self->{'read_fh'} = $proc->read_fh() ; # my $err = $self->_activate() ; return ; } sub connected { my( $self, $connected_sock ) = @_ ; my( $portal ) ; #print "Portal Connected\n" ; $self->{'mode'} = 'connected' ; $self->{'read_fh'} = $connected_sock ; $self->{'write_fh'} = $connected_sock ; my $type = $self->{'type'} ; if ( $type eq 'listener' ) { # fork off a new portal by making a clone of the listener portal $portal = bless { %$self } ; $portal->{'type'} = 'accepted' ; my $name = $portal->{'server_name'} ; # my $target = $self->{'target'}->next() ; $portal->{'name'} = $name ; # $portal->{'target'} = $target ; # $portal->register( "$name-$target" ) ; delete( $portal->{'sock_obj'} ) ; } else { # a client portal is just itself $portal = $self ; unless ( $default_portal ) { $portal->register( 'DEFAULT' ) ; $default_portal = $portal ; } } $portal->_activate() ; } sub _activate { my( $self ) = @_ ; #print "activate: ", Dumper($self), "\n" ; #print "active port\n" ; my $stream_obj = Stem::Portal::Stream->new( 'object' => $self, 'read_fh' => $self->{'read_fh'}, 'write_fh' => $self->{'write_fh'}, ) ; return $stream_obj unless ref $stream_obj ; $self->{'stream'} = $stream_obj ; # $name = $self->{'name'} || $self->{'server_name'} ; # my $name = $self->{'name'} ; my $msg = Stem::Msg->new( 'from_hub' => $Stem::Vars::Hub_name, 'type' => 'register', ) ; #print $msg->dump( 'reg msg' ) ; $self->send( $msg ) ; return ; } sub stream_received { my( $self, $stream_buf ) = @_ ; my $msg = Stem::Msg::from_stream( $stream_buf ) ; if ( $msg->type() eq 'register' ) { #print $self->dump( 'REG msg' ) ; $self->register( $msg->from_hub() ) ; return ; } $msg->in_portal( $self ) ; #print $msg->dump( 'recvd' ) ; $msg->dispatch() ; } sub stream_closed { my( $self ) = @_ ; print "Portal closed\n" ; #print "CLOSED Portal : ", Dumper($self), "\n" ; Stem::Route::unregister_cell( $self ) ; $self->unregister() ; if ( $self->{'type'} eq 'accepted' ) { print "client hub '$self->{'name'}' closed\n" ; $self->shut_down() ; return ; } if ( $self->{'type'} eq 'connected' ) { die "server hub '$self->{'name'}' closed\n" ; } die "this server hub '$self->{'server_name'}' closed\n" ; } sub shut_down { my( $self ) = @_ ; print "SHUT DOWN portal\n" ; $self->{'stream'}->shut_down() ; } sub send { my( $self, $msg ) = @_ ; $msg->from_hub( $self->{'name'} ) unless $msg->from_hub() ; #print $msg->dump( 'Portal send' ) ; my $stream_buf = $msg->to_stream() ; my $stream = $self->{'stream'} ; $stream->send_to( $stream_buf ) ; } # this is for messages directly to this portal. messages are sent out # the portal via the send method sub msg_in { my( $self, $msg ) = @_ ; #print "portal msg in\n" ; } sub register { my( $portal, $name ) = @_ ; # print "portal arg: [$portal] [$name]\n\t", map( "<$_>", caller() ), "\n" ; $name_to_portal{ $name } = $portal ; push( @{$portal_to_names{ $portal }}, $name ) ; # print "\nREG table:\n", # map( "\t$_ => $name_to_portal{$_}\n", sort keys %name_to_portal ), "\n" ; } sub unregister { my( $name ) = @_ ; # convert a name to its object ; my $portal = ref $name ? $name : $name_to_portal{ $name } ; if ( $portal ) { delete $name_to_portal{ $portal } ; delete $portal_to_names{ $portal } ; } #print "UNREG: [$_] => [$name_to_portal{$_}]\n" for sort keys %name_to_portal ; } sub find { my( $hub_name ) = shift ; return $name_to_portal{ $hub_name } ; } sub status_cmd { return _dump_portals() ; } sub _dump_portals { return join '', "\nPortal Status for Hub '$Stem::Vars::Hub_name'\n", sort map "\t$_\t\t=> $name_to_portal{ $_ }\n", keys %name_to_portal ; } ############################################################################ package Stem::Portal::Stream ; =head2 Stem::Portal::Stream::new this just does blocking/unblocking of Portal messages in a text stream. it uses null bytes as separators. in the future it will also support a digit string size header line for efficiency but it will still check for the null byte for robustness. many options to this: immediate shutdown, soft shutdown, disk backing, recovery, etc. =cut use Data::Dumper ; use Carp ; my $attr_spec_stream = [ { 'name' => 'object', 'required' => 1, 'help' => < 'read_fh', 'required' => 1, 'help' => < 'write_fh', 'required' => 1, 'help' => < 'receive_method', 'default' => 'stream_received', 'help' => < 'closed_method', 'default' => 'stream_closed', 'help' => <new( 'object' => $self, 'read_fh' => $self->{'read_fh'}, 'write_fh' => $self->{'write_fh'}, ) ; $self->{'aio'} = $aio ; return $self ; } sub shut_down { my( $self ) = @_ ; #print "SHUT DOWN port : ", Dumper($self), "\n" ; $self->{'aio'}->shut_down() ; delete @{$self}{qw( object aio )} ; } sub send_to { my( $self, $stream_buf ) = @_ ; defined( $stream_buf ) || croak "no message in Stream::send" ; #print "send [$stream_buf]\n" ; # mark the message boundary with null byte $stream_buf .= "\0" ; $self->{'aio'}->write( \$stream_buf ) ; } sub async_read_data { my( $self, $data_ref ) = @_ ; my $buf_ref = \$self->{'read_buf'} ; $$buf_ref .= ${$data_ref} ; #print "buf [$$buf_ref]\n" ; my $method = $self->{'receive_method'} ; while( $$buf_ref ) { my $sep_ind = index( $$buf_ref, "\0" ) ; last unless $sep_ind > -1 ; my $msg_buf = substr( $$buf_ref, 0, $sep_ind + 1, '' ) ; chop $msg_buf ; #print "MSG BUF: [", unpack( 'H*', $msg_buf ), "]\n" ; $self->{'object'}->$method( $msg_buf ) ; } } sub async_closed { my( $self ) = @_ ; my $method = $self->{ 'closed_method' } ; print "PS async closed: [$method]\n" ; $self->{'object'}->$method() ; } ############################################################################ package Stem::Portal::Stream::Test ; use Socket ; sub go { my( $self, $sock_1, $sock_2, $stream ) ; require IO::Handle ; print __PACKAGE__, " testing\n" ; # get a pair of sockets to create connected streams $sock_1 = IO::Handle->new() ; $sock_2 = IO::Handle->new() ; socketpair( $sock_1, $sock_2, AF_UNIX, SOCK_STREAM, PF_UNSPEC ) || die "can't make socket pair $!" ; # create the test object ; # create the read and write events $self = bless { 'stream_num' => 1 } ; $stream = Stem::Portal::Stream->new( 'object' => $self, 'read_fh' => $sock_1, 'write_fh' => $sock_1, ) ; $self->{'stream'} = $stream ; # send the test message from stream 1 to 2 $stream->send_to( 'test stream1' ) ; $stream->send_to( 'test stream1 2' ) ; $stream->send_to( 'test stream2' ) ; $stream->send_to( 'test stream2 2' ) ; $self = bless { 'stream_num' => 2 } ; $stream = Stem::Portal::Stream->new( 'object' => $self, 'read_fh' => $sock_2, 'write_fh' => $sock_2, ) ; $self->{'stream'} = $stream ; Stem::Event::start_loop() ; print "end test\n" ; } # default callback method sub stream_received { my( $self, $msg_text ) = @_ ; #print "Stream rcv $self->{'stream_num'} [$msg_text]\n" ; if ( $self->{'stream_num'} == 2 ) { $self->{'stream'}->send_to( "Echo <$msg_text>" ) ; } elsif ( $msg_text =~ /Echo.+2/ ) { print "success\n" ; exit ; } } 1 ;