Created
March 25, 2024 10:28
-
-
Save nitori/3678ec3a8bf82683c06ad15c19b0ea35 to your computer and use it in GitHub Desktop.
Get status of some minecraft server.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncio | |
import struct | |
import io | |
import json | |
def pack_varint(val): | |
total = b'' | |
if val < 0: | |
val = (1 << 32) + val | |
while val >= 0x80: | |
bits = val & 0x7F | |
val >>= 7 | |
total += struct.pack('B', (0x80 | bits)) | |
bits = val & 0x7F | |
total += struct.pack('B', bits) | |
return total | |
def unpack_varint(buf: io.BytesIO): | |
total = 0 | |
shift = 0 | |
val = 0x80 | |
while val & 0x80: | |
val = struct.unpack('B', buf.read(1))[0] | |
total |= ((val & 0x7F) << shift) | |
shift += 7 | |
if total & (1 << 31): | |
total = total - (1 << 32) | |
return total | |
def pack_data(pack_id: int, payload: bytes) -> bytes: | |
data = pack_varint(pack_id) + payload | |
return pack_varint(len(data)) + data | |
async def read_packets(reader: asyncio.StreamReader): | |
buf = b'' | |
while True: | |
chunk = await reader.read(1024) | |
if not chunk: | |
print('no data') | |
break | |
buf += chunk | |
tmp = io.BytesIO(buf) | |
try: | |
length = unpack_varint(tmp) | |
except struct.error: | |
print('Not enough data') | |
continue | |
buf = tmp.read() # rest of the data after the varint | |
if len(buf) < length: | |
buf += await reader.readexactly(length - len(buf)) | |
packet_data, buf = buf[:length], buf[length:] | |
tmp = io.BytesIO(packet_data) | |
packet_id = unpack_varint(tmp) | |
payload = tmp.read() | |
yield packet_id, payload | |
def pack_string(s: str) -> bytes: | |
s = s.encode('utf-8') | |
return pack_varint(len(s)) + s | |
async def main() -> None: | |
server = '127.0.0.1' | |
port = 25565 | |
print('connecting') | |
reader, writer = await asyncio.open_connection(server, port) | |
pack_data_handshake = pack_data( | |
0x00, | |
pack_varint(765) # protocol version, 765 = 1.20.4 | |
+ pack_string(server) | |
+ struct.pack('>H', port) | |
+ pack_varint(1) # status | |
) | |
pack_data_status_request = pack_data(0x00, b'') | |
print('writing data', pack_data_handshake) | |
print('writing data', pack_data_status_request) | |
writer.write(pack_data_handshake) | |
writer.write(pack_data_status_request) | |
await writer.drain() | |
print('waiting for response') | |
async for packet_id, payload in read_packets(reader): | |
if packet_id == 0x00: | |
tmp = io.BytesIO(payload) | |
string_length = unpack_varint(tmp) | |
payload = tmp.read(string_length) | |
motd = json.loads(payload) | |
max_players = motd['players']['max'] | |
online_players = motd['players']['online'] | |
print('MOTD:') | |
print(' - name:', motd['version']['name']) | |
print(' - protocol:', motd['version']['protocol']) | |
print(f' - players: {online_players}/{max_players}') | |
description = motd['description'] | |
if isinstance(description, str): | |
print(' - description:', description) | |
else: | |
# this is specially formatted text. would need interpreting but too lazy | |
print(' - description:', description) | |
print('closing') | |
if __name__ == '__main__': | |
asyncio.run(main()) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment