Skip to content

Instantly share code, notes, and snippets.

@usbpc
Created October 30, 2019 21:17
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save usbpc/8a212e7a11019ebd2b2d47964fba9a8f to your computer and use it in GitHub Desktop.
Save usbpc/8a212e7a11019ebd2b2d47964fba9a8f to your computer and use it in GitHub Desktop.
import requests
import sys
import time
import json
from pathlib import Path
# Base URL of the Mojang authentication server; the /authenticate, /validate
# and /refresh endpoints used below are appended to it.
authURL = "https://authserver.mojang.com"
# Base URL of the Realms API; the /worlds endpoints used below are appended to it.
realmsURL = "https://pc.realms.minecraft.net"
# Headers sent with every POST to the authentication server.
header = {'Content-Type':'application/json'}
# JSON file (relative to the current working directory) that stores the client
# token, access token, profile and chosen realm id between runs.
credentialFile = Path("./realmCredentials.json")
class TokenNotValidForRefresh(Exception):
    """Signals that the auth server refused to refresh an access token."""
def authFirstTime():
    """Prompt for account credentials until the auth server accepts them.

    Posts the entered username and password to ``{authURL}/authenticate``.
    Whenever the response status is not 200, the server's error message is
    printed and the prompt is repeated.

    Returns:
        A ``(clientToken, accessToken, selectedProfile)`` tuple read from the
        JSON body of the successful response.
    """
    # Local import keeps this function self-contained.
    from getpass import getpass

    while True:
        username = input("username: ")
        # getpass reads the password without echoing it to the terminal.
        password = getpass("password: ")
        authPayload = {
            'agent': {'name': 'Minecraft', 'version': 1},
            'password': password,
            'username': username,
        }
        authResponse = requests.post(
            "{authURL}/authenticate".format(authURL=authURL),
            headers=header,
            json=authPayload,
        )
        if authResponse.status_code == requests.codes.ok:
            break
        # The error body is not guaranteed to be JSON with an 'errorMessage'
        # key (e.g. a proxy error page), so fall back to the status code.
        try:
            message = authResponse.json()['errorMessage']
        except (ValueError, KeyError, TypeError):
            message = "Authentication failed (HTTP {code})".format(code=authResponse.status_code)
        print(message)

    responseJson = authResponse.json()
    return responseJson['clientToken'], responseJson['accessToken'], responseJson['selectedProfile']
def loginWithToken(clientToken):
    """Prompt for credentials and re-authenticate with an existing client token.

    Works like a first-time login, except that ``clientToken`` is sent along
    with the credentials to ``{authURL}/authenticate``. Whenever the response
    status is not 200, the server's error message is printed and the prompt is
    repeated.

    Args:
        clientToken: The client token to include in the authenticate request.

    Returns:
        A ``(clientToken, accessToken, selectedProfile)`` tuple read from the
        JSON body of the successful response.
    """
    # Local import keeps this function self-contained.
    from getpass import getpass

    while True:
        username = input("username: ")
        # getpass reads the password without echoing it to the terminal.
        password = getpass("password: ")
        authPayload = {
            'agent': {'name': 'Minecraft', 'version': 1},
            'clientToken': clientToken,
            'password': password,
            'username': username,
        }
        authResponse = requests.post(
            "{authURL}/authenticate".format(authURL=authURL),
            headers=header,
            json=authPayload,
        )
        if authResponse.status_code == requests.codes.ok:
            break
        # The error body is not guaranteed to be JSON with an 'errorMessage'
        # key (e.g. a proxy error page), so fall back to the status code.
        try:
            message = authResponse.json()['errorMessage']
        except (ValueError, KeyError, TypeError):
            message = "Authentication failed (HTTP {code})".format(code=authResponse.status_code)
        print(message)

    responseJson = authResponse.json()
    return responseJson['clientToken'], responseJson['accessToken'], responseJson['selectedProfile']
def isTokenValid(clientToken, accessToken):
    """Ask the auth server whether the given token pair is still usable.

    Returns:
        True when ``{authURL}/validate`` answers with "no content" (204),
        False for any other status code.
    """
    body = {'accessToken': accessToken, 'clientToken': clientToken}
    endpoint = "{authURL}/validate".format(authURL=authURL)
    reply = requests.post(endpoint, headers=header, json=body)
    return reply.status_code == requests.codes.no_content
def refreshToken(clientToken, accessToken, profile):
    """Exchange the current token pair for a fresh one.

    Posts the tokens and the selected profile to ``{authURL}/refresh``.

    Returns:
        A ``(clientToken, accessToken, selectedProfile)`` tuple read from the
        JSON body of the response.

    Raises:
        TokenNotValidForRefresh: If the server does not answer with status 200.
    """
    body = {'accessToken': accessToken, 'clientToken': clientToken, 'selectedProfile': profile}
    endpoint = "{authURL}/refresh".format(authURL=authURL)
    reply = requests.post(endpoint, headers=header, json=body)

    # Bail out first so the success path reads straight through.
    if reply.status_code != requests.codes.ok:
        raise TokenNotValidForRefresh

    refreshed = reply.json()
    return refreshed['clientToken'], refreshed['accessToken'], refreshed['selectedProfile']
def getValidTokenFromToken(clientToken, accessToken, profile):
    """Return a usable ``(clientToken, accessToken, profile)`` triple.

    Tries, in order: the tokens as given (if the server still validates them),
    a token refresh, and finally an interactive login that reuses the client
    token.
    """
    # Cheapest case: nothing has expired.
    if isTokenValid(clientToken, accessToken):
        return clientToken, accessToken, profile

    try:
        return refreshToken(clientToken, accessToken, profile)
    except TokenNotValidForRefresh:
        # Refresh was rejected, so only a fresh login can help.
        print("You need to log in again:")
        return loginWithToken(clientToken)
def getAllOwnedRealms(accessToken, playerUUID, username):
    """List the realms whose owner is the given player.

    Requests ``{realmsURL}/worlds`` with the session cookies built from the
    arguments and keeps only the entries whose ``ownerUUID`` equals
    ``playerUUID``.

    Returns:
        A dict mapping each owned realm's ``id`` to its ``name``.
    """
    session_cookies = {
        'sid': "token:{accessToken}:{playerUUID}".format(accessToken=accessToken, playerUUID=playerUUID),
        'user': username,
        'version': "1.12",
    }
    reply = requests.get("{realmsURL}/worlds".format(realmsURL=realmsURL), cookies=session_cookies)
    servers = reply.json()['servers']
    return {
        server['id']: server['name']
        for server in servers
        if server['ownerUUID'] == playerUUID
    }
def getActiveSlot(realmID, accessToken, playerUUID, username):
    """Return the ``activeSlot`` value the Realms API reports for ``realmID``.

    Requests ``{realmsURL}/worlds/{realmID}`` with the session cookies built
    from the remaining arguments.
    """
    session_cookies = {
        'sid': "token:{accessToken}:{playerUUID}".format(accessToken=accessToken, playerUUID=playerUUID),
        'user': username,
        'version': "1.12",
    }
    endpoint = "{realmsURL}/worlds/{id}".format(realmsURL=realmsURL, id=realmID)
    reply = requests.get(endpoint, cookies=session_cookies)
    return reply.json()['activeSlot']
def getRealmDownloadURL(realmID, accessToken, playerUUID, username):
    """Poll the Realms API until it hands out a download link for the realm.

    Each attempt looks up the realm's active slot and requests that slot's
    download endpoint. A 503 answer means the download is not ready yet and is
    retried after 10 seconds; any other non-200 answer ends the program with
    exit status -1.

    Returns:
        The ``downloadLink`` value from the JSON body of the 200 response.
    """
    session_cookies = {
        'sid': "token:{accessToken}:{playerUUID}".format(accessToken=accessToken, playerUUID=playerUUID),
        'user': username,
        'version': "1.12",
    }
    while True:
        # The active slot is looked up again on every attempt.
        slot = getActiveSlot(realmID, accessToken, playerUUID, username)
        endpoint = "{realmsURL}/worlds/{id}/slot/{slot}/download".format(realmsURL=realmsURL, id=realmID, slot=slot)
        reply = requests.get(endpoint, cookies=session_cookies)
        status = reply.status_code

        if status == requests.codes.ok:
            return reply.json()['downloadLink']
        if status != requests.codes.service_unavailable:
            print("Something isn't quite right... ending programm")
            sys.exit(-1)

        print("Download not ready yet...")
        time.sleep(10)
def download_file(url, filename, chunk_size=4096):
    """Stream ``url`` into ``filename``, drawing a progress bar when possible.

    Args:
        url: Address to download from.
        filename: Path of the file to write (opened in binary mode).
        chunk_size: Number of bytes requested per read from the response.

    Raises:
        requests.HTTPError: If the server answers with an error status. The
            target file is not created in that case.
    """
    print("Starting download...")
    # The context manager guarantees the connection is released even if
    # writing fails half-way through.
    with requests.get(url, stream=True) as response:
        # Fail before touching the disk so an error page is never saved as
        # the archive.
        response.raise_for_status()

        total_length = response.headers.get('content-length')
        # 0 means "size unknown": download without a progress bar.
        total_length = int(total_length) if total_length is not None else 0

        dl = 0
        with open(filename, "wb") as f:
            # Always stream in chunks; reading response.content would pull
            # the whole file into memory first.
            for data in response.iter_content(chunk_size=chunk_size):
                f.write(data)
                if total_length > 0:
                    dl += len(data)
                    # Decoded bodies can be larger than content-length, so
                    # cap the bar at its full width.
                    done = min(50, 50 * dl // total_length)
                    sys.stdout.write("\r[%s%s]" % ('=' * done, ' ' * (50 - done)))
                    sys.stdout.flush()

    if total_length > 0:
        # Finish the progress-bar line so later output starts on a new line.
        sys.stdout.write("\n")
        sys.stdout.flush()
# --- Script entry: authenticate, pick a realm, download its world backup ---

# First run: no credential file yet, so log in interactively and store the tokens.
if not credentialFile.is_file():
    clientToken, accessToken, profile = authFirstTime()
    credentialFile.write_text(json.dumps({'clientToken': clientToken, 'accessToken': accessToken, 'profile': profile}, sort_keys=True, indent=4))

# Load the stored credentials and make sure the tokens are still usable
# (validate, then refresh, then ask for a new login).
data = json.loads(credentialFile.read_text())
data['clientToken'], data['accessToken'], data['profile'] = getValidTokenFromToken(data['clientToken'], data['accessToken'], data['profile'])

# The realm choice is stored after the first run, so only ask when it is missing.
if 'realmID' not in data:
    ownedRealms = getAllOwnedRealms(data['accessToken'], data['profile']['id'], data['profile']['name'])
    if not ownedRealms:
        print("You don't have a realm to download the world...")
        sys.exit(-1)
    elif len(ownedRealms) == 1:
        print("You only own one realm, so I'm gonna download that world")
        data['realmID'] = next(iter(ownedRealms))
    else:
        while True:
            print("Please select the realm where you want to download the world: ")
            # List the owned realms; iterating `data` here would print the
            # stored credentials instead of the realm choices.
            for realmID, realmName in ownedRealms.items():
                print("{key}) {name}".format(key=realmID, name=realmName))
            try:
                selection = int(input())
            except ValueError:
                # Non-numeric input: show the list again instead of crashing.
                print("Please enter one of the listed realm ids.")
                continue
            if selection in ownedRealms:
                data['realmID'] = selection
                break

# Save everything, so refreshed tokens and the chosen realm survive to the next run.
credentialFile.write_text(json.dumps(data, sort_keys=True, indent=4))

# Output name comes from the first command-line argument, or is asked for interactively.
if len(sys.argv) > 1:
    filename = "{x}.tar.gz".format(x=sys.argv[1])
else:
    filename = "{x}.tar.gz".format(x=input("Please enter a filename for the file to download, if you don't want to do this every time just put a filename after script name :"))

download_file(getRealmDownloadURL(data['realmID'], data['accessToken'], data['profile']['id'], data['profile']['name']), filename)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment