Skip to content

Instantly share code, notes, and snippets.

@stkptr
Last active June 22, 2024 11:41
Show Gist options
  • Save stkptr/712d3bdbd4d300bbfce13ad60b2cff17 to your computer and use it in GitHub Desktop.
Save stkptr/712d3bdbd4d300bbfce13ad60b2cff17 to your computer and use it in GitHub Desktop.
Minecraft (Java and Bedrock) LAN spoofer/pinger
#!/usr/bin/env python3
import hashlib
import os
import argparse
import select
import socket
import time
import random
import hmac
import io
from collections import namedtuple
import warnings
import struct
import re
from cryptography.hazmat.primitives.ciphers import Cipher, algorithms, modes
# written by stkptr in 2024
# this file is available under the Unlicense, CC0, or MIT, at your discretion
# Java
# https://github.com/mpontillo/minecraft-lan-announce/
# RakNet information
# https://wiki.bedrock.dev/servers/raknet-and-mcpe.html
# https://wiki.vg/Raknet_Protocol
# NetherNet information
# https://github.com/df-mc/nethernet-spec
pepper = os.urandom(64)
# this hides any IP address that is printed to the terminal
# this was used since I (stkptr) requested people to run this script
# it's not strictly necessary as they should all be private addresses
# but some prefer to not have their private addresses leaked
def hide_ip(address, length=8):
m = hashlib.sha256()
m.update(address.encode('utf8'))
m.update(pepper) # use a pepper to prevent rainbow table attacks
return m.hexdigest()[:length]
def wassert(cond):
if not cond:
warnings.warn('Assertion failed', stacklevel=2)
epoch = time.monotonic_ns() / 1000
# random ID
guid = random.randrange(0, 0x7FFFFFFFFFFFFFFF)
# to big endian
to_be = lambda n, c=8: bytes(
[(n >> (8 * (c - i - 1))) & 0xFF for i in range(c)])
# from big endian
from_be = lambda b: sum(
[v << (8 * (len(b) - i - 1)) for i, v in enumerate(b)])
# to little endian
to_le = lambda n, c=8: bytes(
[(n >> (8 * i)) & 0xFF for i in range(c)])
# from little endian
from_le = lambda b: sum(
[v << (8 * i) for i, v in enumerate(b)])
def socket_loop(
socket,
reader=lambda peer, data: None,
ping=lambda: None,
ping_interval=1,
poll_timeout=0.5):
last_ping = time.monotonic()
while True:
# ping periodically
now = time.monotonic()
if now - last_ping >= ping_interval:
ping()
last_ping = now
# check if there's some data on the socket
read, _, _ = select.select([socket], [], [socket], poll_timeout)
for r in read:
bufsize = 4096
data, peer = r.recvfrom(bufsize)
reader(peer, data)
JavaAnnouncement = namedtuple('JavaAnnouncement', 'motd port')
def java_encode(packet):
return f'[MOTD]{packet.motd}[/MOTD][AD]{packet.port}[/AD]'.encode('utf8')
def java_decode(packet):
# the format is more flexible than this, but this catches typical games
matched = re.match(
r'\[MOTD\]([^[]+)\[/MOTD\]\[AD\]([0-9]+)\[/AD\]',
packet.decode('utf8')
)
if matched:
return JavaAnnouncement(matched.group(1), int(matched.group(2)))
def java_announce():
send = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
def ping():
print('Send ping')
packet = java_encode(JavaAnnouncement('Custom LAN Game', 25565))
send.sendto(packet, ('224.0.2.60', 4445))
ping()
socket_loop(send, ping=ping, ping_interval=1.5)
def java_query():
listen = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
mreq = struct.pack("4sl", socket.inet_aton('224.0.2.60'), socket.INADDR_ANY)
listen.setsockopt(socket.IPPROTO_IP, socket.IP_ADD_MEMBERSHIP, mreq)
listen.bind(('', 4445))
def receive_data(peer, data):
print(
f'announcement {hide_ip(peer[0])}:{peer[1]} = {java_decode(data)}'
)
socket_loop(listen, receive_data)
# this is a magic number used by Raknet
# see https://wiki.bedrock.dev/servers/raknet-and-mcpe.html
raknet_magic = bytes([
0x00, 0xff, 0xff, 0x00,
0xfe, 0xfe, 0xfe, 0xfe,
0xfd, 0xfd, 0xfd, 0xfd,
0x12, 0x34, 0x56, 0x78,
])
UnconnectedPing = namedtuple('UnconnectedPing', 'id uptime')
UnconnectedPong = namedtuple('UnconnectedPong', [
'id',
'uptime',
'edition',
'server_name',
'protocol_version',
'version_number',
'player_count',
'max_player_count',
'level_name',
'game_mode_name',
'game_mode',
'portv4',
'portv6'
])
def raknet_encode(packet):
p = io.BytesIO()
if isinstance(packet, UnconnectedPing):
p.write(to_be(0x01, 1))
p.write(to_be(packet.uptime, 8))
p.write(raknet_magic)
p.write(to_be(packet.id, 8))
elif isinstance(packet, UnconnectedPong):
p.write(to_be(0x1c, 1))
p.write(to_be(packet.uptime, 8))
p.write(to_be(packet.id, 8))
p.write(raknet_magic)
s = [
packet.edition,
packet.server_name,
str(packet.protocol_version),
packet.version_number,
str(packet.player_count),
str(packet.max_player_count),
str(packet.id),
packet.level_name,
packet.game_mode_name,
str(packet.game_mode),
str(packet.portv4),
str(packet.portv6)
]
response_s = ';'.join(s) + ';'
p.write(to_be(len(response_s), 2))
p.write(response_s.encode('utf8'))
p.seek(0)
return p.read()
def raknet_decode(packet):
p = io.BytesIO(packet)
ptype = from_be(p.read(1))
if ptype == 0x01:
uptime = from_be(p.read(8))
wassert(raknet_magic == p.read(16))
uid = from_be(p.read(8))
return UnconnectedPing(uid, uptime)
elif ptype == 0x1c:
uptime = from_be(p.read(8))
uid = from_be(p.read(8))
wassert(raknet_magic == p.read(16))
strlen = from_be(p.read(2))
s = p.read().decode('utf8').split(';')
return UnconnectedPong(
uid,
uptime,
s[0],
s[1],
int(s[2]),
s[3],
int(s[4]),
int(s[5]),
s[7],
s[8],
int(s[9]),
int(s[10]),
int(s[11])
)
# this announces a server, responding to pings
def raknet_announce():
print('Announcing a server to the network')
# listener socket
listen = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
listen.bind(('', 19132))
# unconnected pong
def ping_response(peer, data):
print(
f'query from {hide_ip(peer[0])}:{peer[1]} = {raknet_decode(data)}'
)
if data[0] != 0x01:
return
decoded = raknet_decode(data)
response = raknet_encode(UnconnectedPong(
guid,
decoded.uptime,
'MCPE',
'Custom LAN Game',
649,
'1.20.62',
3,
11,
'Game Level',
'Survival',
1,
19132,
19133
))
listen.sendto(response, peer)
socket_loop(listen, ping_response)
def raknet_query(broadcast):
print('Querying the network for servers')
send = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
send.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
def ping():
print('Send ping')
ping_message = raknet_encode(UnconnectedPing(
guid,
int(time.monotonic_ns() / 1000 - epoch)
))
send.sendto(ping_message, (broadcast, 19132))
def receive_packet(peer, data):
print(
'response from'
f' {hide_ip(peer[0])}:{peer[1]} = {raknet_decode(data)}'
)
ping()
socket_loop(send, receive_packet, ping, ping_interval=1)
nethernet_key = hashlib.sha256(to_le(0xDEADBEEF, 8)).digest()
def encrypt(data):
cipher = Cipher(algorithms.AES(nethernet_key), modes.ECB())
encryptor = cipher.encryptor()
extra = (16 - len(data) % 16)
extra = extra if extra else 16
# PKCS5/7
toenc = data + bytes([extra] * extra)
return encryptor.update(toenc) + encryptor.finalize()
def decrypt(data):
cipher = Cipher(algorithms.AES(nethernet_key), modes.ECB())
decryptor = cipher.decryptor()
d = decryptor.update(data) + decryptor.finalize()
return d[:-d[-1]]
def checksum(data):
return hmac.digest(nethernet_key, data, 'sha256')
DiscoveryRequestPacket = namedtuple('DiscoveryRequestPacket', 'id')
DiscoveryResponsePacket = namedtuple('DiscoveryResponsePacket', [
'id',
'version',
'server_name',
'level_name',
'game_type',
'player_count',
'max_player_count',
'is_editor_world',
'transport_layer'
])
DiscoveryMessagePacket = namedtuple('DiscoveryMessagePacket', 'id recipient_id data')
def write_string(b, s, lenbytes=1):
b.write(to_le(len(s), lenbytes))
b.write(s)
def read_string(b, lenbytes=1):
l = from_le(b.read(lenbytes))
return b.read(l)
def nethernet_encode(packet):
p = io.BytesIO()
ptype = (
0 if isinstance(packet, DiscoveryRequestPacket) else
1 if isinstance(packet, DiscoveryResponsePacket) else
2 if isinstance(packet, DiscoveryMessagePacket) else
None
)
p.write(to_le(ptype, 2)) # type
p.write(to_le(packet.id, 8))
p.write(to_le(0, 8)) # padding
if isinstance(packet, DiscoveryRequestPacket):
pass # nothing else to encode
elif isinstance(packet, DiscoveryResponsePacket):
h = io.BytesIO()
h.write(to_le(packet.version, 1))
write_string(h, packet.server_name.encode('utf8'), 1)
write_string(h, packet.level_name.encode('utf8'), 1)
h.write(to_le(packet.game_type, 4))
h.write(to_le(packet.player_count, 4))
h.write(to_le(packet.max_player_count, 4))
h.write(to_le(int(packet.is_editor_world), 1))
h.write(to_le(packet.transport_layer, 4))
h.seek(0)
write_string(p, h.read().hex().encode('ascii'), 4)
elif isinstance(packet, DiscoveryMessagePacket):
p.write(to_le(packet.recipient_id, 8))
write_string(p, packet.data.encode('utf8'), 4)
length = p.tell() + 2
p.seek(0)
p = bytes([length & 0xFF, length >> 8]) + p.read()
return checksum(p) + encrypt(p)
def nethernet_decode(packet):
raw = io.BytesIO(packet)
cs = raw.read(32)
dec = io.BytesIO(decrypt(raw.read()))
wassert(cs == checksum(dec.read())) # checksum verification
plen = dec.tell()
dec.seek(0)
wassert(plen == from_le(dec.read(2))) # length verification
ptype = from_le(dec.read(2))
sid = from_le(dec.read(8))
wassert(dec.read(8) == bytes([0] * 8)) # padding
if ptype == 0:
return DiscoveryRequestPacket(sid)
elif ptype == 1:
hexdata = read_string(dec, 4)
dat = io.BytesIO(bytes.fromhex(hexdata.decode('ascii')))
version = from_le(dat.read(1))
server_name = read_string(dat).decode('utf8')
level_name = read_string(dat).decode('utf8')
game_type = from_le(dat.read(4))
player_count = from_le(dat.read(4))
max_players = from_le(dat.read(4))
is_editor = bool(from_le(dat.read(1)))
transport_layer = from_le(dat.read(4))
return DiscoveryResponsePacket(
sid,
version,
server_name,
level_name,
game_type,
player_count,
max_players,
is_editor,
transport_layer
)
elif ptype == 2:
recipient = from_le(dec.read(8))
data = read_string(dec, 4).decode('utf8')
return DiscoveryMessagePacket(sid, recipient, data)
def nethernet_announce():
print('Announcing a NetherNet server to the network')
# listener socket
listen = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
listen.bind(('', 7551))
def ping_response(peer, data):
decoded = nethernet_decode(data)
if decoded.id == guid:
return
print(f'query from {hide_ip(peer[0])}:{peer[1]} = {decoded}')
packet = DiscoveryResponsePacket(
guid,
2,
'Custom LAN Game',
'Game Level',
1,
3,
11,
False,
2
)
listen.sendto(nethernet_encode(packet), (peer[0], 7551))
socket_loop(listen, ping_response)
def nethernet_query(broadcast):
print('Querying the network for NetherNet servers')
send = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP)
send.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
def ping():
print('Send ping')
packet = nethernet_encode(DiscoveryRequestPacket(guid))
send.sendto(packet, (broadcast, 7551))
def receive_packet(peer, data):
decoded = nethernet_decode(data)
if decoded.id != guid:
print(f'response from {hide_ip(peer[0])}:{peer[1]} = {decoded}')
ping()
socket_loop(send, receive_packet, ping, ping_interval=2)
def main():
parser = argparse.ArgumentParser(
description='Spoof or search for Minecraft servers on the network.')
parser.add_argument('--java', '-j', action='store_true',
help='If provided, spoof/scan for Java instead of Bedrock servers')
parser.add_argument('--nethernet', '-n', action='store_true',
help='If provided, use NetherNet instead of Raknet protocol')
parser.add_argument('--broadcast', '-b', default='255.255.255.255',
help='Address to send broadcast packets to')
parser.add_argument('mode', choices=['announce', 'query'],
help='If announce, present a fake server to the network. '
'Otherwise, if query, ping for servers on the network.')
args = parser.parse_args()
print(f'Our GUID is {guid}')
try:
if args.java:
if args.mode == 'announce':
java_announce()
elif args.mode == 'query':
java_query()
elif args.nethernet:
if args.mode == 'announce':
nethernet_announce()
elif args.mode == 'query':
nethernet_query(args.broadcast)
else:
if args.mode == 'announce':
raknet_announce()
elif args.mode == 'query':
raknet_query(args.broadcast)
except KeyboardInterrupt:
print('KeyboardInterrupt, exiting')
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment