package Stardust; use 5.008; use strict; use warnings; use base 'Squatting'; use IO::All; use Set::Object; use File::ShareDir ':ALL'; our $VERSION = '0.07'; our %CONFIG = ( debug => 0, # Noisy output to STDERR? allow_from => '127.0.0.1', # Who may send us COMET messages? auth_user => undef, # If defined, make them HTTP Auth themselves auth_pass => undef, # ...before allowing them to send COMET messages to us. channel_length => 8, # How many messages should a channel hold on to? timeout => 55, # How many seconds before we end a long-poll request? port => 5742, # What port should Stardust listen on? base => '', # What should the base path for Stardust's URLs be? ); sub continue { my ($class, @args) = @_; if ($CONFIG{demo}) { require Sys::Hostname; my $hostname = lc Sys::Hostname::hostname(); my $path = "/demo/"; if ($CONFIG{base}) { $path = "$CONFIG{base}$path"; } print " The demo is at: http://$hostname:$CONFIG{port}$path\n"; } $class->next::method(@args, docroot => dist_dir('Stardust')); } package Stardust::Controllers; use strict; use warnings; use aliased 'Squatting::H'; use Squatting ':controllers'; use Time::HiRes 'time'; use JSON; use AnyEvent; use Coro; use Coro::AnyEvent; use Coro::Timer; use Coro::Signal; our $Channel = H->new({ i => 0, # current position in messages array size => $CONFIG{channel_length}, # size of messages array messages => [], # circular array of messages signal => Coro::Signal->new, # signal that is broadcast upon write subscribers => [], # subscribed client list # write messages to this channel write => sub { my ($self, @messages) = @_; my $i = $self->{i}; # warn $i; my $size = $self->{size}; my $m = $self->{messages}; for (@messages) { $_->{_ts} = time; $_->{_ch} = $self->{name}; $m->[$i++] = $_; $i = 0 if ($i >= $size); } # warn $i; $self->{i} = $i; $self->signal->broadcast; @messages; }, # read $y messages from this channel read => sub { my ($self, $y) = @_; my $size = $self->{size}; my $m = $self->{messages}; my $x; $y ||= 1; $y = $size if ($y > $size); my $i; $i = $self->{i} - 1; $i = ($size - 1) if ($i < 0); my @messages; for ($x = 0; $x < $y; $x++) { # warn $i; unshift @messages, $m->[$i]; $i--; $i = ($size - 1) if ($i < 0); } @messages; }, # read messages since $last time read_since => sub { my ($self, $last) = @_; grep { defined && ($_->{_ts} > $last) } $self->read($self->size); }, to_hash => sub { my ($self) = @_; { name => $self->name, i => $self->i, size => $self->size, messages => $self->messages, subscribers => $self->subscribers, }; }, }); our %channels; sub channel { my ($name) = @_; $channels{$name} ||= $Channel->clone({ name => $name }); } my $info = qq|{ "name" : "Stardust COMET Server", "language" : "Perl", "version" : $VERSION } |; our @C = ( # Home - [public] # General Information C( Home => [ '/' ], get => sub { my ($self) = @_; $self->headers->{'Content-Type'} = 'text/plain'; return $info; }, ), # ChannelList - [public] # This returns a list of all channel names currently in use. C( ChannelList => [ '/channel' ], get => sub { my ($self) = @_; encode_json([ sort keys %channels ]); } ), # Channel # To generate messages on a channel, POST a JSON object to this controller # using the CGI variable 'm'. # # NOTE: # The post method of this controller is meant for INTERNAL USE ONLY. # By default, only clients from 127.0.0.1 can access this controller. # Everyone else is rejected. C( Channel => [ '/channel/([\w+]+)' ], # [public] It should return a list of channel objects. get => sub { my ($self, $channels) = @_; my @ch = split(/\+/, $channels); encode_json([ map { my $ch = channel($_); $ch->to_hash } @ch ]); }, # [private] It should accept a JSON object and send it to the appropriate channels. post => sub { my ($self, $channels) = @_; my $m = $self->input->{m}; return unless $m; my @ch = split(/\+/, $channels); my @ev; my $messages = (ref($m) eq 'ARRAY') ? $m : [$m]; @ev = map { decode_json($_) } @$messages; for my $name (@ch) { for my $event (@ev) { channel($name)->write($event); } } }, ), # Message - [public] # This controller emits a stream of messages to long-polling clients. C( Message => [ '/channel/([\w+]+)/stream/([.\d]+)' ], get => sub { warn "coro [$Coro::current]" if $CONFIG{debug}; my ($self, $channels, $client_id) = @_; my $input = $self->input; my $cr = $self->cr; my @ch = split(/\+/, $channels); my $last = time; while (1) { # Output warn "top of loop" if $CONFIG{debug}; my @messages = grep { defined } map { my $ch = channel($_); $ch->read_since($last) } @ch; my $x = async { warn "printing...".encode_json(\@messages) if $CONFIG{debug}; $cr->print(encode_json(\@messages)); }; $x->join; $last = time; # Hold for a brief moment until the next long poll request comes in. warn "waiting for next request" if ($CONFIG{debug}); $cr->next; # Start 1 coro for each channel we're listening to. # Each coro will have the same Coro::Signal object, $activity. my $activity = Coro::Signal->new; my @coros = map { my $ch = channel($_); async { $ch->signal->wait; $activity->broadcast }; } @ch; # When running this behind a reverse proxy, # it's useful to timeout before your proxy kills the connection. push @coros, async { my $timeout = Coro::Timer::timeout $CONFIG{timeout}; while (not $timeout) { Coro::schedule; } warn "timeout\n" if $CONFIG{debug}; $activity->broadcast; }; # The first coro that does $activity->broadcast wins. warn "waiting for activity on any of (@ch); last is $last" if $CONFIG{debug}; $activity->wait; # Cancel the remaining coros. for (@coros) { $_->cancel } } }, continuity => 1, ), ); 1; __END__ =head1 NAME Stardust - the simplest COMET server I could imagine =head1 SYNOPSIS Installing Stardust: $ sudo cpan Stardust Running the COMET server on port 5555: $ stardust.pl --port=5555 --base=/comet Making pages subscribe to channel 'foo': Posting JSON messages to channel 'foo': curl -d 'm={ "type": "TestMessage", "data": [3, 2, 1] }' \ http://localhost:5555/comet/channel/foo =head1 DESCRIPTION Stardust is a simple COMET server that can be integrated alongside existing web applications. =head1 CONCEPTS =head2 Message Messages are just abritrary JSON objects. =head2 Channel Channels are where messages travel trough. =head1 API Communication with the Stardust COMET server uses JSON over HTTP. The following URLs represent your API. =head2 GET / This is just a little informational JSON-encoded data that tells you what version of the Stardust server you're using. =head2 GET /channel This returns a list of all the channel names currently in use as a JSON-encoded array of strings. =head2 GET /channel/([\w+]+) This returns info about the specified channels as a JSON-encoded array of objects. =head2 POST /channel/([\w+]+) This allows one to send a message to the specified channels. B: =over 4 =item m an JSON-encoded object. This parameter may be repeated if you want to send more than one message per POST request. =back =head2 GET /channel/([\w+]+)/stream/([.\d]+) Long poll on this URL to receive a stream of messages as they become available. They will come back to you as a JSON-encoded array of objects. =head1 CONFIGURATION =head2 nginx static + stardust upstream stardust_com_et { server 127.0.0.1:5742; } server { listen 80; server_name stardust.com.et; location / { root /thttpd/stardust.com.et; index index.html index.htm; } location /comet { proxy_pass http://stardust_com_et; } } =head2 nginx fastcgi + stardust TODO =head2 nginx reverse proxy + stardust TODO =head2 apache2 static + stardust TODO =head2 apache2 fastcgi + stardust TODO =head2 apache2 reverse proxy + stardust TODO =head1 SEE ALSO =over 4 =item GitHub Repository L =item jQuery.ev L =item AnyEvent, Coro, Continuity L, L, L =item Squatting L L =back =head1 AUTHOR John BEPPU Ebeppu@cpan.orgE =head1 SPECIAL THANKS Thanks to Marc Lehmann for his work on L and L. Thanks to Brock Wilcox and Scott Walters for their work on L. =head1 COPYRIGHT Copyright (c) 2009 John BEPPU Ebeppu@cpan.orgE. =head2 The "MIT" License Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. =cut # Local Variables: *** # mode: cperl *** # indent-tabs-mode: nil *** # cperl-close-paren-offset: -2 *** # cperl-continued-statement-offset: 2 *** # cperl-indent-level: 2 *** # cperl-indent-parens-as-block: t *** # cperl-tab-always-indent: nil *** # End: *** # vim:tabstop=8 softtabstop=2 shiftwidth=2 shiftround expandtab