Skip to content

Instantly share code, notes, and snippets.

@rosmo
Created January 12, 2022 21:54
Show Gist options
  • Save rosmo/29200c1aedb991ce55942c4ae8b54edd to your computer and use it in GitHub Desktop.
Save rosmo/29200c1aedb991ce55942c4ae8b54edd to your computer and use it in GitHub Desktop.
TP-Link X90 Deco API example
from requests.api import request
from Crypto.Cipher import PKCS1_OAEP, PKCS1_v1_5
from Crypto.Cipher import AES
from Crypto.PublicKey import RSA
from Crypto.Util.Padding import pad
from Crypto.Util.number import bytes_to_long
import base64
import requests
import string
import random
from aiohttp.hdrs import (
ACCEPT,
COOKIE,
PRAGMA,
REFERER,
CONNECTION,
KEEP_ALIVE,
USER_AGENT,
CONTENT_TYPE,
CACHE_CONTROL,
ACCEPT_ENCODING,
ACCEPT_LANGUAGE
)
import time
import re
import hashlib
import json
import codecs
import logging
class C6:
def __init__(self, host, username, password):
self.host = host
self.username = username
self.password = password
self.pubkey = ''
self.jsessionId = ''
self.token = ''
self.stok = ''
self.sysauth = ''
self.rsa_public_key = []
self.public_key = ''
self.password_rsa_public_key = []
self.rsa_seq = ''
self.aes_key = '1641928074282809'
self.aes_iv = '1641928074282186'
self.pad_chars = ''
for c in range(16):
self.pad_chars += chr(c)
hash_str = "{}{}".format(self.username, self.password)
self.password_hash = hashlib.md5(hash_str.encode("utf-8")).hexdigest()
self.parse_macs_hyphens = re.compile('[0-9A-F]{2}-[0-9A-F]{2}-' +
'[0-9A-F]{2}-[0-9A-F]{2}-' +
'[0-9A-F]{2}-[0-9A-F]{2}')
self.parse_macs_colons = re.compile('[0-9A-F]{2}:[0-9A-F]{2}:' +
'[0-9A-F]{2}:[0-9A-F]{2}:' +
'[0-9A-F]{2}:[0-9A-F]{2}')
def scan_devices(self):
self._update_info()
self._log_out()
print(self.last_results.keys())
def get_device_name(self, device):
return self.last_results.get(device)
def _get_auth_tokens(self):
print("Retrieving auth tokens...")
url = 'http://{}/cgi-bin/luci/;stok=/login?form=login' \
.format(self.host)
referer = 'http://{}/webpages/login.html'.format(self.host)
# If possible implement RSA encryption of password here.
response = requests.post(
url, params={'operation': 'login', 'username': self.username,
'password': self.password},
headers={REFERER: referer}, timeout=4)
try:
self.stok = response.json().get('data').get('stok')
print(self.stok)
regex_result = re.search(
'sysauth=(.*);', response.headers['set-cookie'])
self.sysauth = regex_result.group(1)
print(self.sysauth)
return True
except (ValueError, KeyError, AttributeError) as _:
print("Couldn't fetch auth tokens! Response was: %s",
response.text)
return False
def _get_auth_tokens_rsa(self):
print("Getting public RSA key")
referer = 'http://{}/webpages/login.html'.format(self.host)
# Fetch 1024-bit RSA public key used specifically for encrypting
# the user's password
url = 'http://{}/cgi-bin/luci/;stok=/login' \
.format(self.host)
response = requests.post(
url, params={'form': 'keys'},
data={'operation': 'read'},
headers={REFERER: referer}, timeout=4)
try:
self.password_rsa_public_key = response.json().get('result').get('password')
except (ValueError, KeyError, AttributeError) as _:
print("Couldn't fetch password RSA keys! Response was: %s",
response.text)
return False
# Fetch 512-bit RSA public key used to encrypt the "signature"
# (which is actually URL-encoded string with AES key, IV, hash of
# username + password and sequence number)
url = 'http://{}/cgi-bin/luci/;stok=/login' \
.format(self.host)
response = requests.post(
url, params={'form': 'auth'},
data={'operation': 'read'},
headers={REFERER: referer}, timeout=4)
try:
self.rsa_public_key = response.json().get('result').get('key')
self.rsa_seq = response.json().get('result').get('seq')
except (ValueError, KeyError, AttributeError) as _:
print("Couldn't fetch data RSA public key! Response was: %s",
response.text)
return False
# Create MD5 hash of username concatenated with password
rsa_modulus_n = bytes_to_long(bytes.fromhex(self.password_rsa_public_key[0]))
rsa_public_exp = bytes_to_long(bytes.fromhex(self.password_rsa_public_key[1]))
public_key = RSA.construct((rsa_modulus_n, rsa_public_exp))
pkcs1_encryptor = PKCS1_v1_5.new(public_key)
password_encrypted = pkcs1_encryptor.encrypt(self.password.encode('utf-8'))
password_hex = codecs.encode(password_encrypted, 'hex').decode('utf-8')
auth_data = json.dumps({
"params": {"password": password_hex.upper()},
"operation": "login",
})
# Encrypt the authentication JSON with 128-bit AES in CBC mode
cipher = AES.new(self.aes_key.encode('utf-8'), AES.MODE_CBC, self.aes_iv.encode('utf-8'))
ciphertext = cipher.encrypt(pad(auth_data.encode('utf-8'), 16))
# Encode ciphertext in base64
ciphertext_base64 = base64.b64encode(ciphertext)
# Construct the "sign" parameter
encrypt_str = "k={}&i={}&h={}&s={}".format(self.aes_key, self.aes_iv, self.password_hash, self.rsa_seq + len(ciphertext_base64))
rsa_modulus_n = bytes_to_long(bytes.fromhex(self.rsa_public_key[0]))
rsa_public_exp = bytes_to_long(bytes.fromhex(self.rsa_public_key[1]))
self.public_key = RSA.construct((rsa_modulus_n, rsa_public_exp))
pkcs1_encryptor = PKCS1_v1_5.new(self.public_key)
if len(encrypt_str) > 53:
encrypted_str = pkcs1_encryptor.encrypt(encrypt_str[0:53].encode('utf-8'))
encrypted_str += pkcs1_encryptor.encrypt(encrypt_str[53:].encode('utf-8'))
else:
encrypted_str = pkcs1_encryptor.encrypt(encrypt_str.encode('utf-8'))
# Convert signature to hex characters
signature = codecs.encode(encrypted_str, 'hex').decode('utf-8')
print("Retrieving auth tokens...")
referer = 'http://{}/webpages/index.html'.format(self.host)
url = 'http://{}/cgi-bin/luci/;stok=/login'.format(self.host)
# If possible implement RSA encryption of password here.
response = requests.post(
url, params={'form': 'login'},
data={'sign': signature, 'data': ciphertext_base64},
headers={
REFERER: referer,
"Content-Type": "application/json",
"X-Requested-With": "XMLHttpRequest" # This header is required for sure
})
try:
result = response.json().get('data')
result_decoded = base64.b64decode(result)
cipher = AES.new(self.aes_key.encode("utf-8"), AES.MODE_CBC, self.aes_iv.encode("utf-8"))
# Result plaintext will be JSON encoded
result_plaintext = cipher.decrypt(result_decoded).decode("utf-8")
# Decrypted data may be right-padded
result_json = json.loads(result_plaintext.rstrip(self.pad_chars))
self.stok = result_json['result']['stok']
regex_result = re.search(
'sysauth=(.*);', response.headers['set-cookie'])
self.sysauth = regex_result.group(1)
return True
except (ValueError, KeyError, AttributeError) as e:
print("Couldn't fetch auth tokens! Response was %d: %s" %
(response.status_code, response.text))
return False
def _get_signature_and_data(self, data={}):
encoded_data = json.dumps(data)
cipher = AES.new(self.aes_key.encode('utf-8'), AES.MODE_CBC, self.aes_iv.encode('utf-8'))
ciphertext = cipher.encrypt(pad(encoded_data.encode('utf-8'), 16))
ciphertext_base64 = base64.b64encode(ciphertext)
encrypt_str = "h={}&s={}".format(self.password_hash, self.rsa_seq + len(ciphertext_base64))
pkcs1_encryptor = PKCS1_v1_5.new(self.public_key)
if len(encrypt_str) > 53:
encrypted_str = pkcs1_encryptor.encrypt(encrypt_str[0:53].encode('utf-8'))
encrypted_str += pkcs1_encryptor.encrypt(encrypt_str[53:].encode('utf-8'))
else:
encrypted_str = pkcs1_encryptor.encrypt(encrypt_str.encode('utf-8'))
# Convert signature to hex characters
signature = codecs.encode(encrypted_str, 'hex').decode('utf-8')
return signature, ciphertext_base64
def _get_data(self, data):
return ciphertext_base64
def _decrypt_json_response(self, data):
try:
cipher = AES.new(self.aes_key.encode('utf-8'), AES.MODE_CBC, self.aes_iv.encode('utf-8'))
plaintext = cipher.decrypt(base64.b64decode(data))
return json.loads(plaintext.decode('utf-8').rstrip(self.pad_chars))
except Exception as e:
print(e)
def _update_info(self):
print("[C6] Loading wireless clients...")
if (self.stok == '') or (self.sysauth == ''):
self._get_auth_tokens_rsa()
url = 'http://{}/cgi-bin/luci/;stok={}/admin/client'.format(self.host, self.stok)
referer = 'http://{}/webpages/index.html'.format(self.host)
origin = 'http://{}'.format(self.host)
signature, data = self._get_signature_and_data({
"operation": "read",
"params": { "device_mac": "default" },
})
response = requests.post(
url, headers={
REFERER: referer,
"Origin": origin,
"Content-Type": "application/json",
"X-Requested-With": "XMLHttpRequest"
},
params={"form": "client_list"},
data={
"sign": signature,
"data": data,
},
cookies={'sysauth': self.sysauth}, timeout=5)
try:
json_response = self._decrypt_json_response(response.json().get('data'))
if json_response.get('error_code') == 0:
result = json_response.get('result').get('client_list')
else:
print(
"An unknown error happened while fetching data")
return False
except ValueError:
print(response.text)
print("Router didn't respond with JSON. "
"Check if credentials are correct")
return False
if result:
self.last_results = {
device['mac'].replace('-', ':'): device['mac']
for device in result
}
return True
return False
def _log_out(self):
print("Logging out of router admin interface...")
url = ('http://{}/cgi-bin/luci/;stok={}/admin/system?'
'form=logout').format(self.host, self.stok)
referer = 'http://{}/webpages/index.html'.format(self.host)
requests.post(
url, params={'operation': 'write'}, headers={REFERER: referer},
cookies={'sysauth': self.sysauth})
self.stok = ''
self.sysauth = ''
# Use this to dump HTTP requests:
#import http.client as http_client
#http_client.HTTPConnection.debuglevel = 1
#logging.basicConfig()
#logging.getLogger().setLevel(logging.DEBUG)
#requests_log = logging.getLogger("requests.packages.urllib3")
#requests_log.setLevel(logging.DEBUG)
#requests_log.propagate = True
c6 = C6('192.168.1.1', 'admin', 'YOUR-PASSWORD')
c6.scan_devices()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment