-
-
Save chriswheeldon/3b17d974db3817613c69191c0480fe55 to your computer and use it in GitHub Desktop.
New TP-Link HS110 handshake protocol POC
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import binascii | |
import socket | |
import requests | |
# pycryptodome | |
from Crypto import Random | |
from Crypto.Cipher import AES | |
from Crypto.Util import Counter, Padding | |
import logging | |
import hashlib | |
import time | |
import json | |
# Add logging from urllib | |
import http.client as http_client | |
# Verbose diagnostics for this proof of concept: http.client connection debug
# output plus DEBUG-level logging on the root logger and on the urllib3 logger
# bundled under the requests package name.
http_client.HTTPConnection.debuglevel = 1

# Make sure the root logger has a handler, then lower its threshold to DEBUG.
logging.basicConfig()
logging.getLogger().setLevel(logging.DEBUG)

# Let the urllib3 records bubble up to the root handler configured above.
requests_log = logging.getLogger("requests.packages.urllib3")
requests_log.propagate = True
requests_log.setLevel(logging.DEBUG)
class EncryptionSession:
    """Session secrets and AES-CBC helpers derived from one handshake.

    The AES key, the IV prefix, the starting sequence number and the signature
    prefix are all SHA-256 derivations over the two handshake seeds and the
    user hash, each with its own ASCII label ('lsk', 'iv', 'ldk').
    """

    def __init__(self, local_seed, remote_seed, user_hash):
        """Derive every per-session secret.

        Args:
            local_seed: bytes seed chosen by this client.
            remote_seed: bytes seed received from the peer.
            user_hash: bytes credential hash mixed into every derivation.
        """
        self._key = self._key_derive(local_seed, remote_seed, user_hash)
        (self._iv, self._seq) = self._iv_derive(local_seed, remote_seed, user_hash)
        self._sig = self._sig_derive(local_seed, remote_seed, user_hash)

    def _key_derive(self, local_seed, remote_seed, user_hash):
        """Return the 16-byte AES key: sha256('lsk' + seeds + user_hash)[:16]."""
        payload = 'lsk'.encode('utf-8') + local_seed + remote_seed + user_hash
        return hashlib.sha256(payload).digest()[:16]

    def _iv_derive(self, local_seed, remote_seed, user_hash):
        """Return ``(iv_prefix, seq)`` taken from sha256('iv' + seeds + user_hash).

        iv is first 16 bytes of sha256, where the last 4 bytes forms the
        sequence number used in requests and is incremented on each request.
        The sequence number is masked to 31 bits.
        """
        payload = 'iv'.encode('utf-8') + local_seed + remote_seed + user_hash
        iv = hashlib.sha256(payload).digest()[:16]
        return (iv[:12], (int.from_bytes(iv[12:16], 'big') & 0x7fffffff))

    def _sig_derive(self, local_seed, remote_seed, user_hash):
        """Return the 28-byte prefix used when hashing each request signature."""
        payload = 'ldk'.encode('utf-8') + local_seed + remote_seed + user_hash
        return hashlib.sha256(payload).digest()[:28]

    def iv(self):
        """Return the 16-byte IV for the current sequence number."""
        seq = self._seq.to_bytes(4, 'big')
        iv = self._iv + seq
        # Internal invariant: 12-byte prefix + 4-byte big-endian counter.
        assert len(iv) == 16
        return iv

    def encrypt(self, msg):
        """Encrypt and sign one message, advancing the sequence number.

        Args:
            msg: ``str`` (encoded as UTF-8) or ``bytes`` plaintext.

        Returns:
            ``(signature + ciphertext, seq)`` where ``signature`` is the
            32-byte SHA-256 over the signature prefix, the 4-byte sequence
            number and the ciphertext, and ``seq`` is the sequence number
            used for this message.

        Raises:
            TypeError: if ``msg`` is neither ``str`` nor ``bytes``.
        """
        # Validate before touching the counter so a rejected message does not
        # consume a sequence number.
        if isinstance(msg, str):
            msg = msg.encode('utf-8')
        if not isinstance(msg, bytes):
            raise TypeError(
                'msg must be str or bytes, not {}'.format(type(msg).__name__))
        self._seq = self._seq + 1
        cipher = AES.new(self._key, AES.MODE_CBC, self.iv())
        ciphertext = cipher.encrypt(Padding.pad(msg, AES.block_size))
        signature = hashlib.sha256(self._sig + self._seq.to_bytes(4, 'big') + ciphertext).digest()
        return (signature + ciphertext, self._seq)

    def decrypt(self, msg):
        """Decrypt a message using the IV of the current sequence number.

        Args:
            msg: ``bytes``; the first 32 bytes are skipped and the rest is
                treated as AES-CBC ciphertext.

        Returns:
            The unpadded plaintext bytes.

        Raises:
            TypeError: if ``msg`` is not ``bytes``.
        """
        if not isinstance(msg, bytes):
            raise TypeError(
                'msg must be bytes, not {}'.format(type(msg).__name__))
        cipher = AES.new(self._key, AES.MODE_CBC, self.iv())
        # NOTE(review): the skipped 32-byte prefix is presumably a signature
        # like the one encrypt() prepends; it is not verified here.
        plaintext = Padding.unpad(cipher.decrypt(msg[32:]), AES.block_size)
        return plaintext
class Handshake:
    """Two-step seed exchange with a device that yields an EncryptionSession."""

    def __init__(self, ip):
        """Remember the device address and pick a fresh 16-byte local seed.

        Args:
            ip: host/IP string of the device, used to build the request URLs.
        """
        self.ip = ip
        self.local_seed = Random.get_random_bytes(16)

    def user_hash(self):
        """Return md5(md5(email) + md5(pass)) for empty email and password."""
        # md5(md5(email) + md5(pass))
        # device is not connected to tplink cloud i.e. email and pass are empty
        # may need to include your email and password below if app/plug are associated with a tplink account?
        # i.e. user_hash = hashlib.md5(b'<email>').digest() + hashlib.md5(b'<pass>').digest()
        user_hash = hashlib.md5(b'').digest() + hashlib.md5(b'').digest()
        return hashlib.md5(user_hash).digest()

    def perform(self, http_session):
        """Run both handshake requests and build the resulting session.

        Args:
            http_session: object with a requests-style ``post(url, data=...)``
                method; every request of the handshake goes through it.

        Returns:
            An ``EncryptionSession`` built from both seeds and the user hash.

        Raises:
            RuntimeError: if either request does not return HTTP 200, or if
                the hash sent back by the device does not match
                sha256(local_seed + user_hash).
        """
        user_hash = self.user_hash()

        # step 1 - send our seed
        result = http_session.post('http://{}:80/app/handshake1'.format(self.ip), data=self.local_seed)
        if result.status_code != 200:
            raise RuntimeError(
                'handshake1 failed with HTTP status {}'.format(result.status_code))
        body = result.content
        self.remote_seed = body[:16]
        # device responds with hash of seed + user hash; an explicit check
        # (rather than assert) keeps it active under `python -O`
        if hashlib.sha256(self.local_seed + user_hash).digest() != body[16:]:
            raise RuntimeError('handshake1 response hash does not match local seed + user hash')
        time.sleep(0.25)

        # step 2 - send hash of remote seed + user hash
        payload = hashlib.sha256(self.remote_seed + user_hash).digest()
        # Use the session passed in, not a module-level global.
        result = http_session.post('http://{}:80/app/handshake2'.format(self.ip), data=payload)
        if result.status_code != 200:
            raise RuntimeError(
                'handshake2 failed with HTTP status {}'.format(result.status_code))
        time.sleep(0.5)  # device seems to return 403 if first request is made too soon after handshake?
        return EncryptionSession(self.local_seed, self.remote_seed, user_hash)
def discover(timeout=None):
    """Broadcast a discovery probe over UDP and return the replying device's IP.

    Args:
        timeout: optional number of seconds to wait on the socket. ``None``
            (the default) blocks until a reply arrives.

    Returns:
        The ``['result']['ip']`` value of the JSON document in the first reply.

    Raises:
        socket.timeout: if ``timeout`` is set and no reply arrives in time.
    """
    probe = binascii.unhexlify('020000010000000000000000463cb5d3')  # magic number
    # The context manager closes the socket even if send/recv raises.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        sock.settimeout(timeout)
        sock.sendto(probe, ('255.255.255.255', 20002))
        reply = sock.recv(1024)
    # The first 16 bytes of the reply are skipped; the rest is decoded as UTF-8 JSON.
    document = reply[16:].decode('utf-8')
    return json.loads(document)['result']['ip']
if __name__ == '__main__':
    # Demo flow: locate a device, run the handshake, then send one encrypted
    # get_sysinfo query and print the decrypted reply.
    # The HTTP session stays bound to the module-level name `s`.
    s = requests.Session()

    ip = discover()
    print('Found device at {}'.format(ip))

    handshake = Handshake(ip)
    encryption = handshake.perform(s)

    # encrypt() hands back the wire payload plus the sequence number that the
    # request passes as the `seq` query parameter.
    query = '{"system":{"get_sysinfo":null}}'
    msg, seq = encryption.encrypt(query)

    request_url = 'http://{}:80/app/request'.format(ip)
    res = s.post(request_url, params={'seq': seq}, data=msg)
    assert res.status_code == 200

    print(encryption.decrypt(res.content))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment