Last active
March 13, 2024 20:44
-
-
Save rpanachi/fe12d5837239670aedece1345444f4be to your computer and use it in GitHub Desktop.
Tuya API script (for Home Assistant)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Command-line sensor: runs tuya.py every scan_interval seconds and reads the
# EnergyConsumed field from the JSON object the script prints.
- sensor:
    name: Tuya Power Clamp
    unique_id: tuya_power_clamp
    # NOTE(review): "device_id" looks like a placeholder for the real Tuya
    # device id passed to the script - confirm before use.
    command: "python3 /config/tuya.py device_id"
    # A Wh reading tracked as total_increasing is an accumulated energy value,
    # so the matching device class is "energy" ("power" is for W/kW readings).
    device_class: energy
    state_class: total_increasing
    unit_of_measurement: Wh
    scan_interval: 60
    value_template: "{{ value_json.EnergyConsumed | float(0) * 10 }}"
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Put this at the end of the file
command_line: !include command_line.yaml |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import sys | |
import hashlib | |
import hmac | |
import json | |
import urllib | |
import urllib.parse | |
import logging | |
from urllib.request import urlopen, Request | |
from datetime import datetime | |
def make_request(url, params=None, headers=None):
    """Issue a GET request and return the response together with its text.

    Args:
        url: Target URL.
        params: Optional query parameters; when truthy they are URL-encoded
            and appended to ``url`` after a ``?``.
        headers: Optional HTTP headers; an empty dict is used when falsy.

    Returns:
        A ``(response, body)`` tuple where ``body`` is the payload decoded as
        UTF-8. If opening or reading the URL raises, the caught exception is
        returned in place of the response and ``body`` is the empty string.
    """
    target = url
    if params:
        target = "?".join((url, urllib.parse.urlencode(params)))
    req = Request(target, headers={} if not headers else headers)
    try:
        with urlopen(req, timeout=10) as resp:
            raw = resp.read()
            return resp, raw.decode("utf-8")
    except Exception as exc:
        # Failures are reported through the return value, not raised.
        return exc, ""
def get_timestamp(now=None):
    """Return a Unix timestamp in milliseconds as a decimal string.

    Args:
        now: The ``datetime`` to convert. When omitted, the current time is
            read at call time.

    Returns:
        The whole number of milliseconds since the epoch, as a ``str``.
    """
    # A ``datetime.now()`` default would be evaluated only once, when the
    # function is defined, so every later call without an argument would get
    # the same stale value. Resolve the default at call time instead.
    if now is None:
        now = datetime.now()
    return str(int(now.timestamp() * 1000))
def get_sign(payload, key):
    """Return the upper-case hex HMAC-SHA256 digest of ``payload``.

    Args:
        payload: Text to sign.
        key: Secret text used as the HMAC key (encoded as UTF-8).

    Returns:
        The hexadecimal digest converted to upper case.
    """
    mac = hmac.new(bytes(key, encoding="UTF-8"), digestmod=hashlib.sha256)
    mac.update(payload.encode())
    return mac.hexdigest().upper()
def get_access_token():
    """Request an access token for the configured client.

    Reads the module-level ``client_id``, ``client_secret``, ``BASE_URL`` and
    ``LOGIN_URL`` values.

    Returns:
        The ``access_token`` value from the ``result`` object of the response.

    Raises:
        RuntimeError: If the request fails, the body is not valid JSON, or
            the response does not contain ``result.access_token``.
    """
    timestamp = get_timestamp(datetime.now())
    # SHA-256 digest of an empty byte string (the GET request has no body).
    empty_body_sha256 = \
        "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
    # Signed text: client id and timestamp, followed by the method, the body
    # hash, an empty line and the request path, separated by newlines.
    # NOTE(review): the empty line is presumably the (empty) signed-headers
    # section of Tuya's signing scheme - confirm against the API docs.
    string_to_sign = "".join((
        client_id,
        timestamp,
        "GET\n",
        empty_body_sha256,
        "\n",
        "\n",
        LOGIN_URL,
    ))
    signed_string = get_sign(string_to_sign, client_secret)
    headers = {
        "client_id": client_id,
        "sign": signed_string,
        "t": timestamp,
        "mode": "cors",
        "sign_method": "HMAC-SHA256",
        "Content-Type": "application/json"
    }
    response, body = make_request(BASE_URL + LOGIN_URL, headers=headers)
    # make_request reports a failure as (exception, ""), so an empty body
    # means the request itself did not succeed.
    if not body:
        raise RuntimeError("Tuya token request failed: {!r}".format(response))
    try:
        payload = json.loads(body)
    except ValueError as error:
        raise RuntimeError(
            "Tuya token response is not valid JSON: {}".format(body)
        ) from error
    result = payload.get("result") if isinstance(payload, dict) else None
    if not isinstance(result, dict) or "access_token" not in result:
        # Surface the raw reply so the reason for the rejection is visible.
        raise RuntimeError(
            "Tuya token response has no access token: {}".format(body)
        )
    return result["access_token"]
def get_device_properties(access_token, device_id):
    """Get the current attributes of a device as a dict.

    Example of the returned mapping::

        {
            "attribute1": value,
            "attribute2": value,
            "attribute3": value
        }

    Reads the module-level ``client_id``, ``client_secret``, ``BASE_URL`` and
    ``ATTRIBUTES_URL`` values.

    Args:
        access_token: Token sent in the ``access_token`` header and included
            in the signed text.
        device_id: Identifier substituted into ``ATTRIBUTES_URL``.

    Returns:
        A dict mapping each property's ``code`` to its ``value``.

    Raises:
        RuntimeError: If the request fails, the body is not valid JSON, or
            the response does not contain ``result.properties``.
    """
    url = ATTRIBUTES_URL.format(device_id=device_id)
    # Pass the current time explicitly so the signature never relies on a
    # default that may have been captured earlier.
    timestamp = get_timestamp(datetime.now())
    # SHA-256 digest of an empty byte string (the GET request has no body).
    empty_body_sha256 = \
        "e3b0c44298fc1c149afbf4c8996fb92427ae41e4649b934ca495991b7852b855"
    # Signed text: client id, access token and timestamp, followed by the
    # method, the body hash, an empty line and the request path.
    # NOTE(review): the empty line is presumably the (empty) signed-headers
    # section of Tuya's signing scheme - confirm against the API docs.
    string_to_sign = "".join((
        client_id,
        access_token,
        timestamp,
        "GET\n",
        empty_body_sha256,
        "\n",
        "\n",
        url,
    ))
    signed_string = get_sign(string_to_sign, client_secret)
    headers = {
        "client_id": client_id,
        "sign": signed_string,
        "access_token": access_token,
        "t": timestamp,
        "mode": "cors",
        "sign_method": "HMAC-SHA256",
        "Content-Type": "application/json"
    }
    response, body = make_request(BASE_URL + url, headers=headers)
    # make_request reports a failure as (exception, ""), so an empty body
    # means the request itself did not succeed.
    if not body:
        raise RuntimeError(
            "Tuya properties request failed: {!r}".format(response)
        )
    try:
        payload = json.loads(body)
    except ValueError as error:
        raise RuntimeError(
            "Tuya properties response is not valid JSON: {}".format(body)
        ) from error
    result = payload.get("result") if isinstance(payload, dict) else None
    if not isinstance(result, dict) or "properties" not in result:
        # Surface the raw reply so the reason for the rejection is visible.
        raise RuntimeError(
            "Tuya properties response has no properties: {}".format(body)
        )
    return {item['code']: item['value'] for item in result["properties"]}
# API host and request paths used by the functions in this file.
# NOTE(review): the host name suggests the US endpoint - confirm it matches
# the region of your Tuya project.
BASE_URL = "https://openapi.tuyaus.com"
LOGIN_URL = "/v1.0/token?grant_type=1"
ATTRIBUTES_URL = "/v2.0/cloud/thing/{device_id}/shadow/properties"

# Credentials read as module globals by the request functions.
# NOTE(review): these look like placeholders to be replaced with the real
# client id and secret - confirm before use.
client_id = "tuya_client_id"
client_secret = "tuya_client_secret"

# Only parse arguments and contact the API when executed as a script, so that
# importing this module has no side effects.
if __name__ == "__main__":
    if len(sys.argv) != 2:
        raise SystemExit("usage: python3 tuya.py device_id")
    _, device_id = sys.argv
    access_token = get_access_token()
    attributes = get_device_properties(access_token, device_id)
    # Print the attributes as one JSON object on stdout.
    json_output = json.dumps(attributes)
    print(json_output)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Hi
Thank you. I have this working without issue.
Would it be possible to expand on it to allow reporting of all other attributes? Voltage, Current etc.
Regards