Skip to content

Instantly share code, notes, and snippets.

@kubukoz
Last active January 14, 2019 16:58
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save kubukoz/55c72ae91f74eb2b29961730616f1718 to your computer and use it in GitHub Desktop.
Save kubukoz/55c72ae91f74eb2b29961730616f1718 to your computer and use it in GitHub Desktop.
SlickRunIO.scala
/** Extension syntax that exposes a Slick `Database` through cats-effect `IO` and fs2 streams.
  *
  * Intended to be mixed into a type that also provides `API` (see the self-type).
  */
trait SlickRunIOImplicits { self: API =>

  implicit class SlickRunIO(db: Database) {

    /** Return an effect that runs a DB action.
      *
      * The call to `db.run` is suspended, so nothing is submitted until the returned `IO` is
      * executed. `cs.shift` is attached via `guarantee`, i.e. as a finalizer of the
      * future-backed effect.
      *
      * @param a  the non-streaming action to run
      * @param cs context shift whose `shift` is attached as the finalizer
      * @return an `IO` producing the action's result
      */
    def runIO[R](a: DBIOAction[R, NoStream, Nothing])(implicit cs: ContextShift[IO]): IO[R] = {
      val submitted = IO.delay(db.run(a))
      IO.fromFuture(submitted).guarantee(cs.shift)
    }

    /** Return an effect that runs a DB action with timing metrics collected.
      *
      * The action is executed through the untimed `runIO` and wrapped in `metric.timer`
      * under the name `query.<timerName>`.
      *
      * NOTE(review): the untimed `runIO` needs an implicit `ContextShift[IO]`, which this
      * overload does not declare -- confirm one is available in the enclosing scope.
      *
      * @param a         the non-streaming action to run
      * @param timerName suffix appended to `query.` to form the timer's name
      * @param metric    metrics facade used to time the effect
      */
    def runIO[R](a: DBIOAction[R, NoStream, Nothing], timerName: String)(
      implicit metric: MetricsF
    ): IO[R] = {
      val timerLabel = s"query.$timerName"
      metric.timer(timerLabel)(runIO(a))
    }

    /** Run a streaming action and expose the resulting publisher as an fs2 `Stream` in `F`.
      *
      * The statement is configured as forward-only and read-only with the requested fetch
      * size, and the action is made transactional before being handed to `db.stream`.
      * (Presumably this combination is what lets the JDBC driver deliver rows incrementally
      * -- confirm for the driver in use.)
      *
      * @param action    the streaming action to run
      * @param fetchSize fetch size passed to the statement parameters
      */
    def streamF[F[_]: ConcurrentEffect, A](
      action: DBIOAction[_, Streaming[A], _],
      fetchSize: Int
    ): Stream[F, A] =
      db.stream(
          action
            .withStatementParameters(
              rsType = ResultSetType.ForwardOnly,
              rsConcurrency = ResultSetConcurrency.ReadOnly,
              fetchSize = fetchSize
            )
            .transactionally
        )
        .toStream[F]

    /** Lift a DBIO-returning function to an [[fs2.Sink]].
      *
      * Each incoming element is turned into an action by `f`, run with `runIO`, and its
      * result discarded.
      *
      * NOTE(review): relies on an implicit `ContextShift[IO]` being in scope -- confirm.
      */
    def sink[A, R](f: A => DBIOAction[R, NoStream, Nothing]): Sink[IO, A] =
      Sink { element =>
        runIO(f(element)).void
      }

    /** Lift a DBIO-returning function to an [[fs2.Pipe]].
      *
      * Each incoming element is turned into an action by `f`, run with `runIO`, and the
      * action's result is emitted downstream.
      *
      * NOTE(review): relies on an implicit `ContextShift[IO]` being in scope -- confirm.
      */
    def pipe[A, R](f: A => DBIOAction[R, NoStream, Nothing]): Pipe[IO, A, R] =
      in => in.evalMap(element => runIO(f(element)))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment