Skip to content

Instantly share code, notes, and snippets.

@CremboC
Created April 2, 2019 09:52
Show Gist options
  • Save CremboC/a376c3bc2180c53fe717648f9f31c548 to your computer and use it in GitHub Desktop.
Save CremboC/a376c3bc2180c53fe717648f9f31c548 to your computer and use it in GitHub Desktop.
it("should handle rebalance with multiple consumers") {
withKafka { (config, topic) =>
createCustomTopic(topic, partitions = 300)
val maxP1 = 10000
val maxP2 = 20000
val produced1 = (0 until maxP1).map(n => s"key-$n" -> s"value->$n")
val produced2 = (maxP1 until maxP2).map(n => s"key-$n" -> s"value->$n")
val producedTotal = produced1.size.toLong + produced2.size.toLong
val mkConsumer = (id: String, queue: Queue[IO, CommittableMessage[IO, String, String]]) =>
consumerStream[IO]
.using(consumerSettings(config).withClientId(id))
.evalTap(_.subscribeTo(topic))
.evalMap(_.stream.evalMap(queue.enqueue1).compile.drain.start.void)
(for {
q1 <- Stream.eval(Queue.unbounded[IO, CommittableMessage[IO, String, String]])
q2 <- Stream.eval(Queue.unbounded[IO, CommittableMessage[IO, String, String]])
ref <- Stream.eval(Ref.of[IO, Map[String, Int]](Map.empty))
_ <- Stream.eval(IO.sleep(5.seconds))
_ <- mkConsumer("1", q1).concurrently(mkConsumer("2", q2))
_ <- Stream.eval(IO(publishToKafka(topic, produced1)))
_ <- Stream.eval(IO(publishToKafka(topic, produced2)))
_ <- Stream.eval(IO(println("Done")))
_ <- Stream.eval(IO.sleep(5.seconds))
_ <- Stream.eval(q1.dequeue.compile.toList.flatMap(lst => IO(println(lst))))
_ <- Stream.eval(q2.dequeue.compile.toList.flatMap(lst => IO(println(lst))))
keys <- Stream.eval(ref.get)
_ <- Stream.eval(IO(keys.size.toLong shouldEqual producedTotal))
_ <- Stream.eval(IO(keys shouldEqual (0 until maxP2).map(n => s"key-$n" -> (if (n < maxP1) 2 else 1)).toMap))
} yield ()).compile.drain.unsafeRunSync
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment