Skip to content

Instantly share code, notes, and snippets.

@reata
Created March 30, 2025 06:37
解码Protocol Buffer
def proto_gen(proto: str):
    """
    Write *proto* to ``src/person.proto`` and compile it to Python with ``protoc``.

    The ``src`` directory is created if it does not exist yet. ``protoc`` is
    invoked with ``--python_out=.``, so the generated module lands in the
    current working directory.

    Args:
        proto: Full text of the .proto definition to compile.

    Raises:
        FileNotFoundError: If the ``protoc`` executable cannot be found.
        subprocess.CalledProcessError: If ``protoc`` exits with a non-zero
            status (for example because the definition is invalid).
    """
    import os
    import subprocess

    proto_path = "src"
    # exist_ok avoids the check-then-create race of os.path.exists + os.mkdir.
    os.makedirs(proto_path, exist_ok=True)
    proto_file_path = os.path.join(proto_path, "person.proto")
    # Write UTF-8 explicitly instead of relying on the locale's default encoding.
    with open(proto_file_path, "w", encoding="utf-8") as f:
        f.write(proto)
    # check=True: fail here, loudly, rather than letting a later import of the
    # generated module fail with a much less helpful error.
    subprocess.run(
        ["protoc", f"--proto_path={proto_path}", "--python_out=.", proto_file_path],
        check=True,
    )
def generate_proto_bytes(pid: int, pname: str, pheight: float) -> bytes:
    """
    Serialize a ``Person`` message built from the given field values.

    The ``person_pb2`` module is imported lazily inside the function, so it
    only has to exist by the time this function is called.

    Args:
        pid: Value for the message's ``id`` field.
        pname: Value for the message's ``name`` field.
        pheight: Value for the message's ``height`` field.

    Returns:
        The message in serialized (binary) form.
    """
    from person_pb2 import Person

    # Populate all three fields through the constructor and serialize in one go.
    return Person(id=pid, name=pname, height=pheight).SerializeToString()
def read_varint(buf: bytes, pos: int) -> tuple:
    """
    Decode one base-128 varint from *buf* starting at offset *pos*.

    Each byte contributes its lower 7 bits, least-significant group first;
    the most significant bit of a byte is a continuation flag.

    Args:
        buf: The serialized message bytes.
        pos: Offset of the first byte of the varint.

    Returns:
        A ``(value, next_pos)`` tuple: the decoded unsigned integer and the
        offset of the first byte after the varint.

    Raises:
        ValueError: If the buffer ends in the middle of the varint, or the
            varint is longer than 10 bytes (the maximum for a 64-bit value).
    """
    start = pos
    result = 0
    shift = 0
    while True:
        if pos >= len(buf):
            raise ValueError(f"truncated varint starting at offset {start}")
        byte = buf[pos]
        pos += 1
        result |= (byte & 0b01111111) << shift
        if byte >> 7 == 0:
            return result, pos
        shift += 7
        if shift >= 70:
            raise ValueError(
                f"varint starting at offset {start} is longer than 10 bytes"
            )


def varint_to_int32(raw: int) -> int:
    """
    Interpret a decoded varint as a signed 32-bit integer.

    Negative int32 values are sign-extended to 64 bits on the wire, so the
    low 32 bits are kept and read as two's complement.
    """
    raw &= 0xFFFFFFFF
    return raw - (1 << 32) if raw & 0x80000000 else raw


if __name__ == "__main__":
    import re
    import struct

    proto_str = """syntax = "proto3";
message Person {
string name = 1;
int32 id = 2;
float height = 3;
}
"""
    proto_gen(proto_str)
    _id, name, height = 300, "Phuong Le", 1.75
    person_bytes: bytes = generate_proto_bytes(_id, name, height)
    print(f"Person object bytes: \n{person_bytes}\n")

    # Map field number -> (field name, field type) by scanning the .proto text
    # for lines of the form "<type> <name> = <number>;".
    field_dict = {
        int(field_number): (field_name, field_type)
        for field_type, field_name, field_number in re.findall(
            r"(\S*) (\S*) = (.*);\n", proto_str
        )
    }

    print("Describe each byte of the person object:")
    for i, b in enumerate(person_bytes):
        # raw byte, 8-bit binary representation, decimal value
        print(person_bytes[i : i + 1], format(b, "08b"), b, sep="\t")
    print()

    pos = 0
    while pos < len(person_bytes):
        # The tag is itself a varint: (field_number << 3) | wire_type.
        # Reading it as a varint also handles field numbers above 15, which
        # do not fit in a single tag byte.
        tag, value_start = read_varint(person_bytes, pos)
        field_number = tag >> 3
        wire_type = tag & 0b00000111
        print(f"field_number: {field_number}, wire_type: {wire_type}")
        if wire_type == 0:
            # varint
            raw_value, pos = read_varint(person_bytes, value_start)
            byte_value = person_bytes[value_start:pos]
            print(f"field byte value: {byte_value}")
            if field_dict[field_number][1] == "int32":
                field_value = varint_to_int32(raw_value)
            else:
                raise NotImplementedError("only support int32 type for varint")
        elif wire_type == 2:
            # length-delimited: a varint length prefix followed by the payload
            length, payload_start = read_varint(person_bytes, value_start)
            pos = payload_start + length
            byte_value = person_bytes[payload_start:pos]
            if len(byte_value) != length:
                raise ValueError(
                    f"truncated length-delimited field {field_number}: "
                    f"expected {length} bytes, got {len(byte_value)}"
                )
            print(f"field byte value: {byte_value}")
            if field_dict[field_number][1] == "string":
                field_value = byte_value.decode("utf-8")
            else:
                raise NotImplementedError(
                    "only support string type for length-delimited"
                )
        elif wire_type == 5:
            # fixed32: always exactly 4 bytes
            pos = value_start + 4
            byte_value = person_bytes[value_start:pos]
            if len(byte_value) != 4:
                raise ValueError(f"truncated fixed32 field {field_number}")
            print(f"field byte value: {byte_value}")
            if field_dict[field_number][1] == "float":
                # "<f": the wire format is little-endian regardless of the
                # byte order of the machine running this script.
                field_value = struct.unpack("<f", byte_value)[0]
            else:
                raise NotImplementedError("only support float type for fixed32")
        elif wire_type in (1, 3, 4):
            # fixed64, start group, end group
            raise NotImplementedError(f"wire type {wire_type} is not implemented")
        else:
            raise ValueError(f"Unknown wire type: {wire_type}")
        print(f"field value: {field_dict[field_number][0]} = {field_value}")
        print()
@reata
Copy link
Author

reata commented Mar 30, 2025

这里假定proto_gen函数负责根据.proto定义文件生成代码,生成的代码由生产端和消费端共享,通常由CI流程完成并发布为一个common库;generate_proto_bytes函数模拟生产端写入数据并将其序列化为binary;而main部分则模拟消费端拿到binary后进行反序列化的流程。

本gist是参考https://victoriametrics.com/blog/go-protobuf/index.html 的Python实现。

依赖:

apt install -y protobuf-compiler
pip install protobuf

程序输出:

Person object bytes: 
b'\n\tPhuong Le\x10\xac\x02\x1d\x00\x00\xe0?'

Describe each byte of the person object:
b'\n'	00001010	10
b'\t'	00001001	9
b'P'	01010000	80
b'h'	01101000	104
b'u'	01110101	117
b'o'	01101111	111
b'n'	01101110	110
b'g'	01100111	103
b' '	00100000	32
b'L'	01001100	76
b'e'	01100101	101
b'\x10'	00010000	16
b'\xac'	10101100	172
b'\x02'	00000010	2
b'\x1d'	00011101	29
b'\x00'	00000000	0
b'\x00'	00000000	0
b'\xe0'	11100000	224
b'?'	00111111	63

field_number: 1, wire_type: 2
field byte value: b'Phuong Le'
field value: name = Phuong Le

field_number: 2, wire_type: 0
field byte value: b'\xac\x02'
field value: id = 300

field_number: 3, wire_type: 5
field byte value: b'\x00\x00\xe0?'
field value: height = 1.75

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment