package POE::Component::Client::opentick::Socket;
#
#   opentick.com POE client
#
#   Socket handling
#
#   infi/2008
#
#   Full POD documentation after __END__
#

use strict;
use Carp qw( croak );
use Data::Dumper;
use Socket;
use POE qw( Wheel::SocketFactory Wheel::ReadWrite
            Driver::SysRW        Filter::Stream );

# Ours
use POE::Component::Client::opentick::Constants;
use POE::Component::Client::opentick::Util;
use POE::Component::Client::opentick::Output;
use POE::Component::Client::opentick::Error;

###
### Variables
###

use vars qw( $VERSION $TRUE $FALSE $KEEP $DELETE $poe_kernel );

($VERSION) = q$Revision: 43 $ =~ /(\d+)/;
*TRUE      = \1;
*FALSE     = \0;
*KEEP      = \0;
*DELETE    = \1;

# Arguments are for this object.
# Keys are the (lower-cased) constructor arguments this class accepts;
# initialize() only checks for their existence.
my %valid_args = (
    alias           => $KEEP,
    debug           => $KEEP,
    servers         => $DELETE,
    port            => $DELETE,
    realtime        => $DELETE,
    conntimeout     => $DELETE,
    autoreconnect   => $DELETE,
    reconninterval  => $DELETE,
    reconnretries   => $DELETE,
    bindaddress     => $DELETE,
    bindport        => $DELETE,
);

########################################################################
###   Public methods                                                 ###
########################################################################

# Constructor.
#   @args - key/value pairs; croaks if the count is odd.  Recognised keys
#           are those listed in %valid_args (matched case-insensitively).
# Returns the new blessed object, already passed through initialize().
sub new
{
    my( $class, @args ) = @_;
    croak( "$class requires an even number of parameters" ) if( @args & 1 );

    my $self = {
        alias           => OTDefault( 'alias' ),
        debug           => $FALSE,
        servers         => undef,
        myserver        => undef,
        port            => undef,
        state           => OTConstant( 'OT_STATUS_INACTIVE' ),
        realtime        => OTDefault( 'realtime' ),
        redirected      => $FALSE,      # were we redirected?
        # For reconnection logic
        conntimeout     => OTDefault( 'conntimeout' ),
        autoreconnect   => OTDefault( 'autoreconnect' ),
        reconninterval  => OTDefault( 'reconninterval' ),
        reconnretries   => OTDefault( 'reconnretries' ),
        reconncount     => 0,
        bindaddress     => undef,
        bindport        => undef,
#        'socket'        => undef,
        # Statistical parameters
        packets_sent    => 0,
        packets_recv    => 0,
        bytes_sent      => 0,
        bytes_recv      => 0,
        connect_time    => 0,
    };

    bless( $self, $class );

    $self->initialize( @args );

    return( $self );
}

# Initialize this object instance
#   %args - constructor arguments; recognised keys are copied into the
#           object under their lower-cased name, everything else is ignored.
# Afterwards the server list and port are resolved through
# _get_server_list() and _get_port().
sub initialize
{
    my( $self, %args ) = @_;

    # Store things.  Things that make us go.
    # We're a leaf node; go ahead and delete.
    for( keys( %args ) )
    {
        $self->{lc $_} = delete( $args{ $_ } )
            if( exists( $valid_args{lc $_} ) );
    }

    $self->{servers} = $self->_get_server_list( $self->{servers} );
    $self->{port}    = $self->_get_port( $self->{port} );

    return;
}

# High level manual disconnect method
# NOTE: HYBRID POE EVENT HANDLER/METHOD
#
# Pauses autoreconnect, zeroes the retry counter, and then backs out of
# whatever connection state we are in.  When logged in, the work is handed
# to the 'logout' event of the session named by our alias instead.
sub disconnect
{
    my( $self ) = @_;

    $self->_pause_autoreconnect();
    $self->_reset_reconn_count();

    # Step through and back out for each step.
    my $state = $self->_get_state();
    if( $state >= OTConstant( 'OT_STATUS_LOGGED_IN' ) )
    {
        $poe_kernel->call( $self->{alias}, 'logout' );
    }
    else
    {
        if( $state >= OTConstant( 'OT_STATUS_CONNECTED' ) )
        {
            # Disconnect.  This should do it.
            delete( $self->{socket} );
        }
        if( $state >= OTConstant( 'OT_STATUS_CONNECTING' ) )
        {
            # Cancel connection and clean up
            delete( $self->{SocketFactory} );
            $self->{myserver} = undef;
        }

        # The connect timeout set by connect() is still pending in both of
        # the states above; cancel it so it cannot fire a spurious
        # '_ot_sock_conntimeout' after we have deliberately disconnected.
        my $timeout_id = delete( $self->{timeout_id} );
        $poe_kernel->alarm_remove( $timeout_id ) if( $timeout_id );

        $self->_set_state( OTConstant( 'OT_STATUS_INACTIVE' ) );
#        $self->_set_redirected_flag( $FALSE );
    }

    return;
}

# High level reconnect method
# Queues a 'disconnect' followed by a 'connect' on the current session.
sub reconnect
{
    my( $self ) = @_;

    $poe_kernel->yield( 'disconnect' );
    $poe_kernel->yield( 'connect' );

    return;
}

# High level redirect METHOD (only)
# Server can send host redirect response; we must comply.
# High priority, so we call it synchronously.
#   $host, $port - the server we were told to move to.
# Marks the object as redirected, then synchronously runs 'disconnect' and
# 'connect' on the active session.
sub redirect
{
    my( $self, $host, $port ) = @_;

    $self->_set_redirected_flag( $TRUE );
    $poe_kernel->call( $poe_kernel->get_active_session(), 'disconnect' );
    $poe_kernel->call( $poe_kernel->get_active_session(),
                       'connect', $host, $port );

    return;
}

### Statistical accessors

# Number of packets received.
sub get_packets_recv
{
    my( $self ) = @_;

    return( $self->{packets_recv} );
}

# Number of packets sent.
sub get_packets_sent
{
    my( $self ) = @_;

    return( $self->{packets_sent} );
}

# Number of bytes received.
sub get_bytes_recv
{
    my( $self ) = @_;

    return( $self->{bytes_recv} );
}

# Number of bytes sent.
sub get_bytes_sent
{
    my( $self ) = @_;

    return( $self->{bytes_sent} );
}

# Seconds elapsed since the stored connect time, or 0 when none is stored.
sub get_connect_time
{
    my( $self ) = @_;

    return( $self->{connect_time}
            ? time - $self->{connect_time}
            : 0 );
}

########################################################################
###   POE event handlers                                             ###
########################################################################

# Public event handlers

# quick event handler to marshal args over to redirect method
sub _redirect
{
    my( $self, $host, $port ) = @_[OBJECT,ARG0,ARG1];

    $self->redirect( $host, $port );

    return;
}

# Connect to the OT server
#   ARG0 - optional host; defaults to the next server from _get_server()
#   ARG1 - optional port; defaults to _get_port()
# Does nothing unless the object is currently in the INACTIVE state.
sub connect
{
    my( $self, $kernel, $host, $port ) = @_[OBJECT,KERNEL,ARG0,ARG1];

    if( $self->_get_state() == OTConstant( 'OT_STATUS_INACTIVE' ) )
    {
        # Undo the autoreconnect pause left behind by disconnect().
        $self->_reset_autoreconnect();
        $self->{myserver} = $host || $self->_get_server();

        O_DEBUG( "Connecting to " . $self->{myserver} . "..." );

        my $wheel = POE::Wheel::SocketFactory->new(
            SocketDomain    => AF_INET,
            SocketType      => SOCK_STREAM,
            SocketProtocol  => 'tcp',
            BindAddress     => $self->{bindaddress},
            BindPort        => $self->{bindport},
            Reuse           => $TRUE,
            RemoteAddress   => $self->{myserver},
            RemotePort      => $port || $self->_get_port(),
            SuccessEvent    => '_ot_sock_connected',
            FailureEvent    => '_ot_sock_connfail',
        );
        $self->{SocketFactory} = $wheel;
        $self->_set_state( OTConstant( 'OT_STATUS_CONNECTING' ) );

        if( $self->_get_conn_timeout() )
        {
            # Replace any timeout left over from a previous attempt.
            $kernel->alarm_remove( delete( $self->{timeout_id} ) )
                if( $self->{timeout_id} );
            $self->{timeout_id} = $kernel->alarm_set(
                '_ot_sock_conntimeout',
                time + $self->_get_conn_timeout()
            );
        }
    }

    return;
}

### Connection initiation handling

# Successfully connected!
#   ARG0 - the connected socket handle from the SocketFactory wheel.
# Swaps the factory for a ReadWrite wheel, resets per-connection state and
# either issues the login command or, after a redirect, goes straight to
# the LOGGED_IN state.
sub _ot_sock_connected
{
    my( $self, $kernel, $socket ) = @_[OBJECT, KERNEL, ARG0];

    # getpeername() returns undef if the peer has already gone away, and
    # sockaddr_in() dies on an undefined address; never let a debug
    # message take the event handler down.
    my $peer = getpeername( $socket );
    if( defined( $peer ) )
    {
        my( $port, $addr ) = sockaddr_in( $peer );
        O_DEBUG( sprintf( "Connected to %s [%s]:%s.",
                          scalar( gethostbyaddr( $addr, AF_INET ) ),
                          inet_ntoa( $addr ),
                          $port ) );
    }
    else
    {
        O_DEBUG( "Connected; peer address unavailable: $!" );
    }

    # We don't need no steenkeen factory anymore.
    delete( $self->{SocketFactory} );

    # Leave the alarm removal until opentick.pm:_logged_in().
#    $kernel->alarm_remove( delete( $self->{timeout_id} ) )
#        if( $self->{timeout_id} );

    # Create the socket handler.
    $self->{'socket'} = POE::Wheel::ReadWrite->new(
        Handle      => $socket,
        Driver      => POE::Driver::SysRW->new(),
        Filter      => POE::Filter::Stream->new(),
        InputEvent  => '_ot_sock_receive_packet',
        ErrorEvent  => '_ot_sock_error',
    );

    # Set the state variables
    $self->_reset_object();
    $self->_set_connect_time( time );

    # Send login command, unless we have been redirected by the OT server
    if( $self->_is_redirected() )
    {
        $self->_set_state( OTConstant( 'OT_STATUS_LOGGED_IN' ) );
    }
    else
    {
        $self->_set_state( OTConstant( 'OT_STATUS_CONNECTED' ) );
        $kernel->call( $kernel->get_active_session(),
                       '_ot_proto_issue_command',
                       OTConstant( 'OT_LOGIN' ) );
    }

    return;
}

# Connection failed for whatever reason.
#   ARG0 - name of the operation that failed
#   ARG2 - error string
# Logs the failure, drops any socket wheel and hands over to retry_connect().
sub _ot_sock_connfail
{
    my( $self, $op, $err_str ) = @_[OBJECT, ARG0, ARG2];

    O_DEBUG( "Connection failed.  $op() returned $err_str" );

    delete( $self->{'socket'} );

    retry_connect( @_ );
}

# Connection timed out.
# Alarm handler for the timeout armed in connect().
sub _ot_sock_conntimeout
{
    my( $self ) = $_[OBJECT];

    O_DEBUG( "Connection timed out." );

    delete( $self->{'socket'} );

    retry_connect( @_ );
}

# Retry a connection ReconnRetries times, or give up.
# Called as a plain function with the POE handler's @_.  Returns nothing.
sub retry_connect
{
    my( $self, $kernel ) = @_[OBJECT, KERNEL];

    # Fix our states
    $self->_set_state( OTConstant( 'OT_STATUS_INACTIVE' ) );

    # Never hand an undefined id to alarm_remove().
    my $timeout_id = delete( $self->{timeout_id} );
    $kernel->alarm_remove( $timeout_id ) if( defined( $timeout_id ) );

    # Whatever happens next, this attempt is over: drop the factory wheel
    # so a connection that is still in flight (e.g. after a timeout) cannot
    # fire '_ot_sock_connected' behind our back.
    delete( $self->{SocketFactory} );

    # Retry
    if( $self->_get_autoreconnect() )
    {
        # A retry limit of 0 means "keep trying forever".
        if( $self->_inc_reconn_count() < $self->_get_reconn_retries() or
            $self->_get_reconn_retries() == 0 )
        {
            my $timeout = $self->_get_reconn_interval();
            O_DEBUG( "Retrying connection in $timeout seconds..." );
            $kernel->delay( 'connect', $timeout );
        }
        else
        {
            $kernel->yield( '_reconn_giveup', @_[ARG0..$#_] );
        }
    }

    return;
}

# A socket error has occurred.
#   ARG0 - name of the operation that failed
#   ARG1 - numeric error code
# Only a read returning error code 0 is acted upon; it is treated as a
# disconnect.  Every other error is merely logged.
sub _ot_sock_error
{
    my( $self, $kernel, $op, $err_code ) = @_[OBJECT,KERNEL,ARG0,ARG1];

    O_DEBUG( "Socket disconnected: $op() returned $err_code" );

    # Socket disconnected
    if( $op eq 'read' && $err_code == 0 )
    {
        # Stop heartbeats.
        # NOTE(review): the original comment asked for this to happen
        # "immediately and synchronously", but yield() only queues the
        # event -- confirm whether call() was intended.
        $kernel->yield( '_ot_proto_heartbeat_stop' );
        $self->_reset_object();
        delete( $self->{'socket'} );
        retry_connect( @_ );
    }

    return;
}

### Live connection handling

# Got a packet!
#   ARG0 - the raw data just read from the socket
sub _ot_sock_receive_packet
{
    my( $self, $kernel, $packet ) = @_[OBJECT, KERNEL, ARG0];

    O_DEBUG( "_ot_sock_receive_packet( " . length( $packet ) . " )" );

    # Tell the protocol handler we got a packet
    $kernel->yield( '_ot_proto_process_response', $packet );

    $self->_update_stats_recv( length( $packet ) );

    return;
}

# Send a packet!
#   ARG0 - the raw data to write
# Returns $TRUE/$FALSE according to what the wheel's put() returned, or
# $FALSE without sending when there is no socket wheel.
sub _ot_sock_send_packet
{
    my( $self, $packet ) = @_[OBJECT, ARG0];

    O_DEBUG( "_ot_sock_send_packet( " . length( $packet ) . " )" );

    # The wheel is deleted on disconnect; a late send must not die with
    # "Can't call method put on an undefined value".
    unless( $self->{'socket'} )
    {
        O_DEBUG( "_ot_sock_send_packet: no socket; packet dropped" );
        return( $FALSE );
    }

    # Put the packet on the wire
    my $buffered = $self->{'socket'}->put( $packet );

    $self->_update_stats_sent( length( $packet ) );

    return( $buffered ? $TRUE : $FALSE );
}

########################################################################
###   Private methods                                                ###
########################################################################

# Return the correct port for initialization based on user preferences
#   $user_port - optional; used only when it consists solely of digits.
# Falls back to the stored port, then to the realtime or delayed default.
sub _get_port
{
    my( $self, $user_port ) = @_;

    # Anchor both ends so that something like "80abc" is rejected.
    my $port = ( defined( $user_port ) && $user_port =~ /\A\d+\z/ )
               ? $user_port
               : $self->{port}
                 ? $self->{port}
                 : $self->{realtime}
                   ? OTDefault( 'port_realtime' )
                   : OTDefault( 'port_delayed' );

    return( $port );
}

# Return the server list for initialization based on user preferences
#   $user_list - optional; used only when it is an ARRAY reference.
# Falls back to the stored list, then to the realtime or delayed default.
sub _get_server_list
{
    my( $self, $user_list ) = @_;

    my $servers = ( defined( $user_list ) && ref( $user_list ) eq 'ARRAY' )
                  ? $user_list
                  : $self->{servers}
                    ? $self->{servers}
                    : $self->{realtime}
                      ? OTDefault( 'servers_realtime' )
                      : OTDefault( 'servers_delayed' );

    return( $servers );
}

# Replace the stored server list.
sub _set_servers
{
    my( $self, $user_list ) = @_;

    $self->{servers} = $user_list;

    return;
}

# Replace the stored port; returns the new value.
sub _set_port
{
    my( $self, $port ) = @_;

    return( $self->{port} = $port );
}

# Get one of the servers from our server list round-robin
{ # CLOSURE
    # Index of the next server to hand out.  It lives in the closure, so it
    # is shared by every object of this class.
    my $server_num = 0;

    # Returns the next server, or undef when the list is empty.
    sub _get_server
    {
        my( $self ) = @_;

        my $servers = $self->{servers};
        return( undef )
            unless( ref( $servers ) eq 'ARRAY' && @{ $servers } );

        # The list may be shorter than the one the index was last used
        # with; wrap before indexing so we never hand out undef.
        $server_num = 0 if( $server_num > $#{ $servers } );

        my $server = $servers->[ $server_num++ ];
        $server_num = 0 if( $server_num > $#{ $servers } );

        return( $server );
    }
} # /CLOSURE

### Accessor methods

# The USER variable setting the number of retries to attempt
sub _get_reconn_retries
{
    my( $self ) = @_;

    return( $self->{reconnretries} );
}

# Current connection state.
sub _get_state
{
    my( $self ) = @_;

    return( $self->{state} );
}

# Store a new connection state and queue an OT_STATUS_CHANGED notification.
# Throws when $state is outside OT_STATUS_INACTIVE .. OT_STATUS_LOGGED_IN.
# Returns the new state.
sub _set_state
{
    my( $self, $state ) = @_;

    throw( O_ERROR( 'Tried to set invalid state: ' . $state ) )
        if( $state < OTConstant( 'OT_STATUS_INACTIVE') ||
            $state > OTConstant( 'OT_STATUS_LOGGED_IN' ) );

    $poe_kernel->yield( '_notify_of_event',
                        OTEvent( 'OT_STATUS_CHANGED' ),
                        [ $self->{alias} ],
                        $state );

    return( $self->{state} = $state );
}

# Set the redirected flag; an undefined argument counts as true.
sub _set_redirected_flag
{
    my( $self, $value ) = @_;

    $self->{redirected} = defined( $value )
                          ? $value
                            ? $TRUE
                            : $FALSE
                          : $TRUE;
}

# Whether the redirected flag is set.
sub _is_redirected
{
    my( $self ) = @_;

    return( $self->{redirected} );
}

# The ACTUAL count of retry attempts
sub _reset_reconn_count
{
    my( $self ) = @_;

    return( $self->{reconncount} = 0 );
}

# The ACTUAL count of retry attempts
sub _inc_reconn_count
{
    my( $self ) = @_;

    return( ++$self->{reconncount} );
}

# The ACTUAL count of retry attempts
sub _get_reconn_count
{
    my( $self ) = @_;

    return( $self->{reconncount} );
}

# Whether automatic reconnection is enabled.
sub _get_autoreconnect
{
    my( $self ) = @_;

    return( $self->{autoreconnect} );
}

# Enable or disable automatic reconnection (stored as $TRUE/$FALSE).
sub _set_autoreconnect
{
    my( $self, $value ) = @_;

    $self->{autoreconnect} = $value ? $TRUE : $FALSE;

    return;
}

# The stored connect timeout.
sub _get_conn_timeout
{
    my( $self ) = @_;

    return( $self->{conntimeout} );
}

# The stored interval between reconnection attempts.
sub _get_reconn_interval
{
    my( $self ) = @_;

    return( $self->{reconninterval} );
}

# Pause the autoreconnect state; save the current value
sub _pause_autoreconnect
{
    my( $self ) = @_;

    # make idempotent
    return if( $self->{autoreconnbak} );

    $self->{autoreconnbak} = $self->_get_autoreconnect();
    $self->_set_autoreconnect( $FALSE );

    return;
}

# Restore the autoreconnect state
sub _reset_autoreconnect
{
    my( $self ) = @_;

    if( exists( $self->{autoreconnbak} ) )
    {
        $self->_set_autoreconnect( $self->{autoreconnbak} );
        delete( $self->{autoreconnbak} );
    }

    return;
}

### Statistics logging

# Account for one received packet of $bytes bytes.
sub _update_stats_recv
{
    my( $self, $bytes ) = @_;

    $self->_inc_bytes_recv( $bytes );
    $self->_inc_packets_recv();

    return;
}

# Account for one sent packet of $bytes bytes.
sub _update_stats_sent
{
    my( $self, $bytes ) = @_;

    $self->_inc_bytes_sent( $bytes );
    $self->_inc_packets_sent();

    return;
}

# Zero the receive counters.
sub _reset_stats_recv
{
    my( $self ) = @_;

    $self->{packets_recv} = 0;
    $self->{bytes_recv}   = 0;

    return;
}

# Zero the send counters.
sub _reset_stats_sent
{
    my( $self ) = @_;

    $self->{packets_sent} = 0;
    $self->{bytes_sent}   = 0;

    return;
}

# Add $num to the received byte count (1 when $num is omitted).
# An explicit 0 adds nothing: an empty payload is not one byte.
sub _inc_bytes_recv
{
    my( $self, $num ) = @_;

    return( $self->{bytes_recv} += defined( $num ) ? $num : 1 );
}

# Add $num to the sent byte count (1 when $num is omitted).
# An explicit 0 adds nothing: an empty payload is not one byte.
sub _inc_bytes_sent
{
    my( $self, $num ) = @_;

    return( $self->{bytes_sent} += defined( $num ) ? $num : 1 );
}

# Add $num (default 1) to the received packet count.
sub _inc_packets_recv
{
    my( $self, $num ) = @_;

    return( $self->{packets_recv} += $num || 1 );
}

# Add $num (default 1) to the sent packet count.
sub _inc_packets_sent
{
    my( $self, $num ) = @_;

    return( $self->{packets_sent} += $num || 1 );
}

# Store the connect time; defaults to now when no value is given.
# An explicit 0 is stored as 0, which is how _reset_object() clears it
# (get_connect_time() reports 0 for a false connect time).
sub _set_connect_time
{
    my( $self, $value ) = @_;

    return( $self->{connect_time} = defined( $value ) ? $value : time );
}

# Clear per-connection state: connect time, current server, retry count.
# Does nothing while the object is in the INACTIVE state.
sub _reset_object
{
    my( $self ) = @_;

    return if( $self->_get_state() == OTConstant( 'OT_STATUS_INACTIVE' ) );

#    $self->_set_state( OTConstant( 'OT_STATUS_INACTIVE' ) );
    $self->_set_connect_time( 0 );
    $self->{myserver} = undef;
#    $self->_set_redirected_flag( $FALSE );
    $self->_reset_reconn_count();
#    $self->_reset_stats_sent();
#    $self->_reset_stats_recv();

    return;
}

1;

__END__

=pod

=head1 NAME

POE::Component::Client::opentick::Socket - Socket handling routines for the opentick POE component.

=head1 SYNOPSIS

 use POE::Component::Client::opentick::Socket;

=head1 DESCRIPTION

See L<POE::Component::Client::opentick> for the main documentation.

=head1 METHODS

=over 4

=item B<new()>

=item B<initialize()>

=item B<connect()>

Connect.

=item B<disconnect()>

Disconnect.

=item B<reconnect()>

Yep.

=item B<redirect()>

Called when redirected by server.

=item B<get_packets_recv()>

=item B<get_packets_sent()>

=item B<get_bytes_recv()>

=item B<get_bytes_sent()>

=item B<get_connect_time()>

Statistical information.  Just read the main docs, already.

=back

=head1 AUTHOR

Jason McManus (infi) -- infidel@cpan.org

=head1 LICENSE

Copyright (c) Jason McManus

This module may be used, modified, and distributed under the same terms
as Perl itself.  Please see the license that came with your Perl
distribution for details.

The data from opentick.com are under an entirely separate license that
varies according to exchange rules, etc.  It is your responsibility to
follow the opentick.com and exchange license agreements with the data.

Further details are available on L<http://www.opentick.com/>.

=cut