Skip to content

Instantly share code, notes, and snippets.

@dzen
Last active October 26, 2019 09:31
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dzen/b0f5270deeb8b880b36e3fdca7bfa021 to your computer and use it in GitHub Desktop.
Save dzen/b0f5270deeb8b880b36e3fdca7bfa021 to your computer and use it in GitHub Desktop.
Decode without network function
# The idea is simple: this code knows how to decode some bytes, and how many bytes it requires next,
# but this library must not perform any network I/O itself (read, write),
# so that it can be used with any network framework.
class Decoder:
    """Incremental decoder for one AVL packet that performs no I/O itself.

    Iterating over the decoder yields, step by step, the number of bytes it
    needs next. The caller reads that many bytes from wherever it likes and
    hands them over with :meth:`push` before resuming the iteration. Once the
    iteration is exhausted, the decoded packet is available as ``packet``.
    """

    buffer: typing.Optional[bytes]

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # Most recent chunk handed over by push(); None until the first push.
        self.buffer = None
        # Filled in by decode() once a whole packet has been parsed; kept as
        # None until then so callers can test for completion without hitting
        # an AttributeError.
        self.packet = None

    def __iter__(self):
        """Return the generator produced by :meth:`decode`."""
        return self.decode()

    def push(self, raw_data: bytes):
        """Store the bytes requested by the previous step of the iteration."""
        self.buffer = raw_data

    def decode(self):
        """Decode an AVL packet.

        This is a generator: each yielded integer is the number of bytes the
        caller must supply through :meth:`push` before asking for the next
        value.

        Raises:
            mcbeacon.exceptions.UnsupportedCodec: the codec is not CODEC8.
            ValueError: the announced data length is too small to be valid.
            Exception: the two record counts of the packet disagree.
        """
        header_fmt = '!IIBB'
        yield struct.calcsize(header_fmt)
        _zeroes, data_length, codec_id, record_count = struct.unpack(header_fmt, self.buffer)

        codec = mcbeacon.model.AVLCodec(codec_id)
        if codec != mcbeacon.model.AVLCodec.CODEC8:
            raise mcbeacon.exceptions.UnsupportedCodec(f"Unsupported codec {codec}")

        # data_length is calculated starting from Codec ID to Number of Data 2.
        # Codec ID and the first record count were already consumed with the
        # header, and the second record count is consumed with the footer,
        # hence the 3 bytes removed here.
        avl_data_length = data_length - 3
        if avl_data_length < 0:
            # A negative size must never reach the caller: stream read()
            # implementations treat it as "read everything".
            raise ValueError(f"Invalid data length {data_length}")
        yield avl_data_length
        avl_data = self.buffer

        footer_fmt = '!BI'
        yield struct.calcsize(footer_fmt)
        record_count_validation, crc16 = struct.unpack(footer_fmt, self.buffer)
        if record_count_validation != record_count:
            raise Exception(f"Wrong recount ({record_count} != {record_count_validation})")

        self.packet = mcbeacon.model.AVLPacket(
            length=data_length,
            codec=codec,
            record_count=record_count,
            avl_data=avl_data,
            crc=crc16,
        )
        # NOTE(review): presumably raises on a CRC mismatch - confirm in AVLPacket.
        self.packet.check_crc()
def test_network_decode():
    """Feed a Decoder from an in-memory stream and check the resulting packet."""
    raw_packet = binascii.unhexlify(
        "000000000000003608010000016B40D8EA30010000000000000000000000000000000"
        "105021503010101425E0F01F10000601A014E0000000000000000010000C7CF"
    )
    stream = io.BytesIO(raw_packet)
    decoder = Decoder()

    # Each iteration tells us how many bytes the decoder wants next;
    # read exactly that many from the stream and hand them over.
    for wanted in decoder:
        chunk = stream.read(wanted)
        decoder.push(chunk)

    decoded = decoder.packet
    expected_length = int("00000036", 16)
    expected_crc = int("0000C7CF", 16)

    assert isinstance(decoded, mcbeacon.model.AVLPacket)
    assert decoded.length == expected_length
    assert decoded.codec is mcbeacon.model.AVLCodec.CODEC8
    assert decoded.record_count == 1
    assert decoded.crc == expected_crc
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment