Skip to content

Instantly share code, notes, and snippets.

@jonasfj
Last active August 29, 2015 14:08
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jonasfj/33e4e0da765b05a601fc to your computer and use it in GitHub Desktop.
Save jonasfj/33e4e0da765b05a601fc to your computer and use it in GitHub Desktop.
Pulse Publishing Example

Pulse Publishing Example

We really should consider moving this to a library.

Dependencies (these work when installed within a virtualenv):

  • kombu
  • jsonschema

Schemas and References

These utilities use JSON schemas to validate messages. This ensures that you only publish valid messages, and it gives you some documentation for the messages. If you don't care about strict validation, just set the schema property `additionalProperties` to `true` so that properties not listed in the schema are accepted.

Additionally, a reference document can be generated from exchanges.py using the `reference()` method. This produces a reference that is compatible with the TaskCluster documentation tools; see the schema for exchange references here:

If published somewhere public along with the JSON schemas, this makes for some pretty awesome documentation. Note that changes to the reference format are planned, but they will only be minor. Feel free to ignore the reference if you don't need it.

from pulse_publisher import PulsePublisher, Exchange, Key
class TelemetryPublisher(PulsePublisher):
    """Pulse publisher declaring the exchanges that telemetry publishes to."""

    title = "Telemetry Exchanges"
    description = """
    Exchanges for users who want data from telemetry.
    """
    exchange_prefix = 'v1/'

    new_published_data = Exchange(
        # Note exchange name will be:
        # "exchange/<PULSE_USERNAME>/v1/new-published-data"
        # (<PULSE_USERNAME> is the publisher namespace, which defaults to the
        # client id passed to PulsePublisher).
        exchange='new-published-data',
        title="New Data Published",
        description="""
        Whenever new data is processed: validated, converted, sorted, split,
        compressed and uploaded to S3 for storage; a message will be
        published to this exchange.
        """,
        routing_keys=[
            # Warning: routing keys cannot contain dots '.' (Key rejects them
            # unless multiple_words=True).
            Key(
                name='product',
                summary="Product for which the data is published"
            ),
            Key(
                name='channel',
                summary="Product channel for published data"
            )
        ],
        # Must be an 'id' of a schema passed to the publisher's `schemas`.
        schema="https://telemetry.mozilla.org/schemas/v1/published-data-message.json#"
    )
{
"id": "https://telemetry.mozilla.org/schemas/v1/published-data-message.json#",
"$schema": "http://json-schema.org/draft-04/schema#",
"title": "New Published Data Message",
"description": "Pulse message sent whenever new data is published to telemetry bucket.",
"type": "object",
"properties": {
"version": {
"title": "Message-format version",
"enum": [1]
},
"prefix": {
"title": "S3 File Prefix",
"description": "Key/path/prefix for published file in S3",
"type": "string"
},
"bucket": {
"title": "S3 Bucket",
"description": "S3 bucket data was published to, usually always the same",
"type": "string"
},
"size": {
"title": "File Size",
"description": "Size of published file in bytes",
"type": "integer"
}
},
"additionalProperties": false,
"required": ["version", "prefix", "bucket", "size"]
}
import kombu, re, json, os, jsonschema, logging
# Module-level logger, used by PulsePublisher.error() to report publish failures.
logger = logging.getLogger(__name__)
def toCamelCase(input):
    """
    Convert a snake_case identifier to camelCase.

    Each '_' is removed together with the character that follows it, and that
    character is upper-cased. A trailing '_' (or one followed by a newline) is
    kept as-is, because the regex '.' needs a non-newline character to match.
    """
    return re.sub(r'_(.)', lambda found: found.group(1).upper(), input)
def load_schemas(folder):
    """
    Load JSON schemas from folder.

    Every file directly inside `folder` whose name ends in '.json' is parsed
    and stored under its top-level 'id' property; other files are skipped
    and sub-folders are not searched.

    Returns a dict mapping schema id -> parsed schema.
    Raises ValueError if a '.json' file is not a JSON object with an 'id'.
    """
    schemas = {}
    # Sorted so the result is deterministic if two files share an id.
    for filename in sorted(os.listdir(folder)):
        # Skip non-json files
        if not filename.endswith('.json'):
            continue
        path = os.path.join(folder, filename)
        with open(path) as f:
            data = json.load(f)
        # Membership test works on Python 2 and 3 (dict.has_key is Py2-only),
        # and raising (rather than assert) keeps the check under `python -O`.
        if not isinstance(data, dict) or 'id' not in data:
            raise ValueError(
                "JSON schemas must have an 'id' property: %s" % path
            )
        schemas[data['id']] = data
    return schemas
class Exchange(object):
    """
    Declaration of one topic exchange.

    Meant to be assigned as a class attribute on a subclass of
    PulsePublisher, which discovers such attributes and declares them as
    exchanges.
    """

    def __init__(self, exchange, title, description, routing_keys, schema):
        """
        Store the exchange declaration.

        exchange: exchange name, appended to the publisher's exchange prefix.
        title, description: human-readable text used in reference().
        routing_keys: ordered list of Key entries forming the routing key.
        schema: id of the JSON schema that messages are validated against.
        """
        self.exchange, self.title, self.description = (
            exchange, title, description
        )
        self.routing_keys = routing_keys
        self.schema = schema

    def message(self, message, **keys):
        """Return the message body unchanged; routing-key values are ignored."""
        return message

    def routing(self, message, **keys):
        """Build every routing-key entry from `keys` and join them with dots."""
        parts = [entry.build(**keys) for entry in self.routing_keys]
        return '.'.join(parts)

    def reference(self, name):
        """Return the documentation-reference entry for this exchange."""
        camel_name = toCamelCase(name)
        routing_key_docs = [entry.reference() for entry in self.routing_keys]
        entry = {
            'type': 'topic-exchange',
            'exchange': self.exchange,
            'name': camel_name,
            'title': self.title,
            'description': self.description,
            'routingKey': routing_key_docs,
            'schema': self.schema
        }
        return entry
class Key(object):
    """One dot-separated entry of an exchange routing key."""

    def __init__(self, name, summary, required=True, multiple_words=False):
        """
        name: keyword argument that supplies this entry's value.
        summary: human-readable text used in reference().
        required: if True, build() raises when the value is missing (None).
        multiple_words: if True, the value may contain dots.
        """
        self.name = name
        self.summary = summary
        self.required = required
        self.multiple_words = multiple_words

    def build(self, **keys):
        """
        Return this entry's routing-key value, taken from keys[self.name].

        A missing value raises ValueError when required; any falsy value
        (missing or empty) is replaced by '_'. Dots raise ValueError unless
        multiple_words is set.
        """
        value = keys.get(self.name)
        if value is None and self.required:
            raise ValueError("Key %s is required" % self.name)
        if not value:
            value = '_'
        if '.' in value and not self.multiple_words:
            raise ValueError("Key %s cannot contain dots" % self.name)
        return value

    def reference(self):
        """Return the documentation-reference entry for this routing key."""
        return dict(
            name=toCamelCase(self.name),
            summary=self.summary,
            multipleWords=self.multiple_words,
            required=self.required
        )
class PulsePublisher(object):
    """
    Base class for pulse publishers.

    All subclasses of this class must define the properties:
     * title
     * description
     * exchange_prefix

    Additional properties of type `Exchange` will be declared as exchanges.
    On each instance, every such property is replaced by a function that
    takes `message=...` plus the routing-key values as keyword arguments,
    validates the message against the exchange's JSON schema and publishes
    it as JSON.
    """

    def __init__(self, client_id, access_token, schemas, namespace=None):
        """
        Create publisher connected to pulse.mozilla.org.

        client_id, access_token: Pulse credentials (AMQP user id / password).
        schemas: mapping from JSON schema id (URI) to JSON schema, e.g. the
            result of load_schemas(). Publishing to an exchange whose schema
            id is missing from this mapping raises KeyError.
        namespace: used in exchange paths
            "exchange/<namespace>/<exchange_prefix><exchange>";
            defaults to client_id.
        """
        # Validate properties
        assert hasattr(self, 'title'), "Title is required"
        assert hasattr(self, 'description'), "description is required"
        assert hasattr(self, 'exchange_prefix'), "exchange_prefix is required"

        # Set attributes
        self.client_id = client_id
        self.access_token = access_token
        self.schemas = schemas
        self.namespace = namespace or client_id
        self.exchanges = []
        self.connection = kombu.Connection(
            userid=client_id,
            password=access_token,
            hostname='pulse.mozilla.org',
            virtual_host='/',
            port=5671,
            ssl=True,
            transport='amqp',
            # In some environments there have been some problems with
            # confirm_publish and block_for_ack. Basically if the connection
            # or process exits before the message is published it could get
            # dropped. This only in some environments (treeherder), sometimes
            # this also seems to work fine. Anyways, if you fear this, you can
            # call: publisher.connection.release() before letting the process
            # exit.
            transport_options={
                'confirm_publish': True,
                'block_for_ack': True
            }
        )

        # Find exchanges declared as attributes (normally on the subclass)
        for name in dir(self):
            exchange = getattr(self, name)
            if isinstance(exchange, Exchange):
                self.exchanges.append((name, exchange))

        # Replace each Exchange attribute with its publish function
        for name, exchange in self.exchanges:
            setattr(self, name, self._create_publish_method(exchange))

    def _create_publish_method(self, exchange):
        """
        Return a function that validates and publishes messages to `exchange`.

        Creating the function here (instead of inside the loop in __init__)
        gives each function its own `exchange` and `publish_message`;
        a closure defined in the loop would see only the last exchange.
        """
        exchange_path = "exchange/%s/%s%s" % (
            self.namespace,
            self.exchange_prefix,
            exchange.exchange
        )
        producer = kombu.Producer(
            channel=self.connection,
            exchange=kombu.Exchange(
                name=exchange_path,
                type='topic',
                durable=True,
                delivery_mode='persistent'
            ),
            auto_declare=True
        )
        # Retry the publish (up to 5 times) if the connection fails.
        publish_message = self.connection.ensure(
            producer, producer.publish, max_retries=5
        )

        def publish(**kwargs):
            message = exchange.message(**kwargs)
            jsonschema.validate(message, self.schemas[exchange.schema])
            publish_message(
                body=json.dumps(message),
                routing_key=exchange.routing(**kwargs),
                content_type='application/json'
            )

        return publish

    def error(self, error, exchange, routing_key, message):
        """Log a failure to publish `message` to `exchange`."""
        # Arguments are passed to the logger (not pre-formatted) so the
        # message is only formatted if the record is actually emitted.
        logger.error(
            'Error publishing message to %s (routing key: %s): %s',
            exchange, routing_key, error
        )

    def reference(self):
        """ Construct reference for this publisher"""
        return {
            'version': '0.2.0',
            'title': self.title,
            'description': self.description,
            'exchangePrefix': "exchange/%s/%s" % (
                self.namespace,
                self.exchange_prefix
            ),
            'entries': [ex.reference(name) for name, ex in self.exchanges]
        }
from exchanges import TelemetryPublisher
from pulse_publisher import load_schemas
import os, sys
def main():
    """
    Publish two example messages to the new-published-data exchange.

    Pulse credentials are read from the PULSE_USERNAME and PULSE_PASSWORD
    environment variables, and JSON schemas are loaded from the folder that
    contains this script. Exits with status 1 if a credential is missing.
    """
    source_folder = os.path.dirname(os.path.realpath(__file__))
    # Schemas live next to this script; use
    # os.path.join(source_folder, 'schemas') to keep them in a subfolder.
    schema_folder = source_folder
    schemas = load_schemas(schema_folder)

    username = os.environ.get('PULSE_USERNAME')
    password = os.environ.get('PULSE_PASSWORD')
    if not (username and password):
        # sys.stderr.write works on Python 2 and 3 (unlike a print statement)
        # and keeps the error off stdout.
        sys.stderr.write(
            "Environment variables PULSE_USERNAME/PULSE_PASSWORD is missing\n"
        )
        sys.exit(1)

    publisher = TelemetryPublisher(
        client_id=username,
        access_token=password,
        schemas=schemas
    )

    for product in ("firefox", "thunderbird"):
        publisher.new_published_data(
            message={
                'version': 1,
                'prefix': "saved_session/.../Firefox/...",
                'bucket': 'telemetry-published-v2',
                'size': 324234,
            },
            # Keys needed for routing key (cannot contain dot '.')
            product=product,
            channel="nightly"
        )

    # Calling publisher.connection.release() is a dirty hack, if you fear the
    # connection/process will close before the message is published.
    # But only use this if it seems that messages aren't published
    # synchronously, which seems like the sane way of doing things in python.


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment