Skip to content

Instantly share code, notes, and snippets.

@jferris
Created March 1, 2019 22:04
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jferris/8a93b07f32bfd70792891dbbfd990a25 to your computer and use it in GitHub Desktop.
Save jferris/8a93b07f32bfd70792891dbbfd990a25 to your computer and use it in GitHub Desktop.
Buffered commit to Kafka
import cats.effect.{ContextShift, IO}
import cats.effect.concurrent.Ref
import cats.implicits._
import fs2.{Chunk, Pipe, Stream}
class BufferedCommitter(
config: Config,
consumer: Consumer,
pipe: Pipe[IO, Record, Unit])(implicit cs: ContextShift[IO]) {
def run: IO[Unit] =
for {
state <- Ref.of[IO, State](newState)
_ <- consumer.chunks
.evalMap(receivedRecords(state, _))
.compile
.drain
} yield ()
private def receivedRecords(ref: Ref[IO, State],
records: Chunk[Record]): IO[Unit] =
for {
_ <- ref.update(_.receive(records))
state <- ref.get
_ <- flush(ref).whenA(state.isReady)
} yield ()
private def flush(ref: Ref[IO, State]): IO[Unit] =
for {
state <- ref.get
_ <- Stream.chunk(state.toChunk).through(pipe).compile.drain
_ <- consumer.commit
_ <- ref.set(newState)
} yield ()
case class State(queue: Chunk.Queue[Record], caughtUp: Boolean) {
def receive(chunk: Chunk[Record]): State =
State(queue :+ chunk, chunk.isEmpty)
def isReady: Boolean =
isQueueFull || hasLastRecords
def toChunk: Chunk[Record] =
queue.toChunk
private def isQueueFull: Boolean =
queue.size >= config.batchSize
private def isQueueNonEmpty: Boolean =
queue.size > 0
private def hasLastRecords: Boolean =
caughtUp && isQueueNonEmpty
}
def newState: State =
State(queue = Chunk.Queue.empty[Record], caughtUp = false)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment