Skip to content

Instantly share code, notes, and snippets.

@mp911de
Created March 2, 2019 23:16
Show Gist options
  • Star 4 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mp911de/9ea13939e8fd9a6b4ef138419f085715 to your computer and use it in GitHub Desktop.
Save mp911de/9ea13939e8fd9a6b4ef138419f085715 to your computer and use it in GitHub Desktop.
R2DBC over PgClient
/*
* Copyright 2019 Mark Paluch
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.reactiverse.pgclient.r2dbc;
import java.util.function.Supplier;
import io.r2dbc.spi.Batch;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.IsolationLevel;
import io.r2dbc.spi.Statement;
import io.reactiverse.pgclient.PgClient;
import io.reactiverse.pgclient.PgConnection;
import io.reactiverse.pgclient.PgTransaction;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import org.reactivestreams.Publisher;
/**
* {@link Connection} wrapper for {@link PgConnection}.
*/
class PgClientConnection implements Connection, Supplier<PgClient> {
private final PgConnection connection;
private volatile PgTransaction transaction;
PgClientConnection(PgConnection connection) {
this.connection = connection;
}
@Override
public Publisher<Void> beginTransaction() {
return Flowable.defer(() -> {
connection.begin();
return Flowable.empty();
});
}
@Override
public Publisher<Void> close() {
return Flowable.create(emitter -> {
connection.close();
emitter.onComplete();
}, BackpressureStrategy.BUFFER);
}
@Override
public Publisher<Void> commitTransaction() {
return Flowable.create(emitter -> {
if (transaction != null) {
transaction.commit(res -> {
transaction = null;
if (res.succeeded()) {
emitter.onComplete();
}
else {
emitter.onError(res.cause());
}
});
}
else {
emitter.onComplete();
}
}, BackpressureStrategy.BUFFER);
}
@Override
public Batch createBatch() {
throw new UnsupportedOperationException();
}
@Override
public Publisher<Void> createSavepoint(String s) {
throw new UnsupportedOperationException();
}
@Override
public Statement createStatement(String s) {
return new SimpleStatement(this, s);
}
@Override
public Publisher<Void> releaseSavepoint(String s) {
throw new UnsupportedOperationException();
}
@Override
public Publisher<Void> rollbackTransaction() {
return Flowable.create(emitter -> {
if (transaction != null) {
transaction.rollback(res -> {
transaction = null;
if (res.succeeded()) {
emitter.onComplete();
}
else {
emitter.onError(res.cause());
}
});
}
else {
emitter.onComplete();
}
}, BackpressureStrategy.BUFFER);
}
@Override
public Publisher<Void> rollbackTransactionToSavepoint(String s) {
return rollbackTransaction();
}
@Override
public Publisher<Void> setTransactionIsolationLevel(IsolationLevel isolationLevel) {
return Flowable.create(emitter -> {
connection.query("SET TRANSACTION ISOLATION LEVEL " + isolationLevel
.asSql(), res -> {
if (res.succeeded()) {
emitter.onComplete();
}
else {
emitter.onError(res.cause());
}
});
}, BackpressureStrategy.BUFFER);
}
@Override
public PgClient get() {
if (transaction != null) {
return transaction;
}
return connection;
}
}
/*
* Copyright 2019 Mark Paluch
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.reactiverse.pgclient.r2dbc;
import java.util.concurrent.atomic.AtomicReference;
import io.r2dbc.spi.Connection;
import io.r2dbc.spi.ConnectionFactory;
import io.r2dbc.spi.ConnectionFactoryMetadata;
import io.reactiverse.pgclient.PgConnection;
import io.reactiverse.pgclient.PgPool;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
/**
* R2DBC {@link ConnectionFactory} for {@link PgPool}.
*/
public class PgConnectionFactory implements ConnectionFactory {
private final PgPool pool;
private PgConnectionFactory(PgPool pool) {
this.pool = pool;
}
/**
* Create a new {@link ConnectionFactory} given {@link PgPool}.
* @param pool the connection pool.
* @return the newly created {@link PgConnectionFactory}.
*/
public static PgConnectionFactory create(PgPool pool) {
return new PgConnectionFactory(pool);
}
@Override
public Flowable<Connection> create() {
return Flowable.defer(() -> {
AtomicReference<PgConnection> ref = new AtomicReference<>();
return Flowable.<PgClientConnection>create(emitter -> {
pool.getConnection(res -> {
if (res.succeeded()) {
ref.set(res.result());
emitter.onNext(new PgClientConnection(res.result()));
emitter.onComplete();
}
else {
emitter.onError(res.cause());
}
});
}, BackpressureStrategy.BUFFER).doOnCancel(() -> {
PgConnection pgConnection = ref.get();
if (pgConnection != null) {
pgConnection.close();
}
});
});
}
@Override
public ConnectionFactoryMetadata getMetadata() {
return Metadata.INSTANCE;
}
enum Metadata implements ConnectionFactoryMetadata {
INSTANCE;
@Override
public String getName() {
return "PostgreSQL";
}
}
}
/*
* Copyright 2019 Mark Paluch
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.reactiverse.pgclient.r2dbc;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NoSuchElementException;
import io.r2dbc.spi.ColumnMetadata;
import io.r2dbc.spi.RowMetadata;
import io.reactiverse.pgclient.PgRowSet;
/**
* {@link RowMetadata} wrapper for {@link PgRowSet}.
*/
public class PgMetadata implements RowMetadata {
private final List<String> columnNames = new ArrayList<>();
private final Map<String, ColumnMetadata> metadata = new HashMap<>();
PgMetadata(PgRowSet rows) {
columnNames.addAll(rows.columnsNames());
for (String columnName : columnNames) {
metadata.put(columnName, new PgColumnMetadata(columnName));
}
}
@Override
public ColumnMetadata getColumnMetadata(Object identifier) {
if (identifier instanceof String) {
ColumnMetadata metadata = this.metadata.get(identifier);
if (metadata == null) {
throw new NoSuchElementException(String
.format("Column name '%s' does not exist in column names %s", identifier, columnNames));
}
}
if (identifier instanceof Integer) {
int index = (Integer) identifier;
if (index >= this.columnNames.size()) {
throw new ArrayIndexOutOfBoundsException(String
.format("Column index %d is larger than the number of columns %d", index, columnNames
.size()));
}
if (0 > index) {
throw new ArrayIndexOutOfBoundsException(String
.format("Column index %d is negative", index));
}
return this.metadata.get(columnNames.get(0));
}
throw new IllegalArgumentException(String
.format("Identifier '%s' is not a valid identifier. Should either be an Integer index or a String column name.", identifier));
}
@Override
public Iterable<? extends ColumnMetadata> getColumnMetadatas() {
return metadata.values();
}
static class PgColumnMetadata implements ColumnMetadata {
private final String name;
PgColumnMetadata(String name) {
this.name = name;
}
@Override
public String getName() {
return name;
}
}
}
/*
* Copyright 2019 Mark Paluch
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.reactiverse.pgclient.r2dbc;
import java.util.function.BiFunction;
import io.r2dbc.spi.Result;
import io.r2dbc.spi.RowMetadata;
import io.reactiverse.pgclient.PgRowSet;
import io.reactivex.Flowable;
import org.reactivestreams.Publisher;
/**
* {@link Result} wrapper for {@link PgRowSet}.
*/
class PgResult implements Result {
private final PgMetadata metadata;
private volatile PgRowSet rowSet;
PgResult(PgRowSet result) {
this.rowSet = result;
this.metadata = new PgMetadata(result);
}
@Override
public Publisher<Integer> getRowsUpdated() {
return Flowable.just(rowSet.rowCount());
}
@Override
public <T> Publisher<T> map(BiFunction<io.r2dbc.spi.Row, RowMetadata, ? extends T> mappingFunction) {
return Flowable.fromIterable(rowSet)
.map(it -> mappingFunction.apply(new PgRow(it), metadata));
}
}
/*
* Copyright 2019 Mark Paluch
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.reactiverse.pgclient.r2dbc;
import java.math.BigDecimal;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.LocalTime;
import java.time.OffsetDateTime;
import java.time.OffsetTime;
import java.time.temporal.Temporal;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.function.BiFunction;
import io.reactiverse.pgclient.Row;
import io.reactiverse.pgclient.data.Box;
import io.reactiverse.pgclient.data.Circle;
import io.reactiverse.pgclient.data.Interval;
import io.reactiverse.pgclient.data.Json;
import io.reactiverse.pgclient.data.Line;
import io.reactiverse.pgclient.data.LineSegment;
import io.reactiverse.pgclient.data.Numeric;
import io.reactiverse.pgclient.data.Path;
import io.reactiverse.pgclient.data.Point;
import io.reactiverse.pgclient.data.Polygon;
import io.vertx.core.buffer.Buffer;
/**
* {@link io.r2dbc.spi.Row} wrapper for a Postgres {@link Row}.
*/
class PgRow implements io.r2dbc.spi.Row {
private static final Map<Class<?>, BiFunction<Row, String, ?>> TYPED_BY_NAME_ACCESSORS = new HashMap<>();
private static final Map<Class<?>, BiFunction<Row, Integer, ?>> TYPED_BY_INDEX_ACCESSORS = new HashMap<>();
static {
registerAccessor(Short.class, Row::getShort, Row::getShort);
registerAccessor(Boolean.class, Row::getBoolean, Row::getBoolean);
registerAccessor(Integer.class, Row::getInteger, Row::getInteger);
registerAccessor(Long.class, Row::getLong, Row::getLong);
registerAccessor(Float.class, Row::getFloat, Row::getFloat);
registerAccessor(Double.class, Row::getDouble, Row::getDouble);
registerAccessor(String.class, Row::getString, Row::getString);
registerAccessor(Json.class, Row::getJson, Row::getJson);
registerAccessor(Buffer.class, Row::getBuffer, Row::getBuffer);
registerAccessor(Temporal.class, Row::getTemporal, Row::getTemporal);
registerAccessor(LocalDate.class, Row::getLocalDate, Row::getLocalDate);
registerAccessor(LocalTime.class, Row::getLocalTime, Row::getLocalTime);
registerAccessor(LocalDateTime.class, Row::getLocalDateTime, Row::getLocalDateTime);
registerAccessor(OffsetTime.class, Row::getOffsetTime, Row::getOffsetTime);
registerAccessor(OffsetDateTime.class, Row::getOffsetDateTime, Row::getOffsetDateTime);
registerAccessor(UUID.class, Row::getUUID, Row::getUUID);
registerAccessor(BigDecimal.class, Row::getBigDecimal, Row::getBigDecimal);
registerAccessor(Numeric.class, Row::getNumeric, Row::getNumeric);
registerAccessor(Point.class, Row::getPoint, Row::getPoint);
registerAccessor(Line.class, Row::getLine, Row::getLine);
registerAccessor(LineSegment.class, Row::getLineSegment, Row::getLineSegment);
registerAccessor(Box.class, Row::getBox, Row::getBox);
registerAccessor(Path.class, Row::getPath, Row::getPath);
registerAccessor(Polygon.class, Row::getPolygon, Row::getPolygon);
registerAccessor(Circle.class, Row::getCircle, Row::getCircle);
registerAccessor(Interval.class, Row::getInterval, Row::getInterval);
}
private static <T> void registerAccessor(Class<T> type, BiFunction<Row, String, T> byName, BiFunction<Row, Integer, T> byIndex) {
TYPED_BY_NAME_ACCESSORS.put(type, byName);
TYPED_BY_INDEX_ACCESSORS.put(type, byIndex);
}
private final Row row;
PgRow(Row row) {
this.row = row;
}
@Override
public <T> T get(Object identifier, Class<T> requestedType) {
if (identifier instanceof String) {
BiFunction<Row, String, ?> accessor = TYPED_BY_NAME_ACCESSORS
.get(requestedType);
if (accessor == null) {
throw new IllegalArgumentException("Type " + requestedType + " not supported");
}
return requestedType.cast(accessor.apply(row, (String) identifier));
}
if (identifier instanceof Integer) {
BiFunction<Row, Integer, ?> accessor = TYPED_BY_INDEX_ACCESSORS
.get(requestedType);
if (accessor == null) {
throw new IllegalArgumentException("Type " + requestedType + " not supported");
}
return requestedType.cast(accessor.apply(row, (Integer) identifier));
}
throw new IllegalArgumentException("Identifier must be a String or an Integer");
}
@Override
public Object get(Object identifier) {
if (identifier instanceof String) {
return row.getValue((String) identifier);
}
if (identifier instanceof Integer) {
return row.getValue((Integer) identifier);
}
throw new IllegalArgumentException("Identifier must be a String or an Integer");
}
}
<?xml version="1.0"?>
<!--
~ Copyright (C) 2017 Julien Viet
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~
-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>io.vertx</groupId>
<artifactId>vertx-parent</artifactId>
<version>12</version>
</parent>
<groupId>io.reactiverse</groupId>
<artifactId>reactive-pg-client</artifactId>
<version>0.11.3-SNAPSHOT</version>
<name>Reactive Postgres Client</name>
<url>https://github.com/reactiverse/reactive-pg-client</url>
<description>The reactive Postgres client</description>
<licenses>
<license>
<name>The Apache Software License, Version 2.0</name>
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url>
<distribution>repo</distribution>
</license>
</licenses>
<scm>
<connection>scm:git:git@github.com:reactiverse/reactive-pg-client.git</connection>
<developerConnection>scm:git:git@github.com:reactiverse/reactive-pg-client.git</developerConnection>
<url>git@github.com:reactiverse/reactive-pg-client.git</url>
</scm>
<developers>
<developer>
<name>Julien Viet</name>
<email>julien@julienviet.com</email>
</developer>
<developer>
<name>Emad Alblueshi</name>
<email>emad.albloushi@gmail.com</email>
</developer>
</developers>
<properties>
<stack.version>3.6.2</stack.version>
<jmh.version>1.19</jmh.version>
<docs.dir>${project.basedir}/src/main/docs</docs.dir>
<generated.dir>${project.basedir}/src/main/generated</generated.dir>
<!-- Set to a value for testing with a specific database -->
<embedded.postgres.version />
<connection.uri />
<tls.connection.uri />
<unix.socket.directory />
<unix.socket.port />
<!-- We skip sources jar generation as we do it with the assembly plugin to have greater control over the content -->
<source.skip>true</source.skip>
<kotlin.version>1.3.0</kotlin.version>
<r2dbc.version>1.0.0.BUILD-SNAPSHOT</r2dbc.version>
</properties>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-dependencies</artifactId>
<version>${stack.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-core</artifactId>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-codegen</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-docgen</artifactId>
<version>0.9.0</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-codetrans</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-rx-java</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-rx-java-gen</artifactId>
<version>${stack.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-rx-java2</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-rx-java2-gen</artifactId>
<version>${stack.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-reactive-streams</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-lang-js</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-lang-ruby</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-lang-groovy</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-lang-kotlin</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-lang-kotlin-coroutines</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-lang-kotlin-gen</artifactId>
<version>${stack.version}</version>
<optional>true</optional>
</dependency>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-spi</artifactId>
<version>${r2dbc.version}</version>
<optional>true</optional>
</dependency>
<!--
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
<scope>test</scope>
</dependency>
-->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.vertx</groupId>
<artifactId>vertx-unit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<classifier>linux-x86_64</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-kqueue</artifactId>
<classifier>osx-x86_64</classifier>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ru.yandex.qatools.embed</groupId>
<artifactId>postgresql-embedded</artifactId>
<version>2.10</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.1.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-core</artifactId>
<version>${jmh.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.openjdk.jmh</groupId>
<artifactId>jmh-generator-annprocess</artifactId>
<version>${jmh.version}</version>
<scope>test</scope>
</dependency>
</dependencies>
<repositories>
<repository>
<id>spring-libs-snapshot</id>
<url>https://repo.spring.io/libs-snapshot</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<build>
<pluginManagement>
<plugins>
<plugin>
<artifactId>maven-clean-plugin</artifactId>
<executions>
<execution>
<id>default-clean</id>
<configuration>
<filesets>
<fileset>
<directory>${generated.dir}</directory>
</fileset>
</filesets>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<executions>
<execution>
<id>default-compile</id>
<configuration>
<annotationProcessors>
<annotationProcessor>io.vertx.codegen.CodeGenProcessor</annotationProcessor>
<annotationProcessor>io.vertx.docgen.JavaDocGenProcessor</annotationProcessor>
<annotationProcessor>io.vertx.docgen.DocGenProcessor</annotationProcessor>
</annotationProcessors>
<compilerArgs>
<arg>-Acodegen.output=${project.basedir}/src/main</arg>
<arg>-Adocgen.source=${docs.dir}/*.md</arg>
<arg>-Adocgen.output=${project.basedir}/jekyll/guide/$lang</arg>
<arg>-Adocgen.syntax=markdown</arg>
<arg>-Amaven.groupId=${project.groupId}</arg>
<arg>-Amaven.artifactId=${project.artifactId}</arg>
<arg>-Amaven.version=${project.version}</arg>
</compilerArgs>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
<argLine>-Xmx1024M</argLine>
<systemPropertyVariables>
<target.dir>${project.build.directory}</target.dir>
<embedded.postgres.version>${embedded.postgres.version}</embedded.postgres.version>
<connection.uri>${connection.uri}</connection.uri>
<tls.connection.uri>${tls.connection.uri}</tls.connection.uri>
<unix.socket.directory>${unix.socket.directory}</unix.socket.directory>
<unix.socket.port>${unix.socket.port}</unix.socket.port>
</systemPropertyVariables>
<excludes>
<exclude>io/reactiverse/pgclient/it/**</exclude>
</excludes>
</configuration>
</plugin>
</plugins>
</pluginManagement>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>build-helper-maven-plugin</artifactId>
<version>3.0.0</version>
<executions>
<execution>
<id>add-source</id>
<phase>generate-sources</phase>
<goals>
<goal>add-source</goal>
</goals>
<configuration>
<sources>
<source>${generated.dir}</source>
</sources>
</configuration>
</execution>
<execution>
<id>add-test-source</id>
<goals>
<goal>add-test-source</goal>
</goals>
<configuration>
<sources>
<source>${project.basedir}/src/benchmark/java</source>
</sources>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-failsafe-plugin</artifactId>
<version>2.21.0</version>
<executions>
<execution>
<id>env-test</id>
<goals>
<goal>integration-test</goal>
</goals>
<phase>integration-test</phase>
<configuration>
<includes>
<include>io/reactiverse/pgclient/it/EnvTest.java</include>
</includes>
<environmentVariables>
<PGHOSTADDR>test_host</PGHOSTADDR>
<PGDATABASE>test_database</PGDATABASE>
<PGPORT>1234</PGPORT>
<PGUSER>test_user</PGUSER>
<PGPASSWORD>test_password</PGPASSWORD>
<PGSSLMODE>require</PGSSLMODE>
</environmentVariables>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-javadoc-plugin</artifactId>
<configuration>
<sourcepath>${project.build.sourceDirectory};${project.build.directory}/generated-sources/annotations</sourcepath>
<sourceFileIncludes>
<sourceFileInclude>io/reactiverse/**/*.java</sourceFileInclude>
</sourceFileIncludes>
<sourceFileExcludes>
<sourceFileExclude>io/reactiverse/groovy/**/*.java</sourceFileExclude>
<sourceFileExclude>**/package-info.java</sourceFileExclude>
<sourceFileExclude>**/impl/**</sourceFileExclude>
</sourceFileExcludes>
<detectLinks />
<detectJavaApiLink />
<links>
<link>http://vertx.io/docs/apidocs/</link>
<link>http://reactivex.io/RxJava/1.x/javadoc/</link>
<link>http://reactivex.io/RxJava/javadoc/</link>
<link>http://fasterxml.github.com/jackson-annotations/javadoc/2.9/</link>
<link>http://fasterxml.github.io/jackson-core/javadoc/2.9/</link>
<link>http://fasterxml.github.io/jackson-databind/javadoc/2.9/</link>
</links>
</configuration>
</plugin>
<plugin>
<artifactId>kotlin-maven-plugin</artifactId>
<groupId>org.jetbrains.kotlin</groupId>
<version>${kotlin.version}</version>
<executions>
<execution>
<configuration>
<jvmTarget>1.8</jvmTarget>
<sourceDirs>
<sourceDir>${basedir}/src/main/kotlin</sourceDir>
</sourceDirs>
</configuration>
<goals>
<goal>compile</goal>
</goals>
<phase>compile</phase>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<id>package-sources</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<attach>true</attach>
<archive>
<!-- Need a manifest to avoid empty archive -->
<manifest>
</manifest>
</archive>
<descriptors>
<descriptor>src/assembly/sources.xml</descriptor>
</descriptors>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
<profile>
<id>benchmark</id>
<build>
<plugins>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<executions>
<execution>
<id>package-benchmark</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
<configuration>
<archive>
<manifest>
<mainClass>io.reactiverse.pgclient.RawBenchmark</mainClass>
</manifest>
</archive>
<descriptors>
<descriptor>src/assembly/benchmark.xml</descriptor>
</descriptors>
<attach>false</attach>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
<profile>
<id>jitwatch</id>
<dependencies>
<dependency>
<groupId>com.chrisnewland</groupId>
<artifactId>jitwatch</artifactId>
<version>1.0.0</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.codehaus.mojo</groupId>
<artifactId>exec-maven-plugin</artifactId>
<executions>
<execution>
<id>default-cli</id>
<goals>
<goal>java</goal>
</goals>
<configuration>
<mainClass>org.adoptopenjdk.jitwatch.launch.LaunchUI</mainClass>
<cleanupDaemonThreads>false</cleanupDaemonThreads>
<classpathScope>compile</classpathScope>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
<!-- Site gen -->
<profile>
<id>site-gen</id>
<activation>
<property>
<name>!skipSite</name>
</property>
</activation>
<pluginRepositories>
<pluginRepository>
<id>rubygems-releases</id>
<url>http://rubygems-proxy.torquebox.org/releases</url>
</pluginRepository>
</pluginRepositories>
<repositories>
<repository>
<id>rubygems-releases</id>
<url>http://rubygems-proxy.torquebox.org/releases</url>
</repository>
</repositories>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-site-plugin</artifactId>
<version>3.4</version>
<configuration>
<skip>true</skip>
<skipDeploy>true</skipDeploy>
</configuration>
</plugin>
<plugin>
<artifactId>maven-javadoc-plugin</artifactId>
<executions>
<execution>
<phase>site</phase>
<configuration>
<skip>false</skip>
<reportOutputDirectory>${project.basedir}/jekyll</reportOutputDirectory>
</configuration>
<goals>
<goal>javadoc</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<groupId>de.saumya.mojo</groupId>
<artifactId>gem-maven-plugin</artifactId>
<version>1.1.4</version>
<executions>
<execution>
<goals>
<goal>exec</goal>
</goals>
<phase>site</phase>
<configuration>
<execArgs>${project.build.directory}/rubygems/bin/yardoc --no-private -m html -o ${project.basedir}/jekyll/yardoc ${project.build.outputDirectory}/**/*.rb</execArgs>
</configuration>
</execution>
</executions>
<dependencies>
<dependency>
<groupId>rubygems</groupId>
<artifactId>yard</artifactId>
<version>0.8.7.6</version>
<type>gem</type>
</dependency>
<dependency>
<groupId>rubygems</groupId>
<artifactId>asciidoctor</artifactId>
<version>1.5.6</version>
<type>gem</type>
</dependency>
</dependencies>
</plugin>
<plugin>
<groupId>com.phasebash.jsdoc</groupId>
<artifactId>jsdoc3-maven-plugin</artifactId>
<executions>
<execution>
<phase>site</phase>
<goals>
<goal>jsdoc3</goal>
</goals>
<configuration>
<recursive>true</recursive>
<directoryRoots>
<directoryRoot>${project.build.outputDirectory}</directoryRoot>
</directoryRoots>
<outputDirectory>${project.basedir}/jekyll/jsdoc</outputDirectory>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</profile>
</profiles>
</project>
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.reactiverse.pgclient.r2dbc;
import java.util.concurrent.TimeUnit;
import io.r2dbc.spi.Connection;
import io.reactiverse.pgclient.PgClient;
import io.reactiverse.pgclient.PgConnectOptions;
import io.reactiverse.pgclient.PgPool;
import io.reactiverse.pgclient.PgPoolOptions;
import io.reactiverse.pgclient.PgTestBase;
import io.reactivex.Flowable;
import io.vertx.core.Vertx;
import org.junit.Test;
import static org.junit.Assert.*;
public class R2dbcTest extends PgTestBase {
private PgPool createPool(PgConnectOptions options, int size) {
return PgClient.pool(Vertx.vertx(), new PgPoolOptions(options).setMaxSize(size));
}
@Test
public void testPool() {
PgPool pool = createPool(options, 4);
PgConnectionFactory factory = PgConnectionFactory.create(pool);
factory.create().test().requestMore(1).awaitCount(1).assertValue(connection -> {
assertNotNull(connection);
Flowable.fromPublisher(connection.close()).subscribe();
return true;
}).assertComplete();
}
@Test
public void testConnectionClose() {
PgPool pool = createPool(options, 4);
PgConnectionFactory factory = PgConnectionFactory.create(pool);
factory.create().flatMap(Connection::close).test()
.awaitDone(5, TimeUnit.SECONDS).assertComplete()
.assertNoValues();
}
@Test
public void testSimpleStatement() {
PgPool pool = createPool(options, 4);
PgConnectionFactory factory = PgConnectionFactory.create(pool);
factory.create().flatMap(it -> {
return Flowable.fromPublisher(it.createStatement("SELECT * FROM pg_type").execute())
.flatMap(result -> result
.map((row, rowMetadata) -> row.get("typname", String.class)))
.toList().toFlowable().doOnComplete(() -> {
Flowable.fromPublisher(it.close()).subscribe();
});
}).test().requestMore(100).awaitCount(1).assertValue(actual -> {
assertFalse(actual.isEmpty());
return true;
}).assertComplete();
}
}
/*
* Copyright 2019 Mark Paluch
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.reactiverse.pgclient.r2dbc;
import java.util.function.Supplier;
import io.r2dbc.spi.Result;
import io.r2dbc.spi.Statement;
import io.reactiverse.pgclient.PgClient;
import io.reactiverse.pgclient.PgRowSet;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import org.reactivestreams.Publisher;
class SimpleStatement implements Statement {
private final Supplier<PgClient> clientSupplier;
private final String sql;
SimpleStatement(Supplier<PgClient> clientSupplier, String sql) {
this.clientSupplier = clientSupplier;
this.sql = sql;
}
@Override
public Statement add() {
throw new UnsupportedOperationException();
}
@Override
public Statement bind(Object o, Object o1) {
throw new UnsupportedOperationException();
}
@Override
public Statement bind(int i, Object o) {
throw new UnsupportedOperationException();
}
@Override
public Statement bindNull(Object o, Class<?> aClass) {
throw new UnsupportedOperationException();
}
@Override
public Statement bindNull(int i, Class<?> aClass) {
throw new UnsupportedOperationException();
}
@Override
public Publisher<? extends Result> execute() {
return Flowable.create(emitter -> {
PgClient pgClient = clientSupplier.get();
pgClient.query(this.sql, res -> {
if (res.succeeded()) {
PgRowSet result = res.result();
do {
emitter.onNext(new PgResult(result));
}
while ((result = result.next()) != null);
emitter.onComplete();
}
else {
emitter.onError(res.cause());
}
});
}, BackpressureStrategy.BUFFER);
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment