Last active
October 26, 2019 09:31
-
-
Save dzen/b0f5270deeb8b880b36e3fdca7bfa021 to your computer and use it in GitHub Desktop.
Decode without network function
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# The idea is simple: this code know how to decode some bytes, and how many bytes it requires. | |
# but this lib must not issue any network related code (read, write) | |
# to be used with any network framework | |
class Decoder: | |
buffer: typing.Optional[bytes] | |
def __init__(self, *args, **kwargs): | |
super().__init__(*args, **kwargs) | |
self.buffer = None | |
def __iter__(self): | |
return self.decode() | |
def push(self, raw_data: bytes): | |
self.buffer = raw_data | |
def decode(self): | |
"""Decode an AVL packet""" | |
header_fmt = '!IIBB' | |
yield struct.calcsize(header_fmt) | |
_zeroes, data_length, codec_id, record_count = struct.unpack(header_fmt, self.buffer) | |
codec = mcbeacon.model.AVLCodec(codec_id) | |
if codec != mcbeacon.model.AVLCodec.CODEC8: | |
raise mcbeacon.exceptions.UnsupportedCodec(f"Unsupported codec {codec}") | |
# data_length is calculated starting from Codec ID to Number of Data 2 | |
avl_data_length = data_length - 3 | |
yield avl_data_length | |
avl_data = self.buffer | |
footer_fmt = '!BI' | |
yield struct.calcsize(footer_fmt) | |
record_count_validation, crc16 = struct.unpack(footer_fmt, self.buffer) | |
if record_count_validation != record_count: | |
raise Exception("Wrong recount ({record_count} != {record_count_validation})") | |
self.packet = mcbeacon.model.AVLPacket( | |
length=data_length, codec=mcbeacon.model.AVLCodec(codec_id), | |
record_count=record_count, | |
avl_data=avl_data, crc=crc16 | |
) | |
self.packet.check_crc() | |
def test_network_decode(): | |
message = binascii.unhexlify( | |
"000000000000003608010000016B40D8EA30010000000000000000000000000000000" | |
"105021503010101425E0F01F10000601A014E0000000000000000010000C7CF" | |
) | |
buffer = io.BytesIO(message) | |
decoder = Decoder() | |
for toread in decoder: | |
decoder.push(buffer.read(toread)) | |
packet = decoder.packet | |
assert isinstance(packet, mcbeacon.model.AVLPacket) | |
assert packet.length == int("00000036", 16) | |
assert packet.codec is mcbeacon.model.AVLCodec.CODEC8 | |
assert packet.record_count == 1 | |
assert packet.crc == int("0000C7CF", 16) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment