@lbialy
Created October 6, 2023 19:56
Scala Ox handling JDBC interruption

A structured concurrency response to https://twitter.com/alexelcu/status/1710305127729438994

I am using Ox and JDK 21 in this gist.

First run: docker run --name test-postgres -p 5432:5432 -e POSTGRES_PASSWORD=postgres -d postgres

Then: scala-cli run ox-jdbc.scala

Alternatively just run (if you're brave enough to run gists from intarwebz): scala-cli run https://gist.github.com/lbialy/320b28dba6575cef3af8173e390abe54

//> using scala "3.3.1"
//> using lib "com.softwaremill.ox::core:0.0.14"
//> using lib "org.postgresql:postgresql:42.6.0"
//> using lib "com.zaxxer:hikaricp:5.0.1"
//> using jvm "21"
import ox.*
import java.sql.*
import com.zaxxer.hikari.*
import java.util.concurrent.CountDownLatch
// everything you need, really
def query[A](sql: String)(block: PreparedStatement => A)(using Connection): A =
  useSupervised(summon[Connection].prepareStatement(sql))(block)
// For reference only: what explicit statement cancellation would look like if it were required,
// similar to the Kotlin coroutines version (which probably needs it because it uses coroutines).
def queryCancelStatementButNotNeeded[A](sql: String)(block: PreparedStatement => A)(using Connection): A =
  supervised {
    val stmt = useInScope(summon[Connection].prepareStatement(sql))(_.close())
    val task = forkCancellable { block(stmt) }
    try task.join()
    catch
      case ie: InterruptedException =>
        println("Query interrupted!")
        try stmt.cancel()
        catch case e2: Throwable => ie.addSuppressed(e2)
        task.cancel() match
          case Left(ex) =>
            ex.printStackTrace()
            ie.addSuppressed(ex) // we get org.postgresql.util.PSQLException here
          case Right(_) => // we're no longer interested in return value
        throw ie
  }
// run `docker run --name test-postgres -p 5432:5432 -e POSTGRES_PASSWORD=postgres -d postgres` first
// then `scala-cli run ox-jdbc.scala`
@main def main(): Unit =
  val config = HikariConfig()
  config.setJdbcUrl("jdbc:postgresql://localhost:5432/postgres")
  config.setUsername("postgres")
  config.setPassword("postgres")
  supervised {
    val pool = useInScope(HikariDataSource(config))(_.close())
    given Connection = useInScope(pool.getConnection())(_.close())
    val l1 = CountDownLatch(1)
    val f1 = forkCancellable {
      query("SELECT pg_sleep(5)") { stmt =>
        println("Starting query...")
        l1.countDown()
        stmt.executeQuery()
        println("Done executing query!")
      }
    }
    println("Awaiting on query start...")
    l1.await()
    println("Trying to cancel...")
    f1.cancelNow() // do not wait for cancellation to complete
    try
      f1.join()
      println("Finished successfully!")
    catch
      case ie: InterruptedException =>
        println(s"Finished interrupted, got ${ie.getSuppressed().length} suppressed exceptions.")
  }
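
The short query helper never calls stmt.cancel(), yet the fork can still be interrupted. A plausible explanation, which is an assumption here and not something the gist states, is that Ox forks run on JDK 21 virtual threads, where a blocking java.net.Socket read is interruptible: the interrupt closes the socket and the read fails with an I/O exception. The JDK-only sketch below illustrates just that mechanism with a local socket; interruptBlockedRead is a hypothetical name, and no Ox or JDBC is involved.

```scala
//> using scala "3.3.1"
//> using jvm "21"
import java.net.{ServerSocket, Socket}
import java.util.concurrent.CountDownLatch
import java.util.concurrent.atomic.AtomicReference

// Hypothetical helper: blocks a virtual thread on a socket read, interrupts it,
// and returns whatever exception ended the read (if any).
def interruptBlockedRead(): Option[Throwable] =
  val server = ServerSocket(0) // listens on a free port and never writes anything
  try
    val started = CountDownLatch(1)
    val failure = AtomicReference[Throwable]()
    val reader = Thread.ofVirtual().start { () =>
      val socket = Socket("localhost", server.getLocalPort)
      try
        started.countDown()
        socket.getInputStream.read() // blocks: no data will ever arrive
      catch case t: Throwable => failure.set(t)
      finally socket.close()
    }
    started.await()    // wait until the reader is connected
    reader.interrupt() // on a virtual thread this unblocks the read
    reader.join()
    Option(failure.get())
  finally server.close()

@main def demo(): Unit =
  println(s"Blocked read ended with: ${interruptBlockedRead()}")
```

If the same thing happens inside the Postgres driver, the interrupted query surfaces as a driver exception on the fork, which would match the PSQLException noted in the extended version above.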

adamw commented Oct 7, 2023

One minor improvement: once you call stmt.cancel() in the extended version, it should be enough to let the scope close, since any running forks will be interrupted (cancelled) anyway. If that doesn't work, something is very wrong with exception handling, so please report an issue :)
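
A minimal sketch of what that suggestion might look like: the extended version with the explicit task.cancel() removed, relying on the scope to interrupt the fork when it closes. The name queryCancellingStatement is hypothetical; the Ox calls are the same ones the gist already uses (Ox 0.0.14), and this variant has not been run against Postgres here.

```scala
//> using scala "3.3.1"
//> using lib "com.softwaremill.ox::core:0.0.14"
//> using jvm "21"
import ox.*
import java.sql.*

// Hypothetical variant of queryCancelStatementButNotNeeded without task.cancel().
def queryCancellingStatement[A](sql: String)(block: PreparedStatement => A)(using Connection): A =
  supervised {
    // the statement is closed when the scope ends
    val stmt = useInScope(summon[Connection].prepareStatement(sql))(_.close())
    val task = forkCancellable { block(stmt) }
    try task.join()
    catch
      case ie: InterruptedException =>
        // ask the database to stop the running statement
        try stmt.cancel()
        catch case e: Throwable => ie.addSuppressed(e)
        // rethrowing ends the scope, which interrupts the fork if it is still running
        throw ie
  }
```

One trade-off of this shape: any exception the fork throws after the cancel (such as the PSQLException seen in the extended version) is no longer attached to the InterruptedException as a suppressed exception.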
