Skip to content

Instantly share code, notes, and snippets.

@nitori
Created March 25, 2024 10:28
Show Gist options
  • Save nitori/3678ec3a8bf82683c06ad15c19b0ea35 to your computer and use it in GitHub Desktop.
Save nitori/3678ec3a8bf82683c06ad15c19b0ea35 to your computer and use it in GitHub Desktop.
Get the status of a Minecraft server.
import asyncio
import struct
import io
import json
def pack_varint(val):
    """Encode an integer as a variable-length byte string (varint).

    The value is emitted seven bits at a time, least-significant group
    first.  Every byte except the last has its high bit (0x80) set to signal
    that more bytes follow.  Negative values are mapped to their 32-bit
    two's-complement representation before encoding.

    Args:
        val: Integer to encode.  Must not be smaller than ``-2**31``.

    Returns:
        The encoded value as ``bytes``.

    Raises:
        ValueError: If ``val`` is below ``-2**31``; such a value cannot be
            represented in 32-bit two's complement and would otherwise be
            silently encoded as an unrelated number.
    """
    if val < -(1 << 31):
        raise ValueError(f'value {val} does not fit in a 32-bit varint')
    if val < 0:
        # Reinterpret as unsigned 32-bit so the shifting loop terminates.
        val += 1 << 32

    encoded = bytearray()
    while val >= 0x80:
        encoded.append(0x80 | (val & 0x7F))  # continuation bit + 7 data bits
        val >>= 7
    encoded.append(val)  # final group: below 0x80, so no continuation bit
    return bytes(encoded)
def unpack_varint(buf: io.BytesIO):
    """Decode one varint from the current position of *buf*.

    Bytes are consumed one at a time; the low seven bits of each byte are
    accumulated least-significant group first, and a cleared high bit marks
    the final byte.  The result is interpreted as a signed 32-bit integer.

    Args:
        buf: Binary stream positioned at the first byte of the varint.  The
            stream is left positioned just after the last byte consumed.

    Returns:
        The decoded value as a signed ``int`` in the 32-bit range.

    Raises:
        struct.error: If the stream ends before the varint is complete
            (callers use this to detect that more data is needed).
        ValueError: If the fifth byte still has its continuation bit set,
            i.e. the encoding is longer than a 32-bit varint allows.
    """
    total = 0
    # A 32-bit value needs at most five 7-bit groups (shifts 0, 7, ..., 28).
    for shift in range(0, 35, 7):
        # unpack() raises struct.error when read(1) returns b'' at EOF.
        (val,) = struct.unpack('B', buf.read(1))
        total |= (val & 0x7F) << shift
        if not val & 0x80:
            break
    else:
        raise ValueError('varint is longer than 5 bytes')

    # Discard any bits above 32 contributed by the fifth byte, then convert
    # from unsigned to signed two's complement.
    total &= 0xFFFFFFFF
    if total & (1 << 31):
        total -= 1 << 32
    return total
def pack_data(pack_id: int, payload: bytes) -> bytes:
    """Frame a packet: varint length prefix, then varint id, then payload.

    The length prefix counts the encoded packet id plus the payload bytes;
    it does not include the prefix itself.
    """
    body = b''.join((pack_varint(pack_id), payload))
    length_prefix = pack_varint(len(body))
    return length_prefix + body
async def read_packets(reader: asyncio.StreamReader):
    """Asynchronously yield ``(packet_id, payload)`` tuples read from *reader*.

    On the wire each packet is a varint length followed by that many bytes;
    those bytes consist of a varint packet id followed by the payload.

    Every complete packet already held in the local buffer is yielded before
    more data is requested from *reader*, so several packets arriving in a
    single chunk are all delivered rather than stalling until further input
    arrives (or being dropped when the stream ends).

    The generator finishes when the reader reports end of stream.

    Raises:
        ValueError: If a packet declares a negative length.
        asyncio.IncompleteReadError: Propagated from ``readexactly`` when the
            stream ends in the middle of a packet body.
        struct.error: Propagated from ``unpack_varint`` when a packet body is
            too short to contain a complete packet id.
    """
    buf = b''
    while True:
        # Drain everything that can be parsed from what we already hold.
        while buf:
            stream = io.BytesIO(buf)
            try:
                length = unpack_varint(stream)
            except struct.error:
                # The length prefix itself is still incomplete; keep the
                # bytes and fetch more below.
                print('Not enough data')
                break
            if length < 0:
                # A negative length would turn the slices below into
                # negative-index slices and silently corrupt the stream.
                raise ValueError(f'invalid packet length: {length}')

            body = stream.read()  # rest of the data after the varint
            if len(body) < length:
                body += await reader.readexactly(length - len(body))
            packet_data, buf = body[:length], body[length:]

            stream = io.BytesIO(packet_data)
            packet_id = unpack_varint(stream)
            yield packet_id, stream.read()

        chunk = await reader.read(1024)
        if not chunk:
            print('no data')
            break
        buf += chunk
def pack_string(s: str) -> bytes:
    """Encode *s* as UTF-8 prefixed with its byte length as a varint."""
    encoded = s.encode('utf-8')
    size_prefix = pack_varint(len(encoded))
    return size_prefix + encoded
async def main(server: str = '127.0.0.1', port: int = 25565) -> None:
    """Query a server's status and print a short summary.

    Opens a TCP connection, sends a handshake packet followed by an empty
    status-request packet, then waits for the first packet with id 0x00,
    parses its payload as a length-prefixed JSON string and prints the
    version, player counts and description.  The connection is closed before
    returning, whether or not the exchange succeeded.

    Args:
        server: Host to connect to; also sent as the address field of the
            handshake.
        port: TCP port to connect to; also sent in the handshake.
    """
    print('connecting')
    reader, writer = await asyncio.open_connection(server, port)
    try:
        pack_data_handshake = pack_data(
            0x00,
            pack_varint(765)  # protocol version, 765 = 1.20.4
            + pack_string(server)
            + struct.pack('>H', port)
            + pack_varint(1)  # status
        )
        pack_data_status_request = pack_data(0x00, b'')

        print('writing data', pack_data_handshake)
        print('writing data', pack_data_status_request)
        writer.write(pack_data_handshake)
        writer.write(pack_data_status_request)
        await writer.drain()

        print('waiting for response')
        async for packet_id, payload in read_packets(reader):
            if packet_id != 0x00:
                continue

            # The payload is a single string: varint byte length + JSON text.
            tmp = io.BytesIO(payload)
            string_length = unpack_varint(tmp)
            motd = json.loads(tmp.read(string_length))

            max_players = motd['players']['max']
            online_players = motd['players']['online']
            print('MOTD:')
            print(' - name:', motd['version']['name'])
            print(' - protocol:', motd['version']['protocol'])
            print(f' - players: {online_players}/{max_players}')
            # The description is either a plain string or specially
            # formatted text that would need interpreting; both are printed
            # as-is.
            print(' - description:', motd['description'])

            # Only one status response is expected; stop instead of waiting
            # for the server to drop the connection.
            break
    finally:
        print('closing')
        writer.close()
        await writer.wait_closed()
# Script entry point: run the status query only when executed directly,
# not when this module is imported.
if __name__ == '__main__':
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment