Skip to content

Instantly share code, notes, and snippets.

View kaiwaehner's full-sized avatar

Kai Waehner kaiwaehner

View GitHub Profile
View Streaming_Machine_Learning_with_Kafka_and_TensorFlow.py
import numpy as np
import tensorflow as tf
import tensorflow_io.kafka as kafka_io
# 1. Consume streaming data with Kafka and TensorFlow I/O
def func_x(x):
# Decode image to (28, 28)
x = tf.io.decode_raw(x, out_type=tf.uint8)
x = tf.reshape(x, [28, 28])
# Convert to float32 for tf.keras
View start_app.java
final KafkaStreams streams = new TestKafkaStreams(builder.build(), streamsConfiguration);
streams.cleanUp();
streams.start();
View inference.java
final KStream<String, String> inputEvents = builder.stream(inputTopic);
inputEvents.foreach((key, value) -> {
// Transform input values (list of Strings) to expected DL4J parameters (two Integer values):
String[] valuesAsArray = value.split(",");
INDArray input = Nd4j.create(Integer.parseInt(valuesAsArray[0]), Integer.parseInt(valuesAsArray[1]));
// Model inference in real time:
output = model.output(input);
View config.java
// Configure Kafka Streams Application
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-tensorflow-keras-integration-test");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
// Specify default (de)serializers for record keys and for record values
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
View load_model.java
// Step 1: Load Keras TensorFlow Model using DeepLearning4J API
String simpleMlp = new ClassPathResource("generatedModels/Keras/simple_mlp.h5").getFile().getPath();
System.out.println(simpleMlp.toString());
MultiLayerNetwork model = KerasModelImport.importKerasSequentialModelAndWeights(simpleMlp);
View import.java
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.deeplearning4j.nn.modelimport.keras.KerasModelImport;
import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
View import.java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Printed;
import org.tensorflow.DataType;
import org.tensorflow.Graph;
import org.tensorflow.Output;
View start_app.java
// Start Kafka Streams Application to process new incoming images from the Input Topic
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();
View rpc_call.java
// Start Kafka Streams Application to process new incoming images from the Input Topic
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);
streams.start();
View rpc_call.java
KStream<String, Object> transformedMessage = imageInputLines.mapValues(value -> {
System.out.println("Image path: " + value);
imagePath = value;
TensorflowObjectRecogniser recogniser = new TensorflowObjectRecogniser(server, port);
System.out.println("Image = " + imagePath);
InputStream jpegStream;