This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class PubSubValidator(projectId: String, topic: String, expectedRecords: List[String]) { | |
private val topicName = TopicName.format(projectId, topic) | |
private val subscriptionName = SubscriptionName.format(projectId, subscription) | |
private lazy val subscription: String = s"$topic-sub" | |
private val actualPulledMessages = new ConcurrentLinkedQueue[String]() | |
private val numberOfRecordsToPull = new AtomicInteger(expectedRecords.size) | |
private var subscriber: ApiService = _ | |
def start(container: PubSubEmulatorContainer): Unit = { |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class PubSubIT extends AnyFunSuiteLike with Matchers with BeforeAndAfterAll with ForAllTestContainer { | |
private val PubsubLocalProjectId = "local-project" | |
private val InputTopicName = "input-topic" | |
private val InputSubscriptionName = "input-topic-sub" | |
private val OutputTopicName = "output-topic" | |
override val container: PubSubEmulatorContainer = PubSubEmulatorContainer() | |
override protected def beforeAll(): Unit = { |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
object Main { | |
/**
 * Program entry point.
 *
 * Obtains a subscriber wrapper from `getSubscriber(args)` and manages it with
 * `Using.resource`, so the wrapper's `close()` is invoked when the block exits.
 * Inside the block the subscriber is started, `awaitRunning()` is called on the
 * returned service handle, and then `awaitTerminated()` is called on the wrapper.
 *
 * @param args command-line arguments, passed unchanged to `getSubscriber`
 */
def main(args: Array[String]): Unit =
  Using.resource(getSubscriber(args)) { wrapper =>
    // Keep the handle returned by startAsync so the two steps read separately.
    val startedService = wrapper.startAsync
    startedService.awaitRunning()
    wrapper.awaitTerminated()
  }
def getSubscriber(args: Array[String]): SubscriberWrapper = { |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Adapter that exposes a `Subscriber` as an [[AutoCloseable]].
 *
 * Every member simply delegates to the wrapped subscriber.
 *
 * @param subscriber the subscriber all calls are forwarded to
 */
class SubscriberWrapper(subscriber: Subscriber) extends AutoCloseable {

  /** Forwards to `startAsync()` on the wrapped subscriber and returns its result. */
  def startAsync: ApiService = {
    subscriber.startAsync()
  }

  /** Forwards to `awaitTerminated()` on the wrapped subscriber. */
  def awaitTerminated(): Unit = {
    subscriber.awaitTerminated()
  }

  /**
   * Forwards to `stopAsync()` on the wrapped subscriber and discards the returned handle.
   * No `awaitTerminated()` call follows here, so this method itself does not wait for shutdown.
   */
  override def close(): Unit = {
    subscriber.stopAsync()
  }
}
object SubscriberWrapper { | |
def apply(projectId: String, subscription: String, localPubSubEndpoint: Option[String], receiver: MessageReceiver): SubscriberWrapper = { | |
val baseBuilder = Subscriber.newBuilder(ProjectSubscriptionName.format(projectId, subscription), receiver) | |
val subscriber = |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
--- | |
version: '2' | |
services: | |
zookeeper: | |
image: confluentinc/cp-zookeeper:5.5.1 | |
hostname: zookeeper | |
container_name: zookeeper | |
ports: | |
- 2181:2181 | |
environment: |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Create a flat table | |
CREATE TABLE calls_nested ( | |
call_id STRING, | |
name STRING, | |
age INT | |
) | |
ROW FORMAT DELIMITED | |
FIELDS TERMINATED BY ',' | |
COLLECTION ITEMS TERMINATED BY '|' | |
MAP KEYS TERMINATED BY '#' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
CREATE DATABASE fall2019_iraj; | |
-- Encoding; CSV table | |
CREATE TABLE test ( | |
name STRING, | |
age INT | |
); | |
/* Check the structure */ | |
-- Note the location | |
DESCRIBE test; |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/* How to create a database */ | |
CREATE DATABASE test_db; | |
-- Show the query in the history | |
/* If you try to create a database that already exists, the statement fails */
CREATE DATABASE test_db; | |
-- Explain the query result | |
-- Show the query in the history, which indicates the failure
/* To avoid failures in scripts, we can use the IF NOT EXISTS clause */
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/bin/bash | |
set -e | |
usage() { | |
echo " Register Avro schema from a file in a Schema Registry" | |
echo "" | |
echo " Usage:" | |
echo " register-avro-schema.sh <Options>" | |
echo "" | |
echo " Options:" | |
echo " -r the URL of the Schema Registry e.g. http://localhost:8081" |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Section 1: topic | |
# To see the available options and help for a command, just run it without any parameters
kafka-topics | |
# To create a topic with one partition and replication factor of 1 | |
kafka-topics --zookeeper localhost:2181 --create --topic test --partitions 1 --replication-factor 1 | |
# To get a list of existing topics | |
kafka-topics --zookeeper localhost:2181 --list |
NewerOlder