Skip to content

Instantly share code, notes, and snippets.

@DavidBuchanan314
Last active October 25, 2023 01:50
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 2 You must be signed in to fork a gist
  • Save DavidBuchanan314/972266864b54fc9343148b47ed5ee2c2 to your computer and use it in GitHub Desktop.
Save DavidBuchanan314/972266864b54fc9343148b47ed5ee2c2 to your computer and use it in GitHub Desktop.
This is not production quality code, several corners have been cut, etc. etc.
import io
import base64
import struct
import hashlib
import asyncio
import websockets
from enum import Enum
# ground control to major type
class MajorType(Enum):
    """CBOR major types, i.e. the top three bits of a data item's first byte."""

    UNSIGNED_INT = 0
    NEGATIVE_INT = 1
    BYTE_STRING = 2
    TEXT_STRING = 3
    ARRAY = 4
    MAP = 5
    TAG = 6
    # Major type 7 carries floats as well as the simple values
    # (false / true / null) handled by parse_cbor_head.
    FLOAT = 7
def parse_cbor_head(stream):
    """Decode the head (initial byte plus argument) of one CBOR data item.

    Args:
        stream: binary file-like object positioned at the start of a data item.

    Returns:
        A ``(major_type, value)`` tuple. ``value`` is:

        * for ``FLOAT`` with additional info 20/21/22: ``False``/``True``/``None``;
        * for ``FLOAT`` with a 2/4/8-byte argument: the decoded float;
        * for ``NEGATIVE_INT``: the decoded negative integer (``-1 - argument``);
        * otherwise the unsigned argument (an integer value, a length, or a
          tag number, depending on the major type).

    Raises:
        EOFError: the stream ends before the head is complete.
        ValueError: a simple value other than false/true/null is encountered.
        Exception: the head is not well-formed, uses an indefinite length, or
            is a one-byte simple value (none of which are supported here).
    """
    head = stream.read(1)
    if not head:
        raise EOFError("unexpected end of stream while reading CBOR head")
    first = head[0]
    major_type = MajorType(first >> 5)
    additional_info = first & 0x1f

    # Additional info below 24 *is* the argument; no further bytes follow.
    if additional_info < 24:
        if major_type == MajorType.FLOAT:
            simple_values = {
                20: False,
                21: True,
                22: None,  # null
            }
            if additional_info not in simple_values:
                raise ValueError(f"unsupported CBOR simple value {additional_info}")
            return major_type, simple_values[additional_info]
        if major_type == MajorType.NEGATIVE_INT:
            # Same -1-n mapping as the multi-byte case below; without it the
            # values -1..-24 would be reported as 0..23.
            return major_type, -1 - additional_info
        return major_type, additional_info

    # Additional info 24..27 announces a 1/2/4/8-byte big-endian argument.
    BYTE_LENGTHS = {24: 1, 25: 2, 26: 4, 27: 8}
    if additional_info in BYTE_LENGTHS:
        expected_len = BYTE_LENGTHS[additional_info]
        byte_value = stream.read(expected_len)
        if len(byte_value) != expected_len:
            raise EOFError("unexpected end of stream while reading CBOR argument")
        if major_type == MajorType.NEGATIVE_INT:
            return major_type, -1 - int.from_bytes(byte_value, "big")  # TODO: check canonical-ness
        if major_type == MajorType.FLOAT:
            if len(byte_value) == 1:
                # A one-byte argument in major type 7 is an extended simple
                # value, not a float.
                raise Exception("invalid")
            float_format = {2: "!e", 4: "!f", 8: "!d"}[len(byte_value)]
            return major_type, struct.unpack(float_format, byte_value)[0]
        return major_type, int.from_bytes(byte_value, "big")  # TODO: check canonical-ness

    if additional_info == 31:
        raise Exception("indefinite lengths not supported by this implementation")
    # Additional info 28..30 is reserved.
    raise Exception("not well-formed")
# LEB128
def parse_varint(stream):
    """Read one unsigned LEB128 varint from ``stream``.

    Each byte contributes its low seven bits, least-significant group first;
    a clear high bit marks the final byte.

    Args:
        stream: binary file-like object positioned at the first varint byte.

    Returns:
        The decoded non-negative integer. The stream is left positioned
        immediately after the varint.

    Raises:
        EOFError: the stream ends before the terminating byte is seen.
    """
    n = 0
    shift = 0
    while True:
        chunk = stream.read(1)
        if not chunk:
            # Indexing an empty read would raise an opaque IndexError.
            raise EOFError("unexpected end of stream while reading varint")
        val = chunk[0]
        n |= (val & 0x7f) << shift
        if not val & 0x80:
            return n
        shift += 7
# parse into pythonic objects (not roundtrip-safe, for now)
def parse_dag_cbor_object(stream):
    """Parse one DAG-CBOR data item from ``stream`` into plain Python objects.

    Integers, floats and simple values are returned as decoded by
    ``parse_cbor_head``; byte strings become ``bytes``, text strings ``str``,
    arrays ``list`` and maps ``dict`` (string keys only). Tag 42 (a CID link)
    is returned as its base32 multibase string (``"b..."``). The mapping is
    not round-trip safe.

    Args:
        stream: binary file-like object positioned at the start of a data item.

    Returns:
        The decoded Python object.

    Raises:
        EOFError: a byte or text string is shorter than its declared length.
        ValueError: a map key is not a string, or a tag-42 payload is not a
            37-byte CIDv1 (dag-cbor or raw, sha256) behind the identity
            multibase prefix.
        Exception: a tag other than 42 is encountered.
    """
    major_type, info = parse_cbor_head(stream)

    # Scalars are fully decoded by the head parser.
    if major_type in (MajorType.UNSIGNED_INT, MajorType.NEGATIVE_INT, MajorType.FLOAT):
        return info

    if major_type in (MajorType.BYTE_STRING, MajorType.TEXT_STRING):
        value = stream.read(info)
        if len(value) != info:
            raise EOFError("unexpected end of stream while reading string")
        return value if major_type == MajorType.BYTE_STRING else value.decode()

    if major_type == MajorType.ARRAY:
        return [parse_dag_cbor_object(stream) for _ in range(info)]

    if major_type == MajorType.MAP:
        values = {}
        for _ in range(info):
            key = parse_dag_cbor_object(stream)
            if not isinstance(key, str):
                raise ValueError("DAG-CBOR only accepts strings as map keys")
            values[key] = parse_dag_cbor_object(stream)
        # TODO: check canonical map ordering
        return values

    # Every other major type has returned above, so this is MajorType.TAG.
    if info != 42:
        raise Exception("non-42 tags are not supported")
    cid_bytes = parse_dag_cbor_object(stream)
    # Validate with explicit raises: this is untrusted input and assert
    # statements are removed when Python runs with -O.
    if not isinstance(cid_bytes, bytes):
        raise ValueError("tag 42 payload must be a byte string")
    if len(cid_bytes) != 37:
        raise ValueError("tag 42 payload must be exactly 37 bytes")
    # multibase prefix, CIDv1, dag-cbor or raw, sha256
    if not cid_bytes.startswith((b"\x00\x01q\x12 ", b"\x00\x01U\x12 ")):
        raise ValueError("unsupported CID in tag 42 payload")
    # Drop the leading identity-multibase byte before base32-encoding.
    return "b" + base64.b32encode(cid_bytes[1:]).decode().lower().rstrip("=")
def parse_car(stream, length):
    """Parse a CARv1 archive with a single root and verify every block's hash.

    Args:
        stream: seekable binary file-like object positioned at the start of
            the CAR data.
        length: stream offset at which the CAR data ends; blocks are read
            until ``stream.tell()`` equals this value.

    Returns:
        A ``(root, nodes)`` tuple: ``root`` is the single root CID as decoded
        from the header, and ``nodes`` maps each block's base32 CID string to
        its decoded DAG-CBOR content.

    Raises:
        EOFError: a block is shorter than its declared length.
        ValueError: the header is inconsistent, a block is too short to hold
            a CID, uses an unsupported CID, fails its sha256 check, or has
            trailing bytes after its DAG-CBOR item.
    """
    # Explicit raises instead of assert: the input is untrusted and asserts
    # (including the hash check) disappear when Python runs with -O.
    header_len = parse_varint(stream)
    header_start = stream.tell()
    car_header = parse_dag_cbor_object(stream)
    if stream.tell() - header_start != header_len:
        raise ValueError("CAR header does not match its declared length")
    if not isinstance(car_header, dict):
        raise ValueError("CAR header must be a map")
    if car_header.get("version") != 1:
        raise ValueError("only CAR version 1 is supported")
    roots = car_header.get("roots", [])
    if len(roots) != 1:
        raise ValueError("expected exactly one CAR root")
    root = roots[0]

    # XXX: the CID should be parsed properly; its length might not be 36.
    cid_len = 36
    nodes = {}
    while stream.tell() != length:
        block_len = parse_varint(stream)
        if block_len < cid_len:
            # Without this guard the read below would get a negative size,
            # which reads the entire remainder of the stream.
            raise ValueError("CAR block too short to contain a CID")
        cid_raw = stream.read(cid_len)
        block_data = stream.read(block_len - cid_len)
        if len(cid_raw) != cid_len or len(block_data) != block_len - cid_len:
            raise EOFError("unexpected end of stream while reading CAR block")
        if not cid_raw.startswith(b"\x01q\x12 "):  # CIDv1, dag-cbor, sha256
            raise ValueError("unsupported CID in CAR block")
        cid = "b" + base64.b32encode(cid_raw).decode().lower().rstrip("=")

        # The CID ends with the sha256 digest of the block content.
        content_hash = hashlib.sha256(block_data).digest()
        if not cid_raw.endswith(content_hash):
            raise ValueError("CAR block content does not match its CID")

        block_data_stream = io.BytesIO(block_data)
        block = parse_dag_cbor_object(block_data_stream)
        if block_data_stream.read():
            raise ValueError("trailing bytes after DAG-CBOR item in CAR block")
        nodes[cid] = block
    return root, nodes
def enumerate_mst_records(nodes, node):
    """Collect every record reachable from an MST node, in tree order.

    Keys are prefix-compressed: each entry stores ``p``, the number of leading
    bytes shared with the previous key in the same node, and ``k``, the
    remaining suffix. Subtrees whose CID is not present in ``nodes`` are
    skipped.

    Args:
        nodes: mapping of CID string to decoded block.
        node: decoded MST node with an ``"e"`` entry list and an optional
            ``"l"`` left-subtree link; each entry has ``"p"``, ``"k"``,
            ``"v"`` and an optional ``"t"`` right-subtree link.

    Returns:
        A dict mapping each full record key (``bytes``) to its ``"v"`` value.

    Raises:
        ValueError: an entry's prefix length is negative or exceeds the
            length of the previous key.
    """
    records = {}

    left = node.get("l")
    if left in nodes:
        records.update(enumerate_mst_records(nodes, nodes[left]))

    prev_key = b""
    for entry in node["e"]:
        prefix_len = entry["p"]
        # Raise rather than assert: asserts vanish under -O, after which a
        # bad prefix length would silently build a wrong key.
        if not 0 <= prefix_len <= len(prev_key):
            raise ValueError("MST entry prefix length is out of range")
        key = prev_key[:prefix_len] + entry["k"]
        prev_key = key
        records[key] = entry["v"]

        right = entry.get("t")
        if right in nodes:
            records.update(enumerate_mst_records(nodes, nodes[right]))
    return records
async def main():
    """Stream the bsky.social repo firehose and print newly created posts.

    Each websocket message holds two consecutive DAG-CBOR items: a header and
    a body. For ``#commit`` messages the body's ``blocks`` CAR is parsed, the
    commit's MST is walked, and every ``app.bsky.feed.post`` create operation
    is printed as ``<at-uri> <text>``. Runs until the connection fails or the
    task is cancelled.
    """
    async with websockets.connect(
        "wss://bsky.social/xrpc/com.atproto.sync.subscribeRepos",
        ping_timeout=None,
    ) as websocket:
        while True:
            res = await websocket.recv()
            stream = io.BytesIO(res)
            head = parse_dag_cbor_object(stream)
            # Use .get(): a header without "t" (e.g. an error frame, per the
            # AT Protocol event-stream spec) would otherwise raise KeyError.
            if head.get("t") != "#commit":
                continue

            body = parse_dag_cbor_object(stream)  # XXX: does this repeat "op" times?
            root, nodes = parse_car(io.BytesIO(body["blocks"]), len(body["blocks"]))

            # The record index depends only on the commit, so it is built at
            # most once per message, and only if a matching op needs it.
            records = None
            for op in body["ops"]:
                if not (op["path"].startswith("app.bsky.feed.post/") and op["action"] == "create"):
                    continue
                if records is None:
                    signed_commit = nodes[root]
                    # this is where you'd verify the signature, if you're into that kinda thing
                    records = enumerate_mst_records(nodes, nodes[signed_commit["data"]])
                # Looking the post up via nodes[op["cid"]] would avoid the
                # tree walk, but authenticity can't be verified that way.
                post = nodes[records[op["path"].encode()]]
                uri = f'at://{body["repo"]}/{op["path"]}'
                print(uri, post["text"])
if __name__ == "__main__":
    # Script entry point: drive the firehose consumer on a fresh event loop.
    asyncio.run(main())
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment