Skip to content

Instantly share code, notes, and snippets.

@intelfx
Last active March 28, 2019 22:35
Show Gist options
  • Save intelfx/7beead590668173f228086749862f31a to your computer and use it in GitHub Desktop.
Save intelfx/7beead590668173f228086749862f31a to your computer and use it in GitHub Desktop.
Idempotent register/password reset for https://github.com/matrix-org/synapse
#!/usr/bin/env python3
import argparse
import hashlib
import hmac
import json
import logging
import os
import os.path as p
import sys
import urllib.parse

import attr
import requests
import yaml
@attr.s(kw_only=True)
class MatrixRequests:
    """Small HTTP session helper for a Matrix homeserver, built on ``requests``.

    Stores the base URL and the credentials of one account, performs a
    password login on demand, and wraps the two admin endpoints this script
    uses: shared-secret registration and password reset.
    """

    # Base URL that request paths are appended to; must not end with '/'.
    url = attr.ib()
    # Matrix ID used for login; replaced by the server-reported user_id.
    mxid = attr.ib()
    # Password belonging to ``mxid``.
    password = attr.ib()
    # Optional device ID sent on login; replaced by the server-reported one.
    device_id = attr.ib(default=None)
    # Access token; set by authenticate() unless supplied by the caller.
    access_token = attr.ib(default=None)

    def homeserver(self):
        """Return the part of ``self.mxid`` after the first ':'.

        Raises:
            IndexError: if ``self.mxid`` contains no ':'.
        """
        return self.mxid.split(':', maxsplit=1)[1]

    def authenticate(self):
        """Log in with ``m.login.password`` and remember the access token.

        Does nothing except log a warning when an access token is already
        set. On success ``mxid``, ``device_id`` and ``access_token`` are
        overwritten with the values from the login response.

        Raises:
            requests.HTTPError: if the login request returns an error status.
            KeyError: if the login response lacks an expected field.
        """
        if self.access_token:
            logging.warning('MatrixRequests.authenticate(): access_token already set')
            return

        req = {
            'type': 'm.login.password',
            'identifier': {
                'type': 'm.id.user',
                'user': self.mxid,
            },
            'password': self.password,
        }
        if self.device_id is not None:
            req['device_id'] = self.device_id

        login_resp = self.request('POST', '/r0/login', obj=req, auth=False)
        login_json = login_resp.json()
        self.mxid = login_json['user_id']
        self.device_id = login_json['device_id']
        self.access_token = login_json['access_token']

    def request(self, method, path, obj=None, auth=True, check=True):
        """Send one HTTP request to ``self.url + path``.

        Args:
            method: HTTP method name passed to ``requests.request``.
            path: path appended to ``self.url``; must start with '/'.
            obj: optional object serialised to JSON and sent as the body.
            auth: when true, pass ``self.access_token`` as the
                ``access_token`` query parameter.
            check: when true, call ``raise_for_status()`` on the response.

        Returns:
            The ``requests`` response object.

        Raises:
            RuntimeError: on a malformed ``path``/``self.url``, or when
                ``auth`` is requested without an access token.
            requests.HTTPError: if ``check`` is true and the status is an error.
        """
        if not path.startswith('/'):
            raise RuntimeError(f'MatrixRequests: request: bad path: {path}')
        if self.url.endswith('/'):
            raise RuntimeError(f'MatrixRequests: request: bad self.url: {self.url}')

        url = f'{self.url}{path}'
        params = {}
        if auth:
            if not self.access_token:
                raise RuntimeError(f'MatrixRequests: request: auth=True, but self.access_token = {self.access_token}')
            params['access_token'] = self.access_token

        # Only omit the body when no object was given at all, so that an
        # explicitly passed empty dict/list is still sent as JSON.
        body = None if obj is None else json.dumps(obj)
        r = requests.request(method, url, params=params, data=body)
        if check:
            r.raise_for_status()
        return r

    @staticmethod
    def _admin_register_mac(secret, req):
        """Compute the hex HMAC-SHA1 that authorises a registration request.

        The digest covers nonce, username, password and the literal
        ``admin``/``notadmin``, each separated by a NUL byte, keyed with the
        registration shared secret.

        Args:
            secret: registration shared secret (text).
            req: dict with 'nonce', 'username', 'password' and 'admin' keys.

        Returns:
            The MAC as a lowercase hex string.
        """
        mac = hmac.new(
            # UTF-8 (a superset of ASCII) so a non-ASCII secret does not
            # crash; every other field is already encoded as UTF-8.
            key=secret.encode('utf8'),
            digestmod=hashlib.sha1,
        )
        mac.update(req['nonce'].encode('utf8'))
        mac.update(b"\x00")
        mac.update(req['username'].encode('utf8'))
        mac.update(b"\x00")
        mac.update(req['password'].encode('utf8'))
        mac.update(b"\x00")
        mac.update(b'admin' if req['admin'] else b'notadmin')
        return mac.hexdigest()

    def admin_register(self, secret, localpart, password, admin):
        """Register a user through the shared-secret registration endpoint.

        Fetches a nonce, signs the request with ``secret`` and posts it.
        Neither request carries an access token.

        Args:
            secret: registration shared secret.
            localpart: username (localpart) to register.
            password: password for the new account.
            admin: whether the new account should be an admin.

        Returns:
            The response of the registration POST.

        Raises:
            requests.HTTPError: if either request returns an error status.
        """
        nonce_resp = self.request('GET', '/r0/admin/register', auth=False)
        nonce = nonce_resp.json()['nonce']

        req = {
            'nonce': nonce,
            'username': localpart,
            'password': password,
            'admin': admin,
        }
        req['mac'] = MatrixRequests._admin_register_mac(secret=secret, req=req)

        return self.request('POST', '/r0/admin/register', obj=req, auth=False)

    def admin_reset_password(self, mxid, password):
        """Set a new password for ``mxid`` via the admin reset endpoint.

        Requires an access token (see ``authenticate``).

        Args:
            mxid: full Matrix ID of the account to modify.
            password: the new password.

        Returns:
            The response of the reset POST.

        Raises:
            requests.HTTPError: if the request returns an error status.
        """
        req = {
            'new_password': password,
        }
        # Escape characters that would otherwise alter the URL path (e.g. '/'
        # or '?'), while leaving the usual '@' and ':' of an mxid readable.
        quoted_mxid = urllib.parse.quote(mxid, safe='@:')
        return self.request('POST', f'/r0/admin/reset_password/{quoted_mxid}', obj=req)
def _is_user_in_use(response):
    """Return True if ``response`` is a 400 carrying errcode M_USER_IN_USE."""
    if response is None or response.status_code != 400:
        return False
    try:
        body = response.json()
    except ValueError:
        # Not a JSON body (e.g. an error page): not the error we look for.
        return False
    return isinstance(body, dict) and body.get('errcode') == 'M_USER_IN_USE'


def register_or_reset(*, hs, secret, localpart, password, admin):
    """Register ``localpart``, or reset its password if it already exists.

    First tries to register the user with the shared secret. If the server
    answers 400 / ``M_USER_IN_USE``, logs in through ``hs`` and resets the
    existing user's password instead.

    Args:
        hs: object providing ``admin_register``, ``authenticate``,
            ``homeserver`` and ``admin_reset_password``.
        secret: registration shared secret.
        localpart: localpart of the Matrix ID to register or reset.
        password: password to set on that account.
        admin: whether a newly registered account should be an admin.

    Returns:
        The decoded JSON body of whichever request succeeded.

    Raises:
        requests.HTTPError: for any HTTP error other than M_USER_IN_USE on
            registration, including a 400 whose body cannot be interpreted.
    """
    #
    # attempt to register a new user
    #
    try:
        resp = hs.admin_register(
            secret=secret,
            localpart=localpart,
            password=password,
            admin=admin,
        )
    except requests.HTTPError as e:
        # Anything but "user already exists" is a genuine failure; re-raise
        # the original HTTP error rather than a parsing error.
        if not _is_user_in_use(e.response):
            raise
    else:
        return resp.json()

    #
    # if we got M_USER_IN_USE, reset the existing user's password
    #
    hs.authenticate()
    mxid = f'@{localpart}:{hs.homeserver()}'
    resp = hs.admin_reset_password(
        mxid=mxid,
        password=password,
    )
    return resp.json()
def read_registration_shared_secret(*, config):
    """Read ``registration_shared_secret`` from a YAML configuration file.

    Args:
        config: path to the YAML file.

    Returns:
        The value stored under the top-level ``registration_shared_secret`` key.

    Raises:
        OSError: if the file cannot be opened.
        KeyError: if the key is absent.
    """
    with open(config, 'r') as f:
        # safe_load: plain data only, no arbitrary object construction, and
        # unlike yaml.load() it needs no explicit Loader argument.
        config_obj = yaml.safe_load(f)
    return config_obj['registration_shared_secret']
def read_password(*, password, password_fd):
    """Return a password given either literally or via a file descriptor.

    Args:
        password: the password itself, or None.
        password_fd: an open file descriptor to read the password from, or
            None. It is read to EOF and closed.

    Returns:
        ``password`` if it is not None; otherwise the text read from
        ``password_fd`` with a single trailing line terminator removed.

    Raises:
        RuntimeError: if both arguments are None.
    """
    if password is not None:
        return password

    if password_fd is not None:
        with os.fdopen(password_fd, 'r') as f:
            secret = f.read()
        # Tools such as `echo` or here-documents append a newline that is
        # not part of the password; drop exactly one line terminator.
        if secret.endswith('\n'):
            secret = secret[:-1]
            if secret.endswith('\r'):
                secret = secret[:-1]
        return secret

    raise RuntimeError('read_password: neither a password nor a password fd was given')
def parse_args():
    """Parse the command line and return the resulting namespace.

    Required options: the admin Matrix ID, the homeserver address, the
    homeserver configuration path and the localpart to register or reset.
    The admin password and the password to set must each be given exactly
    once, either literally or as a file descriptor number.
    """
    cli = argparse.ArgumentParser()

    # Admin account used for the password-reset fallback.
    cli.add_argument(
        '--admin-mxid',
        type=str,
        required=True,
        help='Matrix ID of a server admin account (used to reset passwords)',
    )
    admin_secret = cli.add_mutually_exclusive_group(required=True)
    admin_secret.add_argument(
        '--admin-password',
        type=str,
        help='Password to a server admin account (used to reset passwords)',
    )
    admin_secret.add_argument(
        '--admin-password-fd',
        type=int,
        help='fd to read password to a server admin account (used to reset passwords)',
    )

    # Where the homeserver lives and where its configuration is stored.
    cli.add_argument(
        '-H', '--homeserver',
        type=str,
        required=True,
        help='Matrix homeserver address (/_matrix/client)',
    )
    cli.add_argument(
        '-c', '--config',
        type=str,
        required=True,
        help='Path to Matrix homeserver configuration (used for registration_shared_secret)',
    )

    # The account to create or update.
    cli.add_argument(
        '--register-localpart',
        type=str,
        required=True,
        help='"localpart" of a Matrix ID to register or reset password on',
    )
    target_secret = cli.add_mutually_exclusive_group(required=True)
    target_secret.add_argument(
        '--register-password',
        type=str,
        help='Password to set on specified Matrix ID',
    )
    target_secret.add_argument(
        '--register-password-fd',
        type=int,
        help='fd to read password to set on specified Matrix ID',
    )

    return cli.parse_args()
def main():
    """Run the register-or-reset operation and report the outcome as JSON.

    Prints a JSON verdict to stdout and exits with status 0 on success, or
    with status 1 when the homeserver answered with an HTTP error. Other
    exceptions are not handled here.
    """
    args = parse_args()

    admin_password = read_password(
        password=args.admin_password,
        password_fd=args.admin_password_fd,
    )
    hs = MatrixRequests(
        url=args.homeserver,
        mxid=args.admin_mxid,
        device_id='REGIST~1',
        password=admin_password,
    )

    try:
        shared_secret = read_registration_shared_secret(config=args.config)
        new_password = read_password(
            password=args.register_password,
            password_fd=args.register_password_fd,
        )
        status = register_or_reset(
            hs=hs,
            secret=shared_secret,
            localpart=args.register_localpart,
            password=new_password,
            admin=False,
        )
    except requests.HTTPError as err:
        failed = err.response
        error = {'code': failed.status_code}
        # Prefer the decoded JSON error body; fall back to the raw text.
        try:
            error['data'] = failed.json()
        except ValueError:
            error['data'] = failed.text
        verdict = {'verdict': 'error', 'error': error}
        exitcode = 1
    else:
        verdict = {'verdict': 'ok', 'status': status}
        exitcode = 0

    json.dump(verdict, sys.stdout, indent=4)
    sys.exit(exitcode)
# Run the command-line tool only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment