Created
September 4, 2012 01:23
-
-
Save barneygale/3615559 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import asyncore | |
import asynchat | |
import socket | |
import struct | |
import sys | |
import M2Crypto as m2crypto | |
from hashlib import sha1 | |
import requests | |
import base64 | |
import random | |
import hmac | |
MOTD = "Reddit Auth Server" | |
CHECK_URL = "http://session.minecraft.net/game/checkserver.jsp" | |
REPORT_URL = "" | |
REPORT_SECRET = "" | |
#### | |
#### mc format utils | |
#### | |
class BufferUnderrun(Exception):
    """Raised by the unpack helpers when the buffer holds fewer bytes than a field needs."""
def unpack(data, ty):
    """Read one big-endian value of struct type *ty* from the front of *data*.

    Returns a ``(remaining_data, value)`` tuple.

    Raises BufferUnderrun if *data* is shorter than the encoded value.
    """
    # Size the field with the same '>' (standard-size, big-endian) format that
    # is used to decode it; a bare format code is measured with native sizes,
    # which differ for some codes (e.g. 'l').
    fmt = '>' + ty
    size = struct.calcsize(fmt)
    if len(data) < size:
        raise BufferUnderrun()
    return data[size:], struct.unpack(fmt, data[:size])[0]
def unpack_string(data):
    """Read a length-prefixed UTF-16BE string from the front of *data*.

    The prefix is a signed big-endian short counting 2-byte units.
    Returns a ``(remaining_data, text)`` tuple.

    Raises BufferUnderrun if the prefix or the string body is incomplete,
    and ValueError if the prefix is negative.
    """
    data, length = unpack(data, 'h')
    # The prefix is signed; a negative value would slip past the underrun
    # check and slice from the wrong end of the buffer.
    if length < 0:
        raise ValueError('negative string length: %d' % length)
    byte_count = 2 * length
    if len(data) < byte_count:
        raise BufferUnderrun()
    return data[byte_count:], data[:byte_count].decode('utf-16be')
def unpack_array(data):
    """Read a length-prefixed byte array from the front of *data*.

    The prefix is a signed big-endian short giving the byte count.
    Returns a ``(remaining_data, array_bytes)`` tuple.

    Raises BufferUnderrun if the prefix or the array body is incomplete,
    and ValueError if the prefix is negative.
    """
    data, length = unpack(data, 'h')
    # The prefix is signed; a negative value would slip past the underrun
    # check and slice from the wrong end of the buffer.
    if length < 0:
        raise ValueError('negative array length: %d' % length)
    if len(data) < length:
        raise BufferUnderrun()
    return data[length:], data[:length]
def pack(data, ty):
    """Encode the single value *data* as big-endian struct type *ty*."""
    packer = struct.Struct('>' + ty)
    return packer.pack(data)
def pack_string(data):
    """Encode *data* as a length-prefixed UTF-16BE string.

    The prefix is a signed big-endian short counting 2-byte UTF-16 units,
    matching what unpack_string reads back.
    """
    encoded = data.encode('utf-16be')
    # Count encoded 2-byte units rather than len(data): characters outside
    # the BMP occupy two units each, so the two counts can differ.
    return struct.pack('>h', len(encoded) // 2) + encoded
def pack_array(data):
    """Encode *data* as a byte array prefixed with its length as a big-endian short."""
    length_prefix = pack(len(data), 'h')
    return length_prefix + data
#### | |
#### Encryption | |
#### | |
def generate_key_pair():
    """Generate a fresh 1024-bit RSA key pair with public exponent 0x10001.

    Returns whatever ``m2crypto.RSA.gen_key`` returns (an RSA key object).
    """
    # M2Crypto invokes the callback with progress arguments during key
    # generation; accept and ignore them so nothing is printed.
    # NOTE(review): the previous zero-argument lambda presumably only worked
    # because M2Crypto discards errors raised by the callback - confirm.
    return m2crypto.RSA.gen_key(1024, 0x10001, callback=lambda *args: None)
# Armor lines that open and close the base64 body of a PEM public key;
# export_public_key strips them and import_public_key adds them back.
pem_start = '-----BEGIN PUBLIC KEY-----'
pem_end = '-----END PUBLIC KEY-----'
#Dumps the public key to the format minecraft uses
#(python makes us jump through some hoops)
def export_public_key(key_pair):
    """Return the raw (base64-decoded) public key bytes of *key_pair*.

    The key is first written out as PEM through an M2Crypto memory buffer,
    then the text between the BEGIN/END armor lines is base64-decoded.

    Raises ValueError if the PEM armor lines cannot be found.
    """
    #First extract a PEM file
    bio = m2crypto.BIO.MemoryBuffer('')
    key_pair.save_pub_key_bio(bio)
    pem = bio.getvalue()
    #Get just the key data
    start = pem.find(pem_start)
    end = pem.find(pem_end)
    # Explicit check instead of assert, which is stripped under `python -O`.
    if start == -1 or end == -1:
        raise ValueError('public key PEM is missing its BEGIN/END armor lines')
    body = pem[start + len(pem_start):end]
    #Decode
    return base64.decodestring(body)
#The reverse. string -> new RSA instance
def import_public_key(bytes):
    """Build an M2Crypto RSA public key object from raw key bytes.

    The bytes are base64-encoded, wrapped in the PEM armor lines and loaded
    through an M2Crypto memory buffer.
    """
    # encodestring ends its output with a newline, so the END line lands
    # on a line of its own.
    armored = ''.join([pem_start, '\n', base64.encodestring(bytes), pem_end])
    return m2crypto.RSA.load_pub_key_bio(m2crypto.BIO.MemoryBuffer(armored))
def encrypt_shared_secret(keypair, secret):
    """Encrypt *secret* with the public half of *keypair* using M2Crypto's pkcs1_padding mode."""
    padding = m2crypto.m2.pkcs1_padding
    return keypair.public_encrypt(secret, padding)
def private_decrypt(keypair, data):
    """Decrypt *data* with the private half of *keypair* using M2Crypto's pkcs1_padding mode."""
    padding = m2crypto.m2.pkcs1_padding
    return keypair.private_decrypt(data, padding)
#### | |
#### Server | |
#### | |
class Server(asyncore.dispatcher):
    """Listening socket that hands each accepted connection to a ClientHandler.

    One RSA key pair is generated at start-up; handlers read it through
    ``key_pair`` and ``public_key``.
    """

    def __init__(self, host, port):
        """Bind to ``(host, port)``, start listening and generate the RSA key pair."""
        asyncore.dispatcher.__init__(self)
        self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
        self.set_reuse_addr()
        self.bind((host, port))
        self.listen(5)
        self.key_pair = generate_key_pair()
        self.public_key = export_public_key(self.key_pair)
        print('Listening for connections to %s:%s' % (host, port))

    def handle_accept(self):
        """Wrap a newly accepted connection in a ClientHandler."""
        accepted = self.accept()
        if accepted is None:
            return
        sock, addr = accepted
        # No reference is kept here: the handler registers itself with
        # asyncore's socket map when it is constructed.
        ClientHandler(self, sock, addr)

    def loop(self):
        """Run the asyncore event loop."""
        asyncore.loop()
#### | |
#### ClientHandler: handles connections from clients. | |
#### | |
class ClientHandler(asynchat.async_chat):
    """Handle one client connection: parse its packets and verify the username.

    Packets understood (first byte is the packet id):
      0x02 - carries protocol version, username, host and port; answered
             with a 0xFD packet holding a server id, the server's public
             key and a verify token.
      0xFC - carries the RSA-encrypted shared secret and verify token; the
             username is then checked against CHECK_URL and reported to
             REPORT_URL.
      0xFE - answered with a kick carrying the MOTD.
    Every other packet id gets a protocol-error kick.
    """

    # OS-backed generator: the server id and verify token are challenge
    # values and should not come from the predictable default generator.
    _rng = random.SystemRandom()

    def __init__(self, server, sock, addr):
        """Attach to *sock*; *server* supplies the RSA key pair, *addr* is (host, port)."""
        print("initing...")
        asynchat.async_chat.__init__(self, sock)
        print("done init.")
        self.server = server
        self.ibuffer = ''
        self.username = ''
        self.addr = '%s:%s' % addr
        # Both are assigned by the 0x02 packet; None means "not seen yet".
        self.server_id = None
        self.verify_token = None
        # No terminator: all received data goes to collect_incoming_data.
        self.set_terminator(None)

    def log(self, message):
        """Print *message* prefixed with the client address, dropping non-ASCII characters."""
        print('%s - %s' % (self.addr, message.encode("ascii", "ignore")))

    def collect_incoming_data(self, data):
        """Append *data* to the input buffer and process every complete packet in it.

        An incomplete packet is left in the buffer until more data arrives.
        Processing stops as soon as the client has been kicked.
        """
        self.ibuffer += data
        while len(self.ibuffer) > 0:
            # Snapshot so a half-received packet can be put back untouched.
            backup = self.ibuffer
            try:
                self.ibuffer, packet_id = unpack(self.ibuffer, 'B')
                if packet_id == 0x02:
                    self._handle_handshake()
                elif packet_id == 0xFC:
                    # Every outcome of this packet ends with a kick.
                    self._handle_encryption_response()
                    return
                elif packet_id == 0xFE:
                    self.kick(u'%s\xa70\xa79001' % MOTD)
                    return
                else:
                    self.kick("Protocol error: Unknown packet 0x%02x" % packet_id)
                    return
            except BufferUnderrun:
                self.ibuffer = backup
                break

    def _handle_handshake(self):
        """Parse a 0x02 packet and reply with a 0xFD packet (server id, public key, verify token)."""
        # Version, host and port are parsed only to consume their bytes.
        self.ibuffer, _protocol_version = unpack(self.ibuffer, 'B')
        self.ibuffer, self.username = unpack_string(self.ibuffer)
        self.ibuffer, _host = unpack_string(self.ibuffer)
        self.ibuffer, _port = unpack(self.ibuffer, 'I')
        #Send a 0xFD back
        self.server_id = '%x' % self._rng.getrandbits(8*8)
        self.verify_token = ''.join([chr(self._rng.randint(0, 255)) for n in range(4)])
        self.push(
            pack(0xFD, 'B') +
            pack_string(self.server_id) +
            pack_array(self.server.public_key) +
            pack_array(self.verify_token))

    def _handle_encryption_response(self):
        """Parse a 0xFC packet, verify the user and kick with the outcome."""
        self.ibuffer, shared_secret = unpack_array(self.ibuffer)
        self.ibuffer, verify_token = unpack_array(self.ibuffer)
        if self.verify_token is None:
            # 0xFC arrived before any 0x02: there is nothing to verify against.
            return self.kick('Protocol error: unexpected packet 0xfc')
        shared_secret = private_decrypt(self.server.key_pair, shared_secret)
        verify_token = private_decrypt(self.server.key_pair, verify_token)
        if verify_token != self.verify_token:
            return self.kick('Incorrect verify_token')
        server_hash = self._server_hash(shared_secret)
        # NOTE(review): requests.get blocks the whole asyncore loop while waiting.
        r = requests.get(CHECK_URL, params={'user': self.username, 'serverId': server_hash})
        if not r.ok:
            return self.kick('Minecraft auth servers are being derpy - try again later.')
        if r.content.strip() != "YES":
            return self.kick('Failed to verify username!')
        h = hmac.new(REPORT_SECRET, self.username)
        r2 = requests.get(REPORT_URL, params={'user': self.username, 'secret': h.hexdigest()})
        # `json` is a property in old requests releases and a method in newer ones.
        response = r2.json() if callable(r2.json) else r2.json
        if response['success']:
            self.kick('Thanks! Please check your web browser')
        else:
            self.kick('Error: %s' % response['detail'])

    def _server_hash(self, shared_secret):
        """Return the SHA-1 of server id, shared secret and public key as a signed hex string."""
        digest = sha1()
        digest.update(self.server_id)
        digest.update(shared_secret)
        digest.update(self.server.public_key)
        d = int(digest.hexdigest(), 16)
        # Treat the 160-bit digest as a two's-complement number: when the top
        # bit is set, emit '-' followed by the magnitude.
        if d >> 39*4 & 0x8:
            return "-%x" % ((-d) & (2**(40*4)-1))
        return "%x" % d

    def kick(self, reason):
        """Send a 0xFF packet carrying *reason*, then close the connection."""
        self.push(pack(0xFF, 'B') + pack_string(reason))
        self.handle_close(True)

    def handle_close(self, *args):
        """Close the channel; extra arguments are accepted and ignored."""
        self.close()
# Script entry point: expects exactly two arguments, the listen IP and port.
if __name__ == '__main__':
    args = sys.argv[1:]
    if len(args) == 2:
        print("Starting up!")
        s = Server(args[0], int(args[1]))
        try:
            s.loop()
        except KeyboardInterrupt:
            # Ctrl-C: close the listening socket and exit quietly.
            s.close()
    else:
        print("Usage: python authserver.py listen_ip listen_port")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment