Skip to content

Instantly share code, notes, and snippets.

@ufobat
Created April 21, 2017 12:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ufobat/d6236c1860aea87bf5a94c06ed5b3ae5 to your computer and use it in GitHub Desktop.
Save ufobat/d6236c1860aea87bf5a94c06ed5b3ae5 to your computer and use it in GitHub Desktop.
mojo async psgi
package E2L::PSGI::Mojo::CacheObject;
use Moose;
use common::sense;
use namespace::autoclean;
use MooseX::StrictConstructor;

# The default builder of "cv" calls AnyEvent->condvar, so AnyEvent has to be
# loaded here instead of relying on some other module having loaded it first.
use AnyEvent;

# Per-request state shared between the PSGI response callback and the
# transaction event handlers of E2L::PSGI::Mojo::Server2.

# Condition variable that receives the first response ([status, headers]).
# A fresh condvar is created for every object unless one is passed in.
has cv => (
    is       => 'ro',
    isa      => 'AnyEvent::CondVar',
    required => 1,
    default  => sub { AnyEvent->condvar },
);

# Writer object returned by the PSGI responder; unset until the status line
# and headers have been handed to the server.
has write_buff => ( is => 'rw', isa => 'Object', required => 0 );

# True until the status line and headers have been sent once.
has first_resp => ( is => 'rw', isa => 'Bool', required => 1, default => 1 );

# Number of response body bytes already passed to the writer.
has offset => ( is => 'rw', isa => 'Int', required => 1, default => 0 );

__PACKAGE__->meta->make_immutable;
1;
package E2L::PSGI::Mojo::Server2;
use common::sense;
use Mojo::Base 'Mojo::Server';
use Time::HiRes qw/time/;
use Carp 'carp';
use E2L::PSGI::Mojo::CacheObject;
# Handle a single PSGI request.
#
# Builds a transaction, parses the PSGI environment and request body into
# it, and wires up the resume/finish handlers that stream the response.
#
# Parameters:
#   $env - PSGI environment hash reference
#
# Returns a code reference that takes the PSGI responder. When the server
# invokes it, the "request" event is emitted; status and headers are passed
# to the responder as soon as the first "resume" event delivers them.
sub run {
    my ( $self, $env ) = @_;

    my $tx        = $self->build_tx;
    my $req       = $tx->req->parse($env);
    my $req_cache = E2L::PSGI::Mojo::CacheObject->new();
    $tx->local_port( $env->{SERVER_PORT} )->remote_address( $env->{REMOTE_ADDR} );

    # Request body (may block if we try to read too much); read at most
    # 131072 bytes per call, or less when fewer bytes are announced
    my $len = $env->{CONTENT_LENGTH};
    until ( $req->is_finished ) {
        my $chunk = ( $len && $len < 131072 ) ? $len : 131072;
        last unless my $read = $env->{'psgi.input'}->read( my $buffer, $chunk, 0 );
        $req->parse($buffer);
        last if ( $len -= $read ) <= 0;
    }

    # A plain closure over the lexicals is used on purpose: it needs no extra
    # module, and because it shares $self and $tx with the finish handler
    # below, clearing them there really releases the transaction. A callback
    # holding its own copies of $self and $tx would keep the transaction
    # referenced from its own subscriber list forever.
    $tx->on(
        resume => sub {
            return unless $self;    # transaction already finished
            $self->resume_cb( $req_cache, $tx );
        }
    );

    # The handlers stored on $tx reference $tx themselves, which keeps the
    # transaction alive while the response is pending (Mojo only holds a
    # weakened reference to it). Clearing the shared lexicals on finish
    # breaks that cycle again.
    $tx->on(
        finish => sub {
            undef $self;
            undef $tx;
        }
    );

    # PSGI response
    my $response = sub {
        my $responder = shift;

        # Runs once the condvar receives the first response
        my $callback = sub {
            my $write_buff = $responder->( shift->recv );    # status code and headers
            $req_cache->write_buff($write_buff);             # keep the writer for later chunks
            $self->write_data( $req_cache, $tx );            # flush whatever body is available
        };
        $req_cache->cv->cb($callback);
        $self->emit( request => $tx );
    };
    return $response;
}
# Handler for the transaction's "resume" event.
#
# Parameters:
#   $req_cache - per-request state (writer, first-response flag, condvar)
#   $tx        - the transaction being resumed
#
# If a writer is already stored, pending body data is written. Otherwise,
# on the first call only, status and headers are sent through the condvar.
sub resume_cb {
    my ( $self, $req_cache, $tx ) = @_;

    # Writer available: just push more body data
    return $self->write_data( $req_cache, $tx ) if $req_cache->write_buff;

    # Headers were sent already but the writer is not stored yet; the
    # condvar callback writes the body once it runs
    return unless $req_cache->first_resp;

    # First response: status line and headers
    $req_cache->first_resp(0);
    my $res        = $tx->res->fix_headers;
    my $headers_of = $res->headers->to_hash(1);
    my $status     = $res->code;

    # Flatten { name => [values] } into a name/value list, one pair per value
    my @pairs = map {
        my $name = $_;
        map { ( $name, $_ ) } @{ $headers_of->{$name} }
    } keys %$headers_of;

    $req_cache->cv->send( [ $status // 404, \@pairs ] );
}
# Write all currently available response body data to the PSGI writer.
#
# Parameters:
#   $req_cache - per-request state holding the writer and the body offset
#   $tx        - transaction whose response body is streamed
#
# Chunks are fetched from the response starting at the stored offset and
# written until get_body_chunk returns undef or an empty string. This is
# done in a loop rather than by recursion so that the call depth does not
# grow with the number of chunks in the body.
sub write_data {
    my ( $self, $req_cache, $tx ) = @_;

    while (1) {
        my $offset = $req_cache->offset;
        my $chunk  = $tx->res->get_body_chunk($offset);

        # Nothing (more) to write right now
        last unless defined $chunk and length $chunk;

        $req_cache->offset( $offset + length $chunk );
        $req_cache->write_buff->write($chunk);
    }

    # Otherwise $c->on(finish => ...) does not get called.
    # NOTE(review): the writer stored in write_buff is never close()d here;
    # confirm that the PSGI server finalizes the response on its own.
    $tx->closed() if $tx->res->is_finished;

    return;
}
# Build the PSGI application for this server.
#
# Calls the "app" accessor once up front so the application is preloaded,
# then returns a code reference that forwards its arguments (the PSGI
# environment) to run().
sub to_psgi_app {
    my ($server) = @_;

    # Preload application
    $server->app;

    my $psgi_app = sub { return $server->run(@_) };
    return $psgi_app;
}
1;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment