Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Transaction Playground for PostgreSQL to play how locks play with R2DBC.
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package io.r2dbc.postgresql;
import io.r2dbc.postgresql.util.PostgresqlServerExtension;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactoryOptions;
import io.r2dbc.spi.IsolationLevel;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.extension.RegisterExtension;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.util.StopWatch;
import reactor.core.Disposable;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.test.StepVerifier;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static io.r2dbc.spi.ConnectionFactoryOptions.DRIVER;
import static io.r2dbc.spi.ConnectionFactoryOptions.HOST;
import static io.r2dbc.spi.ConnectionFactoryOptions.PASSWORD;
import static io.r2dbc.spi.ConnectionFactoryOptions.PORT;
import static io.r2dbc.spi.ConnectionFactoryOptions.USER;
import static io.r2dbc.spi.ConnectionFactoryOptions.builder;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Playground test class for transaction interaction.
*
* @author Mark Paluch
*/
public class TransactionPlayground {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
@RegisterExtension
protected static final PostgresqlServerExtension SERVER = new PostgresqlServerExtension();
protected static PostgresqlConnectionFactory connectionFactory;
@BeforeAll
static void beforeAll() {
ConnectionFactoryOptions options = builder()
.option(DRIVER, PostgresqlConnectionFactoryProvider.POSTGRESQL_DRIVER)
.option(HOST, SERVER.getHost())
.option(PORT, SERVER.getPort())
.option(PASSWORD, SERVER.getPassword())
.option(USER, SERVER.getUsername())
.build();
connectionFactory = (PostgresqlConnectionFactory) ConnectionFactories.get(options);
}
@BeforeEach
void setup() {
PostgresqlConnection connection = connectionFactory.create().block();
connection.createStatement("DROP TABLE tx_example").execute()
.flatMap(PostgresqlResult::getRowsUpdated)
.onErrorResume(e -> Mono.empty())
.thenMany(connection.createStatement("CREATE TABLE tx_example (" +
"id serial PRIMARY KEY, " +
"first_name varchar(255), " +
"last_name varchar(255))")
.execute().flatMap(PostgresqlResult::getRowsUpdated).then())
.as(StepVerifier::create)
.verifyComplete();
connection.close().as(StepVerifier::create).verifyComplete();
}
@ParameterizedTest
@EnumSource(IsolationLevel.class)
void selectDuringInsert(IsolationLevel isolationLevel) throws Exception {
PostgresqlConnection insertConnection = connectionFactory.create().block();
PostgresqlConnection selectConnection = connectionFactory.create().block();
selectConnection.setTransactionIsolationLevel(isolationLevel).as(StepVerifier::create).verifyComplete();
logger.info("READER: 🧐 Setting isolation level to " + isolationLevel);
selectConnection.beginTransaction().as(StepVerifier::create).verifyComplete();
insertConnection.beginTransaction().as(StepVerifier::create).verifyComplete();
selectConnection.createStatement("SELECT * FROM tx_example FOR UPDATE").execute().flatMap(PostgresqlResult::getRowsUpdated).blockLast();
logger.info("WRITER: ✏️ INSERTing record (Walter, White)...");
insertConnection.createStatement("INSERT INTO tx_example (first_name, last_name) VALUES ('Walter', 'White')") //
.returnGeneratedValues() //
.execute().flatMap(PostgresqlResult::getRowsUpdated) //
.as(StepVerifier::create) //
.expectNext(1) //
.verifyComplete();
logger.info("WRITER: ✏️ ✅️ INSERT record (Walter, White) done");
CountDownLatch data = new CountDownLatch(2);
CountDownLatch completion = new CountDownLatch(1);
StopWatch stopWatch = new StopWatch();
stopWatch.start();
ConnectableFlux<String> connectable = selectConnection.createStatement("SELECT * FROM tx_example FOR UPDATE") //
.execute().flatMap(it -> it.map((row, rowMetadata) -> row.get("first_name", String.class))) //
.doOnNext(it -> logger.info("READER: 🥳 first_name=" + it))
.doOnComplete(() -> {
stopWatch.stop();
logger.info("READER: 🤩 SELECT complete, non-blocking wait time=" + stopWatch.getLastTaskTimeMillis() + "ms");
})
.doOnNext(it -> data.countDown())
.doOnComplete(completion::countDown)
.publish();
Disposable disposable = connectable.connect();
logger.info("READER: 🧐 SELECTing on a different connection (should wait 3sec)");
assertThat(data.await(3, TimeUnit.SECONDS)).isFalse();
logger.info("READER: 🧐 ⏳ No results yet");
logger.info("WRITER: ✏️ INSERTing record (Jesse, Pinkman)...");
insertConnection.createStatement("INSERT INTO tx_example (first_name, last_name) VALUES ('Jesse', 'Jesse')") //
.returnGeneratedValues() //
.execute().flatMap(PostgresqlResult::getRowsUpdated) //
.as(StepVerifier::create) //
.expectNext(1) //
.verifyComplete();
logger.info("WRITER: ✏️ ✅ INSERT record (Jesse, Pinkman) done");
logger.info("READER: 🧐 ⏳ Still waiting (3sec)");
assertThat(data.await(3, TimeUnit.SECONDS)).isFalse();
logger.info("WRITER: ⚙️️ COMMITing...");
insertConnection.commitTransaction().as(StepVerifier::create).verifyComplete();
logger.info("WRITER: ⚙️ ✅ COMMIT done");
disposable.dispose();
insertConnection.close().subscribe();
selectConnection.close().subscribe();
logger.info("READER: 🧐 ⏳ Awaiting data");
assertThat(data.await(3, TimeUnit.SECONDS)).isTrue();
logger.info("READER: 🧐 ⏳ Awaiting completion");
assertThat(completion.await(3, TimeUnit.SECONDS)).isTrue();
}
@ParameterizedTest
@EnumSource(IsolationLevel.class)
void updateDuringInsert(IsolationLevel isolationLevel) throws Exception {
PostgresqlConnection insert1 = connectionFactory.create().block();
PostgresqlConnection insert2 = connectionFactory.create().block();
try {
insert1.setTransactionIsolationLevel(isolationLevel).as(StepVerifier::create).verifyComplete();
insert2.setTransactionIsolationLevel(isolationLevel).as(StepVerifier::create).verifyComplete();
logger.info("ALL : 🧐 Setting isolation level to " + isolationLevel);
insert1.beginTransaction().as(StepVerifier::create).verifyComplete();
insert2.beginTransaction().as(StepVerifier::create).verifyComplete();
logger.info("FIRST: ✏️ INSERTing record (Walter, White)...");
insert1.createStatement("INSERT INTO tx_example (first_name, last_name) VALUES ('Walter', 'White')") //
.returnGeneratedValues() //
.execute().flatMap(PostgresqlResult::getRowsUpdated) //
.as(StepVerifier::create) //
.expectNextCount(1) //
.verifyComplete();
logger.info("FIRST: ✏️ ✅️ INSERT record (Walter, White) done");
CountDownLatch completion = new CountDownLatch(1);
StopWatch stopWatch = new StopWatch();
logger.info("SECND: ✏️ UPDATEing record (Jesse, Pinkman)...");
insert2.createStatement("UPDATE tx_example SET first_name = 'Jesse', last_name = 'Pinkman'") //
.returnGeneratedValues() //
.execute().flatMap(PostgresqlResult::getRowsUpdated)
.as(StepVerifier::create)
.expectNextCount(1)
.verifyComplete();
stopWatch.start();
logger.info("SECND: ⚙️️ COMMITing...");
MonoProcessor<Void> processor = insert2.commitTransaction()
.doOnTerminate(completion::countDown)
.doOnSuccess(v -> {
stopWatch.stop();
logger.info("SECND: ✏️ 🤩 COMMIT 2 complete, non-blocking wait time=" + stopWatch.getLastTaskTimeMillis() + "ms");
}).toProcessor();
logger.info("SECND: 🧐 ⏳ Waiting for INSERT to complete (3sec)");
completion.await(3, TimeUnit.SECONDS);
logger.info("FIRST: ⚙️️ COMMITing...");
insert1.commitTransaction().as(StepVerifier::create).verifyComplete();
logger.info("FIRST: ⚙️ ✅ COMMIT done");
logger.info("SECND: 🧐 ⏳ Awaiting completion");
assertThat(completion.await(3, TimeUnit.SECONDS)).isTrue();
processor.dispose();
} finally {
insert1.close().subscribe();
insert2.close().subscribe();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment