Created
April 21, 2017 12:36
-
-
Save ufobat/d6236c1860aea87bf5a94c06ed5b3ae5 to your computer and use it in GitHub Desktop.
mojo async psgi
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package E2L::PSGI::Mojo::CacheObject;

# Small per-request state holder. One instance is created for every request
# and carries the bits of state that must survive between event callbacks.

use Moose;
use common::sense;
use namespace::autoclean;
use MooseX::StrictConstructor;

# Condition variable; a fresh AnyEvent condvar is created when none is given.
has 'cv' => (
    isa      => 'AnyEvent::CondVar',
    is       => 'ro',
    required => 1,
    default  => sub { return AnyEvent->condvar },
);

# Optional object slot; stays unset until someone assigns it.
has 'write_buff' => (
    isa      => 'Object',
    is       => 'rw',
    required => 0,
);

# Boolean flag, true on construction.
has 'first_resp' => (
    isa      => 'Bool',
    is       => 'rw',
    required => 1,
    default  => 1,
);

# Integer counter, starts at zero.
has 'offset' => (
    isa      => 'Int',
    is       => 'rw',
    required => 1,
    default  => 0,
);

__PACKAGE__->meta->make_immutable;

1;
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package E2L::PSGI::Mojo::Server2; | |
use common::sense; | |
use Mojo::Base 'Mojo::Server'; | |
use Time::HiRes qw/time/; | |
use Carp 'carp'; | |
use E2L::PSGI::Mojo::CacheObject; | |
# Handle a single PSGI request.
#
# Builds a Mojo transaction from the PSGI environment, feeds the request body
# into it and returns a PSGI "delayed response" code reference. The status and
# headers are handed to the PSGI responder once the transaction emits its
# first "resume" event (see resume_cb); the body is then streamed through the
# writer object returned by the responder (see write_data).
#
# Parameters:
#   $env - PSGI environment hash reference
# Returns:
#   code reference taking the PSGI responder callback
sub run {
    my ( $self, $env ) = @_;

    my $tx        = $self->build_tx;
    my $req       = $tx->req->parse($env);
    my $req_cache = E2L::PSGI::Mojo::CacheObject->new();
    $tx->local_port( $env->{SERVER_PORT} )->remote_address( $env->{REMOTE_ADDR} );

    # Request body (may block if we try to read too much).
    # A missing CONTENT_LENGTH is treated as 0 so the arithmetic below never
    # operates on undef; the loop still stops after the first read in that case.
    my $len = $env->{CONTENT_LENGTH} // 0;
    until ( $req->is_finished ) {
        my $chunk = ( $len && $len < 131072 ) ? $len : 131072;
        last unless my $read = $env->{'psgi.input'}->read( my $buffer, $chunk, 0 );
        $req->parse($buffer);
        last if ( $len -= $read ) <= 0;
    }

    # The handlers below deliberately close over $self and $tx: this keeps the
    # transaction alive while the application works on it (the application side
    # is documented to hold only a weakened reference). A plain closure is used
    # instead of a curried method so that no private copy of $tx is stored in
    # the subscriber -- the "finish" handler can then really release it.
    $tx->on(
        resume => sub {
            # Ignore resume events arriving after the transaction finished.
            return unless defined $self and defined $tx;
            $self->resume_cb( $req_cache, $tx );
        }
    );
    $tx->on(
        finish => sub {
            # Drop the references held by the closures so the reference
            # cycle $tx -> subscriber -> $tx is broken and memory is freed.
            undef $self;
            undef $tx;
        }
    );

    # PSGI response
    my $response = sub {
        my $responder = shift;

        # Runs once the condvar receives [ status, headers ] from resume_cb.
        my $callback = sub {
            my $write_buff = $responder->( shift->recv );    # status code and headers
            $req_cache->write_buff($write_buff);             # remember the PSGI writer
            $self->write_data( $req_cache, $tx );            # flush whatever body is available
        };
        $req_cache->cv->cb($callback);

        $self->emit( request => $tx );
    };

    return $response;
}
# Handler for the transaction's "resume" event.
#
# - If the PSGI writer is already stored in the cache object, body data is
#   flushed through write_data.
# - Otherwise, on the very first call, the response status and headers are
#   collected and sent through the cache object's condvar.
# - Any further call made before the writer exists does nothing.
#
# Parameters:
#   $req_cache - per-request cache object
#   $tx        - the transaction being answered
sub resume_cb {
    my ( $self, $req_cache, $tx ) = @_;

    # Writer available: just push out body data.
    return $self->write_data( $req_cache, $tx ) if $req_cache->write_buff;

    # Headers were already sent to the condvar; the body is written as soon
    # as the condvar callback has stored the writer.
    return unless $req_cache->first_resp;

    # First response: make sure this branch is taken only once.
    $req_cache->first_resp(0);

    my $res             = $tx->res->fix_headers;
    my $values_for_name = $res->headers->to_hash(1);
    my $status          = $res->code;

    # Flatten { name => [values] } into the PSGI list ( name, value, ... ).
    my @header_pairs;
    for my $name ( keys %$values_for_name ) {
        for my $value ( @{ $values_for_name->{$name} } ) {
            push @header_pairs, $name, $value;
        }
    }

    return $req_cache->cv->send( [ $status // 404, \@header_pairs ] );
}
# Stream all currently available response body data to the PSGI writer.
#
# Body chunks are pulled from the response starting at the offset stored in
# the cache object; each chunk is written to the writer held in the cache
# object and the offset is advanced by the chunk length. This is done in a
# loop (not by recursing once per chunk) so that large bodies do not grow the
# call stack or trigger Perl's "Deep recursion" warning.
#
# When no more data is available and the response reports being finished,
# the transaction is closed so that its "finish" event fires.
#
# Parameters:
#   $req_cache - per-request cache object (provides offset and write_buff)
#   $tx        - the transaction whose response is being written
sub write_data {
    my ( $self, $req_cache, $tx ) = @_;

    while (1) {
        my $offset = $req_cache->offset;
        my $chunk  = $tx->res->get_body_chunk($offset);

        # undef or an empty string both mean "nothing to write right now"
        last unless defined $chunk;
        my $len = length $chunk;
        last unless $len;

        $req_cache->offset( $offset + $len );
        $req_cache->write_buff->write($chunk);

        # loop again, maybe there is more to write
    }

    # NOTE(review): the writer is never closed explicitly here; presumably the
    # PSGI server finishes the response when the writer goes away -- confirm.
    if ( $tx->res->is_finished ) {

        # otherwise $c->on(finish => ...) does not get called.
        $tx->closed();
    }

    return;
}
# Build the PSGI application code reference for this server.
#
# The application is loaded up front by calling app(), so it is ready before
# the first request arrives. The returned code reference forwards its
# arguments (the PSGI environment) to run().
sub to_psgi_app {
    my ($self) = @_;

    # Preload application
    $self->app;

    # Wrap it
    my $psgi_app = sub {
        return $self->run(@_);
    };
    return $psgi_app;
}
1; |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment