View topic-as-table.sql
-- Create ksqlDB table from Kafka topic. | |
CREATE TABLE myTable (username VARCHAR, location VARCHAR) | |
WITH (KAFKA_TOPIC='input-topic', KEY='username', VALUE_FORMAT='...'); |
View topic-as-stream.java
// Create KStream from Kafka topic. | |
StreamsBuilder builder = new StreamsBuilder(); | |
KStream<String, String> stream = | |
builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String())); |
View topic-as-stream.sql
-- Create ksqlDB stream from Kafka topic. | |
CREATE STREAM myStream (username VARCHAR, location VARCHAR) | |
WITH (KAFKA_TOPIC='input-topic', VALUE_FORMAT='...'); |
View gist:5cc32f336179d2b37dfafcabfc220f7e
“task”: { | |
“type”: “.TaskRole, | |
“initialDelayMs”: 20000, | |
“taskSpecs”: { | |
"bench0": { | |
"class": "org.apache.kafka.trogdor.workload.ProduceBenchSpec", | |
"startMs": 0, | |
"durationMs": 720000, | |
"producerNode": "node0", | |
"bootstrapServers": "%{bootstrapServers}", |
View runmain_simple_test.go
// +build testrunmain | |
package main | |
import ( | |
"testing" | |
"github.com/confluentinc/bincover" | |
) |
View start_app.java
final KafkaStreams streams = new TestKafkaStreams(builder.build(), streamsConfiguration); | |
streams.cleanUp(); | |
streams.start(); |
View inference.java
final KStream<String, String> inputEvents = builder.stream(inputTopic); | |
inputEvents.foreach((key, value) -> { | |
// Transform input values (list of Strings) to expected DL4J parameters (two Integer values): | |
String[] valuesAsArray = value.split(","); | |
INDArray input = Nd4j.create(Integer.parseInt(valuesAsArray[0]), Integer.parseInt(valuesAsArray[1])); | |
// Model inference in real time: | |
output = model.output(input); |
View config.java
// Configure Kafka Streams Application | |
Properties streamsConfiguration = new Properties(); | |
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-tensorflow-keras-integration-test"); | |
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers()); | |
// Specify default (de)serializers for record keys and for record values | |
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); | |
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); |
View load_model.java
// Step 1: Load Keras TensorFlow Model using DeepLearning4J API | |
String simpleMlp = new ClassPathResource("generatedModels/Keras/simple_mlp.h5").getFile().getPath(); | |
System.out.println(simpleMlp.toString()); | |
MultiLayerNetwork model = KerasModelImport.importKerasSequentialModelAndWeights(simpleMlp); |
View import.java
import org.apache.kafka.streams.KafkaStreams; | |
import org.apache.kafka.streams.KeyValue; | |
import org.apache.kafka.streams.StreamsBuilder; | |
import org.apache.kafka.streams.StreamsConfig; | |
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster; | |
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils; | |
import org.apache.kafka.streams.kstream.KStream; | |
import org.deeplearning4j.nn.modelimport.keras.KerasModelImport; | |
import org.deeplearning4j.nn.multilayer.MultiLayerNetwork; |