Skip to content

Instantly share code, notes, and snippets.

@Daenyth
Created June 5, 2019 14:22
Show Gist options
  • Save Daenyth/afc6312bd881011ec6bfc0d0469690f5 to your computer and use it in GitHub Desktop.
Save Daenyth/afc6312bd881011ec6bfc0d0469690f5 to your computer and use it in GitHub Desktop.
Doobie postgres table copy
import cats.implicits._
import cats._
import cats.effect.implicits._
import cats.effect._
import doobie._
import doobie.implicits._
import doobie.postgres._
import java.io.{InputStream, OutputStream, PipedInputStream, PipedOutputStream}
import fs2.Stream
object TableCopy {
def copyTableCsv[F[_]: Concurrent: ContextShift](
xa: Transactor[F],
chunkSize: Int,
blockingExecutionContext: ExecutionContext
)(
table: String
): Stream[F, Byte] =
download[F](copyWithHeader(table))(
xa,
chunkSize,
blockingExecutionContext
)
private def download[F[_]: Concurrent: ContextShift](
f: OutputStream => ConnectionIO[Long])(
xa: Transactor[F],
chunkSize: Int,
blockingExecutionContext: ExecutionContext) =
Stream.resource(mkOutput[F]).flatMap {
case (os, is) =>
val write = f(os).transact(xa) *> Sync[F].delay(os.close())
val read = fs2.io.readInputStream(is.pure[F],
chunkSize,
blockingExecutionContext,
closeAfterUse = false)
read concurrently Stream.eval(write)
}
private def mkOutput[F[_]: Sync]: Resource[F, (OutputStream, InputStream)] =
Resource.make(Sync[F].delay {
val os = new PipedOutputStream()
val is = new PipedInputStream(os)
(os: OutputStream, is: InputStream)
})(ois =>
Sync[F].delay {
ois._2.close()
ois._1.close()
})
private def copyWithHeader(table: String)(os: OutputStream): ConnectionIO[Long] = {
val query = s"""
| COPY $table
| TO STDOUT (
| ENCODING 'utf-8',
| FORCE_QUOTE *,
| FORMAT CSV,
| HEADER
| )
""".stripMargin
PHC.pgGetCopyAPI(PFCM.copyOut(query, os))
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment