Last active
December 21, 2018 16:02
-
-
Save kiambogo/f8508c571d20830121b97a1d1ee322fc to your computer and use it in GitHub Desktop.
fs2-aws KCL throughput
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import $ivy.`io.github.dmateusp::fs2-aws:0.26.4` | |
import fs2._ | |
import cats.implicits._ | |
import cats.effect.{IO, ContextShift, Timer} | |
import cats.effect.concurrent.Ref | |
import scala.concurrent.duration._ | |
import scala.concurrent.ExecutionContext | |
import fs2.aws.kinesis.kcl._ | |
// Runtime plumbing for cats-effect IO: the global execution context is passed to
// both the ContextShift and the Timer.
implicit val ec: ExecutionContext = ExecutionContext.global
implicit val ioContextShift: ContextShift[IO] = IO.contextShift(ec)
implicit val timer: Timer[IO] = IO.timer(ec)

// Both names are empty placeholders here; they are passed to readFromKinesisStream below.
// NOTE(review): presumably these must be filled in before the script is run — confirm.
val KINESIS_APP_NAME: String = ""
val KINESIS_STREAM_NAME: String = ""
// Reads records from the Kinesis stream, keeps a running count of everything received,
// prints the number of records seen in each one-second window together with the running
// total, and finally sends every record to the checkpoint sink.
val stream = fs2.Stream.eval(Ref.of[IO, Long](0)).flatMap { received =>
  readFromKinesisStream[IO](KINESIS_APP_NAME, KINESIS_STREAM_NAME)
    // Increment the running total and pass the record through untouched.
    .evalMap(record => received.update(_ + 1).as(record))
    // Collect whatever arrived within each one-second window into a single chunk.
    .groupWithin(Int.MaxValue, 1.second)
    .evalMap { batch =>
      for {
        total <- received.get
        _     <- IO(println(s"Throughput: ${batch.size}/sec. Total received: $total"))
      } yield batch
    }
    // Flatten the per-window chunks back into individual records for checkpointing.
    .flatMap(batch => Stream.chunk(batch))
    .to(checkpointRecords_[IO]())
}
// Runs the consumer stream to completion, blocking the calling thread.
// unsafeRunSync performs side effects, so it is invoked with explicit parentheses.
stream.compile.drain.unsafeRunSync()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import $ivy.`io.github.dmateusp::fs2-aws:0.26.5` | |
import $ivy.`io.circe::circe-core:0.10.0` | |
import $ivy.`io.circe::circe-fs2:0.10.0` | |
import $ivy.`io.circe::circe-generic:0.10.0` | |
import fs2.{Stream} | |
import cats.implicits._ | |
import cats.effect.{IO, ContextShift, Timer} | |
import cats.effect.concurrent.Ref | |
import io.circe.Printer | |
import io.circe.syntax._ | |
import io.circe.generic.auto._ | |
import scala.concurrent.duration._ | |
import scala.concurrent.ExecutionContext | |
import fs2.aws.kinesis.publisher._ | |
import fs2.aws.internal.Internal.KinesisProducerClient | |
import java.nio.ByteBuffer | |
import scala.util.Random | |
// Runtime plumbing for cats-effect IO: the global execution context is passed to
// both the ContextShift and the Timer.
implicit val ec: ExecutionContext = ExecutionContext.global
implicit val ioContextShift: ContextShift[IO] = IO.contextShift(ec)
implicit val timer: Timer[IO] = IO.timer(ec)

// Empty placeholder; it is the stream name handed to writeObjectToKinesis below.
// NOTE(review): presumably this must be filled in before the script is run — confirm.
val KINESIS_STREAM_NAME: String = ""
/** Returns a fresh random string of 10 alphanumeric characters on every call. */
def rndString: String = Random.alphanumeric.take(10).mkString
/** Returns a new pseudo-random Int on every call. */
def randomInt: Int = Random.nextInt()
/** Payload published to Kinesis by this script.
  *
  * @param name string field; the producer stream below also uses it as the first tuple element
  * @param size integer field (filled with a random Int by the producer stream below)
  * @param i    integer field (filled with the element index by the producer stream below)
  */
case class TestModel(
  name: String,
  size: Int,
  i: Int
)
/** Implicit serializer: renders a TestModel as compact (no-spaces) JSON in a ByteBuffer. */
implicit def encoder: TestModel => ByteBuffer = { model =>
  val json = model.asJson
  Printer.noSpaces.prettyByteBuffer(json)
}
// Kinesis producer client with its region overridden to us-east-1.
val producerClient = new KinesisProducerClient[IO] {
  override val region = Some("us-east-1")
}
// NOTE(review): the enclosing braces look like the Ammonite idiom for evaluating a
// multi-line definition as one unit; `stream` is still referenced after the block — confirm.
{
  // Builds TestModel values for the integers in Stream.range(1, 10000), pairs each with its
  // name, and pushes the pairs through writeObjectToKinesis via balanceThrough.
  // NOTE(review): the first tuple element is presumably the partition key — confirm.
  val stream = Stream
    .range(1, 10000)
    .covary[IO]
    .map { index =>
      val model = TestModel(name = rndString, size = randomInt, i = index)
      (model.name, model)
    }
    .balanceThrough(Int.MaxValue, 75)(
      writeObjectToKinesis[IO, TestModel](KINESIS_STREAM_NAME, 100, producerClient)
    )
}
// Runs the producer stream to completion, blocking the calling thread.
// unsafeRunSync performs side effects, so it is invoked with explicit parentheses.
stream.compile.drain.unsafeRunSync()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment