Skip to content

Instantly share code, notes, and snippets.

@enkiusz
Created December 19, 2022 15:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save enkiusz/2359d46a6c54e05118d05ff67cff5ccf to your computer and use it in GitHub Desktop.
Save enkiusz/2359d46a6c54e05118d05ff67cff5ccf to your computer and use it in GitHub Desktop.
A sniffer for the TuyaMCU protocol
#!/usr/bin/env python3
# Sniffer for the TuyaMCU serial port protocol. This protocol is used between an application MCU
# and the Tuya radio modules.
# Reference: https://tasmota.github.io/docs/TuyaMCU/#tuya-protocols
import serial
import argparse
import structlog
import sys
import logging
from struct import pack,unpack
from enum import Enum
from binascii import hexlify
# Reference: https://github.com/tuya/tuya-wifi-mcu-sdk-arduino-library/blob/master/src/TuyaDefs.h
class MCUFrameProtoVer(Enum):
MCU_RX_VER=0x00 # Module send frame protocol version number
MCU_TX_VER=0x03 # MCU send frame protocol version number(default)
class MCUFrameType(Enum):
HEAT_BEAT_CMD=0 # Heartbeat package
PRODUCT_INFO_CMD=1 # Product information
WORK_MODE_CMD=2 # Query the module working mode set by the MCU
WIFI_STATE_CMD=3 # Wifi working status
WIFI_RESET_CMD=4 # Reset wifi
WIFI_MODE_CMD=5 # Select smartconfig/AP mode
DATA_QUERT_CMD=6 # (sic) Order send
STATE_UPLOAD_CMD=7 # Status upload
STATE_QUERY_CMD=8 # Status query
UPDATE_START_CMD=0x0a # Upgrade start
UPDATE_TRANS_CMD=0x0b # Upgrade transfer
GET_ONLINE_TIME_CMD=0x0c # Get system time (Greenwich Mean Time)
FACTORY_MODE_CMD=0x0d # Enter production test mode
WIFI_TEST_CMD=0x0e # Wifi function test
GET_LOCAL_TIME_CMD=0x1c # Get local time
WEATHER_OPEN_CMD=0x20 # (sic) Turn on the weather
#define WEATHER_DATA_CMD 0x21 //Weather data
#define STATE_UPLOAD_SYN_CMD 0x22 //Status upload (synchronization)
#define STATE_UPLOAD_SYN_RECV_CMD 0x23 //Status upload results(synchronization)
#define HEAT_BEAT_STOP 0x25 //Turn off the WIFI module heartbeat
#define STREAM_TRANS_CMD 0x28 //Stream data transmission
#define GET_WIFI_STATUS_CMD 0x2b //Gets the wifi networking status
#define WIFI_CONNECT_TEST_CMD 0x2c //Wifi function test(connection designated route)
#define GET_MAC_CMD 0x2d //Get module mac
#define GET_IR_STATUS_CMD 0x2e //IR status notification
#define IR_TX_RX_TEST_CMD 0x2f //IR into send-receive test
#define MAPS_STREAM_TRANS_CMD 0x30 //streams trans(Support for multiple maps)
#define FILE_DOWNLOAD_START_CMD 0x31 //File download startup
#define FILE_DOWNLOAD_TRANS_CMD 0x32 //File download data transfer
#define MODULE_EXTEND_FUN_CMD 0x34 //Open the module time service notification
#define BLE_TEST_CMD 0x35 //Bluetooth functional test(Scan designated bluetooth beacon)
#define GET_VOICE_STATE_CMD 0x60 //Gets the voice status code
#define MIC_SILENCE_CMD 0x61 //MIC mute Settings
#define SET_SPEAKER_VOLUME_CMD 0x62 //speaker volume set
#define VOICE_TEST_CMD 0x63 //Audio production test
#define VOICE_AWAKEN_TEST_CMD 0x64 //Wake up production test
#define VOICE_EXTEND_FUN_CMD 0x65 //Voice module extension function
class DataPointType(Enum):
BOOLEAN = 1 # boolean data 0/1
INTEGER = 2 # value data. If a value contains less than 4 bytes, 0 is supplemented before
STRING = 3 # string data
ENUM = 4 # enum data 0/1/2/3/4/5
FAULT = 5 # fault data, report only
# Reference: https://stackoverflow.com/a/49724281
LOG_LEVEL_NAMES = [logging.getLevelName(v) for v in
sorted(getattr(logging, '_levelToName', None) or logging._levelNames) if getattr(v, "real", 0)]
log = structlog.get_logger()
def pkt_checksum(pkt):
sum = 0
log.debug('calculating checksum', pkt=pkt)
for c in pkt:
log.debug('c', c=c)
sum = (sum + c) & 0xff
return sum
if __name__ == "__main__":
structlog.configure(
wrapper_class=structlog.make_filtering_bound_logger(logging.INFO),
logger_factory=structlog.PrintLoggerFactory(file=sys.stderr)
)
parser = argparse.ArgumentParser(description='TuyaMCU Serial Protocol Sniffer')
parser.add_argument('--loglevel', choices=LOG_LEVEL_NAMES, default='INFO', help='Change log level')
parser.add_argument('serial_port', help='Serial port in the form of pyserial ')
args = parser.parse_args()
config = args
# Restrict log message to be above selected level
structlog.configure( wrapper_class=structlog.make_filtering_bound_logger(getattr(logging, args.loglevel)) )
log.debug('config', config=config)
log.info('opening serial port', url=config.serial_port)
with serial.serial_for_url(config.serial_port) as ser:
while True:
fullpacket = bytearray()
# Wait for header
data = ser.read(3)
log.debug('read from serial', data=data)
(hdr, proto_ver) = unpack('>HB', data)
if hdr != 0x55AA:
continue
proto_ver = MCUFrameProtoVer(proto_ver)
if proto_ver != MCUFrameProtoVer.MCU_RX_VER and proto_ver != MCUFrameProtoVer.MCU_TX_VER:
log.warn('protocol version not supported', data=data, proto_ver=proto_ver)
continue
fullpacket.extend(data)
log.debug('hdr', hdr=hex(hdr), proto_ver=proto_ver)
data = ser.read(3)
log.debug('read from serial', data=data)
fullpacket.extend(data)
(frame_type, payload_len) = unpack('>BH', data)
frame_type = MCUFrameType(frame_type)
log.debug('tuyamcu frame', type=frame_type, payload_len=payload_len)
payload = ser.read(payload_len)
fullpacket.extend(payload)
(checksum, ) = unpack('B', ser.read(1))
if pkt_checksum(fullpacket) != checksum:
log.warn('invalid frame checksum', pkt=hexlify(fullpacket),
checksum=hex(checksum),
should_be=hex(pkt_checksum(fullpacket)))
log.info('frame', proto_ver=proto_ver, type=frame_type,
payload=hexlify(payload), fullpacket=hexlify(fullpacket + bytearray([checksum])))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment