Skip to content

Instantly share code, notes, and snippets.

Avatar

Confluent, Inc. confluentgist

View GitHub Profile
View start_app.java
final KafkaStreams streams = new TestKafkaStreams(builder.build(), streamsConfiguration);
streams.cleanUp();
streams.start();
View inference.java
final KStream<String, String> inputEvents = builder.stream(inputTopic);
inputEvents.foreach((key, value) -> {
// Transform input values (list of Strings) to expected DL4J parameters (two Integer values):
String[] valuesAsArray = value.split(",");
INDArray input = Nd4j.create(Integer.parseInt(valuesAsArray[0]), Integer.parseInt(valuesAsArray[1]));
// Model inference in real time:
output = model.output(input);
View config.java
// Configure Kafka Streams Application
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-tensorflow-keras-integration-test");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
// Specify default (de)serializers for record keys and for record values
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
View load_model.java
// Step 1: Load Keras TensorFlow Model using DeepLearning4J API
String simpleMlp = new ClassPathResource("generatedModels/Keras/simple_mlp.h5").getFile().getPath();
System.out.println(simpleMlp.toString());
MultiLayerNetwork model = KerasModelImport.importKerasSequentialModelAndWeights(simpleMlp);
View import.java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.deeplearning4j.nn.modelimport.keras.KerasModelImport;
import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
View start_app.java
// Start Kafka Streams Application to process new incoming images from the Input Topic
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();
View rpc_call.java
KStream<String, Object> transformedMessage = imageInputLines.mapValues(value -> {
System.out.println("Image path: " + value);
imagePath = value;
TensorflowObjectRecogniser recogniser = new TensorflowObjectRecogniser(server, port);
System.out.println("Image = " + imagePath);
InputStream jpegStream;
View configure.java
// Configure Kafka Streams Application
final String bootstrapServers = args.length > 0 ? args[0] : "localhost:9092";
final Properties streamsConfiguration = new Properties();
// Give the Streams application a unique name. The name must be unique
// in the Kafka cluster against which the application is run.
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-tensorflow-serving-gRPC-example");
// Where to find Kafka broker(s).
View import.java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import com.github.megachucky.kafka.streams.machinelearning.TensorflowObjectRecogniser;
View OptimizingStreams.java
// imports and license left out for clarity
public class OptimizedStreams {
public static void main(String[] args) {
final Properties properties = new Properties();
properties.setProperty(StreamsConfig.APPLICATION_ID_CONFIG, "test-application");
properties.setProperty(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092 ");
properties.setProperty(StreamsConfig.TOPOLOGY_OPTIMIZATION, StreamsConfig.OPTIMIZE);
properties.setProperty(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());