-
-
Save goeo-/9a795969767e63d6f591eb74e70e69c8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from cryptography.hazmat.primitives import hashes, serialization | |
from cryptography.hazmat.primitives.asymmetric import ec | |
from cryptography.hazmat.primitives.asymmetric.utils import decode_dss_signature | |
import cbrrr | |
import httpx | |
import hashlib | |
import base64 | |
import sys | |
# Read the target DID from the first command-line argument.
# The check is an explicit `if` rather than an `assert`: asserts are removed
# under `python -O`, which would leave `did` unbound and skip validation.
if len(sys.argv) < 2 or not sys.argv[1].startswith('did:plc:'):
    print('Usage: python3 undelete.py <DID>')
    sys.exit(1)
did = sys.argv[1]
print(did)
# Load the PLC rotation key and the PDS hostname from the PDS env file.
# Both stay None until their exact key is seen.
pds_plc_key = None
pds_hostname = None
with open('/pds/pds.env') as env_file:
    for line in env_file:
        # Split on the FIRST '=' only, so a value containing '=' is not
        # truncated; lines without '=' (blank lines, comments) are skipped.
        env_name, separator, env_value = line.partition('=')
        if not separator:
            continue
        env_name = env_name.strip()
        env_value = env_value.strip().strip('"')
        # Compare the whole key rather than a prefix, so a longer key that
        # merely starts with the same text cannot overwrite the value.
        if env_name == 'PDS_PLC_ROTATION_KEY_K256_PRIVATE_KEY_HEX':
            # The value is a hex-encoded secp256k1 private scalar.
            pds_plc_key = ec.derive_private_key(int(env_value, 16), ec.SECP256K1())
        elif env_name == 'PDS_HOSTNAME':
            pds_hostname = env_value

# Explicit check instead of `assert`, which is stripped under `python -O`.
if pds_plc_key is None or pds_hostname is None:
    print('PDS environment variables not found')
    sys.exit(1)
import multiformats | |
def encode_did_pubkey(pubkey: ec.EllipticCurvePrivateKey):
    """Return the multibase string for the public half of a secp256k1 key.

    Despite the parameter name, ``pubkey`` is a *private* key object; its
    public key is derived here. The compressed X9.62 point is wrapped with
    the ``secp256k1-pub`` multicodec and encoded as base58btc multibase.
    The caller is responsible for adding any ``did:key:`` prefix.
    """
    assert type(pubkey.curve) is ec.SECP256K1
    public_key = pubkey.public_key()
    compressed_point = public_key.public_bytes(
        encoding=serialization.Encoding.X962,
        format=serialization.PublicFormat.CompressedPoint,
    )
    tagged_point = multiformats.multicodec.wrap("secp256k1-pub", compressed_point)
    return multiformats.multibase.encode(tagged_point, "base58btc")
# --- Rotate the DID's atproto signing key via a new PLC operation ---------

# Fetch the most recent operation in this DID's PLC log.
last_plc_response = httpx.get(f'https://plc.directory/{did}/log/last')
# Stop on an HTTP error here; otherwise an error body would be hashed and
# edited below and fail later with an unrelated KeyError.
last_plc_response.raise_for_status()
last_plc_data_json = last_plc_response.json()

# The new operation's `prev` is the CID of the previous operation: the
# SHA-256 of its DAG-CBOR encoding, prefixed with 0x01 0x71 0x12 0x20 and
# written as lowercase, unpadded base32 with a leading 'b'.
last_plc_data = cbrrr.encode_dag_cbor(last_plc_data_json)
last_plc_data_hash = hashlib.sha256(last_plc_data).digest()
last_plc_data_cid = 'b' + base64.b32encode(b'\x01\x71\x12\x20' + last_plc_data_hash).decode().lower().strip('=')
print(last_plc_data_cid)
print(last_plc_data_json)

# Generate the replacement signing key. Its private scalar is printed
# because this script does not store it anywhere else.
new_key = ec.generate_private_key(ec.SECP256K1())
print(new_key.private_numbers().private_value)

# Reuse the previous operation as a template: swap in the new atproto key,
# point `prev` at the previous operation, and drop the old signature so the
# unsigned form can be signed.
last_plc_data_json['verificationMethods']['atproto'] = 'did:key:' + encode_did_pubkey(new_key)
last_plc_data_json['prev'] = last_plc_data_cid
last_plc_data_json.pop('sig')
print(last_plc_data_json)

# Sign the DAG-CBOR encoding of the unsigned operation with the PDS rotation
# key. `sign` returns a DER signature, so decode it into its (r, s) integers.
r, s = decode_dss_signature(
    pds_plc_key.sign(cbrrr.encode_dag_cbor(last_plc_data_json), ec.ECDSA(hashes.SHA256()))
)

# apply low-s malleability mitigation
SECP256K1_N = 0xFFFFFFFF_FFFFFFFF_FFFFFFFF_FFFFFFFE_BAAEDCE6_AF48A03B_BFD25E8C_D0364141
if s > SECP256K1_N // 2:
    print('was bad')
    s = SECP256K1_N - s

# The signature is sent as unpadded base64url of the 64-byte r || s value.
signature = base64.urlsafe_b64encode(r.to_bytes(32, "big") + s.to_bytes(32, "big")).decode().strip("=")
last_plc_data_json["sig"] = signature
print(last_plc_data_json)

# Round-trip through DAG-CBOR before submitting.
last_plc_data_json = cbrrr.decode_dag_cbor(cbrrr.encode_dag_cbor(last_plc_data_json))
print(last_plc_data_json)

# Submit the signed operation. The response is only printed, so the operator
# must read the output to confirm the directory accepted it.
plc_submit_response = httpx.post(f'https://plc.directory/{did}', json=last_plc_data_json)
print(plc_submit_response, plc_submit_response.text)
import time

# --- Build a JWT signed by the new key for a createAccount call -----------

# The token expires 100 days from now.
when = int(time.time()) + 60*60*24*100
jwt_payload = f'{{"iss":"{did}","aud":"did:web:{pds_hostname}","exp":{when}}}'

# The constant prefix is the base64url of {"typ":"JWT","alg":"ES256K"}.
jwt_signing_input = (
    b'eyJ0eXAiOiJKV1QiLCJhbGciOiJFUzI1NksifQ.'
    + base64.urlsafe_b64encode(jwt_payload.encode()).rstrip(b'=')
)

# `sign` returns an ASN.1 DER signature, but a JWS ECDSA signature is the
# fixed-width big-endian R || S concatenation (RFC 7518, section 3.4).
# Decode the DER form and re-encode it as 64 raw bytes.
jwt_r, jwt_s = decode_dss_signature(new_key.sign(jwt_signing_input, ec.ECDSA(hashes.SHA256())))

# Apply the same low-S normalisation used for the PLC operation signature.
# SECP256K1_N (the curve order) is defined earlier in this script.
if jwt_s > SECP256K1_N // 2:
    jwt_s = SECP256K1_N - jwt_s
jwt_signature = jwt_r.to_bytes(32, "big") + jwt_s.to_bytes(32, "big")

xd = (jwt_signing_input + b'.' + base64.urlsafe_b64encode(jwt_signature).rstrip(b'=')).decode()
print(f'JWT for use on createAccount call with existing did: {xd}')

# Disabled example: how the JWT above could be used to recreate the account.
# It is kept as a bare string literal so it never executes; the email,
# handle, invite code and password are placeholders to replace before use.
"""
a = httpx.post(f'https://{pds_hostname}/xrpc/com.atproto.server.createAccount', headers={
    'Authorization': f'Bearer {xd}'
},json={
    'email': 'aa@bb.cc',
    'handle': 'aa.bb.cc',
    'did': did,
    'inviteCode': "bb-cc-asdfg-asdfg",
    'password': '12341234'
})
print(a, a.text)
"""
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment