hakobe (owner)

Revisions

gist: 191924 Download_button fork
public
Public Clone URL: git://gist.github.com/191924.git
Embed All Files: show embed
completion_service.pl #
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
use strict;
use warnings;
use lib qw(./lib);
 
use Coro;
use Coro::Timer;
use Coro::Handle qw(unblock);
use Coro::LWP;
use LWP::UserAgent;
 
use XML::Feed;
use Regexp::Common qw(URI);
 
use Executor;
use CompletionService;
 
my $executor = Executor->new(20); # 全体で動作するWorkerの総数
 
# ダウンロード用サービス
my $download_service = CompletionService->new(
    executor => $executor,
);
 
# ファイル保存用サービス
my $filesave_service = CompletionService->new(
    executor => $executor,
);
 
my $feed = XML::Feed->parse(URI->new('http://hkstack.tumblr.com/rss'))
    or die "cannot fetch feed";
my @entries = $feed->entries;
 
# 画像ダウンロードWorkerを登録
for my $entry (@entries) {
    $download_service->submit(sub {
        my ($img_url)
            = grep {m/\.jpg$/xms} ($entry->content->body =~ m/($RE{URI}{HTTP})/);
        return unless $img_url;
 
        my ($name) = $img_url =~ m{([^/]+\.jpg$)};
        return unless $name;
 
        warn "start downloading $img_url";
 
        my $ua = LWP::UserAgent->new;
        my $res = $ua->get($img_url);
        $res->is_success or die "request to $img_url failed";
 
        warn "finish downloading $img_url";
 
        warn "result: $name " . length($res->content);
        [$name, $res->content];
    });
}
 
# 画像ダウンロードWorkerが終了しだい結果をファイル保存サービスに投げる
# takeはもっとも早く終了したWorkerの結果を取得する
while (my $result = $download_service->take) {
    $result = $result->();
    next unless $result->[0];
 
    my $name = $result->[0];
    my $content = $result->[1];
 
    $filesave_service->submit(sub {
        warn "start saving $name";
        
        open my $fh, '>', $name
            or die "cannot open $name: $!";
        $fh = unblock $fh;
        print $fh $content;
        close $fh;
 
        warn "finish saving $name";
        $name;
    });
}
 
$executor->start;
 
 
executor.pl #
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
use strict;
use warnings;
use lib qw(./lib);
 
use Executor;
 
use Coro;
use Coro::Timer;
use Coro::Handle qw(unblock);
use Coro::LWP;
use LWP::UserAgent;
 
use Coro::Debug;
our $server = new_unix_server Coro::Debug "/tmp/corodebug";
 
use Regexp::Common qw(URI);
 
use XML::Feed;
my $feed = XML::Feed->parse(URI->new('http://hkstack.tumblr.com/rss'))
    or die "cannot fetch feed";
 
 
# 最大20このWorkerがはしるExecutorを作成
my $executor = Executor->new(20);
 
for my $entry ($feed->entries) {
    # 画像をダウンロードして保存するWorkerを登録
    $executor->execute( sub {
        my ($img_url)
            = grep {m/\.jpg$/xms} ($entry->content->body =~ m/($RE{URI}{HTTP})/);
        return unless $img_url;
 
        my ($name) = $img_url =~ m{([^/]+\.jpg$)};
 
        warn "start downloading $img_url to $name";
 
        my $ua = LWP::UserAgent->new;
        my $res = $ua->get($img_url);
        $res->is_success or die "request to $img_url failed";
 
        open my $fh, '>', $name
            or die "cannot open $name: $!";
        $fh = unblock $fh;
        print $fh $res->content;
        close $fh;
 
        warn "finish downloading $img_url to $name";
    });
}
 
$executor->start;
 
future.pl #
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
use strict;
use warnings;
use lib qw(./lib);
 
use Executor;
 
use Coro;
use Coro::LWP;
use LWP::UserAgent;
 
my $executor = Executor->new(1);
 
# おもめの IO 処理を行う FutureTask
my $future = $executor->submit(sub {
    my $ua = LWP::UserAgent->new;
    my $res = $ua->get('http://d.hatena.ne.jp/diarylist?mode=newbierss');
    $res->is_success or die "request failed";
 
    $res->content;
});
 
 
# !! おもめのIO処理が走っているスキにCPUでいろいろ実行する
 
 
# FutureTaskの結果を取得する
# 結果がすでにあればすぐ返ってくる
# 結果が計算しおわっていなければブロックする
my $result = $future->();
 
lib/CompletionService.pm #
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package CompletionService;
use strict;
use warnings;
 
use Class::Accessor qw(antlers);
 
use Coro;
use Coro::AnyEvent;
 
use Coro::Channel;
 
use Executor;
 
has queue => (is => 'ro');
has executor => (is => 'ro');
 
sub new {
    my $class = shift;
    my $self = $class->SUPER::new({
        queue => Coro::Channel->new,
        executor => Executor->new,
        @_,
    });
 
    $self;
}
 
sub start { shift->executor->schedule }
 
sub take {
    my ($self) = @_;
    $self->queue->get;
}
 
sub submit {
    my ($self, $code) = @_;
    $self->executor->submit($code, sub {
        my $result = shift;
        $self->queue->put($result);
    });
}
 
1;
 
lib/Executor.pm #
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
package Executor;
use strict;
use warnings;
 
use Class::Accessor qw(antlers);
 
use Coro;
use Coro::AnyEvent;
 
use Coro::Channel;
use Coro::Semaphore;
 
has lock => (is => 'ro');
 
sub new {
    my $class = shift;
    my $size = shift || 10;
    my $self = $class->SUPER::new({
        lock => Coro::Semaphore->new($size),
        @_,
    });
 
    $self;
}
 
sub start { schedule }
 
sub execute {
    my ($self, $code) = @_;
 
    my $coro = async {
        my $guard = $self->lock->guard;
        $code->();
    };
}
 
sub submit {
    my ($self, $code, $on_done) = @_;
 
    my $f = $self->create_future($code, $on_done);
 
    $f;
}
 
sub create_future {
    my $self = shift;
    my $code = shift;
    my $on_done = shift;
    my $result_store = Coro::Channel->new;
    my $result = sub {
        $result_store->get();
    };
 
    $self->execute( sub {
        my $r = $code->();
        $result_store->put($r);
        $on_done->($result) if $on_done;
    });
 
    $result;
}
 
1;