Skip to content

Instantly share code, notes, and snippets.

/sth.diff Secret

Created March 17, 2015 23:20
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save anonymous/dced79d80a7f1fe5070d to your computer and use it in GitHub Desktop.
Save anonymous/dced79d80a7f1fe5070d to your computer and use it in GitHub Desktop.
diff --git a/lib/Mojo/Pg.pm b/lib/Mojo/Pg.pm
index 4870d84..1d862f3 100644
--- a/lib/Mojo/Pg.pm
+++ b/lib/Mojo/Pg.pm
@@ -21,8 +21,7 @@ has options => sub {
AutoCommit => 1,
AutoInactiveDestroy => 1,
PrintError => 0,
- RaiseError => 1,
- pg_server_prepare => 0
+ RaiseError => 1
};
};
has [qw(password username)] => '';
@@ -40,8 +39,13 @@ sub db {
# Fork-safety
delete @$self{qw(pid queue)} unless ($self->{pid} //= $$) eq $$;
- my ($dbh, $handle) = @{$self->_dequeue};
- return Mojo::Pg::Database->new(dbh => $dbh, handle => $handle, pg => $self);
+ my ($dbh, $handle, $cache) = @{$self->_dequeue};
+ return Mojo::Pg::Database->new(
+ cache => $cache,
+ dbh => $dbh,
+ handle => $handle,
+ pg => $self
+ );
}
sub from_string {
@@ -88,9 +92,9 @@ sub _dequeue {
}
sub _enqueue {
- my ($self, $dbh, $handle) = @_;
+ my ($self, $dbh, $handle, $cache) = @_;
my $queue = $self->{queue} ||= [];
- push @$queue, [$dbh, $handle] if $dbh->{Active};
+ push @$queue, [$dbh, $handle, $cache] if $dbh->{Active};
shift @$queue while @$queue > $self->max_connections;
}
diff --git a/lib/Mojo/Pg/Database.pm b/lib/Mojo/Pg/Database.pm
index eb8d0bf..94da642 100644
--- a/lib/Mojo/Pg/Database.pm
+++ b/lib/Mojo/Pg/Database.pm
@@ -11,6 +11,7 @@ use Mojo::Util 'deprecated';
use Scalar::Util 'weaken';
has [qw(dbh pg)];
+has max_statements => 10;
sub DESTROY {
my $self = shift;
@@ -19,7 +20,7 @@ sub DESTROY {
$_->{cb}($self, 'Premature connection close', undef) for @$waiting;
return unless (my $pg = $self->pg) && (my $dbh = $self->dbh);
- $pg->_enqueue($dbh, $self->{handle});
+ $pg->_enqueue($dbh, @$self{qw(handle cache)});
}
sub backlog { scalar @{shift->{waiting} || []} }
@@ -88,19 +89,19 @@ sub query {
my @values = map { _json($_) ? encode_json $_->{json} : $_ } @_;
# Dollar only
- my $dbh = $self->dbh;
- local $dbh->{pg_placeholder_dollaronly} = 1 if delete $self->{dollar_only};
+ local $self->dbh->{pg_placeholder_dollaronly} = 1
+ if delete $self->{dollar_only};
# Blocking
unless ($cb) {
- my $sth = $dbh->prepare($query);
+ my $sth = $self->_dequeue($query);
$sth->execute(@values);
$self->_notifications;
- return Mojo::Pg::Results->new(sth => $sth);
+ return Mojo::Pg::Results->new(db => $self, sth => $sth);
}
# Non-blocking
- my $sth = $dbh->prepare($query, {pg_async => PG_ASYNC});
+ my $sth = $self->_dequeue($query, 1);
push @{$self->{waiting}}, {args => \@values, cb => $cb, sth => $sth};
$self->$_ for qw(_next _watch);
}
@@ -117,6 +118,25 @@ sub unlisten {
return $self;
}
+sub _dequeue {
+ my ($self, $query, $async) = @_;
+
+ my $queue = $self->{queue} ||= [];
+ for (my $i = 0; $i <= $#$queue; $i++) {
+ my $sth = $queue->[$i];
+ return splice @$queue, $i, 1
+ if !(!$sth->{pg_async} ^ !$async) && $sth->{Statement} eq $query;
+ }
+
+ return $self->dbh->prepare($query, $async ? {pg_async => PG_ASYNC} : ());
+}
+
+sub _enqueue {
+ my ($self, $sth) = @_;
+ push @{$self->{queue}}, $sth;
+ shift @{$self->{queue}} while @{$self->{queue}} > $self->max_statements;
+}
+
sub _json { ref $_[0] eq 'HASH' && (keys %{$_[0]})[0] eq 'json' }
sub _next {
@@ -157,7 +177,7 @@ sub _watch {
my $result = do { local $dbh->{RaiseError} = 0; $dbh->pg_result };
my $err = defined $result ? undef : $dbh->errstr;
- $self->$cb($err, Mojo::Pg::Results->new(sth => $sth));
+ $self->$cb($err, Mojo::Pg::Results->new(db => $self, sth => $sth));
$self->_next;
$self->_unwatch unless $self->backlog || $self->is_listening;
}
diff --git a/lib/Mojo/Pg/Results.pm b/lib/Mojo/Pg/Results.pm
index 72bf8ca..d802921 100644
--- a/lib/Mojo/Pg/Results.pm
+++ b/lib/Mojo/Pg/Results.pm
@@ -5,7 +5,12 @@ use Mojo::Collection;
use Mojo::JSON 'decode_json';
use Mojo::Util 'tablify';
-has 'sth';
+has [qw(db sth)];
+
+sub DESTROY {
+ my $self = shift;
+ if ((my $db = $self->db) && (my $sth = $self->sth)) { $db->_enqueue($sth) }
+}
sub array { ($_[0]->_expand($_[0]->sth->fetchrow_arrayref))[0] }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment