Skip to content

Instantly share code, notes, and snippets.

@gAmUssA
Created October 19, 2020 16:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gAmUssA/69d4d6da2cf06b5015b3514f14a8e210 to your computer and use it in GitHub Desktop.
Save gAmUssA/69d4d6da2cf06b5015b3514f14a8e210 to your computer and use it in GitHub Desktop.
Kafka Streams interactive query + Spring WebFlux
package io.confluent.developer.iqrest;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.event.ContextRefreshedEvent;
import org.springframework.context.event.EventListener;
import org.springframework.kafka.annotation.EnableKafkaStreams;
import org.springframework.kafka.config.StreamsBuilderFactoryBean;
import org.springframework.stereotype.Component;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
import java.util.Objects;
import reactor.core.publisher.Mono;
/**
 * Entry point of the interactive-query REST demo.
 *
 * <p>The class carries {@code @SpringBootApplication} and {@code @EnableKafkaStreams}; the other
 * components in this file inject a {@code StreamsBuilder} and a {@code StreamsBuilderFactoryBean},
 * which presumably are provided through the latter annotation (confirm against Spring Kafka docs).
 */
@SpringBootApplication
@EnableKafkaStreams
public class IqRestApplication {

  /**
   * Boots the Spring application with this class as its primary source.
   *
   * @param args command-line arguments, handed to Spring Boot unchanged
   */
  public static void main(String[] args) {
    final SpringApplication application = new SpringApplication(IqRestApplication.class);
    application.run(args);
  }
}
/**
 * Declares the Kafka Streams topology used by this application: a single table over the
 * {@code orders} topic, materialized under the store name {@code orders_store}.
 */
@Component
class MyTopology {

  /** Topic the table is read from. */
  private static final String ORDERS_TOPIC = "orders";

  /** Name under which the table is materialized. */
  private static final String ORDERS_STORE = "orders_store";

  /**
   * Adds the orders table to the given builder. Invoked by Spring through method injection
   * ({@code @Autowired}); the resulting table handle is intentionally not kept.
   *
   * @param builder the streams builder the table definition is registered on
   */
  @Autowired
  public void createOrdersMaterializedView(final StreamsBuilder builder) {
    // Keys and values are both read as plain strings.
    final Serde<String> stringSerde = Serdes.String();
    final Consumed<String, String> consumedAsStrings = Consumed.with(stringSerde, stringSerde);

    builder.table(ORDERS_TOPIC, consumedAsStrings, Materialized.as(ORDERS_STORE));
  }
}
/**
 * REST endpoint that answers lookups against the {@code orders_store} state store using
 * Kafka Streams interactive queries.
 */
@RestController()
class MyIQController {

  /** Name of the state store queried by {@link #getValue(String)}. */
  private static final String ORDERS_STORE = "orders_store";

  /**
   * Cached Kafka Streams instance. Declared {@code volatile} because it is assigned by the
   * context-refresh listener and read by request-handling threads.
   */
  protected volatile KafkaStreams kafkaStreams;

  private final StreamsBuilderFactoryBean streamsBuilderFactoryBean;

  /**
   * Creates the controller.
   *
   * @param streamsBuilderFactoryBean factory bean that owns the Kafka Streams instance
   * @throws NullPointerException if {@code streamsBuilderFactoryBean} is {@code null}
   */
  MyIQController(StreamsBuilderFactoryBean streamsBuilderFactoryBean) {
    this.streamsBuilderFactoryBean =
        Objects.requireNonNull(streamsBuilderFactoryBean, "streamsBuilderFactoryBean");
    // The factory bean may not have been started yet, in which case this is still null;
    // the instance is picked up again on context refresh and, as a fallback, on first request.
    this.kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
  }

  /** Re-reads the Kafka Streams instance from the factory bean once the context is refreshed. */
  @EventListener(ContextRefreshedEvent.class)
  public void refreshKafkaStreams() {
    this.kafkaStreams = streamsBuilderFactoryBean.getKafkaStreams();
  }

  /**
   * Looks up the value stored under {@code id} in the {@code orders_store} state store.
   *
   * @param id key to look up, taken from the request path
   * @return whatever the store returns for {@code id}, passed through unchanged
   *     (per the Kafka Streams API this is {@code null} when the key has no value)
   * @throws IllegalStateException if no Kafka Streams instance is available yet
   */
  @GetMapping("/iq/{id}")
  public String getValue(@PathVariable final String id) {
    // NOTE(review): store(...) itself can reject the query while the streams instance is not
    // in a queryable state; that exception is deliberately left to propagate.
    final ReadOnlyKeyValueStore<String, String> keyValueStore = currentKafkaStreams().store(
        StoreQueryParameters.fromNameAndType(ORDERS_STORE, QueryableStoreTypes.keyValueStore()));
    return keyValueStore.get(id);
  }

  /**
   * Returns the cached Kafka Streams instance, falling back to the factory bean when nothing
   * has been cached yet, and fails with a descriptive error instead of a bare
   * {@code NullPointerException} when the instance does not exist.
   */
  private KafkaStreams currentKafkaStreams() {
    KafkaStreams streams = this.kafkaStreams;
    if (streams == null) {
      streams = streamsBuilderFactoryBean.getKafkaStreams();
      this.kafkaStreams = streams;
    }
    if (streams == null) {
      throw new IllegalStateException(
          "Kafka Streams instance is not available yet; cannot query store '" + ORDERS_STORE + "'");
    }
    return streams;
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment