Skip to content

Instantly share code, notes, and snippets.

@jasl
Last active February 9, 2025 11:47
Show Gist options
  • Save jasl/e899c88778c792cdb97841371fffa46e to your computer and use it in GitHub Desktop.
Save jasl/e899c88778c792cdb97841371fffa46e to your computer and use it in GitHub Desktop.
test_charging.py
import serial # pip install pyserial
gabyCRCHi = [
0x00,0xc1,0x81,0x40,0x01,0xc0,0x80,0x41,0x01,0xc0,
0x80,0x41,0x00,0xc1,0x81,0x40,0x01,0xc0,0x80,0x41,
0x00,0xc1,0x81,0x40,0x00,0xc1,0x81,0x40,0x01,0xc0,
0x80,0x41,0x01,0xc0,0x80,0x41,0x00,0xc1,0x81,0x40,
0x00,0xc1,0x81,0x40,0x01,0xc0,0x80,0x41,0x00,0xc1,
0x81,0x40,0x01,0xc0,0x80,0x41,0x01,0xc0,0x80,0x41,
0x00,0xc1,0x81,0x40,0x01,0xc0,0x80,0x41,0x00,0xc1,
0x81,0x40,0x00,0xc1,0x81,0x40,0x01,0xc0,0x80,0x41,
0x00,0xc1,0x81,0x40,0x01,0xc0,0x80,0x41,0x01,0xc0,
0x80,0x41,0x00,0xc1,0x81,0x40,0x00,0xc1,0x81,0x40,
0x01,0xc0,0x80,0x41,0x01,0xc0,0x80,0x41,0x00,0xc1,
0x81,0x40,0x01,0xc0,0x80,0x41,0x00,0xc1,0x81,0x40,
0x00,0xc1,0x81,0x40,0x01,0xc0,0x80,0x41,0x01,0xc0,
0x80,0x41,0x00,0xc1,0x81,0x40,0x00,0xc1,0x81,0x40,
0x01,0xc0,0x80,0x41,0x00,0xc1,0x81,0x40,0x01,0xc0,
0x80,0x41,0x01,0xc0,0x80,0x41,0x00,0xc1,0x81,0x40,
0x00,0xc1,0x81,0x40,0x01,0xc0,0x80,0x41,0x01,0xc0,
0x80,0x41,0x00,0xc1,0x81,0x40,0x01,0xc0,0x80,0x41,
0x00,0xc1,0x81,0x40,0x00,0xc1,0x81,0x40,0x01,0xc0,
0x80,0x41,0x00,0xc1,0x81,0x40,0x01,0xc0,0x80,0x41,
0x01,0xc0,0x80,0x41,0x00,0xc1,0x81,0x40,0x01,0xc0,
0x80,0x41,0x00,0xc1,0x81,0x40,0x00,0xc1,0x81,0x40,
0x01,0xc0,0x80,0x41,0x01,0xc0,0x80,0x41,0x00,0xc1,
0x81,0x40,0x00,0xc1,0x81,0x40,0x01,0xc0,0x80,0x41,
0x00,0xc1,0x81,0x40,0x01,0xc0,0x80,0x41,0x01,0xc0,
0x80,0x41,0x00,0xc1,0x81,0x40
]
gabyCRCLo = [
0x00,0xc0,0xc1,0x01,0xc3,0x03,0x02,0xc2,0xc6,0x06,
0x07,0xc7,0x05,0xc5,0xc4,0x04,0xcc,0x0c,0x0d,0xcd,
0x0f,0xcf,0xce,0x0e,0x0a,0xca,0xcb,0x0b,0xc9,0x09,
0x08,0xc8,0xd8,0x18,0x19,0xd9,0x1b,0xdb,0xda,0x1a,
0x1e,0xde,0xdf,0x1f,0xdd,0x1d,0x1c,0xdc,0x14,0xd4,
0xd5,0x15,0xd7,0x17,0x16,0xd6,0xd2,0x12,0x13,0xd3,
0x11,0xd1,0xd0,0x10,0xf0,0x30,0x31,0xf1,0x33,0xf3,
0xf2,0x32,0x36,0xf6,0xf7,0x37,0xf5,0x35,0x34,0xf4,
0x3c,0xfc,0xfd,0x3d,0xff,0x3f,0x3e,0xfe,0xfa,0x3a,
0x3b,0xfb,0x39,0xf9,0xf8,0x38,0x28,0xe8,0xe9,0x29,
0xeb,0x2b,0x2a,0xea,0xee,0x2e,0x2f,0xef,0x2d,0xed,
0xec,0x2c,0xe4,0x24,0x25,0xe5,0x27,0xe7,0xe6,0x26,
0x22,0xe2,0xe3,0x23,0xe1,0x21,0x20,0xe0,0xa0,0x60,
0x61,0xa1,0x63,0xa3,0xa2,0x62,0x66,0xa6,0xa7,0x67,
0xa5,0x65,0x64,0xa4,0x6c,0xac,0xad,0x6d,0xaf,0x6f,
0x6e,0xae,0xaa,0x6a,0x6b,0xab,0x69,0xa9,0xa8,0x68,
0x78,0xb8,0xb9,0x79,0xbb,0x7b,0x7a,0xba,0xbe,0x7e,
0x7f,0xbf,0x7d,0xbd,0xbc,0x7c,0xb4,0x74,0x75,0xb5,
0x77,0xb7,0xb6,0x76,0x72,0xb2,0xb3,0x73,0xb1,0x71,
0x70,0xb0,0x50,0x90,0x91,0x51,0x93,0x53,0x52,0x92,
0x96,0x56,0x57,0x97,0x55,0x95,0x94,0x54,0x9c,0x5c,
0x5d,0x9d,0x5f,0x9f,0x9e,0x5e,0x5a,0x9a,0x9b,0x5b,
0x99,0x59,0x58,0x98,0x88,0x48,0x49,0x89,0x4b,0x8b,
0x8a,0x4a,0x4e,0x8e,0x8f,0x4f,0x8d,0x4d,0x4c,0x8c,
0x44,0x84,0x85,0x45,0x87,0x47,0x46,0x86,0x82,0x42,
0x43,0x83,0x41,0x81,0x80,0x40
]
def build_real_time_read_request(sequence_number, pile_number, gun_number):
"""
构建读取实时监测数据的请求报文 (0x12)
"""
frame = bytearray()
frame.append(0x68) # 起始标志
frame.append(0x0C) # 数据长度
frame.extend(sequence_number.to_bytes(2, byteorder="big")) # 序列号
frame.append(0x00) # 加密标志
frame.append(0x12) # 帧类型(读取实时监测数据)
frame.extend(pile_number) # 桩编号
frame.append(gun_number) # 枪号
# 计算CRC
crc = calculate_crc16(frame[2:]) # 从数据长度到数据域
frame.extend(crc) # CRC校验值,低字节在前,高字节在后
return frame
def parse_real_time_monitoring_data(frame):
"""
解析实时监测数据 (0x13)
"""
if frame[0] != 0x68:
# print("帧起始标志错误")
print("[ERROR] Invalid frame header")
return
data_length = frame[1]
crc_received = frame[2 + data_length:2 + data_length + 2]
crc_calculated = calculate_crc16(frame[2:2 + data_length])
if crc_received != crc_calculated:
# print(f"CRC校验失败: 接收到={crc_received.hex().upper()}, 计算值={crc_calculated.hex().upper()}")
print(f"[ERROR] CRC failed: received={crc_received.hex().upper()}, expect={crc_calculated.hex().upper()}")
return
frame_type = frame[5]
if frame_type == 0x13:
transaction_id = frame[6:22] # 交易流水号
pile_number = frame[22:29] # 桩编号
gun_number = frame[29] # 枪号
status = frame[30] # 状态
stand_by = frame[31] # 枪是否归位
plugged_in = frame[32] # 是否插枪
voltage = int.from_bytes(frame[33:35], byteorder='big') / 10.0 # 输出电压
current = int.from_bytes(frame[35:37], byteorder='big') / 10.0 # 输出电流
# wire_temperature = frame[37]
# wire_code = frame[38:46]
soc = frame[46] # SOC
# voltage = int.from_bytes(frame[34:36], byteorder='big') / 10.0 # 输出电压
# current = int.from_bytes(frame[36:38], byteorder='big') / 10.0 # 输出电流
# soc = frame[46] # SOC
# print(f"实时监测数据: 交易流水号={transaction_id.hex().upper()}, 桩编号={pile_number.hex().upper()}, 枪号={gun_number}, 状态={status}, 输出电压={voltage}V, 输出电流={current}A, SOC={soc}%")
print(f"[INFO] Current status: TXID={transaction_id.hex().upper()}, PileNumber={pile_number.hex().upper()}, GunNumber={gun_number}, Status={status}, StandBy={stand_by}, PluggedIn={plugged_in}, OutputVoltage={voltage}V, OutputCurrent={current}A, SOC={soc}%")
else:
# print("帧类型错误,无法解析")
print(f"[ERROR] Expect type 0x13, got {frame[5]}")
def build_remote_start_charging_request(sequence_number, pile_number, gun_number, transaction_number):
"""
构建运营平台远程控制启机报文 (0x34)
"""
frame = bytearray()
frame.append(0x68) # 起始标志
frame.append(0x30) # 数据长度
frame.extend(sequence_number.to_bytes(2, byteorder="big")) # 序列号
frame.append(0x00) # 加密标志
frame.append(0x34) # 帧类型(运营平台远程控制启机器)
frame.extend(transaction_number.to_bytes(16, byteorder="big")) # 交易流水号(16 bytes)
frame.extend(pile_number) # 桩编号
frame.append(gun_number) # 枪号
frame.extend(0x01.to_bytes(8, byteorder="big")) # 逻辑卡号(不用)
frame.extend(0x01.to_bytes(8, byteorder="big")) # 物理卡号(不用)
frame.extend(0xA0860100.to_bytes(4, byteorder="big")) # 账号余额(硬编码成最大值)
# 计算CRC
crc = calculate_crc16(frame[2:]) # 从数据长度到数据域
frame.extend(crc) # CRC校验值,低字节在前,高字节在后
return frame
def parse_remote_start_charging_data(frame):
"""
解析远程启动充电命令回复 (0x33)
"""
if frame[0] != 0x68:
# print("帧起始标志错误")
print("[ERROR] Invalid frame header")
return
data_length = frame[1]
crc_received = frame[2 + data_length:2 + data_length + 2]
crc_calculated = calculate_crc16(frame[2:2 + data_length])
if crc_received != crc_calculated:
# print(f"CRC校验失败: 接收到={crc_received.hex().upper()}, 计算值={crc_calculated.hex().upper()}")
print(f"[ERROR] CRC failed: received={crc_received.hex().upper()}, expect={crc_calculated.hex().upper()}")
return
if frame[5] != 0x33:
# print("帧类型错误,无法解析")
print(f"[ERROR] Expect type 0x33, got {frame[5]}")
return
transaction_id = frame[6:22] # 交易流水号
pile_number = frame[22:29] # 桩编号
gun_number = frame[29] # 枪号
status = frame[30] # 启动结果
error = frame[31]
# print(f"实时监测数据: 交易流水号={transaction_id.hex().upper()}, 桩编号={pile_number.hex().upper()}, 枪号={gun_number}, 状态={status}, 输出电压={voltage}V, 输出电流={current}A, SOC={soc}%")
print(f"[INFO] Remote start charging result: TXID={transaction_id.hex().upper()}, PileNumber={pile_number.hex().upper()}, GunNumber={gun_number}, Status={status}, Error={error}")
def parse_billing_info_data(frame):
"""
解析远程启动充电命令回复 (0x33)
"""
if frame[0] != 0x68:
# print("帧起始标志错误")
print("[ERROR] Invalid frame header")
return
data_length = frame[1]
crc_received = frame[2 + data_length:2 + data_length + 2]
crc_calculated = calculate_crc16(frame[2:2 + data_length])
if crc_received != crc_calculated:
# print(f"CRC校验失败: 接收到={crc_received.hex().upper()}, 计算值={crc_calculated.hex().upper()}")
print(f"[ERROR] CRC failed: received={crc_received.hex().upper()}, expect={crc_calculated.hex().upper()}")
return
if frame[5] != 0x3B:
# print("帧类型错误,无法解析")
print(f"[ERROR] Expect type 0x33, got {frame[5]}")
return
# TODO: NYI
print(f"[INFO] Billng info: (NYI) raw={frame.hex().upper()}")
# transaction_id = frame[6:22] # 交易流水号
# pile_number = frame[22:29] # 桩编号
# gun_number = frame[29] # 枪号
# start_time = frame[30:37] # 开始时间
# end_time = frame[37:44] # 结束时间
# peak_price = frame[45:49] # 尖单价
# peak_electric_quantity = frame[50:54] # 尖电量
# # print(f"实时监测数据: 交易流水号={transaction_id.hex().upper()}, 桩编号={pile_number.hex().upper()}, 枪号={gun_number}, 状态={status}, 输出电压={voltage}V, 输出电流={current}A, SOC={soc}%")
# print(f"[INFO] Remote start charging result: TXID={transaction_id.hex().upper()}, PileNumber={pile_number.hex().upper()}, GunNumber={gun_number}, Status={status}, Error={error}")
def calculate_crc16(bytes_data):
"""
计算CRC校验值
"""
byCRCHi = 0xff
byCRCLo = 0xff
for byte in bytes_data:
byIdx = byCRCHi ^ byte
byCRCHi = byCRCLo ^ gabyCRCHi[byIdx]
byCRCLo = gabyCRCLo[byIdx]
crc = byCRCLo
crc <<= 8
crc += byCRCHi
return crc.to_bytes(2, 'little')
def receive_frame(ser):
"""
从串口接收完整帧
"""
while True:
byte = ser.read(1) # 等待起始标志
if byte == b'\x68': # 检测起始标志
break
data_length = ser.read(1) # 读取数据长度
if len(data_length) == 0:
# print("接收数据长度字节失败")
print("[ERROR] Read 0 byte")
return None
total_length = data_length[0] + 2 # 数据段长度 + 2字节CRC
remaining_data = ser.read(total_length)
if len(remaining_data) != total_length:
# print(f"接收数据长度不足: 当前数据={remaining_data.hex().upper()}")
print(f"[ERROR] Remaining data shorter than expected, got={len(remaining_data)} expect={total_length} data={remaining_data.hex().upper()}")
return None
# 构造完整帧
frame = b'\x68' + data_length + remaining_data
return frame
def parse_frame(frame):
"""
解析帧数据
:param frame: 接收到的完整帧
:return: 帧类型, 桩编号, 序列号, 枪号(可选), 额外数据(可选)
"""
if frame[0] != 0x68:
# print("帧起始标志错误")
print("[ERROR] Invalid frame header")
return None, None, None, None, None
data_length = frame[1]
message_body = frame[2:2 + data_length]
crc_received = frame[2 + data_length:2 + data_length + 2]
crc_calculated = calculate_crc16(frame[2:2 + data_length])
if crc_received != crc_calculated:
# print(f"CRC校验失败: 接收到={crc_received.hex().upper()}, 计算值={crc_calculated.hex().upper()}")
print(f"[ERROR] CRC failed: received={crc_received.hex().upper()}, expect={crc_calculated.hex().upper()}")
return None, None, None, None, None
# 提取公共字段
sequence_number = int.from_bytes(frame[2:4], byteorder="big") # 序列号
frame_type = frame[5] # 帧类型
pile_number = frame[6:13] # 桩编号
# 针对不同帧类型解析特定字段
if frame_type == 0x03: # 心跳包
gun_number = frame[13] # 枪号
# print(f"解析帧成功: 帧类型={hex(frame_type)}, 序列号={sequence_number}, 桩编号={pile_number.hex().upper()}, 枪号={gun_number}")
# print(f"[DEBUG] Parsed frame: type={hex(frame_type)}, SequenceNumber={sequence_number}, PileNumber={pile_number.hex().upper()}, GunNumber={gun_number}")
return frame_type, pile_number, sequence_number, gun_number, None
elif frame_type == 0x05: # 计费模型验证请求
model_code = frame[13:15] # 计费模型编号
# print(f"解析帧成功: 帧类型={hex(frame_type)}, 序列号={sequence_number}, 桩编号={pile_number.hex().upper()}, 模型编号={model_code.hex().upper()}")
# print(f"[DEBUG] Parsed frame: type={hex(frame_type)}, SequenceNumber={sequence_number}, PileNumber={pile_number.hex().upper()}, ModelCode={model_code.hex().upper()}")
return frame_type, pile_number, sequence_number, None, model_code
else:
# print(f"未处理的帧类型: {hex(frame_type)}")
# print(f"[DEBUG] Unrecognized frame type: {hex(frame_type)}")
return frame_type, pile_number, sequence_number, None, None
def build_login_response(sequence_number, pile_number):
"""
构建认证成功的回复报文 (0x02)
"""
frame = bytearray()
frame.append(0x68) # 起始标志
frame.append(0x0C) # 数据长度
frame.extend(sequence_number.to_bytes(2, byteorder="big")) # 序列号
frame.append(0x00) # 加密标志
frame.append(0x02) # 帧类型(认证应答)
frame.extend(pile_number) # 桩编号
frame.append(0x00) # 登录结果(成功)
# 计算CRC
crc = calculate_crc16(frame[2:]) # 从数据长度到数据域
frame.extend(crc) # CRC校验值,低字节在前,高字节在后
return frame
def build_billing_model_response(sequence_number, pile_number, model_code):
"""
构建计费模型验证的回复报文 (0x06)
"""
frame = bytearray()
frame.append(0x68) # 起始标志
frame.append(0x0E) # 数据长度
frame.extend(sequence_number.to_bytes(2, byteorder="big")) # 序列号
frame.append(0x00) # 加密标志
frame.append(0x06) # 帧类型(计费模型验证应答)
frame.extend(pile_number) # 桩编号
frame.extend(model_code) # 计费模型编号
frame.append(0x00) # 验证结果(0x00 表示成功)
# 计算CRC
crc = calculate_crc16(frame[2:]) # 从数据长度到数据域
frame.extend(crc) # CRC校验值,低字节在前,高字节在后
return frame
def build_heartbeat_response(sequence_number, pile_number, gun_number):
"""
构建心跳包回复报文 (0x04)
:param sequence_number: 序列号 (2字节整数)
:param pile_number: 桩编号 (7字节)
:param gun_number: 枪号 (1字节)
:return: 完整的回复报文(包含校验)
"""
frame = bytearray()
frame.append(0x68) # 起始标志
frame.append(0x0D) # 数据长度(固定为 0x0D)
frame.extend(sequence_number.to_bytes(2, byteorder="big")) # 序列号
frame.append(0x00) # 加密标志
frame.append(0x04) # 帧类型(心跳应答)
frame.extend(pile_number) # 桩编号
frame.append(gun_number) # 枪号
frame.append(0x00) # 固定为 0x00 的应答字段
# 计算CRC
crc = calculate_crc16(frame[2:]) # 从数据长度到应答字段
frame.extend(crc) # CRC校验值,低字节在前,高字节在后
# print(f"构建心跳回复帧: {frame.hex().upper()}")
# print(f"[DEBUG] Heartbeat frame: {frame.hex().upper()}")
return frame
def communicate_with_pile(port, baudrate):
"""
与充电桩进行串口交互
"""
try:
# 打开串口
with serial.Serial(port, baudrate, timeout=1) as ser:
# print(f"成功连接到串口: {port},波特率: {baudrate}")
print(f"[DEBUG] Connected to serial port: {port}, Baud rate: {baudrate}")
dev_login = False # Dev only, flag for login
dev_billing_mode_updated = False # Dev only
dev_charge_request_sent = False # Dev only, flag for charging request sent
while True:
# 等待接收数据
frame = receive_frame(ser)
if frame is None:
continue
# print(f"收到数据: {frame.hex().upper()}")
print(f"[DEBUG] Received frame: {frame.hex().upper()}")
# 解析数据帧
frame_type, pile_number, sequence_number, gun_number, extra_data = parse_frame(frame)
gun_number = 1 # hard code it
if frame_type is None:
# print("帧解析失败,丢弃帧")
print("[WARNING] Couldn't parse the frame, drop it")
continue
# 处理帧类型
if frame_type == 0x01: # 登录请求
# print("处理登录请求...")
# print("[Debug] Building login response frame")
response = build_login_response(sequence_number, pile_number)
# print(f"发送登录回复: {response.hex().upper()}")
# print(f"[DEBUG] Sending login response frame: {response.hex().upper()}")
ser.write(response)
dev_login = True
elif frame_type == 0x05: # 计费模型验证请求
# print("处理计费模型验证请求...")
# print("[DEBUG] Buiding billing model response frame")
model_code = frame[13:15] # 提取计费模型编号(假设固定位置)
response = build_billing_model_response(sequence_number, pile_number, model_code)
# print(f"发送计费模型验证回复: {response.hex().upper()}")
# print(f"[DEBUG] Sending billing model response frame: {response.hex().upper()}")
ser.write(response)
dev_billing_mode_updated = True
elif frame_type == 0x03: # 心跳包
# print("处理心跳包...")
# print("[DEBUG] Building heartbeat response frame")
response = build_heartbeat_response(sequence_number, pile_number, gun_number)
# print(f"发送心跳包回复: {response.hex().upper()}")
# print(f"[DEBUG] Sending heartbeat response frame: {response.hex().upper()}")
ser.write(response)
if dev_login and dev_billing_mode_updated:
if dev_charge_request_sent == False:
# 构建运营平台远程控制启机请求
transaction_number = 0x00000000000000000000000000000001 # Hard code for PoC
request_frame = build_remote_start_charging_request(sequence_number, pile_number, gun_number, transaction_number)
# print(f"发送运营平台远程控制启机请求: {request_frame.hex().upper()}")
print(f"[DEBUG] Sending remote start charging frame: {request_frame.hex().upper()}")
ser.write(request_frame)
dev_charge_request_sent = True
sequence_number = (sequence_number + 1) % 65536
else:
# 构建监测数据请求
request_frame = build_real_time_read_request(sequence_number, pile_number, gun_number)
# print(f"发送读取实时监测数据请求: {request_frame.hex().upper()}")
# print(f"[DEBUG] Sending real-time read request frame: {request_frame.hex().upper()}")
ser.write(request_frame)
sequence_number = (sequence_number + 1) % 65536
# # 构建监测数据请求
# request_frame = build_real_time_read_request(sequence_number, pile_number, gun_number)
# # print(f"发送读取实时监测数据请求: {request_frame.hex().upper()}")
# # print(f"[DEBUG] Sending real-time read request frame: {request_frame.hex().upper()}")
# ser.write(request_frame)
# sequence_number = (sequence_number + 1) % 65536
elif frame_type == 0x13: # 实时监测数据
parse_real_time_monitoring_data(frame)
# print(f"实时监测成功")
elif frame_type == 0x33: # 远程启动充电命令回复
parse_remote_start_charging_data(frame)
# print(f"远程启动充电命令回复成功")
elif frame_type == 0x3B: # 计费模型验证请求
parse_billing_info_data(frame)
else:
# print(f"未处理的帧类型: {hex(frame_type)}")
print(f"[ERROR] Unhandled frame type: {hex(frame_type)}")
except serial.SerialException as e:
# print(f"串口通信错误: {e}")
print(f"SerialException: {e}")
# except Exception as e:
# print(f"Exception: {e}")
# 主程序
if __name__ == "__main__":
# 设置串口参数
SERIAL_PORT = "/dev/ttyUSB0" # 根据实际情况修改
BAUDRATE = 115200
# 启动交互
communicate_with_pile(SERIAL_PORT, BAUDRATE)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment