Skip to content

Instantly share code, notes, and snippets.

@ccooper21
Created November 10, 2017 08:35
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save ccooper21/24e76fd7906785e0cb7c1d466ecb6371 to your computer and use it in GitHub Desktop.
Save ccooper21/24e76fd7906785e0cb7c1d466ecb6371 to your computer and use it in GitHub Desktop.
These scripts validate the websocket client handshake functionality described in RFC6455 and provided by MicroPython's "websocket_helper".
# This script validates the websocket client handshake functionality, described in RFC6455 and provided by MicroPython's
# "websocket_helper" module, against the websocket endpoint for AWS' IoT MQTT-based service. For more information about
# this service, see https://aws.amazon.com/iot-platform/. Two key considerations with respect to integrating with AWS'
# service are that AWS requires the use of TLS/SSL (i.e. HTTPS) and the use of a cryptographically sound authentication
# mechanism. While AWS supports several authentication mechanisms, this script uses the AWS SigV4 digital signature
# mechanism described at http://docs.aws.amazon.com/general/latest/gr/signature-version-4.html and incorporates code
# based on the associated SigV4 Python-based example code.
#
# For this script to succeed, the changes included in pull request #3420 must be applied (see
# https://github.com/micropython/micropython/pull/3420). Further, the following placeholders in this script must be
# replaced with the proper AWS customer-specific values.
#
# <AWS_ACCESS_KEY>
# <AWS_SECRET_KEY>
# <AWS_IOT_ENDPOINT_HOST>
# <AWS_REGION>
#
# Also, this script requires several modules from the "micropython-lib" project for the SigV4 implementation. These
# modules can be installed via the REPL as follows:
#
# import upip
# upip.install([
# 'micropython-hashlib',
# 'micropython-hmac',
# 'micropython-time',
# 'micropython-urllib.parse'
# ])
#
# Lastly, since the "websocket_helper" module is only included with the ESP8266 port, for other ports it is necessary to
# either add this module as a frozen module while building the port, or make it available at run-time via the file
# system.
#
# Ideally, this script would work on any MicroPython port with networking and TLS/SSL support (i.e. with the "usocket"
# and "ussl" modules). However, the "time" module requires a port with FFI support, and the "hashlib" module requires a
# run-time environment with significant free RAM. This means this script is only known to run on the UNIX port.
#
# As per the websocket specification, the handshake takes place using the HTTP protocol. This script will print both
# the HTTP request and response to assist in diagnosing any failures.
import hashlib
import hmac
import time
import urandom
import urllib.parse
import usocket
import ussl
import utime
import websocket_helper
# AWS credentials and endpoint. Every <...> placeholder must be replaced with a real value before running.
access_key = b'<AWS_ACCESS_KEY>'
secret_key = b'<AWS_SECRET_KEY>'
host = b'<AWS_IOT_ENDPOINT_HOST>'
# The connection is wrapped with TLS/SSL further down, hence the HTTPS port.
tcp_port = 443
# Request path and websocket subprotocol handed to the client handshake.
uri = b'/mqtt'
subprotocol = b'mqtt'
# Remaining inputs to the SigV4 canonical request (method) and credential scope (region, service).
method = b'GET'
region = b'<AWS_REGION>'
service = b'iotdevicegateway'
print('*** Preparing AWS SigV4 signature...')
def sign(key, message):
    """Return the raw (binary) HMAC-SHA256 digest of ``message`` keyed with ``key``."""
    mac = hmac.new(key, message, hashlib.sha256)
    return mac.digest()
def calc_signing_key(key, date_stamp, region, service):
    """Derive the SigV4 signing key from the secret ``key``.

    The secret, prefixed with ``AWS4``, keys an HMAC over the date stamp; each
    digest then keys the next HMAC over the region, the service and finally
    the literal ``aws4_request``. The last digest is returned.
    """
    derived = b'AWS4' + key
    for scope_part in (date_stamp, region, service, b'aws4_request'):
        derived = sign(derived, scope_part)
    return derived
# Build the SigV4 pre-signed query string for the websocket request. Everything that is hashed or signed is kept
# as bytes; values that come back as text (formatted timestamps, hex digests) are encoded explicitly rather than
# relying on implicit bytes/str concatenation, which standard Python rejects with a TypeError.
now = time.gmtime()
date_stamp = time.strftime('%Y%m%d', now).encode('ascii')
amz_date = time.strftime('%Y%m%dT%H%M%SZ', now).encode('ascii')
credential_scope = date_stamp + b'/' + region + b'/' + service + b'/aws4_request'

canonical_uri = uri
# Only the "host" header is signed; the trailing newline is part of the canonical headers element.
canonical_headers = b'host:' + host + b'\n'
amz_signed_headers = b'host'
canonical_query_string = (
    'X-Amz-Algorithm=AWS4-HMAC-SHA256'
    '&X-Amz-Credential={credential}'
    '&X-Amz-Date={date}'
    '&X-Amz-Expires=30'
    '&X-Amz-SignedHeaders={signed_headers}'
).format(
    credential=urllib.parse.quote_plus(access_key + b'/' + credential_scope),
    date=amz_date.decode('ascii'),
    signed_headers=amz_signed_headers.decode('ascii')
).encode('ascii')

# The request carries no body, so the payload hash is the hash of the empty string.
payload_hash = hashlib.sha256(b'').hexdigest().encode('ascii')
canonical_request = b'\n'.join((
    method,
    canonical_uri,
    canonical_query_string,
    canonical_headers,
    amz_signed_headers,
    payload_hash,
))
string_to_sign = b'\n'.join((
    b'AWS4-HMAC-SHA256',
    amz_date,
    credential_scope,
    hashlib.sha256(canonical_request).hexdigest().encode('ascii'),
))

signing_key = calc_signing_key(secret_key, date_stamp, region, service)
signature = hmac.new(signing_key, string_to_sign, hashlib.sha256).hexdigest().encode('ascii')
# The signature is appended last; it is not part of the canonical query string that was signed.
query_string = canonical_query_string + b'&X-Amz-Signature=' + signature

print('\n++ Canonical Request:\n{s}'.format(s=canonical_request.decode('ascii')))
print('\n++ String to Sign:\n{s}'.format(s=string_to_sign.decode('ascii')))
print('\n')
# Resolve the endpoint before allocating a socket so that a DNS failure does not leave one open.
addr = usocket.getaddrinfo(host, tcp_port)[0][-1]
s = usocket.socket(usocket.AF_INET, usocket.SOCK_STREAM)
try:
    s.connect(addr)
    # From here on "s" is the TLS stream; the raw socket is only reachable through it.
    s = ussl.wrap_socket(s)
except OSError:
    # "s" is still the raw socket here (the rebinding above did not happen), so release it before propagating.
    s.close()
    raise
class SocketWrapper:
    """Stream wrapper that echoes everything written to or read from ``sock`` to stdout.

    It exposes only ``write``, ``read`` and ``readline``; each call is forwarded
    to the wrapped stream and the data involved is printed (decoded as ASCII,
    without an added newline) so the HTTP exchange of the handshake is visible.
    """

    def __init__(self, sock):
        self._sock = sock

    def _echo(self, data):
        # A stream may hand back None or b'' when it has nothing to deliver; there is nothing to print then.
        if data:
            print(data.decode('ascii'), end='')

    def write(self, content):
        """Print ``content``, write it to the wrapped stream and return that stream's result."""
        self._echo(content)
        return self._sock.write(content)

    def read(self):
        """Read from the wrapped stream, print what was read and return it unchanged."""
        content = self._sock.read()
        self._echo(content)
        return content

    def readline(self):
        """Read one line from the wrapped stream, print it and return it unchanged."""
        line = self._sock.readline()
        self._echo(line)
        return line
# Route all traffic through the echoing wrapper so the HTTP request and response are printed.
s = SocketWrapper(s)
print('*** Attempting websocket client handshake...')
# NOTE(review): presumably the handshake draws its random key material from "urandom" - confirm against PR #3420.
urandom.seed(utime.ticks_ms())
try:
    websocket_helper.client_handshake(s, host, uri + b'?' + query_string, subprotocol)
except OSError as e:
    # args[0] may be an integer errno (or args may be empty), so convert explicitly instead of concatenating it
    # directly; otherwise a TypeError/IndexError raised here would hide the real failure.
    detail = e.args[0] if e.args else e
    print('*** Failed (Error: "' + str(detail) + '")')
    print('*** Attempting to read HTTP response body to get failure detail...')
    # The wrapper prints whatever it reads, so the return value is not needed.
    s.read()
else:
    print('*** Success!')
# This script validates the websocket client handshake functionality, described in RFC6455 and provided by MicroPython's
# "websocket_helper" module, against the websocket endpoint for HiveMQ's MQTT service. For more information about this
# service, see https://www.hivemq.com/blog/mqtt-over-websockets-with-hivemq and
# http://www.hivemq.com/demos/websocket-client/.
#
# For this script to succeed, the changes included in pull request #3420 must be applied (see
# https://github.com/micropython/micropython/pull/3420).
#
# This script should work on any MicroPython port with networking support (i.e. with the "usocket" module). However,
# since the "websocket_helper" module is only included with the ESP8266 port, for other ports it is necessary to either
# add this module as a frozen module while building the port, or make it available at run-time via the file system.
#
# As per the websocket specification, the handshake takes place using the HTTP protocol. This script will print both
# the HTTP request and response to assist in diagnosing any failures.
import urandom
import usocket
import utime
import websocket_helper
# Broker endpoint used for the handshake test; this script connects without any TLS wrapping.
host = b'broker.mqttdashboard.com'
tcp_port = 8000
# Request path and websocket subprotocol handed to the client handshake.
uri = b'/mqtt'
subprotocol = b'mqtt'
# Resolve the endpoint before allocating a socket so that a DNS failure does not leave one open.
addr = usocket.getaddrinfo(host, tcp_port)[0][-1]
s = usocket.socket(usocket.AF_INET, usocket.SOCK_STREAM)
try:
    s.connect(addr)
except OSError:
    # Release the socket before propagating the connection failure.
    s.close()
    raise
class SocketWrapper:
    """Stream wrapper that echoes everything written to or read from ``sock`` to stdout.

    It exposes only ``write``, ``read`` and ``readline``; each call is forwarded
    to the wrapped stream and the data involved is printed (decoded as ASCII,
    without an added newline) so the HTTP exchange of the handshake is visible.
    """

    def __init__(self, sock):
        self._sock = sock

    def _echo(self, data):
        # A stream may hand back None or b'' when it has nothing to deliver; there is nothing to print then.
        if data:
            print(data.decode('ascii'), end='')

    def write(self, content):
        """Print ``content``, write it to the wrapped stream and return that stream's result."""
        self._echo(content)
        return self._sock.write(content)

    def read(self):
        """Read from the wrapped stream, print what was read and return it unchanged."""
        content = self._sock.read()
        self._echo(content)
        return content

    def readline(self):
        """Read one line from the wrapped stream, print it and return it unchanged."""
        line = self._sock.readline()
        self._echo(line)
        return line
# Route all traffic through the echoing wrapper so the HTTP request and response are printed.
s = SocketWrapper(s)
print('*** Attempting websocket client handshake...')
# NOTE(review): presumably the handshake draws its random key material from "urandom" - confirm against PR #3420.
urandom.seed(utime.ticks_ms())
try:
    websocket_helper.client_handshake(s, host, uri, subprotocol)
except OSError as e:
    # args[0] may be an integer errno (or args may be empty), so convert explicitly instead of concatenating it
    # directly; otherwise a TypeError/IndexError raised here would hide the real failure.
    detail = e.args[0] if e.args else e
    print('*** Failed (Error: "' + str(detail) + '")')
    print('*** Attempting to read HTTP response body to get failure detail...')
    # The wrapper prints whatever it reads, so the return value is not needed.
    s.read()
else:
    print('*** Success!')
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment