-
-
Save mp911de/9ea13939e8fd9a6b4ef138419f085715 to your computer and use it in GitHub Desktop.
R2DBC over PgClient
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright 2019 Mark Paluch | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package io.reactiverse.pgclient.r2dbc; | |
import java.util.function.Supplier; | |
import io.r2dbc.spi.Batch; | |
import io.r2dbc.spi.Connection; | |
import io.r2dbc.spi.IsolationLevel; | |
import io.r2dbc.spi.Statement; | |
import io.reactiverse.pgclient.PgClient; | |
import io.reactiverse.pgclient.PgConnection; | |
import io.reactiverse.pgclient.PgTransaction; | |
import io.reactivex.BackpressureStrategy; | |
import io.reactivex.Flowable; | |
import org.reactivestreams.Publisher; | |
/** | |
* {@link Connection} wrapper for {@link PgConnection}. | |
*/ | |
class PgClientConnection implements Connection, Supplier<PgClient> { | |
private final PgConnection connection; | |
private volatile PgTransaction transaction; | |
PgClientConnection(PgConnection connection) { | |
this.connection = connection; | |
} | |
@Override | |
public Publisher<Void> beginTransaction() { | |
return Flowable.defer(() -> { | |
connection.begin(); | |
return Flowable.empty(); | |
}); | |
} | |
@Override | |
public Publisher<Void> close() { | |
return Flowable.create(emitter -> { | |
connection.close(); | |
emitter.onComplete(); | |
}, BackpressureStrategy.BUFFER); | |
} | |
@Override | |
public Publisher<Void> commitTransaction() { | |
return Flowable.create(emitter -> { | |
if (transaction != null) { | |
transaction.commit(res -> { | |
transaction = null; | |
if (res.succeeded()) { | |
emitter.onComplete(); | |
} | |
else { | |
emitter.onError(res.cause()); | |
} | |
}); | |
} | |
else { | |
emitter.onComplete(); | |
} | |
}, BackpressureStrategy.BUFFER); | |
} | |
@Override | |
public Batch createBatch() { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public Publisher<Void> createSavepoint(String s) { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public Statement createStatement(String s) { | |
return new SimpleStatement(this, s); | |
} | |
@Override | |
public Publisher<Void> releaseSavepoint(String s) { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public Publisher<Void> rollbackTransaction() { | |
return Flowable.create(emitter -> { | |
if (transaction != null) { | |
transaction.rollback(res -> { | |
transaction = null; | |
if (res.succeeded()) { | |
emitter.onComplete(); | |
} | |
else { | |
emitter.onError(res.cause()); | |
} | |
}); | |
} | |
else { | |
emitter.onComplete(); | |
} | |
}, BackpressureStrategy.BUFFER); | |
} | |
@Override | |
public Publisher<Void> rollbackTransactionToSavepoint(String s) { | |
return rollbackTransaction(); | |
} | |
@Override | |
public Publisher<Void> setTransactionIsolationLevel(IsolationLevel isolationLevel) { | |
return Flowable.create(emitter -> { | |
connection.query("SET TRANSACTION ISOLATION LEVEL " + isolationLevel | |
.asSql(), res -> { | |
if (res.succeeded()) { | |
emitter.onComplete(); | |
} | |
else { | |
emitter.onError(res.cause()); | |
} | |
}); | |
}, BackpressureStrategy.BUFFER); | |
} | |
@Override | |
public PgClient get() { | |
if (transaction != null) { | |
return transaction; | |
} | |
return connection; | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright 2019 Mark Paluch | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package io.reactiverse.pgclient.r2dbc; | |
import java.util.concurrent.atomic.AtomicReference; | |
import io.r2dbc.spi.Connection; | |
import io.r2dbc.spi.ConnectionFactory; | |
import io.r2dbc.spi.ConnectionFactoryMetadata; | |
import io.reactiverse.pgclient.PgConnection; | |
import io.reactiverse.pgclient.PgPool; | |
import io.reactivex.BackpressureStrategy; | |
import io.reactivex.Flowable; | |
/** | |
* R2DBC {@link ConnectionFactory} for {@link PgPool}. | |
*/ | |
public class PgConnectionFactory implements ConnectionFactory { | |
private final PgPool pool; | |
private PgConnectionFactory(PgPool pool) { | |
this.pool = pool; | |
} | |
/** | |
* Create a new {@link ConnectionFactory} given {@link PgPool}. | |
* @param pool the connection pool. | |
* @return the newly created {@link PgConnectionFactory}. | |
*/ | |
public static PgConnectionFactory create(PgPool pool) { | |
return new PgConnectionFactory(pool); | |
} | |
@Override | |
public Flowable<Connection> create() { | |
return Flowable.defer(() -> { | |
AtomicReference<PgConnection> ref = new AtomicReference<>(); | |
return Flowable.<PgClientConnection>create(emitter -> { | |
pool.getConnection(res -> { | |
if (res.succeeded()) { | |
ref.set(res.result()); | |
emitter.onNext(new PgClientConnection(res.result())); | |
emitter.onComplete(); | |
} | |
else { | |
emitter.onError(res.cause()); | |
} | |
}); | |
}, BackpressureStrategy.BUFFER).doOnCancel(() -> { | |
PgConnection pgConnection = ref.get(); | |
if (pgConnection != null) { | |
pgConnection.close(); | |
} | |
}); | |
}); | |
} | |
@Override | |
public ConnectionFactoryMetadata getMetadata() { | |
return Metadata.INSTANCE; | |
} | |
enum Metadata implements ConnectionFactoryMetadata { | |
INSTANCE; | |
@Override | |
public String getName() { | |
return "PostgreSQL"; | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright 2019 Mark Paluch | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package io.reactiverse.pgclient.r2dbc; | |
import java.util.ArrayList; | |
import java.util.HashMap; | |
import java.util.List; | |
import java.util.Map; | |
import java.util.NoSuchElementException; | |
import io.r2dbc.spi.ColumnMetadata; | |
import io.r2dbc.spi.RowMetadata; | |
import io.reactiverse.pgclient.PgRowSet; | |
/** | |
* {@link RowMetadata} wrapper for {@link PgRowSet}. | |
*/ | |
public class PgMetadata implements RowMetadata { | |
private final List<String> columnNames = new ArrayList<>(); | |
private final Map<String, ColumnMetadata> metadata = new HashMap<>(); | |
PgMetadata(PgRowSet rows) { | |
columnNames.addAll(rows.columnsNames()); | |
for (String columnName : columnNames) { | |
metadata.put(columnName, new PgColumnMetadata(columnName)); | |
} | |
} | |
@Override | |
public ColumnMetadata getColumnMetadata(Object identifier) { | |
if (identifier instanceof String) { | |
ColumnMetadata metadata = this.metadata.get(identifier); | |
if (metadata == null) { | |
throw new NoSuchElementException(String | |
.format("Column name '%s' does not exist in column names %s", identifier, columnNames)); | |
} | |
} | |
if (identifier instanceof Integer) { | |
int index = (Integer) identifier; | |
if (index >= this.columnNames.size()) { | |
throw new ArrayIndexOutOfBoundsException(String | |
.format("Column index %d is larger than the number of columns %d", index, columnNames | |
.size())); | |
} | |
if (0 > index) { | |
throw new ArrayIndexOutOfBoundsException(String | |
.format("Column index %d is negative", index)); | |
} | |
return this.metadata.get(columnNames.get(0)); | |
} | |
throw new IllegalArgumentException(String | |
.format("Identifier '%s' is not a valid identifier. Should either be an Integer index or a String column name.", identifier)); | |
} | |
@Override | |
public Iterable<? extends ColumnMetadata> getColumnMetadatas() { | |
return metadata.values(); | |
} | |
static class PgColumnMetadata implements ColumnMetadata { | |
private final String name; | |
PgColumnMetadata(String name) { | |
this.name = name; | |
} | |
@Override | |
public String getName() { | |
return name; | |
} | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright 2019 Mark Paluch | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package io.reactiverse.pgclient.r2dbc; | |
import java.util.function.BiFunction; | |
import io.r2dbc.spi.Result; | |
import io.r2dbc.spi.RowMetadata; | |
import io.reactiverse.pgclient.PgRowSet; | |
import io.reactivex.Flowable; | |
import org.reactivestreams.Publisher; | |
/** | |
* {@link Result} wrapper for {@link PgRowSet}. | |
*/ | |
class PgResult implements Result { | |
private final PgMetadata metadata; | |
private volatile PgRowSet rowSet; | |
PgResult(PgRowSet result) { | |
this.rowSet = result; | |
this.metadata = new PgMetadata(result); | |
} | |
@Override | |
public Publisher<Integer> getRowsUpdated() { | |
return Flowable.just(rowSet.rowCount()); | |
} | |
@Override | |
public <T> Publisher<T> map(BiFunction<io.r2dbc.spi.Row, RowMetadata, ? extends T> mappingFunction) { | |
return Flowable.fromIterable(rowSet) | |
.map(it -> mappingFunction.apply(new PgRow(it), metadata)); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright 2019 Mark Paluch | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package io.reactiverse.pgclient.r2dbc; | |
import java.math.BigDecimal; | |
import java.time.LocalDate; | |
import java.time.LocalDateTime; | |
import java.time.LocalTime; | |
import java.time.OffsetDateTime; | |
import java.time.OffsetTime; | |
import java.time.temporal.Temporal; | |
import java.util.HashMap; | |
import java.util.Map; | |
import java.util.UUID; | |
import java.util.function.BiFunction; | |
import io.reactiverse.pgclient.Row; | |
import io.reactiverse.pgclient.data.Box; | |
import io.reactiverse.pgclient.data.Circle; | |
import io.reactiverse.pgclient.data.Interval; | |
import io.reactiverse.pgclient.data.Json; | |
import io.reactiverse.pgclient.data.Line; | |
import io.reactiverse.pgclient.data.LineSegment; | |
import io.reactiverse.pgclient.data.Numeric; | |
import io.reactiverse.pgclient.data.Path; | |
import io.reactiverse.pgclient.data.Point; | |
import io.reactiverse.pgclient.data.Polygon; | |
import io.vertx.core.buffer.Buffer; | |
/** | |
* {@link io.r2dbc.spi.Row} wrapper for a Postgres {@link Row}. | |
*/ | |
class PgRow implements io.r2dbc.spi.Row { | |
private static final Map<Class<?>, BiFunction<Row, String, ?>> TYPED_BY_NAME_ACCESSORS = new HashMap<>(); | |
private static final Map<Class<?>, BiFunction<Row, Integer, ?>> TYPED_BY_INDEX_ACCESSORS = new HashMap<>(); | |
static { | |
registerAccessor(Short.class, Row::getShort, Row::getShort); | |
registerAccessor(Boolean.class, Row::getBoolean, Row::getBoolean); | |
registerAccessor(Integer.class, Row::getInteger, Row::getInteger); | |
registerAccessor(Long.class, Row::getLong, Row::getLong); | |
registerAccessor(Float.class, Row::getFloat, Row::getFloat); | |
registerAccessor(Double.class, Row::getDouble, Row::getDouble); | |
registerAccessor(String.class, Row::getString, Row::getString); | |
registerAccessor(Json.class, Row::getJson, Row::getJson); | |
registerAccessor(Buffer.class, Row::getBuffer, Row::getBuffer); | |
registerAccessor(Temporal.class, Row::getTemporal, Row::getTemporal); | |
registerAccessor(LocalDate.class, Row::getLocalDate, Row::getLocalDate); | |
registerAccessor(LocalTime.class, Row::getLocalTime, Row::getLocalTime); | |
registerAccessor(LocalDateTime.class, Row::getLocalDateTime, Row::getLocalDateTime); | |
registerAccessor(OffsetTime.class, Row::getOffsetTime, Row::getOffsetTime); | |
registerAccessor(OffsetDateTime.class, Row::getOffsetDateTime, Row::getOffsetDateTime); | |
registerAccessor(UUID.class, Row::getUUID, Row::getUUID); | |
registerAccessor(BigDecimal.class, Row::getBigDecimal, Row::getBigDecimal); | |
registerAccessor(Numeric.class, Row::getNumeric, Row::getNumeric); | |
registerAccessor(Point.class, Row::getPoint, Row::getPoint); | |
registerAccessor(Line.class, Row::getLine, Row::getLine); | |
registerAccessor(LineSegment.class, Row::getLineSegment, Row::getLineSegment); | |
registerAccessor(Box.class, Row::getBox, Row::getBox); | |
registerAccessor(Path.class, Row::getPath, Row::getPath); | |
registerAccessor(Polygon.class, Row::getPolygon, Row::getPolygon); | |
registerAccessor(Circle.class, Row::getCircle, Row::getCircle); | |
registerAccessor(Interval.class, Row::getInterval, Row::getInterval); | |
} | |
private static <T> void registerAccessor(Class<T> type, BiFunction<Row, String, T> byName, BiFunction<Row, Integer, T> byIndex) { | |
TYPED_BY_NAME_ACCESSORS.put(type, byName); | |
TYPED_BY_INDEX_ACCESSORS.put(type, byIndex); | |
} | |
private final Row row; | |
PgRow(Row row) { | |
this.row = row; | |
} | |
@Override | |
public <T> T get(Object identifier, Class<T> requestedType) { | |
if (identifier instanceof String) { | |
BiFunction<Row, String, ?> accessor = TYPED_BY_NAME_ACCESSORS | |
.get(requestedType); | |
if (accessor == null) { | |
throw new IllegalArgumentException("Type " + requestedType + " not supported"); | |
} | |
return requestedType.cast(accessor.apply(row, (String) identifier)); | |
} | |
if (identifier instanceof Integer) { | |
BiFunction<Row, Integer, ?> accessor = TYPED_BY_INDEX_ACCESSORS | |
.get(requestedType); | |
if (accessor == null) { | |
throw new IllegalArgumentException("Type " + requestedType + " not supported"); | |
} | |
return requestedType.cast(accessor.apply(row, (Integer) identifier)); | |
} | |
throw new IllegalArgumentException("Identifier must be a String or an Integer"); | |
} | |
@Override | |
public Object get(Object identifier) { | |
if (identifier instanceof String) { | |
return row.getValue((String) identifier); | |
} | |
if (identifier instanceof Integer) { | |
return row.getValue((Integer) identifier); | |
} | |
throw new IllegalArgumentException("Identifier must be a String or an Integer"); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0"?> | |
<!-- | |
~ Copyright (C) 2017 Julien Viet | |
~ | |
~ Licensed under the Apache License, Version 2.0 (the "License"); | |
~ you may not use this file except in compliance with the License. | |
~ You may obtain a copy of the License at | |
~ | |
~ http://www.apache.org/licenses/LICENSE-2.0 | |
~ | |
~ Unless required by applicable law or agreed to in writing, software | |
~ distributed under the License is distributed on an "AS IS" BASIS, | |
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
~ See the License for the specific language governing permissions and | |
~ limitations under the License. | |
~ | |
--> | |
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | |
<modelVersion>4.0.0</modelVersion> | |
<parent> | |
<groupId>io.vertx</groupId> | |
<artifactId>vertx-parent</artifactId> | |
<version>12</version> | |
</parent> | |
<groupId>io.reactiverse</groupId> | |
<artifactId>reactive-pg-client</artifactId> | |
<version>0.11.3-SNAPSHOT</version> | |
<name>Reactive Postgres Client</name> | |
<url>https://github.com/reactiverse/reactive-pg-client</url> | |
<description>The reactive Postgres client</description> | |
<licenses> | |
<license> | |
<name>The Apache Software License, Version 2.0</name> | |
<url>http://www.apache.org/licenses/LICENSE-2.0.txt</url> | |
<distribution>repo</distribution> | |
</license> | |
</licenses> | |
<scm> | |
<connection>scm:git:git@github.com:reactiverse/reactive-pg-client.git</connection> | |
<developerConnection>scm:git:git@github.com:reactiverse/reactive-pg-client.git</developerConnection> | |
<url>git@github.com:reactiverse/reactive-pg-client.git</url> | |
</scm> | |
<developers> | |
<developer> | |
<name>Julien Viet</name> | |
<email>julien@julienviet.com</email> | |
</developer> | |
<developer> | |
<name>Emad Alblueshi</name> | |
<email>emad.albloushi@gmail.com</email> | |
</developer> | |
</developers> | |
<properties> | |
<stack.version>3.6.2</stack.version> | |
<jmh.version>1.19</jmh.version> | |
<docs.dir>${project.basedir}/src/main/docs</docs.dir> | |
<generated.dir>${project.basedir}/src/main/generated</generated.dir> | |
<!-- Set to a value for testing with a specific database --> | |
<embedded.postgres.version /> | |
<connection.uri /> | |
<tls.connection.uri /> | |
<unix.socket.directory /> | |
<unix.socket.port /> | |
<!-- Skip the default sources-jar generation: the assembly plugin builds the sources archive instead, which gives greater control over its content. -->
<source.skip>true</source.skip> | |
<kotlin.version>1.3.0</kotlin.version> | |
<r2dbc.version>1.0.0.BUILD-SNAPSHOT</r2dbc.version> | |
</properties> | |
<dependencyManagement> | |
<dependencies> | |
<dependency> | |
<groupId>io.vertx</groupId> | |
<artifactId>vertx-dependencies</artifactId> | |
<version>${stack.version}</version> | |
<type>pom</type> | |
<scope>import</scope> | |
</dependency> | |
</dependencies> | |
</dependencyManagement> | |
<dependencies> | |
<dependency> | |
<groupId>io.vertx</groupId> | |
<artifactId>vertx-core</artifactId> | |
</dependency> | |
<dependency> | |
<groupId>io.vertx</groupId> | |
<artifactId>vertx-codegen</artifactId> | |
<scope>provided</scope> | |
</dependency> | |
<dependency> | |
<groupId>io.vertx</groupId> | |
<artifactId>vertx-docgen</artifactId> | |
<version>0.9.0</version> | |
<scope>provided</scope> | |
</dependency> | |
<dependency> | |
<groupId>io.vertx</groupId> | |
<artifactId>vertx-codetrans</artifactId> | |
<scope>provided</scope> | |
</dependency> | |
<dependency> | |
<groupId>io.vertx</groupId> | |
<artifactId>vertx-rx-java</artifactId> | |
<optional>true</optional> | |
</dependency> | |
<dependency> | |
<groupId>io.vertx</groupId> | |
<artifactId>vertx-rx-java-gen</artifactId> | |
<version>${stack.version}</version> | |
<optional>true</optional> | |
</dependency> | |
<dependency> | |
<groupId>io.vertx</groupId> | |
<artifactId>vertx-rx-java2</artifactId> | |
<optional>true</optional> | |
</dependency> | |
<dependency> | |
<groupId>io.vertx</groupId> | |
<artifactId>vertx-rx-java2-gen</artifactId> | |
<version>${stack.version}</version> | |
<optional>true</optional> | |
</dependency> | |
<dependency> | |
<groupId>io.vertx</groupId> | |
<artifactId>vertx-reactive-streams</artifactId> | |
<optional>true</optional> | |
</dependency> | |
<dependency> | |
<groupId>io.vertx</groupId> | |
<artifactId>vertx-lang-js</artifactId> | |
<optional>true</optional> | |
</dependency> | |
<dependency> | |
<groupId>io.vertx</groupId> | |
<artifactId>vertx-lang-ruby</artifactId> | |
<optional>true</optional> | |
</dependency> | |
<dependency> | |
<groupId>io.vertx</groupId> | |
<artifactId>vertx-lang-groovy</artifactId> | |
<optional>true</optional> | |
</dependency> | |
<dependency> | |
<groupId>io.vertx</groupId> | |
<artifactId>vertx-lang-kotlin</artifactId> | |
<optional>true</optional> | |
</dependency> | |
<dependency> | |
<groupId>io.vertx</groupId> | |
<artifactId>vertx-lang-kotlin-coroutines</artifactId> | |
<optional>true</optional> | |
</dependency> | |
<dependency> | |
<groupId>io.vertx</groupId> | |
<artifactId>vertx-lang-kotlin-gen</artifactId> | |
<version>${stack.version}</version> | |
<optional>true</optional> | |
</dependency> | |
<dependency> | |
<groupId>io.r2dbc</groupId> | |
<artifactId>r2dbc-spi</artifactId> | |
<version>${r2dbc.version}</version> | |
<optional>true</optional> | |
</dependency> | |
<!-- | |
<dependency> | |
<groupId>ch.qos.logback</groupId> | |
<artifactId>logback-classic</artifactId> | |
<version>1.2.3</version> | |
<scope>test</scope> | |
</dependency> | |
--> | |
<dependency> | |
<groupId>junit</groupId> | |
<artifactId>junit</artifactId> | |
<version>4.12</version> | |
<scope>test</scope> | |
</dependency> | |
<dependency> | |
<groupId>io.vertx</groupId> | |
<artifactId>vertx-unit</artifactId> | |
<scope>test</scope> | |
</dependency> | |
<dependency> | |
<groupId>io.netty</groupId> | |
<artifactId>netty-transport-native-epoll</artifactId> | |
<classifier>linux-x86_64</classifier> | |
<scope>test</scope> | |
</dependency> | |
<dependency> | |
<groupId>io.netty</groupId> | |
<artifactId>netty-transport-native-kqueue</artifactId> | |
<classifier>osx-x86_64</classifier> | |
<scope>test</scope> | |
</dependency> | |
<dependency> | |
<groupId>ru.yandex.qatools.embed</groupId> | |
<artifactId>postgresql-embedded</artifactId> | |
<version>2.10</version> | |
<scope>test</scope> | |
</dependency> | |
<dependency> | |
<groupId>org.postgresql</groupId> | |
<artifactId>postgresql</artifactId> | |
<version>42.1.1</version> | |
<scope>test</scope> | |
</dependency> | |
<dependency> | |
<groupId>org.openjdk.jmh</groupId> | |
<artifactId>jmh-core</artifactId> | |
<version>${jmh.version}</version> | |
<scope>test</scope> | |
</dependency> | |
<dependency> | |
<groupId>org.openjdk.jmh</groupId> | |
<artifactId>jmh-generator-annprocess</artifactId> | |
<version>${jmh.version}</version> | |
<scope>test</scope> | |
</dependency> | |
</dependencies> | |
<repositories> | |
<repository> | |
<id>spring-libs-snapshot</id> | |
<url>https://repo.spring.io/libs-snapshot</url> | |
<snapshots> | |
<enabled>true</enabled> | |
</snapshots> | |
</repository> | |
</repositories> | |
<build> | |
<pluginManagement> | |
<plugins> | |
<plugin> | |
<artifactId>maven-clean-plugin</artifactId> | |
<executions> | |
<execution> | |
<id>default-clean</id> | |
<configuration> | |
<filesets> | |
<fileset> | |
<directory>${generated.dir}</directory> | |
</fileset> | |
</filesets> | |
</configuration> | |
</execution> | |
</executions> | |
</plugin> | |
<plugin> | |
<artifactId>maven-compiler-plugin</artifactId> | |
<executions> | |
<execution> | |
<id>default-compile</id> | |
<configuration> | |
<annotationProcessors> | |
<annotationProcessor>io.vertx.codegen.CodeGenProcessor</annotationProcessor> | |
<annotationProcessor>io.vertx.docgen.JavaDocGenProcessor</annotationProcessor> | |
<annotationProcessor>io.vertx.docgen.DocGenProcessor</annotationProcessor> | |
</annotationProcessors> | |
<compilerArgs> | |
<arg>-Acodegen.output=${project.basedir}/src/main</arg> | |
<arg>-Adocgen.source=${docs.dir}/*.md</arg> | |
<arg>-Adocgen.output=${project.basedir}/jekyll/guide/$lang</arg> | |
<arg>-Adocgen.syntax=markdown</arg> | |
<arg>-Amaven.groupId=${project.groupId}</arg> | |
<arg>-Amaven.artifactId=${project.artifactId}</arg> | |
<arg>-Amaven.version=${project.version}</arg> | |
</compilerArgs> | |
</configuration> | |
</execution> | |
</executions> | |
</plugin> | |
<plugin> | |
<artifactId>maven-surefire-plugin</artifactId> | |
<configuration> | |
<argLine>-Xmx1024M</argLine> | |
<systemPropertyVariables> | |
<target.dir>${project.build.directory}</target.dir> | |
<embedded.postgres.version>${embedded.postgres.version}</embedded.postgres.version> | |
<connection.uri>${connection.uri}</connection.uri> | |
<tls.connection.uri>${tls.connection.uri}</tls.connection.uri> | |
<unix.socket.directory>${unix.socket.directory}</unix.socket.directory> | |
<unix.socket.port>${unix.socket.port}</unix.socket.port> | |
</systemPropertyVariables> | |
<excludes> | |
<exclude>io/reactiverse/pgclient/it/**</exclude> | |
</excludes> | |
</configuration> | |
</plugin> | |
</plugins> | |
</pluginManagement> | |
<plugins> | |
<plugin> | |
<groupId>org.codehaus.mojo</groupId> | |
<artifactId>build-helper-maven-plugin</artifactId> | |
<version>3.0.0</version> | |
<executions> | |
<execution> | |
<id>add-source</id> | |
<phase>generate-sources</phase> | |
<goals> | |
<goal>add-source</goal> | |
</goals> | |
<configuration> | |
<sources> | |
<source>${generated.dir}</source> | |
</sources> | |
</configuration> | |
</execution> | |
<execution> | |
<id>add-test-source</id> | |
<goals> | |
<goal>add-test-source</goal> | |
</goals> | |
<configuration> | |
<sources> | |
<source>${project.basedir}/src/benchmark/java</source> | |
</sources> | |
</configuration> | |
</execution> | |
</executions> | |
</plugin> | |
<plugin> | |
<artifactId>maven-failsafe-plugin</artifactId> | |
<version>2.21.0</version> | |
<executions> | |
<execution> | |
<id>env-test</id> | |
<goals> | |
<goal>integration-test</goal> | |
</goals> | |
<phase>integration-test</phase> | |
<configuration> | |
<includes> | |
<include>io/reactiverse/pgclient/it/EnvTest.java</include> | |
</includes> | |
<environmentVariables> | |
<PGHOSTADDR>test_host</PGHOSTADDR> | |
<PGDATABASE>test_database</PGDATABASE> | |
<PGPORT>1234</PGPORT> | |
<PGUSER>test_user</PGUSER> | |
<PGPASSWORD>test_password</PGPASSWORD> | |
<PGSSLMODE>require</PGSSLMODE> | |
</environmentVariables> | |
</configuration> | |
</execution> | |
</executions> | |
</plugin> | |
<plugin> | |
<artifactId>maven-javadoc-plugin</artifactId> | |
<configuration> | |
<sourcepath>${project.build.sourceDirectory};${project.build.directory}/generated-sources/annotations</sourcepath> | |
<sourceFileIncludes> | |
<sourceFileInclude>io/reactiverse/**/*.java</sourceFileInclude> | |
</sourceFileIncludes> | |
<sourceFileExcludes> | |
<sourceFileExclude>io/reactiverse/groovy/**/*.java</sourceFileExclude> | |
<sourceFileExclude>**/package-info.java</sourceFileExclude> | |
<sourceFileExclude>**/impl/**</sourceFileExclude> | |
</sourceFileExcludes> | |
<detectLinks /> | |
<detectJavaApiLink /> | |
<links> | |
<link>http://vertx.io/docs/apidocs/</link> | |
<link>http://reactivex.io/RxJava/1.x/javadoc/</link> | |
<link>http://reactivex.io/RxJava/javadoc/</link> | |
<link>http://fasterxml.github.com/jackson-annotations/javadoc/2.9/</link> | |
<link>http://fasterxml.github.io/jackson-core/javadoc/2.9/</link> | |
<link>http://fasterxml.github.io/jackson-databind/javadoc/2.9/</link> | |
</links> | |
</configuration> | |
</plugin> | |
<plugin> | |
<artifactId>kotlin-maven-plugin</artifactId> | |
<groupId>org.jetbrains.kotlin</groupId> | |
<version>${kotlin.version}</version> | |
<executions> | |
<execution> | |
<configuration> | |
<jvmTarget>1.8</jvmTarget> | |
<sourceDirs> | |
<sourceDir>${basedir}/src/main/kotlin</sourceDir> | |
</sourceDirs> | |
</configuration> | |
<goals> | |
<goal>compile</goal> | |
</goals> | |
<phase>compile</phase> | |
</execution> | |
</executions> | |
</plugin> | |
<plugin> | |
<artifactId>maven-assembly-plugin</artifactId> | |
<executions> | |
<execution> | |
<id>package-sources</id> | |
<phase>package</phase> | |
<goals> | |
<goal>single</goal> | |
</goals> | |
<configuration> | |
<attach>true</attach> | |
<archive> | |
<!-- Need a manifest to avoid empty archive --> | |
<manifest> | |
</manifest> | |
</archive> | |
<descriptors> | |
<descriptor>src/assembly/sources.xml</descriptor> | |
</descriptors> | |
</configuration> | |
</execution> | |
</executions> | |
</plugin> | |
</plugins> | |
</build> | |
<profiles> | |
<profile> | |
<id>benchmark</id> | |
<build> | |
<plugins> | |
<plugin> | |
<artifactId>maven-assembly-plugin</artifactId> | |
<executions> | |
<execution> | |
<id>package-benchmark</id> | |
<phase>package</phase> | |
<goals> | |
<goal>single</goal> | |
</goals> | |
<configuration> | |
<archive> | |
<manifest> | |
<mainClass>io.reactiverse.pgclient.RawBenchmark</mainClass> | |
</manifest> | |
</archive> | |
<descriptors> | |
<descriptor>src/assembly/benchmark.xml</descriptor> | |
</descriptors> | |
<attach>false</attach> | |
</configuration> | |
</execution> | |
</executions> | |
</plugin> | |
</plugins> | |
</build> | |
</profile> | |
<profile> | |
<id>jitwatch</id> | |
<dependencies> | |
<dependency> | |
<groupId>com.chrisnewland</groupId> | |
<artifactId>jitwatch</artifactId> | |
<version>1.0.0</version> | |
</dependency> | |
</dependencies> | |
<build> | |
<plugins> | |
<plugin> | |
<groupId>org.codehaus.mojo</groupId> | |
<artifactId>exec-maven-plugin</artifactId> | |
<executions> | |
<execution> | |
<id>default-cli</id> | |
<goals> | |
<goal>java</goal> | |
</goals> | |
<configuration> | |
<mainClass>org.adoptopenjdk.jitwatch.launch.LaunchUI</mainClass> | |
<cleanupDaemonThreads>false</cleanupDaemonThreads> | |
<classpathScope>compile</classpathScope> | |
</configuration> | |
</execution> | |
</executions> | |
</plugin> | |
</plugins> | |
</build> | |
</profile> | |
<!-- Site gen --> | |
<profile> | |
<id>site-gen</id> | |
<activation> | |
<property> | |
<name>!skipSite</name> | |
</property> | |
</activation> | |
<pluginRepositories> | |
<pluginRepository> | |
<id>rubygems-releases</id> | |
<url>http://rubygems-proxy.torquebox.org/releases</url> | |
</pluginRepository> | |
</pluginRepositories> | |
<repositories> | |
<repository> | |
<id>rubygems-releases</id> | |
<url>http://rubygems-proxy.torquebox.org/releases</url> | |
</repository> | |
</repositories> | |
<build> | |
<plugins> | |
<plugin> | |
<groupId>org.apache.maven.plugins</groupId> | |
<artifactId>maven-site-plugin</artifactId> | |
<version>3.4</version> | |
<configuration> | |
<skip>true</skip> | |
<skipDeploy>true</skipDeploy> | |
</configuration> | |
</plugin> | |
<plugin> | |
<artifactId>maven-javadoc-plugin</artifactId> | |
<executions> | |
<execution> | |
<phase>site</phase> | |
<configuration> | |
<skip>false</skip> | |
<reportOutputDirectory>${project.basedir}/jekyll</reportOutputDirectory> | |
</configuration> | |
<goals> | |
<goal>javadoc</goal> | |
</goals> | |
</execution> | |
</executions> | |
</plugin> | |
<plugin> | |
<groupId>de.saumya.mojo</groupId> | |
<artifactId>gem-maven-plugin</artifactId> | |
<version>1.1.4</version> | |
<executions> | |
<execution> | |
<goals> | |
<goal>exec</goal> | |
</goals> | |
<phase>site</phase> | |
<configuration> | |
<execArgs>${project.build.directory}/rubygems/bin/yardoc --no-private -m html -o ${project.basedir}/jekyll/yardoc ${project.build.outputDirectory}/**/*.rb</execArgs> | |
</configuration> | |
</execution> | |
</executions> | |
<dependencies> | |
<dependency> | |
<groupId>rubygems</groupId> | |
<artifactId>yard</artifactId> | |
<version>0.8.7.6</version> | |
<type>gem</type> | |
</dependency> | |
<dependency> | |
<groupId>rubygems</groupId> | |
<artifactId>asciidoctor</artifactId> | |
<version>1.5.6</version> | |
<type>gem</type> | |
</dependency> | |
</dependencies> | |
</plugin> | |
<plugin> | |
<groupId>com.phasebash.jsdoc</groupId> | |
<artifactId>jsdoc3-maven-plugin</artifactId> | |
<executions> | |
<execution> | |
<phase>site</phase> | |
<goals> | |
<goal>jsdoc3</goal> | |
</goals> | |
<configuration> | |
<recursive>true</recursive> | |
<directoryRoots> | |
<directoryRoot>${project.build.outputDirectory}</directoryRoot> | |
</directoryRoots> | |
<outputDirectory>${project.basedir}/jekyll/jsdoc</outputDirectory> | |
</configuration> | |
</execution> | |
</executions> | |
</plugin> | |
</plugins> | |
</build> | |
</profile> | |
</profiles> | |
</project> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright 2019 the original author or authors. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package io.reactiverse.pgclient.r2dbc; | |
import java.util.concurrent.TimeUnit; | |
import io.r2dbc.spi.Connection; | |
import io.reactiverse.pgclient.PgClient; | |
import io.reactiverse.pgclient.PgConnectOptions; | |
import io.reactiverse.pgclient.PgPool; | |
import io.reactiverse.pgclient.PgPoolOptions; | |
import io.reactiverse.pgclient.PgTestBase; | |
import io.reactivex.Flowable; | |
import io.vertx.core.Vertx; | |
import org.junit.Test; | |
import static org.junit.Assert.*; | |
public class R2dbcTest extends PgTestBase { | |
private PgPool createPool(PgConnectOptions options, int size) { | |
return PgClient.pool(Vertx.vertx(), new PgPoolOptions(options).setMaxSize(size)); | |
} | |
@Test | |
public void testPool() { | |
PgPool pool = createPool(options, 4); | |
PgConnectionFactory factory = PgConnectionFactory.create(pool); | |
factory.create().test().requestMore(1).awaitCount(1).assertValue(connection -> { | |
assertNotNull(connection); | |
Flowable.fromPublisher(connection.close()).subscribe(); | |
return true; | |
}).assertComplete(); | |
} | |
@Test | |
public void testConnectionClose() { | |
PgPool pool = createPool(options, 4); | |
PgConnectionFactory factory = PgConnectionFactory.create(pool); | |
factory.create().flatMap(Connection::close).test() | |
.awaitDone(5, TimeUnit.SECONDS).assertComplete() | |
.assertNoValues(); | |
} | |
@Test | |
public void testSimpleStatement() { | |
PgPool pool = createPool(options, 4); | |
PgConnectionFactory factory = PgConnectionFactory.create(pool); | |
factory.create().flatMap(it -> { | |
return Flowable.fromPublisher(it.createStatement("SELECT * FROM pg_type").execute()) | |
.flatMap(result -> result | |
.map((row, rowMetadata) -> row.get("typname", String.class))) | |
.toList().toFlowable().doOnComplete(() -> { | |
Flowable.fromPublisher(it.close()).subscribe(); | |
}); | |
}).test().requestMore(100).awaitCount(1).assertValue(actual -> { | |
assertFalse(actual.isEmpty()); | |
return true; | |
}).assertComplete(); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright 2019 Mark Paluch | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
package io.reactiverse.pgclient.r2dbc; | |
import java.util.function.Supplier; | |
import io.r2dbc.spi.Result; | |
import io.r2dbc.spi.Statement; | |
import io.reactiverse.pgclient.PgClient; | |
import io.reactiverse.pgclient.PgRowSet; | |
import io.reactivex.BackpressureStrategy; | |
import io.reactivex.Flowable; | |
import org.reactivestreams.Publisher; | |
class SimpleStatement implements Statement { | |
private final Supplier<PgClient> clientSupplier; | |
private final String sql; | |
SimpleStatement(Supplier<PgClient> clientSupplier, String sql) { | |
this.clientSupplier = clientSupplier; | |
this.sql = sql; | |
} | |
@Override | |
public Statement add() { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public Statement bind(Object o, Object o1) { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public Statement bind(int i, Object o) { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public Statement bindNull(Object o, Class<?> aClass) { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public Statement bindNull(int i, Class<?> aClass) { | |
throw new UnsupportedOperationException(); | |
} | |
@Override | |
public Publisher<? extends Result> execute() { | |
return Flowable.create(emitter -> { | |
PgClient pgClient = clientSupplier.get(); | |
pgClient.query(this.sql, res -> { | |
if (res.succeeded()) { | |
PgRowSet result = res.result(); | |
do { | |
emitter.onNext(new PgResult(result)); | |
} | |
while ((result = result.next()) != null); | |
emitter.onComplete(); | |
} | |
else { | |
emitter.onError(res.cause()); | |
} | |
}); | |
}, BackpressureStrategy.BUFFER); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment