Skip to content

Instantly share code, notes, and snippets.

@ctoestreich
Created May 23, 2020 19:24
Show Gist options
  • Save ctoestreich/3ff9702a2b5c1a3d93502cb4b695bcf6 to your computer and use it in GitHub Desktop.
Save ctoestreich/3ff9702a2b5c1a3d93502cb4b695bcf6 to your computer and use it in GitHub Desktop.
/**
 * Spock specification that publishes an Avro event through {@code TestKafkaClient}
 * and waits until a record carrying the same key is observed by {@link TestConsumer}
 * on {@code foo.routed.topic}.
 *
 * {@code embeddedServer} and {@code conditions} are not declared in this class; they are
 * presumably provided by {@code AbstractEmbeddedKafkaSpec} -- verify against the base spec.
 */
class TestEventConsumerSpec extends AbstractEmbeddedKafkaSpec {

    def "Test message gets routed"() {
        given: 'the listener and producer beans from the embedded application context'
        TestConsumer consumer = embeddedServer.getApplicationContext().getBean(TestConsumer)
        TestKafkaClient client = embeddedServer.getApplicationContext().getBean(TestKafkaClient)

        and: 'a unique record key, kept in a variable so it can be matched after publishing'
        String uuid = UUID.randomUUID().toString()
        // Placeholder carried over from the original snippet: build the Avro payload here.
        SomeAvroData event = ...//create test data here

        expect: 'the test payload was actually created'
        event

        when: 'the event is published under that key'
        // Publishing is the stimulus, so it belongs in when: rather than in a condition block.
        client.publishEvent(uuid, event)

        then: 'Make sure the message gets to the down stream topic eventually'
        conditions.eventually {
            // First line fails when nothing has arrived at all; second when only other keys arrived.
            consumer.messages.size() > 0
            consumer.messages.contains(uuid)
        }
    }

    /**
     * Test-only Kafka listener that records the key of every record received on
     * {@code foo.routed.topic}, reading from the earliest offset with String keys and
     * specific-record Avro values.
     */
    @KafkaListener(
            groupId = "TestConsumer",
            properties = [
                    @Property(name = ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, value = 'org.apache.kafka.common.serialization.StringDeserializer'),
                    @Property(name = ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, value = "io.confluent.kafka.serializers.KafkaAvroDeserializer"),
                    @Property(name = KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, value = "true")
            ],
            offsetReset = OffsetReset.EARLIEST,
            offsetStrategy = OffsetStrategy.AUTO
    )
    static class TestConsumer {

        /**
         * Keys of the records received so far, in arrival order.
         *
         * A copy-on-write list is used because {@link #receive} appends while the feature
         * method polls this list; a plain ArrayList gives no guarantees for that access pattern.
         */
        List<String> messages = new java.util.concurrent.CopyOnWriteArrayList<String>()

        /**
         * Stores the key of an incoming record.
         *
         * @param consumerRecord the record delivered from {@code foo.routed.topic}
         */
        @Topic('foo.routed.topic')
        void receive(ConsumerRecord<String, SpecificRecord> consumerRecord) {
            messages.add(consumerRecord.key())
        }
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment