Skip to content

Instantly share code, notes, and snippets.

security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
username="" \
password="";
sh kafka-topics.sh --create --topic out_topic --bootstrap-server <server_id>.us-east1.gcp.confluent.cloud:9092 --command-config jaas.conf
@jacace
jacace / gist:edd865fccb0715c43278c4954ce451a5
Created January 29, 2021 18:45
Kafka Publish Message
sh kafka-console-producer.sh --broker-list <server:id>.us-east1.gcp.confluent.cloud:9092 --producer.config config.properties --topic in_topic
sh kafka-topics.sh --list --bootstrap-server <server-id>.us-east1.gcp.confluent.cloud:9092 --command-config jaas.conf
@jacace
jacace / gist:a6ee766a8a40cf72e28e125a50a2bad3
Created February 1, 2021 12:37
Kafka consume message
//Sample kafka client in nodejs
'use strict';
require('dotenv').config()
//kafka init starts
const { Kafka } = require('kafkajs')
const kafka = new Kafka({
clientId: process.env.consumerid,
kubectl describe secrets/build-robot-secret
sh kafka-topics.sh --delete --topic demo_topic --bootstrap-server <server-id>.us-east1.gcp.confluent.cloud:9092 --command-config jaas.conf
@jacace
jacace / gist:27b433c4b173868859fac5c4e3853443
Created February 10, 2021 10:37
Transform each record in one stream vs another (kafka vs Spark streamign APIs)
#With Kafka Streaming API:
stream.mapValues(value -> value.toLowerCase());
#With Spark Streaming API:
stream.map(record -> new Tuple2<>(record.key(), record.value()));
@jacace
jacace / gist:1625e49cf9b2adce9e9b754f559d7301
Created February 10, 2021 10:49
Create resources in the Stack driver at the partition level
dstream.foreachRDD { rdd =>
rdd.foreachPartition { partitionOfRecords =>
// ConnectionPool is a static, lazily initialized pool of connections
val connection = ConnectionPool.getConnection()
partitionOfRecords.foreach(record => connection.send(record))
ConnectionPool.returnConnection(connection) // return to the pool for future reuse
}
}
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> streamLeft = builder.stream("TopicIn1");
KStream<String, String> streamRight = builder.stream("TopicIn2");
KStream<String, String> joined = streamLeft.join(streamRight,
(leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */
JoinWindows.of(5000),
Joined.with(
Serdes.String(), /* key */
Serdes.String(), /* left value */