Created
September 23, 2009 11:25
-
-
Save hakobe/191924 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use strict; | |
use warnings; | |
use lib qw(./lib); | |
use Coro; | |
use Coro::Timer; | |
use Coro::Handle qw(unblock); | |
use Coro::LWP; | |
use LWP::UserAgent; | |
use XML::Feed; | |
use Regexp::Common qw(URI); | |
use Executor; | |
use CompletionService; | |
my $executor = Executor->new(20); # 全体で動作するWorkerの総数 | |
# ダウンロード用サービス | |
my $download_service = CompletionService->new( | |
executor => $executor, | |
); | |
# ファイル保存用サービス | |
my $filesave_service = CompletionService->new( | |
executor => $executor, | |
); | |
my $feed = XML::Feed->parse(URI->new('http://hkstack.tumblr.com/rss')) | |
or die "cannot fetch feed"; | |
my @entries = $feed->entries; | |
# 画像ダウンロードWorkerを登録 | |
for my $entry (@entries) { | |
$download_service->submit(sub { | |
my ($img_url) | |
= grep {m/\.jpg$/xms} ($entry->content->body =~ m/($RE{URI}{HTTP})/); | |
return unless $img_url; | |
my ($name) = $img_url =~ m{([^/]+\.jpg$)}; | |
return unless $name; | |
warn "start downloading $img_url"; | |
my $ua = LWP::UserAgent->new; | |
my $res = $ua->get($img_url); | |
$res->is_success or die "request to $img_url failed"; | |
warn "finish downloading $img_url"; | |
warn "result: $name " . length($res->content); | |
[$name, $res->content]; | |
}); | |
} | |
# 画像ダウンロードWorkerが終了しだい結果をファイル保存サービスに投げる | |
# takeはもっとも早く終了したWorkerの結果を取得する | |
while (my $result = $download_service->take) { | |
$result = $result->(); | |
next unless $result->[0]; | |
my $name = $result->[0]; | |
my $content = $result->[1]; | |
$filesave_service->submit(sub { | |
warn "start saving $name"; | |
open my $fh, '>', $name | |
or die "cannot open $name: $!"; | |
$fh = unblock $fh; | |
print $fh $content; | |
close $fh; | |
warn "finish saving $name"; | |
$name; | |
}); | |
} | |
$executor->start; | |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package CompletionService; | |
use strict; | |
use warnings; | |
use Class::Accessor qw(antlers); | |
use Coro; | |
use Coro::AnyEvent; | |
use Coro::Channel; | |
use Executor; | |
has queue => (is => 'ro'); | |
has executor => (is => 'ro'); | |
sub new { | |
my $class = shift; | |
my $self = $class->SUPER::new({ | |
queue => Coro::Channel->new, | |
executor => Executor->new, | |
@_, | |
}); | |
$self; | |
} | |
sub start { shift->executor->schedule } | |
sub take { | |
my ($self) = @_; | |
$self->queue->get; | |
} | |
sub submit { | |
my ($self, $code) = @_; | |
$self->executor->submit($code, sub { | |
my $result = shift; | |
$self->queue->put($result); | |
}); | |
} | |
1; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use strict; | |
use warnings; | |
use lib qw(./lib); | |
use Executor; | |
use Coro; | |
use Coro::Timer; | |
use Coro::Handle qw(unblock); | |
use Coro::LWP; | |
use LWP::UserAgent; | |
use Coro::Debug; | |
our $server = new_unix_server Coro::Debug "/tmp/corodebug"; | |
use Regexp::Common qw(URI); | |
use XML::Feed; | |
my $feed = XML::Feed->parse(URI->new('http://hkstack.tumblr.com/rss')) | |
or die "cannot fetch feed"; | |
# 最大20このWorkerがはしるExecutorを作成 | |
my $executor = Executor->new(20); | |
for my $entry ($feed->entries) { | |
# 画像をダウンロードして保存するWorkerを登録 | |
$executor->execute( sub { | |
my ($img_url) | |
= grep {m/\.jpg$/xms} ($entry->content->body =~ m/($RE{URI}{HTTP})/); | |
return unless $img_url; | |
my ($name) = $img_url =~ m{([^/]+\.jpg$)}; | |
warn "start downloading $img_url to $name"; | |
my $ua = LWP::UserAgent->new; | |
my $res = $ua->get($img_url); | |
$res->is_success or die "request to $img_url failed"; | |
open my $fh, '>', $name | |
or die "cannot open $name: $!"; | |
$fh = unblock $fh; | |
print $fh $res->content; | |
close $fh; | |
warn "finish downloading $img_url to $name"; | |
}); | |
} | |
$executor->start; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package Executor; | |
use strict; | |
use warnings; | |
use Class::Accessor qw(antlers); | |
use Coro; | |
use Coro::AnyEvent; | |
use Coro::Channel; | |
use Coro::Semaphore; | |
has lock => (is => 'ro'); | |
sub new { | |
my $class = shift; | |
my $size = shift || 10; | |
my $self = $class->SUPER::new({ | |
lock => Coro::Semaphore->new($size), | |
@_, | |
}); | |
$self; | |
} | |
sub start { schedule } | |
sub execute { | |
my ($self, $code) = @_; | |
my $coro = async { | |
my $guard = $self->lock->guard; | |
$code->(); | |
}; | |
} | |
sub submit { | |
my ($self, $code, $on_done) = @_; | |
my $f = $self->create_future($code, $on_done); | |
$f; | |
} | |
sub create_future { | |
my $self = shift; | |
my $code = shift; | |
my $on_done = shift; | |
my $result_store = Coro::Channel->new; | |
my $result = sub { | |
$result_store->get(); | |
}; | |
$self->execute( sub { | |
my $r = $code->(); | |
$result_store->put($r); | |
$on_done->($result) if $on_done; | |
}); | |
$result; | |
} | |
1; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
use strict; | |
use warnings; | |
use lib qw(./lib); | |
use Executor; | |
use Coro; | |
use Coro::LWP; | |
use LWP::UserAgent; | |
my $executor = Executor->new(1); | |
# おもめの IO 処理を行う FutureTask | |
my $future = $executor->submit(sub { | |
my $ua = LWP::UserAgent->new; | |
my $res = $ua->get('http://d.hatena.ne.jp/diarylist?mode=newbierss'); | |
$res->is_success or die "request failed"; | |
$res->content; | |
}); | |
# !! おもめのIO処理が走っているスキにCPUでいろいろ実行する | |
# FutureTaskの結果を取得する | |
# 結果がすでにあればすぐ返ってくる | |
# 結果が計算しおわっていなければブロックする | |
my $result = $future->(); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment