Skip to content

Instantly share code, notes, and snippets.

@csabavirag
Last active February 4, 2024 12:14
Show Gist options
  • Star 7 You must be signed in to star a gist
  • Fork 5 You must be signed in to fork a gist
  • Save csabavirag/334d9fa4028c17a69e3af4ea22838381 to your computer and use it in GitHub Desktop.
Save csabavirag/334d9fa4028c17a69e3af4ea22838381 to your computer and use it in GitHub Desktop.
Techlife pro Smart Bulb scripts
import paho.mqtt.client as mqtt
import time
import binascii
import traceback
# Connection settings -- replace the placeholder values with your own.
bulb_mac = "aa:bb:cc:dd:ee:ff"    # bulb MAC address; it is embedded in the topic names
mqtt_server = "cloud.qh-tek.com"  # host name of the MQTT broker to connect to
mqtt_user = "testuser"            # when empty, no credentials are sent to the broker
mqtt_pass = "testpass"            # password that goes with mqtt_user
def on_connect(client, obj, flags, rc):
    """Connect callback: subscribe to the bulb's topics on success.

    Args:
        client: MQTT client that connected; used to issue the subscriptions.
        obj: User data passed by the client library (unused).
        flags: Connection flags passed by the client library (unused).
        rc: Connection result code; 0 means the connection was accepted.
    """
    if rc != 0:
        # The original passed rc as a second print() argument, so the
        # literal "%s" was printed instead of the code.
        print("[ON_CONNECT] Bad connection Returned code=%s" % (rc,))
        return

    print("[ON_CONNECT] Connected OK")
    # Listen on both directions of the bulb's traffic.
    client.subscribe("dev_pub_%s" % bulb_mac)
    client.subscribe("dev_sub_%s" % bulb_mac)
def on_disconnect(client, userdata, rc):
    """Disconnect callback: log the reason and record the state on the client.

    Args:
        client: MQTT client that disconnected; receives the two flag attributes.
        userdata: User data passed by the client library (unused).
        rc: Disconnect reason code, printed as text.
    """
    reason = str(rc)
    print("[ON_DISCONNECT] disconnecting reason " + reason)
    # Mark the client object as no longer connected.
    client.disconnect_flag = True
    client.connected_flag = False
def on_log(client, userdata, level, buff):
    """Log callback: print the client library's log text with an [ON_LOG] tag.

    Args:
        client: MQTT client emitting the log line (unused).
        userdata: User data passed by the client library (unused).
        level: Log level of the message (unused).
        buff: The log text to print.
    """
    line = "[ON_LOG]: %s" % buff
    print(line)
def on_message(client, userdata, message):
    """Message callback: log every message and answer the ``FC F0`` command.

    Each received payload is printed as hex. When a message arrives on the
    ``dev_sub_<mac>`` topic and its payload starts with the bytes ``FC F0``,
    a fixed response frame is published on ``dev_pub_<mac>``.

    Args:
        client: MQTT client used to publish the response.
        userdata: User data passed by the client library (unused).
        message: Received message; its ``topic`` and ``payload`` are read.
    """
    try:
        topic = message.topic
        payload = message.payload
        # Decode so the log shows plain hex digits rather than a bytes repr.
        msg = binascii.hexlify(payload).decode("ascii")
        print("[ON_MESSAGE] Command received in topic %s: %s" % (topic, msg))

        # Compare a slice rather than indexing: payloads shorter than two
        # bytes then simply do not match instead of raising IndexError.
        starts_with_fc_f0 = bytes(payload[:2]) == b"\xfc\xf0"
        if topic == "dev_sub_%s" % bulb_mac and starts_with_fc_f0:
            response = bytearray.fromhex("110000000000003f0d000000014100ffffff1524f14d22")
            client.publish("dev_pub_%s" % bulb_mac, response)
    except Exception:
        # Callback boundary: report the error but keep the network loop alive.
        traceback.print_exc()
############### MAIN #########################
print("Start")
client = mqtt.Client("clientid%s" % bulb_mac)  # create new instance
client.on_message = on_message  # attach function to callback
client.on_connect = on_connect  # attach function to callback
#client.on_log=on_log
print("Connecting to broker")
if mqtt_user:
    # Credentials are only sent when a user name is configured.
    client.username_pw_set(mqtt_user, password=mqtt_pass)
