Last active
February 9, 2025 11:47
-
-
Save jasl/e899c88778c792cdb97841371fffa46e to your computer and use it in GitHub Desktop.
test_charging.py
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import serial # pip install pyserial | |
gabyCRCHi = [ | |
0x00,0xc1,0x81,0x40,0x01,0xc0,0x80,0x41,0x01,0xc0, | |
0x80,0x41,0x00,0xc1,0x81,0x40,0x01,0xc0,0x80,0x41, | |
0x00,0xc1,0x81,0x40,0x00,0xc1,0x81,0x40,0x01,0xc0, | |
0x80,0x41,0x01,0xc0,0x80,0x41,0x00,0xc1,0x81,0x40, | |
0x00,0xc1,0x81,0x40,0x01,0xc0,0x80,0x41,0x00,0xc1, | |
0x81,0x40,0x01,0xc0,0x80,0x41,0x01,0xc0,0x80,0x41, | |
0x00,0xc1,0x81,0x40,0x01,0xc0,0x80,0x41,0x00,0xc1, | |
0x81,0x40,0x00,0xc1,0x81,0x40,0x01,0xc0,0x80,0x41, | |
0x00,0xc1,0x81,0x40,0x01,0xc0,0x80,0x41,0x01,0xc0, | |
0x80,0x41,0x00,0xc1,0x81,0x40,0x00,0xc1,0x81,0x40, | |
0x01,0xc0,0x80,0x41,0x01,0xc0,0x80,0x41,0x00,0xc1, | |
0x81,0x40,0x01,0xc0,0x80,0x41,0x00,0xc1,0x81,0x40, | |
0x00,0xc1,0x81,0x40,0x01,0xc0,0x80,0x41,0x01,0xc0, | |
0x80,0x41,0x00,0xc1,0x81,0x40,0x00,0xc1,0x81,0x40, | |
0x01,0xc0,0x80,0x41,0x00,0xc1,0x81,0x40,0x01,0xc0, | |
0x80,0x41,0x01,0xc0,0x80,0x41,0x00,0xc1,0x81,0x40, | |
0x00,0xc1,0x81,0x40,0x01,0xc0,0x80,0x41,0x01,0xc0, | |
0x80,0x41,0x00,0xc1,0x81,0x40,0x01,0xc0,0x80,0x41, | |
0x00,0xc1,0x81,0x40,0x00,0xc1,0x81,0x40,0x01,0xc0, | |
0x80,0x41,0x00,0xc1,0x81,0x40,0x01,0xc0,0x80,0x41, | |
0x01,0xc0,0x80,0x41,0x00,0xc1,0x81,0x40,0x01,0xc0, | |
0x80,0x41,0x00,0xc1,0x81,0x40,0x00,0xc1,0x81,0x40, | |
0x01,0xc0,0x80,0x41,0x01,0xc0,0x80,0x41,0x00,0xc1, | |
0x81,0x40,0x00,0xc1,0x81,0x40,0x01,0xc0,0x80,0x41, | |
0x00,0xc1,0x81,0x40,0x01,0xc0,0x80,0x41,0x01,0xc0, | |
0x80,0x41,0x00,0xc1,0x81,0x40 | |
] | |
gabyCRCLo = [ | |
0x00,0xc0,0xc1,0x01,0xc3,0x03,0x02,0xc2,0xc6,0x06, | |
0x07,0xc7,0x05,0xc5,0xc4,0x04,0xcc,0x0c,0x0d,0xcd, | |
0x0f,0xcf,0xce,0x0e,0x0a,0xca,0xcb,0x0b,0xc9,0x09, | |
0x08,0xc8,0xd8,0x18,0x19,0xd9,0x1b,0xdb,0xda,0x1a, | |
0x1e,0xde,0xdf,0x1f,0xdd,0x1d,0x1c,0xdc,0x14,0xd4, | |
0xd5,0x15,0xd7,0x17,0x16,0xd6,0xd2,0x12,0x13,0xd3, | |
0x11,0xd1,0xd0,0x10,0xf0,0x30,0x31,0xf1,0x33,0xf3, | |
0xf2,0x32,0x36,0xf6,0xf7,0x37,0xf5,0x35,0x34,0xf4, | |
0x3c,0xfc,0xfd,0x3d,0xff,0x3f,0x3e,0xfe,0xfa,0x3a, | |
0x3b,0xfb,0x39,0xf9,0xf8,0x38,0x28,0xe8,0xe9,0x29, | |
0xeb,0x2b,0x2a,0xea,0xee,0x2e,0x2f,0xef,0x2d,0xed, | |
0xec,0x2c,0xe4,0x24,0x25,0xe5,0x27,0xe7,0xe6,0x26, | |
0x22,0xe2,0xe3,0x23,0xe1,0x21,0x20,0xe0,0xa0,0x60, | |
0x61,0xa1,0x63,0xa3,0xa2,0x62,0x66,0xa6,0xa7,0x67, | |
0xa5,0x65,0x64,0xa4,0x6c,0xac,0xad,0x6d,0xaf,0x6f, | |
0x6e,0xae,0xaa,0x6a,0x6b,0xab,0x69,0xa9,0xa8,0x68, | |
0x78,0xb8,0xb9,0x79,0xbb,0x7b,0x7a,0xba,0xbe,0x7e, | |
0x7f,0xbf,0x7d,0xbd,0xbc,0x7c,0xb4,0x74,0x75,0xb5, | |
0x77,0xb7,0xb6,0x76,0x72,0xb2,0xb3,0x73,0xb1,0x71, | |
0x70,0xb0,0x50,0x90,0x91,0x51,0x93,0x53,0x52,0x92, | |
0x96,0x56,0x57,0x97,0x55,0x95,0x94,0x54,0x9c,0x5c, | |
0x5d,0x9d,0x5f,0x9f,0x9e,0x5e,0x5a,0x9a,0x9b,0x5b, | |
0x99,0x59,0x58,0x98,0x88,0x48,0x49,0x89,0x4b,0x8b, | |
0x8a,0x4a,0x4e,0x8e,0x8f,0x4f,0x8d,0x4d,0x4c,0x8c, | |
0x44,0x84,0x85,0x45,0x87,0x47,0x46,0x86,0x82,0x42, | |
0x43,0x83,0x41,0x81,0x80,0x40 | |
] | |
def build_real_time_read_request(sequence_number, pile_number, gun_number): | |
""" | |
构建读取实时监测数据的请求报文 (0x12) | |
""" | |
frame = bytearray() | |
frame.append(0x68) # 起始标志 | |
frame.append(0x0C) # 数据长度 | |
frame.extend(sequence_number.to_bytes(2, byteorder="big")) # 序列号 | |
frame.append(0x00) # 加密标志 | |
frame.append(0x12) # 帧类型(读取实时监测数据) | |
frame.extend(pile_number) # 桩编号 | |
frame.append(gun_number) # 枪号 | |
# 计算CRC | |
crc = calculate_crc16(frame[2:]) # 从数据长度到数据域 | |
frame.extend(crc) # CRC校验值,低字节在前,高字节在后 | |
return frame | |
def parse_real_time_monitoring_data(frame): | |
""" | |
解析实时监测数据 (0x13) | |
""" | |
if frame[0] != 0x68: | |
# print("帧起始标志错误") | |
print("[ERROR] Invalid frame header") | |
return | |
data_length = frame[1] | |
crc_received = frame[2 + data_length:2 + data_length + 2] | |
crc_calculated = calculate_crc16(frame[2:2 + data_length]) | |
if crc_received != crc_calculated: | |
# print(f"CRC校验失败: 接收到={crc_received.hex().upper()}, 计算值={crc_calculated.hex().upper()}") | |
print(f"[ERROR] CRC failed: received={crc_received.hex().upper()}, expect={crc_calculated.hex().upper()}") | |
return | |
frame_type = frame[5] | |
if frame_type == 0x13: | |
transaction_id = frame[6:22] # 交易流水号 | |
pile_number = frame[22:29] # 桩编号 | |
gun_number = frame[29] # 枪号 | |
status = frame[30] # 状态 | |
stand_by = frame[31] # 枪是否归位 | |
plugged_in = frame[32] # 是否插枪 | |
voltage = int.from_bytes(frame[33:35], byteorder='big') / 10.0 # 输出电压 | |
current = int.from_bytes(frame[35:37], byteorder='big') / 10.0 # 输出电流 | |
# wire_temperature = frame[37] | |
# wire_code = frame[38:46] | |
soc = frame[46] # SOC | |
# voltage = int.from_bytes(frame[34:36], byteorder='big') / 10.0 # 输出电压 | |
# current = int.from_bytes(frame[36:38], byteorder='big') / 10.0 # 输出电流 | |
# soc = frame[46] # SOC | |
# print(f"实时监测数据: 交易流水号={transaction_id.hex().upper()}, 桩编号={pile_number.hex().upper()}, 枪号={gun_number}, 状态={status}, 输出电压={voltage}V, 输出电流={current}A, SOC={soc}%") | |
print(f"[INFO] Current status: TXID={transaction_id.hex().upper()}, PileNumber={pile_number.hex().upper()}, GunNumber={gun_number}, Status={status}, StandBy={stand_by}, PluggedIn={plugged_in}, OutputVoltage={voltage}V, OutputCurrent={current}A, SOC={soc}%") | |
else: | |
# print("帧类型错误,无法解析") | |
print(f"[ERROR] Expect type 0x13, got {frame[5]}") | |
def build_remote_start_charging_request(sequence_number, pile_number, gun_number, transaction_number): | |
""" | |
构建运营平台远程控制启机报文 (0x34) | |
""" | |
frame = bytearray() | |
frame.append(0x68) # 起始标志 | |
frame.append(0x30) # 数据长度 | |
frame.extend(sequence_number.to_bytes(2, byteorder="big")) # 序列号 | |
frame.append(0x00) # 加密标志 | |
frame.append(0x34) # 帧类型(运营平台远程控制启机器) | |
frame.extend(transaction_number.to_bytes(16, byteorder="big")) # 交易流水号(16 bytes) | |
frame.extend(pile_number) # 桩编号 | |
frame.append(gun_number) # 枪号 | |
frame.extend(0x01.to_bytes(8, byteorder="big")) # 逻辑卡号(不用) | |
frame.extend(0x01.to_bytes(8, byteorder="big")) # 物理卡号(不用) | |
frame.extend(0xA0860100.to_bytes(4, byteorder="big")) # 账号余额(硬编码成最大值) | |
# 计算CRC | |
crc = calculate_crc16(frame[2:]) # 从数据长度到数据域 | |
frame.extend(crc) # CRC校验值,低字节在前,高字节在后 | |
return frame | |
def parse_remote_start_charging_data(frame): | |
""" | |
解析远程启动充电命令回复 (0x33) | |
""" | |
if frame[0] != 0x68: | |
# print("帧起始标志错误") | |
print("[ERROR] Invalid frame header") | |
return | |
data_length = frame[1] | |
crc_received = frame[2 + data_length:2 + data_length + 2] | |
crc_calculated = calculate_crc16(frame[2:2 + data_length]) | |
if crc_received != crc_calculated: | |
# print(f"CRC校验失败: 接收到={crc_received.hex().upper()}, 计算值={crc_calculated.hex().upper()}") | |
print(f"[ERROR] CRC failed: received={crc_received.hex().upper()}, expect={crc_calculated.hex().upper()}") | |
return | |
if frame[5] != 0x33: | |
# print("帧类型错误,无法解析") | |
print(f"[ERROR] Expect type 0x33, got {frame[5]}") | |
return | |
transaction_id = frame[6:22] # 交易流水号 | |
pile_number = frame[22:29] # 桩编号 | |
gun_number = frame[29] # 枪号 | |
status = frame[30] # 启动结果 | |
error = frame[31] | |
# print(f"实时监测数据: 交易流水号={transaction_id.hex().upper()}, 桩编号={pile_number.hex().upper()}, 枪号={gun_number}, 状态={status}, 输出电压={voltage}V, 输出电流={current}A, SOC={soc}%") | |
print(f"[INFO] Remote start charging result: TXID={transaction_id.hex().upper()}, PileNumber={pile_number.hex().upper()}, GunNumber={gun_number}, Status={status}, Error={error}") | |
def parse_billing_info_data(frame): | |
""" | |
解析远程启动充电命令回复 (0x33) | |
""" | |
if frame[0] != 0x68: | |
# print("帧起始标志错误") | |
print("[ERROR] Invalid frame header") | |
return | |
data_length = frame[1] | |
crc_received = frame[2 + data_length:2 + data_length + 2] | |
crc_calculated = calculate_crc16(frame[2:2 + data_length]) | |
if crc_received != crc_calculated: | |
# print(f"CRC校验失败: 接收到={crc_received.hex().upper()}, 计算值={crc_calculated.hex().upper()}") | |
print(f"[ERROR] CRC failed: received={crc_received.hex().upper()}, expect={crc_calculated.hex().upper()}") | |
return | |
if frame[5] != 0x3B: | |
# print("帧类型错误,无法解析") | |
print(f"[ERROR] Expect type 0x33, got {frame[5]}") | |
return | |
# TODO: NYI | |
print(f"[INFO] Billng info: (NYI) raw={frame.hex().upper()}") | |
# transaction_id = frame[6:22] # 交易流水号 | |
# pile_number = frame[22:29] # 桩编号 | |
# gun_number = frame[29] # 枪号 | |
# start_time = frame[30:37] # 开始时间 | |
# end_time = frame[37:44] # 结束时间 | |
# peak_price = frame[45:49] # 尖单价 | |
# peak_electric_quantity = frame[50:54] # 尖电量 | |
# # print(f"实时监测数据: 交易流水号={transaction_id.hex().upper()}, 桩编号={pile_number.hex().upper()}, 枪号={gun_number}, 状态={status}, 输出电压={voltage}V, 输出电流={current}A, SOC={soc}%") | |
# print(f"[INFO] Remote start charging result: TXID={transaction_id.hex().upper()}, PileNumber={pile_number.hex().upper()}, GunNumber={gun_number}, Status={status}, Error={error}") | |
def calculate_crc16(bytes_data): | |
""" | |
计算CRC校验值 | |
""" | |
byCRCHi = 0xff | |
byCRCLo = 0xff | |
for byte in bytes_data: | |
byIdx = byCRCHi ^ byte | |
byCRCHi = byCRCLo ^ gabyCRCHi[byIdx] | |
byCRCLo = gabyCRCLo[byIdx] | |
crc = byCRCLo | |
crc <<= 8 | |
crc += byCRCHi | |
return crc.to_bytes(2, 'little') | |
def receive_frame(ser): | |
""" | |
从串口接收完整帧 | |
""" | |
while True: | |
byte = ser.read(1) # 等待起始标志 | |
if byte == b'\x68': # 检测起始标志 | |
break | |
data_length = ser.read(1) # 读取数据长度 | |
if len(data_length) == 0: | |
# print("接收数据长度字节失败") | |
print("[ERROR] Read 0 byte") | |
return None | |
total_length = data_length[0] + 2 # 数据段长度 + 2字节CRC | |
remaining_data = ser.read(total_length) | |
if len(remaining_data) != total_length: | |
# print(f"接收数据长度不足: 当前数据={remaining_data.hex().upper()}") | |
print(f"[ERROR] Remaining data shorter than expected, got={len(remaining_data)} expect={total_length} data={remaining_data.hex().upper()}") | |
return None | |
# 构造完整帧 | |
frame = b'\x68' + data_length + remaining_data | |
return frame | |
def parse_frame(frame): | |
""" | |
解析帧数据 | |
:param frame: 接收到的完整帧 | |
:return: 帧类型, 桩编号, 序列号, 枪号(可选), 额外数据(可选) | |
""" | |
if frame[0] != 0x68: | |
# print("帧起始标志错误") | |
print("[ERROR] Invalid frame header") | |
return None, None, None, None, None | |
data_length = frame[1] | |
message_body = frame[2:2 + data_length] | |
crc_received = frame[2 + data_length:2 + data_length + 2] | |
crc_calculated = calculate_crc16(frame[2:2 + data_length]) | |
if crc_received != crc_calculated: | |
# print(f"CRC校验失败: 接收到={crc_received.hex().upper()}, 计算值={crc_calculated.hex().upper()}") | |
print(f"[ERROR] CRC failed: received={crc_received.hex().upper()}, expect={crc_calculated.hex().upper()}") | |
return None, None, None, None, None | |
# 提取公共字段 | |
sequence_number = int.from_bytes(frame[2:4], byteorder="big") # 序列号 | |
frame_type = frame[5] # 帧类型 | |
pile_number = frame[6:13] # 桩编号 | |
# 针对不同帧类型解析特定字段 | |
if frame_type == 0x03: # 心跳包 | |
gun_number = frame[13] # 枪号 | |
# print(f"解析帧成功: 帧类型={hex(frame_type)}, 序列号={sequence_number}, 桩编号={pile_number.hex().upper()}, 枪号={gun_number}") | |
# print(f"[DEBUG] Parsed frame: type={hex(frame_type)}, SequenceNumber={sequence_number}, PileNumber={pile_number.hex().upper()}, GunNumber={gun_number}") | |
return frame_type, pile_number, sequence_number, gun_number, None | |
elif frame_type == 0x05: # 计费模型验证请求 | |
model_code = frame[13:15] # 计费模型编号 | |
# print(f"解析帧成功: 帧类型={hex(frame_type)}, 序列号={sequence_number}, 桩编号={pile_number.hex().upper()}, 模型编号={model_code.hex().upper()}") | |
# print(f"[DEBUG] Parsed frame: type={hex(frame_type)}, SequenceNumber={sequence_number}, PileNumber={pile_number.hex().upper()}, ModelCode={model_code.hex().upper()}") | |
return frame_type, pile_number, sequence_number, None, model_code | |
else: | |
# print(f"未处理的帧类型: {hex(frame_type)}") | |
# print(f"[DEBUG] Unrecognized frame type: {hex(frame_type)}") | |
return frame_type, pile_number, sequence_number, None, None | |
def build_login_response(sequence_number, pile_number): | |
""" | |
构建认证成功的回复报文 (0x02) | |
""" | |
frame = bytearray() | |
frame.append(0x68) # 起始标志 | |
frame.append(0x0C) # 数据长度 | |
frame.extend(sequence_number.to_bytes(2, byteorder="big")) # 序列号 | |
frame.append(0x00) # 加密标志 | |
frame.append(0x02) # 帧类型(认证应答) | |
frame.extend(pile_number) # 桩编号 | |
frame.append(0x00) # 登录结果(成功) | |
# 计算CRC | |
crc = calculate_crc16(frame[2:]) # 从数据长度到数据域 | |
frame.extend(crc) # CRC校验值,低字节在前,高字节在后 | |
return frame | |
def build_billing_model_response(sequence_number, pile_number, model_code): | |
""" | |
构建计费模型验证的回复报文 (0x06) | |
""" | |
frame = bytearray() | |
frame.append(0x68) # 起始标志 | |
frame.append(0x0E) # 数据长度 | |
frame.extend(sequence_number.to_bytes(2, byteorder="big")) # 序列号 | |
frame.append(0x00) # 加密标志 | |
frame.append(0x06) # 帧类型(计费模型验证应答) | |
frame.extend(pile_number) # 桩编号 | |
frame.extend(model_code) # 计费模型编号 | |
frame.append(0x00) # 验证结果(0x00 表示成功) | |
# 计算CRC | |
crc = calculate_crc16(frame[2:]) # 从数据长度到数据域 | |
frame.extend(crc) # CRC校验值,低字节在前,高字节在后 | |
return frame | |
def build_heartbeat_response(sequence_number, pile_number, gun_number): | |
""" | |
构建心跳包回复报文 (0x04) | |
:param sequence_number: 序列号 (2字节整数) | |
:param pile_number: 桩编号 (7字节) | |
:param gun_number: 枪号 (1字节) | |
:return: 完整的回复报文(包含校验) | |
""" | |
frame = bytearray() | |
frame.append(0x68) # 起始标志 | |
frame.append(0x0D) # 数据长度(固定为 0x0D) | |
frame.extend(sequence_number.to_bytes(2, byteorder="big")) # 序列号 | |
frame.append(0x00) # 加密标志 | |
frame.append(0x04) # 帧类型(心跳应答) | |
frame.extend(pile_number) # 桩编号 | |
frame.append(gun_number) # 枪号 | |
frame.append(0x00) # 固定为 0x00 的应答字段 | |
# 计算CRC | |
crc = calculate_crc16(frame[2:]) # 从数据长度到应答字段 | |
frame.extend(crc) # CRC校验值,低字节在前,高字节在后 | |
# print(f"构建心跳回复帧: {frame.hex().upper()}") | |
# print(f"[DEBUG] Heartbeat frame: {frame.hex().upper()}") | |
return frame | |
def communicate_with_pile(port, baudrate): | |
""" | |
与充电桩进行串口交互 | |
""" | |
try: | |
# 打开串口 | |
with serial.Serial(port, baudrate, timeout=1) as ser: | |
# print(f"成功连接到串口: {port},波特率: {baudrate}") | |
print(f"[DEBUG] Connected to serial port: {port}, Baud rate: {baudrate}") | |
dev_login = False # Dev only, flag for login | |
dev_billing_mode_updated = False # Dev only | |
dev_charge_request_sent = False # Dev only, flag for charging request sent | |
while True: | |
# 等待接收数据 | |
frame = receive_frame(ser) | |
if frame is None: | |
continue | |
# print(f"收到数据: {frame.hex().upper()}") | |
print(f"[DEBUG] Received frame: {frame.hex().upper()}") | |
# 解析数据帧 | |
frame_type, pile_number, sequence_number, gun_number, extra_data = parse_frame(frame) | |
gun_number = 1 # hard code it | |
if frame_type is None: | |
# print("帧解析失败,丢弃帧") | |
print("[WARNING] Couldn't parse the frame, drop it") | |
continue | |
# 处理帧类型 | |
if frame_type == 0x01: # 登录请求 | |
# print("处理登录请求...") | |
# print("[Debug] Building login response frame") | |
response = build_login_response(sequence_number, pile_number) | |
# print(f"发送登录回复: {response.hex().upper()}") | |
# print(f"[DEBUG] Sending login response frame: {response.hex().upper()}") | |
ser.write(response) | |
dev_login = True | |
elif frame_type == 0x05: # 计费模型验证请求 | |
# print("处理计费模型验证请求...") | |
# print("[DEBUG] Buiding billing model response frame") | |
model_code = frame[13:15] # 提取计费模型编号(假设固定位置) | |
response = build_billing_model_response(sequence_number, pile_number, model_code) | |
# print(f"发送计费模型验证回复: {response.hex().upper()}") | |
# print(f"[DEBUG] Sending billing model response frame: {response.hex().upper()}") | |
ser.write(response) | |
dev_billing_mode_updated = True | |
elif frame_type == 0x03: # 心跳包 | |
# print("处理心跳包...") | |
# print("[DEBUG] Building heartbeat response frame") | |
response = build_heartbeat_response(sequence_number, pile_number, gun_number) | |
# print(f"发送心跳包回复: {response.hex().upper()}") | |
# print(f"[DEBUG] Sending heartbeat response frame: {response.hex().upper()}") | |
ser.write(response) | |
if dev_login and dev_billing_mode_updated: | |
if dev_charge_request_sent == False: | |
# 构建运营平台远程控制启机请求 | |
transaction_number = 0x00000000000000000000000000000001 # Hard code for PoC | |
request_frame = build_remote_start_charging_request(sequence_number, pile_number, gun_number, transaction_number) | |
# print(f"发送运营平台远程控制启机请求: {request_frame.hex().upper()}") | |
print(f"[DEBUG] Sending remote start charging frame: {request_frame.hex().upper()}") | |
ser.write(request_frame) | |
dev_charge_request_sent = True | |
sequence_number = (sequence_number + 1) % 65536 | |
else: | |
# 构建监测数据请求 | |
request_frame = build_real_time_read_request(sequence_number, pile_number, gun_number) | |
# print(f"发送读取实时监测数据请求: {request_frame.hex().upper()}") | |
# print(f"[DEBUG] Sending real-time read request frame: {request_frame.hex().upper()}") | |
ser.write(request_frame) | |
sequence_number = (sequence_number + 1) % 65536 | |
# # 构建监测数据请求 | |
# request_frame = build_real_time_read_request(sequence_number, pile_number, gun_number) | |
# # print(f"发送读取实时监测数据请求: {request_frame.hex().upper()}") | |
# # print(f"[DEBUG] Sending real-time read request frame: {request_frame.hex().upper()}") | |
# ser.write(request_frame) | |
# sequence_number = (sequence_number + 1) % 65536 | |
elif frame_type == 0x13: # 实时监测数据 | |
parse_real_time_monitoring_data(frame) | |
# print(f"实时监测成功") | |
elif frame_type == 0x33: # 远程启动充电命令回复 | |
parse_remote_start_charging_data(frame) | |
# print(f"远程启动充电命令回复成功") | |
elif frame_type == 0x3B: # 计费模型验证请求 | |
parse_billing_info_data(frame) | |
else: | |
# print(f"未处理的帧类型: {hex(frame_type)}") | |
print(f"[ERROR] Unhandled frame type: {hex(frame_type)}") | |
except serial.SerialException as e: | |
# print(f"串口通信错误: {e}") | |
print(f"SerialException: {e}") | |
# except Exception as e: | |
# print(f"Exception: {e}") | |
# 主程序 | |
if __name__ == "__main__": | |
# 设置串口参数 | |
SERIAL_PORT = "/dev/ttyUSB0" # 根据实际情况修改 | |
BAUDRATE = 115200 | |
# 启动交互 | |
communicate_with_pile(SERIAL_PORT, BAUDRATE) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment