Skip to content

Instantly share code, notes, and snippets.

@naoko
Last active March 9, 2017 23:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save naoko/f992055d38c4707137c31a49e991fe50 to your computer and use it in GitHub Desktop.
Save naoko/f992055d38c4707137c31a49e991fe50 to your computer and use it in GitHub Desktop.
Example of how to use Google Pub/Sub
import logging
from google.cloud import pubsub
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)
def create_topic(topic_name):
    """Create the Pub/Sub topic ``topic_name`` if it does not already exist.

    :param topic_name: Name of the topic to look up and, if missing, create
    """
    client = pubsub.Client()
    topic = client.topic(topic_name)
    if not topic.exists():
        logger.info("Creating topic {}".format(topic_name))
        # The topic object is already bound to its name; ``create`` only takes
        # an optional client, so the name must not be passed again.
        topic.create()
def create_subscription(topic_name, subscription_name):
    """Ensure the subscription ``subscription_name`` exists on ``topic_name``.

    The subscription is created only when the existence check reports that it
    is missing; otherwise nothing is changed.

    :param topic_name: Name of the topic the subscription is attached to
    :param subscription_name: Name of the subscription to look up or create
    """
    subscription = pubsub.Client().topic(topic_name).subscription(subscription_name)
    if subscription.exists():
        return
    subscription.create()
def publish_message(topic_name, data):
    """Publish ``data`` as a message on the Pub/Sub topic ``topic_name``.

    :param topic_name: Name of the topic to publish to
    :param data: Message payload. Text is encoded as UTF-8; a payload that is
        already a bytestring is sent unchanged.
    """
    client = pubsub.Client()
    topic = client.topic(topic_name)
    # The payload must be a bytestring. Encode text only: bytes objects have no
    # ``encode`` method on Python 3, so encoding unconditionally would raise.
    if not isinstance(data, bytes):
        data = data.encode('utf-8')
    message_id = topic.publish(data)
    logger.info('Message {} published.'.format(message_id))
def receive_message(topic_name, subscription_name):
    """Pull up to 10 messages from a pull subscription.

    `subscription.pull` returns a list of tuples (ack_id, message), where:

    message.message_id: An ID assigned to the message by the API
    message.data: The payload of the message.
    message.attributes: Extra metadata associated by the publisher with the message.

    :param topic_name: Topic name defined in Google Pub/Sub
    :param subscription_name: Subscription name
    :return: subscription_obj, List(Tuple(ack_id, message))
    """
    client = pubsub.Client()
    topic = client.topic(topic_name)
    # maximum time after a subscriber receives a message before the subscriber should acknowledge the message
    # default is 10 seconds
    # The maximum custom deadline you can specify is 600 seconds (10 minutes)
    # NOTE(review): the deadline is set on the local subscription object only;
    # presumably it has no effect on a subscription that already exists -- confirm.
    subscription = topic.subscription(subscription_name, ack_deadline=600)
    # return_immediately=False blocks until messages are received;
    # change it to True to return at once even when nothing is available.
    results = subscription.pull(return_immediately=False, max_messages=10)
    # Log through the module logger, consistent with the other functions here,
    # instead of the root logger.
    logger.info('Received {} messages.'.format(len(results)))
    return subscription, results
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment