Skip to content

Instantly share code, notes, and snippets.

@mjedynak
Last active February 12, 2022 11:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mjedynak/4eeedc0135e643bfdec4a9eff37c7623 to your computer and use it in GitHub Desktop.
Save mjedynak/4eeedc0135e643bfdec4a9eff37c7623 to your computer and use it in GitHub Desktop.
/**
 * Integration test for [TopicPurger], run against an embedded Kafka broker
 * listening on port 9092.
 *
 * The test publishes one record to each of two partitions, purges the topic,
 * and then verifies through the [AdminClient] that the earliest offset of
 * every partition has caught up with the latest offset.
 */
@SpringBootTest
@EmbeddedKafka(ports = [9092])
class KafkaExampleApplicationTests(
    @Autowired val embeddedKafkaBroker: EmbeddedKafkaBroker,
    @Autowired val adminClient: AdminClient,
    @Autowired val topicPurger: TopicPurger
) {
    private val topicName = "test.topic"

    /**
     * Publishes one record per partition and asserts that, after
     * [TopicPurger.purge], the earliest offset of both partitions moves
     * from 0 to 1 while the latest offset stays at 1.
     */
    @Test
    fun purgesTopic() {
        // Two partitions, replication factor 1.
        embeddedKafkaBroker.addTopics(NewTopic(topicName, 2, 1))

        val partition1 = 0
        val partition2 = 1

        val producerFactory = DefaultKafkaProducerFactory<Int, String>(producerProps(embeddedKafkaBroker))
        try {
            producerFactory.createProducer().use { producer ->
                producer.send(ProducerRecord(topicName, partition1, 0, "value1"))
                producer.send(ProducerRecord(topicName, partition2, 1, "value2"))
                // Block until both records have been handed to the broker so the
                // offset checks below do not race with in-flight sends.
                producer.flush()
            }
        } finally {
            // Closing the producer alone may not release the underlying client
            // (the factory can hand out a shared, cached producer), so tear the
            // factory down explicitly as well.
            producerFactory.destroy()
        }

        val topicPartition1 = TopicPartition(topicName, partition1)
        val topicPartition2 = TopicPartition(topicName, partition2)

        // Before the purge: one record in each partition.
        checkOffsets(topicPartition1, topicPartition2, OffsetSpec.earliest(), 0)
        checkOffsets(topicPartition1, topicPartition2, OffsetSpec.latest(), 1)

        topicPurger.purge(topicName)

        // After the purge: earliest offset has advanced to the latest offset.
        checkOffsets(topicPartition1, topicPartition2, OffsetSpec.earliest(), 1)
        checkOffsets(topicPartition1, topicPartition2, OffsetSpec.latest(), 1)
    }

    /**
     * Polls the [adminClient] until both partitions report [expectedOffset]
     * for the given [offsetSpec]. Awaitility fails the test if the condition
     * is not met within its configured timeout.
     *
     * @param topicPartition1 first partition to inspect
     * @param topicPartition2 second partition to inspect
     * @param offsetSpec which offset to query (e.g. earliest or latest)
     * @param expectedOffset offset value both partitions must report
     */
    private fun checkOffsets(topicPartition1: TopicPartition,
                             topicPartition2: TopicPartition,
                             offsetSpec: OffsetSpec,
                             expectedOffset: Long) {
        val partitions = listOf(topicPartition1, topicPartition2)
        await().until {
            val offsets = adminClient
                .listOffsets(partitions.associateWith { offsetSpec })
                .all()
                .get()
            // A partition missing from the result yields null and never matches.
            partitions.all { partition -> offsets[partition]?.offset() == expectedOffset }
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment