This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
-- Create a flat table | |
CREATE TABLE calls_nested ( | |
call_id STRING, | |
name STRING, | |
age INT | |
) | |
ROW FORMAT DELIMITED | |
FIELDS TERMINATED BY ',' | |
COLLECTION ITEMS TERMINATED BY '|' | |
MAP KEYS TERMINATED BY '#' |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
--- | |
version: '2' | |
services: | |
zookeeper: | |
image: confluentinc/cp-zookeeper:5.5.1 | |
hostname: zookeeper | |
container_name: zookeeper | |
ports: | |
- 2181:2181 | |
environment: |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class SubscriberWrapper(subscriber: Subscriber) extends AutoCloseable { | |
override def close(): Unit = subscriber.stopAsync() | |
def startAsync: ApiService = subscriber.startAsync() | |
def awaitTerminated(): Unit = subscriber.awaitTerminated() | |
} | |
object SubscriberWrapper { | |
def apply(projectId: String, subscription: String, localPubSubEndpoint: Option[String], receiver: MessageReceiver): SubscriberWrapper = { | |
val baseBuilder = Subscriber.newBuilder(ProjectSubscriptionName.format(projectId, subscription), receiver) | |
val subscriber = |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
object Main { | |
def main(args: Array[String]): Unit = { | |
Using.resource(getSubscriber(args)) { subscriber => | |
subscriber.startAsync.awaitRunning() | |
subscriber.awaitTerminated() | |
} | |
} | |
def getSubscriber(args: Array[String]): SubscriberWrapper = { |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class PubSubIT extends AnyFunSuiteLike with Matchers with BeforeAndAfterAll with ForAllTestContainer { | |
private val PubsubLocalProjectId = "local-project" | |
private val InputTopicName = "input-topic" | |
private val InputSubscriptionName = "input-topic-sub" | |
private val OutputTopicName = "output-topic" | |
override val container: PubSubEmulatorContainer = PubSubEmulatorContainer() | |
override protected def beforeAll(): Unit = { |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
class PubSubValidator(projectId: String, topic: String, expectedRecords: List[String]) { | |
private val topicName = TopicName.format(projectId, topic) | |
private val subscriptionName = SubscriptionName.format(projectId, subscription) | |
private lazy val subscription: String = s"$topic-sub" | |
private val actualPulledMessages = new ConcurrentLinkedQueue[String]() | |
private val numberOfRecordsToPull = new AtomicInteger(expectedRecords.size) | |
private var subscriber: ApiService = _ | |
def start(container: PubSubEmulatorContainer): Unit = { |
OlderNewer