@jacace
jacace / gist:6965669d626aefb1eb2bb5db4d14bd1c
Created March 8, 2021 19:56
Joins in Java Spark Streaming API
private static void joinDemo(JavaDStream<ConsumerRecord<String, String>> productStream,
        JavaDStream<ConsumerRecord<String, String>> salesStream) {
    ObjectMapper jacksonParser = new ObjectMapper();
    // Key each stream by the Kafka record key and deserialize the JSON value
    JavaPairDStream<Object, Object> s1 = productStream.mapToPair(record -> new Tuple2<Object, Object>(record.key(),
            jacksonParser.readValue(record.value(), Item.class)));
    JavaPairDStream<Object, Object> s2 = salesStream.mapToPair(record -> new Tuple2<Object, Object>(record.key(),
            jacksonParser.readValue(record.value(), DailySales.class)));
    // Inner join on the shared key: one (Item, DailySales) pair per match, per micro-batch
    JavaPairDStream<Object, Tuple2<Object, Object>> s3 = s1.join(s2);
    // The gist truncates here; a minimal output action to materialize the join:
    s3.foreachRDD(rdd -> rdd.foreach(pair -> System.out.println(pair._1() + " -> " + pair._2())));
}
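The DStreams handed to joinDemo have to come from a live JavaStreamingContext. A minimal driver sketch, assuming the spark-streaming-kafka-0-10 integration; the topic names ("products", "sales"), broker address, group id, and batch interval are placeholders, not taken from the gist:

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka010.ConsumerStrategies;
import org.apache.spark.streaming.kafka010.KafkaUtils;
import org.apache.spark.streaming.kafka010.LocationStrategies;

public class JoinDemoDriver {
    public static void main(String[] args) throws InterruptedException {
        SparkConf conf = new SparkConf().setAppName("join-demo").setMaster("local[*]");
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

        Map<String, Object> kafkaParams = new HashMap<>();
        kafkaParams.put("bootstrap.servers", "localhost:9092"); // placeholder broker
        kafkaParams.put("key.deserializer", StringDeserializer.class);
        kafkaParams.put("value.deserializer", StringDeserializer.class);
        kafkaParams.put("group.id", "join-demo-group");         // placeholder group id

        // joinDemo is assumed visible from here (it is private static in the gist)
        joinDemo(
            KafkaUtils.createDirectStream(jssc, LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(Collections.singleton("products"), kafkaParams)),
            KafkaUtils.createDirectStream(jssc, LocationStrategies.PreferConsistent(),
                ConsumerStrategies.<String, String>Subscribe(Collections.singleton("sales"), kafkaParams)));

        jssc.start();
        jssc.awaitTermination();
    }
}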
// For comparison, the equivalent join expressed with the Kafka Streams API:
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> streamLeft = builder.stream("TopicIn1");
KStream<String, String> streamRight = builder.stream("TopicIn2");
KStream<String, String> joined = streamLeft.join(streamRight,
    (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue, /* ValueJoiner */
    JoinWindows.of(5000),   /* 5-second join window */
    Joined.with(
        Serdes.String(),    /* key */
        Serdes.String(),    /* left value */
        Serdes.String()));  /* right value */
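As with the Spark version, the topology above only does something once it is built and started. A minimal sketch, assuming the builder from the snippet above; the application id and broker address are placeholders:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "join-demo");         // placeholder app id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder broker
// builder.stream(...) above names no serdes, so defaults must be configured
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

KafkaStreams streams = new KafkaStreams(builder.build(), props); // 'builder' from the join snippet
streams.start();
Runtime.getRuntime().addShutdownHook(new Thread(streams::close)); // close cleanly on exit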
@jacace
jacace / gist:1625e49cf9b2adce9e9b754f559d7301
Created February 10, 2021 10:49
Create resources (e.g. Stackdriver connections) at the partition level
dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection) // return to the pool for future reuse
  }
}
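The comment in the snippet assumes a ConnectionPool helper that the gist never defines. A minimal Java sketch of such a static, lazily initialized pool; the Connection type is a hypothetical stand-in for whatever sink client is actually used:

import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sink client; replace with the real connection type.
class Connection {
    void send(Object record) { /* write the record to the external system */ }
    void close() { /* release the underlying resource */ }
}

public class ConnectionPool {
    // Static so the pool lives once per executor JVM; connections are created lazily.
    private static final ConcurrentLinkedQueue<Connection> pool = new ConcurrentLinkedQueue<>();

    public static Connection getConnection() {
        Connection c = pool.poll();
        return (c != null) ? c : new Connection(); // create on demand when the pool is empty
    }

    public static void returnConnection(Connection c) {
        pool.offer(c); // keep it around for the next partition
    }
}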
@jacace
jacace / gist:27b433c4b173868859fac5c4e3853443
Created February 10, 2021 10:37
Transform each record in one stream vs another (Kafka vs Spark Streaming APIs)
// With the Kafka Streams API:
stream.mapValues(value -> value.toLowerCase());

// With the Spark Streaming API:
stream.map(record -> new Tuple2<>(record.key(), record.value()));
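The difference the two one-liners hide: mapValues rewrites only the value, so the key (and the partitioning derived from it) is preserved, while Spark's map rebuilds the whole element. The lower-casing example on the Spark side, assuming the same JavaDStream<ConsumerRecord<String, String>> as in the join gist:

JavaDStream<Tuple2<String, String>> lowered = stream.map(
        record -> new Tuple2<>(record.key(), record.value().toLowerCase()));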
sh kafka-topics.sh --delete --topic demo_topic --bootstrap-server <server-id>.us-east1.gcp.confluent.cloud:9092 --command-config jaas.conf
kubectl describe secrets/build-robot-secret
@jacace
jacace / gist:a6ee766a8a40cf72e28e125a50a2bad3
Created February 1, 2021 12:37
Consume a Kafka message (Node.js)
//Sample kafka client in nodejs
'use strict';
require('dotenv').config()

//kafka init starts
const { Kafka } = require('kafkajs')
const kafka = new Kafka({
  clientId: process.env.consumerid,
  brokers: [process.env.broker] // env var name assumed; the gist preview truncates here
})
const consumer = kafka.consumer({ groupId: process.env.groupid }) // env var name assumed
sh kafka-topics.sh --list --bootstrap-server <server-id>.us-east1.gcp.confluent.cloud:9092 --command-config jaas.conf
@jacace
jacace / gist:edd865fccb0715c43278c4954ce451a5
Created January 29, 2021 18:45
Publish a Kafka message
sh kafka-console-producer.sh --broker-list <server-id>.us-east1.gcp.confluent.cloud:9092 --producer.config config.properties --topic in_topic
sh kafka-topics.sh --create --topic out_topic --bootstrap-server <server_id>.us-east1.gcp.confluent.cloud:9092 --command-config jaas.conf
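Both commands point the CLI at a client config file (--producer.config / --command-config). For Confluent Cloud that file typically carries SASL/SSL settings along these lines; the API key and secret are placeholders:

bootstrap.servers=<server-id>.us-east1.gcp.confluent.cloud:9092
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="<api-key>" \
  password="<api-secret>";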