/decode_protobuf_binary.py Secret
Created
March 30, 2025 06:37
解码Protocol Buffer
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def proto_gen(proto: str):
    """Write *proto* to ``src/person.proto`` and compile it with ``protoc``.

    The ``src`` directory is created if missing, and the generated Python
    code is emitted into the current working directory (``--python_out=.``).

    Args:
        proto: Full text of the ``.proto`` definition to compile.

    Raises:
        FileNotFoundError: If the ``protoc`` executable cannot be found.
        subprocess.CalledProcessError: If ``protoc`` exits with a non-zero
            status (previously such a failure was silently ignored).
    """
    import os
    import subprocess

    proto_path = "src"
    # exist_ok avoids the check-then-create race of os.path.exists + os.mkdir.
    os.makedirs(proto_path, exist_ok=True)
    proto_file_path = os.path.join(proto_path, "person.proto")
    # Write UTF-8 explicitly instead of relying on the locale default.
    with open(proto_file_path, "w", encoding="utf-8") as f:
        f.write(proto)
    # check=True surfaces compiler errors here rather than as a confusing
    # import failure of the generated module later on.
    subprocess.run(
        ["protoc", f"--proto_path={proto_path}", "--python_out=.", proto_file_path],
        check=True,
    )
def generate_proto_bytes(pid: int, pname: str, pheight: float) -> bytes:
    """Build a ``Person`` message from the given values and serialize it.

    ``person_pb2`` is imported inside the function, so the module only has
    to exist by the time the function is called.

    Args:
        pid: Value for the message's ``id`` field.
        pname: Value for the message's ``name`` field.
        pheight: Value for the message's ``height`` field.

    Returns:
        The serialized bytes of the populated message.
    """
    import person_pb2

    message = person_pb2.Person(id=pid, name=pname, height=pheight)
    return message.SerializeToString()
def read_varint(buf: bytes, pos: int):
    """Decode one base-128 varint from *buf* starting at index *pos*.

    Every byte contributes its low 7 bits, least-significant group first;
    a set high bit means that another byte follows.

    Args:
        buf: Buffer holding the encoded data.
        pos: Index of the first byte of the varint.

    Returns:
        A ``(value, next_pos)`` tuple, where ``next_pos`` is the index of
        the first byte after the varint.

    Raises:
        ValueError: If *buf* ends before the varint is terminated.
    """
    value = 0
    shift = 0
    while True:
        if pos >= len(buf):
            raise ValueError("truncated varint")
        byte = buf[pos]
        pos += 1
        value |= (byte & 0x7F) << shift
        if not (byte & 0x80):
            return value, pos
        shift += 7


if __name__ == "__main__":
    import re
    import struct

    proto_str = """syntax = "proto3";
message Person {
string name = 1;
int32 id = 2;
float height = 3;
}
"""
    proto_gen(proto_str)
    _id, name, height = 300, "Phuong Le", 1.75
    person_bytes: bytes = generate_proto_bytes(_id, name, height)
    print(f"Person object bytes: \n{person_bytes}\n")

    # Map field number -> (field name, field type) from the
    # "<type> <name> = <number>;" lines of the proto definition.
    field_dict = {
        int(field_number): (field_name, field_type)
        for field_type, field_name, field_number in re.findall(
            r"(\w+)\s+(\w+)\s*=\s*(\d+)\s*;", proto_str
        )
    }

    print("Describe each byte of the person object:")
    for i, b in enumerate(person_bytes):
        # raw byte, then its binary form, then its decimal form
        print(person_bytes[i : i + 1], end="\t")
        print(format(b, "08b"), end="\t")
        print(b)
    print()

    pos = 0
    while pos < len(person_bytes):
        # The tag is itself a varint: the low 3 bits are the wire type and
        # the remaining bits are the field number, so field numbers >= 16
        # occupy more than one byte.
        tag, value_start = read_varint(person_bytes, pos)
        field_number = tag >> 3
        wire_type = tag & 0b111
        print(f"field_number: {field_number}, wire_type: {wire_type}")

        if wire_type == 0:
            # varint
            raw_value, pos = read_varint(person_bytes, value_start)
            byte_value = person_bytes[value_start:pos]
            print(f"field byte value: {byte_value}")
            if field_dict[field_number][1] == "int32":
                # Negative int32 values are sign-extended on the wire; keep
                # the low 32 bits and reinterpret them as two's complement.
                field_value = raw_value & 0xFFFFFFFF
                if field_value >= 0x80000000:
                    field_value -= 1 << 32
            else:
                raise NotImplementedError("only support int32 type for varint")
            print(f"field value: {field_dict[field_number][0]} = {field_value}")
        elif wire_type == 2:
            # length-delimited: a varint length prefix followed by the payload
            length, data_start = read_varint(person_bytes, value_start)
            pos = data_start + length
            if pos > len(person_bytes):
                raise ValueError("truncated length-delimited field")
            byte_value = person_bytes[data_start:pos]
            print(f"field byte value: {byte_value}")
            if field_dict[field_number][1] == "string":
                field_value = byte_value.decode("utf-8")
            else:
                raise NotImplementedError(
                    "only support string type for length-delimited"
                )
            print(f"field value: {field_dict[field_number][0]} = {field_value}")
        elif wire_type == 5:
            # fixed32
            pos = value_start + 4
            byte_value = person_bytes[value_start:pos]
            print(f"field byte value: {byte_value}")
            if field_dict[field_number][1] == "float":
                # Fixed-width values are little-endian on the wire, so do
                # not rely on the host's native byte order.
                field_value = struct.unpack("<f", byte_value)[0]
            else:
                raise NotImplementedError("only support float type for fixed32")
            print(f"field value: {field_dict[field_number][0]} = {field_value}")
        elif wire_type in [1, 3, 4]:
            # fixed64, start group, end group
            raise NotImplementedError(f"wire type {wire_type} is not implemented")
        else:
            raise ValueError(f"Unknown wire type: {wire_type}")
        # blank line between decoded fields
        print()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
这里假定 `proto_gen` 函数根据 .proto 定义文件生成代码,由生产端和消费端共享(通常由 CI 流程生成并发布为一个 common 库);`generate_proto_bytes` 函数模拟生产端写入数据并将其序列化为二进制;`__main__` 部分则模拟消费端拿到二进制数据后的反序列化流程。本 gist 是参考 https://victoriametrics.com/blog/go-protobuf/index.html 的 Python 实现。
依赖:
程序输出: