This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Create ksqlDB table from Kafka topic. | |
CREATE TABLE myTable (username VARCHAR, location VARCHAR) | |
WITH (KAFKA_TOPIC='input-topic', KEY='username', VALUE_FORMAT='...'); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Create KStream from Kafka topic. | |
StreamsBuilder builder = new StreamsBuilder(); | |
KStream<String, String> stream = | |
builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String())); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Create ksqlDB stream from Kafka topic. | |
CREATE STREAM myStream (username VARCHAR, location VARCHAR) | |
WITH (KAFKA_TOPIC='input-topic', VALUE_FORMAT='...'); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"task": { | |
"type": ".TaskRole", | |
"initialDelayMs": 20000, | |
"taskSpecs": { | |
"bench0": { | |
"class": "org.apache.kafka.trogdor.workload.ProduceBenchSpec", | |
"startMs": 0, | |
"durationMs": 720000, | |
"producerNode": "node0", | |
"bootstrapServers": "%{bootstrapServers}", |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// +build testrunmain | |
package main | |
import ( | |
"testing" | |
"github.com/confluentinc/bincover" | |
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
final KafkaStreams streams = new TestKafkaStreams(builder.build(), streamsConfiguration); | |
streams.cleanUp(); | |
streams.start(); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
final KStream<String, String> inputEvents = builder.stream(inputTopic); | |
inputEvents.foreach((key, value) -> { | |
// Transform the input value (a comma-separated String) into the expected DL4J parameters (two Integer values): | |
String[] valuesAsArray = value.split(","); | |
INDArray input = Nd4j.create(Integer.parseInt(valuesAsArray[0]), Integer.parseInt(valuesAsArray[1])); | |
// Model inference in real time: | |
output = model.output(input); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Configure Kafka Streams Application | |
Properties streamsConfiguration = new Properties(); | |
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-tensorflow-keras-integration-test"); | |
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); | |
// Specify default (de)serializers for record keys and for record values | |
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); | |
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// Step 1: Load Keras TensorFlow Model using DeepLearning4J API | |
String simpleMlp = new ClassPathResource("generatedModels/Keras/simple_mlp.h5").getFile().getPath(); | |
System.out.println(simpleMlp.toString()); | |
MultiLayerNetwork model = KerasModelImport.importKerasSequentialModelAndWeights(simpleMlp); |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import org.apache.kafka.streams.KafkaStreams; | |
import org.apache.kafka.streams.KeyValue; | |
import org.apache.kafka.streams.StreamsBuilder; | |
import org.apache.kafka.streams.StreamsConfig; | |
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; | |
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; | |
import org.apache.kafka.streams.kstream.KStream; | |
import org.deeplearning4j.nn.modelimport.keras.KerasModelImport; | |
import org.deeplearning4j.nn.multilayer.MultiLayerNetwork; |