client.connect(mqtt_server)  # connect to broker

# loop_forever() blocks and handles reconnects itself, so no outer retry loop
# is needed. The former bare "except: raise" was a no-op and is dropped.
try:
    client.loop_forever()
except KeyboardInterrupt:
    client.disconnect()
    # SystemExit instead of exit(): exit() is only injected by the site
    # module and is not guaranteed to exist.
    raise SystemExit(0)
import paho.mqtt.client as mqtt
import time
import binascii
import traceback
mqtt_broker = "192.168.1.100"                 # IP address of your MQTT broker
mac_address = "aa:aa:aa:aa:aa:aa"             # MAC address of your bulb
pub_topic = "dev_pub_{}".format(mac_address)  # MQTT topic on which responses are received
sub_topic = "dev_sub_{}".format(mac_address)  # MQTT topic the query commands are sent to
debug = False                                 # set to True for more verbose output
############### MQTT callbacks ########################
def on_connect(client, userdata, flags, rc):
    """Connect callback: subscribe to the response topic.

    Args:
        client: MQTT client that connected; used to subscribe.
        userdata: User data passed by the client library (unused).
        flags: Connection flags passed by the client library (unused).
        rc: Connection result code, printed only when ``debug`` is set.
    """
    if debug:
        result = str(rc)
        print("Connected with result code " + result)
    # Subscribing from this callback renews the subscription whenever the
    # connection is lost and re-established.
    client.subscribe(pub_topic)
# The callback for when a PUBLISH message is received from the server.
def on_message(client, userdata, msg):
    """Message callback: hand the topic and raw payload to ``parsePayload``.

    Args:
        client: MQTT client that received the message (unused).
        userdata: User data passed by the client library (unused).
        msg: Received message; its ``topic`` and ``payload`` are forwarded.
    """
    topic, payload = msg.topic, msg.payload
    parsePayload(topic, payload)
def on_log(mqttc, obj, level, string):
    """Log callback: print the client library's log text unchanged.

    Args:
        mqttc: MQTT client emitting the log line (unused).
        obj: User data passed by the client library (unused).
        level: Log level of the message (unused).
        string: The log text to print.
    """
    text = string
    print(text)
# Text-valued 0x12 responses: payload type -> (printed label, terminator byte).
_TEXT_FIELDS = {
    0x2466: ("ClientID", b"\x00"),
    0x2444: ("MQTT Topic", b"\x00"),
    0x2422: ("34 22 response", b"\xff"),
    0x2433: ("34 33 response", b"\xff"),
    0x2477: ("MQTT Publisher topic", b"\x00"),
    0x2488: ("AppID", b"\x00"),
}


def _read_text(data, terminator, start=4):
    """Return the text in *data* from *start* up to the first *terminator*.

    The search begins at *start* so header bytes can never be mistaken for the
    terminator. If no terminator is present, the text runs to the end of the
    payload instead of silently losing its last byte.
    """
    end = data.find(terminator, start)
    if end == -1:
        end = len(data)
    return bytes(data[start:end]).decode("utf-8", "replace")


def parsePayload(topic, payload):
    """Decode one response payload and print it in human-readable form.

    The first payload byte selects the response kind:

    * ``0xFA`` -- printed as "MQTT Broker address" (four address bytes plus a
      little-endian 16-bit port).
    * ``0xBC`` -- printed as "Bulb's time" (little-endian 16-bit year followed
      by six single-byte fields).
    * ``0x12`` -- typed response; bytes 2-3 form a little-endian type code and
      the value starts at byte 4.

    Unknown commands and type codes are ignored. Any error raised while
    decoding is reported with a traceback and not propagated.

    Args:
        topic: Topic the payload arrived on (printed only when ``debug`` is set).
        payload: Raw payload bytes.
    """
    try:
        # A bytearray yields integers on indexing under both Python 2 and 3,
        # which removes the Python-2-only ord() calls.
        data = bytearray(payload)
        if debug:
            print(topic + " " + binascii.hexlify(data).decode("ascii"))
        if not data:
            return

        command = data[0]
        if command == 0xfa:
            port = data[5] | (data[6] << 8)
            print("MQTT Broker address: %d.%d.%d.%d:%d"
                  % (data[1], data[2], data[3], data[4], port))
        elif command == 0xbc:
            year = data[1] | (data[2] << 8)
            print("Bulb's time: %d.%02d.%02d %02d:%02d:%02d %02d"
                  % (year, data[3], data[4], data[5], data[6], data[7], data[8]))
        elif command == 0x12:
            payload_type = data[2] | (data[3] << 8)
            if payload_type == 0x611:
                mac = ":".join("%02x" % octet for octet in data[4:10])
                print("MAC Address: %s" % mac)
            elif payload_type in _TEXT_FIELDS:
                label, terminator = _TEXT_FIELDS[payload_type]
                print("%s: %s" % (label, _read_text(data, terminator)))
    except Exception:
        # Callback boundary: report malformed payloads, keep the loop running.
        traceback.print_exc()
def calcChecksum(stream):
    """Fill in the checksum byte of a command frame.

    The checksum is the XOR of the bytes at indices 1 through 13; it is
    written to index 14 of ``stream`` itself (the argument is modified in
    place).

    Args:
        stream: Mutable sequence of byte values with at least 15 elements.

    Returns:
        A new ``bytearray`` copy of the updated ``stream``.
    """
    xor_sum = 0
    for value in stream[1:14]:
        xor_sum ^= value
    stream[14] = xor_sum & 0xFF
    return bytearray(stream)
############### Techlife query commands ########################
def getIP():
    """Return the 16-byte query frame starting with ``AF``, checksum filled in."""
    return calcChecksum(bytearray.fromhex("AF 00 00 00 00 00 00 0F 00 00 00 00 00 00 00 B0"))
def getTime():
    """Return the 16-byte query frame starting with ``B0``, checksum filled in."""
    return calcChecksum(bytearray.fromhex("B0 00 00 00 00 00 00 00 00 0F 00 00 00 00 00 B1"))
def getCodeCC():
    """Return the 16-byte query frame starting with ``CC``, checksum filled in."""
    return calcChecksum(bytearray.fromhex("CC 0F 00 00 00 00 00 00 00 00 00 00 00 00 00 CD"))
def getCode34_22():
    """Return the 16-byte ``34 F0 22`` query frame, checksum filled in."""
    return calcChecksum(bytearray.fromhex("34 F0 22 00 00 00 00 00 00 00 00 00 00 00 00 00"))
def getCode34_33():
    """Return the 16-byte ``34 F0 33`` query frame, checksum filled in."""
    return calcChecksum(bytearray.fromhex("34 F0 33 00 00 00 00 00 00 00 00 00 00 00 00 00"))
def getMACAddress():
    """Return the 16-byte ``34 F0 11`` query frame, checksum filled in."""
    return calcChecksum(bytearray.fromhex("34 F0 11 00 00 00 00 00 00 00 00 00 00 00 00 00"))
def getMQTTTopic():
    """Return the 16-byte ``34 F0 44`` query frame, checksum filled in."""
    return calcChecksum(bytearray.fromhex("34 F0 44 00 00 00 00 00 00 00 00 00 00 00 00 00"))
def getMQTTPubTopic():
    """Return the 16-byte ``34 F0 77`` query frame, checksum filled in."""
    return calcChecksum(bytearray.fromhex("34 F0 77 00 00 00 00 00 00 00 00 00 00 00 00 00"))
def getCientId():
    """Return the 16-byte ``34 F0 66`` query frame, checksum filled in."""
    return calcChecksum(bytearray.fromhex("34 F0 66 00 00 00 00 00 00 00 00 00 00 00 00 00"))
def getAppId():
    """Return the 16-byte ``34 F0 88`` query frame, checksum filled in."""
    return calcChecksum(bytearray.fromhex("34 F0 88 00 00 00 00 00 00 00 00 00 00 00 00 00"))
############### Main ########################
client = mqtt.Client()
client.on_connect = on_connect
client.on_message = on_message
#client.on_log = on_log
client.username_pw_set('mqtt','mqtt')
client.connect(mqtt_broker, 1883, 60)
client.loop_start()
# Presumably gives the background loop time to connect and subscribe before
# the first query goes out.
time.sleep(1)

# Queries are sent in this order; each entry is (debug label, frame builder).
# The 34/22 entry used to be announced as "GET GET 34 Type 33".
queries = (
    ("IP address", getIP),
    ("Time", getTime),
    ("Code 0xCC", getCodeCC),
    ("MQTT Publisher topic", getMQTTPubTopic),
    ("AppID", getAppId),
    ("ClientID", getCientId),
    ("MQTT Topic", getMQTTTopic),
    ("MAC Address", getMACAddress),
    ("34 Type 22", getCode34_22),
    ("34 Type 33", getCode34_33),
)
for label, build_query in queries:
    if debug:
        print("\n*** Test - GET %s ***" % label)
    client.publish(sub_topic, build_query())
    # Wait for the response to arrive before sending the next query.
    time.sleep(1)

client.loop_stop()
client.disconnect()
#!/usr/bin/env python
# 1. Modify the variables according to your setup: ssid, password, bssid, [email]
# 2. Connect the computer to AP-TechLife-xx-xx SSID
# 3. Run the script
import socket
# Variables to change
ssid = '**YOURSSID**'
password = '**WIFIPASSWORD**'
bssid = bytearray([0xaa, 0xaa, 0xaa, 0xaa, 0xaa, 0xaa])  # Enter your WiFi router's WiFi interface MAC address in hex (eg. AA:AA:AA:AA:AA:AA)
email = 'none@nowhere.com'

# The bulb's network details
TCP_IP = '192.168.66.1'
TCP_PORT = 8000
BUFFER_SIZE = 1024

# Payload layout: total size and the offset at which each field starts.
PAYLOAD_SIZE = 145
SSID_START = 0x01
PASS_START = 0x22
BSSID_START = 0x63
FLAG_OFFSET = 0x69      # always set to 0x01
EMAIL_START = 0x6a
TRAILER_START = 0x8e    # 0xf0, checksum, 0xef


def _as_bytes(value):
    """Return *value* as a bytearray, UTF-8 encoding it when it is text."""
    if isinstance(value, (bytes, bytearray)):
        return bytearray(value)
    return bytearray(value.encode("utf-8"))


def _store_field(frame, start, limit, value, name):
    """Copy *value* into *frame* at *start*, refusing to reach *limit*.

    Raises:
        ValueError: If the encoded value would spill into the next field.
    """
    raw = _as_bytes(value)
    if start + len(raw) > limit:
        raise ValueError(
            "%s is too long: %d bytes, at most %d fit"
            % (name, len(raw), limit - start)
        )
    frame[start:start + len(raw)] = raw


def build_payload(ssid, password, bssid, email):
    """Build the 145-byte provisioning frame sent to the bulb.

    Args:
        ssid: Wi-Fi network name (text or bytes).
        password: Wi-Fi password (text or bytes).
        bssid: MAC address of the access point as raw bytes (at most 6).
        email: E-mail address (text or bytes).

    Returns:
        The frame as a ``bytearray``.

    Raises:
        ValueError: If a field does not fit into its slot of the frame.
    """
    frame = bytearray(PAYLOAD_SIZE)
    frame[0x00] = 0xff
    frame[FLAG_OFFSET] = 0x01

    _store_field(frame, SSID_START, PASS_START, ssid, "ssid")
    _store_field(frame, PASS_START, BSSID_START, password, "password")
    _store_field(frame, BSSID_START, FLAG_OFFSET, bssid, "bssid")
    _store_field(frame, EMAIL_START, TRAILER_START, email, "email")

    # XOR of every byte after the leading 0xff, taken before the trailer
    # bytes are written (so the 0xf0 marker is not part of the checksum).
    checksum = 0
    for value in frame[1:TRAILER_START]:
        checksum ^= value

    frame[0x8e] = 0xf0
    frame[0x8f] = checksum & 0xff
    frame[0x90] = 0xef
    return frame


payload = build_payload(ssid, password, bssid, email)

if __name__ == "__main__":
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.connect((TCP_IP, TCP_PORT))
        # sendall() keeps writing until the whole frame is out; send() may
        # stop after a partial write.
        s.sendall(payload)
        data = s.recv(BUFFER_SIZE)
    finally:
        # Close the socket even when connecting or sending fails.
        s.close()
    print("received data:", data)
@manju-rn
Copy link

Is there a way to get the commands or MQTT messages for the different colors for the Techlife Pro bulbs? Using the above and the help of https://community.openhab.org/t/hacking-techlife-pro-bulbs/85940/24 I am able to switch the bulb ON and OFF and control the brightness. However, I am unable to control the color. What are the commands for that?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment