Skip to content

Instantly share code, notes, and snippets.

@mp911de
Created October 29, 2019 15:20
Show Gist options
  • Save mp911de/c9c739bee64c40f666f44c302d75f7d5 to your computer and use it in GitHub Desktop.
Save mp911de/c9c739bee64c40f666f44c302d75f7d5 to your computer and use it in GitHub Desktop.
R2DBC Postgres benchmarks
Benchmark Mode Cnt Score Error Units
StatementBenchmarks.extendedJdbc thrpt 5 23971,062 ± 2809,755 ops/s
StatementBenchmarks.extendedR2dbc thrpt 5 15656,539 ± 726,292 ops/s
StatementBenchmarks.extendedR2dbc10 thrpt 5 39049,901 ± 770,095 ops/s
StatementBenchmarks.extendedR2dbc5 thrpt 5 32766,778 ± 3271,027 ops/s
StatementBenchmarks.extendedVertx thrpt 5 6187,994 ± 265,470 ops/s
StatementBenchmarks.extendedVertxPipelining10 thrpt 5 13585,651 ± 1173,464 ops/s
StatementBenchmarks.extendedVertxPipelining5 thrpt 5 11213,209 ± 1431,447 ops/s
StatementBenchmarks.simpleJdbc thrpt 5 15329,027 ± 666,951 ops/s
StatementBenchmarks.simpleR2dbc thrpt 5 12389,330 ± 641,802 ops/s
StatementBenchmarks.simpleR2dbcPipelining10 thrpt 5 23470,295 ± 4802,142 ops/s
StatementBenchmarks.simpleR2dbcPipelining5 thrpt 5 17821,469 ± 5068,742 ops/s
StatementBenchmarks.simpleVertx thrpt 5 12747,444 ± 1882,564 ops/s
StatementBenchmarks.simpleVertxPipelining10 thrpt 5 25945,471 ± 812,527 ops/s
StatementBenchmarks.simpleVertxPipelining5 thrpt 5 22406,710 ± 4550,162 ops/s
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.r2dbc.postgresql;
import io.r2dbc.postgresql.util.PostgresqlServerExtension;
import io.vertx.core.Vertx;
import io.vertx.pgclient.PgConnectOptions;
import io.vertx.pgclient.PgConnection;
import io.vertx.sqlclient.PreparedQuery;
import io.vertx.sqlclient.Row;
import io.vertx.sqlclient.Tuple;
import org.junit.platform.commons.annotation.Testable;
import org.openjdk.jmh.annotations.Benchmark;
import org.openjdk.jmh.annotations.BenchmarkMode;
import org.openjdk.jmh.annotations.Mode;
import org.openjdk.jmh.annotations.OperationsPerInvocation;
import org.openjdk.jmh.annotations.OutputTimeUnit;
import org.openjdk.jmh.annotations.Scope;
import org.openjdk.jmh.annotations.State;
import org.openjdk.jmh.annotations.TearDown;
import org.openjdk.jmh.infra.Blackhole;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
/**
* Benchmarks for Statement execution modes. Contains the following execution methods:
*/
@BenchmarkMode(Mode.Throughput)
@OutputTimeUnit(TimeUnit.SECONDS)
@Testable
@SuppressWarnings("all")
public class StatementBenchmarks extends BenchmarkSettings {
private static PostgresqlServerExtension extension = new PostgresqlServerExtension();
@State(Scope.Benchmark)
public static class ConnectionHolder {
final Connection jdbc;
final io.r2dbc.spi.Connection r2dbc;
final PgConnection connection;
final Vertx vertx = Vertx.vertx();
public ConnectionHolder() {
extension.initialize();
try {
jdbc = extension.getDataSource().getConnection();
Statement statement = jdbc.createStatement();
try {
statement.execute("DROP TABLE IF EXISTS simple_test");
} catch (SQLException e) {
}
statement.execute("CREATE TABLE simple_test (name VARCHAR(255))");
statement.execute("INSERT INTO simple_test VALUES('foo')");
statement.execute("INSERT INTO simple_test VALUES('bar')");
statement.execute("INSERT INTO simple_test VALUES('baz')");
jdbc.setAutoCommit(false);
r2dbc = new PostgresqlConnectionFactory(extension.getConnectionConfiguration()).create().block();
Mono.from(r2dbc.setAutoCommit(false)).block();
PgConnectOptions pgConnectOptions = PgConnectOptions.fromUri("postgresql://postgres:postgres@localhost:5432/postgres");
CompletableFuture<PgConnection> future = new CompletableFuture<>();
PgConnection.connect(vertx, pgConnectOptions, pgConnectionAsyncResult -> future.complete(pgConnectionAsyncResult.result()));
connection = future.join();
connection.query("BEGIN", rowSetAsyncResult -> {
});
} catch (SQLException e) {
throw new RuntimeException(e);
}
}
@TearDown
public void cleanup() {
connection.close();
vertx.close();
}
}
@Benchmark
public void simpleJdbc(ConnectionHolder connectionHolder, Blackhole voodoo) throws SQLException {
Statement statement = connectionHolder.jdbc.createStatement();
statement.setFetchSize(0);
ResultSet resultSet = statement.executeQuery("SELECT * FROM simple_test");
while (resultSet.next()) {
voodoo.consume(resultSet.getString("name"));
}
resultSet.close();
statement.close();
}
@Benchmark
public void simpleR2dbc(ConnectionHolder connectionHolder, Blackhole voodoo) {
io.r2dbc.spi.Statement statement = connectionHolder.r2dbc.createStatement("SELECT * FROM simple_test");
String name = Flux.from(statement.execute()).flatMap(it -> it.map((row, rowMetadata) -> row.get("name", String.class))).blockLast();
voodoo.consume(name);
}
@Benchmark
@OperationsPerInvocation(5)
public void simpleR2dbcPipelining5(ConnectionHolder connectionHolder, Blackhole voodoo) {
io.r2dbc.spi.Statement statement = connectionHolder.r2dbc.createStatement("SELECT * FROM simple_test");
Flux<String> execution = Flux.from(statement.execute()).flatMap(it -> it.map((row, rowMetadata) -> row.get("name", String.class)));
Object result = Flux.merge(execution, execution, execution, execution, execution).blockLast();
voodoo.consume(result);
}
@Benchmark
@OperationsPerInvocation(10)
public void simpleR2dbcPipelining10(ConnectionHolder connectionHolder, Blackhole voodoo) {
io.r2dbc.spi.Statement statement = connectionHolder.r2dbc.createStatement("SELECT * FROM simple_test");
Flux<String> execution = Flux.from(statement.execute()).flatMap(it -> it.map((row, rowMetadata) -> row.get("name", String.class)));
Object result = Flux.merge(execution, execution, execution, execution, execution, execution, execution, execution, execution, execution).blockLast();
voodoo.consume(result);
}
@Benchmark
public void simpleVertx(ConnectionHolder connectionHolder, Blackhole voodoo) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
connectionHolder.connection.query("SELECT * FROM simple_test", rowSetAsyncResult -> {
for (Row row : rowSetAsyncResult.result()) {
voodoo.consume(row.getString("name"));
}
latch.countDown();
});
latch.await();
}
@Benchmark
@OperationsPerInvocation(5)
public void simpleVertxPipelining5(ConnectionHolder connectionHolder, Blackhole voodoo) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(5);
for (int i = 0; i < 5; i++) {
connectionHolder.connection.query("SELECT * FROM simple_test", rowSetAsyncResult -> {
for (Row row : rowSetAsyncResult.result()) {
voodoo.consume(row.getString("name"));
}
latch.countDown();
});
}
latch.await();
}
@Benchmark
@OperationsPerInvocation(10)
public void simpleVertxPipelining10(ConnectionHolder connectionHolder, Blackhole voodoo) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
connectionHolder.connection.query("SELECT * FROM simple_test", rowSetAsyncResult -> {
for (Row row : rowSetAsyncResult.result()) {
voodoo.consume(row.getString("name"));
}
latch.countDown();
});
}
latch.await();
}
@Benchmark
public void extendedJdbc(ConnectionHolder connectionHolder, Blackhole voodoo) throws SQLException {
PreparedStatement statement = connectionHolder.jdbc.prepareStatement("SELECT * FROM simple_test WHERE name = ?");
statement.setString(1, "plpgsql");
statement.setFetchSize(50);
ResultSet resultSet = statement.executeQuery();
while (resultSet.next()) {
voodoo.consume(resultSet.getString("name"));
}
resultSet.close();
statement.close();
}
@Benchmark
public void extendedR2dbc(ConnectionHolder connectionHolder, Blackhole voodoo) throws SQLException {
io.r2dbc.spi.Statement statement = connectionHolder.r2dbc.createStatement("SELECT * FROM simple_test WHERE name = $1").bind("$1", "plpgsql");
String name = Flux.from(statement.execute()).flatMap(it -> it.map((row, rowMetadata) -> row.get("name", String.class))).blockLast();
voodoo.consume(name);
}
@Benchmark
@OperationsPerInvocation(5)
public void extendedR2dbc5(ConnectionHolder connectionHolder, Blackhole voodoo) {
Flux<String> execution = Flux.from(connectionHolder.r2dbc.createStatement("SELECT * FROM simple_test WHERE name = $1").bind("$1", "plpgsql").execute()).flatMap(it -> it.map((row,
rowMetadata) -> row.get("name", String.class)));
Object result = Flux.merge(execution, execution, execution, execution, execution).blockLast();
voodoo.consume(result);
}
@Benchmark
@OperationsPerInvocation(10)
public void extendedR2dbc10(ConnectionHolder connectionHolder, Blackhole voodoo) {
Flux<String> execution = Flux.from(connectionHolder.r2dbc.createStatement("SELECT * FROM simple_test WHERE name = $1").bind("$1", "plpgsql").execute()).flatMap(it -> it.map((row,
rowMetadata) -> row.get("name", String.class)));
Object result = Flux.merge(execution, execution, execution, execution, execution, execution, execution, execution, execution, execution).blockLast();
voodoo.consume(result);
}
@Benchmark
public void extendedVertx(ConnectionHolder connectionHolder, Blackhole voodoo) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(1);
CompletableFuture<PreparedQuery> pq = new CompletableFuture<>();
connectionHolder.connection.prepare("SELECT * FROM simple_test WHERE name = $1", it -> {
pq.complete(it.result());
});
pq.join().execute(Tuple.of("plpgsql"), rowSetAsyncResult -> {
for (Row row : rowSetAsyncResult.result()) {
voodoo.consume(row.getString("name"));
}
latch.countDown();
});
latch.await();
}
@Benchmark
@OperationsPerInvocation(5)
public void extendedVertxPipelining5(ConnectionHolder connectionHolder, Blackhole voodoo) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(5);
CompletableFuture<PreparedQuery> pq = new CompletableFuture<>();
connectionHolder.connection.prepare("SELECT * FROM simple_test WHERE name = $1", it -> {
pq.complete(it.result());
});
for (int i = 0; i < 5; i++) {
pq.join().execute(Tuple.of("plpgsql"), rowSetAsyncResult -> {
for (Row row : rowSetAsyncResult.result()) {
voodoo.consume(row.getString("name"));
}
latch.countDown();
});
}
latch.await();
}
@Benchmark
@OperationsPerInvocation(10)
public void extendedVertxPipelining10(ConnectionHolder connectionHolder, Blackhole voodoo) throws InterruptedException {
CountDownLatch latch = new CountDownLatch(10);
CompletableFuture<PreparedQuery> pq = new CompletableFuture<>();
connectionHolder.connection.prepare("SELECT * FROM simple_test WHERE name = $1", it -> {
pq.complete(it.result());
});
for (int i = 0; i < 10; i++) {
pq.join().execute(Tuple.of("plpgsql"), rowSetAsyncResult -> {
for (Row row : rowSetAsyncResult.result()) {
voodoo.consume(row.getString("name"));
}
latch.countDown();
});
}
latch.await();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment