Last active
November 15, 2015 21:34
-
-
Save CptSpaceToaster/e26fbf2accab5b41986d to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python3 | |
import requests | |
import json | |
import argparse | |
import uuid | |
import getpass | |
import os | |
import sys | |
""" | |
Because we're interfacing with a Java service via json, the variable names are camelCase | |
Sorry.... | |
""" | |
def walk_up(bottom): | |
""" | |
mimic os.walk, but walk 'up' | |
instead of down the directory tree | |
""" | |
bottom = os.path.realpath(bottom) | |
# get files in current dir | |
try: | |
names = os.listdir(bottom) | |
except Exception as e: | |
print(e, file=sys.stderr) | |
return | |
dirs, nondirs = [], [] | |
for name in names: | |
if os.path.isdir(os.path.join(bottom, name)): | |
dirs.append(name) | |
else: | |
nondirs.append(name) | |
yield bottom, dirs, nondirs | |
new_path = os.path.realpath(os.path.join(bottom, '..')) | |
# see if we are at the top | |
if new_path == bottom: | |
return | |
for x in walk_up(new_path): | |
yield x | |
class ProfileFile(dict): | |
auth_url = 'https://authserver.mojang.com/' | |
def __init__(self, filename): | |
if os.path.isfile(filename): | |
with open(filename) as f: | |
super().__init__(json.load(f)) | |
else: | |
super().__init__() | |
self.filename = filename | |
self['clientToken'] = self.get('clientToken', str(uuid.uuid4())) | |
def save(self): | |
with open(self.filename, 'w') as f: | |
# TODO: indent=2 is assumed | |
json.dump(self, f, sort_keys=True, indent=2) | |
def get_agent_name(self): | |
return 'Minecraft' | |
def get_agent_version(self): | |
return 1 | |
def post_json(self, url, **kwargs): | |
# set phasers to json | |
headers = {'content-type': 'application/json'} | |
# print(kwargs, file=sys.stderr) | |
# shipit.jpg | |
r = requests.post(url, data=json.dumps(kwargs), headers=headers) | |
try: | |
# Check for an empty payload success (204) | |
if r.status_code == 204: | |
return True | |
# parse the repsonse text as json | |
j = r.json() | |
if r.status_code not in [200, 204]: | |
print('Error: {0} - {1}'.format(r.status_code, r.reason), file=sys.stderr) | |
print(j['error'], file=sys.stderr) | |
print(j['errorMessage'], file=sys.stderr) | |
return False | |
return j | |
except(ValueError): | |
# If we didn't get json back, print the status and reason | |
print('Error: {0} - {1}'.format(r.status_code, r.reason), file=sys.stderr) | |
return False | |
class VanillaProfile(ProfileFile): | |
def __init__(self, filename): | |
super().__init__(filename) | |
def get_access_token(self): | |
user = self.get_selected_user() | |
if user: | |
return user['accessToken'] | |
def get_selected_user(self): | |
if 'authenticationDatabase' not in self: | |
self['authenticationDatabase'] = {} | |
return None | |
key = self.get('selectedUser', None) | |
if key is None: | |
return None | |
return self['authenticationDatabase'].get(key, None) | |
def get_agent_name(self): | |
if 'launcherVersion' not in self: | |
self['launcherVersion'] = {} | |
# This was a default value at the time when I wrote this document | |
# Let me know if this is causing you some trouble | |
self['launcherVersion']['name'] = '1.6.44' | |
return self['launcherVersion']['name'] | |
def get_agent_version(self): | |
if 'launcherVersion' not in self: | |
self['launcherVersion'] = {} | |
# This was a default value at the time when I wrote this document | |
# Let me know if this is causing you some trouble | |
self['launcherVersion']['format'] = '17' | |
return self['launcherVersion']['format'] | |
def authenticate(self, u_name): | |
user = self.get_selected_user() | |
username = u_name | |
if user: | |
if username is None: | |
username = user.get('username', None) | |
elif username != user['username']: | |
user = None | |
if username is None: | |
username = input('Email Address or Username: ') | |
print('Username: {0}'.format(username), file=sys.stderr) | |
j = self.post_json(self.auth_url + 'authenticate', | |
agent={'name': self.get_agent_name(), | |
'version': self.get_agent_version()}, | |
username=username, | |
password=getpass.getpass(), | |
clientToken=self['clientToken']) | |
if not j: | |
return False | |
if user is None: | |
key = self['clientToken'].replace('-', '') | |
self['selectedUser'] = key | |
self['authenticationDatabase'][key] = {} | |
user = self['authenticationDatabase'][key] | |
user['accessToken'] = j['accessToken'] | |
user['username'] = username | |
# No idea why this is here... but Mojang put it there... | |
user['uuid'] = user.get('uuid', str(uuid.uuid4())) | |
if 'selectedProfile' in j: | |
user['displayName'] = j['selectedProfile']['name'] | |
user['userid'] = j['selectedProfile']['id'] | |
return True | |
def refresh(self): | |
user = self.get_selected_user() | |
if user is None: | |
return False | |
j = self.post_json(self.auth_url + 'refresh', | |
accessToken=user['accessToken'], | |
clientToken=self['clientToken']) | |
if not j: | |
return False | |
user['accessToken'] = j['accessToken'] | |
if 'selectedProfile' in j: | |
user['displayName'] = j['selectedProfile']['name'] | |
user['userid'] = j['selectedProfile']['id'] | |
return True | |
def validate(self): | |
# print('Validating', file=sys.stderr) | |
user = self.get_selected_user() | |
if user is None: | |
return False | |
return self.post_json(self.auth_url + 'validate', | |
accessToken=user['accessToken'], | |
clientToken=self['clientToken']) | |
def invalidate(self): | |
# print('Invalidating', file=sys.stderr) | |
user = self.get_selected_user() | |
if user is None: | |
return False | |
return self.post_json(self.auth_url + 'invalidate', | |
accessToken=user['accessToken'], | |
clientToken=self['clientToken']) | |
class TechnicProfile(ProfileFile): | |
def __init__(self, filename): | |
super().__init__(filename) | |
def get_access_token(self): | |
return self['savedUsers'][self['lastUser']]['accessToken'] | |
def authenticate(self, u_name): | |
# print('Authenticating', file=sys.stderr) | |
username = u_name | |
if username is None: | |
username = self.get('lastUser', None) | |
if username is None: | |
username = input('Email Address or Username: ') | |
print('Username: {0}'.format(username), file=sys.stderr) | |
j = self.post_json(self.auth_url + 'authenticate', | |
agent={'name': self.get_agent_name(), | |
'version': self.get_agent_version()}, | |
username=username, | |
password=getpass.getpass(), | |
clientToken=self['clientToken']) | |
if not j: | |
return False | |
self['lastUser'] = username | |
self['savedUsers'][username]['userName'] = username | |
self['savedUsers'][username]['accessToken'] = j['accessToken'] | |
self['savedUsers'][username]['clientToken'] = j['clientToken'] | |
self['savedUsers'][username]['displayName'] = j['selectedProfile']['name'] | |
if 'selectedProfile' in j: | |
self['savedUsers'][username]['profile'] = j['selectedProfile'] | |
return True | |
def refresh(self): | |
# print('Refreshing', file=sys.stderr) | |
username = self.get('lastUser', None) | |
if username is None: | |
return False | |
j = self.post_json(self.auth_url + 'refresh', | |
accessToken=self['savedUsers'][username]['accessToken'], | |
# This causes an error if you include it atm | |
# Multiple profiles must not be implemented | |
# selectedProfile=self['savedUsers'][username]['profile'], | |
clientToken=self['clientToken']) | |
if not j: | |
return False | |
self['savedUsers'][username]['accessToken'] = j['accessToken'] | |
if 'selectedProfile' in j: | |
self['savedUsers'][username]['profile'] = j['selectedProfile'] | |
return True | |
def validate(self): | |
# print('Validating', file=sys.stderr) | |
username = self.get('lastUser', None) | |
if username is None: | |
return False | |
return self.post_json(self.auth_url + 'validate', | |
accessToken=self['savedUsers'][username]['accessToken'], | |
clientToken=self['clientToken']) | |
def invalidate(self): | |
# print('Invalidating', file=sys.stderr) | |
username = self.get('lastUser', None) | |
if username is None: | |
return False | |
return self.post_json(self.auth_url + 'invalidate', | |
accessToken=self['savedUsers'][username]['accessToken'], | |
clientToken=self['clientToken']) | |
def get_profile(file_or_dir): | |
""" | |
Attempts to find a Yggdrasil authentication token file in the given | |
argument, and walks up directories until it hits root '/' looking for one | |
""" | |
if os.path.isfile(file_or_dir): | |
return guess_format(file_or_dir) | |
# TODO: This seems like it could be done better somehow | |
targets = ['launcher_profiles.json', 'users.json'] | |
for root, directories, files in walk_up(file_or_dir): | |
for filename in targets: | |
if filename in files: | |
return guess_format(os.path.join(root, filename)) | |
def guess_format(path_and_filename): | |
""" | |
Guess the format of the authentication token file based on its filename | |
""" | |
filename = os.path.basename(path_and_filename) | |
if filename == 'launcher_profiles.json': | |
return VanillaProfile(path_and_filename) | |
if filename == 'users.json': | |
return TechnicProfile(path_and_filename) | |
if __name__ == '__main__': | |
parser = argparse.ArgumentParser(description='yggauth - A command line authentication service for Yggdrasil', | |
usage='yggauth.py [-h | -r | -i] [-x] [-u] [FILE...]', | |
formatter_class=argparse.RawTextHelpFormatter) | |
parser.add_argument('-r', '--reauth', dest='reauth', action='store_false', | |
help='Force client to reauthenticate') | |
parser.add_argument('-i', '--invaldiate', dest='invalidate', action='store_true', | |
help='Invalidate the current accessToken') | |
parser.add_argument('-x', '--disable-update', dest='update_profile', action='store_false', | |
help='Stop the client from updating any profiles it finds (this will likely invalidate them)') | |
parser.add_argument('-u', '--username', dest='username', default=None, | |
help='Use the supplied username (even if the client finds another)') | |
parser.add_argument('FILE', nargs='*', default=os.curdir, | |
help='Path to an authentication profile') | |
args = parser.parse_args() | |
for path in args.FILE: | |
P = get_profile(path) | |
if P: | |
break | |
if P is None: | |
print('Error: No profile specified. The client was unable to find an authentication token file', file=sys.stderr) | |
sys.exit(1) | |
if args.invalidate: | |
if not P.invalidate(): | |
print('Error: Invalidation failed', file=sys.stderr) | |
sys.exit(1) | |
elif args.reauth and not args.username and P.validate() and P.refresh() or P.authenticate(args.username): | |
if args.update_profile: | |
P.save() | |
print(P.get_access_token()) | |
else: | |
print('Error: Authentication failed', file=sys.stderr) | |
sys.exit(1) | |
sys.exit(0) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment