Skip to content

Instantly share code, notes, and snippets.

@kppullin
Last active January 7, 2022 00:27
Show Gist options
  • Save kppullin/8d3d86992ddf2585f5900585a90b8065 to your computer and use it in GitHub Desktop.
Save kppullin/8d3d86992ddf2585f5900585a90b8065 to your computer and use it in GitHub Desktop.
Python Kafka __consumer_offsets binary parser/formatter
# Port of `kafka/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala -> OffsetsMessageFormatter`
# `key` and `value` are the raw byte arrays from the `__consumer_offsets` topic.
import io
import struct
def _unpack(reader, fmt):
    """Read one big-endian value described by the struct format *fmt*.

    Raises ``struct.error`` when *reader* has fewer bytes left than *fmt*
    requires (i.e. the record is truncated).
    """
    return struct.unpack(fmt, reader.read(struct.calcsize(fmt)))[0]


def _read_string(reader):
    """Read an int16-length-prefixed UTF-8 string; a negative length is None.

    A negative length must never reach ``reader.read()``: ``read(-1)`` would
    silently swallow the rest of the buffer and corrupt every later field.
    """
    length = _unpack(reader, '>h')
    if length < 0:
        return None
    return reader.read(length).decode('utf-8')


def _read_bytes(reader):
    """Read an int32-length-prefixed byte string; a negative length is None."""
    length = _unpack(reader, '>i')
    if length < 0:
        return None
    return reader.read(length)


def _parse_offset_commit_value(reader):
    """Decode an offset-commit value (schema versions 0 through 3).

    Raises:
        ValueError: if the value schema version is not in 0..3.
    """
    value_version = _unpack(reader, '>h')
    if not 0 <= value_version <= 3:
        raise ValueError(f"Unknown version: {value_version}")

    offset = _unpack(reader, '>q')
    # The leader epoch only exists from version 3 on; older records get 0.
    leader_epoch = _unpack(reader, '>i') if value_version >= 3 else 0
    metadata = _read_string(reader)
    commit_timestamp = _unpack(reader, '>q')
    # Only version 1 carries an explicit expiry; -1 marks "not present".
    expire_timestamp = _unpack(reader, '>q') if value_version == 1 else -1

    return {
        "value_version": value_version,
        "offset": offset,
        "leaderEpoch": leader_epoch,
        "metadata": metadata,
        "commitTimestamp": commit_timestamp,
        "expireTimestamp": expire_timestamp,
    }


def _parse_group_member(reader, value_version):
    """Decode one member entry of a group-metadata value."""
    member_id = _read_string(reader)
    # Static membership (group instance id) was added in version 3.
    group_instance_id = _read_string(reader) if value_version >= 3 else None
    client_id = _read_string(reader)
    client_host = _read_string(reader)
    rebalance_timeout = _unpack(reader, '>i') if value_version >= 1 else None
    session_timeout = _unpack(reader, '>i')
    if rebalance_timeout is None:
        # Version 0 has no separate rebalance timeout field, so fall back to
        # the session timeout (believed to match the Scala original).
        rebalance_timeout = session_timeout
    subscription = _read_bytes(reader)
    assignment = _read_bytes(reader)

    return {
        "member_id": member_id,
        "group_instance_id": group_instance_id,
        "client_id": client_id,
        "client_host": client_host,
        "rebalance_timeout": rebalance_timeout,
        "session_timeout": session_timeout,
        "subscription": subscription,  # TODO: public static Subscription deserializeSubscription(final ByteBuffer buffer)
        "assignment": assignment,  # TODO: public static Assignment deserializeAssignment(final ByteBuffer buffer)
    }


def _parse_group_metadata_value(reader):
    """Decode a group-metadata value (schema versions 0 through 3).

    Raises:
        ValueError: if the value schema version is not in 0..3; the field
            layout below is only valid for those versions.
    """
    value_version = _unpack(reader, '>h')
    if not 0 <= value_version <= 3:
        raise ValueError(f"Unknown version: {value_version}")

    protocol_type = _read_string(reader)
    generation = _unpack(reader, '>i')
    protocol = _read_string(reader)
    leader = _read_string(reader)
    # The state timestamp was added in version 2; None when absent.
    current_state_timestamp = (
        _unpack(reader, '>q') if value_version >= 2 else None
    )
    member_count = _unpack(reader, '>i')
    members = [
        _parse_group_member(reader, value_version)
        for _ in range(member_count)
    ]

    return {
        "value_version": value_version,
        "protocol_type": protocol_type,
        "generation": generation,
        "protocol": protocol,
        "leader": leader,
        "current_state_timestamp": current_state_timestamp,
        "members": members,
    }


def parse_consumer_offsets_message(key, value):
    """Decode one record of the ``__consumer_offsets`` topic.

    Typical use inside a consumer loop::

        for msg in consumer:
            record = parse_consumer_offsets_message(msg.key, msg.value)
            if record is None:
                continue  # tombstone

    Args:
        key: raw key bytes of the record.
        value: raw value bytes of the record, or None for a tombstone.

    Returns:
        None for a tombstone. Otherwise a dict holding the decoded key fields
        (``key_version``, ``group`` and, for offset commits, ``topic`` and
        ``partition``) merged with the decoded value fields. Key versions 0
        and 1 are offset commits; key version 2 is group metadata.

    Raises:
        ValueError: on an unknown key or value schema version.
        struct.error: if the record is truncated.
    """
    key_reader = io.BytesIO(key)
    key_version = _unpack(key_reader, '>h')

    if 0 <= key_version <= 1:
        record = {
            "key_version": key_version,
            "group": _read_string(key_reader),
            "topic": _read_string(key_reader),
            "partition": _unpack(key_reader, '>i'),
        }
        parse_value = _parse_offset_commit_value
    elif key_version == 2:
        record = {
            "key_version": key_version,
            "group": _read_string(key_reader),
        }
        parse_value = _parse_group_metadata_value
    else:
        raise ValueError(f"Unknown version: {key_version}")

    if value is None:
        # tombstone
        return None

    record.update(parse_value(io.BytesIO(value)))
    return record
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment