Created
January 29, 2019 19:01
-
-
Save mp911de/e7702df256793a496016fddd990cbc14 to your computer and use it in GitHub Desktop.
Transaction playground for PostgreSQL to explore how locks interact with R2DBC.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/*
 * Copyright 2019 the original author or authors.
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package io.r2dbc.postgresql; | |
import io.r2dbc.postgresql.util.PostgresqlServerExtension; | |
import io.r2dbc.spi.ConnectionFactories; | |
import io.r2dbc.spi.ConnectionFactoryOptions; | |
import io.r2dbc.spi.IsolationLevel; | |
import org.junit.jupiter.api.BeforeAll; | |
import org.junit.jupiter.api.BeforeEach; | |
import org.junit.jupiter.api.extension.RegisterExtension; | |
import org.junit.jupiter.params.ParameterizedTest; | |
import org.junit.jupiter.params.provider.EnumSource; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import org.springframework.util.StopWatch; | |
import reactor.core.Disposable; | |
import reactor.core.publisher.ConnectableFlux; | |
import reactor.core.publisher.Mono; | |
import reactor.core.publisher.MonoProcessor; | |
import reactor.test.StepVerifier; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.TimeUnit; | |
import static io.r2dbc.spi.ConnectionFactoryOptions.DRIVER; | |
import static io.r2dbc.spi.ConnectionFactoryOptions.HOST; | |
import static io.r2dbc.spi.ConnectionFactoryOptions.PASSWORD; | |
import static io.r2dbc.spi.ConnectionFactoryOptions.PORT; | |
import static io.r2dbc.spi.ConnectionFactoryOptions.USER; | |
import static io.r2dbc.spi.ConnectionFactoryOptions.builder; | |
import static org.assertj.core.api.Assertions.assertThat; | |
/** | |
* Playground test class for transaction interaction. | |
* | |
* @author Mark Paluch | |
*/ | |
public class TransactionPlayground { | |
private final Logger logger = LoggerFactory.getLogger(this.getClass()); | |
@RegisterExtension | |
protected static final PostgresqlServerExtension SERVER = new PostgresqlServerExtension(); | |
protected static PostgresqlConnectionFactory connectionFactory; | |
@BeforeAll | |
static void beforeAll() { | |
ConnectionFactoryOptions options = builder() | |
.option(DRIVER, PostgresqlConnectionFactoryProvider.POSTGRESQL_DRIVER) | |
.option(HOST, SERVER.getHost()) | |
.option(PORT, SERVER.getPort()) | |
.option(PASSWORD, SERVER.getPassword()) | |
.option(USER, SERVER.getUsername()) | |
.build(); | |
connectionFactory = (PostgresqlConnectionFactory) ConnectionFactories.get(options); | |
} | |
@BeforeEach | |
void setup() { | |
PostgresqlConnection connection = connectionFactory.create().block(); | |
connection.createStatement("DROP TABLE tx_example").execute() | |
.flatMap(PostgresqlResult::getRowsUpdated) | |
.onErrorResume(e -> Mono.empty()) | |
.thenMany(connection.createStatement("CREATE TABLE tx_example (" + | |
"id serial PRIMARY KEY, " + | |
"first_name varchar(255), " + | |
"last_name varchar(255))") | |
.execute().flatMap(PostgresqlResult::getRowsUpdated).then()) | |
.as(StepVerifier::create) | |
.verifyComplete(); | |
connection.close().as(StepVerifier::create).verifyComplete(); | |
} | |
@ParameterizedTest | |
@EnumSource(IsolationLevel.class) | |
void selectDuringInsert(IsolationLevel isolationLevel) throws Exception { | |
PostgresqlConnection insertConnection = connectionFactory.create().block(); | |
PostgresqlConnection selectConnection = connectionFactory.create().block(); | |
selectConnection.setTransactionIsolationLevel(isolationLevel).as(StepVerifier::create).verifyComplete(); | |
logger.info("READER: 🧐 Setting isolation level to " + isolationLevel); | |
selectConnection.beginTransaction().as(StepVerifier::create).verifyComplete(); | |
insertConnection.beginTransaction().as(StepVerifier::create).verifyComplete(); | |
selectConnection.createStatement("SELECT * FROM tx_example FOR UPDATE").execute().flatMap(PostgresqlResult::getRowsUpdated).blockLast(); | |
logger.info("WRITER: ✏️ INSERTing record (Walter, White)..."); | |
insertConnection.createStatement("INSERT INTO tx_example (first_name, last_name) VALUES ('Walter', 'White')") // | |
.returnGeneratedValues() // | |
.execute().flatMap(PostgresqlResult::getRowsUpdated) // | |
.as(StepVerifier::create) // | |
.expectNext(1) // | |
.verifyComplete(); | |
logger.info("WRITER: ✏️ ✅️ INSERT record (Walter, White) done"); | |
CountDownLatch data = new CountDownLatch(2); | |
CountDownLatch completion = new CountDownLatch(1); | |
StopWatch stopWatch = new StopWatch(); | |
stopWatch.start(); | |
ConnectableFlux<String> connectable = selectConnection.createStatement("SELECT * FROM tx_example FOR UPDATE") // | |
.execute().flatMap(it -> it.map((row, rowMetadata) -> row.get("first_name", String.class))) // | |
.doOnNext(it -> logger.info("READER: 🥳 first_name=" + it)) | |
.doOnComplete(() -> { | |
stopWatch.stop(); | |
logger.info("READER: 🤩 SELECT complete, non-blocking wait time=" + stopWatch.getLastTaskTimeMillis() + "ms"); | |
}) | |
.doOnNext(it -> data.countDown()) | |
.doOnComplete(completion::countDown) | |
.publish(); | |
Disposable disposable = connectable.connect(); | |
logger.info("READER: 🧐 SELECTing on a different connection (should wait 3sec)"); | |
assertThat(data.await(3, TimeUnit.SECONDS)).isFalse(); | |
logger.info("READER: 🧐 ⏳ No results yet"); | |
logger.info("WRITER: ✏️ INSERTing record (Jesse, Pinkman)..."); | |
insertConnection.createStatement("INSERT INTO tx_example (first_name, last_name) VALUES ('Jesse', 'Jesse')") // | |
.returnGeneratedValues() // | |
.execute().flatMap(PostgresqlResult::getRowsUpdated) // | |
.as(StepVerifier::create) // | |
.expectNext(1) // | |
.verifyComplete(); | |
logger.info("WRITER: ✏️ ✅ INSERT record (Jesse, Pinkman) done"); | |
logger.info("READER: 🧐 ⏳ Still waiting (3sec)"); | |
assertThat(data.await(3, TimeUnit.SECONDS)).isFalse(); | |
logger.info("WRITER: ⚙️️ COMMITing..."); | |
insertConnection.commitTransaction().as(StepVerifier::create).verifyComplete(); | |
logger.info("WRITER: ⚙️ ✅ COMMIT done"); | |
disposable.dispose(); | |
insertConnection.close().subscribe(); | |
selectConnection.close().subscribe(); | |
logger.info("READER: 🧐 ⏳ Awaiting data"); | |
assertThat(data.await(3, TimeUnit.SECONDS)).isTrue(); | |
logger.info("READER: 🧐 ⏳ Awaiting completion"); | |
assertThat(completion.await(3, TimeUnit.SECONDS)).isTrue(); | |
} | |
@ParameterizedTest | |
@EnumSource(IsolationLevel.class) | |
void updateDuringInsert(IsolationLevel isolationLevel) throws Exception { | |
PostgresqlConnection insert1 = connectionFactory.create().block(); | |
PostgresqlConnection insert2 = connectionFactory.create().block(); | |
try { | |
insert1.setTransactionIsolationLevel(isolationLevel).as(StepVerifier::create).verifyComplete(); | |
insert2.setTransactionIsolationLevel(isolationLevel).as(StepVerifier::create).verifyComplete(); | |
logger.info("ALL : 🧐 Setting isolation level to " + isolationLevel); | |
insert1.beginTransaction().as(StepVerifier::create).verifyComplete(); | |
insert2.beginTransaction().as(StepVerifier::create).verifyComplete(); | |
logger.info("FIRST: ✏️ INSERTing record (Walter, White)..."); | |
insert1.createStatement("INSERT INTO tx_example (first_name, last_name) VALUES ('Walter', 'White')") // | |
.returnGeneratedValues() // | |
.execute().flatMap(PostgresqlResult::getRowsUpdated) // | |
.as(StepVerifier::create) // | |
.expectNextCount(1) // | |
.verifyComplete(); | |
logger.info("FIRST: ✏️ ✅️ INSERT record (Walter, White) done"); | |
CountDownLatch completion = new CountDownLatch(1); | |
StopWatch stopWatch = new StopWatch(); | |
logger.info("SECND: ✏️ UPDATEing record (Jesse, Pinkman)..."); | |
insert2.createStatement("UPDATE tx_example SET first_name = 'Jesse', last_name = 'Pinkman'") // | |
.returnGeneratedValues() // | |
.execute().flatMap(PostgresqlResult::getRowsUpdated) | |
.as(StepVerifier::create) | |
.expectNextCount(1) | |
.verifyComplete(); | |
stopWatch.start(); | |
logger.info("SECND: ⚙️️ COMMITing..."); | |
MonoProcessor<Void> processor = insert2.commitTransaction() | |
.doOnTerminate(completion::countDown) | |
.doOnSuccess(v -> { | |
stopWatch.stop(); | |
logger.info("SECND: ✏️ 🤩 COMMIT 2 complete, non-blocking wait time=" + stopWatch.getLastTaskTimeMillis() + "ms"); | |
}).toProcessor(); | |
logger.info("SECND: 🧐 ⏳ Waiting for INSERT to complete (3sec)"); | |
completion.await(3, TimeUnit.SECONDS); | |
logger.info("FIRST: ⚙️️ COMMITing..."); | |
insert1.commitTransaction().as(StepVerifier::create).verifyComplete(); | |
logger.info("FIRST: ⚙️ ✅ COMMIT done"); | |
logger.info("SECND: 🧐 ⏳ Awaiting completion"); | |
assertThat(completion.await(3, TimeUnit.SECONDS)).isTrue(); | |
processor.dispose(); | |
} finally { | |
insert1.close().subscribe(); | |
insert2.close().subscribe(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment