package TestServer; # test server for proxy test - Andrew V. Purshottam # accepts lines of form count: text # to do: # (done) fix same error as in proxy! the shutdown state variable is server wide, # not per client connectection! Ok, fixed it, now shutdown is in heap, # so per client connection. This was easy, because the PoCo::Server::TCP # had all the state already in the per connection session. I am begining to # appreciate POE... use warnings; use strict; use diagnostics; # sub POE::Kernel::ASSERT_DEFAULT () { 1 } use POE; use POE::Filter::Stream; use POE::Filter::Line; use POE qw(Component::Server::TCP); use POE::Component::Proxy::TCP::PoeDebug; use fields qw(port ); sub new { my TestServer $self = shift; unless (ref $self) { $self = fields::new($self); } # private instance variables init... # Extract parameters... my %param = @_; $self->{port} = delete $param{Port}; $self->{port} = 8000 unless defined ($self->{port}); my $base_delay = 0; # Create TCP server and start listener session. POE::Component::Server::TCP->new ( Alias => "proxy_test_server", Port => $self->{port}, Args => [$self], # so handle_client_connect_to_server gets $self ClientConnected => \&handle_client_connect_to_server, ClientFilter => "POE::Filter::Line", ClientInput => sub { my ( $kernel, $session, $heap, $input ) = @_[ KERNEL, SESSION, HEAP, ARG0 ]; dbprint(3, "Session ", $session->ID(), " got input: $input"); if ($input =~ m/^END/) { dbprint(2,"TEST SERVER GOT END REQUEST\n"); $heap->{_shutting_down} = 1; check_for_shutdown(); } else { my ($count, $text) = split /:/, $input; dbprint(2, "got request count:", $count, " text:", $text); for (my $i = 0; $i < $count; $i++) { reply_after_delay($i + $base_delay, "$i:$text:" ); dbprint(3, "sent $i:$text:"); } $base_delay += $count; } }, InlineStates => {send => sub { my ( $heap, $message ) = @_[ HEAP, ARG0 ]; $heap->{client}->put($message); }, send_delayed => sub { my ( $heap, $message ) = @_[ HEAP, ARG0 ]; $heap->{client}->put($message); $heap->{_pending_self_requests}--; check_for_shutdown(); }}, Args => [$self], ); return $self; } # Called inside per client connection session # sets up $heap->{self} to be the TestServer instance # a pure OO approach to POE where one could subclass # the per connection session and put instance data # there might have been nicer? sub handle_client_connect_to_server { my ( $kernel, $session, $heap, $self ) = @_[ KERNEL, SESSION, HEAP, ARG0 ]; my $session_id = $session->ID; $heap->{self} = $self; $heap->{_pending_self_requests} = 0; $heap->{_shutting_down} = 0; dbprint(1, "Client connected."); } # hack to support sending lines to the client, and # make sure the client connection session is kept running # until all pending lines have been set. # If we had access to the event queue, we could look # to see if any send events for the per client connection # session were available. Q for POE[tr]s: should this be possible? # reply_after_delay($delay_secs, $text) # does crap to get env without having to pass, # then updates pend and posts a delayed event. sub reply_after_delay { my $delay_secs = shift; my $text = shift; my $kernel = $poe_kernel; my $session = $kernel->get_active_session(); my $heap = $session->get_heap(); my $self = $heap->{self}; $heap->{_pending_self_requests}++; $kernel->delay_add( "send_delayed", $delay_secs, $text); } # check_for_shutdown() - ask TestServer per client session # if it should shut down. sub check_for_shutdown { my $kernel = $poe_kernel; my $session = $kernel->get_active_session(); my $heap = $session->get_heap(); my $self = $heap->{self}; dbprint(10, "check_for_shutdown: sd:$heap->{_shutting_down} psr:$heap->{_pending_self_requests}"); if ($heap->{_shutting_down}) { if ($heap->{_pending_self_requests}) { dbprint(10, "can't shut down"); } else { dbprint(1, "test server per client connection session shutting down"); # reply_after_delay(1, "END"); $kernel->yield( "shutdown" ); } } } 1;