View Streaming_Machine_Learning_with_Kafka_and_TensorFlow.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import numpy as np | |
import tensorflow as tf | |
import tensorflow_io.kafka as kafka_io | |
# 1. Consume streaming data with Kafka and TensorFlow I/O | |
def func_x(x): | |
# Decode image to (28, 28) | |
x = tf.io.decode_raw(x, out_type=tf.uint8) | |
x = tf.reshape(x, [28, 28]) | |
# Convert to float32 for tf.keras |
View start_app.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
final KafkaStreams streams = new TestKafkaStreams(builder.build(), streamsConfiguration); | |
streams.cleanUp(); | |
streams.start(); |
View inference.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
final KStream<String, String> inputEvents = builder.stream(inputTopic); | |
inputEvents.foreach((key, value) -> { | |
// Transform input values (list of Strings) to expected DL4J parameters (two Integer values): | |
String[] valuesAsArray = value.split(","); | |
INDArray input = Nd4j.create(Integer.parseInt(valuesAsArray[0]), Integer.parseInt(valuesAsArray[1])); | |
// Model inference in real time: | |
output = model.output(input); |
View config.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Configure Kafka Streams Application | |
Properties streamsConfiguration = new Properties(); | |
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-tensorflow-keras-integration-test"); | |
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); | |
// Specify default (de)serializers for record keys and for record values | |
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); | |
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); |
View load_model.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Step 1: Load Keras TensorFlow Model using DeepLearning4J API | |
String simpleMlp = new ClassPathResource("generatedModels/Keras/simple_mlp.h5").getFile().getPath(); | |
System.out.println(simpleMlp.toString()); | |
MultiLayerNetwork model = KerasModelImport.importKerasSequentialModelAndWeights(simpleMlp); |
View import.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.kafka.streams.KafkaStreams; | |
import org.apache.kafka.streams.KeyValue; | |
import org.apache.kafka.streams.StreamsBuilder; | |
import org.apache.kafka.streams.StreamsConfig; | |
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; | |
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; | |
import org.apache.kafka.streams.kstream.KStream; | |
import org.deeplearning4j.nn.modelimport.keras.KerasModelImport; | |
import org.deeplearning4j.nn.multilayer.MultiLayerNetwork; |
View import.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.kafka.common.serialization.Serdes; | |
import org.apache.kafka.streams.KafkaStreams; | |
import org.apache.kafka.streams.StreamsBuilder; | |
import org.apache.kafka.streams.StreamsConfig; | |
import org.apache.kafka.streams.Topology; | |
import org.apache.kafka.streams.kstream.KStream; | |
import org.apache.kafka.streams.kstream.Printed; | |
import org.tensorflow.DataType; | |
import org.tensorflow.Graph; | |
import org.tensorflow.Output; |
View start_app.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Start Kafka Streams Application to process new incoming images from the Input Topic | |
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration); | |
streams.start(); |
View rpc_call.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Start Kafka Streams Application to process new incoming images from the Input Topic | |
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration); | |
streams.start(); |
View rpc_call.java
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
KStream<String, Object> transformedMessage = imageInputLines.mapValues(value -> { | |
System.out.println("Image path: " + value); | |
imagePath = value; | |
TensorflowObjectRecogniser recogniser = new TensorflowObjectRecogniser(server, port); | |
System.out.println("Image = " + imagePath); | |
InputStream jpegStream; |
NewerOlder