Skip to content

Instantly share code, notes, and snippets.

@kiambogo
Last active December 21, 2018 16:02
Show Gist options
  • Save kiambogo/f8508c571d20830121b97a1d1ee322fc to your computer and use it in GitHub Desktop.
Save kiambogo/f8508c571d20830121b97a1d1ee322fc to your computer and use it in GitHub Desktop.
fs2-aws KCL throughput
import $ivy.`io.github.dmateusp::fs2-aws:0.26.4`
import fs2._
import cats.implicits._
import cats.effect.{IO, ContextShift, Timer}
import cats.effect.concurrent.Ref
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext
import fs2.aws.kinesis.kcl._
implicit val ec: ExecutionContext = ExecutionContext.global
implicit val ioContextShift: ContextShift[IO] = IO.contextShift(ec)
implicit val timer: Timer[IO] = IO.timer(ec)
val KINESIS_APP_NAME = ""
val KINESIS_STREAM_NAME = ""
val stream = fs2.Stream.eval(Ref.of[IO, Long](0)).flatMap { counter =>
readFromKinesisStream[IO](KINESIS_APP_NAME, KINESIS_STREAM_NAME)
.evalMap{i => counter.modify(x => x+1 -> x) *> IO(i)}
.groupWithin(Int.MaxValue, 1.second)
.evalMap{chunk => counter.get.flatMap { c => IO {println(s"Throughput: ${chunk.size}/sec. Total received: $c"); chunk } } }
.flatMap {chunk => Stream.chunk(chunk)}
.to(checkpointRecords_[IO]())
}
stream.compile.drain.unsafeRunSync
import $ivy.`io.github.dmateusp::fs2-aws:0.26.5`
import $ivy.`io.circe::circe-core:0.10.0`
import $ivy.`io.circe::circe-fs2:0.10.0`
import $ivy.`io.circe::circe-generic:0.10.0`
import fs2.{Stream}
import cats.implicits._
import cats.effect.{IO, ContextShift, Timer}
import cats.effect.concurrent.Ref
import io.circe.Printer
import io.circe.syntax._
import io.circe.generic.auto._
import scala.concurrent.duration._
import scala.concurrent.ExecutionContext
import fs2.aws.kinesis.publisher._
import fs2.aws.internal.Internal.KinesisProducerClient
import java.nio.ByteBuffer
import scala.util.Random
implicit val ec: ExecutionContext = ExecutionContext.global
implicit val ioContextShift: ContextShift[IO] = IO.contextShift(ec)
implicit val timer: Timer[IO] = IO.timer(ec)
val KINESIS_STREAM_NAME = ""
def rndString = Random.alphanumeric take 10 mkString
def randomInt: Int = Random.nextInt
case class TestModel(val name: String, val size: Int, val i: Int)
implicit def encoder: TestModel => ByteBuffer =
tm => Printer.noSpaces.prettyByteBuffer(tm.asJson)
val producerClient = new fs2.aws.internal.Internal.KinesisProducerClient[IO] {
override val region = Some("us-east-1")
}
{
val stream = Stream.range(1,10000)
.covary[IO]
.map(i => TestModel(name = rndString, size=randomInt, i = i))
.map(tm => (tm.name, tm))
.balanceThrough(Int.MaxValue, 75)(writeObjectToKinesis[IO, TestModel](KINESIS_STREAM_NAME, 100, producerClient))
}
stream.compile.drain.unsafeRunSync
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment