Skip to content

Instantly share code, notes, and snippets.

@surysharma
Last active January 10, 2021 09:49
Show Gist options
  • Save surysharma/3e2c312b709ae39cba0f2535058c81bb to your computer and use it in GitHub Desktop.
Save surysharma/3e2c312b709ae39cba0f2535058c81bb to your computer and use it in GitHub Desktop.
Cannot get schema from schema registry!
spring:
  kafka:
    bootstrap-servers: localhost:9092
    schema.registry.url: mock://test.com
spring:
  main:
    log-startup-info: false
  application:
    name: purchase-processor
  kafka:
    listener:
      missing-topics-fatal: false
    bootstrap-servers: 129.128.9.300:9092
    schema.registry.url: http://localhost:8081
    input-topic: t.input.purchase
    output-topic: t.masked.purchase
/**
 * Spring Boot entry point that also supplies the default Kafka Streams configuration bean.
 *
 * <p>The streams configuration uses String keys and Avro {@code SpecificAvroSerde} values, and
 * takes the broker list from {@link KafkaProperties} and the schema registry URL from the
 * {@code spring.kafka.schema.registry.url} property.
 */
@SpringBootApplication
@EnableKafkaStreams
@Log4j2
public class Main {

    /** Kafka Streams {@code application.id} used by this application. */
    public static final String APP_ID = "upper-case-demo";

    /** Property key holding the schema registry URL handed to the Avro serdes. */
    private static final String SCHEMA_REGISTRY_URL_PROPERTY = "spring.kafka.schema.registry.url";

    private final KafkaProperties kafkaProperties;
    private final Environment env;

    /**
     * @param kafkaProperties source of the Kafka bootstrap servers
     * @param env environment used to resolve the schema registry URL
     */
    public Main(KafkaProperties kafkaProperties, Environment env) {
        this.kafkaProperties = kafkaProperties;
        this.env = env;
    }

    /**
     * Builds the default Kafka Streams configuration.
     *
     * @return the streams configuration registered under the default streams config bean name
     * @throws IllegalStateException if {@code spring.kafka.schema.registry.url} cannot be resolved
     */
    @Bean(name = KafkaStreamsDefaultConfiguration.DEFAULT_STREAMS_CONFIG_BEAN_NAME)
    public KafkaStreamsConfiguration getStreamsConfig() {
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID);
        props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers());
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG,
                io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde.class.getName());
        // Fail fast with a message naming the missing key instead of storing null in the config
        // and leaving the failure to whatever reads the value later.
        props.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,
                env.getRequiredProperty(SCHEMA_REGISTRY_URL_PROPERTY));
        return new KafkaStreamsConfiguration(props);
    }

    public static void main(String[] args) {
        run(Main.class, args);
    }
}
{
"type" : "record",
"name" : "Purchase",
"namespace" : "com.thebigscale.workflow.avro",
"fields" : [ {
"name" : "creditCardNumber",
"type" : [ "null", "string" ],
"default" : null
}, {
"name" : "customerId",
"type" : [ "null", "string" ],
"default" : null
}, {
"name" : "department",
"type" : [ "null", "string" ],
"default" : null
}, {
"name" : "employeeId",
"type" : [ "null", "string" ],
"default" : null
}, {
"name" : "firstName",
"type" : "string"
}, {
"name" : "itemPurchased",
"type" : "string"
}, {
"name" : "lastName",
"type" : [ "null", "string" ],
"default" : null
}, {
"name" : "price",
"type" : "double"
}, {
"name" : "quantity",
"type" : [ "null", "int" ],
"default" : null
}, {
"name" : "storeId",
"type" : [ "null", "string" ],
"default" : null
}, {
"name" : "zipCode",
"type" : [ "null", "string" ],
"default" : null
} ]
}
/**
 * Declares the stream that reads purchases from the input topic, masks each value with
 * {@code PurchaseUtil::maskCreditCard} and writes the result to the output topic.
 */
@Configuration
public class PurchaseProcessor {

    @Value("${spring.kafka.input-topic}")
    private String inputTopic;

    @Value("${spring.kafka.output-topic}")
    private String outputTopic;

    /**
     * Wires the masking topology onto the supplied builder.
     *
     * @param builder streams builder the topology is added to
     * @return the source stream read from the input topic (not the masked one)
     */
    @Bean
    public KStream<String, Purchase> process(StreamsBuilder builder) {
        KStream<String, Purchase> purchaseStream = builder.stream(inputTopic);
        KStream<String, Purchase> maskedPurchaseStream = purchaseStream.mapValues(PurchaseUtil::maskCreditCard);

        // Print only the masked records: printing the source stream would write the
        // unmasked credit card numbers to stdout.
        maskedPurchaseStream.print(Printed.<String, Purchase>toSysOut().withLabel("Masked KStream in getTopology..."));
        maskedPurchaseStream.to(outputTopic);

        // Dump the topology description so the wiring is visible at startup.
        Topology topology = builder.build();
        System.out.println(topology.describe());

        return purchaseStream;
    }
}
@SpringBootTest
@DirtiesContext
@EmbeddedKafka(partitions = 1, brokerProperties = { "listeners=PLAINTEXT://localhost:9092", "port=9092" },
topics = {
"${spring.kafka.input-topic}",
"${spring.kafka.output-topic}"})
@ActiveProfiles("test")
class PurchaseProcessorIntegrationTest {
@Value("${spring.kafka.input-topic}")
private String inputTopic;
@Value("${spring.kafka.output-topic}")
private String outputTopic;
private final KafkaProperties kafkaProperties;
@Autowired
PurchaseProcessorIntegrationTest(KafkaProperties kafkaProperties) {
this.kafkaProperties = kafkaProperties;
}
@Test
void testStream() throws IOException, RestClientException {
Purchase purchase = Purchase.newBuilder().setFirstName("someUser").setPrice(2.0d).setItemPurchased("roll").setCreditCardNumber("123").build();
SchemaRegistryClient client = new MockSchemaRegistryClient();
client.register(inputTopic + "-value", purchase.getSchema());
client.register(outputTopic + "-value", purchase.getSchema());
SpecificAvroSerde<Purchase> purchaseSerde = new SpecificAvroSerde<>(client);
Serdes.StringSerde keySerde = new Serdes.StringSerde();
purchaseSerde.configure(Map.of(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "mock://" + this.getClass().getSimpleName()), false);
//Given a Producer
Map<String, Object> producerProps = new HashMap<>(KafkaTestUtils.producerProps(join(",", kafkaProperties.getBootstrapServers())));
producerProps.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, true);
Producer<String, Purchase> producer = new DefaultKafkaProducerFactory<>(producerProps, keySerde.serializer(), purchaseSerde.serializer()).createProducer();
//And a Consumer
Map<String, Object> consumerProps = KafkaTestUtils.consumerProps(join(",", kafkaProperties.getBootstrapServers()), "testGroup", "true");
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(AbstractKafkaSchemaSerDeConfig.AUTO_REGISTER_SCHEMAS, true);
Consumer<String, Purchase> consumer = new DefaultKafkaConsumerFactory<>(consumerProps, keySerde.deserializer(), purchaseSerde.deserializer()).createConsumer();
consumer.subscribe(Collections.singleton(outputTopic));
//When
producer.send(new ProducerRecord<>(inputTopic, purchase);
producer.flush();
//Then
assertThat(producer).isNotNull();
//And
ConsumerRecords<String, Purchase> rec = consumer.poll(Duration.ofSeconds(3));
Iterable<ConsumerRecord<String, Purchase>> records = rec.records(outputTopic);
Iterator<ConsumerRecord<String, Purchase>> iterator = records.iterator();
if (!iterator.hasNext()) Assertions.fail();
ConsumerRecord<String, Purchase> next = iterator.next();
assertThat(next.value().getFirstName()).isEqualTo("someUser");
assertThat(next.value().getCreditCardNumber()).isEqualTo("***");
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment