Created
December 19, 2022 15:16
-
-
Save enkiusz/2359d46a6c54e05118d05ff67cff5ccf to your computer and use it in GitHub Desktop.
A sniffer for the TuyaMCU protocol
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
# Sniffer for the TuyaMCU serial port protocol. This protocol is used between an application MCU | |
# and the Tuya radio modules. | |
# Reference: https://tasmota.github.io/docs/TuyaMCU/#tuya-protocols | |
import serial | |
import argparse | |
import structlog | |
import sys | |
import logging | |
from struct import pack,unpack | |
from enum import Enum | |
from binascii import hexlify | |
# Reference: https://github.com/tuya/tuya-wifi-mcu-sdk-arduino-library/blob/master/src/TuyaDefs.h | |
class MCUFrameProtoVer(Enum):
    """Values of the protocol-version byte found in a TuyaMCU frame."""

    # Version number used in frames sent by the module (0x00).
    MCU_RX_VER = 0
    # Version number used in frames sent by the MCU, default (0x03).
    MCU_TX_VER = 3
class MCUFrameType(Enum):
    """Command (frame type) byte of a TuyaMCU frame.

    Member names mirror the identifiers of the referenced TuyaDefs.h header,
    including its original misspellings (marked "sic"), so that they can be
    matched against the vendor sources.  Looking up a value that is not
    listed here raises ``ValueError``.
    """

    HEAT_BEAT_CMD = 0x00  # (sic) Heartbeat package
    PRODUCT_INFO_CMD = 0x01  # Product information
    WORK_MODE_CMD = 0x02  # Query the module working mode set by the MCU
    WIFI_STATE_CMD = 0x03  # Wifi working status
    WIFI_RESET_CMD = 0x04  # Reset wifi
    WIFI_MODE_CMD = 0x05  # Select smartconfig/AP mode
    DATA_QUERT_CMD = 0x06  # (sic) Order send
    STATE_UPLOAD_CMD = 0x07  # Status upload
    STATE_QUERY_CMD = 0x08  # Status query
    UPDATE_START_CMD = 0x0a  # Upgrade start
    UPDATE_TRANS_CMD = 0x0b  # Upgrade transfer
    GET_ONLINE_TIME_CMD = 0x0c  # Get system time (Greenwich Mean Time)
    FACTORY_MODE_CMD = 0x0d  # Enter production test mode
    WIFI_TEST_CMD = 0x0e  # Wifi function test
    GET_LOCAL_TIME_CMD = 0x1c  # Get local time
    WEATHER_OPEN_CMD = 0x20  # (sic) Turn on the weather
    WEATHER_DATA_CMD = 0x21  # Weather data
    STATE_UPLOAD_SYN_CMD = 0x22  # Status upload (synchronization)
    STATE_UPLOAD_SYN_RECV_CMD = 0x23  # Status upload results (synchronization)
    HEAT_BEAT_STOP = 0x25  # Turn off the WIFI module heartbeat
    STREAM_TRANS_CMD = 0x28  # Stream data transmission
    GET_WIFI_STATUS_CMD = 0x2b  # Gets the wifi networking status
    WIFI_CONNECT_TEST_CMD = 0x2c  # Wifi function test (connection designated route)
    GET_MAC_CMD = 0x2d  # Get module mac
    GET_IR_STATUS_CMD = 0x2e  # IR status notification
    IR_TX_RX_TEST_CMD = 0x2f  # IR into send-receive test
    MAPS_STREAM_TRANS_CMD = 0x30  # Streams trans (support for multiple maps)
    FILE_DOWNLOAD_START_CMD = 0x31  # File download startup
    FILE_DOWNLOAD_TRANS_CMD = 0x32  # File download data transfer
    MODULE_EXTEND_FUN_CMD = 0x34  # Open the module time service notification
    BLE_TEST_CMD = 0x35  # Bluetooth functional test (scan designated bluetooth beacon)
    GET_VOICE_STATE_CMD = 0x60  # Gets the voice status code
    MIC_SILENCE_CMD = 0x61  # MIC mute settings
    SET_SPEAKER_VOLUME_CMD = 0x62  # Speaker volume set
    VOICE_TEST_CMD = 0x63  # Audio production test
    VOICE_AWAKEN_TEST_CMD = 0x64  # Wake up production test
    VOICE_EXTEND_FUN_CMD = 0x65  # Voice module extension function
class DataPointType(Enum):
    """Type codes for the values carried by Tuya data points."""

    # Boolean data, 0/1.
    BOOLEAN = 0x01
    # Value data; a value shorter than 4 bytes is padded with leading zeros.
    INTEGER = 0x02
    # String data.
    STRING = 0x03
    # Enum data, 0/1/2/3/4/5.
    ENUM = 0x04
    # Fault data, report only.
    FAULT = 0x05
# Reference: https://stackoverflow.com/a/49724281
# Names of the logging levels known to the `logging` module, ordered by
# ascending numeric level.  Keys whose `.real` attribute is missing or zero
# are dropped, which removes level 0 and any non-numeric keys of the table.
LOG_LEVEL_NAMES = list(map(
    logging.getLevelName,
    filter(
        lambda level: getattr(level, "real", 0),
        sorted(getattr(logging, '_levelToName', None) or logging._levelNames),
    ),
))
# Module-wide structlog logger shared by the helpers and the script body.
log = structlog.get_logger()


def pkt_checksum(pkt):
    """Return the sum of all byte values in *pkt*, truncated to 8 bits.

    *pkt* is iterated as a sequence of integers (e.g. a ``bytearray``).
    Each byte is also emitted as a debug log event while it is added.
    """
    log.debug('calculating checksum', pkt=pkt)
    total = 0
    for byte in pkt:
        log.debug('c', c=byte)
        total += byte
        total &= 0xff  # keep only the low eight bits of the running sum
    return total
# Fixed two-byte marker (0x55 0xAA) that starts every frame.
_FRAME_HEADER = b'\x55\xaa'


def _wait_for_header(ser):
    """Consume bytes from *ser* until the 0x55 0xAA frame header has been read.

    The stream is scanned one byte at a time, so the sniffer can lock on to
    the next frame no matter at which offset of the byte stream it starts
    listening or where a previous frame was abandoned.
    """
    previous = b''
    while True:
        current = ser.read(1)
        if previous + current == _FRAME_HEADER:
            return
        previous = current


def _sniff(ser):
    """Decode TuyaMCU frames from *ser* and log them, forever.

    Frame layout as parsed here: 2-byte header, 1-byte protocol version,
    1-byte frame type, 2-byte big-endian payload length, payload, 1-byte
    checksum.  Frames with an unknown protocol version are skipped with a
    warning; frames with an unknown type are still reported, with the raw
    numeric type.  A checksum mismatch is reported as a warning and the
    frame is logged anyway.
    """
    while True:
        _wait_for_header(ser)

        ver_byte = ser.read(1)
        header = _FRAME_HEADER + ver_byte
        log.debug('read from serial', data=header)
        (raw_ver,) = unpack('B', ver_byte)
        try:
            proto_ver = MCUFrameProtoVer(raw_ver)
        except ValueError:
            # The enum lookup raises for unknown versions; report and resync
            # instead of letting the exception terminate the sniffer.
            log.warning('protocol version not supported', data=header, proto_ver=raw_ver)
            continue

        fullpacket = bytearray(header)
        log.debug('hdr', hdr=hex(0x55AA), proto_ver=proto_ver)

        data = ser.read(3)
        log.debug('read from serial', data=data)
        fullpacket.extend(data)
        (raw_type, payload_len) = unpack('>BH', data)
        try:
            frame_type = MCUFrameType(raw_type)
        except ValueError:
            # Unknown command: keep the raw number so the frame is still shown.
            frame_type = raw_type
        log.debug('tuyamcu frame', type=frame_type, payload_len=payload_len)

        payload = ser.read(payload_len)
        fullpacket.extend(payload)

        (checksum,) = unpack('B', ser.read(1))
        # Computed once: pkt_checksum emits a debug event per byte.
        expected = pkt_checksum(fullpacket)
        if expected != checksum:
            log.warning('invalid frame checksum', pkt=hexlify(fullpacket),
                        checksum=hex(checksum),
                        should_be=hex(expected))

        log.info('frame', proto_ver=proto_ver, type=frame_type,
                 payload=hexlify(payload), fullpacket=hexlify(fullpacket + bytearray([checksum])))


if __name__ == "__main__":
    # Log to stderr at INFO until the command line has been parsed.
    structlog.configure(
        wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
        logger_factory=structlog.PrintLoggerFactory(file=sys.stderr)
    )

    parser = argparse.ArgumentParser(description='TuyaMCU Serial Protocol Sniffer')
    parser.add_argument('--loglevel', choices=LOG_LEVEL_NAMES, default='INFO', help='Change log level')
    parser.add_argument('serial_port', help='Serial port in the form of pyserial ')

    args = parser.parse_args()
    config = args

    # Restrict log message to be above selected level
    structlog.configure(wrapper_class=structlog.make_filtering_bound_logger(getattr(logging, args.loglevel)))

    log.debug('config', config=config)

    log.info('opening serial port', url=config.serial_port)
    # No timeout is passed here, so read behaviour is whatever pyserial
    # applies by default for the given URL.
    with serial.serial_for_url(config.serial_port) as ser:
        _sniff(ser)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment