Skip to content

Instantly share code, notes, and snippets.

@n4to4
Last active April 17, 2018 23:15
Show Gist options
  • Save n4to4/d425793c331455113e7f28e84e90414f to your computer and use it in GitHub Desktop.
Save n4to4/d425793c331455113e7f28e84e90414f to your computer and use it in GitHub Desktop.
import cats.implicits._
import cats.effect.IO
import doobie._
import doobie.implicits._
import fs2.Stream
/** Sketch of a paginated export: pages of rows are read from Postgres through
  * doobie and each page is handed to a sink that receives a [[Main.RedisClient]].
  *
  * `withClient`, `find` and `store` are still `???` placeholders.
  */
object Main extends App {

  /** Placeholder handle for a Redis connection; it carries no state yet. */
  case class RedisClient()

  /** Transactor built with `Transactor.fromDriverManager`.
    * NOTE(review): the connection settings, including the empty password, are hard-coded.
    */
  val xa = Transactor.fromDriverManager[IO](
    "org.postgresql.Driver", "jdbc:postgresql:world", "postgres", ""
  )

  /** A paged query. It is invoked as `(limit, offset)`, the same order as [[find]]. */
  type Query[A] = (Int, Int) => ConnectionIO[A]

  /** Side-effecting consumer of one fetched page. */
  type Consume[A] = (RedisClient, A) => Unit

  /** Infinite stream of page offsets: `0, lim, 2 * lim, ...`. */
  def offsets(lim: Int): Stream[IO, Int] =
    Stream.iterate(0)(_ + lim)

  /** Runs `f` with a client. Not implemented yet. */
  def withClient[A](f: RedisClient => A): A = ???

  /** Fetches at most `limit` rows starting at `offset`. Not implemented yet.
    * NOTE(review): the type parameter `A` is unused; it is kept so that existing
    * call sites keep compiling.
    */
  def find[A](limit: Int, offset: Int): ConnectionIO[List[Int]] = ???

  /** Writes one page of rows through `rc`. Not implemented yet. */
  def store(rc: RedisClient, rows: List[Int]): Unit = ???

  /** Shared pagination loop used by [[queryWithLimit]] and [[run]].
    *
    * For each offset produced by [[offsets]] it runs `fetch` against [[xa]],
    * passes the page to `consume`, and stops after the first page whose size
    * differs from `limit`.
    *
    * @param limit    requested page size; must be positive
    * @param fetch    builds the query for a given offset
    * @param pageSize number of rows contained in a fetched page
    * @param consume  side effect applied to every fetched page
    * @return an `IO` that fails with `IllegalArgumentException` when `limit <= 0`
    */
  private def drainPages[A](limit: Int)(fetch: Int => ConnectionIO[A])(
      pageSize: A => Int
  )(consume: A => Unit): IO[Unit] =
    if (limit <= 0) {
      // With limit == 0 the offset never advances and every (empty) page has
      // size == limit, so the loop below would never terminate.
      IO.raiseError(new IllegalArgumentException(s"limit must be positive, got $limit"))
    } else {
      val pageSizes = for {
        offset <- offsets(limit)
        page   <- Stream.eval(fetch(offset).transact(xa))
        _      <- Stream.eval(IO(consume(page)))
      } yield pageSize(page)

      // takeThrough also emits the first element failing the predicate, so the
      // final short page has already been consumed when the stream ends.
      pageSizes.takeThrough(_ == limit).compile.drain
    }

  /** Pages through `q` in chunks of `limit` rows and feeds every page to `sink`.
    *
    * The client handed to `sink` is obtained from [[withClient]].
    * NOTE(review): the returned `IO` is built inside `withClient` but executed
    * later, the same pattern as `main`; confirm the client is still usable then.
    *
    * @tparam A page type; bounded by `Iterable` because the stop condition
    *           compares the page size with `limit`
    * @param q     paged query, invoked as `q(limit, offset)`
    * @param limit page size; must be positive
    * @param sink  consumer applied to each page
    */
  def queryWithLimit[A <: Iterable[_]](q: Query[A], limit: Int, sink: Consume[A]): IO[Unit] =
    withClient { rc =>
      drainPages[A](limit)(offset => q(limit, offset))(_.size)(page => sink(rc, page))
    }

  /** Copies every row returned by [[find]] into `rc` via [[store]], `limit` rows at a time.
    *
    * @param rc    client passed to `store`
    * @param limit page size; must be positive
    */
  def run(rc: RedisClient, limit: Int): IO[Unit] =
    drainPages[List[Int]](limit)(offset => find(limit = limit, offset = offset))(_.size)(
      rows => store(rc, rows)
    )

  /** Program description: export everything in pages of 100 rows.
    * NOTE(review): this value is only constructed; nothing in this object runs
    * it (e.g. via `unsafeRunSync()`).
    */
  val main: IO[Unit] = withClient { rc => run(rc, 100) }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment