Skip to content

Instantly share code, notes, and snippets.

@tjb0607
Created June 27, 2012 22:40
Show Gist options
  • Save tjb0607/3007360 to your computer and use it in GitHub Desktop.
Save tjb0607/3007360 to your computer and use it in GitHub Desktop.
Minecraft server list MOTD and kick message thing
#!/bin/env python2
# -*- coding: utf-8 -*-
# This program is free software. It comes without any warranty, to
# the extent permitted by applicable law. You can redistribute it
# and/or modify it under the terms of the Do What The Fuck You Want
# To Public License, Version 2, as published by Sam Hocevar. See
# http://sam.zoy.org/wtfpl/COPYING for more details.
import asyncore
import asynchat
import socket
import re
from codecs import utf_16_be_encode, utf_16_be_decode
from struct import pack, unpack
# Text shown in the client's server list; sent as the first field of the
# reply to a 0xFE server list query.
MOTD = "UP at mustanglover.dyndns.org"
# Disconnect reason sent to any client whose first packet is not a
# server list query.
KICKMOTD = "The server is up, but it's moved to mustanglover.dyndns.org. Go there!"
# Host of the real server; queried with get_info() for the player counts.
IP = "mustanglover.dyndns.org"
def _recv_exact(sock, count):
    '''
    Reads exactly `count` bytes from `sock`.
    Raises ValueError if the peer closes the connection first.
    '''
    chunks = []
    remaining = count
    while remaining > 0:
        chunk = sock.recv(remaining)
        if not chunk:
            raise ValueError("connection closed after %d of %d bytes"
                             % (count - remaining, count))
        chunks.append(chunk)
        remaining -= len(chunk)
    return b"".join(chunks)


def get_info(host, port, timeout=5.0):
    '''
    Queries a server with the 0xFE server list ping and parses the reply.

    host, port -- address of the server to query.
    timeout    -- seconds to wait on each socket operation; None blocks
                  forever (the previous behaviour).

    Returns a dict with the keys 'motd' (unicode), 'players' (int) and
    'max_players' (int).

    Raises socket.error (including socket.timeout) on network failure and
    ValueError if the reply is not a well-formed 0xFF packet.
    '''
    s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        s.settimeout(timeout)
        s.connect((host, port))
        s.sendall(b'\xfe')
        # Reply layout: packet id 0xFF, big-endian short character count,
        # then that many UTF-16BE code units.
        header = _recv_exact(s, 3)
        if header[:1] != b'\xff':
            raise ValueError("unexpected packet id in reply: %r" % header[:1])
        (length,) = unpack(">h", header[1:])
        if length < 0:
            raise ValueError("negative string length in reply: %d" % length)
        # TCP may deliver the payload in pieces, so read until complete
        # instead of trusting a single recv().
        payload = _recv_exact(s, 2 * length)
    finally:
        # Always release the socket, even when the query fails half-way.
        s.close()
    fields = payload.decode('utf-16be').split(u'\xa7')
    if len(fields) < 3:
        raise ValueError("expected 3 fields in reply, got %d" % len(fields))
    return {'motd': fields[0],
            'players': int(fields[1]),
            'max_players': int(fields[2])}
def pack_string(string):
    '''
    Packs a string for the wire: a big-endian short holding the number of
    characters, followed by the characters encoded as UTF-16BE.

    Byte strings are decoded as UTF-8 first (the encoding this file
    declares), so non-ASCII messages written as plain string literals work.
    Characters outside the Basic Multilingual Plane are replaced with "?"
    so that every character takes exactly one 16-bit unit and the length
    prefix stays correct.

    Raises UnicodeDecodeError for byte strings that are not valid UTF-8 and
    struct.error if the string is longer than 32767 characters.
    '''
    if isinstance(string, bytes):
        string = string.decode("utf-8")
    string = u"".join(ch if ord(ch) < 0x10000 else u"?" for ch in string)
    return (pack(">h", len(string)) +
            utf_16_be_encode(string, "replace")[0])
def unpack_string(data):
    '''
    Extracts a length-prefixed string from the start of a data stream.

    The stream starts with a big-endian short giving the number of
    characters, followed by that many UTF-16BE code units. Any bytes after
    the string are ignored.

    Returns the decoded unicode string.
    Raises ValueError if the length prefix is missing or negative, or if
    the stream is shorter than the prefix promises.
    '''
    if len(data) < 2:
        raise ValueError("need 2 bytes for the length prefix, got %d"
                         % len(data))
    (length,) = unpack(">h", data[:2])
    if length < 0:
        raise ValueError("negative string length: %d" % length)
    # The payload starts after the 2-byte prefix, so it ends at 2 + 2*length;
    # slicing to 2*length would drop the last character.
    end = 2 + 2 * length
    if len(data) < end:
        raise ValueError("string needs %d bytes, only %d available"
                         % (end, len(data)))
    return utf_16_be_decode(data[2:end])[0]
class Listener(asyncore.dispatcher):
    '''
    Listening socket that hands every accepted connection to a ClientTunnel.
    '''

    def __init__(self, host, port):
        '''
        Opens a TCP listening socket on host:port and announces it on stdout.
        '''
        asyncore.dispatcher.__init__(self)
        # Initialised empty; nothing in this class adds to it.
        self.clients = []
        address = (host, port)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind(address)
        self.listen(5)
        print("Listening for connections on %s:%s" % address)

    def handle_accept(self):
        '''
        Wraps a newly accepted connection in a ClientTunnel.
        '''
        accepted = self.accept()
        if accepted is None:
            # accept() produced no connection; nothing to do.
            return
        client_sock, client_addr = accepted
        ClientTunnel(client_sock, client_addr)

    @staticmethod
    def loop():
        '''
        Runs the asyncore I/O loop.
        '''
        asyncore.loop()
class ClientTunnel(asynchat.async_chat):
    '''
    Handles one connecting client: answers a server list query with the
    MOTD and player counts, and kicks anything else with KICKMOTD.
    '''

    def __init__(self, sock, addr):
        '''
        sock -- the accepted client socket.
        addr -- the (host, port) pair of the client, used to tag log lines.
        '''
        asynchat.async_chat.__init__(self, sock)
        # No terminator: every chunk goes straight to collect_incoming_data.
        self.set_terminator(None)
        self.addr = "%s:%s" % addr
        # Set once a kick packet has been queued; later input is ignored.
        self.kicked = False
        self.log("Incoming connection")

    def log(self, message):
        '''
        Prints a message to stdout, prefixed with the client address.
        '''
        import sys
        line = "%s - %s" % (self.addr, message)
        if not isinstance(line, str):
            # Python 2 unicode (e.g. a reason containing the section sign):
            # encode explicitly so that a piped or ASCII stdout cannot raise
            # UnicodeEncodeError before the reply is sent.
            encoding = getattr(sys.stdout, "encoding", None) or "utf-8"
            line = line.encode(encoding, "replace")
        print(line)

    def collect_incoming_data(self, data):
        '''
        Inspects the first byte received and answers accordingly.
        '''
        if self.kicked or not data:
            return
        (packet_id,) = unpack(">B", data[:1])
        if packet_id == 0xfe:  # Handle server list query
            self.log("Received server list query")
            try:
                serverinfo = get_info(IP, 25565)
            except socket.error as exc:
                # The real server cannot be reached, so there are no player
                # counts to report: drop the client without a reply.
                self.log("Could not query %s: %s" % (IP, exc))
                self.close()
                return
            self.kick(u"%s§%s§%s" % (MOTD, serverinfo['players'],
                                     serverinfo['max_players']))
        else:
            self.kick(KICKMOTD)

    def kick(self, reason):
        '''
        Sends a 0xFF kick packet carrying `reason`, then closes the
        connection once the packet has been written out.
        '''
        self.log("Kicking (%s)" % reason)
        self.kicked = True
        self.push(pack(">B", 0xff) + pack_string(reason))
        # close() here would throw away whatever push() could not send
        # immediately; close_when_done() closes only after the queue drains.
        self.close_when_done()

    def handle_close(self):
        '''
        Closes the channel. Also reached when the queue drains after a kick,
        in which case the client did not initiate the close and nothing
        further is logged.
        '''
        if not self.kicked:
            self.log("Client closed connection")
        self.close()
if __name__ == "__main__":
    # Listen on every interface on port 25565 and serve until interrupted.
    server = Listener("0.0.0.0", 25565)
    server.loop()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment