Skip to content

Instantly share code, notes, and snippets.

@irsl
Created October 8, 2023 15:44
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save irsl/e6b6e635dc8dfba9ea3e8b6263f119c1 to your computer and use it in GitHub Desktop.
Save irsl/e6b6e635dc8dfba9ea3e8b6263f119c1 to your computer and use it in GitHub Desktop.
Find tapo devices over LAN
root@host:/# python3 tapo_scanner.py
{
"device_id": "36612bba[redacted]9005d5f",
"owner": "E4483[redacted]BCBD1",
"device_type": "SMART.TAPOPLUG",
"device_model": "P110(EU)",
"ip": "10.6.8.113",
"mac": "28-87-BA-48-80-14",
"is_support_iot_cloud": true,
"obd_src": "tplink",
"factory_default": false,
"mgt_encrypt_schm": {
"is_support_https": false,
"encrypt_type": "AES",
"http_port": 80,
"lv": 2
}
}
{
"device_id": "2409a2167[redacted]dcc8ab2faf6",
"owner": "E4483[redacted]BCBD1",
"device_type": "SMART.TAPOHUB",
"device_model": "H100(EU)",
"ip": "10.6.8.134",
"mac": "48-22-54-2D-89-02",
"is_support_iot_cloud": true,
"obd_src": "tplink",
"factory_default": false,
"mgt_encrypt_schm": {
"is_support_https": false,
"encrypt_type": "AES",
"http_port": 80,
"lv": 2
}
}
{
"device_id": "59cb47973[redacted]1522ae5fd16ca",
"owner": "E4483[redacted]BCBD1",
"device_type": "SMART.TAPOPLUG",
"device_model": "P110(EU)",
"ip": "10.6.8.139",
"mac": "30-DE-4B-7A-C1-93",
"is_support_iot_cloud": true,
"obd_src": "tplink",
"factory_default": false,
"mgt_encrypt_schm": {
"is_support_https": false,
"encrypt_type": "AES",
"http_port": 80,
"lv": 2
}
}
{
"device_id": "4c03fd8[redacted]270d69dee1394",
"owner": "E4483[redacted]BCBD1",
"device_type": "SMART.TAPOPLUG",
"device_model": "P110(EU)",
"ip": "10.6.8.189",
"mac": "AC-15-A2-E4-3E-16",
"is_support_iot_cloud": true,
"obd_src": "tplink",
"factory_default": false,
"mgt_encrypt_schm": {
"is_support_https": false,
"encrypt_type": "AES",
"http_port": 80,
"lv": 2
}
}
{
"device_id": "300484c6[redacted]bdc5decb7ac7f",
"owner": "E4483[redacted]BCBD1",
"device_type": "SMART.TAPOPLUG",
"device_model": "P110(EU)",
"ip": "10.6.8.249",
"mac": "30-DE-4B-7A-D4-E2",
"is_support_iot_cloud": true,
"obd_src": "tplink",
"factory_default": false,
"mgt_encrypt_schm": {
"is_support_https": false,
"encrypt_type": "AES",
"http_port": 80,
"lv": 2
}
}
{
"device_id": "1eee33eb8[redacted]8fa78ce7cf64",
"owner": "E4483[redacted]BCBD1",
"device_type": "SMART.TAPOPLUG",
"device_model": "P110(EU)",
"ip": "10.6.8.160",
"mac": "30-DE-4B-37-0C-AC",
"is_support_iot_cloud": true,
"obd_src": "tplink",
"factory_default": false,
"mgt_encrypt_schm": {
"is_support_https": false,
"encrypt_type": "AES",
"http_port": 80,
"lv": 2
}
}
{
"device_id": "68FCDD103[redacted]A86DF4865F41",
"device_name": "nappali",
"device_type": "SMART.IPCAMERA",
"device_model": "C110",
"ip": "10.6.8.216",
"mac": "9C-A2-F4-F4-D5-91",
"hardware_version": "1.0",
"firmware_version": "1.3.0 Build 220830 Rel.69880n(4555)",
"factory_default": false,
"is_support_iot_cloud": true,
"mgt_encrypt_schm": {
"is_support_https": true
},
"encrypt_info": {
[redacted]
}
}
{
"device_id": "CEC1A3789[redacted]8F2EFB29",
"device_name": "bej\u00e1rat (bent)",
"device_type": "SMART.IPCAMERA",
"device_model": "C110",
"ip": "10.6.8.229",
"mac": "9C-A2-F4-F4-CA-F7",
"hardware_version": "1.0",
"firmware_version": "1.3.0 Build 220830 Rel.69880n(4555)",
"factory_default": false,
"is_support_iot_cloud": true,
"mgt_encrypt_schm": {
"is_support_https": true
},
"encrypt_info": {
[redacted]
}
}
{
"device_id": "C745C38[redacted]BC70533",
"device_name": "Levi",
"device_type": "SMART.IPCAMERA",
"device_model": "C110",
"ip": "10.6.8.165",
"mac": "5C-62-8B-89-B7-5C",
"hardware_version": "2.0",
"firmware_version": "1.3.2 Build 221208 Rel.40424n(4555)",
"factory_default": false,
"is_support_iot_cloud": true,
"mgt_encrypt_schm": {
"is_support_https": true
},
"encrypt_info": {
[redacted]
}
}
#!/usr/bin/env python3
"""
from a ubuntu env:
docker run --rm -it --network host ubuntu
apt update ; apt install -y python3 python3-pip
pip3 install pycryptodome
"""
import struct
import sys
import zlib
import json
import base64
import socket
from Crypto.Cipher import PKCS1_v1_5
from Crypto.Cipher import PKCS1_OAEP
from Crypto.PublicKey import RSA
from Crypto.Cipher import AES
import select
import base64
import http.server, ssl
import threading
import os
import time
DEBUG = int(os.getenv("DEBUG") or "0")
PKT_ONBOARD_REQUEST = b'\x11\x00' # \x02\x0D\x87\x23'
PKT_ONBOARD_RESPONSE = b'"\x01' # \x02\r\x87#'
OUR_KEY = RSA.generate(2048) # if preferred, hardcoded key could be used: RSA.import_key("-----BEGIN PRIVATE KEY-----...")
OUR_PUBLIC_KEY = OUR_KEY.public_key().export_key('PEM').decode()
OUR_CIPHER = PKCS1_OAEP.new(OUR_KEY)
def eprint(*args, **kwargs):
if not DEBUG: return
print(*args, **kwargs, file=sys.stderr)
# note: pkcs7.PKCS7Encoder().encode is broken
# https://stackoverflow.com/questions/43199123/encrypting-with-aes-256-and-pkcs7-padding
def pkcs7_pad(input_str, block_len=16):
return input_str + chr(block_len-len(input_str)%block_len)*(block_len-len(input_str)%16)
def pkcs7_unpad(ct):
return ct[:-ord(ct[-1])]
class TpLinkCipher:
def __init__(self, b_arr: bytearray, b_arr2: bytearray):
self.iv = b_arr2
self.key = b_arr
def encrypt(self, data):
data = pkcs7_pad(data)
cipher = AES.new(bytes(self.key), AES.MODE_CBC, bytes(self.iv))
encrypted = cipher.encrypt(data.encode())
return base64.b64encode(encrypted).decode().replace("\r\n","")
def decrypt(self, data: str):
aes = AES.new(bytes(self.key), AES.MODE_CBC, bytes(self.iv))
pad_text = aes.decrypt(base64.b64decode(data.encode())).decode()
return pkcs7_unpad(pad_text)
def extract_pkt_id(packet):
return packet[8:12]
def extract_payload_from_package(packet):
return packet[16:]
def extract_payload_from_package_json(packet):
return json.loads(packet[16:])
def build_packet_for_payload(payload, pkt_type, pkt_id=b"\x01\x02\x03\x04"):
len_bytes = struct.pack(">h", len(payload))
skeleton = b'\x02\x00\x00\x01'+len_bytes+pkt_type+pkt_id+b'\x5A\x6B\x7C\x8D'+payload
calculated_crc32 = zlib.crc32(skeleton) & 0xffffffff
calculated_crc32_bytes = struct.pack(">I", calculated_crc32)
re = skeleton[0:12] + calculated_crc32_bytes + skeleton[16:]
return re
def build_packet_for_payload_json(payload, pkt_type, pkt_id=b"\x01\x02\x03\x04"):
return build_packet_for_payload(json.dumps(payload).encode(), pkt_type, pkt_id)
def process_encrypted_handshake(response):
encryptedSessionKey = response["result"]["encrypt_info"]["key"]
encryptedSessionKeyBytes = base64.b64decode(encryptedSessionKey.encode())
clearSessionKeyBytes = OUR_CIPHER.decrypt(encryptedSessionKeyBytes)
if not clearSessionKeyBytes:
raise ValueError("Decryption failed!")
b_arr = bytearray()
b_arr2 = bytearray()
for i in range(0, 16):
b_arr.insert(i, clearSessionKeyBytes[i])
for i in range(0, 16):
b_arr2.insert(i, clearSessionKeyBytes[i + 16])
cipher = TpLinkCipher(b_arr, b_arr2)
cleartextDataBytes = cipher.decrypt(response["result"]["encrypt_info"]["data"])
eprint("handshake payload decrypted as", cleartextDataBytes)
return json.loads(cleartextDataBytes)
def find_tapo_devices(timeout=3):
packet = build_packet_for_payload_json({"params":{"rsa_key": OUR_PUBLIC_KEY}}, PKT_ONBOARD_REQUEST)
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) # UDP
sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
sock.setsockopt(socket.IPPROTO_IP, socket.IP_TTL, 5);
# sock.settimeout(2)
sock.sendto(packet, ("255.255.255.255", 20002))
eprint("packet sent", packet)
pollerObject = select.poll()
pollerObject.register(sock, select.POLLIN)
before = time.time()
while True:
fdVsEvent = pollerObject.poll(100) # 0.1 sec
if fdVsEvent:
handshake_packet, addr = sock.recvfrom(2048)
eprint("received", addr, handshake_packet)
try:
handshake_json = extract_payload_from_package_json(handshake_packet)
if handshake_json["error_code"]:
continue
result = handshake_json["result"]
yield result
except:
pass
now = time.time()
if now - before > timeout:
break
if __name__ == "__main__":
for td in find_tapo_devices():
print(json.dumps(td, indent=3))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment