Created
July 13, 2017 21:50
-
-
Save syssi/4da06e2c2d1e897984b0c53778cc4da4 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import datetime | |
import hashlib | |
import json | |
import logging | |
import calendar | |
from construct import (Struct, Bytes, Const, Int16ub, Int32ub, GreedyBytes, | |
Adapter, Checksum, RawCopy, Rebuild, IfThenElse, | |
Default, Pointer, Pass, Enum) | |
# for debugging parsing | |
# from construct import Probe | |
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes | |
from cryptography.hazmat.backends import default_backend | |
from cryptography.hazmat.primitives import padding | |
_LOGGER = logging.getLogger(__name__) | |
# Map of device ids | |
xiaomi_devices = {0x02f2: "Xiaomi Mi Robot Vacuum", | |
0x02c1: "Xiaomi Smart Mi Air Purifier", | |
0x031a: "Xiaomi Smart home gateway", | |
0x0330: "Yeelight color bulb" | |
} | |
xiaomi_devices = {y: x for x, y in xiaomi_devices.items()} | |
class Utils: | |
""" This class is adapted from the original xpn.py code by gst666 """ | |
# TODO nicer way to handle tokens | |
token = "" | |
def md5(data): | |
checksum = hashlib.md5() | |
checksum.update(data) | |
return checksum.digest() | |
def key_iv(): | |
key = Utils.md5(Utils.token) | |
iv = Utils.md5(key + Utils.token) | |
return key, iv | |
def encrypt(plaintext): | |
key, iv = Utils.key_iv() | |
padder = padding.PKCS7(128).padder() | |
padded_plaintext = padder.update(plaintext) + padder.finalize() | |
cipher = Cipher(algorithms.AES(key), modes.CBC(iv), | |
backend=default_backend()) | |
encryptor = cipher.encryptor() | |
return encryptor.update(padded_plaintext) + encryptor.finalize() | |
def decrypt(ciphertext): | |
key, iv = Utils.key_iv() | |
cipher = Cipher(algorithms.AES(key), modes.CBC(iv), | |
backend=default_backend()) | |
decryptor = cipher.decryptor() | |
padded_plaintext = decryptor.update(ciphertext) + decryptor.finalize() | |
unpadder = padding.PKCS7(128).unpadder() | |
unpadded_plaintext = unpadder.update(padded_plaintext) | |
unpadded_plaintext += unpadder.finalize() | |
return unpadded_plaintext | |
def checksum_field_bytes(ctx): | |
"""Gatherd bytes for checksum calculation""" | |
# print("CHECKSUM: %s" % ctx["header"]) | |
if Utils.token is None: | |
raise Exception("you have to define token") | |
#print("CTX: %s" % ctx) | |
x = bytearray(ctx["header"].data) | |
x += Utils.token | |
if "data" in ctx: | |
x += ctx["data"].data | |
# print("DATA: %s" % ctx["data"]) | |
return x | |
def get_length(x): | |
"""Return total packet length.""" | |
datalen = x._.data.length | |
return datalen + 32 | |
def is_hello(x): | |
"""Return if packet is a hello packet.""" | |
# not very nice, but we know that hellos are 32b of length | |
if 'length' in x: | |
val = x['length'] | |
else: | |
val = x.header.value['length'] | |
return val == 32 | |
class TimeAdapter(Adapter): | |
"""Adapter for timestamp conversion.""" | |
def _encode(self, obj, context): | |
return calendar.timegm(obj.timetuple()) | |
def _decode(self, obj, context): | |
return datetime.datetime.fromtimestamp(obj) | |
class EncryptionAdapter(Adapter): | |
"""Adapter to handle communication encryption.""" | |
def _encode(self, obj, context): | |
return Utils.encrypt(json.dumps(obj).encode('utf-8') + b'\x00') | |
def _decode(self, obj, context): | |
try: | |
decrypted = Utils.decrypt(obj).rstrip(b"\x00") | |
except Exception as ex: | |
_LOGGER.debug("Unable to decrypt, returning raw bytes.") | |
return obj | |
try: | |
jsoned = json.loads(decrypted.decode('utf-8')) | |
except: | |
_LOGGER.error("unable to parse json, was: %s", decrypted) | |
jsoned = b'{}' | |
raise | |
return jsoned | |
Message = Struct( | |
# for building we need data before anything else. | |
"data" / Pointer(32, RawCopy(EncryptionAdapter(GreedyBytes))), | |
"header" / RawCopy(Struct( | |
Const(Int16ub, 0x2131), | |
"length" / Rebuild(Int16ub, Utils.get_length), | |
"unknown" / Default(Int32ub, 0x00000000), | |
"devtype" / Enum(Default(Int16ub, 0x02C1), | |
default=Pass, **xiaomi_devices), | |
"serial" / Default(Int16ub, 0xa40d), | |
"ts" / Default(Int32ub, 0x00000000), | |
)), | |
"checksum" / IfThenElse( | |
Utils.is_hello, | |
Bytes(16), | |
Checksum(Bytes(16), | |
Utils.md5, | |
Utils.checksum_field_bytes)), | |
) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment