Skip to content

Instantly share code, notes, and snippets.

@bahorn
Last active October 27, 2023 15:07
Show Gist options
  • Star 12 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save bahorn/9bebbbf37c2167f7057aea0244ff2d92 to your computer and use it in GitHub Desktop.
Save bahorn/9bebbbf37c2167f7057aea0244ff2d92 to your computer and use it in GitHub Desktop.
Implementation of the Tuya API signing.
import requests
import hashlib
import time
import uuid
import os
import copy
import json
# This is based on my personal implementation but stripped down to only what is
# needed to verify it.
# This should correct anywhere I was unclear in my original post.
# I developed against a Mobile API key, which is why I used the mobile methods
# in my test cases.
# Signing works the same with the Cloud API. You get a "NO_PERMISSION" if your
# keys aren't valid for the request, but if you sign wrong, you'll get a error
# related to signing instead, which is checked earlier on their end.
# The signing process is also leaked in the mobile apps (which are all
# rebrandings, no joke) in the Android logs.
## For mobile app keys:
## Be careful about which host you use. If you are registered in the US, use the
## US host.
host = "https://a1.tuyaeu.com/api.json"
# Random and not really checked. Should keep persistent if you are using
# sessions.
device_id = os.urandom(32).encode('hex')
sid = "" # Used to mobile requests that require login.
# Needs to be set, but they don't care what it is.
os = ("TEST", "0.1.2", "TEST")
# Your API keys.
appKey = "<BLANKED>"
appSecret = "<BLANKED>"
# This is likely a cause of confusion. They decided for this and only this to
# rearrange the hash. Not their normal md5-mid which they use a lot in their
# MQTT client.
def post_data_hash_transform(post_data):
h = hashlib.md5()
h.update(post_data)
pre_hash = h.hexdigest()
return pre_hash[8:16] + pre_hash[0:8] + pre_hash[24:32] + pre_hash[16:24]
# This is the implementation of "sign".
def generate_request_sign(pairs):
# This are the values that get "signed" in a request, worth checking if I
# missed one in this list.
values_to_hash = ["a", "v", "lat", "lon", "lang", "deviceId", "imei",
"imsi", "appVersion", "ttid", "isH5", "h5Token", "os",
"clientId", "postData", "time", "n4h5", "sid", "sp"]
out = []
sorted_pairs = sorted(pairs)
for item in sorted_pairs:
if item not in values_to_hash:
continue
if pairs[item] == "":
continue
if item == "postData":
out += [item + "=" + post_data_hash_transform(pairs["postData"])]
else:
out += [item + "=" + str(pairs[item])]
sign_request = "||".join(out) + "||" + appSecret
h = hashlib.md5()
h.update(sign_request)
return h.hexdigest()
# This will give you a dict with all the request parameters.
def url_generator(action, version, post_data=None, sid=None, time_param=None):
client_id = appKey
ttid = "sdk_google1@{}".format(appKey)
timezone_id = "America/New_York"
lang = "en"
if not time_param:
time_param = int(time.time())
request_id = uuid.uuid4()
pairs = {
"sdkVersion": "1.11.11",
"platform": os[2],
"ttid": ttid,
"a": action,
"timeZoneId": timezone_id,
"deviceId": device_id,
"osSystem": os[1],
"os": os[0],
"v": version,
"appVersion": "2.9.1",
"clientId": client_id,
"lang": lang,
"requestId": request_id,
"time": time_param,
"appRnVersion": "2.9",
}
if sid:
pairs['sid'] = sid
to_sign = copy.deepcopy(pairs)
if post_data:
to_sign['postData'] = post_data
pairs['sign'] = generate_request_sign(to_sign)
return pairs
# Call the endpoint.
# * action is the name as defined in the tuya docs
# * version is the version they say to provide, normally "1.0"
# * data is the JSON data you want to pass to the action
# * requires_sid means it adds a session_id to the parameters. Used in mobile endpoints.
def preform_action(action, version, data=None, requires_sid=False):
if requires_sid is True and not sid:
return None
params = url_generator(action, version, data, sid)
print params
endpoint = host
headers = {
# Maybe set a user agent or something here.
}
if data:
r = requests.post(endpoint, params=params, data={"postData":data},
headers=headers)
else:
r = requests.post(endpoint, params=params, headers=headers)
return r.status_code, r.json(), r.headers
if __name__ == "__main__":
# Should return a list of countries, doesn't use post data.
print preform_action("tuya.m.country.list", "1.0")
# Attempt to login with a fake email, takes POST data.
attempt = {
"email":"fake_email_attempt@fake_email.com",
"countryCode":1
}
print preform_action("tuya.m.user.email.token.create", "1.0",
json.dumps(attempt))
@bahorn
Copy link
Author

bahorn commented Jan 31, 2018

Just removed a reference to self that I accidentally left in after stripping preform_action from the class I took it from. Would have caused issues when using a session ID.

@panjanek
Copy link

panjanek commented Dec 2, 2021

Thankyou so much for post_data_hash_transform (rearanging md5 info)!

Apart from that I can see Tuya uses different signing and encryption algorithms for different aps or api versions. I my case (thermostats) i spent hours figuring this:

  1. Instead of md5 to sign the request I had to use HMAC-SHA256:
sign_request = "||".join(out)  #no appsecret here
hmac_key = "A_"+tuya_bmpkey+"_"+tuya_appsecret     # here you have to use secret2 (encoded in the image file) and standard secret
signature = hmac.new(key=hmac_key.encode('utf-8'),msg=feed.encode('utf-8'),digestmod=hashlib.sha256).hexdigest() 

encrypted postData as base64 still has to be put to above sign_request using peculiar "post_data_hash_transform"

  1. the postData field has to be encrypted with AES in MODE_GMC with 12 bytes of random nonce as prefix and 16 bytes of validation MAC as suffix. The key is derived from request_id using HMAC-SHA256 with key obtained by contatenation of various secret values:
def encryptPostData(postData, requestId):
    #create key from requestid and ecode. ecode is created together with session id upon login, as far as i can see it is valid undefinietly,
    #so it's easier to sniff it than to request it 
    keyparts = "A_"+tuya_bmpkey+"_"+tuya_appsecret+"_"+tuya_ecode     # secret1, secret2 and ecode used here
    #generate key from request_id and secrets
    keyHex = hmac.new(key=requestId.encode('utf-8'),msg=keyparts.encode('utf-8'),digestmod=hashlib.sha256).hexdigest()     
    shortKey = keyHex[0:16].encode('utf-8')   #yes! you use only the first 16 characters of hexadecimal form as AES key
    postDataStr = json.dumps(postData)   
    nonce = os.urandom(12)
    plainBytes = postDataStr.encode('utf-8')   
    encryptedPostData, mac = AES.new(shortKey, AES.MODE_GCM, nonce).encrypt_and_digest(plainBytes)
    encryptedPostDataWithNonce = nonce+encryptedPostData+mac 
    encryptedPostDataBase64 = base64.b64encode(encryptedPostDataWithNonce).decode("utf-8")
    return encryptedPostDataBase64
  1. The same method is used to decrypt the response. In json response, the "result" field is encrypted:
def decryptResult(result, requestId):
    #create key from requestid and ecode
    keyparts = "A_"+tuya_bmpkey+"_"+tuya_appsecret+"_"+tuya_ecode
    #generate key from request_id and ecode
    keyHex = hmac.new(key=requestId.encode('utf-8'),msg=keyparts.encode('utf-8'),digestmod=hashlib.sha256).hexdigest()     
    shortKey = keyHex[0:16].encode('utf-8')
    encryptedBytes = base64.b64decode(result)
    nonce = encryptedBytes[0:12]
    encryptedPayload = encryptedBytes[12:]
    decrypted = AES.new(shortKey, AES.MODE_GCM, nonce).decrypt(encryptedPayload[:-16]) #drop last 16 bytes, it's MAC signature
    return decrypted.decode("utf-8")  

hope it'll help someone!

@pergolafabio
Copy link

hey @panjanek , also here, do you have a full script avaible with new md5 ? Also that appsecret2 , do you have that for me? Seems its taken from the app itself? Smartlife or Tuya ?

thnx!!

@pergolafabio
Copy link

hey @bahorn or @panjanek , is it possible to update the script to use the keys below, to use smartlife or app, the appsecret2 / certsign are now needed, would be great to have a python vesion... i know use this javascript version, but want a python version of it...

https://github.com/TuyaAPI/cloud/blob/master/index.js

For Tuya:

{
"key": "3fjrekuxank9eaej3gcx",
"secret": "aq7xvqcyqcnegvew793pqjmhv77rneqc",
"secret2": "vay9g59g9g99qf3rtqptmc3emhkanwkx",
"certSign": "93:21:9F:C2:73:E2:20:0F:4A:DE:E5:F7:19:1D:C6:56:BA:2A:2D:7B:2F:F5:D2:4C:D5:5C:4B:61:55:00:1E:40"
}
For Smartlife:

{
"key": "ekmnwp9f5pnh3trdtpgy",
"secret": "r3me7ghmxjevrvnpemwmhw3fxtacphyg",
"secret2": "jfg5rs5kkmrj5mxahugvucrsvw43t48x",
"certSign": "0F:C3:61:99:9C:C0:C3:5B:A8:AC:A5:7D:AA:55:93:A2:0C:F5:57:27:70:2E:A8:5A:D7:B3:22:89:49:F8:88:FE"
}

@panjanek
Copy link

panjanek commented Dec 22, 2022

@pergolafabio It should work

Here is my tool for tuyasmart app (I don't like to post secrets, seems illicit):

import requests
import hashlib
import time
import uuid
import os
import copy
import json
import urllib3
import logging
import hmac
import base64
import string
import sys
from Crypto.Cipher import AES

tuya_appid =     "***"
tuya_appsecret = "***"
tuya_bmpkey =    "***"
tuya_endpoint =  "https://a1.tuyaeu.com/api.json"
tuya_sid =       "***"
tuya_ecode =     "***"
tuya_gid =       "***"
tuya_certsign =  "***"

urllib3.disable_warnings()

def addTuyaSignature(params):
    if not "sign" in params:
        values_to_hash = ["a", "v", "lat", "lon", "et", "lang", "deviceId", "imei",
                          "imsi", "appVersion", "ttid", "isH5", "h5Token", "os",
                          "clientId", "postData", "time", "n4h5", "sid", "sp", "requestId"]
        sorted_params = sorted(params)
        out = []
        for key in sorted_params:
            if key not in values_to_hash:
                continue
            if params[key] == "":
                continue    
            value = str(params[key])
            if key == "postData":
                h = hashlib.md5()
                h.update(value.encode('utf-8'))               
                value=h.hexdigest()
                value = value[8:16] + value[0:8] + value[24:32] + value[16:24]
            out += [key + "=" + value]

        feed = "||".join(out)
        hmac_key = tuya_certsign + "_"+tuya_bmpkey+"_"+tuya_appsecret
        signature = hmac.new(key=hmac_key.encode('utf-8'),msg=feed.encode('utf-8'),digestmod=hashlib.sha256).hexdigest()       
        params["sign"] = signature
    return params
    
def decryptResult(result, requestId):
    #create key from requestid and ecode
    keyparts = tuya_certsign + "_"+tuya_bmpkey+"_"+tuya_appsecret+"_"+tuya_ecode
    #generate key from request_id and ecode
    keyHex = hmac.new(key=requestId.encode('utf-8'),msg=keyparts.encode('utf-8'),digestmod=hashlib.sha256).hexdigest()     
    shortKey = keyHex[0:16].encode('utf-8')
    encryptedBytes = base64.b64decode(result)
    nonce = encryptedBytes[0:12]
    encryptedPayload = encryptedBytes[12:]
    decrypted = AES.new(shortKey, AES.MODE_GCM, nonce).decrypt(encryptedPayload[:-16])
    return decrypted.decode("utf-8")     

def encryptPostData(postData, requestId):
    #create key from requestid and ecode
    keyparts = tuya_certsign + "_"+tuya_bmpkey+"_"+tuya_appsecret+"_"+tuya_ecode
    #generate key from request_id and ecode
    keyHex = hmac.new(key=requestId.encode('utf-8'),msg=keyparts.encode('utf-8'),digestmod=hashlib.sha256).hexdigest()     
    shortKey = keyHex[0:16].encode('utf-8')
    postDataStr = json.dumps(postData)   
    nonce = os.urandom(12)
    plainBytes = postDataStr.encode('utf-8')   
    encryptedPostData, mac = AES.new(shortKey, AES.MODE_GCM, nonce).encrypt_and_digest(plainBytes)
    encryptedPostDataWithNonce = nonce+encryptedPostData+mac 
    encryptedPostDataBase64 = base64.b64encode(encryptedPostDataWithNonce).decode("utf-8")
    return encryptedPostDataBase64

def callTuyaApi(action, postData):
    time_param = int(time.time())
    request_id = str(uuid.uuid4())    
    params = {
               "a":action,
               "appRnVersion":"5.44",
               "appVersion":"3.33.5",
               "channel":"oem",
               "clientId":tuya_appid,
               "deviceCoreVersion":"3.29.5",
               "deviceId":"f1cf817055401e82b60fa5f74d8779e64133a59215b7",
               "et":"3",
               "lang":"pl_PL",
               "os":"Android",
               "osSystem":"7.1.1",
               "platform":"ONEPLUS A5000",
               "requestId":request_id,
               "sdkVersion":"3.29.5",
               "sid":tuya_sid,
               "time": str(time_param),
               "timeZoneId":"Europe/Warsaw",
               "ttid":"sdk_tuya_international",
               "v":"1.0" ,
               "gid": tuya_gid
            }

    #print(postData)
    params["postData"] = encryptPostData(postData, request_id)
    params = addTuyaSignature(params)    
    headers = { 'User-Agent' : 'Android/com.google.android.gms/203615023 (OnePlus5 NMF26X)' }   
    #print(params)
    response = requests.post(tuya_endpoint, params=params, headers=headers, verify=False)
    encryptedResult = response.json()["result"]       
    encryptedBytes = base64.b64decode(encryptedResult)
    jsonStr = decryptResult(encryptedResult, request_id)
    return json.loads(jsonStr)

def getDeviceDetails(devId):
    response = callTuyaApi("tuya.m.device.get", { "devId" :devId})
    #print(json.dumps(response, indent=4, sort_keys=True,ensure_ascii=False))
    dps = response["result"]["dps"]
    result = {}
    result["id"] = response["result"]["devId"]
    result["online"] = response["result"]["isOnline"]
    result["active"] = response["result"]["isActive"]
    result["localKey"] = response["result"]["localKey"]
    result["name"] = response["result"]["name"]
    for key in dps:
        result["dps_"+key] = dps[key]
    if "1" in dps and isinstance(dps["1"], bool):
        result["1"] = dps["1"]
    if "2" in dps and isinstance(dps["2"], bool):
        result["2"] = dps["2"]
    if "3" in dps and isinstance(dps["3"], bool):
        result["3"] = dps["3"]
    if "7" in dps and dps["7"]!=0:
        result["usb"] = dps["7"]
    if "20" in dps:
        result["voltage"] = float(dps["20"]) / 10.0
    if "18" in dps:
        result["current"] = float(dps["18"]) / 1000.0
    if "19" in dps:
        result["power"] = float(dps["19"]) / 10.0
    return result

if __name__ == "__main__":
    if len(sys.argv) <= 1:
        #print("usage: python tuyasmart.py <device-id> [1|2|3|usb] [on|off]")
        #print("devices:")
        devices = callTuyaApi("tuya.m.device.ext.prop.list", {})
        all = []
        for dev in devices["result"]:
            devId = dev["devId"]
            device = getDeviceDetails(devId)
            all.append(device)
        print(json.dumps(all, indent=4, sort_keys=True,ensure_ascii=False))
    elif len(sys.argv) == 2: 
        devId = sys.argv[1]
        device = getDeviceDetails(devId)
        print(json.dumps(device, indent=4, sort_keys=True,ensure_ascii=False))
    else: 
        devId = sys.argv[1]
        socket = sys.argv[2]
        if len(sys.argv) == 3:
            device = getDeviceDetails(devId)
            s = "1" if device[socket] else "0"
            print(s)
        else:   
            state = sys.argv[3]
            if state == "on" or state == "off":
                if socket == "usb":
                    socket = "7"
                postData = {}
                postData["devId"] = devId
                postData["dps"] = {}
                postData["dps"][str(socket)] = state == "on"
                response = callTuyaApi("tuya.m.device.dp.publish", postData)
                print(json.dumps(response, indent=4, sort_keys=True,ensure_ascii=False))
            elif state == "set":
                value = sys.argv[4]
                if value == "true":
                    value = True
                elif value == "false":
                    value = False
                elif value.isnumeric():
                    value = int(value)
                postData = {}
                postData["devId"] = devId
                postData["dps"] = {}
                postData["dps"][str(socket)] = value
                response = callTuyaApi("tuya.m.device.dp.publish", postData)
                print(json.dumps(response, indent=4, sort_keys=True,ensure_ascii=False))

Here is the tool for thermostats:

import requests
import hashlib
import time
import uuid
import os
import copy
import json
import urllib3
import logging
import hmac
import base64
import string
import sys
from Crypto.Cipher import AES

tuya_appid =     "***"
tuya_appsecret = "***"
tuya_bmpkey =    "***"
tuya_endpoint =  "https://a1.tuyaeu.com/api.json"
tuya_sid =       "***"
tuya_ecode =     "***"
tuya_gid =       "***"

#"1" - on/off, "2" - target temp * 2, "3" - current temp * 2, "102" - heater temp *2
#/bin/sh
#apk add gcc
#sudo pip istall pycrypto

urllib3.disable_warnings()

def addTuyaSignature(params):
    if not "sign" in params:
        values_to_hash = ["a", "v", "lat", "lon", "et", "lang", "deviceId", "imei",
                          "imsi", "appVersion", "ttid", "isH5", "h5Token", "os",
                          "clientId", "postData", "time", "n4h5", "sid", "sp", "requestId"]
        sorted_params = sorted(params)
        out = []
        for key in sorted_params:
            if key not in values_to_hash:
                continue
            if params[key] == "":
                continue    
            value = str(params[key])
            if key == "postData":
                h = hashlib.md5()
                h.update(value.encode('utf-8'))               
                value=h.hexdigest()
                value = value[8:16] + value[0:8] + value[24:32] + value[16:24]
            out += [key + "=" + value]

        feed = "||".join(out)
        hmac_key = "A_"+tuya_bmpkey+"_"+tuya_appsecret
        signature = hmac.new(key=hmac_key.encode('utf-8'),msg=feed.encode('utf-8'),digestmod=hashlib.sha256).hexdigest()       
        params["sign"] = signature
    return params
    

def decryptResult(result, requestId):
    #create key from requestid and ecode
    keyparts = "A_"+tuya_bmpkey+"_"+tuya_appsecret+"_"+tuya_ecode
    #generate key from request_id and ecode
    keyHex = hmac.new(key=requestId.encode('utf-8'),msg=keyparts.encode('utf-8'),digestmod=hashlib.sha256).hexdigest()     
    shortKey = keyHex[0:16].encode('utf-8')
    encryptedBytes = base64.b64decode(result)
    nonce = encryptedBytes[0:12]
    encryptedPayload = encryptedBytes[12:]
    decrypted = AES.new(shortKey, AES.MODE_GCM, nonce).decrypt(encryptedPayload[:-16])
    return decrypted.decode("utf-8")     

def encryptPostData(postData, requestId):
    #create key from requestid and ecode
    keyparts = "A_"+tuya_bmpkey+"_"+tuya_appsecret+"_"+tuya_ecode
    #generate key from request_id and ecode
    keyHex = hmac.new(key=requestId.encode('utf-8'),msg=keyparts.encode('utf-8'),digestmod=hashlib.sha256).hexdigest()     
    shortKey = keyHex[0:16].encode('utf-8')
    postDataStr = json.dumps(postData)   
    nonce = os.urandom(12)
    plainBytes = postDataStr.encode('utf-8')   
    #encryptedPostData = AES.new(shortKey, AES.MODE_GCM, nonce).encrypt(plainBytes)
    encryptedPostData, mac = AES.new(shortKey, AES.MODE_GCM, nonce).encrypt_and_digest(plainBytes)
    encryptedPostDataWithNonce = nonce+encryptedPostData+mac 
    encryptedPostDataBase64 = base64.b64encode(encryptedPostDataWithNonce).decode("utf-8")
    return encryptedPostDataBase64

def callTuyaApi(action, postData):
    time_param = int(time.time())
    request_id = str(uuid.uuid4())    
    params = {
               "a":action,
               "appRnVersion":"5.44",
               "appVersion":"2.0.3",
               "channel":"oem",
               "clientId":tuya_appid,
               "deviceCoreVersion":"3.29.5",
               "deviceId":"0cbe6a9f082de4beb1e184c84bce097cef0dde8312bc",
               "et":"3",
               "lang":"pl_PL",
               "os":"Android",
               "osSystem":"7.1.1",
               "platform":"ONEPLUS A5000",
               "requestId":request_id,
               "sdkVersion":"3.29.5",
               "sid":tuya_sid,
               "time": str(time_param),
               "timeZoneId":"Europe/Warsaw",
               "ttid":"sdk_tuya_international@"+tuya_appid,
               "v":"1.0", 
               "gid": tuya_gid
            }
     
    params["postData"] = encryptPostData(postData, request_id)
    params = addTuyaSignature(params)    
    headers = { 'User-Agent' : 'Android/com.google.android.gms/203615023 (OnePlus5 NMF26X)' }   
    response = requests.post(tuya_endpoint, params=params, headers=headers, verify=False)
    encryptedResult = response.json()["result"]       
    encryptedBytes = base64.b64decode(encryptedResult)
    jsonStr = decryptResult(encryptedResult, request_id)
    return json.loads(jsonStr)

def getDeviceDetails(devId):
    device = callTuyaApi("tuya.m.device.get", { "devId" :devId})
    result = {}
    result["id"] = devId
    result["name"] = device["result"]["name"]
    result["online"] = device["result"]["isOnline"]
    result["switch"] = device["result"]["dps"]["1"]
    result["target"] = float(device["result"]["dps"]["2"]) / 2.0
    result["current"] = float(device["result"]["dps"]["3"]) / 2.0
    result["heater"] = float(device["result"]["dps"]["102"]) / 2.0 
    return result
       
if len(sys.argv) <= 1:
    #list all
    devices = callTuyaApi("tuya.m.my.group.device.sort.list", {})
    all = []
    for dev in devices["result"]:
        devId = dev["bizId"]
        device = getDeviceDetails(devId)
        all.append(device)
    print(json.dumps(all, indent=4, sort_keys=True,ensure_ascii=False))
elif len(sys.argv) == 2:
    devId = sys.argv[1]
    device = getDeviceDetails(devId)
    print(json.dumps(device, indent=4, sort_keys=True,ensure_ascii=False))
else:
    devId = sys.argv[1]
    result=None
    if sys.argv[2]=="on":
        result = callTuyaApi("tuya.m.device.dp.publish", { "devId":devId, "dps":{"1":True}})
    elif  sys.argv[2]=="off":
        result = callTuyaApi("tuya.m.device.dp.publish", { "devId":devId, "dps":{"1":False}})
    else:
        target_temp = float(sys.argv[2])
        result = callTuyaApi("tuya.m.device.dp.publish", { "devId":devId, "dps":{"2":int(target_temp*2)}})
    time.sleep(1)
    print(json.dumps(getDeviceDetails(devId), indent=4, sort_keys=True,ensure_ascii=False))

@pergolafabio
Copy link

aha, thats great, thnx for sharing!!

@SimonSelg
Copy link

@panjanek thanks for sharing your findings. You saved me a lot of time and frustration :) thanks!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment