Skip to content

Instantly share code, notes, and snippets.

@goeo-
Created November 18, 2024 14:19
Show Gist options
  • Save goeo-/4af74126b866fff9434e84488848d9e1 to your computer and use it in GitHub Desktop.
Save goeo-/4af74126b866fff9434e84488848d9e1 to your computer and use it in GitHub Desktop.
def _require(condition, message):
    """Raise ``AssertionError(message)`` unless *condition* is truthy.

    Used in place of ``assert`` so the validation below is still enforced
    when Python runs with ``-O`` (which strips ``assert`` statements), while
    keeping the exception type callers would have seen before.
    """
    if not condition:
        raise AssertionError(message)


def parse_plc_op(record, prev_ops):
    """Validate one PLC directory log record and flatten it into a row tuple.

    Args:
        record: Mapping with the keys ``"did"``, ``"operation"``, ``"cid"``
            and ``"createdAt"`` (an ISO-8601 string). ``record["operation"]``
            must contain ``"sig"`` and ``"prev"``. The mapping is not
            modified.
        prev_ops: Previously accepted rows for the same DID, each a 4-item
            sequence ``(created_at, cid, rotation_keys, signed_by)``.
            NOTE(review): the fork handling below only makes sense if the
            rows are ordered newest first -- confirm against the caller.

    Returns:
        ``None`` if ``record["cid"]`` is already present in ``prev_ops``.
        Otherwise the tuple ``(created_at, did, cid, rotation_keys, handle,
        pds, labeler, chat, feedgen, atproto_key, labeler_key, signer_index)``
        where ``signer_index`` is the position, within the list of keys that
        were allowed to sign, of the key whose signature verified.

    Raises:
        AssertionError: the CID does not match the hash of the encoded
            operation, the DID does not match the genesis operation, a fork
            arrives more than 72 hours late, or a key is not a ``did:key:``.
        Exception: a genesis operation arrives for a DID that already has
            rows, no row matches ``prev``, or no allowed key verifies the
            signature.
        KeyError: a required key is missing from ``record`` or the operation.
    """
    did = record["did"]
    operation = record["operation"]
    cid = record["cid"]
    created_at = datetime.fromisoformat(record["createdAt"])

    # Already recorded: nothing to do.
    if any(row[1] == cid for row in prev_ops):
        return None

    # The CID must commit to the full (signed) operation.
    cid_h = parse_cid(cid)
    cbor_hash = hashlib.sha256(cbrrr.encode_dag_cbor(operation)).digest()
    _require(
        cid_h == cbor_hash,
        "cid does not match sha256 of the encoded operation",
    )

    # The signature covers the operation *without* its "sig" field. Encode a
    # copy rather than popping from the caller's dict, so the record stays
    # intact for error reporting and for any later use by the caller.
    to_verify_sig = ub64d(operation["sig"])
    unsigned_operation = {k: v for k, v in operation.items() if k != "sig"}
    to_verify = cbrrr.encode_dag_cbor(unsigned_operation)

    valid_keys = None
    if operation["prev"] is None:
        # Genesis operation: it is self-certifying and must be the first row.
        _require(
            any(field in operation for field in ("rotationKeys", "recoveryKey")),
            "genesis operation has neither rotationKeys nor recoveryKey",
        )
        if len(prev_ops) > 1 or (len(prev_ops) == 1 and prev_ops[0][1] != cid):
            print(len(prev_ops), prev_ops[0][1] != cid, prev_ops[0][1], cid)
            raise Exception("prev ops exist for did but prev none")
        real_did = "did:plc:" + b32e(cid_h[:15])
        _require(did == real_did, "did does not match the genesis operation")
        if "rotationKeys" in operation:
            valid_keys = operation["rotationKeys"]
        else:
            # Legacy operation format.
            valid_keys = [operation["recoveryKey"], operation["signingKey"]]
    else:
        prev_cid = operation["prev"]
        # Rows seen before reaching the row named by ``prev``; if there are
        # any, this operation forks the log and would undo them.
        undone_rows = []
        for row in prev_ops:
            row_created_at, row_cid, row_rotation_keys, _row_signed_by = row
            if row_cid == prev_cid:
                if undone_rows:
                    # NOTE(review): the window is measured from the ``prev``
                    # row's timestamp. The did:plc reference implementation
                    # appears to measure from the first undone operation
                    # instead -- confirm which is intended.
                    time_delta = created_at - row_created_at.replace(tzinfo=UTC)
                    _require(
                        time_delta <= timedelta(hours=72),
                        "fork submitted outside the 72 hour window",
                    )
                    # Only keys listed before the one that signed the most
                    # recently collected undone row may sign the fork.
                    valid_keys = row_rotation_keys[: undone_rows[-1][3]]
                else:
                    valid_keys = row_rotation_keys
            undone_rows.append(row)

    if valid_keys is None:
        raise Exception("no valid keys found", did, prev_ops, operation)

    for i, key in enumerate(valid_keys):
        _require(key.startswith("did:key:"), "rotation key is not a did:key")
        pubkey = parse_pubkey(key[8:])
        try:
            pubkey.verify(signature=to_verify_sig, data=to_verify)
        except InvalidSignature:
            continue  # not signed by this key; try the next one

        # Signature verified: extract the fields, falling back to the legacy
        # top-level names when the newer nested ones are absent.
        verification_methods = operation.get("verificationMethods", {})
        atproto_key = verification_methods.get("atproto")
        labeler_key = verification_methods.get("atproto_label")
        atproto_key = atproto_key or operation.get("signingKey")

        rotation_keys = []
        if "rotationKeys" in operation:
            rotation_keys = operation["rotationKeys"]
        elif "recoveryKey" in operation:
            rotation_keys = [operation["recoveryKey"], operation["signingKey"]]

        handle = None
        if "alsoKnownAs" in operation:
            # First at:// alias wins.
            for aka in operation["alsoKnownAs"]:
                if aka.startswith("at://"):
                    handle = aka[5:]
                    break
        elif "handle" in operation:
            handle = operation["handle"]

        services = operation.get("services", {})
        pds = services.get("atproto_pds", {}).get("endpoint") or operation.get(
            "service"
        )
        labeler = services.get("atproto_labeler", {}).get("endpoint")
        chat = services.get("bsky_chat", {}).get("endpoint")
        feedgen = services.get("bsky_fg", {}).get("endpoint")

        return (
            created_at,
            did,
            cid,
            rotation_keys,
            handle,
            pds,
            labeler,
            chat,
            feedgen,
            atproto_key,
            labeler_key,
            i,
        )

    raise Exception("couldn't verify op", record, valid_keys, prev_ops)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment