Skip to content

Instantly share code, notes, and snippets.

@mala
Created June 4, 2009 11:46
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mala/123579 to your computer and use it in GitHub Desktop.
Save mala/123579 to your computer and use it in GitHub Desktop.
package Coro::Mysql::DBI;
use strict;
use warnings;
use DBI;
use Coro;
use Coro::Mysql;
use Hash::FieldHash qw(:all);
# Class whose connect() method creates the database handles. The
# commented-out line below shows an alternative that can be swapped in.
# our $ROOT_CLASS = 'DBIx::ContextualFetch';
our $ROOT_CLASS = 'DBI';
# Size handed to the connection cache. It becomes the initial count of the
# semaphore that connect_cached() uses for each cache key.
our $MAX_CACHED_CONNECTIONS = 5;
# When true, every newly created handle is passed to Coro::Mysql::unblock.
our $unblock = 1;
# Private state.
# Maps a database handle to the coroutine that currently owns it.
fieldhash my %coro_of;
# Connection cache shared by all connect_cached() calls; created lazily on
# the first call.
my $conn_cache;
# Open a new, uncached connection and record the calling coroutine as its
# owner.
#
# Arguments: the class name followed by the usual DBI connect() arguments,
# which are forwarded unchanged to $ROOT_CLASS->connect.
# Returns the database handle, or nothing when connect() produced no handle.
sub connect {
    my ($class, @info) = @_;
    my $current = $Coro::current;

    my $dbh = $ROOT_CLASS->connect(@info);
    # Without RaiseError a failed connect() returns a false value. There is
    # then no handle to unblock or to use as a fieldhash key, so hand the
    # failure back to the caller the same way DBI does.
    return unless $dbh;

    Coro::Mysql::unblock($dbh) if $unblock;

    $coro_of{$dbh} = $current;
    # Drop the ownership record once the owning coroutine is destroyed.
    $current->on_destroy(sub { $coro_of{$dbh} = undef });
    return $dbh;
}
# Return a cached connection for the calling coroutine, creating one when
# no cached handle is available. A handle is never shared between live
# coroutines: a coroutine gets back its own handle, or a handle whose
# previous owner has been destroyed, or a brand-new connection.
#
# Arguments: the class name followed by the usual DBI connect() arguments.
# Returns the database handle, or nothing when connect() produced no handle.
# An exception thrown by connect() is re-thrown after the semaphore slot
# taken for the attempt has been given back.
sub connect_cached {
    my ($class, @info) = @_;
    my $current = $Coro::current;
    my $cache = ($conn_cache ||= Coro::Mysql::DBI::ConnCache->new($MAX_CACHED_CONNECTIONS));
    my $key = _cache_key(@info);

    my $dbh = $cache->get($key);

    # The handle already belongs to this coroutine, so an earlier call has
    # registered the clean-up callback. Registering it again would only pile
    # up callbacks (and used to release a slot that was never taken).
    my $owner = $dbh ? $coro_of{$dbh} : undef;
    return $dbh if $owner && $owner == $current;

    # True only when this call took a slot from the per-key semaphore.
    my $holds_lock = 0;
    unless ($dbh) {
        # Try to take a slot without blocking; when none is free, wait for
        # another coroutine to release one and look in the cache again.
        unless ($cache->get_lock($key)) {
            $cache->wait_for_release($key);
            # retry find cached dbh
            $dbh = $cache->get($key);
        }
        $holds_lock = 1;

        unless ($dbh) {
            my $connected = eval {
                $dbh = $ROOT_CLASS->connect(@info);
                1;
            };
            unless ($connected && $dbh) {
                # Save the error first: releasing the slot may clobber $@.
                my $error = $@;
                $cache->release_lock($key);
                die $error unless $connected;
                return;
            }
            Coro::Mysql::unblock($dbh) if $unblock;
            $cache->set($key, $dbh);
        }
    }

    $coro_of{$dbh} = $current;
    $current->on_destroy(sub {
        $coro_of{$dbh} = undef;
        # Only give back a slot this call actually took; an unconditional
        # release would push the semaphore above its configured maximum.
        $cache->release_lock($key) if $holds_lock;
    });
    return $dbh;
}
# Class method: look up the coroutine recorded as owner of $dbh.
# Returns nothing (empty list / undef) when no handle is given, otherwise
# whatever is stored for the handle, which may be undef.
sub coro_of {
    my $class = shift;
    my $dbh   = shift;
    return $dbh ? $coro_of{$dbh} : ();
}
# Build the string under which connections to one data source are cached.
# The original note on this sub says the logic was copied from DBI.
#
# Arguments: $dsn, $user, $auth and an optional attribute hash ref.
# Returns the four parts joined with "!\001"; the attribute hash is turned
# into a string by DBI::_concat_hash_sorted.
sub _cache_key {
    my ($dsn, $user, $auth, $attr) = @_;
    $attr ||= {};
    # Kept from the original for the benefit of the called helper.
    local $^W;
    # $user and $auth may be undef. This file enables lexical warnings, which
    # take precedence over $^W, so "local $^W" alone does not silence the
    # "uninitialized" warning from join; disable the category explicitly.
    no warnings 'uninitialized';
    my $key = join "!\001", $dsn, $user, $auth,
        DBI::_concat_hash_sorted($attr, "=\001", ",\001", 0, 0);
    return $key;
}
1;
package Coro::Mysql::DBI::ConnCache;
use strict;
use warnings;
use Coro;
use Coro::Semaphore;
# Set to a true value to have dead handles reported with warn.
our $DEBUG;
# Cached handles: cache key => array ref of database handles. The hash is
# file-scoped, so every ConnCache object works on the same data.
my %CACHE;
# One Coro::Semaphore per cache key, created on first use with the "max"
# value of the object that first asks for it.
my %LOCK;
# Constructor. $max is stored under the "max" key and later used as the
# initial count of each per-key semaphore.
sub new {
    my $class = shift;
    my $max   = shift;
    my %self  = (max => $max);
    return bless \%self, $class;
}
# Return the first handle that _select_usable_dbh offers for $key, or undef
# when it offers none.
sub get {
    my $self = shift;
    my $key  = shift;
    my @candidates = $self->_select_usable_dbh($key);
    return $candidates[0];
}
# Call down() on the semaphore for $key, creating the semaphore with the
# configured maximum on first use.
sub wait_for_release {
    my $self = shift;
    my $key  = shift;
    $LOCK{$key} ||= Coro::Semaphore->new($self->{max});
    return $LOCK{$key}->down;
}
# Call try() on the semaphore for $key and return its result, creating the
# semaphore with the configured maximum on first use.
sub get_lock {
    my $self = shift;
    my $key  = shift;
    $LOCK{$key} ||= Coro::Semaphore->new($self->{max});
    return $LOCK{$key}->try;
}
# Call up() on the semaphore for $key, creating the semaphore with the
# configured maximum on first use.
sub release_lock {
    my $self = shift;
    my $key  = shift;
    $LOCK{$key} ||= Coro::Semaphore->new($self->{max});
    return $LOCK{$key}->up;
}
# Return how many handles are currently cached for $key.
sub get_active_count {
    my ($self, $key) = @_;
    # The helper is named _select_all_dbh; the previous call to
    # "select_all_dbh" died because no such method exists.
    my @handles = $self->_select_all_dbh($key);
    return scalar @handles;
}
# Append $dbh to the list of handles cached for $key. Always returns 1.
sub set {
    my $self = shift;
    my ($key, $dbh) = @_;
    push @{ $CACHE{$key} ||= [] }, $dbh;
    return 1;
}
# Pick the cached handles for $key that the current coroutine may use.
#
# Returns the single handle already owned by the current coroutine when
# there is one. Otherwise returns every handle that has no recorded owner
# and still reports itself as Active. Handles without an owner that are no
# longer Active are dropped from the cache along the way.
sub _select_usable_dbh {
    my ($self, $key) = @_;
    my $cache = ($CACHE{$key} ||= []);
    my $current = $Coro::current;

    # Pair every cached handle with the coroutine recorded as its owner.
    my @pairs = map { [$_, Coro::Mysql::DBI->coro_of($_)] } @{$cache};

    # A handle already owned by the current coroutine wins outright.
    for my $pair (@pairs) {
        my ($dbh, $coro) = @{$pair};
        return $dbh if $coro && $coro == $current;
    }

    my @usable;
    my @kept;
    for my $pair (@pairs) {
        my ($dbh, $coro) = @{$pair};

        # In use by another coroutine: keep it cached, but do not offer it.
        if ($coro) {
            push @kept, $dbh;
            next;
        }

        # TODO: ping
        # unless ($dbh && $dbh->FETCH('Active') && $dbh->ping) {
        unless ($dbh && $dbh->FETCH('Active')) {
            warn "dead $dbh" if $DEBUG;
            # Leaving it out of @kept is what removes it from the cache.
            # Clearing the loop variable, as before, never touched the cache.
            next;
        }

        push @kept, $dbh;
        push @usable, $dbh;
    }

    @{$cache} = @kept;
    return @usable;
}
# Return every true entry cached for $key: the handles in list context,
# their number in scalar context.
sub _select_all_dbh {
    my $self = shift;
    my $key  = shift;
    my $handles = $CACHE{$key} ||= [];
    return grep { $_ } @{$handles};
}
1;
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment