Skip to content

Instantly share code, notes, and snippets.

@psawaya
Created July 22, 2011 00:18
Show Gist options
  • Save psawaya/1098555 to your computer and use it in GitHub Desktop.
Save psawaya/1098555 to your computer and use it in GitHub Desktop.
# ***** BEGIN LICENSE BLOCK *****
# Version: MPL 1.1/GPL 2.0/LGPL 2.1
#
# The contents of this file are subject to the Mozilla Public License Version
# 1.1 (the "License"); you may not use this file except in compliance with
# the License. You may obtain a copy of the License at
# http://www.mozilla.org/MPL/
#
# Software distributed under the License is distributed on an "AS IS" basis,
# WITHOUT WARRANTY OF ANY KIND, either express or implied. See the License
# for the specific language governing rights and limitations under the
# License.
#
# The Original Code is the Mozilla Push Notifications Server.
#
# The Initial Developer of the Original Code is
# Mozilla Corporation.
# Portions created by the Initial Developer are Copyright (C) 2011
# the Initial Developer. All Rights Reserved.
#
# Contributor(s):
# Shane da Silva <sdasilva@mozilla.com>
#
# Alternatively, the contents of this file may be used under the terms of
# either the GNU General Public License Version 2 or later (the "GPL"), or
# the GNU Lesser General Public License Version 2.1 or later (the "LGPL"),
# in which case the provisions of the GPL or the LGPL are applicable instead
# of those above. If you wish to allow use of your version of this file only
# under the terms of either the GPL or the LGPL, and not to allow others to
# use your version of this file under the terms of the MPL, indicate your
# decision by deleting the provisions above and replace them with the notice
# and other provisions required by the GPL or the LGPL. If you do not delete
# the provisions above, a recipient may use your version of this file under
# the terms of any one of the MPL, the GPL or the LGPL.
#
# ***** END LICENSE BLOCK *****
import base64
import hashlib
import hmac
import httplib
import json
import os
import smtplib
import sys
import time
from Crypto.Cipher import AES
import pika
DEFAULT_AMQP_PORT = 5672


class NotificationReceiver(object):
    """AMQP client used to receive notifications from a queue."""

    def __init__(self, host='localhost', port=DEFAULT_AMQP_PORT, virtual_host='/'):
        """Record the connection parameters for the given broker address.

        No connection is opened here; ``consume`` opens one per call using
        the hard-coded 'guest'/'guest' credentials.
        """
        guest_credentials = pika.PlainCredentials('guest', 'guest')
        self.conn_params = pika.ConnectionParameters(
            credentials=guest_credentials,
            host=host,
            port=port,
            virtual_host=virtual_host,
        )

    # TODO: Consider whether or not we want to make this non-blocking
    # and allow multiple calls. This would require opening a separate
    # connection for each call, or using a single Async connection and
    # just declaring a channel for each queue. (second approach limits
    # us in the number of channels, where the first does not)
    def consume(self, queue_name, callback):
        """Consume notifications from the specified queue.

        Opens a blocking connection, registers a consumer on ``queue_name``
        and disconnects once ``basic_consume`` returns or raises.

        Args:
            queue_name: Name of queue from which to consume.
            callback: Function called with the message body for each
                message received. A falsy value means messages are only
                acknowledged.
        """
        def ack_then_forward(channel, method, header, body):
            # Acknowledge first so the message is not redelivered even if
            # the user callback raises.
            channel.basic_ack(delivery_tag=method.delivery_tag)
            if not callback:
                return
            callback(body)

        connection = pika.BlockingConnection(self.conn_params)
        try:
            # NOTE(review): nothing here starts a consume loop after
            # basic_consume(); whether this call blocks depends on the pika
            # version in use -- confirm against the deployed release.
            connection.channel().basic_consume(
                ack_then_forward,
                queue=queue_name,
            )
        finally:
            connection.disconnect()
class NotificationSender(object):
    """HTTP client that submits notifications to the POST Office."""

    def __init__(self, host='localhost', port=8000):
        """Remember the host/port to connect to when sending.

        A fresh HTTP connection is opened for every ``send`` call.
        """
        self.host = host
        self.port = port

    @classmethod
    def create_notification_json(cls, subscription, payload, timestamp, ttl):
        """Return the JSON-serialized notification for ``payload``.

        Args:
            subscription: Object providing ``create_notification``.
            payload: Notification payload; must contain 'title' and 'text'
                (checked with ``in``).
            timestamp: Passed through to ``create_notification``.
            ttl: Passed through to ``create_notification``.

        Raises:
            KeyError: If 'title' or 'text' is not in ``payload``.
        """
        required = (
            ('title', "Notification title not specified."),
            ('text', "Notification text not specified."),
        )
        for key, message in required:
            if key not in payload:
                raise KeyError(message)
        return json.dumps(
            subscription.create_notification(payload, timestamp, ttl))

    def send(self, subscription, payload, timestamp=None, ttl=None):
        """Send a notification to the POST Office for a given subscription.

        Args:
            subscription: Subscription object.
            payload: Notification payload; must contain 'title' and 'text'.
            timestamp: Optional timestamp forwarded to the subscription.
            ttl: Optional time-to-live forwarded to the subscription.

        Returns:
            Nothing.

        Raises:
            KeyError: If the payload lacks 'title' or 'text'.
            Exception: If the server answers with anything but 202 Accepted.
        """
        request_body = self.create_notification_json(
            subscription, payload, timestamp, ttl)
        connection = httplib.HTTPConnection(self.host, self.port)
        try:
            connection.request(
                "POST",
                "/1.0/notification",
                request_body,
                {"Content-Type": "application/json"},
            )
            status = connection.getresponse().status
            if status != httplib.ACCEPTED:
                raise Exception("Notification sending failed: %s" % status)
        finally:
            # Always release the socket, whether or not the POST succeeded.
            connection.close()
class SMTPNotificationSender(object):
    """Client for sending notifications over the SMTP interface."""

    def __init__(self, from_addr, host='127.0.0.1', port=1025):
        """Remember the sender address and the SMTP server to deliver to."""
        self.host = host
        self.port = port
        self.from_addr = from_addr

    def _build_message(self, request_body, text, dest_addr):
        """Assemble the multipart e-mail that carries one notification.

        The message has a plain-text part holding ``text`` followed by a
        'text/json' part holding ``request_body``.
        """
        # Imported here because the module's import block does not pull in
        # the email package.
        from email.mime.multipart import MIMEMultipart
        from email.mime.text import MIMEText
        from email.utils import formataddr

        msg = MIMEMultipart('alternative')
        msg.attach(MIMEText(text, 'plain'))
        msg.attach(MIMEText(request_body, 'json'))
        msg['To'] = formataddr(('Recipient', dest_addr))
        msg['From'] = formataddr(('Author', self.from_addr))
        return msg

    def send(self, subscription, payload, timestamp=None, ttl=None, email_text=None):
        """Send a notification for ``subscription`` as an e-mail.

        Args:
            subscription: Subscription object.
            payload: Notification payload; must contain 'title' and 'text'.
                ``payload['text']`` is used as the plain-text part when
                ``email_text`` is not given.
            timestamp: Optional timestamp forwarded to the subscription.
            ttl: Optional time-to-live forwarded to the subscription.
            email_text: Optional plain-text body overriding the payload text.

        Raises:
            KeyError: If the payload lacks 'title' or 'text'.
            smtplib.SMTPException: If delivery fails.
        """
        request_body = NotificationSender.create_notification_json(
            subscription, payload, timestamp, ttl)

        # TODO: get notifications token from payload, add to e-mail address
        dest_addr = 'notifications@' + self.host
        msg = self._build_message(
            request_body, email_text or payload['text'], dest_addr)

        server = smtplib.SMTP(self.host, self.port)
        try:
            # TODO: remove this
            server.set_debuglevel(True)
            server.sendmail(self.from_addr, [dest_addr], msg.as_string())
            server.quit()
        finally:
            # close() is a no-op after a successful quit(); on failure it
            # releases the socket without masking the original exception.
            server.close()
class Subscription(object):
    """Represents a notification subscription.

    This contains the information needed to create a notification for a
    subscription.
    """

    # Default notification lifetime: two days, in seconds.
    _DEFAULT_TTL = 2 * 24 * 60 * 60
    # AES block size in bytes; also the length of the random IV.
    _BLOCK_SIZE = 16

    def __init__(self, token, encryption_key, hmac_key):
        """Creates a subscription.

        Associates the created subscription instance with a unique token,
        encryption key, and HMAC key.

        Args:
            token: 256-bit subscription identifier as Base64-encoded string.
            encryption_key: 256-bit key to use for payload encryption.
            hmac_key: 256-bit key to use for generating HMAC.
        """
        self.token = token
        self.encryption_key = encryption_key
        self.hmac_key = hmac_key

    @staticmethod
    def _payload_bytes(payload):
        """Return ``payload`` as a byte string ready for encryption.

        Byte strings pass through unchanged, text is UTF-8 encoded, and any
        other object (e.g. a dict with 'title' and 'text') is serialized
        with ``json.dumps`` first.
        """
        text_type = type(u"")
        if not isinstance(payload, (bytes, bytearray, text_type)):
            payload = json.dumps(payload)
        if isinstance(payload, text_type):
            # Padding is computed on bytes, so encode before measuring.
            payload = payload.encode("utf-8")
        return bytes(payload)

    @staticmethod
    def _pad(data, block_size=16):
        """Append PKCS#5/PKCS#7 padding so len(result) % block_size == 0.

        A full block of padding is added when ``data`` is already aligned,
        so the padding can always be removed unambiguously.
        """
        pad_len = block_size - len(data) % block_size
        return data + bytes(bytearray([pad_len]) * pad_len)

    def create_notification(self, payload, timestamp=None, ttl=None):
        """Creates a notification for this subscription.

        The payload is encrypted with AES-CBC under ``encryption_key`` using
        a fresh random IV, and the serialized body is authenticated with
        HMAC-SHA256 under ``hmac_key``.

        Args:
            payload: Payload to encrypt. A string is used as-is (text is
                UTF-8 encoded); any other object is JSON-serialized first.
            timestamp: UNIX timestamp of when the notification was sent.
                Defaults to the current time when None.
            ttl: Amount of time (in seconds) before this notification
                expires. Defaults to two days when None.

        Returns:
            Dictionary with two keys:
                "body": JSON string of an object with the keys "token",
                    "timestamp", "ttl", "ciphertext" (Base64) and "IV"
                    (Base64).
                "HMAC": Base64-encoded HMAC-SHA256 of the "body" string.

            Notice that the contents of the "body" field is not a
            dictionary, but a string representation of a JSON object.
        """
        # Compare against None so an explicit 0 is not replaced by a default.
        if timestamp is None:
            timestamp = int(time.time())
        if ttl is None:
            ttl = self._DEFAULT_TTL

        iv = os.urandom(self._BLOCK_SIZE)
        cipher = AES.new(self.encryption_key, AES.MODE_CBC, iv)
        plaintext = self._pad(self._payload_bytes(payload), self._BLOCK_SIZE)
        ciphertext = cipher.encrypt(plaintext)

        body = json.dumps({
            "token": self.token,
            "timestamp": timestamp,
            "ttl": ttl,
            "ciphertext": base64.b64encode(ciphertext),
            "IV": base64.b64encode(iv),
        })

        # The HMAC covers the exact serialized body string that is sent.
        digest = hmac.new(self.hmac_key, body, hashlib.sha256).digest()
        return {
            "body": body,
            "HMAC": base64.b64encode(digest),
        }
def handle_delivery(body):
    """Called when we receive a message in the delivery queue."""
    print("%r" % body)


def main(argv=None):
    """Listen on the queue named by the first command-line argument.

    Args:
        argv: Argument list including the program name; defaults to
            ``sys.argv``.

    Returns:
        -1 when no queue name was given, 0 otherwise.
    """
    if argv is None:
        argv = sys.argv
    if len(argv) <= 1:
        print("Must specify queue name.")
        return -1

    client_queue_name = argv[1]
    receiver = NotificationReceiver()
    try:
        print("Listening for notifications...")
        receiver.consume(client_queue_name, handle_delivery)
    except KeyboardInterrupt:
        print("Shutting down client.")
    return 0


if __name__ == '__main__':
    # sys.exit rather than the site-provided exit(), which is intended for
    # interactive sessions and is absent when site is not loaded.
    sys.exit(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment