Skip to content

Instantly share code, notes, and snippets.

@Riebart
Last active October 31, 2018 05:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save Riebart/87b3571f8b3f797d4ae89d034896f045 to your computer and use it in GitHub Desktop.
Save Riebart/87b3571f8b3f797d4ae89d034896f045 to your computer and use it in GitHub Desktop.
Python script to listen to an AWS IoT Messaging endpoint, authenticating with IAM, and printing messages as they arrive.
#!/usr/bin/env python3
"""
Provide a simple wrapper for listening to an MQTT stream from an AWS IoT websocket endpoint. Prints messages to stdout.
"""
try:
from urllib.parse import urlparse, urlencode, parse_qs
except ImportError:
from urlparse import urlparse, parse_qs
from urllib import urlencode
import base64
import json
import re
import ssl
import sys
import uuid
import paho.mqtt.client as mqtt
from boto3 import Session
from botocore.auth import SigV4Auth
from botocore.awsrequest import AWSRequest
def on_star(label):
def f(*args, **kwargs):
print(label, args, kwargs, file=sys.stderr, flush=True)
return f
def on_message(client, userdata, msg):
try:
print(json.dumps(json.loads(msg.payload.decode("utf-8"))), flush=True)
except:
print(msg.payload.decode("utf-8"), flush=True)
def signing_headers(url_string: str) -> dict:
method = "GET"
# Adapted from:
# https://github.com/jmenga/requests-aws-sign/blob/master/requests_aws_sign/requests_aws_sign.py
region = re.search("iot.(.*).amazonaws.com", url_string).group(1)
url = urlparse(url_string)
path = url.path or '/'
querystring = ''
if url.query:
querystring = '?' + urlencode(
parse_qs(url.query, keep_blank_values=True), doseq=True)
safe_url = "https" + '://' + url.netloc.split(':')[0] + path + querystring
request = AWSRequest(method=method.upper(), url=safe_url)
SigV4Auth(Session().get_credentials(), "iotdevicegateway",
region).add_auth(request)
return dict(request.headers.items())
if __name__ == "__main__":
url = sys.argv[1]
host = urlparse(url).netloc
headers = signing_headers(url)
# Now add a bunch of other boilerplate headers MQTT won't do for us
headers["Host"] = host
headers["Upgrade"] = "websocket"
headers["Connection"] = "Upgrade"
headers["Origin"] = "https://%s" % host
headers["Sec-Websocket-Version"] = "13"
headers["Sec-Websocket-Protocol"] = "mqtt"
# The client needs to use the websockets transport
client = mqtt.Client(transport="websockets")
# Callbacks when we're connected, and when a message arrives.
client.on_message = on_message
# client.on_connect = on_star("connect")
# client.on_disconnect = on_star("disconnect")
# client.on_socket_open = on_star("sock_open")
# client.on_socket_close = on_star("sock_close")
# client.on_subscribe = on_star("sub")
# client.on_unsubscribe = on_star("unsub")
# With our constructed headers
client.ws_set_options(headers=headers)
# AWS mandates TLS 1.2, so use that
client.tls_set(tls_version=ssl.PROTOCOL_TLSv1_2)
# Now finally connect
client.connect(host=host, port=443)
# Subscribe to the topic indicated on the command line
client.subscribe(sys.argv[2])
# Loop, printing messages as they arrive.
client.loop_forever()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment