package Perlbal::Plugin::AtomStream; use URI; use Perlbal; use strict; use warnings; our @subs; # subscribers our @recent; # recent items in format [$epoch, $atom_ref, $path_segments_arrayref] our $last_timestamp = 0; use constant MAX_LAG => 262144; sub InjectFeed { my $class = shift; my ($atomref, $path) = @_; # maintain queue of last 60 seconds worth of posts my $now = time(); my @put_segments = URI->new($path)->path_segments; push @recent, [ $now, $atomref, \@put_segments ]; shift @recent while @recent && $recent[0][0] <= $now - 60; emit_timestamp($now) if $now > $last_timestamp; my $need_clean = 0; foreach my $s (@subs) { if ($s->{closed}) { $need_clean = 1; next; } next unless filter(\@put_segments, $s->{scratch}{get_segments}); my $lag = $s->{write_buf_size}; if ($lag > MAX_LAG) { $s->{scratch}{skipped_atom}++; } else { if (my $skip_count = $s->{scratch}{skipped_atom}) { $s->{scratch}{skipped_atom} = 0; $s->write(\ "\n"); } $s->watch_write(0) if $s->write($atomref); } } if ($need_clean) { @subs = grep { ! $_->{closed} } @subs; } } sub emit_timestamp { my $time = shift; $last_timestamp = $time; foreach my $s (@subs) { next if $s->{closed}; $s->{alive_time} = $time; $s->write(\ "\n"); } } sub filter { my ($put, $get) = @_; return 0 if scalar @$put < scalar @$get; for( my $i = 0 ; $i < scalar @$get ; $i++) { return 0 if $put->[$i] ne $get->[$i]; } return 1; } # called when we're being added to a service sub register { my ($class, $svc) = @_; Perlbal::Socket::register_callback(1, sub { my $now = time(); emit_timestamp($now) if $now > $last_timestamp; return 1; }); $svc->register_hook('AtomStream', 'start_http_request', sub { my Perlbal::ClientProxy $self = shift; my Perlbal::HTTPHeaders $hds = $self->{req_headers}; return 0 unless $hds; my $uri = URI->new($hds->request_uri); my @get_segments = $uri->path_segments; $self->{scratch}{get_segments} = \@get_segments; return 0 unless pop @get_segments eq 'atom-stream.xml'; my %params = $uri->query_form; my $since = $params{since} =~ /\d+/ ? $params{since} : 0; my $res = $self->{res_headers} = Perlbal::HTTPHeaders->new_response(200); $res->header("Content-Type", "text/xml"); $res->header('Connection', 'close'); push @subs, $self; $self->write($res->to_string_ref); my $last_rv = $self->write(\ "\n\n"); # if they'd like a playback, give them all items >= time requested if ($since) { foreach my $item (@recent) { next if $item->[0] < $since; next unless filter($item->[2], \@get_segments); $last_rv = $self->write($item->[1]); } } $self->watch_write(0) if $last_rv; return 1; }); return 1; } # called when we're no longer active on a service sub unregister { my ($class, $svc) = @_; return 1; } # called when we are loaded sub load { return 1; } # called for a global unload sub unload { return 1; } 1;