package Net::Gnutella::Connection; use Net::Gnutella::Packet::Ping; use Net::Gnutella::Packet::Pong; use Net::Gnutella::Packet::Push; use Net::Gnutella::Packet::Query; use Net::Gnutella::Packet::Reply; use Net::Gnutella::Event; use HTTP::Request; use HTTP::Date; use HTTP::Status; use LWP::MediaTypes qw(guess_media_type); use URI::URL; use IO::File; use Carp qw(carp croak confess); use strict; use vars qw/$VERSION $AUTOLOAD/; $VERSION = $VERSION = "0.1"; # Use AUTOHANDLER to supply generic attribute methods # sub AUTOLOAD { my $self = shift; my $attr = $AUTOLOAD; $attr =~ s/.*:://; return unless $attr =~ /[^A-Z]/; # skip DESTROY and all-cap methods confess sprintf "invalid attribute method: %s->%s()", ref($self), $attr unless exists $self->{_attr}->{lc $attr}; $self->{_attr}->{lc $attr} = shift if @_; return $self->{_attr}->{lc $attr}; } sub disconnect { my ($self, $type) = @_; printf STDERR "+ Disconnecting socket (%s)\n", $type if $self->debug; if ($type) { my $event = Net::Gnutella::Event->new( from => $self, type => $type, ); $self->parent->_handler($event); } $self->parent->_remove_fh($self->socket, "rw"); $self->readbuf(""); $self->writebuf(""); $self->connected(0); $self->socket(""); } # ->forward( PACKET [, REPLY_PATH ] ) # # Composes the packet and delivers it to all other ESTABLISHED connections # sub forward { my ($self, $packet, $path) = @_; unless ($packet && ref $packet) { carp "Invalid argument to Net::Gnutella::Connection->forward"; } my $data = $packet->format; my $head = pack("L4CCCL", @{ $packet->msgid }, $packet->function, $packet->ttl, $packet->hops, length $data); if ($path) { if ($path ne $self) { printf STDERR " - Returning down path to %s\n", $path->ip if $self->debug >= 2; $path->_write_wrapper($head.$data); } } else { foreach my $conn ($self->parent->connections) { next if $conn eq $self; next unless $conn->connected; printf STDERR " - Forwarding to %s\n", $conn->ip if $self->debug >= 2; $conn->_write_wrapper($head.$data); } } } sub is_outgoing { $_[0]->connected == 1 } # Outgoing sub is_incoming { $_[0]->connected == 2 } # Incoming sub is_established { $_[0]->connected == 3 } # Gnutella DATA Stream sub is_http { $_[0]->connected == 4 } # HTTP Serving sub is_upload { $_[0]->connected == 5 } # Sending file sub new { my $proto = shift; my $parent = shift; my %args = @_; my $self = { _handler => {}, _attr => { parent => $parent, debug => $parent->debug, timeout => $parent->timeout, socket => undef, ip => '', connected => 0, readbuf => '', writebuf => '', error => '', allow => 0, msgid => [], }, _msgid => {}, }; bless $self, $proto; foreach my $key (keys %args) { my $lkey = lc $key; $self->$lkey($args{$key}); } if ($self->connected and $self->socket) { $self->parent->_add_fh($self->socket, $self->can("_read_socket"), "r", $self); } return $self; } sub send_error { my ($self, $status, $error) = @_; unless ($self->is_http) { croak "Invalid state for ->send_error"; } $status ||= RC_BAD_REQUEST; $error ||= ""; my $message = status_message($status); my $CRLF = "\r\n"; my $ip = "Unknown"; my $port = "Unknown"; my $body = < $status $message

$message

$error

Net::Gnutella $VERSION Server at $ip Port $port
EOT my $head; $head .= sprintf "%s %s %s%s", "HTTP/1.0", $status, $message, $CRLF; $head .= sprintf "Date: %s%s", time2str(time), $CRLF; $head .= sprintf "Server: %s/%s%s", "Net-Gnutella", $VERSION, $CRLF; $head .= sprintf "Content-Type: %s%s", "text/html", $CRLF; $head .= sprintf "Content-Length: %s%s", length($body), $CRLF; $head .= sprintf "%s", $CRLF; $self->_write_wrapper($head.$body); return; } sub send_file { my ($self, $file, $offset) = @_; unless ($self->is_http) { croak "Invalid state for ->send_file"; } if (-f $file) { my ($ct, $ce) = guess_media_type($file); my ($size, $mtime) = (stat _)[7,9]; my $fh = new IO::File $file or return $self->send_error(RC_FORBIDDEN); binmode($fh); if ($offset && $offset > $size) { $offset = 0; } elsif ($offset) { $fh->seek($offset, 0) or $offset = 0; } my $status = $offset ? RC_PARTIAL_CONTENT : RC_OK; my $message = status_message($status); my $CRLF = "\r\n"; my $head; $head .= sprintf "%s %s %s%s", "HTTP/1.0", $status, $message, $CRLF; $head .= sprintf "Date: %s%s", time2str(time), $CRLF; $head .= sprintf "Server: %s/%s%s", "Net-Gnutella", $VERSION, $CRLF; $head .= sprintf "Content-Type: %s%s", $ct, $CRLF; $head .= sprintf "Content-Encoding: %s%s", $ce, $CRLF if $ce; $head .= sprintf "Content-Length: %d%s", $offset ? $size - $offset : $size, $CRLF if $size; $head .= sprintf "Content-Range: bytes %d-%d/%d%s", $offset, $size-1, $size, $CRLF if $offset; $head .= sprintf "Last-Modified: %s%s", time2str($mtime), $CRLF if $mtime; $head .= sprintf "%s", $CRLF; $self->_write_wrapper($head, $fh); $self->connected(5); } else { return $self->send_error(RC_NOT_FOUND); } return 1; } sub send_packet { my ($self, $packet) = @_; unless ($self->is_established) { croak "Invalid state for ->send_packet"; } unless ($packet && ref $packet) { carp "Invalid argument to Net::Gnutella::Connection->send_packet"; } printf STDERR "+ Sending packet '%s'\n", ref($packet) if $self->debug >= 2; my @msgid = @{ $packet->msgid }; unless (scalar @msgid) { @msgid = $self->_new_msgid; } my $data = $packet->format; my $head = pack("L4CCCL", @msgid, $packet->function, $packet->ttl, $packet->hops, length $data); $self->parent->_msgid_source(\@msgid, $self); $self->_write_wrapper($head.$data); return \@msgid; } sub send_page { my ($self, $data) = @_; unless ($self->is_http) { croak "Invalid state in ->send_page"; } my $status = RC_OK; my $message = status_message($status); my $CRLF = "\r\n"; my $head; $head .= sprintf "%s %s %s%s", "HTTP/1.0", $status, $message, $CRLF; $head .= sprintf "Date: %s%s", time2str(time), $CRLF; $head .= sprintf "Server: %s/%s%s", "Net-Gnutella", $VERSION, $CRLF; $head .= sprintf "Content-Type: %s%s", "text/html", $CRLF; $head .= sprintf "Content-Length: %d%s", length($data), $CRLF; $head .= sprintf "%s", $CRLF; $self->_write_wrapper($head.$data); } sub _default { my $self = shift; my $event = shift; my $type = $event->type; my $packet = $event->packet; printf STDERR "%s->%s: Handling event '%s'\n", ref($self), "_default", $type if $self->debug; unless ($packet and ref($packet) =~ /^Net::Gnutella::Packet::/) { return 1; } if ($packet->hops > 7) { printf STDERR "+ Not forwarding, large hop count (%s)\n", $packet->hops if $self->debug; return 1; } if ($packet->ttl > 50) { printf STDERR "+ Not forwarding, large ttl (%s)\n", $packet->ttl if $self->debug; return 1; } if ($packet->ttl > 7) { $packet->ttl(7); } if ($packet->ttl <= 0) { printf STDERR "+ Not forwarding, ttl <= 0 (%s)\n", $packet->ttl if $self->debug; return 1; } else { $packet->ttl($packet->ttl - 1); $packet->hops($packet->hops + 1); } if ($type eq "pong") { $self->parent->_host_cache( join(":", $packet->ip_as_string, $packet->port) ); } # Drop any routed replies which we haven't seen # Drop any duplicate packets # if ($type =~ /^(ping|query|push)$/) { if ($self->parent->_msgid_source($packet->msgid)) { return; # duplicate } else { $self->parent->_msgid_source($packet->msgid, $self); } } elsif ($type =~ /^(pong|reply)$/) { unless ($self->parent->_msgid_source($packet->msgid)) { printf STDERR "+ Not forwarding, unseen msgid to routed type (%s)\n", join(":", @{$packet->msgid}) if $self->debug; return; } } # If the packet is a routed reply (pong and reply) and it didn't originate # from this connection, forward it to the other connection. # # Otherwise, throw it at all the connections (broadcast). # if ($type =~ /^(pong|reply)$/) { my $conn = $self->parent->_msgid_source($packet->msgid); $self->forward($packet, $conn); } elsif ($type =~ /^(ping|push|query)$/) { $self->forward($packet); } return 1; } sub _new_msgid { my $self = shift; my $msgid = $self->msgid; if (scalar @$msgid) { $self->msgid([ $msgid->[0], $msgid->[1], $msgid->[2], ++$msgid->[3] ]); } else { $msgid = [ int rand(65536**2), int rand(65536**2), int rand(65536**2), int rand(65536**2) ]; $self->msgid($msgid); } return wantarray ? @$msgid : $msgid; } sub _read_socket { my $self = shift; my $buf = $self->readbuf; local $SIG{PIPE} = 'IGNORE'; if ($self->is_outgoing) { my $ret = $self->socket->sysread($buf, 13, length $buf); if ($ret == 0) { $self->disconnect; return; } $self->readbuf($buf); if ($buf eq "GNUTELLA OK\n\n") { $self->readbuf(""); $self->connected(3); # ESTABLISHED my $event = Net::Gnutella::Event->new( from => $self, type => "connected", ); $self->parent->_handler($event); return; } if (length $buf >= 13) { $self->error("Invalid response"); $self->disconnect; return; } } elsif ($self->is_incoming) { my $ret = $self->socket->sysread($buf, 1, length $buf); if ($ret == 0) { $self->disconnect; return; } $self->readbuf($buf); if ($buf =~ /^\w+[^\012]+HTTP\/\d+\.\d+\015?\012/) { if ($buf =~ /\015?\012\015?\012/) { unless ($self->allow & 2) { $self->disconnect; return; } $self->readbuf(""); $self->connected(4); # HTTP unless ($buf =~ s/^(\w+)[ \t]+(.+)[ \t]+(HTTP\/\d+\.\d+)[^\012]*\012//) { $self->send_error(400); # BAD_REQUEST $self->error("Bad request line"); return; } my $url = URI::URL->new($2); my $request = HTTP::Request->new($1, $url); my ($key, $val); HEADER: while ($buf =~ s/^([^\012]*)\012//) { $_ = $1; s/\015$//; if (/^([\w\-]+)\s*:\s*(.*)/) { $request->push_header($key, $val) if $key; ($key, $val) = ($1, $2); } elsif (/^\s+(.*)/) { $val .= " $1"; } else { last HEADER; } } $request->push_header($key, $val) if $key; my $event = Net::Gnutella::Event->new( from => $self, type => "download_req", packet => $request, ); $self->parent->_handler($event); return; } elsif (length($buf) > 1*1024) { $self->disconnect; $self->error("Very long header"); return; } } elsif ($buf =~ /^GNUTELLA CONNECT\/(\d+\.\d+)\015?\012\015?\012/) { if ($1 le "0.4") { unless ($self->allow & 1) { $self->disconnect; return; } $self->readbuf(""); $self->connected(3); # ESTABLISHED $self->_write_wrapper("GNUTELLA OK\n\n"); my $event = Net::Gnutella::Event->new( from => $self, type => "connected", ); $self->parent->_handler($event); return; } else { $self->disconnect; return; } } elsif (length($buf) > 1*1024) { $self->disconnect; $self->error("Very long first line"); return; } } elsif ($self->is_established) { my $ret = $self->socket->sysread($buf, 256, length $buf); if ($ret == 0) { $self->disconnect("disconnect"); return; } $self->readbuf($buf); printf STDERR " - Read %d bytes, buffer has %d bytes\n", $ret, length $buf if $self->debug; PROCESS: { if (length $buf < 23) { last PROCESS; } my @msgid = unpack("L4", substr($buf, 0, 16)); my $func = unpack("C", substr($buf, 16, 1)); my $ttl = unpack("C", substr($buf, 17, 1)); my $hops = unpack("C", substr($buf, 18, 1)); my $len = unpack("L", substr($buf, 19, 4)); if (length($buf) < 23+$len) { last PROCESS; } my $head = substr($buf, 0, 23, ''); my $data = substr($buf, 0, $len, ''); printf STDERR " - Full packet read, %d bytes left\n", length $buf if $self->debug; my $class; if ($func == 0) { goto PROCESS if $len != 0; $class = "Net::Gnutella::Packet::Ping"; } elsif ($func == 1) { goto PROCESS if $len != 14; $class = "Net::Gnutella::Packet::Pong"; } elsif ($func == 64) { goto PROCESS if $len != 26; $class = "Net::Gnutella::Packet::Push"; } elsif ($func == 128) { goto PROCESS if $len >= 257; $class = "Net::Gnutella::Packet::Query"; } elsif ($func == 129) { goto PROCESS if $len >= 67_075; $class = "Net::Gnutella::Packet::Reply"; } else { goto PROCESS; } my $packet = $class->new( Msgid => \@msgid, Function => $func, TTL => $ttl, Hops => $hops, Parse => $data, ); my $event = Net::Gnutella::Event->new( from => $self, type => $func, packet => $packet, ); $self->parent->_handler($event); goto PROCESS; } printf STDERR " - buffer has %d bytes\n\n", length $buf if $self->debug; $self->readbuf($buf); } else { my $ret = $self->socket->sysread($buf, 16*1024, length($buf)); if ($self->is_upload) { $self->disconnect("upload_error"); } else { $self->disconnect; } } return; } sub _write_socket { my ($self, $sock, $fh) = @_; my $buf = $self->writebuf; local $SIG{PIPE} = 'IGNORE'; printf STDERR " - Writing to FH, bytes in buffer: %s\n", length($buf) if $self->debug; if (length($buf) == 0) { return; } my $len = $self->socket->syswrite($buf, length $buf); if ($len == 0) { $self->disconnect; return; } substr($buf, 0, $len, ''); printf STDERR " - Wrote %d bytes, %d bytes left\n", $len, length($buf) if $self->debug; if ($self->is_upload and defined $fh) { printf "Buf length %d, pos %d\n", length $buf, tell($fh); my $read = sysread($fh, $buf, (16*1024)-length($buf), length($buf)); printf "Reading [%s] [%d] [%d]\n", $fh, $read, tell($fh); } $self->writebuf($buf); if (length($buf)) { return; } $self->parent->_remove_fh($self->socket, "w"); if ($self->is_upload and $fh and ref($fh) eq "IO::File") { $self->disconnect("download_complete"); } elsif ($self->is_http) { $self->disconnect; } } sub _write_wrapper { my ($self, $data, @args) = @_; my $buf = $self->writebuf; $self->writebuf($buf.$data); $self->parent->_add_fh($self->socket, $self->can("_write_socket"), "w", $self, @args); } 1;