Skip to content

Instantly share code, notes, and snippets.

@barneygale
Created September 4, 2012 01:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save barneygale/3615559 to your computer and use it in GitHub Desktop.
Save barneygale/3615559 to your computer and use it in GitHub Desktop.
import asyncore
import asynchat
import socket
import struct
import sys
import M2Crypto as m2crypto
from hashlib import sha1
import requests
import base64
import random
import hmac
# Text placed in the reply to a 0xFE packet (see ClientHandler).
MOTD = "Reddit Auth Server"
# URL queried with the username and server hash to check the client's session.
CHECK_URL = "http://session.minecraft.net/game/checkserver.jsp"
# URL that receives the verified username; left empty here.
REPORT_URL = ""
# Key for the HMAC of the username that is sent to REPORT_URL; left empty here.
REPORT_SECRET = ""
####
#### mc format utils
####
class BufferUnderrun(Exception):
    """Raised when a buffer holds fewer bytes than a read needs."""
def unpack(data, ty):
    """Read one big-endian value from the front of ``data``.

    data: byte string to read from.
    ty:   struct format code without a byte-order prefix (e.g. 'B', 'h', 'I').

    Returns a ``(remaining_data, value)`` tuple.
    Raises BufferUnderrun if ``data`` is shorter than the encoded value.
    """
    fmt = '>' + ty
    # Measure with the same big-endian format that is used for decoding.
    # A bare format code is sized with native size/alignment rules, which can
    # disagree with the standard sizes (e.g. 'l' on LP64 platforms).
    size = struct.calcsize(fmt)
    if len(data) < size:
        raise BufferUnderrun()
    return data[size:], struct.unpack(fmt, data[:size])[0]
def unpack_string(data):
    """Read a length-prefixed UTF-16BE string from the front of ``data``.

    The prefix is a big-endian short giving the length in 2-byte units.

    Returns a ``(remaining_data, decoded_string)`` tuple.
    Raises BufferUnderrun if the prefix or the string body is incomplete.
    """
    data, unit_count = unpack(data, 'h')
    byte_count = 2 * unit_count
    if len(data) < byte_count:
        raise BufferUnderrun()
    text = data[:byte_count].decode('utf-16be')
    return data[byte_count:], text
def unpack_array(data):
    """Read a length-prefixed byte array from the front of ``data``.

    The prefix is a big-endian short giving the number of bytes that follow.

    Returns a ``(remaining_data, array_bytes)`` tuple.
    Raises BufferUnderrun if the prefix or the array body is incomplete.
    """
    data, length = unpack(data, 'h')
    if len(data) < length:
        raise BufferUnderrun()
    return data[length:], data[:length]
def pack(data, ty):
    """Encode ``data`` as a single big-endian value of struct format ``ty``."""
    fmt = '>' + ty
    return struct.pack(fmt, data)
def pack_string(data):
    """Encode ``data`` as a short length prefix followed by UTF-16BE text.

    The prefix carries ``len(data)``, i.e. the number of characters.
    """
    prefix = pack(len(data), 'h')
    body = data.encode('utf-16be')
    return prefix + body
def pack_array(data):
    """Encode ``data`` as a short byte-count prefix followed by the raw bytes."""
    prefix = pack(len(data), 'h')
    return prefix + data
####
#### Encryption
####
def generate_key_pair():
    """Generate and return a new 1024-bit RSA key pair (public exponent 0x10001)."""
    key_bits = 1024
    public_exponent = 0x10001
    # Supply our own callback, which simply returns True.
    return m2crypto.RSA.gen_key(key_bits, public_exponent, callback=lambda: True)
# Marker lines that surround the base64 body of a PEM-encoded public key;
# used by export_public_key and import_public_key to strip/add the armor.
pem_start = '-----BEGIN PUBLIC KEY-----'
pem_end = '-----END PUBLIC KEY-----'
#Dumps the public key to the format minecraft uses
#(python makes us jump through some hoops)
def export_public_key(key_pair):
    """Return the public key of ``key_pair`` as raw, base64-decoded bytes.

    The key is first written out as PEM, then the text between the
    ``pem_start`` and ``pem_end`` marker lines is base64-decoded.
    """
    # Serialise the public key as PEM into an in-memory buffer.
    buf = m2crypto.BIO.MemoryBuffer('')
    key_pair.save_pub_key_bio(buf)
    pem_text = buf.getvalue()

    # Both marker lines must be present in the PEM output.
    begin = pem_text.find(pem_start)
    end = pem_text.find(pem_end)
    assert not (begin == -1 or end == -1)

    # Keep only the base64 payload between the markers and decode it.
    payload = pem_text[begin + len(pem_start):end]
    return base64.decodestring(payload)
#The reverse. string -> new RSA instance
def import_public_key(bytes):
    """Build an RSA public-key object from raw key bytes.

    Inverse of export_public_key: the bytes are base64-encoded, wrapped in
    the ``pem_start`` / ``pem_end`` marker lines and loaded through an
    in-memory buffer.
    """
    encoded = base64.encodestring(bytes)
    pem_text = ''.join([pem_start, '\n', encoded, pem_end])
    return m2crypto.RSA.load_pub_key_bio(m2crypto.BIO.MemoryBuffer(pem_text))
def encrypt_shared_secret(keypair, secret):
    """Encrypt ``secret`` with the public key of ``keypair`` using PKCS#1 padding."""
    padding = m2crypto.m2.pkcs1_padding
    return keypair.public_encrypt(secret, padding)
def private_decrypt(keypair, data):
    """Decrypt ``data`` with the private key of ``keypair`` using PKCS#1 padding."""
    padding = m2crypto.m2.pkcs1_padding
    return keypair.private_decrypt(data, padding)
####
#### Server
####
class Server(asyncore.dispatcher):
    """Listening socket that hands every accepted connection to a ClientHandler.

    Attributes:
        key_pair:   RSA key pair generated once at start-up.
        public_key: exported public key bytes sent to clients.
    """

    def __init__(self, host, port):
        """Bind to ``host``:``port``, start listening and create the key pair."""
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind((host, port))
        self.listen(5)

        self.key_pair = generate_key_pair()
        self.public_key = export_public_key(self.key_pair)

        print('Listening for connections to %s:%s' % (host, port))

    def handle_accept(self):
        """Wrap a newly accepted connection in a ClientHandler."""
        accepted = self.accept()
        if accepted is not None:
            sock, addr = accepted
            ClientHandler(self, sock, addr)

    def loop(self):
        """Run the asyncore event loop."""
        asyncore.loop()
####
#### ClientHandler: handles connections from clients.
####
class ClientHandler(asynchat.async_chat):
    """Handles a single client connection.

    Packets understood (first byte is the packet id):
      0x02 - handshake: replies with 0xFD carrying a server id, the server's
             public key and a verify token.
      0xFC - key response: decrypts the shared secret and verify token,
             checks the session against CHECK_URL, reports the username to
             REPORT_URL, then kicks the client with the outcome.
      0xFE - ping: kicks with the MOTD string.
    Any other packet id is a protocol error. Every path except 0x02 ends in
    a kick (packet 0xFF), which also closes the connection.
    """

    def __init__(self, server, sock, addr):
        """Set up per-connection state.

        server: the owning Server (provides key_pair and public_key).
        sock:   the accepted socket.
        addr:   (host, port) tuple of the peer.
        """
        print("initing...")
        asynchat.async_chat.__init__(self, sock)
        print("done init.")
        self.server = server
        self.ibuffer = ''
        self.username = ''
        # Both are filled in by the 0x02 handshake; None means "no handshake yet".
        self.server_id = None
        self.verify_token = None
        self.addr = '%s:%s' % addr
        # No terminator: every received chunk goes to collect_incoming_data.
        self.set_terminator(None)
        #self.log('Incoming connection')

    def log(self, message):
        """Print ``message`` prefixed with the peer address (non-ASCII dropped)."""
        print('%s - %s' % (self.addr, message.encode("ascii", "ignore")))

    def collect_incoming_data(self, data):
        """Buffer ``data`` and process as many complete packets as possible.

        If a packet is incomplete the buffer is restored and processing stops
        until more data arrives. Processing also stops once the client has
        been kicked, because the connection is closed at that point.
        """
        self.ibuffer += data
        while len(self.ibuffer) > 0:
            backup = self.ibuffer
            try:
                keep_going = self._handle_packet()
            except BufferUnderrun:
                # Partial packet: put the bytes back and wait for the rest.
                self.ibuffer = backup
                break
            if not keep_going:
                break

    def _handle_packet(self):
        """Consume one packet; return True if the connection is still open."""
        self.ibuffer, packet_id = unpack(self.ibuffer, 'B')
        if packet_id == 0x02:
            self._on_handshake()
            return True
        if packet_id == 0xFC:
            self._on_key_response()
        elif packet_id == 0xFE:
            self.kick(u'%s\xa70\xa79001' % MOTD)
        else:
            self.kick("Protocol error: Unknown packet 0x%02x" % packet_id)
        # Every branch above ends by kicking the client.
        return False

    def _on_handshake(self):
        """Parse a 0x02 packet and answer with a 0xFD key request."""
        # All fields must be read to advance the buffer; only username is kept.
        self.ibuffer, protocol_version = unpack(self.ibuffer, 'B')
        self.ibuffer, self.username = unpack_string(self.ibuffer)
        self.ibuffer, host = unpack_string(self.ibuffer)
        self.ibuffer, port = unpack(self.ibuffer, 'I')

        # Use the OS entropy source: these values act as per-login nonces.
        rng = random.SystemRandom()
        self.server_id = '%x' % rng.randint(0, pow(2, 8*8)-1)
        self.verify_token = ''.join([chr(rng.randint(0, 255)) for n in range(4)])

        #Send a 0xFD back
        self.push(
            pack(0xFD, 'B') +
            pack_string(self.server_id) +
            pack_array(self.server.public_key) +
            pack_array(self.verify_token))

    def _on_key_response(self):
        """Parse a 0xFC packet, verify the session and report the user.

        Always finishes by kicking the client with a status message.
        """
        self.ibuffer, shared_secret = unpack_array(self.ibuffer)
        self.ibuffer, verify_token = unpack_array(self.ibuffer)

        if self.verify_token is None:
            # 0xFC received without a preceding 0x02: nothing to verify against.
            return self.kick('Protocol error: Key response before handshake')

        shared_secret = private_decrypt(self.server.key_pair, shared_secret)
        verify_token = private_decrypt(self.server.key_pair, verify_token)
        if verify_token != self.verify_token:
            return self.kick('Incorrect verify_token')

        server_hash = self._server_hash(shared_secret)

        # NOTE: requests.get is synchronous, so the event loop is blocked
        # for the duration of each HTTP call below.
        r = requests.get(CHECK_URL, params={'user': self.username, 'serverId': server_hash})
        if not r.ok:
            return self.kick('Minecraft auth servers are being derpy - try again later.')
        if r.content.strip() != "YES":
            return self.kick('Failed to verify username!')

        h = hmac.new(REPORT_SECRET, self.username)
        r2 = requests.get(REPORT_URL, params={'user': self.username, 'secret': h.hexdigest()})
        # Response.json is a property in old requests releases and a method
        # in newer ones; accept both.
        response = r2.json
        if callable(response):
            response = response()

        if response['success']:
            self.kick('Thanks! Please check your web browser')
        else:
            self.kick('Error: %s' % response['detail'])

    def _server_hash(self, shared_secret):
        """Return the signed hex SHA-1 of server id, shared secret and public key.

        The 160-bit digest is interpreted as a two's-complement number: if
        the top bit is set the result is '-' followed by the magnitude.
        """
        digest = sha1()
        digest.update(self.server_id)
        digest.update(shared_secret)
        digest.update(self.server.public_key)
        d = int(digest.hexdigest(), 16)
        if d >> 39*4 & 0x8:
            return "-%x" % ((-d) & (2**(40*4)-1))
        return "%x" % d

    def kick(self, reason):
        """Send a 0xFF packet carrying ``reason`` and close the connection."""
        self.push(pack(0xFF, 'B') + pack_string(reason))
        self.handle_close(True)

    def handle_close(self, *args):
        """Close the connection; extra arguments are ignored."""
        self.close()
# Script entry point: expects exactly two arguments, the listen IP and port.
if __name__ == '__main__':
    args = sys.argv[1:]
    if len(args) == 2:
        print("Starting up!")
        s = Server(args[0], int(args[1]))
        try:
            s.loop()
        except KeyboardInterrupt:
            # Ctrl-C: close the listening socket before exiting.
            s.close()
    else:
        print("Usage: python authserver.py listen_ip listen_port")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment