Skip to content

Instantly share code, notes, and snippets.

@rkrzewski
Last active October 2, 2020 21:02
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rkrzewski/a85aaae0dc16b6b43923a08d7f62fdb0 to your computer and use it in GitHub Desktop.
Save rkrzewski/a85aaae0dc16b6b43923a08d7f62fdb0 to your computer and use it in GitHub Desktop.
import cats._
import cats.data._
import cats.effect._
import cats.syntax.all._
import doobie._
import doobie.syntax.stream._
import fs2._
object Streaming {
/**
* Builds a pipe that inserts a validated stream of rows into the database in one transaction.
*
* Requirements:
* . the incoming stream from an earlier processing stage is a `Stream[F[_], Either[NonEmptyChain[E], A]]`
* . data should be inserted to db in batches, batches should not linger in memory longer than necessary
* . before writing the data begins, a metadata record needs to be inserted, returning a generated key
* . subsequent data inserts may refer to aforementioned generated key (say FK to `file_imports.id`)
* . whole stream should be processed in a single transaction (assuming sufficient transaction timeout)
* . as long as `Right[A]` elements are coming, they should be inserted, but when `Left[NonEmptyChain[E]]` elements appear, whatever has been already inserted should be rolled back
* . the incoming stream should be fully drained even in presence of errors
* . errors should be accumulated, up to a defined number. after exceeding the limit, further errors are discarded.
* . at the end `Either[NonEmptyChain[E], Int]` should be emitted, where `Int` represents the total number of inserted rows
*
* @tparam F[_] polymorphic effect type
* @tparam M metadata type
* @tparam A data type
* @tparam E error type
* @param insertMeta DB operation that generates metadata
* @param insertRow prepare an `Update` for batch insert for given metadata value
* @param batchSize number of data rows to be inserted in a single batch
* @param maxErrors maximum number of errors to be accumulated
* @param xa DB transactor
* @return a `Pipe` that converts incoming `Either[NonEmptyChain[E], A]` elements into a stream that emits a single
* `Either[NonEmptyChain[E], Int]` value, where `Int` represents the total number of inserted rows
*/
def insert[F[_]: Effect, M, A, E](
insertMeta: ConnectionIO[M],
insertRow: M => Update[A],
batchSize: Int,
maxErrors: Int,
xa: Transactor[F]
): Pipe[F, Either[NonEmptyChain[E], A], Either[NonEmptyChain[E], Int]] = {
// Concatenates two error chains, keeping only the first `n` errors overall.
// `a.concat(b)` is never empty, so this returns `None` only when `n <= 0`
// (take(n) then yields an empty list and `fromSeq` rejects it).
def concatRetainN(
a: NonEmptyChain[E],
b: NonEmptyChain[E],
n: Int
): Option[NonEmptyChain[E]] =
NonEmptyChain.fromSeq(a.concat(b).toList.take(n))
// Natural transformation F ~> ConnectionIO (cats-effect 2 style): each F-effect is
// re-suspended inside ConnectionIO via the async/runAsync callback bridge, so the
// upstream stream can be translated into the doobie effect and run in one transaction.
val toConnectionIO = λ[F ~> ConnectionIO](fa =>
Async[ConnectionIO].async(cb =>
Effect[F].runAsync(fa)(res => IO(cb(res))).unsafeRunSync()
)
)
in =>
Stream
.eval(insertMeta)
.flatMap { meta =>
val update = insertRow(meta)
// Recursive Pull over the translated stream. State:
// `errors` — accumulated errors so far (None while everything is still Right),
// `written` — rows inserted so far (reset to 0 once an error forces a rollback).
def pull(
meta: M,
stream: Stream[ConnectionIO, Either[NonEmptyChain[E], A]],
errors: Option[NonEmptyChain[E]],
written: Int
): Pull[ConnectionIO, Either[NonEmptyChain[E], Int], Unit] =
stream.pull.uncons.flatMap {
case Some((chunk, tail)) =>
// `parSequence` on Vector[Either[NEC[E], A]] accumulates the chunk's errors
// (via the Parallel/Validated instance) instead of short-circuiting on the first one.
(errors, chunk.toVector.parSequence) match {
case (None, Right(rows)) =>
// Happy path: batch-insert the whole chunk and add its row count to the total.
Pull.eval(update.updateMany(rows)).flatMap(w => pull(meta, tail, None, written + w))
case (None, Left(errors)) =>
// First failing chunk: undo everything inserted so far, then keep draining
// the stream (requirement: fully drain even in presence of errors).
Pull.eval(FC.rollback) >> pull(meta, tail, Some(errors), 0)
case (Some(prevErrors), Right(_)) =>
// Already failed: discard valid rows, just continue draining.
pull(meta, tail, Some(prevErrors), 0)
case (Some(prevErrors), Left(newErrors)) =>
// Already failed: fold in the new errors, capped at `maxErrors`.
pull(meta, tail, concatRetainN(prevErrors, newErrors, maxErrors), 0)
}
case None =>
// Stream exhausted: emit the single summary element required by the contract.
Pull.output1(
errors match {
case None => Right(written)
case Some(errors) => Left(errors)
}
) >> Pull.done
}
// `buffer(batchSize)` re-chunks the input so each uncons step sees up to
// `batchSize` elements for a single `updateMany` batch.
pull(meta, in.buffer(batchSize).translate(toConnectionIO), None, 0).stream
}
// Runs the whole inner ConnectionIO stream within one transaction on `xa`.
.transact(xa)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment