Using Java's JDBC API, create a function that executes a query. The one concurrency-related requirement is that it has to be cancellable (timeouts, race conditions, etc.) without resource leaks.
Proposed signature:
// Scala
def query[A](sql: String)(block: PreparedStatement => A)(using Connection): IO[A]
// Kotlin
suspend fun <A> Connection.query(sql: String, block: (PreparedStatement) -> A): A =
Warning
What makes this challenging is that JDBC queries don't listen to Java's interruption protocol (no InterruptedException
in their signature). What we can do is to trigger Statement.cancel()
, but this has to be done concurrently, from another thread/fiber.
Which of the below implementations do you like best?
I'm interested to know your thoughts. Or comparisons to other languages.
Connect with me for more shitposting.
The Scala version uses Cats-Effect with its cancelation model.
// SCALA
import cats.effect.IO
import java.sql.Connection
import java.sql.PreparedStatement
import cats.effect.kernel.Outcome
import java.util.concurrent.CancellationException
def query[A](sql: String)(block: PreparedStatement => A)(using Connection): IO[A] =
// Impl. detail: Scala's Cats-Effect is interruptible, preemptively, not cooperatively
// (within the JVM's limits). For resource handling, we need uncancelable blocks:
IO.uncancelable { markCancelable =>
for {
// (1) Acquire the resource (SQL statement)
statement <- IO.blocking(summon[Connection].prepareStatement(sql))
// (2) Start a concurrent task, because we need it for the interruption procedure
fiber <- IO.interruptible(block(statement)).start
// (3) Waits for the result of the computation
result <- markCancelable {
// Impl. detail: cancelling this will not back-pressure any interruption
fiber.joinWithNever
}.guaranteeCase {
case Outcome.Canceled() =>
// (4) Catches and handles cancellation signal
for {
// (5) Cancel the SQL statement, to forcefully interrupt any query
_ <- IO.blocking(stm.cancel()).voidError
// (6) Cancel the fiber, back-pressuring on its interruption
_ <- fiber.cancel
} yield ()
case _ =>
// (7) Normal termination procedure, closing connection
IO.blocking(stm.close()).voidError
}
} yield result
}
Since my initial publication, I realized that we have an utility in IO#cancelable
that implements the pattern above to the letter. Still, this isn't fair, as you can abstract away such patterns in Kotlin too. But here's how shorter the code can get:
def query[A](sql: String)(block: PreparedStatement => A)(using Connection): IO[A] =
// (1) Acquire the resource (SQL statement)
IO.blocking(summon[Connection].prepareStatement(sql))
// Impl. detail: this is like a try-finally
.bracket { stm =>
// (2) Start the task
IO.interruptible(block(stm))
// Installs an extra finalizer to run concurrently, on cancellation
.cancelable {
// (5) Cancel the SQL statement, to forcefully interrupt any query
IO.blocking(stm.cancel())
}
} { stm =>
// (7) Normal termination procedure, closing connection
IO.blocking(stm.close())
}
The Kotlin version uses the cancelation protocol baked in its coroutines.
// KOTLIN
import kotlinx.coroutines.*
import java.sql.Connection
import java.sql.PreparedStatement
import java.sql.SQLException
suspend fun <A> Connection.query(sql: String, block: (PreparedStatement) -> A): A =
withContext(Dispatchers.IO) {
// (1) Acquire the resource (SQL statement)
prepareStatement(sql).use { statement =>
// (2) Start a concurrent task, because we need it for the interruption procedure
val task = async {
// Impl. detail: `runInterruptible` is like `IO.interruptible`
runInterruptible {
try {
block(statement)
} catch (e: SQLException) {
// Impl. detail: if we don't filter this, it will be logged downstream
if (e.sqlState == "57014") throw CancellationException("Query cancelled", e)
throw e
}
}
}
// (3) Waits for the result of the computation
try {
// Impl. detail: cancelling this works, and won't back-pressure any interruption
task.await()
} catch (e: CancellationException) {
// (4) Catches and handles cancellation signal
// (5) Cancel the SQL statement, to forcefully interrupt any query
try { statement.cancel() } catch (e2: Throwable) { e.addSuppressed(e2) }
// (6) Cancel the fiber, back-pressuring on its interruption
task.cancelAndJoin()
// Impl. detail: continuing interruption;
// this isn't needed in Scala's Cats-Effect
throw e
}
} // (7) Normal termination procedure, closing connection
}
Apparently, when running on top of Virtual Threads, JDBC queries can be interrupted directly. Say, what?
Pretty awesome, except that if we depend on Virtual Threads for correctness, we need to ensure that virtual threads are getting used. The good news is that, virtual threads being cheaper, this is less of a problem. This implementation is a little bare-bones, might be improved with some utilities (structured concurrency maybe?).
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.function.Function;
public <A> A query(
Connection conn,
String sql,
Function<PreparedStatement, A> block
) throws SQLException, InterruptedException {
try (PreparedStatement stmt = conn.prepareStatement(sql)) {
final var runnable = new StatementRunnable<>(stmt, block);
final var th = Thread.ofVirtual().start(runnable);
try {
th.join();
if (runnable.error instanceof SQLException e) {
throw e;
} else if (runnable.error instanceof InterruptedException e) {
throw e;
} else if (runnable.error != null) {
throw new RuntimeException(runnable.error);
}
return runnable.result;
} catch (InterruptedException e) {
// Interrupts the thread and waits for it to finish.
do
try {
th.interrupt();
th.join();
} catch (InterruptedException e2) {}
while (th.isAlive());
throw e;
}
}
}
class StatementRunnable<A> implements Runnable {
private final PreparedStatement stmt;
private final Function<PreparedStatement, A> block;
public A result = null;
public Exception error = null;
public StatementRunnable(
PreparedStatement stmt,
Function<PreparedStatement, A> block) {
this.stmt = stmt;
this.block = block;
}
@Override
public void run() {
try {
result = block.apply(stmt);
} catch (Exception e) {
error = e;
}
}
}
- For Scala, also see this sample by @lbialy, making use of softwaremill/ox, a light integration with Virtual Threads;
I think one difference in the Java example from the other implementations is that when
join
is interrupted, the code doesn't wait forth
to complete (it just propagates the interrupt usingth.interrupt()
. (Which is exactly the point of using structured concurrency :) )