Skip to content

Instantly share code, notes, and snippets.

@seratch
Last active December 25, 2018 12:01
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save seratch/823dfc0f718a9b0deaa88b0020a455a6 to your computer and use it in GitHub Desktop.
Save seratch/823dfc0f718a9b0deaa88b0020a455a6 to your computer and use it in GitHub Desktop.
r2dbc sample in Scala
// Version shared by every io.r2dbc artifact below, kept in one place so they stay in lockstep.
val r2dbcVersion = "1.0.0.M6"

// Root (and only) project of the sample build: Scala 2.12 with the R2DBC SPI/client on the
// compile classpath and the H2/PostgreSQL drivers plus ScalaTest available to tests only.
lazy val root = project
  .in(file("."))
  .settings(
    scalaVersion := "2.12.8",
    scalacOptions ++= Seq("-deprecation", "-unchecked", "-feature", "-Xfuture"),
    libraryDependencies ++= Seq(
      // Scala-side helpers for bridging Java 8 futures and Reactive Streams publishers.
      "org.scala-lang.modules" %% "scala-java8-compat" % "0.9.0" % Compile,
      "io.monix"               %% "monix-reactive"     % "2.3.3" % Compile,
      // R2DBC API and its fluent client.
      "io.r2dbc" % "r2dbc-spi"    % r2dbcVersion % Compile,
      "io.r2dbc" % "r2dbc-client" % r2dbcVersion % Compile,
      // Drivers and test framework, needed by the specs only.
      "io.r2dbc"      % "r2dbc-h2"         % r2dbcVersion % Test,
      "io.r2dbc"      % "r2dbc-postgresql" % r2dbcVersion % Test,
      "org.scalatest" %% "scalatest"       % "3.0.5"      % Test
    ),
    // Presumably required to resolve the 1.0.0.M6 milestone artifacts - confirm if upgrading.
    resolvers += "spring-milestone" at "https://repo.spring.io/milestone",
    // NOTE(review): this key is presumably supplied by the sbt-scalafmt plugin declared
    // outside this file - verify project/plugins.sbt.
    scalafmtOnCompile := true
  )
import org.scalatest._
/**
 * End-to-end sample showing how to drive R2DBC (via `r2dbc-client`) against an in-memory H2
 * database from Scala, and how to consume the resulting Reactive Streams publisher with
 * monix-reactive.
 *
 * The spec creates a table, inserts three rows (one with a null name) and reads them back in
 * descending id order.
 */
class H2Spec extends FlatSpec with Matchers {

  /** Row of the `sample` table; `name` is `None` when the column is SQL NULL. */
  case class Sample(id: Long, name: Option[String])

  "H2 connections" should "work" in {
    import io.r2dbc.h2.{ H2ConnectionConfiguration, H2ConnectionFactory }
    import io.r2dbc.client.R2dbc
    import reactor.core.publisher.Flux

    val config = H2ConnectionConfiguration.builder().url("mem:sample").build()
    val r2dbc  = new R2dbc(new H2ConnectionFactory(config))

    // The DDL must be finished before anything else runs, so block on it right away.
    val tableCreation: Flux[Integer] = r2dbc.inTransaction { handle =>
      handle.execute("create table sample (id bigint primary key, name varchar(100))")
    }
    tableCreation.blockFirst()

    // Nothing below executes until `result` is subscribed to: the inserts run first and the
    // select is chained after them with `thenMany`.
    val result: Flux[Sample] = {
      val insertions: Flux[Integer] = r2dbc.inTransaction { handle =>
        // Placeholders have to be positional ($1, $2, ...). A named one such as `$id` is
        // rejected with:
        //   Identifier '$id' is not a valid identifier. Should be of the pattern '.*\$([\d]+).*'
        val updates = handle.createUpdate("insert into sample (id, name) values ($1, $2)")
        updates.bind("$1", 1).bind("$2", "Alice").add()
        updates.bind("$1", 2).bind("$2", "Bob").add()
        updates.bind("$1", 3).bindNull("$2", classOf[String]).add()
        updates.execute()
        /*
        Alternative using a batch of literal statements:

        val batch = handle.createBatch()
        batch.add("insert into sample (id, name) values (1, 'Alice')")
        batch.add("insert into sample (id, name) values (2, 'Bob')")
        batch.add("insert into sample (id, name) values (3, null)")
        batch.mapResult(_.getRowsUpdated)
         */
      }
      val fetchingAll: Flux[Sample] = r2dbc.inTransaction { handle =>
        handle.select("select id, name from sample order by id desc").mapRow { row =>
          Sample(
            id = Long.unbox(row.get("id", classOf[java.lang.Long])),
            // Option(...) turns a SQL NULL (Java null) into None.
            name = Option(row.get("name", classOf[String]))
          )
        }
      }
      insertions.thenMany(fetchingAll)
    }

    // simple example to run with monix-reactive
    import monix.reactive.Observable
    import monix.execution.Scheduler.Implicits.global
    import scala.concurrent.Await
    import scala.concurrent.duration.{ Duration, SECONDS }

    val observable: Observable[Sample] =
      Observable.fromReactivePublisher(result)

    // `runSyncMaybe` yields Right when the task finished on the calling thread and Left with a
    // still-running future when it hit an asynchronous boundary. The latter is not a failure,
    // so wait for it (bounded) instead of failing the test outright.
    val samples: List[Sample] = observable.toListL.runSyncMaybe match {
      case Right(completed) => completed
      case Left(pending)    => Await.result(pending, Duration(10, SECONDS))
    }
    samples.size should equal(3)
    samples.map(_.id) should equal(Seq(3L, 2L, 1L))

    // simple example to convert the Publisher to a Future object
    /*
    import java.util.concurrent.CompletableFuture
    val jFuture: CompletableFuture[java.util.List[Sample]] = result.collectList().toFuture
    import scala.collection.JavaConverters._
    import scala.compat.java8.FutureConverters._
    import scala.concurrent.duration.Duration
    import scala.concurrent.{Await, Future}
    import scala.concurrent.ExecutionContext.Implicits.global
    val f: Future[Seq[Sample]] = jFuture.toScala.map(_.asScala)
    val allRows: Seq[Sample] = Await.result(f, Duration.Inf)
    allRows.size should equal(3)
    allRows.map(_.id) should equal(Seq(3, 2, 1))
     */
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment