Skip to content

Instantly share code, notes, and snippets.

@darkxst
Last active October 16, 2023 09:51
Show Gist options
  • Save darkxst/2dfc0e823bef359d8cbe36b9e2ac6232 to your computer and use it in GitHub Desktop.
Save darkxst/2dfc0e823bef359d8cbe36b9e2ac6232 to your computer and use it in GitHub Desktop.
Sonoff iHost - yc1175 test script
#!/usr/bin/python3
import crcmod
import serial
import struct
debug = True
crc16 = crcmod.mkCrcFun(0x11021, initCrc=0)
CMD_TYPE: dict[str, str] = {
'REQUEST': b'\x00',
'RESPONSE': b'\x40',
'NOTIFY': b'\x80',
}
COMMAND: dict[str, str] = {
'VERSION_YC': b'\x01',
'VERSION_RK': b'\x02',
'REPORT_EVENT': b'\x03',
'CONTROL_LED': b'\x04',
'REPORT_LED': b'\x05',
'BROADCAST_ID': b'\x06',
'QUERY_LED': b'\x07',
}
SOF = b'\xFE'
VERSION = b'\x00\x00\x01'
class SerialInterface:
def __init__(self):
self.port = '/dev/ttyS3'
self.baudrate = 115200
def open(self):
try:
self.serial = serial.Serial(port=self.port,
baudrate=self.baudrate,
parity=serial.PARITY_NONE,
stopbits=serial.STOPBITS_ONE,
timeout=3)
except Exception as e:
raise Exception("PORT ERROR: %s" % str(e))
print("Serial Connected %s" % self.port)
def close(self):
self.serial.close()
class Frame:
cmd = b'\x00'
crc = 0
data = bytearray()
dlen = 0
mtype = b'\x00'
seq = b'\x00'
def __init__(self, msg:bytearray=None):
if msg:
self.unpack(msg)
def build(self, mtype, cmd, seq, data=b'\x00'):
self.mtype = mtype
self.cmd = cmd
self.seq = seq
self.data = data
self.dlen = len(data)
def led(self, idx:int, effect:int, rgb:tuple):
self.cmd = COMMAND["CONTROL_LED"]
self.data = struct.pack(">cc3s", idx.to_bytes(), effect.to_bytes(), bytearray(rgb))
self.dlen = len(self.data)
def pack(self) -> bytearray:
N=self.dlen
frame = struct.pack(f">cHccc{N}s", SOF, self.dlen+8, self.mtype, self.cmd, self.seq, self.data )
frame += crc16(frame).to_bytes(2)
return frame
def unpack(self, packet:bytearray) -> None:
N = len(packet) - 8
tuple = struct.unpack(f">cHccc{N}sH", packet)
sof, plen, self.mtype, self.cmd, self.seq, self.data, self.crc = tuple
self.dlen = len(self.data)
class yProtocol:
def __init__(self, serial):
self.ser = serial
def checkCRC(self, packet) -> bool:
calc_crc = crc16(packet[:-2])
return calc_crc.to_bytes(2) == packet[-2:]
def getFrame(self) -> Frame:
packet = bytearray()
start = b''
#include a timeout below
while start != SOF:
start = self.ser.read(1)
packet = SOF
lenbytes = self.ser.read(2)
packet += lenbytes
lenint = int.from_bytes(lenbytes) - 3
packet += self.ser.read(lenint)
if debug:
print('[frame] '+' '.join(format(x, '02x') for x in packet))
if self.checkCRC(packet):
return Frame(packet)
return None
def sendAck(self, frame:Frame):
ct_r = CMD_TYPE["RESPONSE"]
response = Frame()
if frame.mtype == CMD_TYPE["REQUEST"]:
if frame.cmd == COMMAND["VERSION_RK"]:
response.build(ct_r, COMMAND["VERSION_RK"], frame.seq, VERSION)
elif frame.cmd == COMMAND["REPORT_LED"]:
response.build(ct_r, COMMAND["REPORT_LED"], frame.seq)
elif frame.cmd == COMMAND["REPORT_EVENT"]:
response.build(ct_r, COMMAND["REPORT_EVENT"], frame.seq)
self.sendFrame(response)
def sendFrame(self, frame:Frame):
title = "res" if frame.mtype == CMD_TYPE["RESPONSE"] else "req"
packet = frame.pack()
if debug:
print(f"[{title}] "+' '.join(format(x, '02x') for x in packet))
self.ser.write(packet)
if __name__ == "__main__":
ser = SerialInterface()
ser.open()
comm = yProtocol(ser.serial)
indicator = Frame()
rgb = (0,0,255)
indicator.led(4, 1, rgb)
comm.sendFrame(indicator)
while True:
ret = comm.getFrame()
if ret.mtype == CMD_TYPE["REQUEST"]:
comm.sendAck(ret)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment