Created
November 10, 2017 08:35
-
-
Save ccooper21/24e76fd7906785e0cb7c1d466ecb6371 to your computer and use it in GitHub Desktop.
These scripts validate the websocket client handshake functionality described in RFC6455 and provided by MicroPython's "websocket_helper".
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This script validates the websocket client handshake functionality, described in RFC6455 and provided by MicroPython's | |
# "websocket_helper" module, against the websocket endpoint for AWS' IoT MQTT-based service. For more information about | |
# this service, see https://aws.amazon.com/iot-platform/. Two key considerations with respect to integrating with AWS' | |
# service is that AWS requires the use of TLS/SSL (i.e. HTTPS) and the use of a cryptographically sound authentication | |
# mechanism. While AWS supports several authentication mechanisms, this script uses the AWS SigV4 digital signature | |
# mechanism described at http://docs.aws.amazon.com/general/latest/gr/signature-version-4.html and incorporates code | |
# based on the associated SigV4 Python-based example code. | |
# | |
# For this script to succeed, the changes included in pull request #3420 must be applied (see | |
# https://github.com/micropython/micropython/pull/3420). Further, the following keys in this script must be replaced | |
# with the proper AWS customer specific arguments. | |
# | |
# <AWS_ACCESS_KEY> | |
# <AWS_SECRET_KEY> | |
# <AWS_IOT_ENDPOINT_HOST> | |
# <AWS_REGION> | |
# | |
# Also, this script requires several modules from the "micropython-lib" project for the SigV4 implementation. These | |
# modules can be installed via the REPL as follows: | |
# | |
# import upip | |
# upip.install([ | |
# 'micropython-hashlib', | |
# 'micropython-hmac', | |
# 'micropython-time', | |
# 'micropython-urllib.parse' | |
# ]) | |
# | |
# Lastly, since the "websocket_helper" module is only included with the ESP8266 port, for other ports it is necessary to | |
# either add this module as a frozen module while building the port, or make it available at run-time via the file | |
# system. | |
# | |
# Ideally, this script would work on any MicroPython port with networking and TLS/SSL support (i.e. with the "usocket" | |
# and "ussl" modules). However, the "time" module requires a port with FFI support, and the "hashlib" module requires a | |
# run-time environment with significant free RAM. This means this script is only known to run on the UNIX port. | |
# | |
# As per the websocket specification, the handshake takes place using the HTTP protocol. This script will print both | |
# the HTTP request and response to assist in diagnosing any failures. | |
import hashlib | |
import hmac | |
import time | |
import urandom | |
import urllib.parse | |
import usocket | |
import ussl | |
import utime | |
import websocket_helper | |
access_key = b'<AWS_ACCESS_KEY>' | |
secret_key = b'<AWS_SECRET_KEY>' | |
host = b'<AWS_IOT_ENDPOINT_HOST>' | |
tcp_port = 443 | |
uri = b'/mqtt' | |
subprotocol = b'mqtt' | |
method = b'GET' | |
region = b'<AWS_REGION>' | |
service = b'iotdevicegateway' | |
print('*** Preparing AWS SigV4 signature...') | |
def sign(key, message): | |
return hmac.new(key, message, hashlib.sha256).digest() | |
def calc_signing_key(key, date_stamp, region, service): | |
key_date = sign(b'AWS4' + key, date_stamp) | |
key_region = sign(key_date, region) | |
key_service = sign(key_region, service) | |
key_signing = sign(key_service, b'aws4_request') | |
return key_signing | |
now = time.gmtime() | |
date_stamp = time.strftime('%Y%m%d', now).encode('ascii') | |
amz_date = time.strftime('%Y%m%dT%H%M%SZ', now).encode('ascii') | |
credential_scope = date_stamp + b'/' + region + b'/' + service + b'/aws4_request' | |
canonical_uri = uri | |
canonical_headers = b'host:' + host + b'\n' | |
amz_signed_headers = b'host' | |
canonical_query_string = ( | |
'X-Amz-Algorithm=AWS4-HMAC-SHA256' | |
'&X-Amz-Credential={credential}' | |
'&X-Amz-Date={date}' | |
'&X-Amz-Expires=30' | |
'&X-Amz-SignedHeaders={signed_headers}' | |
).format( | |
credential=urllib.parse.quote_plus(access_key + b'/' + credential_scope), | |
date=amz_date.decode('ascii'), | |
signed_headers=amz_signed_headers.decode('ascii') | |
).encode('ascii') | |
payload_hash = hashlib.sha256(b'').hexdigest() | |
canonical_request = ( | |
method | |
+ b'\n' | |
+ canonical_uri | |
+ b'\n' | |
+ canonical_query_string | |
+ b'\n' | |
+ canonical_headers | |
+ b'\n' | |
+ amz_signed_headers | |
+ b'\n' | |
+ payload_hash | |
) | |
string_to_sign = ( | |
b'AWS4-HMAC-SHA256\n' | |
+ amz_date | |
+ b'\n' | |
+ credential_scope | |
+ b'\n' | |
+ hashlib.sha256(canonical_request).hexdigest() | |
) | |
signing_key = calc_signing_key(secret_key, date_stamp, region, service) | |
signature = hmac.new(signing_key, string_to_sign, hashlib.sha256).hexdigest() | |
query_string = canonical_query_string + b'&X-Amz-Signature=' + signature | |
print('\n++ Canonical Request:\n{s}'.format(s=canonical_request.decode('ascii'))) | |
print('\n++ String to Sign:\n{s}'.format(s=string_to_sign.decode('ascii'))) | |
print('\n') | |
s = usocket.socket(usocket.AF_INET, usocket.SOCK_STREAM) | |
addr = usocket.getaddrinfo(host, tcp_port)[0][-1] | |
s.connect(addr) | |
s = ussl.wrap_socket(s) | |
class SocketWrapper: | |
def __init__(self, sock): | |
self._sock = sock | |
def write(self, content): | |
print(content.decode('ascii'), end='') | |
self._sock.write(content) | |
def read(self): | |
content = self._sock.read() | |
print(content.decode('ascii'), end='') | |
return content | |
def readline(self): | |
line = self._sock.readline() | |
print(line.decode('ascii'), end='') | |
return line | |
s = SocketWrapper(s) | |
print('*** Attempting websocket client handshake...') | |
urandom.seed(utime.ticks_ms()) | |
try: | |
websocket_helper.client_handshake(s, host, uri + b'?' + query_string, subprotocol) | |
print('*** Success!') | |
except OSError as e: | |
print('*** Failed (Error: "' + e.args[0] + '")') | |
print('*** Attempting to read HTTP response body to get failure detail...') | |
s.read() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# This script validates the websocket client handshake functionality, described in RFC6455 and provided by MicroPython's | |
# "websocket_helper" module, against the websocket endpoint for HiveMQ's MQTT service. For more information about this | |
# service, see https://www.hivemq.com/blog/mqtt-over-websockets-with-hivemq and | |
# http://www.hivemq.com/demos/websocket-client/. | |
# | |
# For this script to succeed, the changes included in pull request #3420 must be applied (see | |
# https://github.com/micropython/micropython/pull/3420). | |
# | |
# This script should work on any MicroPython port with networking support (i.e. with the "usocket" module). However, | |
# since the "websocket_helper" module is only included with the ESP8266 port, for other ports it is necessary to either | |
# add this module as a frozen module while building the port, or make it available at run-time via the file system. | |
# | |
# As per the websocket specification, the handshake takes place using the HTTP protocol. This script will print both | |
# the HTTP request and response to assist in diagnosing any failures. | |
import urandom | |
import usocket | |
import utime | |
import websocket_helper | |
host = b'broker.mqttdashboard.com' | |
tcp_port = 8000 | |
uri = b'/mqtt' | |
subprotocol = b'mqtt' | |
s = usocket.socket(usocket.AF_INET, usocket.SOCK_STREAM) | |
addr = usocket.getaddrinfo(host, tcp_port)[0][-1] | |
s.connect(addr) | |
class SocketWrapper: | |
def __init__(self, sock): | |
self._sock = sock | |
def write(self, content): | |
print(content.decode('ascii'), end='') | |
self._sock.write(content) | |
def read(self): | |
content = self._sock.read() | |
print(content.decode('ascii'), end='') | |
return content | |
def readline(self): | |
line = self._sock.readline() | |
print(line.decode('ascii'), end='') | |
return line | |
s = SocketWrapper(s) | |
print('*** Attempting websocket client handshake...') | |
urandom.seed(utime.ticks_ms()) | |
try: | |
websocket_helper.client_handshake(s, host, uri, subprotocol) | |
print('*** Success!') | |
except OSError as e: | |
print('*** Failed (Error: "' + e.args[0] + '")') | |
print('*** Attempting to read HTTP response body to get failure detail...') | |
s.read() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment