The Perl Toolchain Summit needs more sponsors. If your company depends on Perl, please support this very important event.
package Net::Gnutella::Connection;
use Net::Gnutella::Packet::Ping;
use Net::Gnutella::Packet::Pong;
use Net::Gnutella::Packet::Push;
use Net::Gnutella::Packet::Query;
use Net::Gnutella::Packet::Reply;
use Net::Gnutella::Event;
use HTTP::Request;
use HTTP::Date;
use HTTP::Status;
use LWP::MediaTypes qw(guess_media_type);
use URI::URL;
use IO::File;
use Carp qw(carp croak confess);
use strict;
use vars qw/$VERSION $AUTOLOAD/;

$VERSION = $VERSION = "0.1";

# Use AUTOLOAD to supply generic attribute methods
#
# Any method that is not defined explicitly is treated as an accessor for the
# attribute of the same (lower-cased) name kept in $self->{_attr}. Called with
# an argument it stores that value; it always returns the current value.
# Names made up only of capital letters (DESTROY in particular) are ignored,
# and any other name without a matching attribute is a fatal error.
#
sub AUTOLOAD {
	my ($self, @value) = @_;

	# Strip the package qualifier from the fully qualified method name
	(my $name = $AUTOLOAD) =~ s/.*:://;

	return unless $name =~ /[^A-Z]/; # skip DESTROY and all-cap methods

	my $slot = lc $name;

	unless (exists $self->{_attr}->{$slot}) {
		confess sprintf "invalid attribute method: %s->%s()", ref($self), $name;
	}

	$self->{_attr}->{$slot} = $value[0] if @value;

	return $self->{_attr}->{$slot};
}

# ->disconnect( [ TYPE ] )
#
# Takes the connection out of service. When TYPE is true, an event of that
# type is first handed to the parent's handler. The socket is then removed
# from the parent's read and write sets, both buffers are emptied, the
# connection state is set to 0 and the socket attribute is blanked.
#
sub disconnect {
	my $self   = shift;
	my $reason = shift;

	if ($self->debug) {
		printf STDERR "+ Disconnecting socket (%s)\n", $reason;
	}

	# Only a true TYPE is announced to the parent
	if ($reason) {
		my $notice = Net::Gnutella::Event->new(
			from => $self,
			type => $reason,
		);

		$self->parent->_handler($notice);
	}

	# Stop watching the socket in both directions before dropping the state
	$self->parent->_remove_fh($self->socket, "rw");

	$self->$_("") for qw(readbuf writebuf);
	$self->connected(0);

	return $self->socket("");
}

# ->forward( PACKET [, REPLY_PATH ] )
#
# Composes the packet and delivers it to all other ESTABLISHED connections
#
sub forward {
	my ($self, $packet, $path) = @_;

	# Contract:
	#   PACKET      object providing ->format, ->msgid, ->function, ->ttl, ->hops
	#   REPLY_PATH  optional connection to route the packet to
	#
	# With REPLY_PATH the packet goes to that one connection, unless it is this
	# connection itself, in which case nothing is sent. Without REPLY_PATH it
	# is written to every other connection of the parent that is in the
	# established state.
	#
	# An invalid PACKET is reported with carp and nothing is sent. Without the
	# early return the code below would die calling a method on a non-object.
	#
	unless ($packet && ref $packet) {
		carp "Invalid argument to Net::Gnutella::Connection->forward";
		return;
	}

	# Wire format: 23-byte header (msgid, function, ttl, hops, payload length) + payload
	my $data = $packet->format;
	my $head = pack("L4CCCL", @{ $packet->msgid }, $packet->function, $packet->ttl, $packet->hops, length $data);

	if ($path) {
		if ($path ne $self) {
			printf STDERR " - Returning down path to %s\n", $path->ip if $self->debug >= 2;

			$path->_write_wrapper($head.$data);
		}
	} else {
		foreach my $conn ($self->parent->connections) {
			next if $conn eq $self;

			# Connections still handshaking or serving HTTP must not receive
			# Gnutella descriptors; only the established state qualifies.
			next unless $conn->is_established;

			printf STDERR " - Forwarding to %s\n", $conn->ip if $self->debug >= 2;

			$conn->_write_wrapper($head.$data);
		}
	}

	return;
}

# State predicates. Each one compares the ->connected attribute with the
# numeric code of a single connection state.
#
sub is_outgoing {
	my $self = shift;
	return $self->connected == 1; # Outgoing
}

sub is_incoming {
	my $self = shift;
	return $self->connected == 2; # Incoming
}

sub is_established {
	my $self = shift;
	return $self->connected == 3; # Gnutella DATA Stream
}

sub is_http {
	my $self = shift;
	return $self->connected == 4; # HTTP Serving
}

sub is_upload {
	my $self = shift;
	return $self->connected == 5; # Sending file
}

# ->new( PARENT [, ATTRIBUTE => VALUE, ... ] )
#
# Builds a connection object belonging to PARENT. The debug and timeout
# settings are copied from PARENT; every ATTRIBUTE given is applied through
# the accessor of the same lower-cased name. When the resulting object has
# both a true connected state and a socket, the socket is registered with the
# parent for reading, with _read_socket as the callback.
#
sub new {
	my ($proto, $parent, %args) = @_;

	my %attr = (
		parent    => $parent,
		debug     => $parent->debug,
		timeout   => $parent->timeout,
		socket    => undef,
		ip        => '',
		connected => 0,
		readbuf   => '',
		writebuf  => '',
		error     => '',
		allow     => 0,
		msgid     => [],
	);

	my $self = bless {
		_handler => {},
		_attr    => \%attr,
		_msgid   => {},
	}, $proto;

	# Argument names are case-insensitive: each is routed to its accessor
	for my $name (keys %args) {
		my $setter = lc $name;

		$self->$setter($args{$name});
	}

	return $self unless $self->connected and $self->socket;

	$self->parent->_add_fh($self->socket, $self->can("_read_socket"), "r", $self);

	return $self;
}

# ->send_error( [ STATUS [, ERROR ] ] )
#
# Queues an HTTP/1.0 error response with a small HTML body. STATUS defaults
# to RC_BAD_REQUEST and ERROR (text placed in the body) to the empty string.
# Croaks unless the connection is in the HTTP state. Returns nothing.
#
sub send_error {
	my ($self, $status, $error) = @_;

	croak "Invalid state for ->send_error" unless $self->is_http;

	$status = RC_BAD_REQUEST unless $status;
	$error  = "" unless $error;

	my $message = status_message($status);

	# Address details are not known here, so the page shows placeholders
	my $ip = "Unknown";
	my $port = "Unknown";

	my $body = <<EOT;
<!DOCTYPE HTML PUBLIC "-//IETF//DTD HTML 2.0//EN">
<HTML><HEAD>
<TITLE>$status $message</TITLE>
</HEAD><BODY>
<H1>$message</H1>
$error<BR><BR>
<ADDRESS><A HREF="http://gnutella.habitue.net">Net::Gnutella</A> $VERSION Server at $ip Port $port</ADDRESS>
</BODY></HTML>
EOT

	my @lines = (
		"HTTP/1.0 $status $message",
		"Date: " . time2str(time),
		"Server: Net-Gnutella/$VERSION",
		"Content-Type: text/html",
		"Content-Length: " . length($body),
	);

	# The two trailing empty fields end the last header line and add the blank line
	my $head = join("\r\n", @lines, "", "");

	$self->_write_wrapper($head.$body);

	return;
}

# ->send_file( FILE [, OFFSET ] )
#
# Queues an HTTP/1.0 response header for FILE and switches the connection to
# the upload state; the open file handle is passed along with the header so
# that the write callback can stream the file contents.
#
# OFFSET, when true, is the byte position to start from. An OFFSET at or past
# the end of the file, or one that cannot be seeked to, is ignored and the
# whole file is sent.
#
# Sends a 404 error page when FILE is not a plain file and a 403 error page
# when it cannot be opened; both cases return what ->send_error returns.
# Returns 1 once the transfer has been queued. Croaks unless the connection
# is in the HTTP state.
#
sub send_file {
	my ($self, $file, $offset) = @_;

	unless ($self->is_http) {
		croak "Invalid state for ->send_file";
	}

	unless (-f $file) {
		return $self->send_error(RC_NOT_FOUND);
	}

	# Read the cached stat result of the -f test right away, before any other
	# call gets a chance to run file tests of its own and replace it.
	my ($size, $mtime) = (stat _)[7,9];
	my ($ct, $ce) = guess_media_type($file);

	# An explicit read mode keeps characters such as "|", ">" or surrounding
	# blanks in FILE from being interpreted as an open mode.
	my $fh = IO::File->new($file, "r") or
		return $self->send_error(RC_FORBIDDEN);

	binmode($fh);

	# An offset equal to the size would leave nothing to send and produce an
	# invalid Content-Range, so it is treated like any other out-of-range offset.
	if ($offset && $offset >= $size) {
		$offset = 0;
	} elsif ($offset) {
		$fh->seek($offset, 0) or $offset = 0;
	}

	my $status = $offset ? RC_PARTIAL_CONTENT : RC_OK;
	my $message = status_message($status);
	my $CRLF = "\r\n";

	my $head;
	$head .= sprintf "%s %s %s%s", "HTTP/1.0", $status, $message, $CRLF;
	$head .= sprintf "Date: %s%s", time2str(time), $CRLF;
	$head .= sprintf "Server: %s/%s%s", "Net-Gnutella", $VERSION, $CRLF;
	$head .= sprintf "Content-Type: %s%s", $ct, $CRLF;
	$head .= sprintf "Content-Encoding: %s%s", $ce, $CRLF if $ce;
	$head .= sprintf "Content-Length: %d%s", $offset ? $size - $offset : $size, $CRLF if $size;
	$head .= sprintf "Content-Range: bytes %d-%d/%d%s", $offset, $size-1, $size, $CRLF if $offset;
	$head .= sprintf "Last-Modified: %s%s", time2str($mtime), $CRLF if $mtime;
	$head .= sprintf "%s", $CRLF;

	$self->_write_wrapper($head, $fh);
	$self->connected(5); # upload state: the write callback now streams the file

	return 1;
}

# ->send_packet( PACKET )
#
# Serialises PACKET and queues it for writing on this connection. A packet
# without a message id gets a fresh one from ->_new_msgid. The id is recorded
# with the parent as originating from this connection.
#
# Returns a reference to the message id used. Croaks unless the connection is
# established. An invalid PACKET is reported with carp and nothing is sent
# (returns nothing in that case).
#
sub send_packet {
	my ($self, $packet) = @_;

	unless ($self->is_established) {
		croak "Invalid state for ->send_packet";
	}

	# Without the early return the code below would die calling a method on a
	# non-object right after the warning.
	unless ($packet && ref $packet) {
		carp "Invalid argument to Net::Gnutella::Connection->send_packet";
		return;
	}

	printf STDERR "+ Sending packet '%s'\n", ref($packet) if $self->debug >= 2;

	# A packet whose msgid was never set counts as "no id yet"
	my @msgid = @{ $packet->msgid || [] };

	unless (scalar @msgid) {
		@msgid = $self->_new_msgid;
	}

	# Wire format: 23-byte header (msgid, function, ttl, hops, payload length) + payload
	my $data = $packet->format;
	my $head = pack("L4CCCL", @msgid, $packet->function, $packet->ttl, $packet->hops, length $data);

	$self->parent->_msgid_source(\@msgid, $self);

	$self->_write_wrapper($head.$data);

	return \@msgid;
}

# ->send_page( DATA )
#
# Queues DATA as the text/html body of an HTTP/1.0 "200" response. Croaks
# unless the connection is in the HTTP state. Returns whatever queuing the
# response returns.
#
sub send_page {
	my $self = shift;
	my $data = shift;

	croak "Invalid state in ->send_page" unless $self->is_http;

	my $code   = RC_OK;
	my $reason = status_message($code);

	# The two trailing empty fields end the last header line and add the blank line
	my $head = join("\r\n",
		"HTTP/1.0 $code $reason",
		"Date: " . time2str(time),
		"Server: Net-Gnutella/$VERSION",
		"Content-Type: text/html",
		sprintf("Content-Length: %d", length($data)),
		"",
		"",
	);

	return $self->_write_wrapper($head.$data);
}

# ->_default( EVENT )
#
# Default routing for an event. Events that carry no Gnutella packet are
# accepted as they are. Packets with too many hops, an implausible ttl or an
# exhausted ttl are not forwarded. Otherwise the ttl is capped at 7, the ttl
# is decremented and the hop count incremented, and the packet is routed:
#
#   ping, query, push - broadcast, unless the message id was seen before
#   pong, reply       - sent back towards the connection the matching message
#                       id came from; dropped when that id is unknown
#
# A pong additionally feeds its "ip:port" into the parent's host cache.
#
# Returns nothing for dropped duplicates and unroutable replies, 1 otherwise.
#
sub _default {
	my ($self, $event) = @_;

	my $kind = $event->type;
	my $pkt  = $event->packet;

	if ($self->debug) {
		printf STDERR "%s->%s: Handling event '%s'\n", ref($self), "_default", $kind;
	}

	# Nothing to route when the event carries no Gnutella packet
	return 1 unless ($pkt and ref($pkt) =~ /^Net::Gnutella::Packet::/);

	if ($pkt->hops > 7) {
		if ($self->debug) {
			printf STDERR "+ Not forwarding, large hop count (%s)\n", $pkt->hops;
		}
		return 1;
	}

	if ($pkt->ttl > 50) {
		if ($self->debug) {
			printf STDERR "+ Not forwarding, large ttl (%s)\n", $pkt->ttl;
		}
		return 1;
	}

	$pkt->ttl(7) if $pkt->ttl > 7;

	if ($pkt->ttl <= 0) {
		if ($self->debug) {
			printf STDERR "+ Not forwarding, ttl <= 0 (%s)\n", $pkt->ttl;
		}
		return 1;
	}

	# The packet travels one hop further
	$pkt->ttl($pkt->ttl - 1);
	$pkt->hops($pkt->hops + 1);

	$self->parent->_host_cache( join(":", $pkt->ip_as_string, $pkt->port) ) if $kind eq "pong";

	if ($kind =~ /^(ping|query|push)$/) {
		# Broadcast types: drop duplicates, remember where new ones came from
		return if $self->parent->_msgid_source($pkt->msgid);

		$self->parent->_msgid_source($pkt->msgid, $self);
		$self->forward($pkt);
	} elsif ($kind =~ /^(pong|reply)$/) {
		# Routed types: only answers to a message id seen earlier are passed on
		unless ($self->parent->_msgid_source($pkt->msgid)) {
			if ($self->debug) {
				printf STDERR "+ Not forwarding, unseen msgid to routed type (%s)\n", join(":", @{$pkt->msgid});
			}
			return;
		}

		my $origin = $self->parent->_msgid_source($pkt->msgid);

		$self->forward($pkt, $origin);
	}

	return 1;
}

# ->_new_msgid()
#
# Produces the next message id for this connection and remembers it in the
# msgid attribute. The first id consists of four random 32-bit words; every
# later id repeats the first three words and increments the fourth, wrapping
# to 0 after 0xFFFFFFFF so that the value always fits the 32-bit field it is
# packed into.
#
# Returns the four words as a list in list context and as a reference to a
# fresh array in scalar context. The returned array is never shared with the
# stored attribute or modified by later calls.
#
sub _new_msgid {
	my $self = shift;
	my $current = $self->msgid;
	my @next;

	if ($current && scalar @$current) {
		# Work on a copy: the stored array may still be referenced by whoever
		# received an earlier id.
		@next = @{$current}[0 .. 3];
		$next[3] = ($next[3] || 0) >= 0xFFFFFFFF ? 0 : ($next[3] || 0) + 1;
	} else {
		@next = map { int rand(65536**2) } 1 .. 4;
	}

	$self->msgid([ @next ]);

	return wantarray ? @next : \@next;
}

# ->_read_socket()
#
# Read callback for this connection's socket. What is read, and how the
# buffered bytes are interpreted, depends on the connection state:
#
#   outgoing    - collects the 13-byte "GNUTELLA OK\n\n" handshake reply
#   incoming    - reads one byte at a time until the peer's first request is
#                 complete: an HTTP request (raises "download_req", needs the
#                 2 bit of ->allow) or a "GNUTELLA CONNECT/x.y" handshake
#                 (needs the 1 bit of ->allow)
#   established - splits the stream into descriptors (23-byte header plus
#                 payload) and raises one event per well-formed packet
#   otherwise   - reads once and then disconnects
#
# A read that yields nothing (end of file or error) disconnects. Unprocessed
# bytes are kept in ->readbuf for the next call. Always returns nothing.
#
sub _read_socket {
	my $self = shift;
	my $buf = $self->readbuf;

	local $SIG{PIPE} = 'IGNORE';

	if ($self->is_outgoing) {
		# Ask only for what the handshake reply still lacks. Requesting a full
		# 13 bytes after a partial read could swallow the start of the packet
		# stream and make a valid reply look invalid.
		my $want = 13 - length($buf);
		$want = 1 if $want < 1;

		my $ret = $self->socket->sysread($buf, $want, length $buf);

		unless ($ret) {
			$self->disconnect;
			return;
		}

		$self->readbuf($buf);

		if ($buf eq "GNUTELLA OK\n\n") {
			$self->readbuf("");
			$self->connected(3); # ESTABLISHED

			my $event = Net::Gnutella::Event->new(
				from => $self,
				type => "connected",
			);

			$self->parent->_handler($event);

			return;
		}

		if (length $buf >= 13) {
			$self->error("Invalid response");
			$self->disconnect;
			return;
		}
	} elsif ($self->is_incoming) {
		# One byte at a time, so nothing beyond the request is consumed
		my $ret = $self->socket->sysread($buf, 1, length $buf);

		unless ($ret) {
			$self->disconnect;
			return;
		}

		$self->readbuf($buf);

		if ($buf =~ /^\w+[^\012]+HTTP\/\d+\.\d+\015?\012/) {
			if ($buf =~ /\015?\012\015?\012/) {
				unless ($self->allow & 2) {
					$self->disconnect;
					return;
				}

				$self->readbuf("");
				$self->connected(4); # HTTP

				unless ($buf =~ s/^(\w+)[ \t]+(.+)[ \t]+(HTTP\/\d+\.\d+)[^\012]*\012//) {
					$self->send_error(400);  # BAD_REQUEST
					$self->error("Bad request line");
					return;
				}

				# Copy the captures before any other code can run a match
				my ($method, $target) = ($1, $2);

				my $url = URI::URL->new($target);
				my $request = HTTP::Request->new($method, $url);

				my ($key, $val);

				# A lexical line variable keeps the caller's $_ untouched
				HEADER: while ($buf =~ s/^([^\012]*)\012//) {
					my $line = $1;
					$line =~ s/\015$//;

					if ($line =~ /^([\w\-]+)\s*:\s*(.*)/) {
						$request->push_header($key, $val) if $key;
						($key, $val) = ($1, $2);
					} elsif ($line =~ /^\s+(.*)/) {
						# Continuation line: belongs to the previous header
						$val .= " $1";
					} else {
						last HEADER;
					}
				}

				$request->push_header($key, $val) if $key;

				my $event = Net::Gnutella::Event->new(
					from   => $self,
					type   => "download_req",
					packet => $request,
				);

				$self->parent->_handler($event);

				return;
			} elsif (length($buf) > 1*1024) {
				$self->disconnect;
				$self->error("Very long header");
				return;
			}
		} elsif ($buf =~ /^GNUTELLA CONNECT\/(\d+\.\d+)\015?\012\015?\012/) {
			if ($1 le "0.4") {
				unless ($self->allow & 1) {
					$self->disconnect;
					return;
				}

				$self->readbuf("");
				$self->connected(3); # ESTABLISHED

				$self->_write_wrapper("GNUTELLA OK\n\n");

				my $event = Net::Gnutella::Event->new(
					from => $self,
					type => "connected",
				);

				$self->parent->_handler($event);

				return;
			} else {
				$self->disconnect;
				return;
			}
		} elsif (length($buf) > 1*1024) {
			$self->disconnect;
			$self->error("Very long first line");
			return;
		}
	} elsif ($self->is_established) {
		my $ret = $self->socket->sysread($buf, 256, length $buf);

		unless ($ret) {
			$self->disconnect("disconnect");
			return;
		}

		$self->readbuf($buf);

		printf STDERR " - Read %d bytes, buffer has %d bytes\n", $ret, length $buf if $self->debug;

		# Peel complete descriptors off the front of the buffer
		PACKET: while (length($buf) >= 23) {
			my @msgid = unpack("L4", substr($buf, 0, 16));
			my $func  = unpack("C",  substr($buf, 16, 1));
			my $ttl   = unpack("C",  substr($buf, 17, 1));
			my $hops  = unpack("C",  substr($buf, 18, 1));
			my $len   = unpack("L",  substr($buf, 19, 4));

			# The payload has not fully arrived yet; try again on the next read
			# NOTE(review): $len comes from the peer and is only validated after
			# the whole payload has been buffered — consider an upper bound here.
			last PACKET if length($buf) < 23+$len;

			substr($buf, 0, 23, '');
			my $data = substr($buf, 0, $len, '');

			printf STDERR " - Full packet read, %d bytes left\n", length $buf if $self->debug;

			my $class;

			# Unknown functions and payloads of the wrong size are dropped silently
			if ($func == 0) {
				next PACKET if $len != 0;
				$class = "Net::Gnutella::Packet::Ping";
			} elsif ($func == 1) {
				next PACKET if $len != 14;
				$class = "Net::Gnutella::Packet::Pong";
			} elsif ($func == 64) {
				next PACKET if $len != 26;
				$class = "Net::Gnutella::Packet::Push";
			} elsif ($func == 128) {
				next PACKET if $len >= 257;
				$class = "Net::Gnutella::Packet::Query";
			} elsif ($func == 129) {
				next PACKET if $len >= 67_075;
				$class = "Net::Gnutella::Packet::Reply";
			} else {
				next PACKET;
			}

			my $packet = $class->new(
				Msgid    => \@msgid,
				Function => $func,
				TTL      => $ttl,
				Hops     => $hops,
				Parse    => $data,
			);

			my $event = Net::Gnutella::Event->new(
				from   => $self,
				type   => $func,
				packet => $packet,
			);

			$self->parent->_handler($event);
		}

		printf STDERR " - buffer has %d bytes\n\n", length $buf if $self->debug;

		$self->readbuf($buf);
	} else {
		# No input is expected in any other state
		$self->socket->sysread($buf, 16*1024, length($buf));

		if ($self->is_upload) {
			$self->disconnect("upload_error");
		} else {
			$self->disconnect;
		}
	}

	return;
}

# ->_write_socket( SOCK, FH )
#
# Write callback for this connection's socket. Writes as much of ->writebuf
# as the socket accepts. While the connection is in the upload state and FH
# is defined, the buffer is topped up from FH to at most 16 KB. SOCK is not
# used.
#
# Once the buffer is empty the socket is taken out of the parent's write set;
# an upload from an IO::File then ends with a "download_complete" disconnect
# and an HTTP response with a plain disconnect. A failed or zero-length write
# disconnects, and a failed read from FH ends the upload with "upload_error".
# Returns nothing.
#
sub _write_socket {
	my ($self, $sock, $fh) = @_;
	my $buf = $self->writebuf;

	local $SIG{PIPE} = 'IGNORE';

	printf STDERR " - Writing to FH, bytes in buffer: %s\n", length($buf) if $self->debug;

	if (length($buf) == 0) {
		return;
	}

	my $len = $self->socket->syswrite($buf, length $buf);

	# undef signals an error, 0 that nothing could be written
	unless ($len) {
		$self->disconnect;
		return;
	}

	substr($buf, 0, $len, '');

	printf STDERR " - Wrote %d bytes, %d bytes left\n", $len, length($buf) if $self->debug;

	if ($self->is_upload and defined $fh) {
		# Never hand sysread a non-positive length
		my $room = (16*1024) - length($buf);

		if ($room > 0) {
			printf STDERR "Buf length %d, pos %d\n", length $buf, tell($fh) if $self->debug;

			my $read = sysread($fh, $buf, $room, length($buf));

			# A read error must not pass for a completed transfer
			unless (defined $read) {
				$self->disconnect("upload_error");
				return;
			}

			printf STDERR "Reading [%s] [%d] [%d]\n", $fh, $read, tell($fh) if $self->debug;
		}
	}

	$self->writebuf($buf);

	if (length($buf)) {
		return;
	}

	# Nothing left to send: stop asking for write readiness
	$self->parent->_remove_fh($self->socket, "w");

	if ($self->is_upload and $fh and ref($fh) eq "IO::File") {
		$self->disconnect("download_complete");
	} elsif ($self->is_http) {
		$self->disconnect;
	}

	return;
}

# ->_write_wrapper( DATA [, ARGS ... ] )
#
# Appends DATA to the write buffer and registers the socket with the parent
# for writing, with _write_socket as the callback. Any ARGS are handed on to
# that registration unchanged.
#
sub _write_wrapper {
	my $self    = shift;
	my $payload = shift;

	$self->writebuf($self->writebuf . $payload);

	# Whatever is left in @_ are the extra registration arguments
	return $self->parent->_add_fh($self->socket, $self->can("_write_socket"), "w", $self, @_);
}

1;