Created
August 24, 2018 14:09
-
-
Save itzg/e3ebfd7aec220bf0522e23a65b1296c8 to your computer and use it in GitHub Desktop.
Example of configuring Kafka Streams within a Spring Boot application with an example of SSL configuration
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# With SSL configured | |
spring: | |
application: | |
name: my-stream-app | |
kafka: | |
bootstrap-servers: | |
- server1:9092 | |
- server2:9092 | |
ssl: | |
truststore-location: file:ca-truststore-client.jks | |
truststore-password: | |
keystore-location: file:client-keystore.p12 | |
keystore-password: | |
key-password: | |
properties: | |
security.protocol: SSL |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.fasterxml.jackson.databind.JsonNode; | |
import com.fasterxml.jackson.databind.node.ObjectNode; | |
import org.apache.kafka.common.serialization.Serdes; | |
import org.apache.kafka.streams.KafkaStreams; | |
import org.apache.kafka.streams.StreamsBuilder; | |
import org.apache.kafka.streams.StreamsConfig; | |
import org.apache.kafka.streams.Topology; | |
import org.apache.kafka.streams.kstream.GlobalKTable; | |
import org.apache.kafka.streams.kstream.KStream; | |
import org.apache.kafka.streams.kstream.Materialized; | |
import org.springframework.beans.factory.annotation.Autowired; | |
import org.springframework.beans.factory.annotation.Value; | |
import org.springframework.boot.autoconfigure.kafka.KafkaProperties; | |
import org.springframework.context.annotation.Bean; | |
import org.springframework.context.annotation.Configuration; | |
import org.springframework.kafka.support.serializer.JsonDeserializer; | |
import org.springframework.kafka.support.serializer.JsonSerde; | |
import java.util.Properties; | |
@Configuration | |
public class KafkaStreamsConfig { | |
@Bean | |
public KafkaStreams kafkaStreams(KafkaProperties kafkaProperties, | |
@Value("${spring.application.name}") String appName) { | |
final Properties props = new Properties(); | |
// inject SSL related properties | |
props.putAll(kafkaProperties.getSsl().buildProperties()); | |
props.putAll(kafkaProperties.getProperties()); | |
// stream config centric ones | |
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers()); | |
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appName); | |
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass()); | |
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class); | |
props.put(StreamsConfig.STATE_DIR_CONFIG, "data"); | |
// others | |
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, JsonNode.class); | |
final KafkaStreams kafkaStreams = new KafkaStreams(kafkaStreamTopology(), props); | |
kafkaStreams.start(); | |
return kafkaStreams; | |
} | |
@Bean | |
public Topology kafkaStreamTopology() { | |
final StreamsBuilder streamsBuilder = new StreamsBuilder(); | |
// streamsBuilder.stream("some_topic") etc ... | |
return streamsBuilder.build(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment