Confluent, Inc. confluentgist

confluentgist /
Created Jan 9, 2020 — forked from miguno/
Kafka Streams Example: read a topic as a stream
// Create KStream from Kafka topic.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()));
confluentgist / topic-as-stream.sql
Created Jan 9, 2020 — forked from miguno/topic-as-stream.sql
ksqlDB example: read topic as stream
-- Create ksqlDB stream from Kafka topic.
CREATE STREAM myStream (username VARCHAR, location VARCHAR)
WITH (KAFKA_TOPIC='input-topic', VALUE_FORMAT='...');
confluentgist / gist:5cc32f336179d2b37dfafcabfc220f7e
"task": {
"type": ".TaskRole",
"initialDelayMs": 20000,
"taskSpecs": {
"bench0": {
"class": "org.apache.kafka.trogdor.workload.ProduceBenchSpec",
"startMs": 0,
"durationMs": 720000,
"producerNode": "node0",
"bootstrapServers": "%{bootstrapServers}",
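As a quick side calculation (not part of the gist itself), the millisecond fields in the spec above translate to human-friendly units: `durationMs` of 720000 is 12 minutes, and `initialDelayMs` of 20000 is 20 seconds. A minimal check:

```java
import java.time.Duration;

public class SpecDurations {
    // Convert the millisecond fields from the spec fragment above into human units.
    static long toMinutes(long ms) { return Duration.ofMillis(ms).toMinutes(); }
    static long toSeconds(long ms) { return Duration.ofMillis(ms).getSeconds(); }

    public static void main(String[] args) {
        System.out.println(toMinutes(720_000) + " min"); // durationMs -> "12 min"
        System.out.println(toSeconds(20_000) + " s");    // initialDelayMs -> "20 s"
    }
}
```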
confluentgist / runmain_simple_test.go
Last active Dec 28, 2019
Simple way to instrument a Go binary for code coverage
// +build testrunmain
package main
import (
// Topology argument restored per the standard KafkaStreams constructor; the
// input topic variable was elided in the gist preview, so "inputTopic" is assumed.
final KafkaStreams streams = new TestKafkaStreams(builder.build(), streamsConfiguration);
final KStream<String, String> inputEvents = builder.stream(inputTopic);
inputEvents.foreach((key, value) -> {
// Transform input values (list of Strings) to expected DL4J parameters (two Integer values):
String[] valuesAsArray = value.split(",");
INDArray input = Nd4j.create(Integer.parseInt(valuesAsArray[0]), Integer.parseInt(valuesAsArray[1]));
// Model inference in real time:
INDArray output = model.output(input);
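The input transformation in the fragment above (a comma-separated record value split into two integer model parameters) can be exercised standalone in plain Java; `parseDimensions` is a hypothetical helper name for illustration, not part of the original gist:

```java
public class InputParser {
    // Mirrors the foreach body above: split "a,b" on the comma and parse
    // both halves as integers (the two DL4J input parameters).
    // The helper name and trimming are illustrative additions.
    public static int[] parseDimensions(String value) {
        String[] valuesAsArray = value.split(",");
        return new int[] {
            Integer.parseInt(valuesAsArray[0].trim()),
            Integer.parseInt(valuesAsArray[1].trim())
        };
    }

    public static void main(String[] args) {
        int[] dims = parseDimensions("1,28");
        System.out.println(dims[0] + " x " + dims[1]); // prints "1 x 28"
    }
}
```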
// Configure Kafka Streams Application
Properties streamsConfiguration = new Properties();
streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-streams-tensorflow-keras-integration-test");
streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, CLUSTER.bootstrapServers());
// Specify default (de)serializers for record keys and for record values
streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
// Step 1: Load Keras TensorFlow Model using DeepLearning4J API
String simpleMlp = new ClassPathResource("generatedModels/Keras/simple_mlp.h5").getFile().getPath();
MultiLayerNetwork model = KerasModelImport.importKerasSequentialModelAndWeights(simpleMlp);
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster;
import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
import org.apache.kafka.streams.kstream.KStream;
import org.deeplearning4j.nn.modelimport.keras.KerasModelImport;
import org.deeplearning4j.nn.multilayer.MultiLayerNetwork;
// Start Kafka Streams Application to process new incoming images from the Input Topic
// Topology argument restored per the standard KafkaStreams constructor.
final KafkaStreams streams = new KafkaStreams(builder.build(), streamsConfiguration);