Last active
February 4, 2024 12:14
-
-
Save csabavirag/334d9fa4028c17a69e3af4ea22838381 to your computer and use it in GitHub Desktop.
Techlife pro Smart Bulb scripts
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import paho.mqtt.client as mqtt | |
import time | |
import binascii | |
import traceback | |
# --- Settings: adjust these to match your bulb and broker ---
bulb_mac = "aa:bb:cc:dd:ee:ff"     # inserted into the dev_pub_/dev_sub_ topic names
mqtt_server = "cloud.qh-tek.com"   # broker host passed to client.connect()
mqtt_user = "testuser"             # leave empty to connect without credentials
mqtt_pass = "testpass"
def on_connect(client, obj, flags, rc):
    """Handle the broker's answer to a connection attempt.

    When ``rc`` is 0 the client subscribes to both ``dev_pub_<mac>`` and
    ``dev_sub_<mac>``; any other value is reported as a failed connection.

    Args:
        client: MQTT client object; only ``client.subscribe`` is used.
        obj: Unused.
        flags: Unused.
        rc: Connection result code; 0 is treated as success.
    """
    if rc == 0:
        print("[ON_CONNECT] Connected OK")
        client.subscribe("dev_pub_%s" % bulb_mac)
        client.subscribe("dev_sub_%s" % bulb_mac)
    else:
        # rc must be %-formatted into the message; passing it as a second
        # print() argument left the literal "%s" in the output.
        print("[ON_CONNECT] Bad connection Returned code=%s" % rc)
def on_disconnect(client, userdata, rc):
    """Log the disconnect reason code and flag the client as disconnected.

    Sets ``client.connected_flag`` to False and ``client.disconnect_flag``
    to True. ``userdata`` is unused.
    """
    reason = str(rc)
    print("[ON_DISCONNECT] disconnecting reason " + reason)
    client.disconnect_flag = True
    client.connected_flag = False
def on_log(client, userdata, level, buff):
    """Print a log line handed over by the MQTT client, prefixed with [ON_LOG].

    Only ``buff`` is used; ``client``, ``userdata`` and ``level`` are ignored.
    """
    line = "[ON_LOG]: %s" % buff
    print(line)
def on_message(client, userdata, message):
    """Log every received message and answer ``FC F0`` requests.

    A message on ``dev_sub_<mac>`` whose payload starts with the bytes
    ``0xFC 0xF0`` is answered by publishing a fixed 23-byte frame on
    ``dev_pub_<mac>``. All other messages are only logged.

    Args:
        client: MQTT client object; only ``client.publish`` is used.
        userdata: Unused.
        message: Received message; ``message.topic`` and ``message.payload``
            are read.

    Any exception is printed as a traceback rather than propagated, so one
    bad message does not stop the callback from being called again.
    """
    try:
        payload = message.payload
        topic = message.topic
        print("[ON_MESSAGE] Command received in topic %s: %s"
              % (topic, binascii.hexlify(payload)))
        # Compare a two-byte slice rather than indexing bytes 0 and 1: slicing
        # never raises on short/empty payloads, and the comparison works the
        # same for Python 2 byte strings and Python 3 bytes.
        if topic == "dev_sub_%s" % bulb_mac and payload[:2] == b"\xfc\xf0":
            # NOTE(review): fixed reply frame copied from the original script;
            # its field meanings are not documented here.
            response = bytearray.fromhex("110000000000003f0d000000014100ffffff1524f14d22")
            client.publish("dev_pub_%s" % bulb_mac, response)
    except Exception:
        traceback.print_exc()
############### MAIN #########################
# Connects to the broker and services the connection until Ctrl+C is pressed.
print("Start")
client = mqtt.Client("clientid%s" % bulb_mac)  # create new instance
client.on_message = on_message        # attach function to callback
client.on_connect = on_connect        # attach function to callback
client.on_disconnect = on_disconnect  # was defined above but never attached
#client.on_log=on_log
print("Connecting to broker")
if mqtt_user:
    client.username_pw_set(mqtt_user, password=mqtt_pass)
client.connect(mqtt_server)  # connect to broker
while True:
    try:
        client.loop_forever()
    except KeyboardInterrupt:
        client.disconnect()
        # SystemExit is always available; exit() is only injected by the
        # `site` module and may be missing (e.g. python -S, frozen apps).
        raise SystemExit(0)
    # Any other exception propagates unchanged (the former `except: raise`
    # did exactly that and was redundant).
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import paho.mqtt.client as mqtt | |
import time | |
import binascii | |
import traceback | |
# --- Settings: adjust these to match your setup ---
mqtt_broker = "192.168.1.100"        # IP address of your MQTT broker
mac_address = "aa:aa:aa:aa:aa:aa"    # MAC address of your bulb
pub_topic = "dev_pub_" + mac_address  # MQTT topic to receive responses
sub_topic = "dev_sub_" + mac_address  # MQTT topic to send the query commands to
debug = False                        # set to True for more verbose output
############### MQTT callbacks ######################## | |
def on_connect(client, userdata, flags, rc):
    """Subscribe to the response topic every time a connection is established.

    The result code ``rc`` is only reported (when ``debug`` is set); the
    subscription is issued regardless of its value.
    """
    if debug:
        print("Connected with result code " + str(rc))
    # Subscribing from this callback renews the subscription after a lost
    # connection is re-established.
    client.subscribe(pub_topic)
# Callback invoked for each PUBLISH message received from the server.
def on_message(client, userdata, msg):
    """Hand the topic and raw payload of a received message to parsePayload."""
    topic, payload = msg.topic, msg.payload
    parsePayload(topic, payload)
def on_log(mqttc, obj, level, string):
    """Print a log line supplied by the MQTT client; other arguments are ignored."""
    log_line = string
    print(log_line)
# Sub-types of a 0x12 response that carry a delimited field after the 4-byte
# header: type -> (label to print, terminator byte).
_DELIMITED_RESPONSES = {
    0x2466: ("ClientID", b"\x00"),
    0x2444: ("MQTT Topic", b"\x00"),
    0x2422: ("34 22 response", b"\xff"),
    0x2433: ("34 33 response", b"\xff"),
    0x2477: ("MQTT Publisher topic", b"\x00"),
    0x2488: ("AppID", b"\x00"),
}


def _field_after_header(data, terminator):
    """Return the field that follows the 4-byte header, decoded as text.

    The terminator search starts at offset 4 so a header byte equal to the
    terminator cannot truncate the field, and a missing terminator yields the
    remainder of the frame instead of dropping its last byte.
    """
    end = data.find(terminator, 4)
    if end < 0:
        end = len(data)
    # NOTE(review): fields are assumed to be text; undecodable bytes are
    # replaced rather than raising.
    return bytes(data[4:end]).decode("utf-8", "replace")


def parsePayload(topic, payload):
    """Decode one response frame and print what it contains.

    The first byte selects the frame kind:

    * ``0xFA`` - prints bytes 1-4 as a dotted address and bytes 5-6
      (little-endian) as a port.
    * ``0xBC`` - prints bytes 1-2 (little-endian) followed by bytes 3-8 as a
      date/time line.
    * ``0x12`` - bytes 2-3 (little-endian) select a sub-type; ``0x0611``
      prints bytes 4-9 as a MAC address, the other known sub-types print a
      delimited field.

    Unknown frames print nothing. With ``debug`` set, the topic and a hex dump
    of the payload are printed first.

    Args:
        topic: Topic the frame arrived on (only used for the debug dump).
        payload: Raw frame as ``bytes``/``bytearray`` (or a Python 2 ``str``).

    Any exception (e.g. a truncated frame) is printed as a traceback instead
    of being propagated.
    """
    try:
        # bytearray indexing yields ints on both Python 2 and 3, which removes
        # the ord() calls that raise TypeError on Python 3 bytes.
        data = bytearray(payload)
        if debug:
            print(topic + " " + binascii.hexlify(data).decode("ascii"))
        if not data:
            return

        command = data[0]
        if command == 0xfa:
            print("MQTT Broker address: %d.%d.%d.%d:%d"
                  % (data[1], data[2], data[3], data[4], data[5] + (data[6] << 8)))
        elif command == 0xbc:
            print("Bulb's time: %d.%02d.%02d %02d:%02d:%02d %02d"
                  % (data[1] + (data[2] << 8), data[3], data[4],
                     data[5], data[6], data[7], data[8]))
        elif command == 0x12:
            payload_type = data[2] + (data[3] << 8)
            if payload_type == 0x611:
                mac = ':'.join("%02x" % octet for octet in data[4:10])
                print("MAC Address: %s" % mac)
            elif payload_type in _DELIMITED_RESPONSES:
                label, terminator = _DELIMITED_RESPONSES[payload_type]
                print("%s: %s" % (label, _field_after_header(data, terminator)))
    except Exception:
        traceback.print_exc()
def calcChecksum(stream):
    """Store the XOR of bytes 1..13 of ``stream`` in byte 14.

    ``stream`` is modified in place; the return value is a new ``bytearray``
    copy of the updated stream.
    """
    xor_total = 0
    for value in stream[1:14]:
        xor_total ^= value
    stream[14] = xor_total & 0xFF
    return bytearray(stream)
############### Techlife query commands ######################## | |
def getIP():
    """Return the 16-byte ``AF ...`` query frame with its checksum byte filled in."""
    return calcChecksum(bytearray.fromhex("AF 00 00 00 00 00 00 0F 00 00 00 00 00 00 00 B0"))
def getTime():
    """Return the 16-byte ``B0 ...`` query frame with its checksum byte filled in."""
    return calcChecksum(bytearray.fromhex("B0 00 00 00 00 00 00 00 00 0F 00 00 00 00 00 B1"))
def getCodeCC():
    """Return the 16-byte ``CC 0F ...`` query frame with its checksum byte filled in."""
    return calcChecksum(bytearray.fromhex("CC 0F 00 00 00 00 00 00 00 00 00 00 00 00 00 CD"))
def getCode34_22():
    """Return the 16-byte ``34 F0 22`` query frame with its checksum byte filled in."""
    return calcChecksum(bytearray.fromhex("34 F0 22 00 00 00 00 00 00 00 00 00 00 00 00 00"))
def getCode34_33():
    """Return the 16-byte ``34 F0 33`` query frame with its checksum byte filled in."""
    return calcChecksum(bytearray.fromhex("34 F0 33 00 00 00 00 00 00 00 00 00 00 00 00 00"))
def getMACAddress():
    """Return the 16-byte ``34 F0 11`` query frame with its checksum byte filled in."""
    return calcChecksum(bytearray.fromhex("34 F0 11 00 00 00 00 00 00 00 00 00 00 00 00 00"))
def getMQTTTopic():
    """Return the 16-byte ``34 F0 44`` query frame with its checksum byte filled in."""
    return calcChecksum(bytearray.fromhex("34 F0 44 00 00 00 00 00 00 00 00 00 00 00 00 00"))
def getMQTTPubTopic():
    """Return the 16-byte ``34 F0 77`` query frame with its checksum byte filled in."""
    return calcChecksum(bytearray.fromhex("34 F0 77 00 00 00 00 00 00 00 00 00 00 00 00 00"))
def getCientId():
    """Return the 16-byte ``34 F0 66`` query frame with its checksum byte filled in.

    The name is misspelled ("Cient"); it is kept as-is because callers in this
    file use it.
    """
    return calcChecksum(bytearray.fromhex("34 F0 66 00 00 00 00 00 00 00 00 00 00 00 00 00"))
def getAppId():
    """Return the 16-byte ``34 F0 88`` query frame with its checksum byte filled in."""
    return calcChecksum(bytearray.fromhex("34 F0 88 00 00 00 00 00 00 00 00 00 00 00 00 00"))
############### Main ########################
# Connects to the broker, sends every query frame one second apart so the
# responses can be printed by on_message, then disconnects.
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
#client.on_log = on_log
client.username_pw_set('mqtt','mqtt')
client.connect(mqtt_broker, 1883, 60)
client.loop_start()

# (debug label, frame builder) pairs, sent in this order.
queries = (
    ("GET IP address", getIP),
    ("GET Time", getTime),
    ("GET Code 0xCC", getCodeCC),
    ("GET MQTT Publisher topic", getMQTTPubTopic),
    ("GET AppID", getAppId),
    ("GET ClientID", getCientId),
    ("GET MQTT Topic", getMQTTTopic),
    ("GET MAC Address", getMACAddress),
    # The label used to read "GET GET 34 Type 33" although the frame sent
    # is the 34/22 query.
    ("GET 34 Type 22", getCode34_22),
    ("GET 34 Type 33", getCode34_33),
)

try:
    time.sleep(1)  # give the network loop a moment before the first publish
    for label, build_query in queries:
        if debug:
            print("\n*** Test - %s ***" % label)
        client.publish(sub_topic, build_query())
        time.sleep(1)  # wait for the response before sending the next query
finally:
    # Stop the background network loop even if a publish fails or the run
    # is interrupted.
    client.loop_stop()
    client.disconnect()
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# 1. Modify the variables according to your setup: ssid, password, bssid, [email] | |
# 2. Connect the computer to AP-TechLife-xx-xx SSID | |
# 3. Run the script | |
import socket | |
# Variables to change
ssid = '**YOURSSID**'
password = '**WIFIPASSWORD**'
bssid = bytearray([0xaa, 0xaa, 0xaa, 0xaa, 0xaa, 0xaa])  # Enter your WiFi router's WiFi interface MAC address in hex (eg. AA:AA:AA:AA:AA:AA)
email = 'none@nowhere.com'

# The bulb's network details
TCP_IP = '192.168.66.1'
TCP_PORT = 8000
BUFFER_SIZE = 1024


def _as_bytes(value):
    """Return ``value`` as a byte string, UTF-8 encoding it when it is text."""
    if isinstance(value, (bytes, bytearray)):
        return bytes(value)
    # NOTE(review): UTF-8 is assumed for non-ASCII SSIDs/passwords; ASCII
    # input produces the same bytes as the former per-character ord().
    return value.encode("utf-8")


def build_payload(ssid, password, bssid, email):
    """Build the 145-byte provisioning frame.

    Layout (offsets in hex): ``00`` = 0xFF, ``01..21`` SSID, ``22..62``
    password, ``63..68`` BSSID, ``69`` = 0x01, ``6a..8d`` email, ``8e`` = 0xF0,
    ``8f`` checksum, ``90`` = 0xEF. Unused bytes stay zero.

    Args:
        ssid: Network name (text or bytes), at most 33 bytes.
        password: WiFi password (text or bytes), at most 65 bytes.
        bssid: Router MAC address as bytes, at most 6 bytes.
        email: E-mail address (text or bytes), at most 36 bytes.

    Returns:
        The frame as a ``bytearray``.

    Raises:
        ValueError: If a field does not fit in its slot (it would otherwise
            overwrite the following field).
    """
    frame = bytearray(145)
    frame[0x00] = 0xff
    frame[0x69] = 0x01

    # (name, value, first offset, offset just past the slot)
    fields = (
        ("ssid", ssid, 0x01, 0x22),
        ("password", password, 0x22, 0x63),
        ("bssid", bssid, 0x63, 0x69),
        ("email", email, 0x6a, 0x8e),
    )
    for name, value, start, end in fields:
        raw = _as_bytes(value)
        if len(raw) > end - start:
            raise ValueError("%s is %d bytes long; at most %d fit in the frame"
                             % (name, len(raw), end - start))
        frame[start:start + len(raw)] = raw

    # XOR of bytes 0x01..0x8e, taken while byte 0x8e is still zero - this
    # matches the order of the original script, so the 0xF0 marker written
    # below is not part of the checksum.
    checksum = 0
    for value in frame[1:0x8f]:
        checksum ^= value

    frame[0x8e] = 0xf0
    frame[0x8f] = checksum & 0xff
    frame[0x90] = 0xef
    return frame


# Initialize Payload
payload = build_payload(ssid, password, bssid, email)
# Send the frame to the bulb and print whatever it answers.
s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
try:
    s.connect((TCP_IP, TCP_PORT))
    # sendall() keeps sending until the whole frame is out; send() may stop
    # after a partial write.
    s.sendall(payload)
    data = s.recv(BUFFER_SIZE)
finally:
    # Release the socket even when connect/send/recv raises.
    s.close()
print ("received data:", data)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Is there a way to get the commands or MQTT messages for the different colors of the Techlife Pro bulbs? Using the above and the help of https://community.openhab.org/t/hacking-techlife-pro-bulbs/85940/24 I am able to switch the bulb ON and OFF and control the brightness. However, I am unable to control the color. What are the commands for that?