Skip to content

Instantly share code, notes, and snippets.

@CremboC
Created April 2, 2019 08:59
Show Gist options
  • Save CremboC/2e95d6ff8fbcda48c7d43e207dea276e to your computer and use it in GitHub Desktop.
Save CremboC/2e95d6ff8fbcda48c7d43e207dea276e to your computer and use it in GitHub Desktop.
// Scenario: a consumer is started before the first batch is published and a second one
// before the second batch; every record that reaches the shared queue is counted per key.
// The assertions expect first-batch keys to be seen twice and second-batch keys once
// (presumably because the second consumer joining causes the first batch to be
// re-delivered -- confirm against the consumer settings in use).
it("should handle rebalance") {
  withKafka { (config, topic) =>
    type MessageQueue = Queue[IO, CommittableMessage[IO, String, String]]

    createCustomTopic(topic, partitions = 300)

    val firstBatchEnd = 10000
    val secondBatchEnd = 20000
    val toRecord = (n: Int) => s"key-$n" -> s"value->$n"
    val firstBatch = (0 until firstBatchEnd).map(toRecord)
    val secondBatch = (firstBatchEnd until secondBatchEnd).map(toRecord)
    val expectedKeyCount = firstBatch.size.toLong + secondBatch.size.toLong

    // Subscribes a consumer to `topic` and forwards every message it yields into `sink`.
    // The forwarding stream is launched with `.start`, so this stream emits without
    // waiting for it to finish.
    def startConsumer(sink: MessageQueue) =
      consumerStream[IO]
        .using(consumerSettings(config).withClientId("1"))
        .evalTap(consumer => consumer.subscribeTo(topic))
        .evalMap { consumer =>
          consumer.stream
            .evalMap(message => sink.enqueue1(message))
            .compile
            .drain
            .start
            .void
        }

    // Drains `source`, bumping the per-key counter in `tally` for each message, and stops
    // as soon as the tally holds `secondBatchEnd` distinct keys.
    def tallyUntilAllKeysSeen(source: MessageQueue, tally: Ref[IO, Map[String, Int]]): IO[Unit] =
      source.dequeue
        .evalMap { message =>
          tally.modify { seen =>
            val recordKey = message.record.key
            val bumped = seen + (recordKey -> (seen.getOrElse(recordKey, 0) + 1))
            (bumped, bumped)
          }
        }
        .takeWhile(snapshot => snapshot.size < secondBatchEnd)
        .compile
        .drain

    val scenario =
      for {
        messages <- Stream.eval(Queue.unbounded[IO, CommittableMessage[IO, String, String]])
        tally <- Stream.eval(Ref.of[IO, Map[String, Int]](Map.empty))
        // First consumer, a pause, then the first batch.
        _ <- startConsumer(messages)
        _ <- Stream.eval(IO.sleep(5.seconds))
        _ <- Stream.eval(IO(publishToKafka(topic, firstBatch)))
        // Second consumer feeding the same queue, a pause, then the second batch.
        _ <- startConsumer(messages)
        _ <- Stream.eval(IO.sleep(5.seconds))
        _ <- Stream.eval(IO(publishToKafka(topic, secondBatch)))
        _ <- Stream.eval(tallyUntilAllKeysSeen(messages, tally))
        finalCounts <- Stream.eval(tally.get)
        _ <- Stream.eval(IO(finalCounts.size.toLong shouldEqual expectedKeyCount))
        _ <- Stream.eval(IO {
          val expectedCounts =
            (0 until secondBatchEnd).map { n =>
              val deliveries = if (n < firstBatchEnd) 2 else 1
              s"key-$n" -> deliveries
            }.toMap
          finalCounts shouldEqual expectedCounts
        })
      } yield ()

    scenario.compile.drain.unsafeRunSync()
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment