Skip to content

Instantly share code, notes, and snippets.

@joodicator
Created May 17, 2018 01:41
Show Gist options
  • Save joodicator/20391386937de06ad3db9c436bba777f to your computer and use it in GitHub Desktop.
Save joodicator/20391386937de06ad3db9c436bba777f to your computer and use it in GitHub Desktop.
Analyse Minecraft Wireshark YAML TCP stream dumps using pyCraft
#!/usr/bin/env python
from __future__ import print_function
import sys
import os
import re
import io
import base64
import zlib
import yaml as y
from minecraft import SUPPORTED_PROTOCOL_VERSIONS
from minecraft.networking import types, packets, connection
from minecraft.networking.packets import clientbound, serverbound
# Peer identifiers. They key the per-peer byte streams and segment counters in
# parse_file() and the outer level of the tables built by get_packets(), and
# they are the first number captured from a ``peer<P>_<N>`` dump key.
CLIENT, SERVER = 0, 1
# Connection states. They key the second level of the tables built by
# get_packets(); parse_file() also assigns a handshake packet's ``next_state``
# directly to its state variable, so the values must line up with that field.
HANDSHAKE, STATUS, LOGIN, PLAY = 0, 1, 2, 3
# Matches a dump key of the form ``peer<P>_<N>`` and captures both numbers.
PEER_RE = re.compile(r'peer(\d+)_(\d+)$')
def parse_file(file):
    """Decode and print every Minecraft packet found in a YAML stream dump.

    The input must be a single YAML document whose top level is a mapping
    from keys of the form ``peer<P>_<N>`` to ``!!binary`` scalars.  ``P`` is
    the peer (``CLIENT`` or ``SERVER``) and ``N`` is that peer's segment
    index, which must count up from zero without gaps.  Each value is a chunk
    of that peer's byte stream.

    Chunks are appended to a per-peer buffer; every complete packet in the
    buffer is decoded and printed, prefixed with ``<--`` for data sent by the
    client and ``-->`` for data sent by the server.  Handshake, login-success
    and set-compression packets update the protocol version, connection state
    and compression flag used to decode the packets that follow.  After the
    last entry, any bytes that never formed a complete packet are reported.

    Args:
        file: A stream accepted by ``yaml.parse`` (for example an open file
            or ``sys.stdin``).

    Raises:
        AssertionError: If the document does not have the expected shape or
            a protocol state expectation is violated.
        Exception: Any error raised while decoding a packet is re-raised
            after a marker line naming the direction has been printed.
    """
    events = y.parse(file, y.SafeLoader)
    for etype in y.StreamStartEvent, y.DocumentStartEvent, y.MappingStartEvent:
        event = next(events)
        assert isinstance(event, etype), \
            'not isinstance(%r, %r)' % (event, etype)

    # Bytes received from each peer that have not yet formed a whole packet.
    streams = {CLIENT: io.BytesIO(), SERVER: io.BytesIO()}
    # Index expected for the next segment of each peer.
    counts = {CLIENT: 0, SERVER: 0}
    context = connection.ConnectionContext(
        protocol_version=max(SUPPORTED_PROTOCOL_VERSIONS))
    compression = False
    state = HANDSHAKE
    packet_map = get_packets(context)

    for event1 in events:
        if isinstance(event1, y.MappingEndEvent):
            # The top-level mapping is finished; only the document/stream
            # end events remain, and they carry no data.
            break

        assert isinstance(event1, y.ScalarEvent), \
            'not isinstance(%r, y.ScalarEvent)' % event1
        match = PEER_RE.match(event1.value)
        assert match is not None, 'unexpected key %r' % (event1.value,)
        peer, index = map(int, match.groups())
        assert counts[peer] == index, \
            'counts[%r] == %r != %r' % (peer, counts[peer], index)
        counts[peer] += 1

        event2 = next(events)
        assert isinstance(event2, y.ScalarEvent), \
            'not isinstance(%r, y.ScalarEvent)' % event2
        assert event2.tag == 'tag:yaml.org,2002:binary'

        # Append the new chunk, then rewind so that decoding restarts at the
        # first byte that has not yet been consumed by a complete packet.
        streams[peer].seek(0, io.SEEK_END)
        streams[peer].write(base64.b64decode(event2.value))
        streams[peer].seek(0)

        arrow = '<--' if peer == CLIENT else '-->'
        try:
            while True:
                # read_packet() yields None first if the buffered data does
                # not yet hold a whole packet, otherwise the decoded packet.
                packet = next(read_packet(
                    stream=streams[peer],
                    compression=compression,
                    packet_map=packet_map[peer][state],
                    context=context))
                if packet is None:
                    break

                # Drop the bytes of the packet that was just consumed.
                streams[peer] = io.BytesIO(streams[peer].read())

                if isinstance(packet, serverbound.handshake.HandShakePacket):
                    assert state == HANDSHAKE
                    # Packet ids depend on the protocol version, so the
                    # lookup tables are rebuilt for the negotiated version.
                    context.protocol_version = packet.protocol_version
                    packet_map = get_packets(context)
                    state = packet.next_state
                elif isinstance(packet, clientbound.login.LoginSuccessPacket):
                    assert state == LOGIN
                    state = PLAY
                elif packet.packet_name == 'set compression':
                    assert state in (LOGIN, PLAY)
                    # Only a negative threshold disables compression; a
                    # threshold of zero means every packet is compressed.
                    compression = packet.threshold >= 0

                print('%s %s' % (arrow, packet))
        except Exception:
            print(arrow, end=' ***Exception:\n')
            raise

    # getvalue() ignores the cursor, which partial reads may have advanced.
    unused = {p: s.getvalue() for (p, s) in streams.items()}
    if any(len(s) > 0 for s in unused.values()):
        print('Unused data at end of stream: %r' % (unused,))
def read_packet(stream, compression, packet_map, context):
    """Generator that decodes one packet from *stream*.

    Yields ``None`` each time read_packet_buffer() reports that the packet is
    not yet complete, then yields exactly one packet object:

    * an instance of the class registered for the packet id in *packet_map*,
      populated via its ``read`` method;
    * a bare ``packets.Packet`` carrying the id when the id is not in
      *packet_map*;
    * a bare ``packets.Packet`` without an id when decompression failed (the
      zlib error message is printed first).
    """
    try:
        for payload in read_packet_buffer(stream, compression):
            if payload is None:
                # Not enough data yet; let the caller decide what to do.
                yield None
                continue
            break
    except zlib.error as error:
        print('*** Exception: %s' % error)
        packet = packets.Packet(context)
    else:
        packet_id = next(read_varint(payload))
        assert packet_id is not None
        packet_class = packet_map.get(packet_id)
        if packet_class is None:
            packet = packets.Packet(context, id=packet_id)
        else:
            packet = packet_class(context)
            packet.read(payload)
    yield packet
def read_packet_buffer(stream, compression):
    """Generator that reads one length-prefixed packet body from *stream*.

    Yields ``None`` whenever *stream* runs out of data before the length
    prefix or the body is complete, and finally yields a
    ``packets.PacketBuffer`` positioned at the start of the packet data.

    When *compression* is true the body starts with a VarInt data length; a
    non-zero value means the remainder is zlib-compressed and is replaced by
    its decompressed form.  ``zlib.error`` propagates to the caller.
    """
    # Stage 1: the VarInt length prefix.
    for length in read_varint(stream):
        if length is None:
            yield None
            continue
        break

    # Stage 2: exactly ``length`` bytes of body.
    frame = packets.PacketBuffer()
    while True:
        missing = length - len(frame.get_writable())
        frame.send(stream.read(missing))
        if len(frame.get_writable()) >= length:
            break
        yield None
    assert len(frame.get_writable()) == length
    frame.reset_cursor()

    if not compression:
        yield frame
        return

    # Stage 3: strip the compression header and inflate if required.
    data_length = next(read_varint(frame))
    assert data_length is not None
    if data_length > 0:
        inflated = zlib.decompress(frame.read())
        assert len(inflated) == data_length
        frame.reset()
        frame.send(inflated)
        frame.reset_cursor()
    yield frame
def read_varint(stream):
    """Generator that reads one VarInt from *stream*, one byte at a time.

    Each byte contributes its low seven bits, least-significant group first;
    a clear high bit marks the final byte.  Whenever ``stream.read(1)``
    returns no data the generator yields ``None`` and retries the read when
    resumed.  Once the final byte has been read it yields the decoded integer
    and finishes.

    Raises:
        Exception: If five bytes are read and the last one still has its
            high bit set.
    """
    result = 0
    shift = 0
    while shift < 35:  # five groups of seven bits
        chunk = stream.read(1)
        while len(chunk) == 0:
            yield None
            chunk = stream.read(1)
        octet = chunk[0]
        result |= (octet & 0x7F) << shift
        if octet & 0x80 == 0:
            yield result
            return
        shift += 7
    raise Exception('VarInt overflow.')
def get_packets(context):
    """Build the packet lookup tables for the given connection *context*.

    Returns a nested dict ``{peer: {state: {packet_id: packet_class}}}``.
    Data sent by ``CLIENT`` is decoded with the ``serverbound`` packet
    modules and data sent by ``SERVER`` with the ``clientbound`` ones; the
    packet ids are obtained from each class via ``get_id(context)``.
    """
    table = {}
    for peer, bound in ((CLIENT, serverbound), (SERVER, clientbound)):
        per_state = {}
        for state, module in ((HANDSHAKE, bound.handshake),
                              (LOGIN, bound.login),
                              (STATUS, bound.status),
                              (PLAY, bound.play)):
            by_id = {}
            for packet_class in module.get_packets(context):
                by_id[packet_class.get_id(context)] = packet_class
            per_state[state] = by_id
        table[peer] = per_state
    return table
def main(*args):
    """Analyse each dump named in *args*, or standard input if there are none.

    Every named file is announced with a ``<path>:`` line before its packets
    are printed.
    """
    if len(args) == 0:
        parse_file(sys.stdin)
        return
    for path in args:
        print('%s:' % path)
        with open(path) as handle:
            parse_file(handle)


if __name__ == '__main__':
    cli_args = sys.argv[1:]
    main(*cli_args)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment