Skip to content

Instantly share code, notes, and snippets.

@surysharma
Created December 4, 2020 13:34
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save surysharma/7ffa66b2129b1d589c84cd30cd603359 to your computer and use it in GitHub Desktop.
# application.yml — Spring Boot configuration for the KStream sample.
# NOTE: the pasted copy had lost all indentation; nesting restored to the
# standard Spring Boot property layout implied by the keys the Java code reads
# (spring.kafka.input-lowercase-topic / spring.kafka.output-uppercase-topic).
logging:
  pattern:
    # Custom console pattern tagged "[Kafka Pattern]" so sample output is easy to spot.
    console: "[Kafka Pattern] %clr(%d{HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:%5p}) %clr(---){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:%wEx}"

spring:
  main:
    # Suppress the Spring Boot startup banner/info lines.
    log-startup-info: false
  kafka:
    listener:
      # Don't fail application startup when topics are not yet created;
      # the NewTopic beans in KafkaStreamConfiguration create them.
      missing-topics-fatal: false
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
    bootstrap-servers: localhost:9092
    # Custom (non-standard) properties read via Environment in the Java config:
    input-lowercase-topic: t.lower.case
    output-uppercase-topic: t.upper.case
package com.thebigscale.kstreamsample.config;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.streams.StreamsConfig;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.annotation.KafkaStreamsDefaultConfiguration;
import org.springframework.kafka.config.KafkaStreamsConfiguration;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
/**
 * Kafka Streams infrastructure configuration: builds the default
 * {@link KafkaStreamsConfiguration} and declares the input/output topics.
 */
@Configuration
@EnableKafkaStreams
public class KafkaStreamConfiguration {

    /** Kafka Streams application id (also the default consumer group / state-store prefix). */
    public static final String APP_ID = "upper-case-demo";

    private final KafkaProperties kafkaProperties;
    private final String inputTopic;
    private final String outputTopic;

    /**
     * @param kafkaProperties Spring Boot Kafka properties (bootstrap servers).
     * @param env             environment used to resolve the custom topic-name properties.
     * @throws IllegalStateException if either topic property is missing — fail fast at
     *         startup instead of passing a null topic name to the Kafka admin client later.
     */
    public KafkaStreamConfiguration(KafkaProperties kafkaProperties, Environment env) {
        this.kafkaProperties = kafkaProperties;
        this.inputTopic = env.getRequiredProperty("spring.kafka.input-lowercase-topic");
        this.outputTopic = env.getRequiredProperty("spring.kafka.output-uppercase-topic");
    }

    /**
     * Default Streams configuration picked up by {@code @EnableKafkaStreams}.
     *
     * @return the streams configuration (app id, 4 stream threads, bootstrap servers).
     */
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration getStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        return new KafkaStreamsConfiguration(props);
    }

    /** Input topic: 1 partition, broker-default replication factor. */
    @Bean
    public NewTopic createInputTopic() {
        return new NewTopic(inputTopic, Optional.of(1), Optional.empty());
    }

    /** Output topic: 1 partition, broker-default replication factor. */
    @Bean
    public NewTopic createOutputTopic() {
        return new NewTopic(outputTopic, Optional.of(1), Optional.empty());
    }
}
/**
 * Spring Boot entry point for the KStream sample. Implements
 * {@link CommandLineRunner} so a startup line is printed once the context is up.
 */
@SpringBootApplication
public class KStreamSampleApplication implements CommandLineRunner {

    /** Invoked by Spring Boot after the application context has started. */
    @Override
    public void run(String... args) {
        System.out.println("Started the KStream spring boot CLI...");
    }

    /** JVM entry point — delegates to Spring Boot. */
    public static void main(String[] args) {
        SpringApplication.run(KStreamSampleApplication.class, args);
    }
}
package com.thebigscale.kstreamsample;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.core.env.Environment;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.test.context.EmbeddedKafka;
import org.springframework.kafka.test.utils.KafkaTestUtils;
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
/**
 * End-to-end test of the uppercase topology against an embedded Kafka broker:
 * produce a lowercase record to the input topic and expect the uppercased
 * value on the output topic.
 */
@SpringBootTest
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" })
class KStreamSampleApplicationTests {

    private final KafkaProperties kafkaProperties;
    private final String inputTopic;
    private final String outputTopic;

    @Autowired
    public KStreamSampleApplicationTests(KafkaProperties kafkaProperties, Environment env) {
        this.kafkaProperties = kafkaProperties;
        this.inputTopic = env.getProperty("spring.kafka.input-lowercase-topic");
        this.outputTopic = env.getProperty("spring.kafka.output-uppercase-topic");
    }

    @Test
    @DisplayName("should test uppercaseStream topology")
    void shouldTestUppercaseStreamTopology() {
        //Given
        Map<String, Object> producerProps = new HashMap<>(KafkaTestUtils.producerProps(
                String.join(",", kafkaProperties.getBootstrapServers())));
        //Create a kafka producer
        Producer<String, String> producer = new DefaultKafkaProducerFactory<>(producerProps, new StringSerializer(), new StringSerializer()).createProducer();
        Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(String.join(",", kafkaProperties.getBootstrapServers()), "testGroup", "true");
        // Read from the beginning of the topic so the record sent below is seen.
        consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        //Create a Consumer client
        Consumer<String, String> consumer = new DefaultKafkaConsumerFactory<>(consumerProps, new StringDeserializer(), new StringDeserializer()).createConsumer();
        // Close both clients even if an assertion fails — otherwise they leak
        // network threads/sockets for the remainder of the test JVM.
        try {
            consumer.subscribe(Collections.singleton(outputTopic));
            //When
            producer.send(new ProducerRecord<>(inputTopic, "test"));
            producer.flush();
            //Then
            assertThat(producer).isNotNull();
            //And
            ConsumerRecords<String, String> rec = consumer.poll(Duration.ofSeconds(3));
            Iterable<ConsumerRecord<String, String>> records = rec.records(outputTopic);
            Iterator<ConsumerRecord<String, String>> iterator = records.iterator();
            if (!iterator.hasNext()) Assertions.fail("no record received on output topic " + outputTopic);
            ConsumerRecord<String, String> next = iterator.next();
            assertThat(next.value()).isEqualTo("TEST");
        } finally {
            producer.close();
            consumer.close();
        }
    }
}
package com.thebigscale.kstreamsample.processors;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.env.Environment;
/**
 * Declares the Kafka Streams topology: reads strings from the input topic,
 * uppercases each value, and writes the result to the output topic.
 */
@Configuration
public class UppercaseTopologyProcessor {

    private final String inputTopic;
    private final String outputTopic;

    /**
     * @param env environment used to resolve the topic-name properties.
     * @throws IllegalStateException if either topic property is missing — fail fast
     *         rather than building a topology over a null topic name.
     */
    UppercaseTopologyProcessor(Environment env) {
        this.inputTopic = env.getRequiredProperty("spring.kafka.input-lowercase-topic");
        this.outputTopic = env.getRequiredProperty("spring.kafka.output-uppercase-topic");
    }

    /**
     * Builds the uppercase stream on the auto-configured {@link StreamsBuilder}.
     *
     * @param builder the StreamsBuilder managed by {@code @EnableKafkaStreams}.
     * @return the uppercased stream (exposed as a bean; the side effect of wiring
     *         the topology into {@code builder} is what actually matters).
     */
    @Bean
    public KStream<String, String> kStreamPromoToUppercase(StreamsBuilder builder) {
        KStream<String, String> sourceStream = builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));
        // Debug sinks: echo every record before and after the transformation.
        sourceStream.print(Printed.<String, String>toSysOut().withLabel("Original KStream in getTopology..."));
        KStream<String, String> upperCaseStream = sourceStream.mapValues((ValueMapper<String, String>) String::toUpperCase);
        upperCaseStream.print(Printed.<String, String>toSysOut().withLabel("Uppercase KStream..."));
        upperCaseStream.to(outputTopic, Produced.with(Serdes.String(), Serdes.String()));
        // Debug only: build a throwaway copy of the topology so its description can
        // be printed; the framework builds the real topology itself at startup.
        Topology topology = builder.build();
        System.out.println(topology.describe());
        return upperCaseStream;
    }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment