Skip to content

Instantly share code, notes, and snippets.

@artembilan
Created April 9, 2015 09:24
Show Gist options
  • Save artembilan/49e36fb6eb77796cd9b8 to your computer and use it in GitHub Desktop.
Save artembilan/49e36fb6eb77796cd9b8 to your computer and use it in GitHub Desktop.
/**
 * Declares the integration flow that multiplies each incoming {@code String}
 * payload and forwards the results to the Kafka outbound handler.
 *
 * <p>The splitter function builds a {@code FastList} of 100 entries, each one
 * supplied as the original payload; {@code null} is passed as the second
 * argument to {@code split}. The handler is obtained from
 * {@link #kafkaMessageHandler(String)} inside the lambda, i.e. when the flow
 * definition is applied rather than when this method returns.
 *
 * @param serverAddress address handed through to {@code kafkaMessageHandler}
 * @return the flow definition as a lambda
 */
@Bean
public IntegrationFlow sendToKafkaFlow(String serverAddress) {
    return flow -> flow
            .<String>split(payload -> FastList.newWithNValues(100, () -> payload), null)
            .handle(kafkaMessageHandler(serverAddress));
}
/**
 * Builds the Kafka outbound channel adapter spec used by the flow.
 *
 * <p>Configuration applied here:
 * <ul>
 *   <li>the producer property {@code queue.buffering.max.ms} is set to {@code "15000"};</li>
 *   <li>the message key is read from the
 *       {@code IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER} header of each message;</li>
 *   <li>a producer for {@code TEST_TOPIC} is registered against {@code serverAddress},
 *       with its metadata customised by {@code producer(...)}.</li>
 * </ul>
 *
 * @param serverAddress address passed as the second argument to {@code addProducer}
 * @return the configured handler spec
 */
private KafkaProducerMessageHandlerSpec kafkaMessageHandler(String serverAddress) {
    return Kafka
            .outboundChannelAdapter(producerProps -> producerProps.put("queue.buffering.max.ms", "15000"))
            .messageKey(message -> message.getHeaders().get(IntegrationMessageHeaderAccessor.SEQUENCE_NUMBER))
            .addProducer(TEST_TOPIC, serverAddress, this::producer);
}
/**
 * Customises the producer metadata for the topic registered in
 * {@code kafkaMessageHandler}.
 *
 * <p>Enables async mode, sets the batch size to 10 messages, declares
 * {@code String} as the value class, and installs a {@code StringEncoder}
 * for values and an {@code IntEncoder} for keys. Both encoders are
 * constructed with a {@code null} argument.
 *
 * @param metadata the producer metadata spec to configure in place
 */
private void producer(KafkaProducerMessageHandlerSpec.ProducerMetadataSpec metadata) {
    metadata
            .async(true)
            .batchNumMessages(10)
            .valueClassType(String.class)
            .valueEncoder(new StringEncoder(null))
            .keyEncoder(new IntEncoder(null));
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment