Skip to content

Instantly share code, notes, and snippets.

@irajhedayati
Created July 28, 2022 14:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save irajhedayati/19c8fa3ef2e674e74ea23aee661218a1 to your computer and use it in GitHub Desktop.
Save irajhedayati/19c8fa3ef2e674e74ea23aee661218a1 to your computer and use it in GitHub Desktop.
Validate Pub/Sub messages from local emulator
class PubSubValidator(projectId: String, topic: String, expectedRecords: List[String]) {
private val topicName = TopicName.format(projectId, topic)
private val subscriptionName = SubscriptionName.format(projectId, subscription)
private lazy val subscription: String = s"$topic-sub"
private val actualPulledMessages = new ConcurrentLinkedQueue[String]()
private val numberOfRecordsToPull = new AtomicInteger(expectedRecords.size)
private var subscriber: ApiService = _
def start(container: PubSubEmulatorContainer): Unit = {
container.subscriptionAdminClient.createSubscription(
Subscription.newBuilder().setTopic(topicName).setName(subscriptionName).build()
)
subscriber = container
.subscriber(
ProjectSubscriptionName.of(projectId, subscription),
(message: PubsubMessage, consumer: AckReplyConsumer) => {
actualPulledMessages.add(message.getData.toStringUtf8)
if (numberOfRecordsToPull.decrementAndGet() < 0)
fail("The number of messages sent to Item Service is more than the number of messages expected")
consumer.ack()
}
)
.startAsync()
subscriber.awaitRunning()
}
def assert(): Unit = {
while (numberOfRecordsToPull.get() > 0) Thread.sleep(500)
val actual = actualPulledMessages.asScala.toSeq
subscriber.stopAsync()
subscriber.awaitTerminated()
actual.size shouldBe expectedRecords.size
actual should contain theSameElementsAs expectedRecords
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment