Skip to content

Instantly share code, notes, and snippets.

@kazeburo
Created April 26, 2010 08:49
Show Gist options
  • Save kazeburo/379116 to your computer and use it in GitHub Desktop.
Save kazeburo/379116 to your computer and use it in GitHub Desktop.
#!/usr/bin/perl
use strict;
use warnings;
use AE;
use AnyEvent::HTTP;
use AnyEvent::Curl;
use AnyEvent::MPRPC::Server;
use Coro;
use Coro::Channel;
use Coro::AnyEvent;
use Parallel::Prefork;
$AnyEvent::HTTP::MAX_PER_HOST = 100;
# worker threads
sub build_channel {
warn "[$$] build channel";
my $channel = Coro::Channel->new();
for my $i (0..29) {
my $ua = AnyEvent::Curl->new;
$ua->start;
async {
while(1) {
my $req = $channel->get();
warn "[$$] start req:", time;
$ua->add($req->[0], Coro::rouse_cb );
my ( $res ) = Coro::rouse_wait;
# http_get $req->[0], Coro::rouse_cb;
# my ($body, $header) = Coro::rouse_wait;
# my $res;
# $res->{header} = \$body;
warn "[$$] end req:", time;
$req->[1]->send($res);
}
};
}
return $channel;
}
my $channel;
my $server = AnyEvent::MPRPC::Server->new( port => 4423 );
$server->reg_cb(
req => sub {
my ($res_cv, @params) = @_;
$server->{exit_guard}->begin;
$channel ||= build_channel;
my $cv = AE::cv;
$channel->put([ $params[0], $cv ]);
$cv->cb(sub {
my $res = shift->recv;
$res_cv->result( ${$res->{header}} );
});
},
);
my $pm = Parallel::Prefork->new({
max_workers => 4,
trap_signals => {
TERM => 'TERM',
HUP => 'TERM',
USR1 => undef,
}
});
while ( $pm->signal_received ne 'TERM' ) {
$pm->start and next;
my $exit = $server->{exit_guard} = AE::cv;
$exit->begin;
my $w; $w = AE::signal TERM => sub { $exit->end; undef $w };
$exit->recv;
$pm->finish;
}
$pm->wait_all_children();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment