# $Id: Session.pm,v 1.13 2004/04/22 20:45:58 davidb Exp $
#
# Copyright (C) 2003 Verisign, Inc.
#
# This library is free software; you can redistribute it and/or
# modify it under the terms of the GNU Lesser General Public
# License as published by the Free Software Foundation; either
# version 2.1 of the License, or (at your option) any later version.
#
# This library is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
# Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public
# License along with this library; if not, write to the Free Software
# Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307
# USA

package Net::BEEP::Lite::Session;

=head1 NAME

Net::BEEP::Lite::Session

=head1 DESCRIPTION

This is a base class for BEEP sessions.  It handles core tasks common
to both server and client sessions.  This class isn't intended to be
used directly.  Instead, use one of its subclasses.

Note that in reality, this is really a TCP session.  It is not
abstracted away from the TCP transport for BEEP.  In the future it is
possible that it will be and a new TCPSession subclass will be
created.

=cut

use IO::Socket;
use Net::BEEP::Lite::Channel;
use Net::BEEP::Lite::MgmtProfile;
use Carp;

use strict;
use warnings;

=head1 CONSTRUCTOR

=over 4

=item new( I<ARGS> )

This is the main constructor.  It takes a named parameter list as its
argument.  See the C<initialize> method for a list of valid parameter
names.

=back

=cut

sub new {
    my ($proto, @params) = @_;

    # Support both Class->new(...) and $object->new(...).
    my $class  = ref($proto) || $proto;
    my $object = bless {}, $class;

    # All real set-up work lives in initialize() so subclasses can
    # extend it.
    $object->initialize(@params);

    return $object;
}

=head1 METHODS

=over 4

=item initialize( I<ARGS> )

Initialize the object.  This is generally called by the constructor
and subclasses.  It takes the following named parameters:

=over 4

=item Socket

the socket associated with this session.

=item NoGreeting

Do not send the greeting message.  This can be sent later with the
C<send_greeting> method.  This will be true if a socket isn't supplied.

=item DefaultLocalWindow

Set the base local TCP window to a particular value.  This number
should be 4096 (the default) or higher.

=item IdleTimeout

The number of seconds to wait for a frame.  Zero (the default) means
to wait indefinitely.

=item Timeout

The number of seconds to wait for a frame body to be completely read.
This should usually be non-zero to prevent framing errors from locking
the session forever.  The default is 60 seconds.

=item Debug

If true, print diagnostic messages to STDERR.  The default is false.

=item Trace

If true, print the frames read and written to STDERR.  The default is
false.

=back

It also takes the named parameters for C<Net::BEEP::Lite::MgmtProfile>.

=cut

sub initialize {
    my $self = shift;
    my %args = @_;

    # some defaults:
    $self->{debug} = 0;
    $self->{trace} = 0;

    # we assume the initiator role.  the listener subclass should set
    # this to 2.
    $self->{channelno_counter} = 1;

    # our local profiles.
    $self->{profiles} = {};

    # the remote profiles.
    $self->{remote_profiles} = {};

    # our channels.  basically, a hash of channel number to channel
    # object.
    $self->{channels} = {};

    # The default size of our local windows.  This should be at least
    # 4096.
    $self->{default_local_window} = 4096;

    # our general received message queue;
    $self->{messages} = [];

    # our default idle timeout
    $self->{idle_timeout} = 0;

    # our default read timeout for frame bodies.
    $self->{timeout} = 60;

    # Apply the caller's settings BEFORE channel zero is created, so
    # that Debug, Trace and DefaultLocalWindow also reach the
    # management channel.
    for my $name (keys %args) {
        my $val = $args{$name};

        if ($name =~ /^socket/i) {
            $self->{sock} = $val;
        }
        elsif ($name =~ /^no.?greeting$/i) {
            $self->{_no_greeting} = $val;
        }
        elsif ($name =~ /^default.?local.?window$/i) {
            $self->{default_local_window} = $val;
        }
        elsif ($name =~ /^debug$/i) {
            $self->{debug} = $val;
        }
        elsif ($name =~ /^trace$/i) {
            $self->{trace} = $val;
        }
        elsif ($name =~ /^idle.?timeout/i) {
            $self->{idle_timeout} = $val;
        }
        elsif ($name =~ /^timeout$/i) {
            $self->{timeout} = $val;
        }
    }

    # create our management profile.  FIXME: we may want to allow this
    # to be passed in (and only instantiate it once for all sessions.)
    $self->{mgmt_profile} = Net::BEEP::Lite::MgmtProfile->new(%args);

    # assign the management profile to channel zero.
    $self->_add_channel(0, $self->{mgmt_profile});

    # start msgno at one because of greeting RPY.
    $self->channel(0)->msgno(1);

    # set NoGreeting to true if the socket wasn't set or isn't an
    # IO::Socket.
    $self->{_no_greeting} = 1
        if (! $self->{sock}
            or (ref($self->{sock}) and !$self->{sock}->isa('IO::Socket')));
}

=item _socket()

Returns the internal socket.

=cut

sub _socket {
    my $self = shift;

    $self->{sock};
}

=item _set_socket($socket)

Change the session's internal socket to the supplied socket.  Only use
this if you know what you are doing.

=cut

sub _set_socket {
    my $self   = shift;
    my $socket = shift;

    $self->{sock} = $socket;
}

=item _next_channel_number()

This returns the next channel number to request.  Numbers advance by
two and wrap around below 2147483648; the wrap never yields channel
zero, which is reserved for the management profile.

=cut

sub _next_channel_number {
    my $self = shift;

    my $res  = $self->{channelno_counter};
    my $next = ($res + 2) % 2147483648;

    # An even counter wraps to exactly zero, which is the management
    # channel; restart it at 2 instead.  (An odd counter wraps to 1.)
    $next = 2 if $next == 0;

    $self->{channelno_counter} = $next;

    return $res;
}

=item _add_channel($channel_number, [$profile, [$local_window_size]])

This is called by the management profile when a message from the peer
causes a channel to be created.  If $profile is provided, then this
will bind that profile to the channel.  If $local_window_size is not
provided, the session's default local window is used.  Returns the new
C<Net::BEEP::Lite::Channel> object.

=cut

sub _add_channel {
    my $self    = shift;
    my $number  = shift;
    my $profile = shift;
    my $window  = shift || $self->{default_local_window};

    $self->{channels}->{$number} = Net::BEEP::Lite::Channel->new(
        Number  => $number,
        Profile => $profile,
        Window  => $window,
        Debug   => $self->{debug},
        Trace   => $self->{trace},
    );
}

=item _del_channel($channel_number)

This is called by profiles when it needs to close a channel (either a
close request on the channel or a tuning reset; see
C<_del_all_channels>).

=pod

=cut

sub _del_channel {
    my ($self, $channel_number) = @_;

    # hand back whatever was stored under that number (if anything).
    return delete $self->{channels}{$channel_number};
}

=item _del_all_channels()

Close (and destroy) all current channels.  This is most likely to be
done as part of a tuning reset.  You will have to re-add channel zero
after this.

=cut

sub _del_all_channels {
    my ($self) = @_;

    # go through _del_channel() for each entry rather than emptying the
    # hash directly.
    $self->_del_channel($_) for keys %{ $self->{channels} };
}

=item add_local_profile($profile)

This method will add a (local) profile to the session.  This will be
advertised in the greeting message.

=cut

sub add_local_profile {
    my ($self, $profile) = @_;

    # local profiles are indexed by their URI.
    my $uri = $profile->uri();

    return $self->{profiles}{$uri} = $profile;
}

=item get_local_profile($uri)

Returns the profile implementation associated with the given uri.

=cut

sub get_local_profile {
    my ($self, $wanted_uri) = @_;

    return $self->{profiles}{$wanted_uri};
}

=item get_local_profile_uris()

Returns the list of profile URIs currently being advertised by this
peer.

=cut

sub get_local_profile_uris {
    my ($self) = @_;

    return keys %{ $self->{profiles} };
}

=item del_local_profile($uri)

Removes a local profile.

=cut

sub del_local_profile {
    my ($self, $doomed_uri) = @_;

    return delete $self->{profiles}{$doomed_uri};
}

=item add_remote_profile($uri)

This method is used internally when the remote peer advertises a
profile in the greeting message.

=cut

sub add_remote_profile {
    my ($self, $peer_uri) = @_;

    # only presence matters, so a simple flag is stored.
    return $self->{remote_profiles}{$peer_uri} = 1;
}

=item has_remote_profile($uri)

This method returns true if the remote profile advertised the given
profile uri, false otherwise.

=cut

sub has_remote_profile {
    my ($self, $peer_uri) = @_;

    return $self->{remote_profiles}{$peer_uri};
}

=item remote_profiles()

This method returns a list of the peer's advertised profiles.

=cut

sub remote_profiles {
    my ($self) = @_;

    return keys %{ $self->{remote_profiles} };
}

=item num_open_channels()

Return the number of open channels associated with this session.  This
does not include channel zero.

=pod

=cut

sub num_open_channels {
    my $self = shift;

    # Count every channel except channel zero.  Counting (instead of
    # subtracting one from the total) keeps the result at zero, not -1,
    # while channel zero is absent.
    my $count = grep { $_ != 0 } keys %{ $self->{channels} };

    return $count;
}

=item channel($channel_number)

Returns the C<Net::BEEP::Lite::Channel> object for the given channel
number, or undef if there is no such channel.

=cut

sub channel {
    my $self = shift;
    my $chno = shift;

    $self->{channels}->{$chno};
}

=item servername([$val])

Returns or sets the session's server name.  This is normally set when
the first "E<lt>startE<gt>" message is encountered with a "serverName"
attribute.  A false $val leaves the current value untouched.

=cut

sub servername {
    my $self = shift;
    my $name = shift;

    $self->{servername} = $name if $name;
    $self->{servername};
}

=item _tuning_reset([$send_greeting])

This does the full tuning reset: close all channels, delete pending
messages in the message queue, recreate channel zero, and (optionally)
re-send the greeting.  This defaults to sending the greeting; pass a
defined false value (e.g., 0) to suppress it.

This is normally called when a profile negotiates a security layer
(i.e., TLS or maybe SASL/DIGEST-MD5's encryption).

=cut

sub _tuning_reset {
    my $self          = shift;
    my $send_greeting = shift;

    # Only an omitted (undefined) argument selects the default; an
    # explicit 0 must be honoured.
    $send_greeting = 1 if not defined $send_greeting;

    $self->_del_all_channels();
    $self->{messages}        = [];
    $self->{remote_profiles} = {};

    $self->_add_channel(0, $self->{mgmt_profile});

    # as in initialize(): start msgno at one because of greeting RPY.
    $self->channel(0)->msgno(1);

    $self->send_greeting() if $send_greeting;
}

=item send_greeting()

Send the greeting message to the peer, and handle the greeting coming
from the peer.  It will advertise any profiles that have been
configured in the session.  Normally, this method is called as part of
the initialization process of the subclasses of this class.

=cut

sub send_greeting {
    my $self = shift;

    # send the greeting message.
    $self->{mgmt_profile}->send_greeting_message($self);

    # handle the remote greeting.
    my $peer_greeting = $self->_recv_message();
    $self->{mgmt_profile}->handle_message($self, $peer_greeting);
}

=item send_message($message)

This will send a BEEP message to the peer (over the channel specified
in the message).  This will handle possible fragmentation due to the
channel window size.

=pod

=cut

sub send_message {
    my $self    = shift;
    my $message = shift;

    my $chno    = $message->channel_number();
    my $channel = $self->channel($chno);
    croak "send_message: message is on non-existent channel $chno"
        if not $channel;

    # New messages should never set msgno, and we will override it here
    # if it has.  Replies (RPY, ANS, ERR, NULL) should have the same
    # msgno as the MSG to which they are replying.
    if ($message->type() eq 'MSG') {
        carp "MSG messages should NOT have a pre-set message number"
            if defined $message->msgno();
        $message->msgno($channel->next_msgno());
    }
    else {
        if (not defined $message->msgno()) {
            carp "non-MSG message should have a pre-set message number";
            $message->msgno($channel->next_msgno());
        }
    }

    while ($message->has_more_frames()) {
        my $window = $channel->remote_window();

        # if there is no more space on this channel, switch to reading
        # for a bit while we wait for the channel to open.  A negative
        # window (the peer advertised less than what is already in
        # flight) is treated like a closed one.
        if ($window <= 0) {
            $self->_read_for_seq($chno);
            $window = $channel->remote_window();
            next if $window <= 0;
        }

        my $seqno = $channel->seqno();

        # calculate the next frame
        my $frame = $message->next_frame($seqno, $window);

        # actually send the frame.
        $self->_write_frame($frame);

        # update our current sequence number.
        $channel->update_seqno($frame->size());

        # and adjust the remote window.
        $channel->remote_window($window - $frame->size());
    }
}

=item _recv_message()

This will fetch the next message from the peer, returning a message
object.  It will handle reassembling a fragmented message.  It will
return the first complete message received on any (existing) channel.
A frame on a non-existent channel is not discarded: C<_recv_frame>
aborts the session in that case.

This method will block.  It will return undef if no frame could be
obtained, otherwise it will return the message.  Any named arguments
are passed through to C<_recv_frame>.

=cut

sub _recv_message {
    my $self = shift;
    my %args = @_;

    # first try and return a message off the queue.
    my $message = $self->_dequeue_message();
    return $message if $message;

    # otherwise, we read one from the socket.

    # This handles interleaved frames for messages on different
    # channels, or ANS messages on the same (or different channels).
    # The channels have slots for building messages of both types.  The
    # first time we see a completing frame, we return that message.
    while (1) {
        # get the next frame from the socket (will block here).
        my $frame = $self->_recv_frame(%args);

        # no frame means the socket could not be read; give up rather
        # than calling methods on nothing.
        return if not $frame;

        # our frame will have already gone through SEQ processing.  it
        # will also have been added to the appropriate message building
        # slot.
        next if $frame->type() eq 'SEQ';

        my $channel = $self->channel($frame->channel_number());

        # if we have a completing frame, we need to pull the complete
        # message from its build slot and clear it.
        if ($frame->completes()) {
            my $message;
            if ($frame->type() eq 'ANS') {
                $message = $channel->ans_message($frame->ansno());
                $channel->ans_clear_message($frame->ansno());
            }
            else {
                $message = $channel->message();
                $channel->clear_message();
            }

            return $message;
        }
    }
}

=item recv_message()

This will fetch the next message (on any defined channel other than
zero) from the peer, returning a message object.  It will handle
reassembling a fragmented message.  This will directly handle channel
zero messages, so this isn't all that useful for handling replies to
management channel messages.

It returns 0 if the session is no longer connected after a channel zero
message has been handled, and undef if no message could be read.

=cut

sub recv_message {
    my $self = shift;
    my %args = @_;

    my $message = undef;

    while (1) {
        $message = $self->_recv_message(%args);

        # nothing could be read.
        return if not defined $message;

        last if $message->channel_number() != 0;

        # channel zero traffic is handled here, not by the caller.
        $self->{mgmt_profile}->handle_message($self, $message);
        return 0 if not $self->_is_connected();
    }

    $message;
}

=item close_session()

close the entire session.  Normally, this should only be called after
sending or receiving the "E<lt>okE<gt>" message.  It can also be used
in fatal error situations.

=pod

=cut

sub close_session {
    my $self = shift;

    # we should have already sent or received the "ok" message by now
    # (unless we are aborting)

    # the socket may already be gone (e.g., abort() after a previous
    # close), so only close what is there.
    if ($self->{sock}) {
        $self->{sock}->close();
        print STDERR "closed socket\n" if $self->{debug};
    }

    $self->{sock} = undef;
}

=item abort([$message])

Abort the session: close the socket and die.  The exception is
"aborted\n", or a C<confess> stack trace starting with "abort: $message"
when tracing is enabled.  If debugging is enabled, $message is printed
to STDERR first.  This method never returns.

=cut

sub abort {
    my $self    = shift;
    my $message = shift;

    print STDERR "aborting: $message\n" if $message && $self->{debug};

    # release the socket first, so that it is closed on every exit path
    # (including the confess() below).
    my $trace = $self->{trace};
    $self->close_session();

    if ($trace) {
        my $reason = defined $message ? $message : '';
        confess "abort: $reason\n";
    }

    die "aborted\n";
}

=item _read_for_seq([$channel_number])

Read frames from the socket until receiving a SEQ frame.  If
$channel_number is provided, then read until a SEQ frame on that
channel has been read.  Non SEQ frames read are placed either in the
various message-building slots (see _recv_message), or placed on the
general message queue.

=cut

sub _read_for_seq {
    my $self = shift;
    my $chno = shift;

    while (1) {
        my $frame = $self->_recv_frame();

        if (not $frame) {
            $self->abort("null frame detected");
            return;
        }

        # at this point, the SEQ has been processed.  This just
        # determines if we are done.
        if ($frame->type() eq 'SEQ') {
            return if not defined $chno;
            return if $chno == $frame->channel_number();
            next;
        }

        # for other frames, we need to make sure that we pull completed
        # messages off the build area and onto the queue.
        if ($frame->completes()) {
            my $channel = $self->channel($frame->channel_number());

            my $message;
            if ($frame->type() eq 'ANS') {
                $message = $channel->ans_message($frame->ansno());
                $channel->ans_clear_message($frame->ansno());
            }
            else {
                $message = $channel->message();
                $channel->clear_message();
            }
            $self->_queue_message($message);
        }
    }
}

=item _recv_frame()

This is an intermediate wrapper around C<_read_frame>.  Essentially,
it reads a frame from the socket, then does a little bit of post
processing, finally returning that frame.

The processing is: if it is a SEQ frame, it updates the channel's
remote window size accordingly; if it is some other frame, it adds it
to the appropriate channel's message building slot, records the peer's
sequence number, and (unless the C<NoSEQ> named argument is true)
answers frames that carry a payload with a SEQ frame.

If no frame could be read, or the frame is on a non-existent channel,
the session is aborted (see C<abort>).

=cut

sub _recv_frame {
    my $self = shift;
    my %args = @_;

    my $noseqs = $args{NoSEQ} ? 1 : 0;

    my $frame = $self->_read_frame();

    # NOTE: this should never actually happen: _read_frame should
    # abort() instead of returning anything other than a valid frame.
    if (! $frame) {
        $self->abort("null frame received");
        return;
    }

    my $channel = $self->channel($frame->channel_number());
    if (not defined $channel) {
        $self->abort("frame received on non-existent channel " .
                     $frame->channel_number());
        return;
    }

    # handle SEQ frames independently.
    if ($frame->type() eq "SEQ") {
        # calculate new remote window: That is the advertised window
        # minus any bytes that we have already sent.
        my $new_window = $frame->window() -
            ($channel->seqno() - $frame->ackno());
        $channel->remote_window($new_window);

        # SEQ frames carry an ackno and a window, not a seqno or a
        # payload, so there is nothing to track or acknowledge.
        return $frame;
    }

    # assemble message from (possibly) multiple (sequential) frames.
    if ($frame->type() eq 'ANS') {
        # the ANS collating case.
        $channel->ans_message_add_frame($frame);
    }
    else {
        # the normal message collating case.
        $channel->message_add_frame($frame);
    }

    # track our last seen seqno from the peer in the channel.  BEEP
    # sequence numbers are 32 bit and wrap around.
    $channel->peer_seqno(($frame->seqno() + $frame->size()) % 4294967296);

    # Emit a SEQ frame if we've actually read a frame with a payload.
    if ($frame->size() > 0 && !$noseqs) {
        # at the moment, we consider all frames to be "immediately"
        # consumed, so we just emit a constant for the window size.
        my $ackno = $channel->peer_seqno();
        $self->_send_seq($channel, $ackno);
    }

    return $frame;
}

=item _send_seq($channel, $ackno)

Send a SEQ for $channel (a channel object) to the peer.  $ackno is the
sequence number to acknowledge.  Generally this is the seqno of the
frame this is responding to, plus the size of the payload of that
frame.

=pod

=cut

sub _send_seq {
    my $self    = shift;
    my $channel = shift;
    my $seqno   = shift;

    my $seq_frame = Net::BEEP::Lite::Frame->new(
        Type    => 'SEQ',
        Channel => $channel->number(),
        Ackno   => $seqno,
        Window  => $channel->local_window(),
        Debug   => $self->{debug},
        Trace   => $self->{trace},
    );

    $self->_write_frame($seq_frame);
}

=item _read_frame()

This is an internal method for reading a single frame from the
internal socket.  It returns a C<Net::BEEP::Lite::Frame> object.

Waiting for the frame header is limited by the idle timeout; reading
the payload and the trailer is limited by the read timeout.  The
session is aborted (see C<abort>) on a timeout, on end of stream, on an
invalid header, on an incomplete payload and on an invalid trailer.

=cut

sub _read_frame {
    my $self = shift;

    my $sock = $self->{sock};

    # read the header.
    my $header = $self->_run_with_alarm($self->{idle_timeout}, "idle timeout",
                                        sub { $sock->getline() });

    # FIXME: what does a null header mean?
    if (!$header) {
        $self->abort("null header detected (socket closed?)");
        return;
    }

    my $frame = Net::BEEP::Lite::Frame->new(Header => $header,
                                            Debug  => $self->{debug},
                                            Trace  => $self->{trace});

    # make sure the frame could be built (i.e., known frame type, valid
    # frame headers...)
    if (! $frame) {
        $self->abort("invalid frame header: '$header'");
        return;
    }

    my $size = $frame->size();

    # SEQ frames consist of the header line only: no payload, no
    # trailer.
    return $frame if $size == 0 and $frame->type() eq 'SEQ';

    # read the payload.  An empty NUL frame has none, but (unlike SEQ)
    # it is still followed by a trailer which must be consumed, or the
    # trailer would be mistaken for the next frame header.
    unless ($size == 0 and $frame->type() eq 'NUL') {
        my $payload = $self->_read_exact($size);
        if (length($payload) != $size) {
            $self->abort("incomplete frame payload (socket closed?)");
            return;
        }
        $frame->set_payload($payload);
    }

    # now read the trailer
    my $trailer = $self->_read_exact(5);
    if ($trailer ne "END\r\n") {
        $self->abort("invalid frame trailer for '$trailer'");
        return;
    }

    print STDERR "_read_frame: read frame:\n", $frame->to_string, "\n"
        if $self->{trace};

    return $frame;
}

# Run $code with an alarm of $seconds armed (zero means no limit) and
# return its scalar result.  The previous alarm value is restored on
# every path.  A timeout aborts the session with $timeout_message; any
# other exception raised by $code is re-thrown unchanged.
sub _run_with_alarm {
    my ($self, $seconds, $timeout_message, $code) = @_;

    my $result;
    my $old_alarm_value = 0;

    # set up an alarm handler for this call only.
    local $SIG{ALRM} = sub { die "alarm timeout\n"; };

    my $ok = eval {
        $old_alarm_value = alarm($seconds);
        $result = $code->();
        1;
    };
    my $error = $@;

    alarm($old_alarm_value);

    if (!$ok) {
        if ($error =~ /^alarm timeout/i) {
            $self->abort($timeout_message);
            return;
        }
        die $error;
    }

    return $result;
}

# Read exactly $length bytes from the session socket, issuing as many
# read calls as needed (a single read may return early).  Each call is
# limited by the session's read timeout, which (helps to) recover from
# frames whose announced size is wrong and too large.  The result is
# shorter than $length only if the stream ended or a read failed.
sub _read_exact {
    my ($self, $length) = @_;

    my $sock   = $self->{sock};
    my $buffer = '';
    my $offset = 0;

    while ($offset < $length) {
        my $read = $self->_run_with_alarm(
            $self->{timeout},
            "read operation timed out (invalid frame?)",
            # only ask for what is still missing, appended at $offset.
            sub { $sock->read($buffer, $length - $offset, $offset) });

        # undef is a read error, zero is end of stream.
        last if not $read;

        $offset += $read;
    }

    return $buffer;
}

=item _write_frame($frame)

This is an internal routine for writing a single frame to the
internally held socket.  $frame MUST be a C<Net::BEEP::Lite::Frame>
object.

=cut

sub _write_frame {
    my $self  = shift;
    my $frame = shift;

    my $sock = $self->{sock};
    my $text = $frame->to_string();

    $sock->print($text);
    $sock->flush();

    print STDERR "_write_frame: wrote frame:\n", $text, "\n"
        if $self->{trace};
}

# Append $message to the general received message queue.
sub _queue_message {
    my $self    = shift;
    my $message = shift;

    push @{$self->{messages}}, $message;
}

# Remove and return the oldest queued message (undef if the queue is
# empty).
sub _dequeue_message {
    my $self = shift;

    shift @{$self->{messages}};
}

# True if the session holds a socket and that socket reports itself as
# connected.
sub _is_connected {
    my $self = shift;

    return ($self->{sock} && $self->{sock}->connected());
}

=pod

=back

=head1 SEE-ALSO

=over 4

=item L<Net::BEEP::Lite::Channel>

=item L<Net::BEEP::Lite::MgmtProfile>

=item L<Net::BEEP::Lite::Frame>

=item L<IO::Socket>

=back

=cut

1;