### simple package for handling the stream request port
#
# Line-oriented side channel protocol. Commands understood here:
#   size <uri>                   -> "<uri> <size>\r\n"  (size is -1 if missing)
#   watch                        -> subscribe this client to iostat output
#   MD5|SHA-1 <uri> [<reason>]   -> "<uri> <alg>=<hexdigest>\r\n" (-1 on open failure)
# Anything else is handed to Perlbal's management command interface.
package Mogstored::SideChannelClient;

use strict;
use warnings;
use base qw{Perlbal::Socket};
use fields (
            'count',       # how many requests we've serviced
            'read_buf',    # unprocessed read buffer
            'mogsvc',      # the mogstored Perlbal::Service object
            );
use Digest;
use POSIX qw(O_RDONLY);
use Mogstored::TaskQueue;

# Task queues for fsck digest requests, keyed by the device number reported
# by stat() on the opened file.
# TODO: interface to make this tunable
my %digest_queues;

# needed since we're pretending to be a Perlbal::Socket... never idle out
sub max_idle_time { return 0; }

# Constructor. Arguments are passed straight through to the parent
# constructor; works both as a class method and on an existing object.
sub new {
    my Mogstored::SideChannelClient $self = shift;
    $self = fields::new($self) unless ref $self;
    $self->SUPER::new(@_);

    $self->{count}    = 0;
    $self->{read_buf} = '';
    $self->{mogsvc}   = Perlbal->service('mogstored');

    return $self;
}

# Returns $uri unchanged when it is acceptable. When it contains "..",
# writes an error line to the client and returns nothing.
# NOTE(review): only ".." is rejected; the URI is appended verbatim to the
# docroot by the callers, so a URI without a leading "/" is not caught
# here -- confirm that clients always send absolute paths.
sub validate_uri {
    my ($self, $uri) = @_;

    if ($uri =~ /\.\./) {
        $self->write("ERROR: uri invalid (contains ..)\r\n");
        return;
    }

    return $uri;
}

# Socket is readable: append what we got to the buffer and process any
# complete lines. An undefined read result closes the connection.
sub event_read {
    my Mogstored::SideChannelClient $self = shift;

    my $bref = $self->read(1024);
    return $self->close unless defined $bref;

    $self->{read_buf} .= $$bref;
    $self->read_buf_consume;
}

# Process every complete line currently in the read buffer.
#
# Returns early (leaving later lines buffered) only when a digest request
# is started; digest() stops read-watching and after_long_request() picks
# the buffer up again once the digest has finished.
sub read_buf_consume {
    my $self = shift;

    my $path = $self->{mogsvc}->{docroot};

    # ".*?" rather than ".+?": a buffer starting with a bare "\n" could
    # never match the latter and would block every command queued behind it.
    while ($self->{read_buf} =~ s/^(.*?)\r?\n//) {
        my $cmd = $1;

        # nothing to do for empty lines
        next if $cmd eq '';

        if ($cmd =~ /^size (\S+)$/) {
            # increase our count
            $self->{count}++;

            # on an invalid uri the error has already been written; carry on
            # with the next buffered command instead of stalling the rest
            my $uri = $self->validate_uri($1);
            next unless defined($uri);

            # now stat the file to get the size and such
            Perlbal::AIO::aio_stat("$path$uri", sub {
                return if $self->{closed};
                # "|| 0" keeps the reply numeric for zero-length files even
                # if -s yields a false non-numeric value on this perl
                my $size = -e _ ? ((-s _) || 0) : -1;
                $self->write("$uri $size\r\n");
            });
        }
        elsif ($cmd =~ /^watch$/i) {
            unless (Mogstored->iostat_available) {
                $self->write("ERR iostat unavailable\r\n");
                next;
            }
            $self->watch_read(0);
            Mogstored->iostat_subscribe($self);
        }
        elsif ($cmd =~ /^(MD5|SHA-1) (\S+)(?: (\w+))?$/) {
            # we can easily enable other hash algorithms with the above
            # regexp, but we won't for now (see MogileFS::Checksum)

            # copy all captures before validate_uri runs its own match
            my ($alg, $raw_uri, $reason) = ($1, $2, $3);

            my $uri = $self->validate_uri($raw_uri);
            next unless defined($uri);

            return $self->digest($alg, $path, $uri, $reason);
        }
        else {
            # we don't understand this so pass it on to manage command interface
            my @out;
            Perlbal::run_manage_command($cmd, sub { push @out, $_[0]; });
            $self->write(join("\r\n", @out) . "\r\n");
        }
    }
}

# stop watching writeability if we've nothing else to
# write to them.  else just kick off more writes.
sub event_write {
    my $self = shift;
    $self->watch_write(0) if $self->write(undef);
}

# override Danga::Socket's event handlers which die
sub event_err { $_[0]->close; }
sub event_hup { $_[0]->close; }

# as_string handler: parent description plus our request counter
sub as_string {
    my Mogstored::SideChannelClient $self = shift;

    my $ret = $self->SUPER::as_string;
    $ret .= "; size_requests=$self->{count}";

    return $ret;
}

# Unsubscribe from iostat, then close via the parent class. Any arguments
# (e.g. a close reason) are forwarded to the parent implementation.
sub close {
    my Mogstored::SideChannelClient $self = shift;
    Mogstored->iostat_unsubscribe($self);
    $self->SUPER::close(@_);
}

sub die_gracefully {
    Mogstored->on_sidechannel_die_gracefully;
}

# Start an asynchronous digest of "$path$uri" using algorithm $alg.
#
# Read-watching is turned off for the duration of the request. If the file
# cannot be opened, "$uri $alg=-1" is written and buffered commands resume.
# When $reason is "fsck" the digest is run through a task queue shared by
# all fsck digests on the same device (presumably to limit concurrent fsck
# reads per device -- see Mogstored::TaskQueue); otherwise it starts
# immediately.
sub digest {
    my ($self, $alg, $path, $uri, $reason) = @_;

    $self->watch_read(0);

    Perlbal::AIO::aio_open("$path$uri", O_RDONLY, 0, sub {
        my $fh = shift;

        # client went away while we were opening: just release the handle
        if ($self->{closed}) {
            CORE::close($fh) if $fh;
            return;
        }

        unless ($fh) {
            $self->write("$uri $alg=-1\r\n");
            $self->after_long_request;
            return;
        }

        if ($reason && $reason eq "fsck") {
            # fstat(2) should return immediately, no AIO needed
            my $devid = (stat($fh))[0];
            $devid = '' unless defined $devid;    # stat failed: shared fallback queue

            my $queue = $digest_queues{$devid} ||= Mogstored::TaskQueue->new;
            $queue->run(sub { $self->digest_fh($alg, $fh, $uri, $queue) });
        }
        else {
            $self->digest_fh($alg, $fh, $uri);
        }
    });
}
# Compute the $alg digest of the already-open handle $fh by chaining
# asynchronous reads of up to 1 MiB each, then report to the client.
#
#   $self  - the side channel client to answer
#   $alg   - algorithm name handed to Digest->new
#   $fh    - open filehandle; always closed here when the request ends
#   $uri   - echoed back in the reply
#   $queue - optional; if given, ->task_done is called when the request ends
#
# Writes "$uri $alg=<hexdigest>\r\n" on success or
# "ERR read $uri at <offset> failed\r\n" on a read error, then hands control
# to after_long_request. If the client has closed in the meantime nothing is
# written; the handle and the queue slot are simply released.
sub digest_fh {
    my ($self, $alg, $fh, $uri, $queue) = @_;

    my $offset = 0;
    my $data   = '';
    my $digest = Digest->new($alg);
    my $cb;

    # $cb refers to itself through the closure; every terminal branch
    # below clears it so the cycle is broken.
    $cb = sub {
        my $retval = shift;

        # client is gone: don't keep reading the file on its behalf
        if ($self->{closed}) {
            $cb = undef;
            CORE::close($fh);
            $queue->task_done if $queue;
            return;
        }

        if (defined $retval && $retval > 0) {
            my $bytes = length($data);
            $offset += $bytes;
            $digest->add($data);
            Perlbal::AIO::aio_read($fh, $offset, 0x100000, $data, $cb);
        }
        elsif (defined $retval && $retval == 0) { # EOF
            $cb = undef;
            CORE::close($fh);
            my $hex = $digest->hexdigest;
            $self->write("$uri $alg=$hex\r\n");
            $queue->task_done if $queue;
            $self->after_long_request;
        }
        else {
            # Negative or undefined result. An undefined result must not be
            # compared numerically: it would equal 0 and be reported as a
            # successful digest of partial data.
            $cb = undef;
            CORE::close($fh);
            $self->write("ERR read $uri at $offset failed\r\n");
            $queue->task_done if $queue;
            $self->after_long_request; # should we try to continue?
        }
    };

    Perlbal::AIO::aio_read($fh, $offset, 0x100000, $data, $cb);
}

# Called when a long-running request (one that turned read-watching off)
# has finished.
#
# Reads are re-enabled first and buffered commands are processed second:
# if one of those commands needs reads off again it turns them off itself,
# and otherwise the connection is left readable. Doing it the other way
# round leaves the socket unread forever when the buffered commands are
# all short ones.
sub after_long_request {
    my $self = shift;

    $self->watch_read(1);

    # at least one complete line is waiting in the buffer
    $self->read_buf_consume if $self->{read_buf} =~ /\n/;
}

1;