Confluent, Inc. confluentgist
confluentgist / topic-as-stream.java
Created January 9, 2020 07:38 — forked from miguno/topic-as-stream.java
Kafka Streams Example: read a topic as a stream
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;
// Create a KStream from a Kafka topic, using explicit String serdes for keys and values.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream =
    builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));
confluentgist / gist:5cc32f336179d2b37dfafcabfc220f7e
Created November 30, 2019 08:23 — forked from jolshan/gist:94a6335312ed7af35d77da3abbc6f13b
Example Task Spec for Sticky Partitioner
"task": {
  "type": ".TaskRole",
  "initialDelayMs": 20000,
  "taskSpecs": {
    "bench0": {
      "class": "org.apache.kafka.trogdor.workload.ProduceBenchSpec",
      "startMs": 0,
      "durationMs": 720000,
      "producerNode": "node0",
      "bootstrapServers": "%{bootstrapServers}",
final KStream<String, String> inputEvents = builder.stream(inputTopic);
inputEvents.foreach((key, value) -> {
    // Transform input values (a comma-separated list of Strings) to the expected DL4J parameters (two Integer values):
    String[] valuesAsArray = value.split(",");
    INDArray input = Nd4j.create(Integer.parseInt(valuesAsArray[0]), Integer.parseInt(valuesAsArray[1]));
    // Model inference in real time:
    INDArray output = model.output(input);
});
// Build and start the topology only after it has been fully defined.
final KafkaStreams streams = new TestKafkaStreams(builder.build(), streamsConfiguration);
streams.cleanUp();
streams.start();
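Since foreach is a terminal action, a variant that forwards each prediction downstream would use mapValues instead; a sketch under the same setup, with the output topic name an assumption:

// Hypothetical variant: emit each prediction to an output topic.
KStream<String, String> predictions = inputEvents.mapValues(value -> {
    String[] parts = value.split(",");
    INDArray in = Nd4j.create(Integer.parseInt(parts[0]), Integer.parseInt(parts[1]));
    return model.output(in).toString();
});
predictions.to("prediction-topic"); // topic name assumed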
// Configure Kafka Streams Application
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-tensorflow-keras-integration-test");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
// Specify default (de)serializers for record keys and for record values
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Step 1: Load Keras TensorFlow Model using DeepLearning4J API
String simpleMlp = new ClassPathResource("generatedModels/Keras/simple_mlp.h5").getFile().getPath();
System.out.println(simpleMlp);
MultiLayerNetwork model = KerasModelImport.importKerasSequentialModelAndWeights(simpleMlp);
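Once imported, the model can be exercised directly. A quick smoke test; the 1x20 input shape is an assumption about simple_mlp, not taken from the gist.

// Hypothetical smoke test: one forward pass through the imported model.
INDArray sample = Nd4j.zeros(1, 20); // input shape assumed
INDArray prediction = model.output(sample);
System.out.println("Prediction: " + prediction);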
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.deeplearning4j.nn.modelimport.keras.KerasModelImport;
import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
// Start Kafka Streams Application to process new incoming images from the Input Topic
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();
KStream<String, Object> transformedMessage = imageInputLines.mapValues(value -> {
    System.out.println("Image path: " + value);
    imagePath = value;
    TensorflowObjectRecogniser recogniser = new TensorflowObjectRecogniser(server, port);
    System.out.println("Image = " + imagePath);
    InputStream jpegStream;
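The preview cuts off mid-lambda; presumably the remainder opens the image as a stream and hands it to the recogniser. A hypothetical completion — the recognise and close calls below are assumptions, not verified against the gist:

    // Hypothetical completion (recogniser API assumed; needs java.nio.file.Files/Paths imports):
    Object result = null;
    try {
        jpegStream = Files.newInputStream(Paths.get(imagePath));
        result = recogniser.recognise(jpegStream); // assumed method
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        recogniser.close(); // assumed to be closeable
    }
    return result;
});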
// Configure Kafka Streams Application
final String bootstrapServers = args.length > 0 ? args[0] : "localhost:9092";
final Properties streamsConfiguration = new Properties();
// Give the Streams application a unique name. The name must be unique
// in the Kafka cluster against which the application is run.
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-tensorflow-serving-gRPC-example");
// Where to find Kafka broker(s).
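The preview truncates here; the source presumably continues by pointing the application at the broker list parsed above. A one-line sketch of that continuation:

streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);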