Skip to content

Instantly share code, notes, and snippets.

@chriswheeldon
Last active December 18, 2021 20:11
Show Gist options
  • Star 7 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save chriswheeldon/3b17d974db3817613c69191c0480fe55 to your computer and use it in GitHub Desktop.
Save chriswheeldon/3b17d974db3817613c69191c0480fe55 to your computer and use it in GitHub Desktop.
New TP-Link HS110 handshake protocol POC
#!/usr/bin/env python3
import binascii
import socket
import requests
# The Crypto package imported below is provided by pycryptodome
from Crypto import Random
from Crypto.Cipher import AES
from Crypto.Util import Counter, Padding
import logging
import hashlib
import time
import json
# Enable debug output from http.client and the requests/urllib3 logger
import http.client as http_client
# Verbose diagnostics for this proof of concept: the root logger and the
# requests/urllib3 logger are both set to DEBUG, and http.client's own
# debug output is switched on.
logging.basicConfig()
logging.root.setLevel(logging.DEBUG)
requests_log = logging.getLogger("requests.packages.urllib3")
requests_log.propagate = True
requests_log.setLevel(logging.DEBUG)
http_client.HTTPConnection.debuglevel = 1
class EncryptionSession:
    """Encryption state derived from the two handshake seeds and the user hash.

    Three values are derived with SHA-256 over a short ASCII label followed
    by ``local_seed + remote_seed + user_hash``:

    * ``_key``  - 16-byte AES key (label ``lsk``)
    * ``_iv`` / ``_seq`` - 12-byte IV prefix and the starting sequence
      number (label ``iv``)
    * ``_sig``  - 28-byte prefix mixed into every request signature
      (label ``ldk``)
    """

    def __init__(self, local_seed: bytes, remote_seed: bytes, user_hash: bytes):
        self._key = self._key_derive(local_seed, remote_seed, user_hash)
        (self._iv, self._seq) = self._iv_derive(local_seed, remote_seed, user_hash)
        self._sig = self._sig_derive(local_seed, remote_seed, user_hash)

    def _key_derive(self, local_seed: bytes, remote_seed: bytes, user_hash: bytes) -> bytes:
        """Return the AES key: the first 16 bytes of the ``lsk`` digest."""
        payload = 'lsk'.encode('utf-8') + local_seed + remote_seed + user_hash
        return hashlib.sha256(payload).digest()[:16]

    def _iv_derive(self, local_seed: bytes, remote_seed: bytes, user_hash: bytes) -> tuple:
        """Return ``(iv_prefix, seq)`` taken from the ``iv`` digest.

        The IV is the first 16 bytes of the digest. Its first 12 bytes are
        kept as a fixed prefix; its last 4 bytes, read big-endian with the
        top bit cleared, form the sequence number that is sent with requests
        and incremented on each request.
        """
        payload = 'iv'.encode('utf-8') + local_seed + remote_seed + user_hash
        iv = hashlib.sha256(payload).digest()[:16]
        return (iv[:12], (int.from_bytes(iv[12:16], 'big') & 0x7fffffff))

    def _sig_derive(self, local_seed: bytes, remote_seed: bytes, user_hash: bytes) -> bytes:
        """Return the 28-byte prefix used when hashing each request signature."""
        payload = 'ldk'.encode('utf-8') + local_seed + remote_seed + user_hash
        return hashlib.sha256(payload).digest()[:28]

    def iv(self) -> bytes:
        """Return the 16-byte IV for the current sequence number."""
        seq = self._seq.to_bytes(4, 'big')
        iv = self._iv + seq
        # Internal invariant (12-byte prefix + 4-byte counter), not input validation.
        assert len(iv) == 16
        return iv

    def encrypt(self, msg):
        """Encrypt ``msg`` and return ``(signature + ciphertext, seq)``.

        ``msg`` may be ``str`` (encoded as UTF-8) or ``bytes``. The sequence
        number is incremented first, then the message is padded and
        encrypted with AES-CBC using the IV for the new sequence number.
        The signature is ``sha256(sig_prefix + seq + ciphertext)``.

        Raises:
            AssertionError: if ``msg`` is neither ``str`` nor ``bytes``.
        """
        if isinstance(msg, str):
            msg = msg.encode('utf-8')
        # Raised explicitly (rather than via ``assert``) so the check survives
        # ``python -O`` while keeping the exception type callers already see.
        # Validation happens before the counter is bumped so a rejected call
        # does not consume a sequence number.
        if not isinstance(msg, bytes):
            raise AssertionError(
                'msg must be str or bytes, not {}'.format(type(msg).__name__))
        self._seq = self._seq + 1
        cipher = AES.new(self._key, AES.MODE_CBC, self.iv())
        ciphertext = cipher.encrypt(Padding.pad(msg, AES.block_size))
        signature = hashlib.sha256(
            self._sig + self._seq.to_bytes(4, 'big') + ciphertext).digest()
        return (signature + ciphertext, self._seq)

    def decrypt(self, msg):
        """Decrypt a response and return the unpadded plaintext bytes.

        The first 32 bytes of ``msg`` (the length of the signature that
        ``encrypt`` prepends) are skipped without being verified; the rest is
        decrypted with AES-CBC using the IV for the current sequence number.

        Raises:
            AssertionError: if ``msg`` is not ``bytes``.
        """
        if not isinstance(msg, bytes):
            raise AssertionError(
                'msg must be bytes, not {}'.format(type(msg).__name__))
        cipher = AES.new(self._key, AES.MODE_CBC, self.iv())
        plaintext = Padding.unpad(cipher.decrypt(msg[32:]), AES.block_size)
        return plaintext
class Handshake:
    """Two-step seed exchange with the device at ``ip`` over HTTP.

    A random 16-byte local seed is generated on construction. ``perform``
    posts it to ``/app/handshake1``, checks the device's reply, confirms with
    ``/app/handshake2`` and returns the resulting ``EncryptionSession``.
    """

    # Pauses (in seconds) after handshake1 and handshake2 respectively.
    # Kept as class attributes so they can be overridden per instance.
    handshake1_delay = 0.25
    # device seems to return 403 if first request is made too soon after handshake?
    handshake2_delay = 0.5

    def __init__(self, ip):
        self.ip = ip
        self.local_seed = Random.get_random_bytes(16)

    def user_hash(self) -> bytes:
        """Return ``md5(md5(email) + md5(pass))`` for empty credentials.

        device is not connected to tplink cloud i.e. email and pass are empty.
        May need to include your email and password below if app/plug are
        associated with a tplink account?
        i.e. user_hash = hashlib.md5(b'<email>').digest() + hashlib.md5(b'<pass>').digest()
        """
        user_hash = hashlib.md5(b'').digest() + hashlib.md5(b'').digest()
        return hashlib.md5(user_hash).digest()

    def perform(self, http_session):
        """Run both handshake steps and return an ``EncryptionSession``.

        Args:
            http_session: object with a ``post(url, data=...)`` method whose
                result exposes ``status_code`` and ``content`` (the script
                passes a ``requests.Session``). Both steps go through this
                same session.

        Raises:
            AssertionError: if either step does not answer with HTTP 200, or
                if the hash returned by handshake1 does not match
                ``sha256(local_seed + user_hash)``. Raised explicitly rather
                than via ``assert`` so the checks are not stripped by
                ``python -O``.
        """
        user_hash = self.user_hash()

        # step 1 - send our seed
        result = http_session.post(
            'http://{}:80/app/handshake1'.format(self.ip), data=self.local_seed)
        if result.status_code != 200:
            raise AssertionError(
                'handshake1 failed with HTTP status {}'.format(result.status_code))
        body = result.content
        self.remote_seed = body[:16]
        # device responds with hash of seed + user hash
        if hashlib.sha256(self.local_seed + user_hash).digest() != body[16:]:
            raise AssertionError(
                'handshake1 response hash does not match local seed + user hash')
        time.sleep(self.handshake1_delay)

        # step 2 - send hash of remote seed + user hash
        payload = hashlib.sha256(self.remote_seed + user_hash).digest()
        # Must use the caller-supplied session, not a module-level global.
        result = http_session.post(
            'http://{}:80/app/handshake2'.format(self.ip), data=payload)
        if result.status_code != 200:
            raise AssertionError(
                'handshake2 failed with HTTP status {}'.format(result.status_code))
        time.sleep(self.handshake2_delay)
        return EncryptionSession(self.local_seed, self.remote_seed, user_hash)
def discover(timeout=None):
    """Broadcast a discovery probe over UDP and return the responder's IP.

    A fixed 16-byte probe is sent to 255.255.255.255 on port 20002. One reply
    of up to 1024 bytes is read; its first 16 bytes are skipped and the rest
    is parsed as UTF-8 JSON, from which ``['result']['ip']`` is returned.

    Args:
        timeout: optional number of seconds to wait for a reply. The default
            ``None`` leaves the socket's timeout untouched, so the call
            waits for a reply exactly as before.

    Raises:
        socket.timeout: if ``timeout`` is given and no reply arrives in time.
    """
    probe = bytes.fromhex('020000010000000000000000463cb5d3')  # magic number
    # The context manager closes the socket even if recv or parsing fails.
    with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        if timeout is not None:
            sock.settimeout(timeout)
        sock.sendto(probe, ('255.255.255.255', 20002))
        reply = sock.recv(1024)
    return json.loads(reply[16:].decode('utf-8'))['result']['ip']
if __name__ == '__main__':
    # Demo flow: discover a device, perform the handshake, then send one
    # encrypted get_sysinfo request and print the decrypted reply.
    # The session is used as a context manager so its connections are
    # released when the script finishes or fails.
    with requests.Session() as s:
        ip = discover()
        print('Found device at {}'.format(ip))
        handshake = Handshake(ip)
        encryption = handshake.perform(s)
        (msg, seq) = encryption.encrypt('{"system":{"get_sysinfo":null}}')
        res = s.post('http://{}:80/app/request'.format(ip), params={'seq': seq}, data=msg)
        # Explicit raise instead of ``assert`` so the check survives ``python -O``.
        if res.status_code != 200:
            raise AssertionError(
                'request failed with HTTP status {}'.format(res.status_code))
        print(encryption.decrypt(res.content))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment