Created
July 20, 2013 00:01
-
-
Save dalbothek/6043202 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# -*- coding: utf-8 -*-
# This source file is part of mc4p,
# the Minecraft Portable Protocol-Parsing Proxy.
#
# Copyright (C) 2011 Matthew J. McGill, Simon Marti
# This program is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License v2 as published by
# the Free Software Foundation.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with this program; if not, write to the Free Software Foundation, Inc.,
# 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA.
import hashlib
import json
import logging
import os
import struct
import sys

import requests

import encryption
class Authenticator(object):
    """Base class for authenticating against the Minecraft session servers.

    Subclasses implement :meth:`authenticate`, which is expected to fill in
    ``player_name`` (and, where applicable, ``session_id``) before
    :meth:`join_server` is called.
    """

    SESSION_URL = 'http://session.minecraft.net/game/joinserver.jsp'
    CHECK_SESSION_URL = 'http://session.minecraft.net/game/checkserver.jsp'

    def __init__(self):
        self.username = None
        self.session_id = None
        self.player_name = None

    def authenticate(self):
        """Obtain session credentials; must be implemented by subclasses."""
        raise NotImplementedError

    def join_server(self, server_id, shared_secret, public_key):
        """Announce to the session server that this player joins a server.

        Sends ``player_name``, the session id and the login hash computed
        from ``server_id``, ``shared_secret`` and ``public_key`` to
        ``SESSION_URL``. The HTTP response is not inspected.
        """
        # The hash helper is private (name-mangled), so it has to be reached
        # through ``self.__login_hash``; a plain ``self.login_hash`` does not
        # exist on this class.
        requests.get(self.SESSION_URL,
                     params={'user': self.player_name,
                             'sessionId': self._get_session_id(),
                             'serverId': self.__login_hash(server_id,
                                                           shared_secret,
                                                           public_key)})

    def _get_session_id(self):
        """Return the session id sent by :meth:`join_server`.

        Subclasses may override this when their session id is derived from
        other credentials.
        """
        return self.session_id

    @classmethod
    def check_player(cls, username, server_id, shared_secret, public_key):
        """Checks if a user is allowed to join the server.

        Returns True only when the check-session endpoint answers ``YES``
        for ``username`` and the login hash of the given parameters.
        """
        return cls.__check_player(username, cls.__login_hash(server_id,
                                                             shared_secret,
                                                             public_key))

    @staticmethod
    def _minecraft_folder():
        """Finds the folder minecraft stores the account credentials in.

        Copyright (c) 2010 David Rio Vierra
        Permission to use, copy, modify, and/or distribute this software for
        any purpose with or without fee is hereby granted, provided that the
        above copyright notice and this permission notice appear in all copies.
        """
        if sys.platform == "win32":
            # Try the two pywin32 lookups first and fall back to the
            # APPDATA environment variable if neither is usable.
            try:
                import win32com.client
                objShell = win32com.client.Dispatch("WScript.Shell")
                appDataDir = objShell.SpecialFolders("AppData")
            except Exception:
                try:
                    from win32com.shell import shell, shellcon
                    appDataDir = shell.SHGetPathFromIDListEx(
                        shell.SHGetSpecialFolderLocation(
                            0, shellcon.CSIDL_APPDATA
                        )
                    )
                except Exception:
                    appDataDir = os.environ['APPDATA']
                    # Only byte strings need decoding to text.
                    if isinstance(appDataDir, bytes):
                        appDataDir = appDataDir.decode(
                            sys.getfilesystemencoding()
                        )
            return os.path.join(appDataDir, u".minecraft")
        if sys.platform == "darwin":
            appDataDir = os.path.expanduser(u"~/Library/Application Support")
            return os.path.join(appDataDir, u"minecraft")
        return os.path.expanduser(u"~/.minecraft")

    @classmethod
    def __check_player(cls, username, session_hash):
        """Ask the check-session endpoint whether ``username`` has joined."""
        r = requests.get(cls.CHECK_SESSION_URL,
                         params={'user': username, 'serverId': session_hash})
        return r.ok and r.content.strip() == "YES"

    @staticmethod
    def __login_hash(server_id, shared_secret, public_key):
        """Returns the server id which is then used for joining a server"""
        digest = hashlib.sha1()
        digest.update(server_id)
        digest.update(shared_secret)
        digest.update(encryption.encode_public_key(public_key))
        return Authenticator._signed_hexdigest(digest)

    @staticmethod
    def _signed_hexdigest(digest):
        """Format a hash object's digest as a signed hexadecimal number.

        The digest is read as a two's complement integer as wide as the
        digest itself: when the top bit is set the result is the negated
        value prefixed with ``-``. Leading zeros are not kept.
        """
        value = int(digest.hexdigest(), 16)
        bits = digest.digest_size * 8
        if value >> (bits - 1):
            # Top bit set: reinterpret as a negative two's complement value.
            value -= 1 << bits
        return "%x" % value
class LastLoginAuthenticator(Authenticator):
    """Authenticator that reads credentials from the ``lastlogin`` file.

    The file is looked up in the Minecraft folder, decrypted with
    ``encryption.PBEWithMD5AndDES('passwordfile')`` and split into a
    username and a password, which are then posted to ``LOGIN_URL``.
    """

    LOGIN_URL = 'https://login.minecraft.net'
    VERSION = 1337

    def __init__(self):
        """Load and decrypt the stored username and password.

        Raises whatever ``open`` raises when the ``lastlogin`` file cannot
        be read.
        """
        super(LastLoginAuthenticator, self).__init__()
        path = os.path.join(self._minecraft_folder(), "lastlogin")
        # The file holds ciphertext, so it must be read as raw bytes; text
        # mode may translate line endings on some platforms.
        with open(path, "rb") as f:
            ciphertext = f.read()
        cipher = encryption.PBEWithMD5AndDES('passwordfile')
        plaintext = cipher.decrypt(ciphertext)
        # Layout: a 2-byte big-endian length, the username, a further
        # 2-byte field that is skipped, then the password.
        user_size = struct.unpack(">h", plaintext[:2])[0]
        self.username = plaintext[2:user_size + 2]
        self.password = plaintext[4 + user_size:]

    def authenticate(self):
        """Log in with the stored credentials.

        On success sets ``player_name`` and ``session_id`` from the third
        and fourth colon-separated fields of the response.

        Raises an HTTP error from ``raise_for_status`` on a failed request,
        and ``Exception`` when the response has fewer than four fields.
        """
        r = requests.post(self.LOGIN_URL,
                          data={'user': self.username,
                                'password': self.password,
                                'version': self.VERSION})
        r.raise_for_status()
        fields = r.content.split(":")
        if len(fields) < 4:
            raise Exception("Invalid response from server: " + r.content)
        self.player_name = fields[2]
        self.session_id = fields[3].strip()

    def __repr__(self):
        return ("LastLoginAuthenticator(user:'%s', player:'%s')" %
                (self.username, self.player_name))
class YggdrasilAuthenticator(Authenticator):
    """Authenticator that logs in with a username and password via Yggdrasil.

    :meth:`authenticate` posts the credentials to ``AUTH_URL`` and stores
    the returned access token and player name.
    """

    YGGDRASIL_URL = "https://authserver.mojang.com/"
    AUTH_URL = YGGDRASIL_URL + "authenticate"
    REFRESH_URL = YGGDRASIL_URL + "refresh"
    VALIDATE_URL = YGGDRASIL_URL + "validate"
    INVALIDATE_URL = YGGDRASIL_URL + "invalidate"
    SIGNOUT_URL = YGGDRASIL_URL + "signout"

    def __init__(self, username, password):
        super(YggdrasilAuthenticator, self).__init__()
        self.username = username
        self.password = password
        # Filled in by authenticate(); defined up front so the attribute
        # always exists.
        self.access_token = None
        self.client_token = self._random_token()

    def authenticate(self):
        """Exchange username and password for an access token.

        Sets ``access_token`` and ``player_name`` from the response.
        Raises ``YggdrasilException`` when the server reports an error.
        """
        r = self._request(self.AUTH_URL, {
            'agent': {
                'name': "Minecraft",
                'version': 1
            },
            'username': self.username,
            'password': self.password,
            'clientToken': self.client_token
        })
        self.access_token = r['accessToken']
        self.player_name = r['selectedProfile']['name']

    def _request(self, url, data):
        """POST ``data`` as JSON to ``url`` and return the decoded reply.

        Raises ``YggdrasilException`` when the server answers with an error
        status and a JSON body. When the body is not JSON, an HTTP error is
        raised for error statuses; otherwise the ``ValueError`` from the
        JSON decoder propagates.
        """
        r = requests.post(url, data=json.dumps(data),
                          headers={'Content-Type': 'application/json'})
        try:
            response = json.loads(r.content)
        except ValueError:
            # Non-JSON body (e.g. a gateway error page): report the HTTP
            # failure rather than a bare decoding error.
            r.raise_for_status()
            raise
        if not r.ok:
            raise self.YggdrasilException(response)
        return response

    def __repr__(self):
        return ("YggdrasilAuthenticator(user:%s, player:%s)" %
                (self.username, self.player_name))

    @classmethod
    def _random_token(cls):
        """Return 16 random bytes as a 32-character lowercase hex string."""
        random_bytes = encryption.generate_random_bytes(16)
        # bytearray yields integers whether the input is a byte string or
        # already a sequence of ints.
        return "".join("%02x" % byte for byte in bytearray(random_bytes))

    class YggdrasilException(Exception):
        """Error reported by the Yggdrasil server.

        Built from the decoded error response; keeps its ``error``,
        ``errorMessage`` (as ``message``) and ``cause`` entries.
        """

        def __init__(self, r):
            message = r.get('errorMessage')
            super(YggdrasilAuthenticator.YggdrasilException, self).__init__(
                message
            )
            # Stored explicitly instead of relying on the deprecated
            # implicit ``BaseException.message`` attribute.
            self.message = message
            self.error = r.get('error')
            self.cause = r.get('cause')

        def __str__(self):
            error = "%s: %s" % (self.error, self.message)
            if self.cause:
                return "%s (cause: %s)" % (error, self.cause)
            else:
                return error
class YggdrasilTokenAuthenticator(YggdrasilAuthenticator):
    """Yggdrasil authenticator driven by ``launcher_profiles.json``.

    Instead of a password it uses the access token stored for a profile in
    that file, refreshes it, and writes the new token back.
    """

    def __init__(self, profile=None):
        """Load the credentials of ``profile`` from the launcher config.

        When ``profile`` is None the config's ``selectedProfile`` is used.
        """
        # The lookup starts after YggdrasilAuthenticator in the MRO, so that
        # class's __init__ (which takes a username and password) is not run.
        super(YggdrasilAuthenticator, self).__init__()
        launcher_config = self._config()
        if profile is None:
            profile = launcher_config['selectedProfile']
        self.profile = profile
        credentials = launcher_config['profiles'][profile]['authentication']
        self.username = credentials['username']
        self.access_token = credentials['accessToken']
        self.player_name = credentials['displayName']
        self.client_token = launcher_config['clientToken']

    def authenticate(self):
        """Refresh the access token and persist the new one."""
        payload = {
            'clientToken': self.client_token,
            'accessToken': self.access_token
        }
        refreshed = self._request(self.REFRESH_URL, payload)
        self.access_token = refreshed['accessToken']
        self.player_name = refreshed['selectedProfile']['name']
        self._store_access_token()

    def _store_access_token(self):
        """Write the current access token back into the launcher config."""
        # Re-read the file so only the token entry of this profile changes.
        launcher_config = self._config()
        credentials = launcher_config['profiles'][self.profile]['authentication']
        credentials['accessToken'] = self.access_token
        with open(self._config_path(), "w") as handle:
            json.dump(launcher_config, handle, indent=2)

    @classmethod
    def _config(cls):
        """Return the parsed contents of the launcher config file."""
        with open(cls._config_path()) as handle:
            parsed = json.load(handle)
        return parsed

    @classmethod
    def _config_path(cls):
        """Return the path of ``launcher_profiles.json``."""
        folder = cls._minecraft_folder()
        return os.path.join(folder, "launcher_profiles.json")
def AuthenticatorFactory():
    """Return the first authenticator that authenticates successfully.

    Tries ``YggdrasilTokenAuthenticator`` and then
    ``LastLoginAuthenticator``, constructing each without arguments and
    calling its ``authenticate`` method. Failures are logged at debug level
    and the next candidate is tried.

    Returns the authenticated instance, or None when every candidate fails.
    """
    log = logging.getLogger(__name__)
    candidates = (YggdrasilTokenAuthenticator, LastLoginAuthenticator)
    for authenticator_class in candidates:
        try:
            authenticator = authenticator_class()
            authenticator.authenticate()
        except Exception:
            # Best effort: a missing credential file or a rejected login
            # just means the next strategy gets its turn.
            log.debug("%s could not authenticate",
                      authenticator_class.__name__, exc_info=True)
            continue
        return authenticator
    return None
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment