Skip to content

Instantly share code, notes, and snippets.

@alexandru
Last active December 4, 2023 05:35
Show Gist options
  • Star 13 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save alexandru/60e977fcf75ed88c4d0b824b312dba44 to your computer and use it in GitHub Desktop.
Save alexandru/60e977fcf75ed88c4d0b824b312dba44 to your computer and use it in GitHub Desktop.

Using Java's JDBC API, create a function that executes a query. The one concurrency-related requirement is that it has to be cancellable (timeouts, race conditions, etc.) without resource leaks.

Proposed signature:

// Scala
def query[A](sql: String)(block: PreparedStatement => A)(using Connection): IO[A]
// Kotlin
suspend fun <A> Connection.query(sql: String, block: (PreparedStatement) -> A): A =

Warning

What makes this challenging is that JDBC queries don't listen to Java's interruption protocol (no InterruptedException in their signature). What we can do is to trigger Statement.cancel(), but this has to be done concurrently, from another thread/fiber.


Which of the below implementations do you like best?

I'm interested to know your thoughts. Or comparisons to other languages.

Connect with me for more shitposting.

Scala

The Scala version uses Cats-Effect with its cancelation model.

// SCALA

import cats.effect.IO
import java.sql.Connection
import java.sql.PreparedStatement
import cats.effect.kernel.Outcome
import java.util.concurrent.CancellationException

def query[A](sql: String)(block: PreparedStatement => A)(using Connection): IO[A] =
  // Impl. detail: Scala's Cats-Effect is interruptible, preemptively, not cooperatively
  // (within the JVM's limits). For resource handling, we need uncancelable blocks:
  IO.uncancelable { markCancelable =>
    for {
      // (1) Acquire the resource (SQL statement)
      statement <- IO.blocking(summon[Connection].prepareStatement(sql))
      // (2) Start a concurrent task, because we need it for the interruption procedure
      fiber <- IO.interruptible(block(statement)).start
      // (3) Waits for the result of the computation
      result <- markCancelable {
        // Impl. detail: cancelling this will not back-pressure any interruption
        fiber.joinWithNever
      }.guaranteeCase {
        case Outcome.Canceled() =>
          // (4) Catches and handles cancellation signal
          for {
            // (5) Cancel the SQL statement, to forcefully interrupt any query
            _ <- IO.blocking(stm.cancel()).voidError
            // (6) Cancel the fiber, back-pressuring on its interruption
            _ <- fiber.cancel
          } yield ()
        case _ =>
          // (7) Normal termination procedure, closing connection
          IO.blocking(stm.close()).voidError
      }
    } yield result
  }

Since my initial publication, I realized that we have an utility in IO#cancelable that implements the pattern above to the letter. Still, this isn't fair, as you can abstract away such patterns in Kotlin too. But here's how shorter the code can get:

def query[A](sql: String)(block: PreparedStatement => A)(using Connection): IO[A] =
  // (1) Acquire the resource (SQL statement)
  IO.blocking(summon[Connection].prepareStatement(sql))
    // Impl. detail: this is like a try-finally
    .bracket { stm =>
      // (2) Start the task
      IO.interruptible(block(stm))
        // Installs an extra finalizer to run concurrently, on cancellation
        .cancelable {
          // (5) Cancel the SQL statement, to forcefully interrupt any query
          IO.blocking(stm.cancel())
        }
    } { stm =>
      // (7) Normal termination procedure, closing connection
      IO.blocking(stm.close())
    }

Kotlin

The Kotlin version uses the cancelation protocol baked in its coroutines.

// KOTLIN

import kotlinx.coroutines.*
import java.sql.Connection
import java.sql.PreparedStatement
import java.sql.SQLException

suspend fun <A> Connection.query(sql: String, block: (PreparedStatement) -> A): A =
  withContext(Dispatchers.IO) {
    // (1) Acquire the resource (SQL statement)
    prepareStatement(sql).use { statement =>
      // (2) Start a concurrent task, because we need it for the interruption procedure
      val task = async {
        // Impl. detail: `runInterruptible` is like `IO.interruptible`
        runInterruptible {
          try {
            block(statement)
          } catch (e: SQLException) {
            // Impl. detail: if we don't filter this, it will be logged downstream
            if (e.sqlState == "57014") throw CancellationException("Query cancelled", e)
            throw e
          }
        }
      }
      // (3) Waits for the result of the computation
      try {
        // Impl. detail: cancelling this works, and won't back-pressure any interruption
        task.await()
      } catch (e: CancellationException) {
        // (4) Catches and handles cancellation signal
        // (5) Cancel the SQL statement, to forcefully interrupt any query
        try { statement.cancel() } catch (e2: Throwable) { e.addSuppressed(e2) }
        // (6) Cancel the fiber, back-pressuring on its interruption
        task.cancelAndJoin()
        // Impl. detail: continuing interruption; 
        // this isn't needed in Scala's Cats-Effect
        throw e
      }
    } // (7) Normal termination procedure, closing connection
  }

Java 21 Virtual Threads (Project Loom)

Apparently, when running on top of Virtual Threads, JDBC queries can be interrupted directly. Say, what?

Pretty awesome, except that if we depend on Virtual Threads for correctness, we need to ensure that virtual threads are getting used. The good news is that, virtual threads being cheaper, this is less of a problem. This implementation is a little bare-bones, might be improved with some utilities (structured concurrency maybe?).

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.function.Function;

public <A> A query(
    Connection conn,
    String sql,
    Function<PreparedStatement, A> block
  ) throws SQLException, InterruptedException {

  try (PreparedStatement stmt = conn.prepareStatement(sql)) {
    final var runnable = new StatementRunnable<>(stmt, block);
    final var th = Thread.ofVirtual().start(runnable);
    try {
      th.join();
      if (runnable.error instanceof SQLException e) {
        throw e;
      } else if (runnable.error instanceof InterruptedException e) {
        throw e;
      } else if (runnable.error != null) {
        throw new RuntimeException(runnable.error);
      }
      return runnable.result;
    } catch (InterruptedException e) {
        // Interrupts the thread and waits for it to finish.
        do
          try {
            th.interrupt();
            th.join();
          } catch (InterruptedException e2) {}
        while (th.isAlive());
        throw e;
    }
  }
}

class StatementRunnable<A> implements Runnable {
  private final PreparedStatement stmt;
  private final Function<PreparedStatement, A> block;
  public A result = null;
  public Exception error = null;

  public StatementRunnable(
      PreparedStatement stmt,
      Function<PreparedStatement, A> block) {
    this.stmt = stmt;
    this.block = block;
  }

  @Override
  public void run() {
    try {
      result = block.apply(stmt);
    } catch (Exception e) {
      error = e;
    }
  }
}

Other

@adamw
Copy link

adamw commented Oct 7, 2023

I think one difference in the Java example from the other implementations is that when join is interrupted, the code doesn't wait for th to complete (it just propagates the interrupt using th.interrupt(). (Which is exactly the point of using structured concurrency :) )

@alexandru
Copy link
Author

@adamw yikes, I forgot about it 🤣 Of course, having those “structured concurrency” extensions would be sweet.

I think that doing something like this would be fine, as the interruption signal for the main thread is reset after join throws. What do you think?

do
  try {
    th.interrupt();
    th.join();
  } catch (InterruptedException e2) {}
while (th.isAlive());

@adamw
Copy link

adamw commented Oct 7, 2023

@alexandru yes, I think this should do the trick :)

@debasishg
Copy link

Will this be an equivalent ZIO implementation ?

import java.sql.PreparedStatement
import java.sql.Connection
import zio.{ Task, ZIO }

def query[A](sql: String)(block: PreparedStatement => A)(implicit conn: Connection): Task[A] =
  ZIO
    // (1) prepare the resource (statement) in a blocking way
    .attemptBlocking(conn.prepareStatement(sql))
    // (2) acquire/release cycle with auto-close
    .acquireReleaseWithAuto { stm =>
      // (3) use the resource to execute the query in a blocking way
      // (4) pass the cancel handler to handle interruption
      ZIO.attemptBlockingCancelable(block(stm))(ZIO.succeed(stm.cancel()))
    }

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment