-
-
Save anonymous/dced79d80a7f1fe5070d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/lib/Mojo/Pg.pm b/lib/Mojo/Pg.pm | |
index 4870d84..1d862f3 100644 | |
--- a/lib/Mojo/Pg.pm | |
+++ b/lib/Mojo/Pg.pm | |
@@ -21,8 +21,7 @@ has options => sub { | |
AutoCommit => 1, | |
AutoInactiveDestroy => 1, | |
PrintError => 0, | |
- RaiseError => 1, | |
- pg_server_prepare => 0 | |
+ RaiseError => 1 | |
}; | |
}; | |
has [qw(password username)] => ''; | |
@@ -40,8 +39,13 @@ sub db { | |
# Fork-safety | |
delete @$self{qw(pid queue)} unless ($self->{pid} //= $$) eq $$; | |
- my ($dbh, $handle) = @{$self->_dequeue}; | |
- return Mojo::Pg::Database->new(dbh => $dbh, handle => $handle, pg => $self); | |
+ my ($dbh, $handle, $cache) = @{$self->_dequeue}; | |
+ return Mojo::Pg::Database->new( | |
+ cache => $cache, | |
+ dbh => $dbh, | |
+ handle => $handle, | |
+ pg => $self | |
+ ); | |
} | |
sub from_string { | |
@@ -88,9 +92,9 @@ sub _dequeue { | |
} | |
sub _enqueue { | |
- my ($self, $dbh, $handle) = @_; | |
+ my ($self, $dbh, $handle, $cache) = @_; | |
my $queue = $self->{queue} ||= []; | |
- push @$queue, [$dbh, $handle] if $dbh->{Active}; | |
+ push @$queue, [$dbh, $handle, $cache] if $dbh->{Active}; | |
shift @$queue while @$queue > $self->max_connections; | |
} | |
diff --git a/lib/Mojo/Pg/Database.pm b/lib/Mojo/Pg/Database.pm | |
index eb8d0bf..94da642 100644 | |
--- a/lib/Mojo/Pg/Database.pm | |
+++ b/lib/Mojo/Pg/Database.pm | |
@@ -11,6 +11,7 @@ use Mojo::Util 'deprecated'; | |
use Scalar::Util 'weaken'; | |
has [qw(dbh pg)]; | |
+has max_statements => 10; | |
sub DESTROY { | |
my $self = shift; | |
@@ -19,7 +20,7 @@ sub DESTROY { | |
$_->{cb}($self, 'Premature connection close', undef) for @$waiting; | |
return unless (my $pg = $self->pg) && (my $dbh = $self->dbh); | |
- $pg->_enqueue($dbh, $self->{handle}); | |
+ $pg->_enqueue($dbh, @$self{qw(handle cache)}); | |
} | |
sub backlog { scalar @{shift->{waiting} || []} } | |
@@ -88,19 +89,19 @@ sub query { | |
my @values = map { _json($_) ? encode_json $_->{json} : $_ } @_; | |
# Dollar only | |
- my $dbh = $self->dbh; | |
- local $dbh->{pg_placeholder_dollaronly} = 1 if delete $self->{dollar_only}; | |
+ local $self->dbh->{pg_placeholder_dollaronly} = 1 | |
+ if delete $self->{dollar_only}; | |
# Blocking | |
unless ($cb) { | |
- my $sth = $dbh->prepare($query); | |
+ my $sth = $self->_dequeue($query); | |
$sth->execute(@values); | |
$self->_notifications; | |
- return Mojo::Pg::Results->new(sth => $sth); | |
+ return Mojo::Pg::Results->new(db => $self, sth => $sth); | |
} | |
# Non-blocking | |
- my $sth = $dbh->prepare($query, {pg_async => PG_ASYNC}); | |
+ my $sth = $self->_dequeue($query, 1); | |
push @{$self->{waiting}}, {args => \@values, cb => $cb, sth => $sth}; | |
$self->$_ for qw(_next _watch); | |
} | |
@@ -117,6 +118,25 @@ sub unlisten { | |
return $self; | |
} | |
+sub _dequeue { | |
+ my ($self, $query, $async) = @_; | |
+ | |
+ my $queue = $self->{queue} ||= []; | |
+ for (my $i = 0; $i <= $#$queue; $i++) { | |
+ my $sth = $queue->[$i]; | |
+ return splice @$queue, $i, 1 | |
+ if !(!$sth->{pg_async} ^ !$async) && $sth->{Statement} eq $query; | |
+ } | |
+ | |
+ return $self->dbh->prepare($query, $async ? {pg_async => PG_ASYNC} : ()); | |
+} | |
+ | |
+sub _enqueue { | |
+ my ($self, $sth) = @_; | |
+ push @{$self->{queue}}, $sth; | |
+ shift @{$self->{queue}} while @{$self->{queue}} > $self->max_statements; | |
+} | |
+ | |
sub _json { ref $_[0] eq 'HASH' && (keys %{$_[0]})[0] eq 'json' } | |
sub _next { | |
@@ -157,7 +177,7 @@ sub _watch { | |
my $result = do { local $dbh->{RaiseError} = 0; $dbh->pg_result }; | |
my $err = defined $result ? undef : $dbh->errstr; | |
- $self->$cb($err, Mojo::Pg::Results->new(sth => $sth)); | |
+ $self->$cb($err, Mojo::Pg::Results->new(db => $self, sth => $sth)); | |
$self->_next; | |
$self->_unwatch unless $self->backlog || $self->is_listening; | |
} | |
diff --git a/lib/Mojo/Pg/Results.pm b/lib/Mojo/Pg/Results.pm | |
index 72bf8ca..d802921 100644 | |
--- a/lib/Mojo/Pg/Results.pm | |
+++ b/lib/Mojo/Pg/Results.pm | |
@@ -5,7 +5,12 @@ use Mojo::Collection; | |
use Mojo::JSON 'decode_json'; | |
use Mojo::Util 'tablify'; | |
-has 'sth'; | |
+has [qw(db sth)]; | |
+ | |
+sub DESTROY { | |
+ my $self = shift; | |
+ if ((my $db = $self->db) && (my $sth = $self->sth)) { $db->_enqueue($sth) } | |
+} | |
sub array { ($_[0]->_expand($_[0]->sth->fetchrow_arrayref))[0] } |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment