Skip to content

Instantly share code, notes, and snippets.

@hakobe
Created September 23, 2009 11:25
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save hakobe/191924 to your computer and use it in GitHub Desktop.
Save hakobe/191924 to your computer and use it in GitHub Desktop.
use strict;
use warnings;
use lib qw(./lib);
use Coro;
use Coro::Timer;
use Coro::Handle qw(unblock);
use Coro::LWP;
use LWP::UserAgent;
use XML::Feed;
use Regexp::Common qw(URI);
use Executor;
use CompletionService;
my $executor = Executor->new(20); # 全体で動作するWorkerの総数
# ダウンロード用サービス
my $download_service = CompletionService->new(
executor => $executor,
);
# ファイル保存用サービス
my $filesave_service = CompletionService->new(
executor => $executor,
);
my $feed = XML::Feed->parse(URI->new('http://hkstack.tumblr.com/rss'))
or die "cannot fetch feed";
my @entries = $feed->entries;
# 画像ダウンロードWorkerを登録
for my $entry (@entries) {
$download_service->submit(sub {
my ($img_url)
= grep {m/\.jpg$/xms} ($entry->content->body =~ m/($RE{URI}{HTTP})/);
return unless $img_url;
my ($name) = $img_url =~ m{([^/]+\.jpg$)};
return unless $name;
warn "start downloading $img_url";
my $ua = LWP::UserAgent->new;
my $res = $ua->get($img_url);
$res->is_success or die "request to $img_url failed";
warn "finish downloading $img_url";
warn "result: $name " . length($res->content);
[$name, $res->content];
});
}
# 画像ダウンロードWorkerが終了しだい結果をファイル保存サービスに投げる
# takeはもっとも早く終了したWorkerの結果を取得する
while (my $result = $download_service->take) {
$result = $result->();
next unless $result->[0];
my $name = $result->[0];
my $content = $result->[1];
$filesave_service->submit(sub {
warn "start saving $name";
open my $fh, '>', $name
or die "cannot open $name: $!";
$fh = unblock $fh;
print $fh $content;
close $fh;
warn "finish saving $name";
$name;
});
}
$executor->start;
package CompletionService;
use strict;
use warnings;
use Class::Accessor qw(antlers);
use Coro;
use Coro::AnyEvent;
use Coro::Channel;
use Executor;
has queue => (is => 'ro');
has executor => (is => 'ro');
sub new {
my $class = shift;
my $self = $class->SUPER::new({
queue => Coro::Channel->new,
executor => Executor->new,
@_,
});
$self;
}
sub start { shift->executor->schedule }
sub take {
my ($self) = @_;
$self->queue->get;
}
sub submit {
my ($self, $code) = @_;
$self->executor->submit($code, sub {
my $result = shift;
$self->queue->put($result);
});
}
1;
use strict;
use warnings;
use lib qw(./lib);
use Executor;
use Coro;
use Coro::Timer;
use Coro::Handle qw(unblock);
use Coro::LWP;
use LWP::UserAgent;
use Coro::Debug;
our $server = new_unix_server Coro::Debug "/tmp/corodebug";
use Regexp::Common qw(URI);
use XML::Feed;
my $feed = XML::Feed->parse(URI->new('http://hkstack.tumblr.com/rss'))
or die "cannot fetch feed";
# 最大20このWorkerがはしるExecutorを作成
my $executor = Executor->new(20);
for my $entry ($feed->entries) {
# 画像をダウンロードして保存するWorkerを登録
$executor->execute( sub {
my ($img_url)
= grep {m/\.jpg$/xms} ($entry->content->body =~ m/($RE{URI}{HTTP})/);
return unless $img_url;
my ($name) = $img_url =~ m{([^/]+\.jpg$)};
warn "start downloading $img_url to $name";
my $ua = LWP::UserAgent->new;
my $res = $ua->get($img_url);
$res->is_success or die "request to $img_url failed";
open my $fh, '>', $name
or die "cannot open $name: $!";
$fh = unblock $fh;
print $fh $res->content;
close $fh;
warn "finish downloading $img_url to $name";
});
}
$executor->start;
package Executor;
use strict;
use warnings;
use Class::Accessor qw(antlers);
use Coro;
use Coro::AnyEvent;
use Coro::Channel;
use Coro::Semaphore;
has lock => (is => 'ro');
sub new {
my $class = shift;
my $size = shift || 10;
my $self = $class->SUPER::new({
lock => Coro::Semaphore->new($size),
@_,
});
$self;
}
sub start { schedule }
sub execute {
my ($self, $code) = @_;
my $coro = async {
my $guard = $self->lock->guard;
$code->();
};
}
sub submit {
my ($self, $code, $on_done) = @_;
my $f = $self->create_future($code, $on_done);
$f;
}
sub create_future {
my $self = shift;
my $code = shift;
my $on_done = shift;
my $result_store = Coro::Channel->new;
my $result = sub {
$result_store->get();
};
$self->execute( sub {
my $r = $code->();
$result_store->put($r);
$on_done->($result) if $on_done;
});
$result;
}
1;
use strict;
use warnings;
use lib qw(./lib);
use Executor;
use Coro;
use Coro::LWP;
use LWP::UserAgent;
my $executor = Executor->new(1);
# おもめの IO 処理を行う FutureTask
my $future = $executor->submit(sub {
my $ua = LWP::UserAgent->new;
my $res = $ua->get('http://d.hatena.ne.jp/diarylist?mode=newbierss');
$res->is_success or die "request failed";
$res->content;
});
# !! おもめのIO処理が走っているスキにCPUでいろいろ実行する
# FutureTaskの結果を取得する
# 結果がすでにあればすぐ返ってくる
# 結果が計算しおわっていなければブロックする
my $result = $future->();
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment