Skip to content

Instantly share code, notes, and snippets.

@raybotha
Created March 12, 2018 14:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save raybotha/362b661b02c5b95cdfac5bca10fbc0c9 to your computer and use it in GitHub Desktop.
Save raybotha/362b661b02c5b95cdfac5bca10fbc0c9 to your computer and use it in GitHub Desktop.
Google PubSub Dependency Provider
from nameko.extensions import DependencyProvider
from google.cloud.pubsub_v1 import PublisherClient
from eventlet.queue import Queue
class EventPublisher(PublisherClient):
def __init__(self, topic, *args, **kwargs):
self._topic = topic
super().__init__(*args, **kwargs)
def publish(self, data, **kwargs):
return super().publish(self._topic, data=data.encode("utf-8"), **kwargs)
class ThreadSafeClient:
def __init__(self):
self.queue = Queue()
def safe_publish(self, message):
self.queue.put(message)
class PubSub(DependencyProvider):
def __init__(self, topic=None, **options):
self.topic = topic
self.options = options
def _run(self):
topic_path = f"projects/{self.project}/topics/{self.topic}"
client = EventPublisher(topic_path)
while True:
item = self.safe_client.queue.get()
if item is None:
break
client.publish(item)
del item
def stop(self):
self.safe_client.queue.put(None)
if self._gt is not None:
self._gt.wait()
def setup(self):
config = self.container.config["PUBSUB"]
self.project = config["PROJECT"]
if self.topic is None:
self.topic = config["TOPIC"]
self.safe_client = ThreadSafeClient()
self._gt = self.container.spawn_managed_thread(
self._run)
def get_dependency(self, worker_ctx):
return self.safe_client
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment