Skip to content

Instantly share code, notes, and snippets.

@mp911de
Created February 2, 2019 17:44
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mp911de/00f63febd484c6bc5bc5d4c1b57b2f5b to your computer and use it in GitHub Desktop.
Save mp911de/00f63febd484c6bc5bc5d4c1b57b2f5b to your computer and use it in GitHub Desktop.
Playing with Postgres and locking
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import io.r2dbc.spi.IsolationLevel;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.postgresql.ds.PGSimpleDataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.HashMap;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Playground test class for transaction interaction.
*
* @author Mark Paluch
*/
class JdbcTransactionPlayground {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
private final Map<IsolationLevel, Integer> isolationLevels = new HashMap<IsolationLevel, Integer>() {
{
put(IsolationLevel.READ_UNCOMMITTED, Connection.TRANSACTION_READ_UNCOMMITTED);
put(IsolationLevel.READ_COMMITTED, Connection.TRANSACTION_READ_COMMITTED);
put(IsolationLevel.REPEATABLE_READ, Connection.TRANSACTION_REPEATABLE_READ);
put(IsolationLevel.SERIALIZABLE, Connection.TRANSACTION_SERIALIZABLE);
}
};
static PGSimpleDataSource dataSource;
@BeforeAll
static void beforeAll() {
dataSource = new PGSimpleDataSource();
dataSource.setServerName("localhost");
dataSource.setPortNumber(5432);
dataSource.setUser("postgres");
dataSource.setPassword("postgres");
}
@BeforeEach
void setup() throws Exception {
Connection connection = dataSource.getConnection();
Statement statement = connection.createStatement();
statement.execute("DROP TABLE IF EXISTS tx_example");
statement.execute("CREATE TABLE tx_example (" + "id serial PRIMARY KEY, " + "first_name varchar(255), "
+ "last_name varchar(255))");
statement.close();
connection.close();
}
@ParameterizedTest
@EnumSource(IsolationLevel.class)
void selectDuringInsert(IsolationLevel isolationLevel) throws Exception {
Connection insertConnection = dataSource.getConnection();
Connection selectConnection = dataSource.getConnection();
selectConnection.setTransactionIsolation(isolationLevels.get(isolationLevel).intValue());
logger.info("READER: 🧐 Setting isolation level to " + isolationLevel);
selectConnection.setAutoCommit(false);
insertConnection.setAutoCommit(false);
logger.info("WRITER: ✏️ INSERTing record (Walter, White)...");
Statement insertStatement = insertConnection.createStatement();
insertStatement.execute("INSERT INTO tx_example (first_name, last_name) VALUES ('Walter', 'White')");
logger.info("WRITER: ✏️ ✅️ INSERT record (Walter, White) done");
logger.info("READER: 🧐 SELECTing on a different connection (should wait 3sec)");
Statement selectStatement = selectConnection.createStatement();
/*
* This should block.
*/
ResultSet resultSet = selectStatement.executeQuery("SELECT * FROM tx_example FOR UPDATE");
while (resultSet.next()) {
logger.info("READER: 🥳 first_name=" + resultSet.getString("first_name"));
}
resultSet.close();
logger.info("WRITER: ✏️ INSERTing record (Jesse, Pinkman)...");
insertStatement.execute("INSERT INTO tx_example (first_name, last_name) VALUES ('Jesse', 'Jesse')");
logger.info("WRITER: ✏️ ✅ INSERT record (Jesse, Pinkman) done");
logger.info("WRITER: ⚙️️ COMMITing...");
insertConnection.commit();
logger.info("WRITER: ⚙️ ✅ COMMIT done");
insertConnection.close();
selectConnection.close();
}
@ParameterizedTest
@EnumSource(IsolationLevel.class)
void updateDuringInsert(IsolationLevel isolationLevel) throws Exception {
Connection insertConnection = dataSource.getConnection();
Connection updateConnection = dataSource.getConnection();
insertConnection.setTransactionIsolation(isolationLevels.get(isolationLevel).intValue());
updateConnection.setTransactionIsolation(isolationLevels.get(isolationLevel).intValue());
logger.info("ALL : 🧐 Setting isolation level to " + isolationLevel);
updateConnection.setAutoCommit(false);
insertConnection.setAutoCommit(false);
logger.info("FIRST: ✏️ INSERTing record (Walter, White)...");
Statement insertStatement = insertConnection.createStatement();
insertStatement.execute("INSERT INTO tx_example (first_name, last_name) VALUES ('Walter', 'White')");
logger.info("FIRST: ✏️ ✅️ INSERT record (Walter, White) done");
/*
* This should block.
*/
logger.info("SECND: ✏️ UPDATEing record (Jesse, Pinkman)...");
Statement updateStatement = updateConnection.createStatement();
int updatedRows = updateStatement
.executeUpdate("UPDATE tx_example SET first_name = 'Jesse', last_name = 'Pinkman'");
logger.info("SECND: ⚙️️ COMMITing...");
updateConnection.commit();
logger.info("FIRST: ⚙️️ COMMITing...");
insertConnection.commit();
logger.info("FIRST: ⚙️ ✅ COMMIT done");
insertConnection.close();
updateConnection.close();
assertThat(updatedRows).isEqualTo(1);
}
}
<?xml version="1.0" encoding="UTF-8"?>
<configuration>
<appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%date{HH:mm:ss.SSS} %-18thread %-55logger %msg%n</pattern>
</encoder>
</appender>
<root level="INFO">
<appender-ref ref="STDOUT"/>
</root>
</configuration>
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>biz.paluch</groupId>
<artifactId>postgres-locking</artifactId>
<version>1.0-SNAPSHOT</version>
<properties>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>
<repositories>
<repository>
<id>spring-libs-snapshot</id>
<url>https://repo.spring.io/libs-snapshot</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
</repositories>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.junit</groupId>
<artifactId>junit-bom</artifactId>
<version>5.4.0-RC1</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>
<dependencies>
<dependency>
<groupId>io.r2dbc</groupId>
<artifactId>r2dbc-postgresql</artifactId>
<version>1.0.0.BUILD-SNAPSHOT</version>
</dependency>
<dependency>
<groupId>org.postgresql</groupId>
<artifactId>postgresql</artifactId>
<version>42.2.5.jre7</version>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-engine</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.projectreactor</groupId>
<artifactId>reactor-test</artifactId>
<version>3.2.5.RELEASE</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.assertj</groupId>
<artifactId>assertj-core</artifactId>
<version>3.11.1</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>ch.qos.logback</groupId>
<artifactId>logback-classic</artifactId>
<version>1.2.3</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-surefire-plugin</artifactId>
<version>2.22.1</version>
<configuration>
<runOrder>random</runOrder>
<includes>
<include>**/*Tests.java</include>
</includes>
</configuration>
</plugin>
</plugins>
</build>
</project>
/*
* Copyright 2019 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import static io.r2dbc.spi.ConnectionFactoryOptions.*;
import static org.assertj.core.api.Assertions.*;
import io.r2dbc.postgresql.PostgresqlConnection;
import io.r2dbc.postgresql.PostgresqlConnectionFactory;
import io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider;
import io.r2dbc.postgresql.PostgresqlResult;
import io.r2dbc.spi.ConnectionFactories;
import io.r2dbc.spi.ConnectionFactoryOptions;
import io.r2dbc.spi.IsolationLevel;
import reactor.core.Disposable;
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Mono;
import reactor.core.publisher.MonoProcessor;
import reactor.test.StepVerifier;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Playground test class for transaction interaction.
*
* @author Mark Paluch
*/
public class R2dbcTransactionPlayground {
private final Logger logger = LoggerFactory.getLogger(this.getClass());
protected static PostgresqlConnectionFactory connectionFactory;
@BeforeAll
static void beforeAll() {
ConnectionFactoryOptions options = builder()
.option(DRIVER, PostgresqlConnectionFactoryProvider.POSTGRESQL_DRIVER)
.option(HOST, "localhost")
.option(PORT, 5432)
.option(USER, "postgres")
.option(PASSWORD, "postgres")
.build();
connectionFactory = (PostgresqlConnectionFactory) ConnectionFactories.get(options);
}
@BeforeEach
void setup() {
PostgresqlConnection connection = connectionFactory.create().block();
connection.createStatement("DROP TABLE tx_example").execute()
.flatMap(PostgresqlResult::getRowsUpdated)
.onErrorResume(e -> Mono.empty())
.thenMany(connection.createStatement("CREATE TABLE tx_example (" +
"id serial PRIMARY KEY, " +
"first_name varchar(255), " +
"last_name varchar(255))")
.execute().flatMap(PostgresqlResult::getRowsUpdated).then())
.as(StepVerifier::create)
.verifyComplete();
connection.close().as(StepVerifier::create).verifyComplete();
}
@ParameterizedTest
@EnumSource(IsolationLevel.class)
void selectDuringInsert(IsolationLevel isolationLevel) throws Exception {
PostgresqlConnection insertConnection = connectionFactory.create().block();
PostgresqlConnection selectConnection = connectionFactory.create().block();
selectConnection.setTransactionIsolationLevel(isolationLevel).as(StepVerifier::create).verifyComplete();
logger.info("READER: 🧐 Setting isolation level to " + isolationLevel);
selectConnection.beginTransaction().as(StepVerifier::create).verifyComplete();
insertConnection.beginTransaction().as(StepVerifier::create).verifyComplete();
selectConnection.createStatement("SELECT * FROM tx_example FOR UPDATE").execute().flatMap(PostgresqlResult::getRowsUpdated).blockLast();
logger.info("WRITER: ✏️ INSERTing record (Walter, White)...");
insertConnection.createStatement("INSERT INTO tx_example (first_name, last_name) VALUES ('Walter', 'White')") //
.returnGeneratedValues() //
.execute().flatMap(PostgresqlResult::getRowsUpdated) //
.as(StepVerifier::create) //
.expectNext(1) //
.verifyComplete();
logger.info("WRITER: ✏️ ✅️ INSERT record (Walter, White) done");
CountDownLatch data = new CountDownLatch(2);
CountDownLatch completion = new CountDownLatch(1);
logger.info("READER: 🧐 SELECTing on a different connection (should wait 3sec)");
/*
* This should block.
*/
ConnectableFlux<String> connectable = selectConnection.createStatement("SELECT * FROM tx_example FOR UPDATE") //
.execute().flatMap(it -> it.map((row, rowMetadata) -> row.get("first_name", String.class))) //
.doOnNext(it -> logger.info("READER: 🥳 first_name=" + it))
.doOnComplete(() -> {
logger.info("READER: 🤩 SELECT complete");
})
.doOnNext(it -> data.countDown())
.doOnComplete(completion::countDown)
.publish();
Disposable disposable = connectable.connect();
assertThat(data.await(3, TimeUnit.SECONDS)).isFalse();
logger.info("READER: 🧐 ⏳ No results yet");
logger.info("WRITER: ✏️ INSERTing record (Jesse, Pinkman)...");
insertConnection.createStatement("INSERT INTO tx_example (first_name, last_name) VALUES ('Jesse', 'Jesse')") //
.returnGeneratedValues() //
.execute().flatMap(PostgresqlResult::getRowsUpdated) //
.as(StepVerifier::create) //
.expectNext(1) //
.verifyComplete();
logger.info("WRITER: ✏️ ✅ INSERT record (Jesse, Pinkman) done");
logger.info("READER: 🧐 ⏳ Still waiting (3sec)");
assertThat(data.await(3, TimeUnit.SECONDS)).isFalse();
logger.info("WRITER: ⚙️️ COMMITing...");
insertConnection.commitTransaction().as(StepVerifier::create).verifyComplete();
logger.info("WRITER: ⚙️ ✅ COMMIT done");
disposable.dispose();
insertConnection.close().subscribe();
selectConnection.close().subscribe();
logger.info("READER: 🧐 ⏳ Awaiting data");
assertThat(data.await(3, TimeUnit.SECONDS)).isTrue();
logger.info("READER: 🧐 ⏳ Awaiting completion");
assertThat(completion.await(3, TimeUnit.SECONDS)).isTrue();
}
@ParameterizedTest
@EnumSource(IsolationLevel.class)
void updateDuringInsert(IsolationLevel isolationLevel) throws Exception {
PostgresqlConnection insert1 = connectionFactory.create().block();
PostgresqlConnection insert2 = connectionFactory.create().block();
try {
insert1.setTransactionIsolationLevel(isolationLevel).as(StepVerifier::create).verifyComplete();
insert2.setTransactionIsolationLevel(isolationLevel).as(StepVerifier::create).verifyComplete();
logger.info("ALL : 🧐 Setting isolation level to " + isolationLevel);
insert1.beginTransaction().as(StepVerifier::create).verifyComplete();
insert2.beginTransaction().as(StepVerifier::create).verifyComplete();
logger.info("FIRST: ✏️ INSERTing record (Walter, White)...");
insert1.createStatement("INSERT INTO tx_example (first_name, last_name) VALUES ('Walter', 'White')") //
.returnGeneratedValues() //
.execute().flatMap(PostgresqlResult::getRowsUpdated) //
.as(StepVerifier::create) //
.expectNextCount(1) //
.verifyComplete();
logger.info("FIRST: ✏️ ✅️ INSERT record (Walter, White) done");
CountDownLatch completion = new CountDownLatch(1);
logger.info("SECND: ✏️ UPDATEing record (Jesse, Pinkman)...");
/*
* This should block.
*/
insert2.createStatement("UPDATE tx_example SET first_name = 'Jesse', last_name = 'Pinkman'") //
.returnGeneratedValues() //
.execute().flatMap(PostgresqlResult::getRowsUpdated)
.as(StepVerifier::create)
.expectNextCount(1)
.verifyComplete();
logger.info("SECND: ✏️✅️ UPDATEing record (Jesse, Pinkman) done");
logger.info("SECND: ⚙️️ COMMITing...");
MonoProcessor<Void> processor = insert2.commitTransaction()
.doOnTerminate(completion::countDown)
.doOnSuccess(v -> {
logger.info("SECND: ✏️ 🤩 COMMIT 2 complete");
}).toProcessor();
logger.info("SECND: 🧐 ⏳ Waiting for INSERT to complete (3sec)");
completion.await(3, TimeUnit.SECONDS);
logger.info("FIRST: ⚙️️ COMMITing...");
insert1.commitTransaction().as(StepVerifier::create).verifyComplete();
logger.info("FIRST: ⚙️ ✅ COMMIT done");
logger.info("SECND: 🧐 ⏳ Awaiting completion");
assertThat(completion.await(3, TimeUnit.SECONDS)).isTrue();
processor.dispose();
} finally {
insert1.close().subscribe();
insert2.close().subscribe();
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment