-
-
Save goeo-/4af74126b866fff9434e84488848d9e1 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
def parse_plc_op(record, prev_ops):
    """Validate one PLC directory log entry and extract its indexed fields.

    The entry's CID is checked against the SHA-256 of the DAG-CBOR encoding
    of the signed operation, and the signature is checked against the keys
    that are allowed to sign it (see "Authorised keys" below).

    Args:
        record: Mapping with the keys ``"did"``, ``"operation"``, ``"cid"``
            and ``"createdAt"`` (an ISO-8601 string).  NOTE: the ``"sig"``
            entry is popped from ``record["operation"]`` in place, so the
            caller's dict is modified once the duplicate check has passed.
        prev_ops: Sequence of 4-item rows
            ``(created_at, cid, rotation_keys, signed_by)`` describing the
            operations already stored for this DID.
            NOTE(review): the fork handling below only makes sense if the
            rows are ordered newest-first -- confirm against the caller.

    Returns:
        ``None`` when ``record["cid"]`` is already present in ``prev_ops``;
        otherwise the tuple ``(created_at, did, cid, rotation_keys, handle,
        pds, labeler, chat, feedgen, atproto_key, labeler_key,
        signer_index)`` where ``signer_index`` is the position, within the
        authorised key list, of the key whose signature verified.

    Raises:
        AssertionError: an integrity check failed (CID/hash mismatch, DID
            not derived from the genesis hash, genesis op without keys,
            fork outside the 72 hour window, key without ``did:key:``
            prefix).  Raised explicitly rather than via ``assert`` so the
            checks are not stripped when Python runs with ``-O``.
        Exception: a genesis op arrived for a DID that already has
            operations, no row in ``prev_ops`` matches ``prev``, or no
            authorised key verifies the signature.
        KeyError: a required field is missing from ``record`` or from the
            operation.
    """
    did = record["did"]
    operation = record["operation"]
    cid = record["cid"]
    created_at = datetime.fromisoformat(record["createdAt"])

    # Already stored: nothing to do.
    if any(row[1] == cid for row in prev_ops):
        return None

    # The CID must commit to the *signed* operation, so hash before the
    # signature is removed.
    cid_h = parse_cid(cid)
    cbor_hash = hashlib.sha256(cbrrr.encode_dag_cbor(operation)).digest()
    if cid_h != cbor_hash:
        raise AssertionError("cid does not match hash of operation", did, cid)

    # The signature covers the operation without its "sig" field.
    to_verify_sig = ub64d(operation.pop("sig"))
    to_verify = cbrrr.encode_dag_cbor(operation)

    # --- Authorised keys -------------------------------------------------
    valid_keys = None
    if operation["prev"] is None:
        # Genesis operation: it is signed by one of its own keys.
        if "rotationKeys" not in operation and "recoveryKey" not in operation:
            raise AssertionError(
                "genesis op has neither rotationKeys nor recoveryKey", did, cid
            )
        if len(prev_ops) > 1 or (len(prev_ops) == 1 and prev_ops[0][1] != cid):
            print(len(prev_ops), prev_ops[0][1] != cid, prev_ops[0][1], cid)
            raise Exception("prev ops exist for did but prev none")
        # The DID is derived from the first 15 bytes of the genesis hash.
        real_did = "did:plc:" + b32e(cid_h[:15])
        if did != real_did:
            raise AssertionError(
                "did does not match genesis operation", did, real_did
            )
        valid_keys = (
            operation["rotationKeys"]
            if "rotationKeys" in operation
            else [operation["recoveryKey"], operation["signingKey"]]
        )
    else:
        prev_cid = operation["prev"]
        # Rows visited before reaching ``prev_cid``; if any exist, this
        # operation forks the log and would undo them.
        undone_rows = []
        for row in prev_ops:
            r_created_at, r_cid, r_rotation_keys, _r_signed_by = row
            if r_cid == prev_cid:
                if undone_rows:
                    # Stored timestamps are treated as UTC.
                    # NOTE(review): the window is measured from the
                    # timestamp of the op being built upon, not from the
                    # first undone op -- confirm this matches the did:plc
                    # recovery-window rule.
                    time_delta = created_at - r_created_at.replace(tzinfo=UTC)
                    if time_delta > timedelta(hours=72):
                        raise AssertionError(
                            "fork is outside the 72 hour window",
                            did,
                            cid,
                            time_delta,
                        )
                    # Only keys listed before the one that signed the most
                    # recently visited undone row may override it.
                    valid_keys = r_rotation_keys[: undone_rows[-1][3]]
                else:
                    valid_keys = r_rotation_keys
            undone_rows.append(row)

    if valid_keys is None:
        raise Exception("no valid keys found", did, prev_ops, operation)

    # --- Signature -------------------------------------------------------
    key_prefix = "did:key:"
    for signer_index, did_key in enumerate(valid_keys):
        if not did_key.startswith(key_prefix):
            raise AssertionError("key is not a did:key", did_key)
        pubkey = parse_pubkey(did_key[len(key_prefix):])
        try:
            pubkey.verify(signature=to_verify_sig, data=to_verify)
        except InvalidSignature:
            continue
        break
    else:
        raise Exception("couldn't verify op", record, valid_keys, prev_ops)

    # --- Field extraction (current format first, legacy fields second) ---
    verification_methods = operation.get("verificationMethods", {})
    atproto_key = verification_methods.get("atproto")
    labeler_key = verification_methods.get("atproto_label")
    atproto_key = atproto_key or operation.get("signingKey")

    rotation_keys = []
    if "rotationKeys" in operation:
        rotation_keys = operation["rotationKeys"]
    elif "recoveryKey" in operation:
        rotation_keys = [operation["recoveryKey"], operation["signingKey"]]

    handle = None
    if "alsoKnownAs" in operation:
        # The handle is the first at:// alias, without the scheme.
        for aka in operation["alsoKnownAs"]:
            if aka.startswith("at://"):
                handle = aka[5:]
                break
    elif "handle" in operation:
        handle = operation["handle"]

    services = operation.get("services", {})
    pds = services.get("atproto_pds", {}).get("endpoint") or operation.get(
        "service"
    )
    labeler = services.get("atproto_labeler", {}).get("endpoint")
    chat = services.get("bsky_chat", {}).get("endpoint")
    feedgen = services.get("bsky_fg", {}).get("endpoint")

    return (
        created_at,
        did,
        cid,
        rotation_keys,
        handle,
        pds,
        labeler,
        chat,
        feedgen,
        atproto_key,
        labeler_key,
        signer_index,
    )
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment