Skip to content

Instantly share code, notes, and snippets.

@this-post
Created May 19, 2023 03:49
Show Gist options
  • Save this-post/30570cebdb6d02e9bb52015813a27ffe to your computer and use it in GitHub Desktop.
Save this-post/30570cebdb6d02e9bb52015813a27ffe to your computer and use it in GitHub Desktop.
from flask import Flask, jsonify, request
from pymemcache.client import base # brew install memcached && pip install pymemcache
from cryptography import x509
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes, serialization
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.kdf.hkdf import HKDF
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
import ssl, sys, sqlite3, uuid, os, json
# Flask application object; the route decorators below register on it.
app = Flask(__name__)

playfab_title_id = 'TITLE_ID'  # change into your title ID
# Host whose TLS certificate is fetched by the pinning endpoint.
playfab_url = f'{playfab_title_id}.playfabapi.com'
@app.route('/getPinnedCertSha512', methods=['POST'])
def getPinnedCertSha512():
    """Return the SHA-512 fingerprint of the PlayFab TLS certificate, encrypted.

    Expects a JSON object containing ``kid``, the key ID handed out by the
    key-exchange endpoint. The fingerprint is wrapped in a JSON payload and
    encrypted with the session key cached under that ``kid``.

    Responses:
        200 -- ``{"data": <hex envelope from aes_encrypt>}``
        400 -- body is not JSON, is not a JSON object, or ``kid`` is missing
               or not a non-empty string
        403 -- no session key is cached for ``kid``
    """
    if not request.is_json:
        return '', 400
    data = request.json
    # Reject a missing / non-string kid up front instead of handing it to the
    # cache client, and tolerate JSON bodies that are not objects.
    kid = data.get('kid') if isinstance(data, dict) else None
    if not isinstance(kid, str) or not kid:
        return '', 400
    if mc_client.get(kid) is None:
        return 'Key not found', 403
    cert_pem = ssl.get_server_certificate((playfab_url, 443))
    cert = x509.load_pem_x509_certificate(cert_pem.encode('utf-8'), default_backend())
    # The route name and the response key both promise SHA-512; the previous
    # code hashed with SHA-256 and returned a digest of the wrong algorithm.
    sha512fingerprint = cert.fingerprint(hashes.SHA512())
    payload = {
        'sha512fingerprint': sha512fingerprint.hex()
    }
    response = {
        'data': aes_encrypt(kid, json.dumps(payload).encode('utf-8'))
    }
    return jsonify(response), 200
@app.route('/getE2eeParams', methods=['POST'])
def keyExchange():
    """Perform the ECDH handshake and hand the client its session parameters.

    Reads ``publicKey`` from the JSON body, stores it under a fresh key ID,
    derives a session key from the ECDH shared secret and a random 16-byte
    salt, and caches that session key under the key ID.

    Responses:
        200 -- ``{"kid", "salt", "serverPublicKey"}`` (salt and key hex-encoded,
               key as DER SubjectPublicKeyInfo)
        400 -- body is not JSON or carries no ``publicKey``
        500 -- the shared secret could not be computed
    """
    # Both "not JSON" and "no publicKey" end in the same 400 response.
    if not request.is_json:
        return 'Unexpected content type', 400
    client_public_key = request.json.get('publicKey')
    if client_public_key is None:
        return 'Unexpected content type', 400

    k_uuid = generate_kid(client_public_key)
    _salt = os.urandom(16)
    ok, shared_secret = get_shared_secret(k_uuid)
    if not ok:
        return 'Key exchange failed', 500

    mc_client.set(k_uuid, get_shared_derived_key(_salt, shared_secret))

    server_public_der = server_private_key.public_key().public_bytes(
        encoding=serialization.Encoding.DER,
        format=serialization.PublicFormat.SubjectPublicKeyInfo,
    )
    return jsonify({
        'kid': k_uuid,
        'salt': _salt.hex(),
        'serverPublicKey': server_public_der.hex(),
    }), 200
def get_shared_secret(kid):
    """Compute the ECDH shared secret for the client key stored under *kid*.

    Looks up the hex-encoded DER public key in the ``public_key`` table of
    ``data.db`` and runs ECDH against the server's private key.

    Args:
        kid: key ID returned by ``generate_kid``.

    Returns:
        ``(True, shared_secret_bytes)`` on success, or ``(False, "")`` when no
        key is stored under *kid* or the stored value cannot be decoded/used
        as a peer public key.
    """
    sql_conn = sqlite3.connect('data.db')
    try:
        res_s = sql_conn.execute(
            "SELECT value FROM public_key WHERE id = ?", (kid, )
        ).fetchone()
    finally:
        # Always release the connection, even if the query raises.
        sql_conn.close()

    if res_s is None:
        return (False, "")

    public_key_hex, = res_s
    try:
        # Decode the hex first so a malformed client value fails here.
        public_key_bytes = bytes.fromhex(public_key_hex)
        peer_public_key = serialization.load_der_public_key(public_key_bytes)
        shared_secret = server_private_key.exchange(ec.ECDH(), peer_public_key)
    except (TypeError, ValueError):
        # The stored key is client-supplied; treat an unusable key as a failed
        # exchange rather than letting the exception escape.
        return (False, "")
    return (True, shared_secret)
def get_shared_derived_key(_salt, shared_secret):
    """Derive a 32-byte session key from *shared_secret* using HKDF-SHA256.

    Args:
        _salt: salt bytes passed to HKDF.
        shared_secret: input key material (the ECDH shared secret).

    Returns:
        The 32-byte derived key.
    """
    kdf = HKDF(algorithm=hashes.SHA256(), length=32, salt=_salt, info=b'handshake data')
    return kdf.derive(shared_secret)
def aes_encrypt(kid, plain):
    """Encrypt *plain* with AES-GCM under the session key cached for *kid*.

    Args:
        kid: key ID whose session key is read from the cache.
        plain: plaintext bytes.

    Returns:
        A hex string laid out as: 16-byte nonce, AES-GCM output, 16-byte AAD.
    """
    key = mc_client.get(kid)
    iv = os.urandom(16)
    associated_data = os.urandom(16)
    sealed = AESGCM(key).encrypt(iv, plain, associated_data)
    # Envelope order matters to the receiver: nonce first, AAD last.
    return ''.join(part.hex() for part in (iv, sealed, associated_data))
def aes_decrypt(kid, cipher):
    """Decrypt an envelope in the format produced by ``aes_encrypt``.

    The envelope is a hex string laid out as: 16-byte nonce, AES-GCM output,
    16-byte AAD. The previous implementation sliced the AAD from the wrong end
    (``cipher[:-32]``), never hex-decoded the fields, and passed the whole
    envelope (nonce and AAD included) to ``decrypt``.

    Args:
        kid: key ID whose session key is read from the cache.
        cipher: hex envelope as ``str`` (ASCII ``bytes`` are also accepted).

    Returns:
        The decrypted plaintext bytes.

    Raises:
        ValueError: if the envelope is too short to hold nonce and AAD, or is
            not valid hex.
    """
    session_key = mc_client.get(kid)
    if isinstance(cipher, (bytes, bytearray)):
        cipher = bytes(cipher).decode('ascii')
    field_len = 16 * 2  # 16 bytes, hex-encoded
    if len(cipher) < 2 * field_len:
        raise ValueError('ciphertext envelope too short')
    nonce = bytes.fromhex(cipher[:field_len])
    aad = bytes.fromhex(cipher[-field_len:])
    ciphertext = bytes.fromhex(cipher[field_len:-field_len])
    aesgcm = AESGCM(session_key)
    plain = aesgcm.decrypt(nonce, ciphertext, aad)
    return plain
def generate_kid(public_key):
    """Store *public_key* under a freshly generated key ID and return the ID.

    Args:
        public_key: value to persist in the ``value`` column of the
            ``public_key`` table in ``data.db``.

    Returns:
        The new key ID, a random UUID4 rendered as a string.
    """
    kid = str(uuid.uuid4())
    sql_conn = sqlite3.connect('data.db')
    try:
        sql_conn.execute(
            "INSERT INTO public_key(id, value) VALUES(?, ?)", (kid, public_key)
        )
        sql_conn.commit()
    finally:
        # Close the connection even when the insert or commit fails.
        sql_conn.close()
    return kid
def run_memcache():
    """Create the module-level ``mc_client`` connected to the local memcached."""
    global mc_client
    memcached_address = ('localhost', 11211)
    mc_client = base.Client(memcached_address)
def init():
    """Prepare server state before the app starts serving.

    Generates the server's P-521 private key, recreates the ``public_key``
    table in ``data.db`` (dropping any existing rows), and sets up the
    memcached client.
    """
    global server_private_key
    server_private_key = ec.generate_private_key(ec.SECP521R1())

    schema_statements = (
        "DROP TABLE IF EXISTS public_key",
        "CREATE TABLE public_key(id, value)",
    )
    sql_conn = sqlite3.connect('data.db')  # sqlite3 creates the file if it is missing
    cur = sql_conn.cursor()
    for statement in schema_statements:
        cur.execute(statement)
    sql_conn.close()

    run_memcache()
if __name__ == '__main__':
    # State (key pair, table, cache client) must exist before requests arrive.
    init()
    listen_options = {'host': '127.0.0.1', 'port': 1337, 'debug': False}
    app.run(**listen_options)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment