Skip to content

Instantly share code, notes, and snippets.

@yappo
Last active August 29, 2015 13:56
Show Gist options
  • Save yappo/8953049 to your computer and use it in GitHub Desktop.
Save yappo/8953049 to your computer and use it in GitHub Desktop.
#!/usr/bin/env perl
use strict;
use warnings;
use v5.10;
use AnyEvent;
use Benchmark ':all';
use DBI;
use Parallel::ForkManager;
use IO::Select;
# ---- Benchmark configuration ------------------------------------------

# Number of child processes forked per iteration in the multi-process run.
my $max_process = 8;

# Credentials passed to every DBI->connect call (empty by default).
my ($db_user, $db_pass) = ('', '');

# Host name of each MySQL server, keyed by the short server label.
my %dbh_host = (
    a => 'db-a.example.com',
    b => 'db-b.example.com',
    c => 'db-c.example.com',
);

# Per-server switch: a true value means the benchmark queries that server.
my %enable_server = (a => 1, b => 1, c => 1);
# create_dbh()
#
# Opens a fresh DBI connection to the "test" database on each of the three
# hosts in %dbh_host (keys a, b, c), authenticating with $db_user/$db_pass.
#
# Returns: a list of three database handles, in the order (a, b, c).
# Dies:    if any connection attempt fails, reporting the host and
#          $DBI::errstr, instead of handing an undef handle to the caller.
sub create_dbh {
    return map {
        my $host = $dbh_host{$_};
        DBI->connect("dbi:mysql:database=test;host=$host", $db_user, $db_pass)
            or die "Cannot connect to $host: $DBI::errstr";
    } qw(a b c);
}
# One query per server. All three currently run the same 50 ms server-side
# sleep; each keeps its own variable so they can be changed independently.
my ($query_a, $query_b, $query_c) = (q{SELECT SLEEP(0.05)}) x 3;
my $code = {
# 'Normal': baseline variant. For every enabled server, in the order a, b, c,
# it prepares the query without the async flag, executes it and fetches all
# rows before moving on to the next server.
# Takes no arguments; the fetched rows are collected and then discarded.
'Normal' => sub {
    my %dbh_for;
    @dbh_for{qw(a b c)} = create_dbh();
    my %query_for = (a => $query_a, b => $query_b, c => $query_c);
    my %rows_for;

    for my $name (qw(a b c)) {
        # Each server is guarded by its OWN flag. The previous hand-copied
        # stanzas tested $enable_server{a} for servers b and c as well.
        next unless $enable_server{$name};

        my $sth = $dbh_for{$name}->prepare($query_for{$name});
        $sth->execute;
        while (my $row = $sth->fetchrow_hashref) {
            push @{ $rows_for{$name} }, $row;
        }
    }
},
# 'Async-Serial': issues each enabled server's query with { async => 1 } and
# waits for it through an AnyEvent I/O watcher before moving on to the next
# server, so the three queries still run one at a time.
# Takes no arguments; the fetched rows are collected and then discarded.
'Async-Serial' => sub {
    my %dbh_for;
    @dbh_for{qw(a b c)} = create_dbh();
    my %query_for = (a => $query_a, b => $query_b, c => $query_c);
    my %rows_for  = (a => [], b => [], c => []);

    for my $name (qw(a b c)) {
        next unless $enable_server{$name};

        # One condvar per server: the recv below blocks until this
        # server's watcher callback has called end.
        my $done = AE::cv;
        my $sth  = $dbh_for{$name}->prepare($query_for{$name}, { async => 1 });
        $sth->execute;
        $done->begin;

        my $watcher;
        $watcher = AnyEvent->io(
            fh   => $dbh_for{$name}->mysql_fd,
            poll => 'r',
            cb   => sub {
                while (my $row = $sth->fetchrow_hashref) {
                    push @{ $rows_for{$name} }, $row;
                }
                $done->end("$name end");
                # The callback holds the only reference to its own watcher;
                # clearing it here stops further read notifications.
                undef $watcher;
            },
        );
        $done->recv;
    }
},
# 'Async-Parallel': starts the query on every enabled server with
# { async => 1 } first, then waits on a single shared condvar until each
# server's AnyEvent I/O watcher has fetched its rows.
# Takes no arguments; the fetched rows are collected and then discarded.
'Async-Parallel' => sub {
    my %dbh_for;
    @dbh_for{qw(a b c)} = create_dbh();
    my %query_for = (a => $query_a, b => $query_b, c => $query_c);
    my %rows_for;

    my $all_done = AE::cv;

    # Outer begin/end pair: keeps the counter above zero while watchers are
    # being registered and, more importantly, makes recv return immediately
    # when no server is enabled (previously nothing would ever wake it).
    $all_done->begin;

    for my $name (qw(a b c)) {
        next unless $enable_server{$name};

        my $sth = $dbh_for{$name}->prepare($query_for{$name}, { async => 1 });
        $sth->execute;
        $all_done->begin;

        my $watcher;
        $watcher = AnyEvent->io(
            fh   => $dbh_for{$name}->mysql_fd,
            poll => 'r',
            cb   => sub {
                while (my $row = $sth->fetchrow_hashref) {
                    push @{ $rows_for{$name} }, $row;
                }
                $all_done->end;
                # Release the watcher from inside its own callback so it
                # does not fire again for this handle.
                undef $watcher;
            },
        );
    }

    $all_done->end;
    $all_done->recv;
},
# 'IO-Select': starts the query on every enabled server with { async => 1 },
# waits with IO::Select until each connection's mysql_fd has been reported
# readable, then fetches the rows from every statement handle.
# Takes no arguments. Dies if an enabled server did not return exactly one
# row (each benchmark query is expected to yield a single row).
'IO-Select' => sub {
    my %dbh_for;
    @dbh_for{qw(a b c)} = create_dbh();
    my %query_for = (a => $query_a, b => $query_b, c => $query_c);

    # Honour %enable_server like the other variants do; previously this
    # variant queried all three servers unconditionally.
    my @active = grep { $enable_server{$_} } qw(a b c);

    # Fire off every query before waiting on any of them.
    my %sth_for;
    for my $name (@active) {
        my $sth = $dbh_for{$name}->prepare($query_for{$name}, { async => 1 });
        $sth->execute;
        $sth_for{$name} = $sth;
    }

    # Wait until every descriptor has been readable once; descriptors are
    # removed from the set as soon as they are reported.
    my $select = IO::Select->new(map { $dbh_for{$_}->mysql_fd } @active);
    while ($select->count) {
        my @ready = $select->can_read;
        $select->remove(@ready);
    }

    for my $name (@active) {
        my @rows;
        while (my $row = $sth_for{$name}->fetchrow_hashref) {
            push @rows, $row;
        }
        die "IO-Select: expected exactly 1 row from server '$name', got "
            . scalar(@rows)
            unless @rows == 1;
    }
},
};
# ---- Run 1: every variant in this single process ------------------------
# NOTE(review): Benchmark's rates appear to be derived from CPU time rather
# than wall-clock time; confirm that is the intended metric for queries that
# mostly sleep on the server.
say '1 process';
cmpthese(200, $code);

# ---- Run 2: every variant inside $max_process forked children -----------
# Each timed iteration forks $max_process children, runs the variant once in
# every child and waits for all of them to exit.
say "$max_process process";
cmpthese(100, +{
    map {
        my $label = $_;
        $label => sub {
            my $pm = Parallel::ForkManager->new($max_process);
            for my $i (1 .. $max_process) {
                # start returns true in the parent, which just keeps forking.
                $pm->start and next;
                $code->{$label}();
                $pm->finish;
            }
            $pm->wait_all_children;
        };
    } qw(Normal Async-Serial Async-Parallel IO-Select)
});
__END__
1 process
(warning: too few iterations for a reliable count)
(warning: too few iterations for a reliable count)
(warning: too few iterations for a reliable count)
(warning: too few iterations for a reliable count)
Rate Async-Parallel Async-Serial Normal IO-Select
Async-Parallel 556/s -- -0% -17% -19%
Async-Serial 556/s 0% -- -17% -19%
Normal 667/s 20% 20% -- -3%
IO-Select 690/s 24% 24% 3% --
8 process
Rate Async-Parallel Async-Serial Normal IO-Select
Async-Parallel 17.8/s -- -6% -6% -11%
Async-Serial 18.9/s 6% -- -1% -6%
Normal 19.0/s 7% 1% -- -6%
IO-Select 20.1/s 13% 6% 6% --
#!/usr/bin/env perl
use strict;
use warnings;
use v5.10;
use AnyEvent;
use Benchmark ':all';
use DBI;
use Parallel::ForkManager;
# ---- Benchmark configuration ------------------------------------------

# Number of child processes forked per iteration in the multi-process run.
my $max_process = 8;

# Credentials passed to every DBI->connect call (empty by default).
my ($db_user, $db_pass) = ('', '');

# Host name of each MySQL server, keyed by the short server label.
my %dbh_host = (
    a => 'db-a.example.com',
    b => 'db-b.example.com',
    c => 'db-c.example.com',
);

# Per-server switch: a true value means the benchmark queries that server.
my %enable_server = (a => 1, b => 1, c => 1);
# create_dbh()
#
# Opens a fresh DBI connection to the "test" database on each of the three
# hosts in %dbh_host (keys a, b, c), authenticating with $db_user/$db_pass.
#
# Returns: a list of three database handles, in the order (a, b, c).
# Dies:    if any connection attempt fails, reporting the host and
#          $DBI::errstr, instead of handing an undef handle to the caller.
sub create_dbh {
    return map {
        my $host = $dbh_host{$_};
        DBI->connect("dbi:mysql:database=test;host=$host", $db_user, $db_pass)
            or die "Cannot connect to $host: $DBI::errstr";
    } qw(a b c);
}
# One query per server. All three currently run the same 50 ms server-side
# sleep; each keeps its own variable so they can be changed independently.
my ($query_a, $query_b, $query_c) = (q{SELECT SLEEP(0.05)}) x 3;
my $code = {
# 'Normal': baseline variant. For every enabled server, in the order a, b, c,
# it prepares the query without the async flag, executes it and fetches all
# rows before moving on to the next server.
# Takes no arguments; the fetched rows are collected and then discarded.
'Normal' => sub {
    my %dbh_for;
    @dbh_for{qw(a b c)} = create_dbh();
    my %query_for = (a => $query_a, b => $query_b, c => $query_c);
    my %rows_for;

    for my $name (qw(a b c)) {
        # Each server is guarded by its OWN flag. The previous hand-copied
        # stanzas tested $enable_server{a} for servers b and c as well.
        next unless $enable_server{$name};

        my $sth = $dbh_for{$name}->prepare($query_for{$name});
        $sth->execute;
        while (my $row = $sth->fetchrow_hashref) {
            push @{ $rows_for{$name} }, $row;
        }
    }
},
# 'Async-Serial': issues each enabled server's query with { async => 1 } and
# waits for it through an AnyEvent I/O watcher before moving on to the next
# server, so the three queries still run one at a time.
# Takes no arguments; the fetched rows are collected and then discarded.
'Async-Serial' => sub {
    my %dbh_for;
    @dbh_for{qw(a b c)} = create_dbh();
    my %query_for = (a => $query_a, b => $query_b, c => $query_c);
    my %rows_for  = (a => [], b => [], c => []);

    for my $name (qw(a b c)) {
        next unless $enable_server{$name};

        # One condvar per server: the recv below blocks until this
        # server's watcher callback has called end.
        my $done = AE::cv;
        my $sth  = $dbh_for{$name}->prepare($query_for{$name}, { async => 1 });
        $sth->execute;
        $done->begin;

        my $watcher;
        $watcher = AnyEvent->io(
            fh   => $dbh_for{$name}->mysql_fd,
            poll => 'r',
            cb   => sub {
                while (my $row = $sth->fetchrow_hashref) {
                    push @{ $rows_for{$name} }, $row;
                }
                $done->end("$name end");
                # The callback holds the only reference to its own watcher;
                # clearing it here stops further read notifications.
                undef $watcher;
            },
        );
        $done->recv;
    }
},
# 'Async-Parallel': starts the query on every enabled server with
# { async => 1 } first, then waits on a single shared condvar until each
# server's AnyEvent I/O watcher has fetched its rows.
# Takes no arguments; the fetched rows are collected and then discarded.
'Async-Parallel' => sub {
    my %dbh_for;
    @dbh_for{qw(a b c)} = create_dbh();
    my %query_for = (a => $query_a, b => $query_b, c => $query_c);
    my %rows_for;

    my $all_done = AE::cv;

    # Outer begin/end pair: keeps the counter above zero while watchers are
    # being registered and, more importantly, makes recv return immediately
    # when no server is enabled (previously nothing would ever wake it).
    $all_done->begin;

    for my $name (qw(a b c)) {
        next unless $enable_server{$name};

        my $sth = $dbh_for{$name}->prepare($query_for{$name}, { async => 1 });
        $sth->execute;
        $all_done->begin;

        my $watcher;
        $watcher = AnyEvent->io(
            fh   => $dbh_for{$name}->mysql_fd,
            poll => 'r',
            cb   => sub {
                while (my $row = $sth->fetchrow_hashref) {
                    push @{ $rows_for{$name} }, $row;
                }
                $all_done->end;
                # Release the watcher from inside its own callback so it
                # does not fire again for this handle.
                undef $watcher;
            },
        );
    }

    $all_done->end;
    $all_done->recv;
},
};
# ---- Run 1: every variant in this single process ------------------------
# NOTE(review): Benchmark's rates appear to be derived from CPU time rather
# than wall-clock time; confirm that is the intended metric for queries that
# mostly sleep on the server.
say '1 process';
cmpthese(1_000, $code);

# ---- Run 2: every variant inside $max_process forked children -----------
# Each timed iteration forks $max_process children, runs the variant once in
# every child and waits for all of them to exit.
say "$max_process process";
cmpthese(500, +{
    map {
        my $label = $_;
        $label => sub {
            my $pm = Parallel::ForkManager->new($max_process);
            for my $i (1 .. $max_process) {
                # start returns true in the parent, which just keeps forking.
                $pm->start and next;
                $code->{$label}();
                $pm->finish;
            }
            $pm->wait_all_children;
        };
    } qw(Normal Async-Serial Async-Parallel)
});
__END__
1 process
Rate Async-Serial Async-Parallel Normal
Async-Serial 546/s -- -4% -15%
Async-Parallel 568/s 4% -- -11%
Normal 641/s 17% 13% --
4 process
Rate Async-Parallel Async-Serial Normal
Async-Parallel 33.2/s -- -5% -11%
Async-Serial 35.0/s 6% -- -6%
Normal 37.3/s 13% 7% --
8 process
Rate Async-Parallel Async-Serial Normal
Async-Parallel 17.9/s -- -7% -9%
Async-Serial 19.3/s 8% -- -2%
Normal 19.7/s 10% 2% --
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment