Last active
May 24, 2017 00:30
-
-
Save gyunderscorebe/16adacbb993d3e87c657d4c7d52fd1fe to your computer and use it in GitHub Desktop.
AssaultCube server query
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
from __future__ import print_function | |
import argparse | |
import re | |
import socket | |
class InvalidData(Exception): | |
pass | |
class ReadBuffer: | |
def __init__(self, data): | |
self._data = data | |
self._position = 0 | |
def has_more(self): | |
return self._position < len(self._data) | |
def get_uchar(self): | |
if self.has_more(): | |
uchar = self._data[self._position] | |
try: | |
uchar = ord(uchar) | |
except TypeError: | |
pass | |
self._position += 1 | |
return uchar | |
else: | |
raise InvalidData("Message is too short") | |
def get_int(self): | |
uchar = self.get_uchar() | |
if uchar == 0x80: | |
ushort = self.get_uchar() | self.get_uchar() << 8 | |
return ushort if ushort < 0x8000 else ushort - 0x10000 | |
elif uchar == 0x81: | |
uint = self.get_uchar() | self.get_uchar() << 8 | self.get_uchar() << 16 | self.get_uchar() << 24 | |
return uint if uint < 0x80000000 else uint - 0x100000000 | |
else: | |
return uchar if uchar < 0x80 else uchar - 0x100 | |
def get_string(self): | |
start_position = self._position | |
while self.has_more(): | |
if self.get_uchar() == 0: | |
break | |
return self._data[start_position:self._position - 1].decode("ascii", "ignore") | |
mode_names = [ | |
"DEMO", "TDM", "coop", "DM", "SURV", "TSURV", "CTF", "PF", "BTDM", "BDM", "LSS", | |
"OSOK", "TOSOK", "BOSOK", "HTF", "TKTF", "KTF", "TPF", "TLSS", "BPF", "BLSS", "BTSURV", "BTOSOK" | |
] | |
mastermode_names = [ | |
"public", "private", "match" | |
] | |
gun_names = [ | |
"knife", "pistol", "carbine", "shotgun", "smg", "sniper rifle", "assault rifle", "combat pistol", "grenade", "akimbo" | |
] | |
state_names = [ | |
"alive", "dead", "spawning", "lagged", "editing", "spectating" | |
] | |
def filter_text(s): | |
return re.sub("\f.", "", s) | |
def main(): | |
parser = argparse.ArgumentParser(description="A tool for fetching match status from a server") | |
parser.add_argument("host", help="Server host") | |
parser.add_argument("port", help="Server port", type=int, default=28763) | |
args = parser.parse_args() | |
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) | |
address = (args.host, args.port + 1) | |
try: | |
print("Querying server {}:{}".format(args.host, args.port)) | |
print() | |
sock.sendto(b"\x01\x02\x65\x6E", address) | |
data, server = sock.recvfrom(4096) | |
buf = ReadBuffer(data[4:]) | |
buf.get_int() # Protocol, should be 1201 | |
mode = buf.get_int() | |
buf.get_int() # Current number of players, will get it from extended info later | |
minutes_remaining = buf.get_int() | |
map_name = buf.get_string() | |
server_description = filter_text(buf.get_string()) | |
max_players = buf.get_int() | |
flags = buf.get_int() | |
mastermode = flags >> 6 | |
has_password = flags & 1 == 1 | |
buf.get_int() # Query type, should be 2 | |
info_language = buf.get_string() | |
server_info = "" | |
if info_language != "": | |
lines = [] | |
while True: | |
line = buf.get_string() | |
if line == "": | |
break | |
else: | |
lines.append(filter_text(line)) | |
server_info = "\n".join(lines) | |
sock.sendto(b"\x00\x01\xFF", address) | |
data, server = sock.recvfrom(4096) | |
buf = ReadBuffer(data[3:]) | |
buf.get_int() # Ack, should be -1 | |
buf.get_int() # Version, should be 104 | |
buf.get_int() # Error flag, should be 0 | |
buf.get_int() # Response type, should be -10 | |
players = 0 | |
while buf.has_more(): | |
buf.get_int() # Client number | |
players += 1 | |
print("Server description: {}".format(server_description)) | |
print("Extended server information:") | |
print("\n" + server_info if server_info != "" else "(none provided)") | |
print() | |
print("Players: {}/{}".format(players, max_players)) | |
print("Map: {}".format(map_name if players != 0 else "(unavailable)")) | |
print("Mode: {}".format(mode_names[mode + 1] if players != 0 else "(unavailable)")) | |
print("Minutes remaining: {}".format(minutes_remaining if players != 0 else "(unavailable)")) | |
print("Mastermode: {}".format(mastermode_names[mastermode])) | |
print("Password protected: {}".format("yes" if has_password else "no")) | |
for _ in range(players): | |
data, server = sock.recvfrom(4096) | |
buf = ReadBuffer(data[6:]) | |
while buf.has_more(): | |
buf.get_int() # Response type, should be -11 | |
cn = buf.get_int() | |
ping = buf.get_int() | |
name = buf.get_string() | |
team = buf.get_string() | |
frags = buf.get_int() | |
flags = buf.get_int() | |
deaths = buf.get_int() | |
teamkills = buf.get_int() | |
accuracy = buf.get_int() | |
health = buf.get_int() | |
armour = buf.get_int() | |
gun = buf.get_int() | |
role = buf.get_int() | |
state = buf.get_int() | |
ip = "{}.{}.{}.x".format(buf.get_uchar(), buf.get_uchar(), buf.get_uchar()) | |
print() | |
print("Player {} ({})".format(name, cn)) | |
print(" IP: {}".format(ip)) | |
print(" Admin: {}".format("yes" if role == 1 else "no")) | |
print(" Ping: {}".format(ping)) | |
print(" Team: {}".format(team)) | |
print(" Frags: {}".format(frags)) | |
print(" Flags: {}".format(flags)) | |
print(" Deaths: {}".format(deaths)) | |
print(" Teamkills: {}".format(teamkills)) | |
print(" Accuracy: {}".format(accuracy)) | |
print(" Health/armour: {}/{}".format(health, armour)) | |
print(" Weapon: {}".format(gun_names[gun])) | |
print(" Status: {}".format(state_names[state])) | |
finally: | |
sock.close() | |
if __name__ == "__main__": | |
main() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment