Created
October 30, 2019 21:17
-
-
Save usbpc/8a212e7a11019ebd2b2d47964fba9a8f to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import requests | |
import sys | |
import time | |
import json | |
from pathlib import Path | |
# Base URL of the Mojang authentication server (authenticate / validate / refresh).
authURL = "https://authserver.mojang.com"
# Base URL of the Realms API used to list worlds and request download links.
realmsURL = "https://pc.realms.minecraft.net"
# Headers sent with every auth-server request; the payloads are JSON bodies.
header = {'Content-Type': 'application/json'}
# Where the tokens, profile and chosen realm ID are cached between runs.
credentialFile = Path("./realmCredentials.json")
class TokenNotValidForRefresh(Exception):
    """Raised when the auth server rejects a token refresh request."""
def authFirstTime():
    """Interactively log in without a pre-existing client token.

    Prompts for username and password and posts them to the auth server's
    ``/authenticate`` endpoint, repeating until the server answers with
    HTTP 200.

    Returns:
        A ``(clientToken, accessToken, selectedProfile)`` tuple taken from
        the server's JSON response.
    """
    # Local import so this block does not depend on the file's import list.
    from getpass import getpass

    while True:
        username = input("username: ")
        # getpass keeps the password from being echoed to the terminal.
        password = getpass("password: ")
        authPayload = {
            'agent': {'name': 'Minecraft', 'version': 1},
            'password': password,
            'username': username,
        }
        authResponse = requests.post(
            "{authURL}/authenticate".format(authURL=authURL),
            headers=header,
            json=authPayload,
            timeout=30,  # never hang forever on an unresponsive server
        )
        if authResponse.status_code == requests.codes.ok:
            break
        try:
            print(authResponse.json()['errorMessage'])
        except (ValueError, KeyError):
            # Error body was not JSON or had no 'errorMessage' field.
            print("Authentication failed (HTTP {code})".format(code=authResponse.status_code))

    responseJson = authResponse.json()
    return responseJson['clientToken'], responseJson['accessToken'], responseJson['selectedProfile']
def loginWithToken(clientToken):
    """Interactively log in again while keeping an existing client token.

    Prompts for username and password and posts them, together with
    ``clientToken``, to the auth server's ``/authenticate`` endpoint,
    repeating until the server answers with HTTP 200.

    Args:
        clientToken: The client token to send along with the credentials.

    Returns:
        A ``(clientToken, accessToken, selectedProfile)`` tuple taken from
        the server's JSON response.
    """
    # Local import so this block does not depend on the file's import list.
    from getpass import getpass

    while True:
        username = input("username: ")
        # getpass keeps the password from being echoed to the terminal.
        password = getpass("password: ")
        authPayload = {
            'agent': {'name': 'Minecraft', 'version': 1},
            'clientToken': clientToken,
            'password': password,
            'username': username,
        }
        authResponse = requests.post(
            "{authURL}/authenticate".format(authURL=authURL),
            headers=header,
            json=authPayload,
            timeout=30,  # never hang forever on an unresponsive server
        )
        if authResponse.status_code == requests.codes.ok:
            break
        try:
            print(authResponse.json()['errorMessage'])
        except (ValueError, KeyError):
            # Error body was not JSON or had no 'errorMessage' field.
            print("Authentication failed (HTTP {code})".format(code=authResponse.status_code))

    responseJson = authResponse.json()
    return responseJson['clientToken'], responseJson['accessToken'], responseJson['selectedProfile']
def isTokenValid(clientToken, accessToken):
    """Ask the auth server whether the token pair is still accepted.

    Args:
        clientToken: Client token paired with the access token.
        accessToken: Access token to validate.

    Returns:
        True if ``/validate`` answers with HTTP 204 (No Content), else False.
    """
    validatePayload = {'accessToken': accessToken, 'clientToken': clientToken}
    validateResponse = requests.post(
        "{authURL}/validate".format(authURL=authURL),
        headers=header,
        json=validatePayload,
        timeout=30,  # never hang forever on an unresponsive server
    )
    return validateResponse.status_code == requests.codes.no_content
def refreshToken(clientToken, accessToken, profile):
    """Exchange the current tokens for fresh ones via ``/refresh``.

    Args:
        clientToken: Client token paired with the access token.
        accessToken: Access token to refresh.
        profile: Profile object sent as ``selectedProfile``.

    Returns:
        A ``(clientToken, accessToken, selectedProfile)`` tuple taken from
        the server's JSON response.

    Raises:
        TokenNotValidForRefresh: If the server does not answer with HTTP 200.
    """
    authPayload = {'accessToken': accessToken, 'clientToken': clientToken, 'selectedProfile': profile}
    authResponse = requests.post(
        "{authURL}/refresh".format(authURL=authURL),
        headers=header,
        json=authPayload,
        timeout=30,  # never hang forever on an unresponsive server
    )
    if authResponse.status_code != requests.codes.ok:
        raise TokenNotValidForRefresh(
            "refresh rejected (HTTP {code})".format(code=authResponse.status_code)
        )

    responseJson = authResponse.json()
    return responseJson['clientToken'], responseJson['accessToken'], responseJson['selectedProfile']
def getValidTokenFromToken(clientToken, accessToken, profile):
    """Return a usable token triple, refreshing or re-authenticating if needed.

    The stored tokens are returned unchanged when they still validate.
    Otherwise a refresh is attempted, and if that is rejected the user is
    asked to log in again with the same client token.

    Args:
        clientToken: Stored client token.
        accessToken: Stored access token.
        profile: Stored profile object.

    Returns:
        A ``(clientToken, accessToken, profile)`` tuple.
    """
    if isTokenValid(clientToken, accessToken):
        return clientToken, accessToken, profile

    try:
        return refreshToken(clientToken, accessToken, profile)
    except TokenNotValidForRefresh:
        print("You need to log in again:")

    return loginWithToken(clientToken)
def getAllOwnedRealms(accessToken, playerUUID, username):
    """List the realms whose owner is the given player.

    Args:
        accessToken: Access token placed in the ``sid`` cookie.
        playerUUID: Player UUID; used in the ``sid`` cookie and compared
            against each world's ``ownerUUID``.
        username: Player name sent as the ``user`` cookie.

    Returns:
        A dict mapping realm ``id`` to realm ``name`` for every entry of the
        response's ``servers`` list owned by ``playerUUID``.

    Raises:
        requests.HTTPError: If the Realms API answers with an error status.
    """
    realmsCookie = {
        'sid': "token:{accessToken}:{playerUUID}".format(accessToken=accessToken, playerUUID=playerUUID),
        'user': username,
        # NOTE(review): presumably the client version reported to the API - confirm.
        'version': "1.12",
    }
    realmsResponse = requests.get(
        "{realmsURL}/worlds".format(realmsURL=realmsURL),
        cookies=realmsCookie,
        timeout=30,  # never hang forever on an unresponsive server
    )
    # Fail with a clear HTTP error rather than an opaque JSON/Key error below.
    realmsResponse.raise_for_status()

    return {
        world['id']: world['name']
        for world in realmsResponse.json()['servers']
        if world['ownerUUID'] == playerUUID
    }
def getActiveSlot(realmID, accessToken, playerUUID, username):
    """Fetch the ``activeSlot`` value of a realm.

    Args:
        realmID: ID of the realm to query.
        accessToken: Access token placed in the ``sid`` cookie.
        playerUUID: Player UUID placed in the ``sid`` cookie.
        username: Player name sent as the ``user`` cookie.

    Returns:
        The ``activeSlot`` field of the ``/worlds/{id}`` JSON response.

    Raises:
        requests.HTTPError: If the Realms API answers with an error status.
    """
    realmsCookie = {
        'sid': "token:{accessToken}:{playerUUID}".format(accessToken=accessToken, playerUUID=playerUUID),
        'user': username,
        # NOTE(review): presumably the client version reported to the API - confirm.
        'version': "1.12",
    }
    realmsResponse = requests.get(
        "{realmsURL}/worlds/{id}".format(realmsURL=realmsURL, id=realmID),
        cookies=realmsCookie,
        timeout=30,  # never hang forever on an unresponsive server
    )
    # Fail with a clear HTTP error rather than an opaque JSON/Key error below.
    realmsResponse.raise_for_status()
    return realmsResponse.json()['activeSlot']
def getRealmDownloadURL(realmID, accessToken, playerUUID, username):
    """Request the download link for the realm's active world slot.

    Polls the download endpoint every 10 seconds while it answers
    HTTP 503 (Service Unavailable). Any status other than 200 or 503
    terminates the program with exit code -1.

    Args:
        realmID: ID of the realm to download.
        accessToken: Access token placed in the ``sid`` cookie.
        playerUUID: Player UUID placed in the ``sid`` cookie.
        username: Player name sent as the ``user`` cookie.

    Returns:
        The ``downloadLink`` field of the JSON response.
    """
    realmsCookie = {
        'sid': "token:{accessToken}:{playerUUID}".format(accessToken=accessToken, playerUUID=playerUUID),
        'user': username,
        # NOTE(review): presumably the client version reported to the API - confirm.
        'version': "1.12",
    }
    # Resolve the slot once; re-querying it on every retry only adds requests.
    downloadEndpoint = "{realmsURL}/worlds/{id}/slot/{slot}/download".format(
        realmsURL=realmsURL,
        id=realmID,
        slot=getActiveSlot(realmID, accessToken, playerUUID, username),
    )

    while True:
        downloadResponse = requests.get(downloadEndpoint, cookies=realmsCookie, timeout=30)
        if downloadResponse.status_code == requests.codes.ok:
            break
        elif downloadResponse.status_code == requests.codes.service_unavailable:
            print("Download not ready yet...")
            time.sleep(10)
        else:
            print("Something isn't quite right... ending programm")
            sys.exit(-1)

    return downloadResponse.json()['downloadLink']
def download_file(url, filename):
    """Stream ``url`` to ``filename``, drawing a progress bar when possible.

    The body is always written chunk by chunk so large archives are never
    held in memory. A 50-character progress bar is drawn on stdout only
    when the server sends a non-zero ``content-length`` header.

    Args:
        url: Address to download.
        filename: Path of the file to (over)write.

    Raises:
        requests.HTTPError: If the server answers with an error status; the
            target file is not opened in that case.
    """
    print("Starting download...")
    # The context manager releases the connection even if writing fails.
    with requests.get(url, stream=True, timeout=30) as response:
        # Do not save an HTTP error page as if it were the archive.
        response.raise_for_status()

        total_length = response.headers.get('content-length')
        # 0 means "size unknown": skip the bar and avoid dividing by zero.
        total_length = int(total_length) if total_length is not None else 0

        dl = 0
        with open(filename, "wb") as f:
            for data in response.iter_content(chunk_size=4096):
                f.write(data)
                if total_length > 0:
                    dl += len(data)
                    # Clamp: decoded chunks can add up to more than the
                    # (possibly compressed) content-length.
                    done = min(50, int(50 * dl / total_length))
                    sys.stdout.write("\r[%s%s]" % ('=' * done, ' ' * (50 - done)))
                    sys.stdout.flush()

    if total_length > 0:
        # Finish the bar's line so later output starts on a fresh one.
        sys.stdout.write("\n")
        sys.stdout.flush()
# --- Script body -----------------------------------------------------------
# First run: no cached credentials yet, so log in and create the cache file.
if not credentialFile.is_file():
    clientToken, accessToken, profile = authFirstTime()
    credentialFile.write_text(json.dumps(
        {'clientToken': clientToken, 'accessToken': accessToken, 'profile': profile},
        sort_keys=True, indent=4))

# Load the cached state and make sure the tokens in it are usable.
data = json.loads(credentialFile.read_text())
data['clientToken'], data['accessToken'], data['profile'] = getValidTokenFromToken(
    data['clientToken'], data['accessToken'], data['profile'])

# Pick the realm to download once; the choice is cached for later runs.
if 'realmID' not in data:
    ownedRealms = getAllOwnedRealms(data['accessToken'], data['profile']['id'], data['profile']['name'])
    if not ownedRealms:
        print("You don't have a realm to download the world...")
        sys.exit(-1)
    elif len(ownedRealms) == 1:
        print("You only own one realm, so I'm gonna download that world")
        data['realmID'] = next(iter(ownedRealms))
    else:
        while True:
            print("Please select the realm where you want to download the world: ")
            # List the owned realms (not the credential dict) as the menu.
            for realmID, realmName in ownedRealms.items():
                print("{key}) {name}".format(key=realmID, name=realmName))
            try:
                selection = int(input())
            except ValueError:
                # Not a number: show the menu again instead of crashing.
                continue
            if selection in ownedRealms:
                data['realmID'] = selection
                break

# Save everything (possibly refreshed tokens and the selected realm).
credentialFile.write_text(json.dumps(data, sort_keys=True, indent=4))

# The output name comes from the first CLI argument, or is asked for interactively.
if len(sys.argv) > 1:
    filename = "{x}.tar.gz".format(x=sys.argv[1])
else:
    filename = "{x}.tar.gz".format(x=input("Please enter a filename for the file to download, if you don't want to do this every time just put a filename after script name :"))

download_file(
    getRealmDownloadURL(data['realmID'], data['accessToken'], data['profile']['id'], data['profile']['name']),
    filename)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment