Created
February 26, 2017 22:20
-
-
Save 0xa/7e45524e48f12962939d201157f4533b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
""" | |
Quake3/UrbanTerror OOB+Netchan protocol implementation. | |
Powered by boredom, di.fm/hardcore, and my secret love for Carmack. | |
""" | |
import q3huff | |
import socket | |
import time | |
from collections import namedtuple | |
# Debug output goes through this alias of print so that debug calls are easy
# to grep for.
debug = print
def xor_decode(data, challenge, seqn, last_acked=None):
    """Undo the XOR obfuscation applied to a netchan packet body.

    The key starts as the low byte of ``challenge ^ seqn`` and is then mixed,
    once per payload byte, with successive bytes of ``last_acked``.

    Args:
        data: obfuscated payload bytes (the packet without its first 8 bytes).
        challenge: integer challenge of the connection.
        seqn: integer sequence number of the packet.
        last_acked: bytes mixed into the key. A NUL byte, or the end of the
            buffer, restarts the walk from its first byte. ``None`` or an
            empty value is replaced by 1024 NUL bytes.

    Returns:
        The decoded payload as ``bytes``, same length as ``data``.
    """
    last_acked = last_acked or (b'\x00' * 1024)
    key = (challenge ^ seqn) & 0xff
    print('key=%02x' % key)
    # The payload starts at byte 8 of the packet; the shift below alternates
    # between 0 and 1 with the parity of that absolute byte position.
    offset = 8
    index = 0
    # bytearray avoids re-copying the whole output for every decoded byte.
    output = bytearray()
    for i, c in enumerate(data):
        # Treat the end of the buffer like a NUL terminator so a buffer
        # without a trailing NUL wraps around instead of raising IndexError.
        if index >= len(last_acked) or not last_acked[index]:
            index = 0
        mix = last_acked[index]
        if mix > 127 or mix == ord('%'):
            # High-bit bytes and '%' are replaced by '.' before mixing.
            mix = ord('.')
        key ^= mix << ((offset + i) & 1)
        index += 1
        output.append(c ^ key)
    return bytes(output)
def read_data(reader, len):
    """Read ``len`` bytes from ``reader`` one ``read_byte()`` call at a time.

    Returns the collected values as a ``bytes`` object.
    """
    values = []
    for _ in range(len):
        values.append(reader.read_byte())
    return bytes(values)
# Record types produced by the netchan parser.

# A whole decoded packet and its list of operations.
NcPacket = namedtuple('NcPacket', 'seq ack ops')

# Top-level operations; each carries the opcode it was parsed from.
NcNop = namedtuple('NcNop', 'opcode')
NcGamestate = namedtuple(
    'NcGamestate',
    'opcode last_server_cmd content client_id checksum_seed',
)
NcServerCmd = namedtuple('NcServerCmd', 'opcode id cmd')
NcSnapshot = namedtuple(
    'NcSnapshot',
    'opcode server_time last_frame flags area_mask player_state entities',
)

# Content carried inside a gamestate operation.
NcConfigString = namedtuple('NcConfigString', 'ctype id value')
NcBaseline = namedtuple('NcBaseline', 'ctype entity')

# Player state carried inside a snapshot operation.
NcPlayerState = namedtuple(
    'NcPlayerState',
    'fields player_stats pers_stats ammo powerups',
)
class NcEntity(namedtuple('NcEntity', 'id status fields')):
    """Entity record whose ``fields`` member is optional (None when omitted)."""

    def __new__(cls, id, status, fields=None):
        # Only exists to give `fields` a default value.
        return super().__new__(cls, id=id, status=status, fields=fields)
# Netchan opcodes, listed in numeric order.
NC_NOP = 0x01
NC_GAMESTATE = 0x02
NC_CONFIGSTRING = 0x03  # content type inside a gamestate
NC_BASELINE = 0x04      # content type inside a gamestate
NC_SERVER_CMD = 0x05
NC_DOWNLOAD = 0x06
NC_SNAPSHOT = 0x07
NC_EOF = 0x08           # ends the list of operations in a packet

# Bit widths referenced by the entity / player state field tables.
GENTITYNUM_BITS = 10
MAX_POWERUPS = 16
# Entity state field table: (name, bits) pairs.
# The position of an entry is its field index on the wire: the parser reads a
# field count and then walks this list from the start, so the order must not
# be changed. bits == 0 marks a float field (decoded by Netchan.parse_float);
# any other value is the bit count handed to the reader's read_bits().
ENTITY_FIELDS = [
    ("pos.trTime", 32),
    ("pos.trBase[0]", 0),
    ("pos.trBase[1]", 0),
    ("pos.trDelta[0]", 0),
    ("pos.trDelta[1]", 0),
    ("pos.trBase[2]", 0),
    ("apos.trBase[1]", 0),
    ("pos.trDelta[2]", 0),
    ("apos.trBase[0]", 0),
    ("event", 10),
    ("angles2[1]", 0),
    ("eType", 8),
    ("torsoAnim", 8),
    ("eventParm", 8),
    ("legsAnim", 8),
    ("groundEntityNum", GENTITYNUM_BITS),
    ("pos.trType", 8),
    ("eFlags", 19),
    ("otherEntityNum", GENTITYNUM_BITS),
    ("weapon", 8),
    ("clientNum", 8),
    ("angles[1]", 0),
    ("pos.trDuration", 32),
    ("apos.trType", 8),
    ("origin[0]", 0),
    ("origin[1]", 0),
    ("origin[2]", 0),
    ("solid", 24),
    ("powerups", MAX_POWERUPS),
    ("modelindex", 8),
    ("otherEntityNum2", GENTITYNUM_BITS),
    ("loopSound", 8),
    ("generic1", 8),
    ("origin2[2]", 0),
    ("origin2[0]", 0),
    ("origin2[1]", 0),
    ("modelindex2", 8),
    ("angles[0]", 0),
    ("time", 32),
    ("apos.trTime", 32),
    ("apos.trDuration", 32),
    ("apos.trBase[2]", 0),
    ("apos.trDelta[0]", 0),
    ("apos.trDelta[1]", 0),
    ("apos.trDelta[2]", 0),
    ("time2", 32),
    ("angles[2]", 0),
    ("angles2[0]", 0),
    ("angles2[2]", 0),
    ("constantLight", 32),
    ("frame", 16)
]
# Player state field table: (name, bits) pairs.
# As with the entity table, the position of an entry is its field index on
# the wire (Netchan.parse_player_state walks the list from the start), so the
# order must not be changed. bits == 0 marks a float field; any other value
# is passed unchanged to the reader's read_bits().
# NOTE(review): some widths are negative (-16, -8) - presumably this marks a
# signed value for the reader; confirm against q3huff.
PLAYER_STATE_FIELDS = [
    ("commandTime", 32),
    ("origin[0]", 0),
    ("origin[1]", 0),
    ("bobCycle", 8),
    ("velocity[0]", 0),
    ("velocity[1]", 0),
    ("viewangles[1]", 0),
    ("viewangles[0]", 0),
    ("weaponTime", -16),
    ("origin[2]", 0),
    ("velocity[2]", 0),
    ("legsTimer", 8),
    ("pm_time", -16),
    ("eventSequence", 16),
    ("torsoAnim", 8),
    ("movementDir", 4),
    ("events[0]", 8),
    ("legsAnim", 8),
    ("events[1]", 8),
    ("pm_flags", 16),
    ("groundEntityNum", GENTITYNUM_BITS),
    ("weaponstate", 4),
    ("eFlags", 16),
    ("externalEvent", 10),
    ("gravity", 16),
    ("speed", 16),
    ("delta_angles[1]", 16),
    ("externalEventParm", 8),
    ("viewheight", -8),
    ("damageEvent", 8),
    ("damageYaw", 8),
    ("damagePitch", 8),
    ("damageCount", 8),
    ("generic1", 8),
    ("pm_type", 8),
    ("delta_angles[0]", 16),
    ("delta_angles[2]", 16),
    ("torsoTimer", 12),
    ("eventParms[0]", 8),
    ("eventParms[1]", 8),
    ("clientNum", 8),
    ("weapon", 5),
    ("viewangles[2]", 0),
    ("grapplePoint[0]", 0),
    ("grapplePoint[1]", 0),
    ("grapplePoint[2]", 0),
    ("jumppad_ent", 10),
    ("loopSound", 16)
]
# Number of slots in each of the four player state value arrays.
MAX_STATS = 16
MAX_PERS = 16
MAX_WEAPONS = 16
MAX_POWERUPS = 16

# Known slot names; the lists are padded to full size further down.
STATS = [
    "HEALTH",
    "HOLDABLE_ITEM",
    "PERSISTANT_POWERUP",  # missionpack
    "WEAPONS",
    "ARMOR",
    "DEAD_YAW",
    "CLIENTS_READY",
    "MAX_HEALTH",
]
PERS = [
    "SCORE",
    "HITS",
    "RANK",
    "TEAM",
    "SPAWN_COUNT",
    "PLAYEREVENTS",
    "ATTACKER",
    "ATTACKEE_ARMOR",
    "KILLED",
    "IMPRESSIVE_COUNT",
    "EXCELLENT_COUNT",
    "DEFEND_COUNT",
    "ASSIST_COUNT",
    "GAUNTLET_FRAG_COUNT",
    "CAPTURES",
]

# Fill the unnamed slots with "unkXX" placeholders (XX = slot index in hex).
STATS += ["unk%02x" % slot for slot in range(len(STATS), MAX_STATS)]
PERS += ["unk%02x" % slot for slot in range(len(PERS), MAX_PERS)]

# Not sure of the actual names, so the slot number is used as the name.
WEAPONS = [str(slot) for slot in range(0, 16)]
POWERUPS = [str(slot) for slot in range(0, 16)]
def assert_eq(a, b):
    """Raise ``AssertionError('<a> != <b>')`` unless ``a == b``.

    The error is raised explicitly rather than through an ``assert``
    statement, so the check still runs when Python is started with ``-O``.
    """
    if not a == b:
        raise AssertionError('%r != %r' % (a, b))
def oob(command, payload=None):
    """Build an out-of-band packet.

    The packet is four 0xff bytes followed by the ASCII-encoded ``command``;
    ``payload`` is appended as-is when it is given and non-empty.
    """
    packet = b'\xff\xff\xff\xff' + command.encode('ascii')
    if not payload:
        return packet
    return packet + payload
def parse_oob(data):
    """Split an out-of-band packet into its command and arguments.

    The payload after the four 0xff marker bytes is split into lines and
    trailing empty lines are dropped. If the first line contains spaces it is
    split on them, so ``b'challengeResponse 123'`` gives
    ``('challengeResponse', ['123'])``.

    Args:
        data: raw packet bytes.

    Returns:
        ``(command, args)`` where ``args`` is a list of strings.

    Raises:
        ValueError: if the marker is missing or the packet has no content.
    """
    marker = b'\xff\xff\xff\xff'
    # Explicit check instead of `assert`, which is skipped under `python -O`.
    if data[:4] != marker:
        raise ValueError('not an out-of-band packet: %r' % data[:4])
    # Invalid UTF-8 is replaced rather than raised on, so one odd byte does
    # not make the whole response unreadable.
    lines = [
        raw.decode('utf-8', errors='replace')
        for raw in data[4:].split(b'\n')
    ]
    while lines and lines[-1] == '':
        lines.pop()
    if not lines:
        raise ValueError('empty out-of-band packet')
    if ' ' in lines[0]:
        lines = lines[0].split(' ') + lines[1:]
    return lines[0], lines[1:]
class Netchan:
    """Decoder for netchan packets received from the server.

    Each datagram is de-obfuscated with ``xor_decode`` and then read through
    a ``q3huff.Reader`` into an ``NcPacket`` holding the parsed operations.
    """

    def __init__(self, client, challenge):
        """Store the owning client and the connection challenge.

        Args:
            client: object that opened the channel (stored only).
            challenge: integer mixed into the XOR key of every packet.
        """
        self.client = client
        self.challenge = challenge
        # Handed to xor_decode as `last_acked`; nothing in this class
        # assigns it, so it stays None.
        self.prev_decoded = None

    def parse(self, data):
        """Decode one raw datagram into an ``NcPacket``.

        Args:
            data: raw packet bytes as received from the socket.

        Returns:
            NcPacket with the sequence number, the ack value and the list of
            operations read up to the NC_EOF opcode.

        Raises:
            Exception: on an opcode that has no parser.
            NotImplementedError: on a download operation.
        """
        seq = int.from_bytes(data[0:4], 'little')
        debug("seq=0x%08x" % seq)
        # The first 8 bytes are kept untouched; everything after them is
        # XOR-obfuscated and must be decoded before the reader sees it.
        head, body = data[0:8], data[8:]
        body = xor_decode(body, self.challenge, seq, self.prev_decoded)
        data = head + body
        # The reader starts right after the 4-byte sequence number.
        r = q3huff.Reader(data[4:])
        ack = r.read_long()
        debug("ack=0x%08x" % ack)
        parsers = {
            NC_NOP: self.parse_nop,
            NC_GAMESTATE: self.parse_gamestate,
            NC_SERVER_CMD: self.parse_server_cmd,
            NC_DOWNLOAD: self.parse_download,
            NC_SNAPSHOT: self.parse_snapshot,
        }
        ops = []
        # Read operations until the EOF opcode shows up.
        while True:
            opcode = r.read_byte()
            debug("opcode=0x%02x" % opcode)
            if opcode == NC_EOF:
                break
            if opcode not in parsers:
                raise Exception("Unexpected opcode: 0x%02x" % opcode)
            ops.append(parsers[opcode](r))
        return NcPacket(seq=seq, ack=ack, ops=ops)

    def parse_nop(self, r):
        """Return an ``NcNop``; nothing is read from ``r``."""
        return NcNop(opcode=NC_NOP)

    def parse_gamestate(self, r):
        """Read a gamestate operation and return it as ``NcGamestate``.

        Raises:
            Exception: if the content type is neither a config string nor a
                baseline.
        """
        last_server_cmd = r.read_long()
        # NOTE(review): exactly one content entry is read here - confirm
        # against the protocol whether several entries can follow each other.
        ctype = r.read_byte()
        if ctype == NC_CONFIGSTRING:
            # Keyword names must match the NcConfigString fields
            # (ctype, id, value); the short is read before the string.
            c = NcConfigString(
                ctype=NC_CONFIGSTRING,
                id=r.read_short(),
                value=r.read_string(),
            )
        elif ctype == NC_BASELINE:
            c = NcBaseline(
                ctype=NC_BASELINE,
                entity=self.parse_entity(r),
            )
        else:
            raise Exception("Unexpected gamestate content: 0x%02x" % ctype)
        return NcGamestate(
            opcode=NC_GAMESTATE,
            last_server_cmd=last_server_cmd,
            content=c,
            client_id=r.read_long(),
            checksum_seed=r.read_long(),
        )

    def parse_server_cmd(self, r):
        """Read a server command: a long id followed by a string."""
        return NcServerCmd(
            opcode=NC_SERVER_CMD,
            id=r.read_long(),
            cmd=r.read_string(),
        )

    def parse_download(self, r):
        """Download operations are not supported."""
        raise NotImplementedError()

    def parse_snapshot(self, r):
        """Read a snapshot operation and return it as ``NcSnapshot``.

        The header is a long server time, three single bytes (last frame,
        flags, area mask size) and the area mask itself; the player state and
        the entity list follow.
        """
        server_time = r.read_long()
        last_frame = r.read_byte()
        flags = r.read_byte()
        mask_size = r.read_byte()
        mask = read_data(r, mask_size)
        player_state = self.parse_player_state(r)
        entities = self.parse_entities(r)
        return NcSnapshot(
            opcode=NC_SNAPSHOT,
            server_time=server_time,
            last_frame=last_frame,
            flags=flags,
            area_mask=mask,
            player_state=player_state,
            entities=entities,
        )

    def parse_float(self, r):
        """Read a float field.

        A leading 1 bit means a full float follows; otherwise the value is a
        13-bit integer stored with a bias of 4096.
        """
        full_float = r.read_bits(1)
        if full_float:
            return r.read_float()
        else:
            return r.read_bits(13) - 4096

    def parse_entity(self, r, id=None):
        """Read one entity delta and return it as ``NcEntity``.

        Args:
            r: bit reader positioned at the entity.
            id: entity number if the caller already read it; when None it is
                read from ``r`` as a 10-bit value.

        Returns:
            NcEntity with status 'removed', 'unchanged' or 'changed'; only a
            changed entity carries a dict of the fields that were sent.
        """
        if id is None:
            id = r.read_bits(10)
        removed = r.read_bits(1)
        if removed:
            return NcEntity(id=id, status='removed')
        changed = r.read_bits(1)
        if not changed:
            return NcEntity(id=id, status='unchanged')
        fields = {}
        # The byte gives how many leading entries of ENTITY_FIELDS follow.
        n_fields = r.read_byte()
        for i in range(0, n_fields):
            fd = ENTITY_FIELDS[i]
            v = self.parse_entity_field(r, fd[1])
            if v is not None:
                fields[fd[0]] = v
        return NcEntity(id=id, status='changed', fields=fields)

    def parse_entities(self, r):
        """Read entity deltas until the 0x3ff end marker; return them as a list."""
        entities = []
        while True:
            n = r.read_bits(10)
            if n == 0x3ff:
                break
            e = self.parse_entity(r, n)
            entities.append(e)
        return entities

    def parse_entity_field(self, r, bits):
        """Read one entity field.

        Returns None when the field is flagged as unchanged, 0 when it is
        flagged as zero, otherwise the decoded value (a float field when
        ``bits`` is 0, else ``bits`` raw bits).
        """
        field_changed = r.read_bits(1)
        if not field_changed:
            return None
        not_zero = r.read_bits(1)
        if not not_zero:
            return 0
        if bits == 0:
            return self.parse_float(r)
        else:
            return r.read_bits(bits)

    def parse_field(self, r, bits):
        """Read one player state field.

        Same as ``parse_entity_field`` but without the extra "is zero" bit.
        Returns None when the field is flagged as unchanged.
        """
        field_changed = r.read_bits(1)
        if not field_changed:
            return None
        if bits == 0:
            return self.parse_float(r)
        else:
            return r.read_bits(bits)

    def parse_player_state(self, r):
        """Read a player state delta and return it as ``NcPlayerState``.

        The result holds the changed scalar fields plus four dicts (stats,
        persistent stats, ammo, powerups) containing only the slots that
        were sent.
        """
        fields = {}
        # The byte gives how many leading entries of PLAYER_STATE_FIELDS follow.
        n_fields = r.read_byte()
        debug(n_fields)
        for i in range(0, n_fields):
            fd = PLAYER_STATE_FIELDS[i]
            v = self.parse_field(r, fd[1])
            if v is not None:
                fields[fd[0]] = v

        def parse_values(fields, max_fields, bits):
            # One "changed" bit, then a bit mask of `max_fields` slots, then
            # one `bits`-wide value for every slot set in the mask.
            changed = r.read_bits(1)
            if not changed:
                return {}
            d = {}
            mask = r.read_bits(max_fields)
            for i in range(max_fields):
                if mask & (1 << i):
                    d[fields[i]] = r.read_bits(bits)
            return d

        player_stats = {}
        pers_stats = {}
        ammo = {}
        powerups = {}
        # A single bit says whether any of the four arrays follows at all.
        changed_state = r.read_bits(1)
        if changed_state:
            player_stats = parse_values(STATS, MAX_STATS, 16)
            pers_stats = parse_values(PERS, MAX_PERS, 16)
            ammo = parse_values(WEAPONS, MAX_WEAPONS, 16)
            powerups = parse_values(POWERUPS, MAX_POWERUPS, 32)
        return NcPlayerState(
            fields=fields,
            player_stats=player_stats,
            pers_stats=pers_stats,
            ammo=ammo,
            powerups=powerups,
        )
class Client:
    """UDP client that talks to a game server.

    It can query the server status, perform the connection handshake that
    yields a ``Netchan``, and print every packet received afterwards. The
    client can be used as a context manager to close its socket on exit.
    """

    def __init__(self, host, port):
        """Open a UDP socket to ``host:port`` with a one second timeout."""
        self.sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
        try:
            self.sock.connect((host, port))
        except OSError:
            # Do not leave the socket open when the address is unusable.
            self.sock.close()
            raise
        self.sock.settimeout(1)
        self.name = "AliceInBotland"
        self.guid = "8539EC37FDC9F41EF6808BA044DA830C"

    def close(self):
        """Close the underlying socket."""
        self.sock.close()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc_value, traceback):
        self.close()

    def status(self):
        """Send ``getstatus`` and print the returned cvars and player lines."""
        self.sock.send(oob('getstatus'))
        command, lines = parse_oob(self.sock.recv(8192))
        # The first line is "\key\value\key\value..."; the text before the
        # first backslash is dropped.
        cvars = lines[0].split('\\')[1:] if lines else []
        print("cvars:")
        # Pair keys with values; a trailing key without a value is skipped
        # instead of raising IndexError.
        for key, value in zip(cvars[0::2], cvars[1::2]):
            print(' - %s: %s' % (key, value))
        print("players:")
        for l in lines[1:]:
            print(' - ', l)

    def open_netchan(self):
        """Run the challenge/connect handshake and return the new ``Netchan``.

        The channel is also stored as ``self.nc``.

        Raises:
            AssertionError: if the server answers with an unexpected command
                or argument count.
        """
        self.sock.send(oob('getchallenge'))
        command, data = parse_oob(self.sock.recv(8192))
        assert_eq(command, 'challengeResponse')
        assert_eq(len(data), 1)
        challenge = int(data[0])
        cs_text = '\\challenge\\%s\\qport\\27960\\protocol\\68\\snaps\\20\\name\\%s\\rate\\10000\\sex\\male\\handicap\\100\\color2\\5\\color1\\4\\authc\\0\\cl_guid\\%s' % (challenge, self.name, self.guid)
        print('CS:', cs_text)
        # The connect payload is a space followed by the compressed,
        # double-quoted info string.
        cs = b' ' + q3huff.compress(('"' + cs_text + '"').encode('ascii'))
        self.sock.send(oob('connect', cs))
        command, data = parse_oob(self.sock.recv(8192))
        assert_eq(command, 'connectResponse')
        assert_eq(len(data), 0)
        self.nc = Netchan(self, challenge)
        return self.nc

    def listen_loop(self):
        """Receive, parse and print packets forever.

        Requires ``open_netchan`` to have been called first. A receive
        timeout is not an error; the loop simply tries again.
        """
        while True:
            # Only recv() can time out, so only it is guarded.
            try:
                raw = self.sock.recv(8192)
            except socket.timeout:
                pass
            else:
                seq, ack, ops = self.nc.parse(raw)
                print('Packet: #%08x: [%08x]' % (seq, ack))
                for op in ops:
                    print(" - Op: %s" % op.__class__.__name__)
                    for k, v in op._asdict().items():
                        print(" - %s: %s" % (k, repr(v)))
            time.sleep(0.1)
def test_nc():
    """Parse the hex dump in ./test_packet.hex and print the result."""
    # Netchan takes (client, challenge); no client is needed to parse a
    # packet read from a file.
    nc = Netchan(None, 0x9e12afed)
    with open('./test_packet.hex') as f:
        hex_text = f.read().strip()
    r = nc.parse(bytes.fromhex(hex_text))
    print(repr(r))
#test_nc() | |
if __name__ == "__main__":
    # Alternate server (previously assigned and immediately overwritten):
    # '144.76.158.173', 27960
    host, port = '188.165.192.156', 27962
    c = Client(host, port)
    c.status()
    c.open_netchan()
    c.listen_loop()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment