Created
February 2, 2019 17:44
-
-
Save mp911de/00f63febd484c6bc5bc5d4c1b57b2f5b to your computer and use it in GitHub Desktop.
Playing with Postgres and locking
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright 2019 the original author or authors. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
import io.r2dbc.spi.IsolationLevel; | |
import org.junit.jupiter.api.BeforeAll; | |
import org.junit.jupiter.api.BeforeEach; | |
import org.junit.jupiter.params.ParameterizedTest; | |
import org.junit.jupiter.params.provider.EnumSource; | |
import org.postgresql.ds.PGSimpleDataSource; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
import java.sql.Connection; | |
import java.sql.ResultSet; | |
import java.sql.Statement; | |
import java.util.HashMap; | |
import java.util.Map; | |
import static org.assertj.core.api.Assertions.assertThat; | |
/** | |
* Playground test class for transaction interaction. | |
* | |
* @author Mark Paluch | |
*/ | |
class JdbcTransactionPlayground { | |
private final Logger logger = LoggerFactory.getLogger(this.getClass()); | |
private final Map<IsolationLevel, Integer> isolationLevels = new HashMap<IsolationLevel, Integer>() { | |
{ | |
put(IsolationLevel.READ_UNCOMMITTED, Connection.TRANSACTION_READ_UNCOMMITTED); | |
put(IsolationLevel.READ_COMMITTED, Connection.TRANSACTION_READ_COMMITTED); | |
put(IsolationLevel.REPEATABLE_READ, Connection.TRANSACTION_REPEATABLE_READ); | |
put(IsolationLevel.SERIALIZABLE, Connection.TRANSACTION_SERIALIZABLE); | |
} | |
}; | |
static PGSimpleDataSource dataSource; | |
@BeforeAll | |
static void beforeAll() { | |
dataSource = new PGSimpleDataSource(); | |
dataSource.setServerName("localhost"); | |
dataSource.setPortNumber(5432); | |
dataSource.setUser("postgres"); | |
dataSource.setPassword("postgres"); | |
} | |
@BeforeEach | |
void setup() throws Exception { | |
Connection connection = dataSource.getConnection(); | |
Statement statement = connection.createStatement(); | |
statement.execute("DROP TABLE IF EXISTS tx_example"); | |
statement.execute("CREATE TABLE tx_example (" + "id serial PRIMARY KEY, " + "first_name varchar(255), " | |
+ "last_name varchar(255))"); | |
statement.close(); | |
connection.close(); | |
} | |
@ParameterizedTest | |
@EnumSource(IsolationLevel.class) | |
void selectDuringInsert(IsolationLevel isolationLevel) throws Exception { | |
Connection insertConnection = dataSource.getConnection(); | |
Connection selectConnection = dataSource.getConnection(); | |
selectConnection.setTransactionIsolation(isolationLevels.get(isolationLevel).intValue()); | |
logger.info("READER: 🧐 Setting isolation level to " + isolationLevel); | |
selectConnection.setAutoCommit(false); | |
insertConnection.setAutoCommit(false); | |
logger.info("WRITER: ✏️ INSERTing record (Walter, White)..."); | |
Statement insertStatement = insertConnection.createStatement(); | |
insertStatement.execute("INSERT INTO tx_example (first_name, last_name) VALUES ('Walter', 'White')"); | |
logger.info("WRITER: ✏️ ✅️ INSERT record (Walter, White) done"); | |
logger.info("READER: 🧐 SELECTing on a different connection (should wait 3sec)"); | |
Statement selectStatement = selectConnection.createStatement(); | |
/* | |
* This should block. | |
*/ | |
ResultSet resultSet = selectStatement.executeQuery("SELECT * FROM tx_example FOR UPDATE"); | |
while (resultSet.next()) { | |
logger.info("READER: 🥳 first_name=" + resultSet.getString("first_name")); | |
} | |
resultSet.close(); | |
logger.info("WRITER: ✏️ INSERTing record (Jesse, Pinkman)..."); | |
insertStatement.execute("INSERT INTO tx_example (first_name, last_name) VALUES ('Jesse', 'Jesse')"); | |
logger.info("WRITER: ✏️ ✅ INSERT record (Jesse, Pinkman) done"); | |
logger.info("WRITER: ⚙️️ COMMITing..."); | |
insertConnection.commit(); | |
logger.info("WRITER: ⚙️ ✅ COMMIT done"); | |
insertConnection.close(); | |
selectConnection.close(); | |
} | |
@ParameterizedTest | |
@EnumSource(IsolationLevel.class) | |
void updateDuringInsert(IsolationLevel isolationLevel) throws Exception { | |
Connection insertConnection = dataSource.getConnection(); | |
Connection updateConnection = dataSource.getConnection(); | |
insertConnection.setTransactionIsolation(isolationLevels.get(isolationLevel).intValue()); | |
updateConnection.setTransactionIsolation(isolationLevels.get(isolationLevel).intValue()); | |
logger.info("ALL : 🧐 Setting isolation level to " + isolationLevel); | |
updateConnection.setAutoCommit(false); | |
insertConnection.setAutoCommit(false); | |
logger.info("FIRST: ✏️ INSERTing record (Walter, White)..."); | |
Statement insertStatement = insertConnection.createStatement(); | |
insertStatement.execute("INSERT INTO tx_example (first_name, last_name) VALUES ('Walter', 'White')"); | |
logger.info("FIRST: ✏️ ✅️ INSERT record (Walter, White) done"); | |
/* | |
* This should block. | |
*/ | |
logger.info("SECND: ✏️ UPDATEing record (Jesse, Pinkman)..."); | |
Statement updateStatement = updateConnection.createStatement(); | |
int updatedRows = updateStatement | |
.executeUpdate("UPDATE tx_example SET first_name = 'Jesse', last_name = 'Pinkman'"); | |
logger.info("SECND: ⚙️️ COMMITing..."); | |
updateConnection.commit(); | |
logger.info("FIRST: ⚙️️ COMMITing..."); | |
insertConnection.commit(); | |
logger.info("FIRST: ⚙️ ✅ COMMIT done"); | |
insertConnection.close(); | |
updateConnection.close(); | |
assertThat(updatedRows).isEqualTo(1); | |
} | |
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8"?> | |
<!-- Logback configuration: sends all log output of the playground tests to the console. -->
<configuration> | |
<!-- Console appender registered under the name STDOUT. -->
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> | |
<encoder> | |
<!-- Layout: time of day with milliseconds, thread name (left-aligned, min. 18 chars), -->
<!-- logger name (left-aligned, min. 55 chars), the message and a line break. -->
<pattern>%date{HH:mm:ss.SSS} %-18thread %-55logger %msg%n</pattern> | |
</encoder> | |
</appender> | |
<!-- Root logger: INFO and above, written to the STDOUT appender. -->
<root level="INFO"> | |
<appender-ref ref="STDOUT"/> | |
</root> | |
</configuration> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
<?xml version="1.0" encoding="UTF-8"?> | |
<!-- Maven build descriptor for the postgres-locking playground project. -->
<project xmlns="http://maven.apache.org/POM/4.0.0" | |
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" | |
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> | |
<modelVersion>4.0.0</modelVersion> | |
<groupId>biz.paluch</groupId> | |
<artifactId>postgres-locking</artifactId> | |
<version>1.0-SNAPSHOT</version> | |
<!-- Compile for Java 8 (source and target level). -->
<properties> | |
<maven.compiler.source>1.8</maven.compiler.source> | |
<maven.compiler.target>1.8</maven.compiler.target> | |
</properties> | |
<!-- Snapshot repository; presumably required to resolve the BUILD-SNAPSHOT R2DBC driver declared below. -->
<repositories> | |
<repository> | |
<id>spring-libs-snapshot</id> | |
<url>https://repo.spring.io/libs-snapshot</url> | |
<snapshots> | |
<enabled>true</enabled> | |
</snapshots> | |
</repository> | |
</repositories> | |
<!-- Imports the JUnit BOM so the junit-jupiter artifacts below can omit their version. -->
<dependencyManagement> | |
<dependencies> | |
<dependency> | |
<groupId>org.junit</groupId> | |
<artifactId>junit-bom</artifactId> | |
<version>5.4.0-RC1</version> | |
<type>pom</type> | |
<scope>import</scope> | |
</dependency> | |
</dependencies> | |
</dependencyManagement> | |
<dependencies> | |
<!-- R2DBC PostgreSQL driver (snapshot build). -->
<dependency> | |
<groupId>io.r2dbc</groupId> | |
<artifactId>r2dbc-postgresql</artifactId> | |
<version>1.0.0.BUILD-SNAPSHOT</version> | |
</dependency> | |
<!-- PostgreSQL JDBC driver. -->
<dependency> | |
<groupId>org.postgresql</groupId> | |
<artifactId>postgresql</artifactId> | |
<version>42.2.5.jre7</version> | |
</dependency> | |
<!-- JUnit Jupiter API, engine and parameterized-test support; versions come from the imported BOM. -->
<dependency> | |
<groupId>org.junit.jupiter</groupId> | |
<artifactId>junit-jupiter-api</artifactId> | |
<scope>test</scope> | |
</dependency> | |
<dependency> | |
<groupId>org.junit.jupiter</groupId> | |
<artifactId>junit-jupiter-engine</artifactId> | |
<scope>test</scope> | |
</dependency> | |
<dependency> | |
<groupId>org.junit.jupiter</groupId> | |
<artifactId>junit-jupiter-params</artifactId> | |
<scope>test</scope> | |
</dependency> | |
<!-- Reactor test support. -->
<dependency> | |
<groupId>io.projectreactor</groupId> | |
<artifactId>reactor-test</artifactId> | |
<version>3.2.5.RELEASE</version> | |
<scope>test</scope> | |
</dependency> | |
<!-- AssertJ assertions. -->
<dependency> | |
<groupId>org.assertj</groupId> | |
<artifactId>assertj-core</artifactId> | |
<version>3.11.1</version> | |
<scope>test</scope> | |
</dependency> | |
<!-- Logback as the logging backend during tests. -->
<dependency> | |
<groupId>ch.qos.logback</groupId> | |
<artifactId>logback-classic</artifactId> | |
<version>1.2.3</version> | |
<scope>test</scope> | |
</dependency> | |
</dependencies> | |
<build> | |
<plugins> | |
<!-- Surefire: test classes run in random order and only files matching *Tests.java are included. -->
<!-- NOTE(review): a class whose name does not end in "Tests" is not matched by this include -->
<!-- and would presumably have to be run from an IDE instead - confirm this is intended. -->
<plugin> | |
<groupId>org.apache.maven.plugins</groupId> | |
<artifactId>maven-surefire-plugin</artifactId> | |
<version>2.22.1</version> | |
<configuration> | |
<runOrder>random</runOrder> | |
<includes> | |
<include>**/*Tests.java</include> | |
</includes> | |
</configuration> | |
</plugin> | |
</plugins> | |
</build> | |
</project> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* | |
* Copyright 2019 the original author or authors. | |
* | |
* Licensed under the Apache License, Version 2.0 (the "License"); | |
* you may not use this file except in compliance with the License. | |
* You may obtain a copy of the License at | |
* | |
* http://www.apache.org/licenses/LICENSE-2.0 | |
* | |
* Unless required by applicable law or agreed to in writing, software | |
* distributed under the License is distributed on an "AS IS" BASIS, | |
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | |
* See the License for the specific language governing permissions and | |
* limitations under the License. | |
*/ | |
import static io.r2dbc.spi.ConnectionFactoryOptions.*; | |
import static org.assertj.core.api.Assertions.*; | |
import io.r2dbc.postgresql.PostgresqlConnection; | |
import io.r2dbc.postgresql.PostgresqlConnectionFactory; | |
import io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider; | |
import io.r2dbc.postgresql.PostgresqlResult; | |
import io.r2dbc.spi.ConnectionFactories; | |
import io.r2dbc.spi.ConnectionFactoryOptions; | |
import io.r2dbc.spi.IsolationLevel; | |
import reactor.core.Disposable; | |
import reactor.core.publisher.ConnectableFlux; | |
import reactor.core.publisher.Mono; | |
import reactor.core.publisher.MonoProcessor; | |
import reactor.test.StepVerifier; | |
import java.util.concurrent.CountDownLatch; | |
import java.util.concurrent.TimeUnit; | |
import org.junit.jupiter.api.BeforeAll; | |
import org.junit.jupiter.api.BeforeEach; | |
import org.junit.jupiter.params.ParameterizedTest; | |
import org.junit.jupiter.params.provider.EnumSource; | |
import org.slf4j.Logger; | |
import org.slf4j.LoggerFactory; | |
/** | |
* Playground test class for transaction interaction. | |
* | |
* @author Mark Paluch | |
*/ | |
public class R2dbcTransactionPlayground { | |
private final Logger logger = LoggerFactory.getLogger(this.getClass()); | |
protected static PostgresqlConnectionFactory connectionFactory; | |
@BeforeAll | |
static void beforeAll() { | |
ConnectionFactoryOptions options = builder() | |
.option(DRIVER, PostgresqlConnectionFactoryProvider.POSTGRESQL_DRIVER) | |
.option(HOST, "localhost") | |
.option(PORT, 5432) | |
.option(USER, "postgres") | |
.option(PASSWORD, "postgres") | |
.build(); | |
connectionFactory = (PostgresqlConnectionFactory) ConnectionFactories.get(options); | |
} | |
@BeforeEach | |
void setup() { | |
PostgresqlConnection connection = connectionFactory.create().block(); | |
connection.createStatement("DROP TABLE tx_example").execute() | |
.flatMap(PostgresqlResult::getRowsUpdated) | |
.onErrorResume(e -> Mono.empty()) | |
.thenMany(connection.createStatement("CREATE TABLE tx_example (" + | |
"id serial PRIMARY KEY, " + | |
"first_name varchar(255), " + | |
"last_name varchar(255))") | |
.execute().flatMap(PostgresqlResult::getRowsUpdated).then()) | |
.as(StepVerifier::create) | |
.verifyComplete(); | |
connection.close().as(StepVerifier::create).verifyComplete(); | |
} | |
@ParameterizedTest | |
@EnumSource(IsolationLevel.class) | |
void selectDuringInsert(IsolationLevel isolationLevel) throws Exception { | |
PostgresqlConnection insertConnection = connectionFactory.create().block(); | |
PostgresqlConnection selectConnection = connectionFactory.create().block(); | |
selectConnection.setTransactionIsolationLevel(isolationLevel).as(StepVerifier::create).verifyComplete(); | |
logger.info("READER: 🧐 Setting isolation level to " + isolationLevel); | |
selectConnection.beginTransaction().as(StepVerifier::create).verifyComplete(); | |
insertConnection.beginTransaction().as(StepVerifier::create).verifyComplete(); | |
selectConnection.createStatement("SELECT * FROM tx_example FOR UPDATE").execute().flatMap(PostgresqlResult::getRowsUpdated).blockLast(); | |
logger.info("WRITER: ✏️ INSERTing record (Walter, White)..."); | |
insertConnection.createStatement("INSERT INTO tx_example (first_name, last_name) VALUES ('Walter', 'White')") // | |
.returnGeneratedValues() // | |
.execute().flatMap(PostgresqlResult::getRowsUpdated) // | |
.as(StepVerifier::create) // | |
.expectNext(1) // | |
.verifyComplete(); | |
logger.info("WRITER: ✏️ ✅️ INSERT record (Walter, White) done"); | |
CountDownLatch data = new CountDownLatch(2); | |
CountDownLatch completion = new CountDownLatch(1); | |
logger.info("READER: 🧐 SELECTing on a different connection (should wait 3sec)"); | |
/* | |
* This should block. | |
*/ | |
ConnectableFlux<String> connectable = selectConnection.createStatement("SELECT * FROM tx_example FOR UPDATE") // | |
.execute().flatMap(it -> it.map((row, rowMetadata) -> row.get("first_name", String.class))) // | |
.doOnNext(it -> logger.info("READER: 🥳 first_name=" + it)) | |
.doOnComplete(() -> { | |
logger.info("READER: 🤩 SELECT complete"); | |
}) | |
.doOnNext(it -> data.countDown()) | |
.doOnComplete(completion::countDown) | |
.publish(); | |
Disposable disposable = connectable.connect(); | |
assertThat(data.await(3, TimeUnit.SECONDS)).isFalse(); | |
logger.info("READER: 🧐 ⏳ No results yet"); | |
logger.info("WRITER: ✏️ INSERTing record (Jesse, Pinkman)..."); | |
insertConnection.createStatement("INSERT INTO tx_example (first_name, last_name) VALUES ('Jesse', 'Jesse')") // | |
.returnGeneratedValues() // | |
.execute().flatMap(PostgresqlResult::getRowsUpdated) // | |
.as(StepVerifier::create) // | |
.expectNext(1) // | |
.verifyComplete(); | |
logger.info("WRITER: ✏️ ✅ INSERT record (Jesse, Pinkman) done"); | |
logger.info("READER: 🧐 ⏳ Still waiting (3sec)"); | |
assertThat(data.await(3, TimeUnit.SECONDS)).isFalse(); | |
logger.info("WRITER: ⚙️️ COMMITing..."); | |
insertConnection.commitTransaction().as(StepVerifier::create).verifyComplete(); | |
logger.info("WRITER: ⚙️ ✅ COMMIT done"); | |
disposable.dispose(); | |
insertConnection.close().subscribe(); | |
selectConnection.close().subscribe(); | |
logger.info("READER: 🧐 ⏳ Awaiting data"); | |
assertThat(data.await(3, TimeUnit.SECONDS)).isTrue(); | |
logger.info("READER: 🧐 ⏳ Awaiting completion"); | |
assertThat(completion.await(3, TimeUnit.SECONDS)).isTrue(); | |
} | |
@ParameterizedTest | |
@EnumSource(IsolationLevel.class) | |
void updateDuringInsert(IsolationLevel isolationLevel) throws Exception { | |
PostgresqlConnection insert1 = connectionFactory.create().block(); | |
PostgresqlConnection insert2 = connectionFactory.create().block(); | |
try { | |
insert1.setTransactionIsolationLevel(isolationLevel).as(StepVerifier::create).verifyComplete(); | |
insert2.setTransactionIsolationLevel(isolationLevel).as(StepVerifier::create).verifyComplete(); | |
logger.info("ALL : 🧐 Setting isolation level to " + isolationLevel); | |
insert1.beginTransaction().as(StepVerifier::create).verifyComplete(); | |
insert2.beginTransaction().as(StepVerifier::create).verifyComplete(); | |
logger.info("FIRST: ✏️ INSERTing record (Walter, White)..."); | |
insert1.createStatement("INSERT INTO tx_example (first_name, last_name) VALUES ('Walter', 'White')") // | |
.returnGeneratedValues() // | |
.execute().flatMap(PostgresqlResult::getRowsUpdated) // | |
.as(StepVerifier::create) // | |
.expectNextCount(1) // | |
.verifyComplete(); | |
logger.info("FIRST: ✏️ ✅️ INSERT record (Walter, White) done"); | |
CountDownLatch completion = new CountDownLatch(1); | |
logger.info("SECND: ✏️ UPDATEing record (Jesse, Pinkman)..."); | |
/* | |
* This should block. | |
*/ | |
insert2.createStatement("UPDATE tx_example SET first_name = 'Jesse', last_name = 'Pinkman'") // | |
.returnGeneratedValues() // | |
.execute().flatMap(PostgresqlResult::getRowsUpdated) | |
.as(StepVerifier::create) | |
.expectNextCount(1) | |
.verifyComplete(); | |
logger.info("SECND: ✏️✅️ UPDATEing record (Jesse, Pinkman) done"); | |
logger.info("SECND: ⚙️️ COMMITing..."); | |
MonoProcessor<Void> processor = insert2.commitTransaction() | |
.doOnTerminate(completion::countDown) | |
.doOnSuccess(v -> { | |
logger.info("SECND: ✏️ 🤩 COMMIT 2 complete"); | |
}).toProcessor(); | |
logger.info("SECND: 🧐 ⏳ Waiting for INSERT to complete (3sec)"); | |
completion.await(3, TimeUnit.SECONDS); | |
logger.info("FIRST: ⚙️️ COMMITing..."); | |
insert1.commitTransaction().as(StepVerifier::create).verifyComplete(); | |
logger.info("FIRST: ⚙️ ✅ COMMIT done"); | |
logger.info("SECND: 🧐 ⏳ Awaiting completion"); | |
assertThat(completion.await(3, TimeUnit.SECONDS)).isTrue(); | |
processor.dispose(); | |
} finally { | |
insert1.close().subscribe(); | |
insert2.close().subscribe(); | |
} | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment