Skip to content

Instantly share code, notes, and snippets.

@alexsavio
Created April 19, 2021 07:13
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save alexsavio/e2878ae210fed3ba2b15243fb043057d to your computer and use it in GitHub Desktop.
Save alexsavio/e2878ae210fed3ba2b15243fb043057d to your computer and use it in GitHub Desktop.
GCP PubSub publisher
import time
from flask import Request
from google.cloud import pubsub_v1
from lib.logger import get_logger
logger = get_logger(__name__)
class ForwardFlackRequestToPubSub:
def __init__(
self,
request: Request,
pubsub_client: pubsub_v1.PublisherClient,
pubsub_topic_path: str,
):
self.request = request
self.publisher = pubsub_client
self.pubsub_topic_path = pubsub_topic_path
def run(self):
try:
# publish request into Pub/Sub
self.publish_request(self.request.get_data(as_text=True))
except Exception as error:
logger.error(f"Got an error: {error}")
else:
logger.info('Request published')
def publish_request(self, request):
futures = {}
def get_callback(f, data):
def callback(f):
try:
logger.info(f.result())
futures.pop(data)
except: # noqa
logger.error(f'Please handle {f.exception()} for {data}.')
return callback
futures.update({request: None})
# When you publish a message, the client returns a future.
future = self.publisher.publish(self.pubsub_topic_path, request.encode('utf-8'))
futures[request] = future
# Publish failures shall be handled in the callback function.
future.add_done_callback(get_callback(future, request))
# Wait for all the publish futures to resolve before exiting.
while futures:
time.sleep(5)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment