Skip to content

Instantly share code, notes, and snippets.

@usbpc
Last active July 11, 2017 20:52
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save usbpc/f3ff6388bfd04d70e5a3941431150ae5 to your computer and use it in GitHub Desktop.
Save usbpc/f3ff6388bfd04d70e5a3941431150ae5 to your computer and use it in GitHub Desktop.
A "simple" python CLI application to authenticate a user and download a minecraft realm world if the user is allowed to do so. minecraft realm downloader
import requests
import sys
import time
import json
from pathlib import Path
authURL = "https://authserver.mojang.com"
realmsURL = "https://pc.realms.minecraft.net"
header = {'Content-Type': 'application/json'}
credentialFile = Path("./realmCredentials.json")
class TokenNotValidForRefresh(Exception):
pass
def auth_first_time():
while True:
username = input("username: ")
password = input("password: ")
auth_payload = {'agent': {'name': 'Minecraft', 'version': 1}, 'password': password, 'username': username}
auth_response = requests.post("{authURL}/authenticate".format(authURL=authURL), headers=header,
json=auth_payload)
if auth_response.status_code == requests.codes.ok:
break
print(auth_response.json()['errorMessage'])
response_json = auth_response.json()
return response_json['clientToken'], response_json['accessToken'], response_json['selectedProfile']
def login_with_token(client_token):
while True:
username = input("username: ")
password = input("password: ")
auth_payload = {'agent': {'name': 'Minecraft', 'version': 1}, 'clientToken': client_token, 'password': password,
'username': username}
auth_response = requests.post("{authURL}/authenticate".format(authURL=authURL), headers=header,
json=auth_payload)
if auth_response.status_code == requests.codes.ok:
break
print(auth_response.json()['errorMessage'])
response_json = auth_response.json()
return response_json['clientToken'], response_json['accessToken'], response_json['selectedProfile']
def is_token_valid(client_token, access_token):
validate_payload = {'accessToken': access_token, 'clientToken': client_token}
validate_response = requests.post("{authURL}/validate".format(authURL=authURL), headers=header,
json=validate_payload)
if validate_response.status_code == requests.codes.no_content:
return True
else:
print("While checking if the accessToken is staill valid I got response code: {code} with error text: {text}"
.format(code=validate_response.status_code, text=validate_response.text))
return False
def refresh_token(client_token, access_token, profile):
auth_payload = {'accessToken': access_token, 'clientToken': client_token}
auth_response = requests.post("{authURL}/refresh".format(authURL=authURL), headers=header, json=auth_payload)
if not auth_response.status_code == requests.codes.ok:
print("While refreshing the token I got response code: {code} with error text: {text}"
.format(code=auth_response.status_code, text=auth_response.text))
raise TokenNotValidForRefresh
else:
response_json = auth_response.json()
return response_json['clientToken'], response_json['accessToken'], profile
def get_valid_token_from_token(client_token, access_token, profile):
print("Lets see if this token is still valid...")
if is_token_valid(client_token, access_token):
print("YES, still valid, nothing more to do here. :)")
return client_token, access_token, profile
else:
print("NO, dammit! Well, let's try to refresh it.")
try:
client_token, access_token, profile = refresh_token(client_token, access_token, profile)
print("Nice, at least that worked. :)")
return client_token, access_token, profile
except TokenNotValidForRefresh:
print("Nope that didn't work either... well then...")
print("You need to log in again:")
return login_with_token(client_token)
def get_owned_realms(access_token, player_uuid, username):
realms_cookie = {'sid': "token:{accessToken}:{playerUUID}".format(accessToken=access_token, playerUUID=player_uuid),
'user': username, 'version': "1.12"}
# print(realmsCookie)
realms_response = requests.get("{realmsURL}/worlds".format(realmsURL=realmsURL), cookies=realms_cookie)
# print("code:{code}, text={text}".format(code=realmsResponse.status_code, text=realmsResponse.text))
response_json = realms_response.json()
owned_worlds = {}
for world in response_json['servers']:
if world['ownerUUID'] == player_uuid:
owned_worlds[world['id']] = world['name']
return owned_worlds
def get_active_slot(realm_id, access_token, player_uuid, username):
realms_cookie = {'sid': "token:{accessToken}:{playerUUID}"
.format(accessToken=access_token, playerUUID=player_uuid), 'user': username, 'version': "1.12"}
realms_response = requests.get("{realmsURL}/worlds/{id}"
.format(realmsURL=realmsURL, id=realm_id), cookies=realms_cookie)
response_json = realms_response.json()
return response_json['activeSlot']
def get_backup_download_url(realm_id, access_token, player_uuid, username):
realms_cookie = {'sid': "token:{accessToken}:{playerUUID}".format(accessToken=access_token, playerUUID=player_uuid),
'user': username, 'version': "1.12"}
while True:
download_response = requests.get(
"{realmsURL}/worlds/{id}/slot/{slot}/download".format(realmsURL=realmsURL, id=realm_id,
slot=get_active_slot(realm_id, access_token,
player_uuid,
username)),
cookies=realms_cookie)
if download_response.status_code == requests.codes.ok:
break
elif download_response.status_code == requests.codes.service_unavailable:
print("Download not ready yet...")
time.sleep(10)
else:
print("While trying to get the download link I got response code: {code} with error text: {text}"
.format(code=download_response.status_code, text=download_response.text))
print("Ending program now...")
sys.exit(-1)
return download_response.json()['downloadLink']
def download_file(url, filename):
with open(filename, "wb") as f:
print("Starting download...")
response = requests.get(url, stream=True)
total_length = response.headers.get('content-length')
if total_length is None: # no content length header
f.write(response.content)
else:
dl = 0
total_length = int(total_length)
for data in response.iter_content(chunk_size=4096):
dl += len(data)
f.write(data)
done = int(50 * dl / total_length)
sys.stdout.write("\r[%s%s]" % ('=' * done, ' ' * (50 - done)))
sys.stdout.flush()
def main():
if not credentialFile.is_file():
client_token, access_token, profile = auth_first_time()
credentialFile.write_text(
json.dumps({'clientToken': client_token, 'accessToken': access_token, 'profile': profile}, sort_keys=True,
indent=4))
data = json.loads(credentialFile.read_text())
data['clientToken'], data['accessToken'], data['profile'] = get_valid_token_from_token(data['clientToken'],
data['accessToken'],
data['profile'])
if not 'realmID' in data:
owned_realms = get_owned_realms(data['accessToken'], data['profile']['id'], data['profile']['name'])
if len(owned_realms) == 0:
print("You don't have a realm to download the world...")
sys.exit(-1)
elif len(owned_realms) == 1:
print("You only own one realm, so I'm gonna download that world")
for key in owned_realms:
data['realmID'] = key
else:
while True:
print("Please select the realm where you want to download the world: ")
for realmID, realmName in data.items():
print("{key}) {name}".format(key=realmID, name=realmName))
selection = int(input())
if selection in owned_realms:
data['realmID'] = selection
break
# Save everyting...
credentialFile.write_text(json.dumps(data, sort_keys=True, indent=4))
if len(sys.argv) > 1:
filename = "{x}.tar.gz".format(x=sys.argv[1])
else:
filename = "{x}.tar.gz".format(x=input("Please enter a filename for the file to download:"))
download_file(
get_backup_download_url(data['realmID'], data['accessToken'], data['profile']['id'], data['profile']['name']),
filename)
if __name__ == "__main__":
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment