Skip to content

Instantly share code, notes, and snippets.

@0xa
Created February 26, 2017 22:20
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save 0xa/7e45524e48f12962939d201157f4533b to your computer and use it in GitHub Desktop.
Save 0xa/7e45524e48f12962939d201157f4533b to your computer and use it in GitHub Desktop.
#!/usr/bin/env python3
"""
Quake3/UrbanTerror OOB+Netchan protocol implementation.
Powered by boredom, di.fm/hardcore, and my secret love for Carmack.
"""
import q3huff
import socket
import time
from collections import namedtuple
# for grepping purposes
debug = print
def xor_decode(data, challenge, seqn, last_acked=None):
""" Magic """
last_acked = last_acked or (b'\x00' * 1024)
index = 0
output = bytes()
key = (challenge ^ seqn) & 0xff
print('key=%02x' % key)
offset = 8
for i, c in enumerate(data):
if not last_acked[index]:
index = 0
if last_acked[index] > 127 or last_acked[index] == ord('%'):
key = key ^ (ord('.') << ((offset + i) & 1))
else:
key = key ^ (last_acked[index] << ((offset + i) & 1))
index += 1
output += bytes([c ^ key])
return output
def read_data(reader, len):
return bytes([reader.read_byte() for _ in range(len)])
NcPacket = namedtuple('NcPacket', ['seq', 'ack', 'ops'])
NcNop = namedtuple('NcNop', ['opcode'])
NcGamestate = namedtuple('NcGamestate', ['opcode', 'last_server_cmd', 'content', 'client_id', 'checksum_seed'])
NcServerCmd = namedtuple('NcServerCmd', ['opcode', 'id', 'cmd'])
NcSnapshot = namedtuple('NcSnapshot', ['opcode', 'server_time', 'last_frame', 'flags', 'area_mask', 'player_state', 'entities'])
NcConfigString = namedtuple('NcConfigString', ['ctype', 'id', 'value'])
NcBaseline = namedtuple('NcBaseline', ['ctype', 'entity'])
NcPlayerState = namedtuple('NcPlayerState', ['fields', 'player_stats', 'pers_stats', 'ammo', 'powerups'])
class NcEntity(namedtuple('NcEntity', ['id', 'status', 'fields'])):
def __new__(cls, id, status, fields=None):
return super().__new__(cls, id, status, fields)
NC_NOP = 0x01
NC_EOF = 0x08
NC_GAMESTATE = 0x02
NC_SERVER_CMD = 0x05
NC_DOWNLOAD = 0x06
NC_SNAPSHOT = 0x07
NC_CONFIGSTRING = 0x03
NC_BASELINE = 0x04
MAX_POWERUPS = 16
GENTITYNUM_BITS = 10
ENTITY_FIELDS = [
("pos.trTime", 32),
("pos.trBase[0]", 0),
("pos.trBase[1]", 0),
("pos.trDelta[0]", 0),
("pos.trDelta[1]", 0),
("pos.trBase[2]", 0),
("apos.trBase[1]", 0),
("pos.trDelta[2]", 0),
("apos.trBase[0]", 0),
("event", 10),
("angles2[1]", 0),
("eType", 8),
("torsoAnim", 8),
("eventParm", 8),
("legsAnim", 8),
("groundEntityNum", GENTITYNUM_BITS),
("pos.trType", 8),
("eFlags", 19),
("otherEntityNum", GENTITYNUM_BITS),
("weapon", 8),
("clientNum", 8),
("angles[1]", 0),
("pos.trDuration", 32),
("apos.trType", 8),
("origin[0]", 0),
("origin[1]", 0),
("origin[2]", 0),
("solid", 24),
("powerups", MAX_POWERUPS),
("modelindex", 8),
("otherEntityNum2", GENTITYNUM_BITS),
("loopSound", 8),
("generic1", 8),
("origin2[2]", 0),
("origin2[0]", 0),
("origin2[1]", 0),
("modelindex2", 8),
("angles[0]", 0),
("time", 32),
("apos.trTime", 32),
("apos.trDuration", 32),
("apos.trBase[2]", 0),
("apos.trDelta[0]", 0),
("apos.trDelta[1]", 0),
("apos.trDelta[2]", 0),
("time2", 32),
("angles[2]", 0),
("angles2[0]", 0),
("angles2[2]", 0),
("constantLight", 32),
("frame", 16)
]
PLAYER_STATE_FIELDS = [
("commandTime", 32),
("origin[0]", 0),
("origin[1]", 0),
("bobCycle", 8),
("velocity[0]", 0),
("velocity[1]", 0),
("viewangles[1]", 0),
("viewangles[0]", 0),
("weaponTime", -16),
("origin[2]", 0),
("velocity[2]", 0),
("legsTimer", 8),
("pm_time", -16),
("eventSequence", 16),
("torsoAnim", 8),
("movementDir", 4),
("events[0]", 8),
("legsAnim", 8),
("events[1]", 8),
("pm_flags", 16),
("groundEntityNum", GENTITYNUM_BITS),
("weaponstate", 4),
("eFlags", 16),
("externalEvent", 10),
("gravity", 16),
("speed", 16),
("delta_angles[1]", 16),
("externalEventParm", 8),
("viewheight", -8),
("damageEvent", 8),
("damageYaw", 8),
("damagePitch", 8),
("damageCount", 8),
("generic1", 8),
("pm_type", 8),
("delta_angles[0]", 16),
("delta_angles[2]", 16),
("torsoTimer", 12),
("eventParms[0]", 8),
("eventParms[1]", 8),
("clientNum", 8),
("weapon", 5),
("viewangles[2]", 0),
("grapplePoint[0]", 0),
("grapplePoint[1]", 0),
("grapplePoint[2]", 0),
("jumppad_ent", 10),
("loopSound", 16)
]
MAX_STATS = 16
MAX_PERS = 16
MAX_WEAPONS = 16
MAX_POWERUPS = 16
STATS = [
"HEALTH",
"HOLDABLE_ITEM",
"PERSISTANT_POWERUP", # missionpack
"WEAPONS",
"ARMOR",
"DEAD_YAW",
"CLIENTS_READY",
"MAX_HEALTH",
]
PERS = [
"SCORE",
"HITS",
"RANK",
"TEAM",
"SPAWN_COUNT",
"PLAYEREVENTS",
"ATTACKER",
"ATTACKEE_ARMOR",
"KILLED",
"IMPRESSIVE_COUNT",
"EXCELLENT_COUNT",
"DEFEND_COUNT",
"ASSIST_COUNT",
"GAUNTLET_FRAG_COUNT",
"CAPTURES",
]
STATS = STATS + [("unk%02x"%i) for i in range(len(STATS), MAX_STATS)]
PERS = PERS + [("unk%02x"%i) for i in range(len(PERS), MAX_PERS)]
# Not sure of the actual names
WEAPONS = list(map(str, list(range(0, 16))))
POWERUPS = list(map(str, list(range(0, 16))))
def assert_eq(a, b):
msg = '%r != %r' % (a, b)
assert a == b, msg
def oob(command, payload=None):
data = b'\xff\xff\xff\xff' + command.encode('ascii')
if payload:
data += payload
return data
def parse_oob(data):
assert b'\xff\xff\xff\xff' == data[0:4]
data = data[4:]
lines = data.split(b'\n')
lines = [l.decode('utf-8') for l in lines]
while lines[-1] == '':
lines = lines[:-1]
if ' ' in lines[0]:
prepend = lines[0].split(' ')
lines = prepend + lines[1:]
return lines[0], lines[1:]
class Netchan:
def __init__(self, client, challenge):
self.client = client
self.challenge = challenge
self.prev_decoded = None
def parse(self, data):
seq = int.from_bytes(data[0:4], 'little')
debug("seq=0x%08x" % seq)
# rest is encrypted then compressed
# let's just decrypt, recreate a Reader, and read a long
# I spent two days trying to figure that shit out
a, b = data[0:8], data[8:]
b = xor_decode(b, self.challenge, seq, self.prev_decoded)
data = a + b
r = q3huff.Reader(data[4:])
ack = r.read_long()
debug("ack=0x%08x" % ack)
parsers = {
NC_NOP: self.parse_nop,
NC_GAMESTATE: self.parse_gamestate,
NC_SERVER_CMD: self.parse_server_cmd,
NC_DOWNLOAD: self.parse_download,
NC_SNAPSHOT: self.parse_snapshot,
}
ops = []
# The great infinite loop of segfaults
# # # explosive material # # #
while True:
opcode = r.read_byte()
debug("opcode=0x%02x" % opcode)
# EOF indicator
if opcode == NC_EOF:
break
if opcode not in parsers:
raise Exception("Unexpected opcode: 0x%02x" % opcode)
op = parsers[opcode](r)
ops.append(op)
packet = NcPacket(seq=seq, ack=ack, ops=ops)
return packet
def parse_nop(self, r):
return NcNop(opcode=NC_NOP)
def parse_gamestate(self, r):
last_server_cmd = r.read_long()
ctype = r.read_byte()
if ctype == NC_CONFIGSTRING:
c = NcConfigString(
ctype=NC_CONFIGSTRING,
config_id=r.read_short(),
config_value=r.read_string(),
)
elif ctype == NC_BASELINE:
c = NcBaseline(
ctype=NC_BASELINE,
entity=self.parse_entity(r),
)
else:
raise Exception("Unexpected gamestate content: 0x%02x" % ctype)
return NcGamestate(
opcode=NC_GAMESTATE,
last_server_cmd=last_server_cmd,
content=c,
client_id=r.read_long(),
checksum_seed=r.read_long(),
)
def parse_server_cmd(self, r):
return NcServerCmd(
opcode=NC_SERVER_CMD,
id=r.read_long(),
cmd=r.read_string(),
)
def parse_download(self, r):
raise NotImplementedError()
def parse_snapshot(self, r):
server_time = r.read_long()
last_frame = r.read_byte()
flags = r.read_byte()
mask_size = r.read_byte()
mask = read_data(r, mask_size)
player_state = self.parse_player_state(r)
entities = self.parse_entities(r)
return NcSnapshot(
opcode=NC_SNAPSHOT,
server_time=server_time,
last_frame=last_frame,
flags=flags,
area_mask=mask,
player_state=player_state,
entities=entities,
)
def parse_float(self, r):
full_float = r.read_bits(1)
if full_float:
return r.read_float()
else:
return r.read_bits(13) - 4096
def parse_entity(self, r, id=None):
if id is None:
id = r.read_bits(10)
removed = r.read_bits(1)
if removed:
return NcEntity(id=id, status='removed')
changed = r.read_bits(1)
if not changed:
return NcEntity(id=id, status='unchanged')
fields = {}
n_fields = r.read_byte()
for i in range(0, n_fields):
fd = ENTITY_FIELDS[i]
v = self.parse_entity_field(r, fd[1])
if v is not None:
fields[fd[0]] = v
return NcEntity(id=id, status='changed', fields=fields)
def parse_entities(self, r):
entities = []
while True:
n = r.read_bits(10)
if n == 0x3ff:
break
e = self.parse_entity(r, n)
entities.append(e)
return entities
def parse_entity_field(self, r, bits):
field_changed = r.read_bits(1)
if not field_changed:
return None
not_zero = r.read_bits(1)
if not not_zero:
return 0
if bits == 0:
return self.parse_float(r)
else:
return r.read_bits(bits)
def parse_field(self, r, bits):
field_changed = r.read_bits(1)
if not field_changed:
return None
if bits == 0:
return self.parse_float(r)
else:
return r.read_bits(bits)
def parse_player_state(self, r):
fields = {}
n_fields = r.read_byte()
print(n_fields)
for i in range(0, n_fields):
fd = PLAYER_STATE_FIELDS[i]
v = self.parse_field(r, fd[1])
if v is not None:
fields[fd[0]] = v
def parse_values(fields, max_fields, bits):
changed = r.read_bits(1)
if not changed:
return {}
d = {}
mask = r.read_bits(max_fields)
for i in range(max_fields):
if mask & (1 << i):
d[fields[i]] = r.read_bits(bits)
return d
player_stats = {}
pers_stats = {}
ammo = {}
powerups = {}
changed_state = r.read_bits(1)
if changed_state:
player_stats = parse_values(STATS, MAX_STATS, 16)
pers_stats = parse_values(PERS, MAX_PERS, 16)
ammo = parse_values(WEAPONS, MAX_WEAPONS, 16)
powerups = parse_values(POWERUPS, MAX_POWERUPS, 32)
return NcPlayerState(
fields=fields,
player_stats=player_stats,
pers_stats=pers_stats,
ammo=ammo,
powerups=powerups,
)
class Client:
def __init__(self, host, port):
self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
self.sock.connect((host, port))
self.sock.settimeout(1)
self.name = "AliceInBotland"
self.guid = "8539EC37FDC9F41EF6808BA044DA830C"
def status(self):
self.sock.send(oob('getstatus'))
command, lines = parse_oob(self.sock.recv(8192))
cvars = lines[0].split('\\')[1:]
print("cvars:")
for i in range(0, len(cvars), 2):
print(' - %s: %s' % (cvars[i], cvars[i + 1]))
print("players:")
for l in lines[1:]:
print(' - ', l)
def open_netchan(self):
self.sock.send(oob('getchallenge'))
command, data = parse_oob(self.sock.recv(8192))
assert_eq(command, 'challengeResponse')
assert_eq(len(data), 1)
challenge = int(data[0])
cs_text = '\\challenge\\%s\\qport\\27960\\protocol\\68\\snaps\\20\\name\\%s\\rate\\10000\\sex\\male\\handicap\\100\\color2\\5\\color1\\4\\authc\\0\\cl_guid\\%s' % (challenge, self.name, self.guid)
print('CS:', cs_text)
cs = b' ' + q3huff.compress(('"' + cs_text + '"').encode('ascii'))
self.sock.send(oob('connect', cs))
command, data = parse_oob(self.sock.recv(8192))
assert_eq(command, 'connectResponse')
assert_eq(len(data), 0)
self.nc = Netchan(self, challenge)
return self.nc
def listen_loop(self):
while True:
try:
seq, ack, ops = self.nc.parse(self.sock.recv(8192))
print('Packet: #%08x: [%08x]' % (seq, ack))
for op in ops:
print(" - Op: %s" % op.__class__.__name__)
for k, v in op._asdict().items():
print(" - %s: %s" % (k, repr(v)))
except socket.timeout:
pass
time.sleep(0.1)
def test_nc():
nc = Netchan(0x9e12afed)
r = nc.parse(bytes.fromhex(open('./test_packet.hex').read().strip()))
print(repr(r))
#test_nc()
if __name__ == "__main__":
host, port = '144.76.158.173', 27960
host, port = '188.165.192.156', 27962
c = Client(host, port)
c.status()
c.open_netchan()
c.listen_loop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment