Skip to content

Instantly share code, notes, and snippets.

@itzg
Created August 24, 2018 14:09
Show Gist options
  • Star 7 You must be signed in to star a gist
  • Fork 7 You must be signed in to fork a gist
  • Save itzg/e3ebfd7aec220bf0522e23a65b1296c8 to your computer and use it in GitHub Desktop.
Save itzg/e3ebfd7aec220bf0522e23a65b1296c8 to your computer and use it in GitHub Desktop.
Example of configuring Kafka Streams within a Spring Boot application with an example of SSL configuration
# With SSL configured
spring:
application:
name: my-stream-app
kafka:
bootstrap-servers:
- server1:9092
- server2:9092
ssl:
truststore-location: file:ca-truststore-client.jks
truststore-password:
keystore-location: file:client-keystore.p12
keystore-password:
key-password:
properties:
security.protocol: SSL
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Materialized;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerde;
import java.util.Properties;
@Configuration
public class KafkaStreamsConfig {
@Bean
public KafkaStreams kafkaStreams(KafkaProperties kafkaProperties,
@Value("${spring.application.name}") String appName) {
final Properties props = new Properties();
// inject SSL related properties
props.putAll(kafkaProperties.getSsl().buildProperties());
props.putAll(kafkaProperties.getProperties());
// stream config centric ones
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
props.put(StreamsConfig.APPLICATION_ID_CONFIG, appName);
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, JsonSerde.class);
props.put(StreamsConfig.STATE_DIR_CONFIG, "data");
// others
props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, JsonNode.class);
final KafkaStreams kafkaStreams = new KafkaStreams(kafkaStreamTopology(), props);
kafkaStreams.start();
return kafkaStreams;
}
@Bean
public Topology kafkaStreamTopology() {
final StreamsBuilder streamsBuilder = new StreamsBuilder();
// streamsBuilder.stream("some_topic") etc ...
return streamsBuilder.build();
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment