Skip to content

Instantly share code, notes, and snippets.

View irajhedayati's full-sized avatar

Iraj Hedayati irajhedayati

View GitHub Profile
@irajhedayati
irajhedayati / PubSubValidator.scala
Created July 28, 2022 14:07
Validate Pub/Sub messages from local emulator
class PubSubValidator(projectId: String, topic: String, expectedRecords: List[String]) {
private val topicName = TopicName.format(projectId, topic)
private val subscriptionName = SubscriptionName.format(projectId, subscription)
private lazy val subscription: String = s"$topic-sub"
private val actualPulledMessages = new ConcurrentLinkedQueue[String]()
private val numberOfRecordsToPull = new AtomicInteger(expectedRecords.size)
private var subscriber: ApiService = _
def start(container: PubSubEmulatorContainer): Unit = {
@irajhedayati
irajhedayati / PubSubIT.scala
Last active July 28, 2022 14:05
The Integration Test for Pub/Sub application
class PubSubIT extends AnyFunSuiteLike with Matchers with BeforeAndAfterAll with ForAllTestContainer {
private val PubsubLocalProjectId = "local-project"
private val InputTopicName = "input-topic"
private val InputSubscriptionName = "input-topic-sub"
private val OutputTopicName = "output-topic"
override val container: PubSubEmulatorContainer = PubSubEmulatorContainer()
override protected def beforeAll(): Unit = {
@irajhedayati
irajhedayati / PubSubWithITMain.scala
Created July 28, 2022 13:52
The main application to run Pub/Sub application which supports local emulator
object Main {
def main(args: Array[String]): Unit = {
Using.resource(getSubscriber(args)) { subscriber =>
subscriber.startAsync.awaitRunning()
subscriber.awaitTerminated()
}
}
def getSubscriber(args: Array[String]): SubscriberWrapper = {
@irajhedayati
irajhedayati / SubscriberWrapper.scala
Last active July 28, 2022 13:38
A Pub/Sub subscriber wrapper that supports local emulator
class SubscriberWrapper(subscriber: Subscriber) extends AutoCloseable {
override def close(): Unit = subscriber.stopAsync()
def startAsync: ApiService = subscriber.startAsync()
def awaitTerminated(): Unit = subscriber.awaitTerminated()
}
object SubscriberWrapper {
def apply(projectId: String, subscription: String, localPubSubEndpoint: Option[String], receiver: MessageReceiver): SubscriberWrapper = {
val baseBuilder = Subscriber.newBuilder(ProjectSubscriptionName.format(projectId, subscription), receiver)
val subscriber =
@irajhedayati
irajhedayati / docker-compose.yaml
Created April 4, 2021 01:30
Multi-broker Kafka cluster with Schema Registry
---
version: '2'
services:
zookeeper:
image: confluentinc/cp-zookeeper:5.5.1
hostname: zookeeper
container_name: zookeeper
ports:
- 2181:2181
environment:
-- Create a flat table
CREATE TABLE calls_nested (
call_id STRING,
name STRING,
age INT
)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
COLLECTION ITEMS TERMINATED BY '|'
MAP KEYS TERMINATED BY '#'
@irajhedayati
irajhedayati / hive-encoding.sql
Last active March 10, 2021 04:56
A set of tutorials for Hive encodings
CREATE DATABASE fall2019_iraj;
-- Encoding; CSV table
CREATE TABLE test (
name STRING,
age INT
);
/* Check the structure */
-- Note the location
DESCRIBE test;
@irajhedayati
irajhedayati / hive-basic.sql
Created March 5, 2021 23:33
Tutorial for HiveQL basics
/* How to create a database */
CREATE DATABASE test_db;
-- Show the query in the history
/* If try to create a database that already exists, it fails */
CREATE DATABASE test_db;
-- Explain the query result
-- Show the query in the history and that indicates the failure
/* In order to avoid failure in scripting, we can use IF NOT EXISTS keyword */
@irajhedayati
irajhedayati / register-avro-schema.sh
Created December 17, 2020 14:11
Register an Avro schema using command line
#!/bin/bash
set -e
usage() {
echo " Register Avro schema from a file in a Schema Registry"
echo ""
echo " Usage:"
echo " register-avro-schema.sh <Options>"
echo ""
echo " Options:"
echo " -r the URL of the Schema Registry e.g. http://localhost:8081"
@irajhedayati
irajhedayati / 1.Kafka-tutorial-topic.sh
Last active November 24, 2020 23:16
A set of tutorials on Kafka
# Section 1: topic
# To see different options and help on a command, just run it without any parameter
kafka-topics
# To create a topic with one partition and replication factor of 1
kafka-topics --zookeeper localhost:2181 --create --topic test --partitions 1 --replication-factor 1
# To get a list of existing topics
kafka-topics --zookeeper localhost:2181 --list