package Continuity::Adapt::PSGI; =head1 NAME Continuity::Adapt::PSGI - PSGI backend for Continuity =head1 SYNOPSIS # Run with on of these: # corona demo.pl # twiggy demo.pl # ./myapp.pl # Will try to fall back to HttpDaemon ;) # "Twiggy is a lightweight and fast HTTP server" # "Corona is a Coro based Plack web server. It uses Net::Server::Coro under the hood" use Continuity; my $server = Continuity->new; sub main { my $request = shift; my $i = 0; while(++$i) { $request->print("Hello number $i!"); $request->next; } } # This is actually returning a subref to PSI/Plack # So put it at the end $server->loop; =cut use strict; use warnings; use Continuity::Request; use base 'Continuity::Request'; use Coro; use Coro::Channel; use Plack; use Plack::App::File; # use this now; no surprises for later warn "tested against Plack 0.9938; you have $Plack::VERSION" if $Plack::VERSION < 0.9938; sub debug_level { exists $_[1] ? $_[0]->{debug_level} = $_[1] : $_[0]->{debug_level} } sub debug_callback { exists $_[1] ? $_[0]->{debug_callback} = $_[1] : $_[0]->{debug_callback} } sub docroot { exists $_[1] ? $_[0]->{docroot} = $_[1] : $_[0]->{docroot} } sub new { my $class = shift; bless { first_request => 1, debug_level => 1, debug_callback => sub { print STDERR "@_\n" }, request_queue => Coro::Channel->new(), @_ }, $class; } sub get_request { # called from Continuity's main loop (new calls start_request_loop; start_request_loop gets requests from here or whereever and passes them to the mapper) my ($self) = @_; my $request = $self->{request_queue}->get or die; return $request; } sub loop_hook { my $self = shift; # $server->loop calls this; plackup run .psgi files except a coderef as the # last value and this lets that coderef fall out of the call to # $server->loop. # uniqe to the PSGI adapter -- a coderef that gets invoked when a request # comes in my $app = sub { my $env = shift; unless ($env->{'psgi.streaming'}) { die 'This application needs psgi.streaming support!'; } # stuff $env onto a queue that get_request above pulls from; get_request is # called from Continuity's main execution context/loop. Continuity's main # execution loop invokes the Mapper to send the request across a queue to # the per session execution context (creating a new one as needed). return sub { my $response = shift; async { local $Coro::current->{desc} = 'PSGI Response Maker'; # make it now and send it through the queue fully formed my $request = Continuity::Adapt::PSGI::Request->new( $env, $response ); $self->{request_queue}->put($request); # Now... we wait! $request->{response_done_watcher}->wait; }; }; }; # Is this needed? Coro::cede(); return $app; } =head2 C<< $adapter->map_path($path) >> Decodes URL-encoding in the path and attempts to guard against malice. Returns the processed filesystem path. =cut sub map_path { my $self = shift; my $path = shift() || ''; my $docroot = $self->docroot || ''; # my $docroot = Cwd::getcwd(); # $docroot .= '/' if $docroot and $docroot ne '.' and $docroot !~ m{/$}; # some massaging, also makes it more secure $path =~ s/%([0-9a-fA-F][0-9a-fA-F])/chr hex $1/ge; $path =~ s%//+%/%g unless $docroot; $path =~ s%/\.(?=/|$)%%g; $path =~ s%/[^/]+/\.\.(?=/|$)%%g; # if($path =~ m%^/?\.\.(?=/|$)%) then bad $self->Continuity::debug(2,"path: $docroot$path\n"); return "$docroot$path"; } sub send_static { my ($self, $r) = @_; # this is called from Continuity.pm to give a request back to us to deal with that it got from our get_request. # rather than sending it to the mapper to get sent to the per-user execution context, it gets returned straight back here. # $r is an instance of Continuity::Adapt::PSGI::Request my $url_path = $r->url_path; $url_path =~ s{\?.*}{}; my $path = $self->map_path($url_path) or do { $self->Continuity::debug(1, "can't map path: " . $url_path); # die; # XXX don't die except in debugging ( $r->{response_code}, $r->{response_headers}, $r->{response_content} ) = ( 404, [], [ "Static file not found" ] ); $r->{response_done_watcher}->send; return; }; my $stuff = Plack::App::File->serve_path({},$path); ( $r->{response_code}, $r->{response_headers}, $r->{response_content} ) = @$stuff; $r->response->( [ $r->response_code, $r->response_headers, $r->response_content ] ); $r->{response_done_watcher}->send; } # # # package Continuity::Adapt::PSGI::Request; use Coro::Signal; use Coro::AnyEvent; # List of cookies to send sub cookies { exists $_[1] ? $_[0]->{cookies} = $_[1] : $_[0]->{cookies} } # Flag, never send type sub no_content_type { exists $_[1] ? $_[0]->{no_content_type} = $_[1] : $_[0]->{no_content_type} } # CGI query params sub cached_params { exists $_[1] ? $_[0]->{cached_params} = $_[1] : $_[0]->{cached_params} } # The writer is kinda like our connection sub writer { exists $_[1] ? $_[0]->{writer} = $_[1] : $_[0]->{writer} } sub response { exists $_[1] ? $_[0]->{response} = $_[1] : $_[0]->{response} } sub response_code { exists $_[1] ? $_[0]->{response_code} = $_[1] : $_[0]->{response_code} } sub response_headers { exists $_[1] ? $_[0]->{response_headers} = $_[1] : $_[0]->{response_headers} } sub response_content { exists $_[1] ? $_[0]->{response_content} = $_[1] : $_[0]->{response_content} } sub debug_level { exists $_[1] ? $_[0]->{debug_level} = $_[1] : $_[0]->{debug_level} } sub debug_callback { exists $_[1] ? $_[0]->{debug_callback} = $_[1] : $_[0]->{debug_callback} } sub new { my ($class, $env, $response) = @_; my $self = { response_code => 200, response_headers => [], response_content => [], response_done_watcher => Coro::Signal->new, response => $response, debug_level => 3, debug_callback => sub { print STDERR "@_\n" }, %$env }; bless $self, $class; return $self; } sub param { my $self = shift; my $env = { %$self }; unless($self->cached_params) { use Plack::Request; my $req = Plack::Request->new($env); $self->cached_params( [ %{$req->parameters} ] ); }; my @params = @{ $self->cached_params }; if(@_) { my @values; while(@_) { my $param = shift; for(my $i = 0; $i < @params; $i += 2) { push @values, $params[$i+1] if $params[$i] eq $param; } } return unless @values; return wantarray ? @values : $values[0]; } else { return @{$self->cached_params}; } } sub params { my $self = shift; $self->param; return @{$self->cached_params}; } sub method { my ($self) = @_; return $self->{REQUEST_METHOD}; } sub url { my ($self) = @_; return $self->{'psgi.url_scheme'} . '://' . $self->{HTTP_HOST} . $self->{PATH_INFO}; } sub url_path { my ($self) = @_; return $self->{PATH_INFO}; } sub uri { my $self = shift; return $self->url(@_); } sub set_cookie { my $self = shift; my $cookie = shift; # record cookies and then send them the next time send_basic_header() is called and a header is sent. #$self->{Cookie} = $self->{Cookie} . "Set-Cookie: $cookie"; push @{ $self->{response_headers} }, "Set-Cookie" => "$cookie"; } sub get_cookie { my $self = shift; my $cookie_name = shift; my ($cookie) = map $_->[1], grep $_->[0] eq $cookie_name, map [ m/(.*?)=(.*)/ ], split /; */, $self->{HTTP_COOKIE} || ''; return $cookie; } sub immediate { } sub send_basic_header { my $self = shift; my $cookies = $self->cookies; $self->cookies(''); unless($self->no_content_type) { push @{ $self->{response_headers} }, "Cache-Control" => "private, no-store, no-cache", "Pragma" => "no-cache", "Expires" => "0", "Content-type" => "text/html", ; } my $writer = $self->response->( [ $self->response_code, $self->response_headers ] ); $self->writer( $writer ); } sub print { my $self = shift; eval { $self->writer->write( @_ ); }; # This is a good time to let other stuff run Coro::AnyEvent::idle(); return $self; } sub end_request { my $self = shift; # Tell our writer that we're done $self->writer->close if $self->writer; # Signal that we are done building our response $self->{response_done_watcher}->send; } 1;