Last active
July 11, 2017 20:52
-
-
Save usbpc/f3ff6388bfd04d70e5a3941431150ae5 to your computer and use it in GitHub Desktop.
A "simple" Python CLI application that authenticates a user and downloads a Minecraft Realms world, provided the user is allowed to do so. Minecraft Realms downloader
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import getpass
import json
import sys
import time
from pathlib import Path

import requests
# Base URL of the Mojang authentication server (authenticate / validate / refresh).
authURL = "https://authserver.mojang.com"
# Base URL of the Minecraft Realms API.
realmsURL = "https://pc.realms.minecraft.net"
# HTTP headers sent with every authentication-server request.
header = {"Content-Type": "application/json"}
# Local file that keeps the tokens, profile and chosen realm between runs.
credentialFile = Path("./realmCredentials.json")
class TokenNotValidForRefresh(Exception):
    """Signals that the auth server refused to refresh an access token."""
def auth_first_time():
    """Prompt for credentials until the auth server accepts them.

    Repeatedly asks for a username and password on the terminal and posts
    them to ``{authURL}/authenticate``. After each rejected attempt the
    server's error message is printed and the prompt is shown again.

    Returns:
        A ``(clientToken, accessToken, selectedProfile)`` tuple taken from
        the successful authentication response.
    """
    while True:
        username = input("username: ")
        # getpass keeps the password from being echoed to the terminal.
        password = getpass.getpass("password: ")
        auth_payload = {
            'agent': {'name': 'Minecraft', 'version': 1},
            'password': password,
            'username': username,
        }
        auth_response = requests.post(
            "{authURL}/authenticate".format(authURL=authURL),
            headers=header,
            json=auth_payload,
        )
        if auth_response.status_code == requests.codes.ok:
            break
        # Error bodies are expected to be JSON with an 'errorMessage' field;
        # fall back to the raw text if they are not, instead of crashing
        # out of the retry loop.
        try:
            print(auth_response.json()['errorMessage'])
        except (ValueError, KeyError, TypeError):
            print(auth_response.text)
    response_json = auth_response.json()
    return response_json['clientToken'], response_json['accessToken'], response_json['selectedProfile']
def login_with_token(client_token):
    """Prompt for credentials and log in again, reusing an existing client token.

    Repeatedly asks for a username and password on the terminal and posts
    them, together with ``client_token``, to ``{authURL}/authenticate``.
    After each rejected attempt the server's error message is printed and
    the prompt is shown again.

    Args:
        client_token: The client token to send along with the credentials.

    Returns:
        A ``(clientToken, accessToken, selectedProfile)`` tuple taken from
        the successful authentication response.
    """
    while True:
        username = input("username: ")
        # getpass keeps the password from being echoed to the terminal.
        password = getpass.getpass("password: ")
        auth_payload = {
            'agent': {'name': 'Minecraft', 'version': 1},
            'clientToken': client_token,
            'password': password,
            'username': username,
        }
        auth_response = requests.post(
            "{authURL}/authenticate".format(authURL=authURL),
            headers=header,
            json=auth_payload,
        )
        if auth_response.status_code == requests.codes.ok:
            break
        # Error bodies are expected to be JSON with an 'errorMessage' field;
        # fall back to the raw text if they are not, instead of crashing
        # out of the retry loop.
        try:
            print(auth_response.json()['errorMessage'])
        except (ValueError, KeyError, TypeError):
            print(auth_response.text)
    response_json = auth_response.json()
    return response_json['clientToken'], response_json['accessToken'], response_json['selectedProfile']
def is_token_valid(client_token, access_token):
    """Ask the auth server whether the given token pair is still accepted.

    Posts both tokens to ``{authURL}/validate``. Returns True when the
    server answers with the ``no_content`` status code; for any other
    status the code and response body are printed and False is returned.
    """
    reply = requests.post(
        "{authURL}/validate".format(authURL=authURL),
        headers=header,
        json={'accessToken': access_token, 'clientToken': client_token},
    )
    if reply.status_code != requests.codes.no_content:
        message = (
            "While checking if the accessToken is staill valid I got response code: {code} with error text: {text}"
            .format(code=reply.status_code, text=reply.text)
        )
        print(message)
        return False
    return True
def refresh_token(client_token, access_token, profile):
    """Exchange the current token pair for a fresh one via ``/refresh``.

    Args:
        client_token: Client token sent with the refresh request.
        access_token: Access token to be refreshed.
        profile: Passed through unchanged into the returned tuple.

    Returns:
        ``(clientToken, accessToken, profile)`` where the two tokens come
        from the server's response.

    Raises:
        TokenNotValidForRefresh: If the server does not answer with status OK;
            the status code and response body are printed first.
    """
    reply = requests.post(
        "{authURL}/refresh".format(authURL=authURL),
        headers=header,
        json={'accessToken': access_token, 'clientToken': client_token},
    )
    if reply.status_code != requests.codes.ok:
        print("While refreshing the token I got response code: {code} with error text: {text}"
              .format(code=reply.status_code, text=reply.text))
        raise TokenNotValidForRefresh
    renewed = reply.json()
    return renewed['clientToken'], renewed['accessToken'], profile
def get_valid_token_from_token(client_token, access_token, profile):
    """Return a usable ``(client_token, access_token, profile)`` triple.

    Tries, in order: keeping the token if it still validates, refreshing
    it, and finally asking the user to log in again with the same client
    token. Progress is reported on stdout at every step.
    """
    print("Lets see if this token is still valid...")
    if is_token_valid(client_token, access_token):
        print("YES, still valid, nothing more to do here. :)")
        return client_token, access_token, profile

    print("NO, dammit! Well, let's try to refresh it.")
    try:
        refreshed = refresh_token(client_token, access_token, profile)
    except TokenNotValidForRefresh:
        print("Nope that didn't work either... well then...")
        print("You need to log in again:")
        return login_with_token(client_token)

    print("Nice, at least that worked. :)")
    return refreshed
def get_owned_realms(access_token, player_uuid, username):
    """Fetch the realms whose owner is ``player_uuid``.

    Requests ``{realmsURL}/worlds`` using a session cookie built from the
    access token, player UUID and username, then keeps only the entries
    under ``'servers'`` whose ``'ownerUUID'`` equals ``player_uuid``.

    Returns:
        A dict mapping each owned world's ``'id'`` to its ``'name'``.
    """
    session_cookies = {
        'sid': "token:{accessToken}:{playerUUID}".format(accessToken=access_token, playerUUID=player_uuid),
        'user': username,
        'version': "1.12",
    }
    listing = requests.get("{realmsURL}/worlds".format(realmsURL=realmsURL), cookies=session_cookies).json()
    return {
        entry['id']: entry['name']
        for entry in listing['servers']
        if entry['ownerUUID'] == player_uuid
    }
def get_active_slot(realm_id, access_token, player_uuid, username):
    """Return the ``'activeSlot'`` value the Realms API reports for ``realm_id``.

    Requests ``{realmsURL}/worlds/{id}`` using a session cookie built from
    the access token, player UUID and username.
    """
    session_cookies = {
        'sid': "token:{accessToken}:{playerUUID}".format(accessToken=access_token, playerUUID=player_uuid),
        'user': username,
        'version': "1.12",
    }
    world_url = "{realmsURL}/worlds/{id}".format(realmsURL=realmsURL, id=realm_id)
    world_info = requests.get(world_url, cookies=session_cookies).json()
    return world_info['activeSlot']
def get_backup_download_url(realm_id, access_token, player_uuid, username):
    """Poll the Realms API until it hands out a download link for the active slot.

    Each attempt looks up the realm's active slot and requests
    ``{realmsURL}/worlds/{id}/slot/{slot}/download``. A ``service_unavailable``
    answer is treated as "not ready yet" and retried after 10 seconds; any
    other non-OK answer is printed and the program exits with status -1.

    Returns:
        The ``'downloadLink'`` value from the successful response.
    """
    session_cookies = {
        'sid': "token:{accessToken}:{playerUUID}".format(accessToken=access_token, playerUUID=player_uuid),
        'user': username,
        'version': "1.12",
    }
    while True:
        # The active slot is looked up again on every attempt.
        slot = get_active_slot(realm_id, access_token, player_uuid, username)
        target = "{realmsURL}/worlds/{id}/slot/{slot}/download".format(realmsURL=realmsURL, id=realm_id, slot=slot)
        reply = requests.get(target, cookies=session_cookies)
        status = reply.status_code

        if status == requests.codes.ok:
            return reply.json()['downloadLink']

        if status != requests.codes.service_unavailable:
            print("While trying to get the download link I got response code: {code} with error text: {text}"
                  .format(code=status, text=reply.text))
            print("Ending program now...")
            sys.exit(-1)

        print("Download not ready yet...")
        time.sleep(10)
def download_file(url, filename):
    """Stream the content at ``url`` into ``filename``, showing a progress bar.

    The response status is checked before the output file is opened, so a
    failed request neither creates an empty file nor saves an error page as
    the archive. The body is always written chunk by chunk; the 50-character
    progress bar is only drawn when the server sent a positive
    Content-Length.

    Args:
        url: Address to download from.
        filename: Path of the file to (over)write in binary mode.

    Raises:
        requests.exceptions.HTTPError: If the server answers with an error status.
    """
    print("Starting download...")
    response = requests.get(url, stream=True)
    try:
        response.raise_for_status()
        length_header = response.headers.get('content-length')
        total_length = int(length_header) if length_header is not None else 0
        dl = 0
        with open(filename, "wb") as f:
            for data in response.iter_content(chunk_size=4096):
                f.write(data)
                if total_length > 0:
                    dl += len(data)
                    # Clamp: decoded content can be larger than Content-Length.
                    done = min(50, int(50 * dl / total_length))
                    sys.stdout.write("\r[%s%s]" % ('=' * done, ' ' * (50 - done)))
                    sys.stdout.flush()
        if total_length > 0:
            # Finish the progress-bar line so later output starts on a new line.
            sys.stdout.write("\n")
    finally:
        # Release the connection even if the download fails midway.
        response.close()
def main():
    """Run the interactive realm-world download workflow.

    Steps:
      1. If the credential file does not exist, authenticate and create it.
      2. Load the stored credentials and make sure the token is usable
         (validate, refresh, or log in again).
      3. If no realm has been chosen yet, pick one of the realms the player
         owns (automatically when there is exactly one, otherwise by asking).
      4. Save the updated credentials and realm choice back to the file.
      5. Download the active slot's world to ``<name>.tar.gz``, where the
         name comes from the first command-line argument or a prompt.

    Exits with status -1 when the player owns no realm.
    """
    if not credentialFile.is_file():
        client_token, access_token, profile = auth_first_time()
        credentialFile.write_text(
            json.dumps({'clientToken': client_token, 'accessToken': access_token, 'profile': profile},
                       sort_keys=True, indent=4))

    data = json.loads(credentialFile.read_text())
    data['clientToken'], data['accessToken'], data['profile'] = get_valid_token_from_token(
        data['clientToken'], data['accessToken'], data['profile'])

    if 'realmID' not in data:
        owned_realms = get_owned_realms(data['accessToken'], data['profile']['id'], data['profile']['name'])
        if not owned_realms:
            print("You don't have a realm to download the world...")
            sys.exit(-1)
        elif len(owned_realms) == 1:
            print("You only own one realm, so I'm gonna download that world")
            data['realmID'] = next(iter(owned_realms))
        else:
            while True:
                print("Please select the realm where you want to download the world: ")
                # List the owned realms (not the credential dict) as choices.
                for realm_id, realm_name in owned_realms.items():
                    print("{key}) {name}".format(key=realm_id, name=realm_name))
                try:
                    selection = int(input())
                except ValueError:
                    # Not a number: show the menu again instead of crashing.
                    continue
                if selection in owned_realms:
                    data['realmID'] = selection
                    break

    # Save everything, including refreshed tokens and the chosen realm.
    credentialFile.write_text(json.dumps(data, sort_keys=True, indent=4))

    if len(sys.argv) > 1:
        filename = "{x}.tar.gz".format(x=sys.argv[1])
    else:
        filename = "{x}.tar.gz".format(x=input("Please enter a filename for the file to download:"))

    download_file(
        get_backup_download_url(data['realmID'], data['accessToken'], data['profile']['id'],
                                data['profile']['name']),
        filename)
# Run the downloader only when this file is executed as a script.
if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment