Skip to content

Instantly share code, notes, and snippets.

@irajhedayati
Last active July 28, 2022 13:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save irajhedayati/b923179ba7ac4abd27daa480882e1584 to your computer and use it in GitHub Desktop.
Save irajhedayati/b923179ba7ac4abd27daa480882e1584 to your computer and use it in GitHub Desktop.
A Pub/Sub subscriber wrapper that supports local emulator
class SubscriberWrapper(subscriber: Subscriber) extends AutoCloseable {
override def close(): Unit = subscriber.stopAsync()
def startAsync: ApiService = subscriber.startAsync()
def awaitTerminated(): Unit = subscriber.awaitTerminated()
}
object SubscriberWrapper {
def apply(projectId: String, subscription: String, localPubSubEndpoint: Option[String], receiver: MessageReceiver): SubscriberWrapper = {
val baseBuilder = Subscriber.newBuilder(ProjectSubscriptionName.format(projectId, subscription), receiver)
val subscriber =
localPubSubEndpoint
.map(h => handleLocalExecution(baseBuilder, h))
.map(_.build())
.getOrElse(baseBuilder.build())
new SubscriberWrapper(subscriber)
}
private def handleLocalExecution(baseBuilder: Subscriber.Builder, localPubSubEndpoint: String): Subscriber.Builder = {
val channel = ManagedChannelBuilder
.forTarget(localPubSubEndpoint)
.usePlaintext()
.asInstanceOf[ManagedChannelBuilder[NettyChannelBuilder]]
.build()
baseBuilder
.setChannelProvider(FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel)))
.setCredentialsProvider(NoCredentialsProvider.create)
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment