#!/usr/bin/perl -I. use strict; use warnings; my $smallsleep = 0.; my $bigsleep = 0.5; my $debug = 0; my $syncdebug = 0; my $inactivity = 5; my $heartbeat = 0.1; BEGIN { unless (eval { require Test::More; }) { print "1..0 # Skipped: must have Test::More installed\n"; exit; } } BEGIN { unless (eval { require Time::HiRes; }) { print "1..0 # Skipped: must have Time::HiRes installed\n"; exit; } } use Time::HiRes qw(sleep gettimeofday tv_interval); use IO::Pipe; use IO::Event; use IO::Socket::INET; use Carp qw(verbose); use Sys::Hostname; use Socket; my $t0 = [gettimeofday]; sleep(0.2); my $elapsed = tv_interval ( $t0 ); print "# elsapsed: $elapsed\n"; unless ($elapsed > 0.1 && $elapsed < 0.5) { print "# Time::HiRes::sleep() doesn't work - going slow\n"; $smallsleep = 1; $bigsleep = 2; } my @tests; my $testcount; BEGIN { @tests = ( { #0 repeat => 5, desc => "lines end in \\n", receive => sub { my $serverTest = shift; my $ieo = shift; my $got = <$ieo>; return $got; }, results => [ "howdy\n", "doody", ], sendqueue => [ "how", "dy\n", "doo", "dy" ], }, { #1 repeat => 5, desc => "paragraph mode", setup => sub { my $serverTest = shift; my $ieo = shift; $ieo->input_record_separator(''); }, receive => sub { my $serverTest = shift; my $ieo = shift; my $got = <$ieo>; return $got; }, results => [ "this is a test\n\n", "a\nb\n\n", "c\n\n", "d\n\n", "e\n", ], sendqueue => [ "this is ", "a test\n", "\n", "a\nb\n\nc\n\n\nd\n\n\n\ne\n", ], }, { #2 repeat => 5, desc => "paragraph mode, getlines", setup => sub { my $serverTest = shift; my $ieo = shift; $ieo->input_record_separator(''); }, receive => sub { my $serverTest = shift; my $ieo = shift; my (@got) = <$ieo>; return undef unless @got; return \@got; }, results => [ [ "this is a test\n\n", ], [ "a\nb\n\n", "c\n\n", "d\n\n", ], [ "e\n", ], ], sendqueue => [ "this is ", "a test\n", "\n", "a\nb\n\nc\n\n\nd\n\n\n\ne\n", ], }, { #3 repeat => 5, desc => "paragraph mode, getline, \$/ set funny", setup => sub { my $serverTest = shift; my $ieo = shift; $/ = 'xyz'; $ieo->input_record_separator(''); }, receive => sub { my $serverTest = shift; my $ieo = shift; return <$ieo>; }, results => [ "this is a test\n\n", "a\nb\n\n", "c\n\n", "d\n\n", "e\n", ], sendqueue => [ "this is ", "a test\n", "\n", "a\nb\n\nc\n\n\nd\n\n\n\ne\n", ], }, { #4 repeat => 5, desc => "paragraph mode, getlines, \$/ set funny", setup => sub { my $serverTest = shift; my $ieo = shift; $/ = 'abc'; $ieo->input_record_separator(''); }, receive => sub { my $serverTest = shift; my $ieo = shift; my (@got) = <$ieo>; return undef unless @got; return \@got; }, results => [ [ "this is a test\n\n", ], [ "a\nb\n\n", "c\n\n", "d\n\n", ], [ "e\n", ], ], sendqueue => [ "this is ", "a test\n", "\n", "a\nb\n\nc\n\n\nd\n\n\n\ne\n", ], }, ); # @tests = ($tests[3]); # splice(@tests, 0, 4); # $tests[0]->{repeat} = 1; $testcount = 0; for my $t (@tests) { my $subtests = scalar(@{$t->{results}}) + 1; $testcount += $t->{repeat} > 0 ? $t->{repeat} * $subtests : $subtests; } } BEGIN { use Test::More tests => $testcount; } my $startingport = 1025; my $rp = pickport(); my $child; my $timer; my $hbtimer; $SIG{PIPE} = sub { print "# SIGPIPE recevied in $$\n"; }; my $pipe = new IO::Pipe; if ($child = fork()) { print "# PARENT $$ will listen at 127.0.0.1:$rp\n" if $debug; my $listener = IO::Event::Socket::INET->new( Listen => 10, Proto => 'tcp', LocalPort => $rp, LocalAddr => '127.0.0.1', Handler => new Server, Description => 'Listener', ); $timer = Timer->new(); $hbtimer = Heartbeat->new(); $Event::DIED = $Event::DIED = sub { Event::verbose_exception_handler(@_); Event::unloop_all(); }; $pipe->writer(); $pipe->autoflush(1); print $pipe "l"; print "# PARENT looping\n"; IO::Event::loop(); print "# PARENT done looping\n"; } elsif (defined($child)) { print "# CHILD $$ will connect to 127.0.0.1:$rp\n" if $debug; $pipe->reader(); syncto("l"); while (@tests) { my $test = $tests[0] || last; shift @tests if --$test->{repeat} < 1; print "# test $test->{desc}\n"; my $s = IO::Socket::INET->new( PeerAddr => '127.0.0.1', PeerPort => $rp, Proto => 'tcp', ); syncto("a"); die "$$ could not connect: $!" unless $s; die "$$ socket not open" if eof($s); my $go = <$s>; $go =~ s/\n/\\n/g; print "# got '$go'\n" if $debug; for (my $sqi = 0; $sqi <= $#{$test->{sendqueue}}; $sqi++) { syncclear(); if ($debug) { my $x = $test->{sendqueue}[$sqi]; $x =~ s/\n/\\n/g; print "# SENDING '$x'\n"; } (print $s $test->{sendqueue}[$sqi]) || die "print $$: $!\n"; syncany(); } print "# CHILD closing\n"; close($s); } } else { die "fork: $!"; } exit 0; # support routine sub pickport { for (my $i = 0; $i < 1000; $i++) { my $s = new IO::Socket::INET ( Listen => 1, LocalPort => $startingport, ); if ($s) { $s->close(); return $startingport++; } $startingport++; } die "could not find an open port"; } sub syncany { print "syncany\n" if $syncdebug; $pipe->blocking(1); my $buf; $pipe->read($buf, 1); syncclear(); print "syncany done - $buf\n" if $syncdebug; } sub syncto { my $lookfor = shift; print "syncto $lookfor\n" if $syncdebug; $pipe->blocking(1); my $buf; while ($pipe->read($buf, 1) > 0) { print "syncto got $buf\n" if $syncdebug; last if $buf eq $lookfor; } print "syncto $lookfor done\n" if $syncdebug; } sub syncclear { print "synclear\n" if $syncdebug; $pipe->blocking(0); my $buf; while ($pipe->read($buf, 4096)) { print "syncclear: '$buf'\n" if $syncdebug; } print "syncclear done\n" if $syncdebug; } package Server; use Test::More; sub new { my $pkg = shift; return bless { @_ }; } sub ie_connection { my ($self, $s) = @_; $timer->reset; my $serverTest = new Server; my $stream = $s->accept($serverTest); $serverTest->{stream} = $stream; $serverTest->{rqi} = 0; my $test = $tests[0]; shift @tests if --$test->{repeat} < 1; @$serverTest{keys %$test} = values %$test; my $setup = $serverTest->{setup}; &$setup($serverTest, $stream) if $setup; print "# ACCEPTED CONNECTION\n" if $debug; print "pipesend 'a'\n" if $syncdebug; print $pipe "a"; print $stream "go\n"; } sub ie_input { my ($self, $s) = @_; my $rec = $self->{receive}; die unless $rec; for (;;) { my $r = &$rec($self, $s); return unless defined $r; my $expect = $self->{results}[$self->{rqi}++]; is_deeply($r, $expect); } print "pipesend 'i'\n" if $syncdebug; print $pipe "i"; } sub ie_eof { my ($self, $s) = @_; is($self->{rqi}, scalar(@{$self->{results}})); $s->close(); print "pipesend 'e'\n" if $syncdebug; print $pipe "e"; exit 0 unless @tests; } package Timer; use Carp; use strict; use warnings; sub new { my ($pkg) = @_; my $self = bless { }, $pkg; $self->{event} = IO::Event->timer( cb => [ $self, 'timeout' ], interval => $inactivity, hard => 0, desc => 'inactivity timer', ); return $self; } sub timeout { print STDERR "Timeout\n"; kill 9, $child; IO::Event::unloop_all(7.2); exit(1); } sub reset { my ($self) = @_; $self->{event}->stop(); $self->{event}->again(); } package Heartbeat; use Carp; use strict; use warnings; sub new { my ($pkg) = @_; my $self = bless { }, $pkg; $self->{event} = IO::Event->timer( cb => [ $self, 'timeout' ], interval => $heartbeat, hard => 0, desc => 'heartbeat timer', ); return $self; } sub timeout { print "pipesend 't'\n" if $syncdebug; print $pipe "t"; } 1; __END__