@vinodjayachandran
Last active September 6, 2021 13:18
Publish to and subscribe from a GCP Pub/Sub topic
import os
from google.cloud import pubsub
from concurrent.futures import TimeoutError
project_id = "your-project-id"
topic_id = "topic-id-from-gcp-console"
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = "path-to-your-credential-json-file"
publisher = pubsub.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
# Construct any sample data
data = u"{'name':'vinod'}"
# Data must be a bytestring
data = data.encode("utf-8")
# Publish the message. publish() returns a future that resolves to the
# message ID once the server has accepted the message.
future = publisher.publish(topic_path, data)
print("Published message {}.".format(future.result()))
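def callback(message):
    # Called once for every message delivered on the subscription
    print("Received message: {}".format(message))
    # Acknowledge the message so Pub/Sub does not redeliver it
    message.ack()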
# Consuming Messages from PubSub via Subscriber
subscription_id = "subscriber-id-from-gcp-console"
# Number of seconds the subscriber should listen for messages
timeout = 5.0
subscriber = pubsub.SubscriberClient()
# The `subscription_path` method creates a fully qualified identifier
# in the form `projects/{project_id}/subscriptions/{subscription_id}`
subscription_path = subscriber.subscription_path(project_id, subscription_id)
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print("Listening for messages on {}..\n".format(subscription_path))
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
    try:
        # When `timeout` is not set, result() will block indefinitely,
        # unless an exception is encountered first.
        streaming_pull_future.result(timeout=timeout)
    except TimeoutError:
        # Stop pulling messages once the listening window has elapsed
        streaming_pull_future.cancel()
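
The script sends a hand-written string as the payload and publishes it without attributes. The following is a minimal sketch, not part of the original gist, of one way to build the bytestring from a dict with `json` and to attach string attributes when publishing. The helper names `encode_payload`, `decode_payload`, and `publish_with_attributes` are hypothetical. The sketch assumes `publisher` behaves like `pubsub.PublisherClient`, whose `publish()` accepts extra keyword arguments as message attributes and returns a future.

```python
import json


def encode_payload(payload):
    """Serialize a dict to the UTF-8 bytestring that publish() expects."""
    return json.dumps(payload).encode("utf-8")


def decode_payload(raw):
    """Reverse of encode_payload; in a callback, `raw` would be message.data."""
    return json.loads(raw.decode("utf-8"))


def publish_with_attributes(publisher, topic_path, payload, **attributes):
    """Publish `payload` with string attributes and return the message ID.

    Assumption: `publisher` follows the PublisherClient interface, so any
    extra keyword arguments are sent as message attributes and the returned
    future's result() is the message ID.
    """
    future = publisher.publish(topic_path, encode_payload(payload), **attributes)
    # Block until the publish has been accepted (or raise if it failed)
    return future.result()
```

With the `publisher` and `topic_path` from the script above, a call might look like `publish_with_attributes(publisher, topic_path, {"name": "vinod"}, origin="python-sample", username="gcp")`. The attribute values here are illustrative only. Inside `callback`, `decode_payload(message.data)` would recover the dict.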