Skip to content

Instantly share code, notes, and snippets.

@goeo-

goeo-/feed.py Secret

Created September 27, 2024 19:56
Show Gist options
  • Save goeo-/b985fe05945ba8c76555d2e9633c6294 to your computer and use it in GitHub Desktop.
Save goeo-/b985fe05945ba8c76555d2e9633c6294 to your computer and use it in GitHub Desktop.
import binascii
import time
import datetime
import io
from base64 import urlsafe_b64decode
from typing import Annotated, Union, Literal
import httpx
import uvicorn
from fastapi import FastAPI, Depends, Request
from fastapi.responses import JSONResponse
from fastapi.exceptions import HTTPException
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from fastapi.staticfiles import StaticFiles
from cryptography.exceptions import InvalidSignature
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.asymmetric import ec
from cryptography.hazmat.primitives.asymmetric.utils import encode_dss_signature
from multiformats import multibase, multicodec
from pydantic import BaseModel, Field, ValidationError
from pydantic.functional_validators import AfterValidator
app = FastAPI()
security = HTTPBearer()
client = httpx.AsyncClient()
DID = Annotated[str, Field(pattern=r'^did\:(?:plc\:[a-z2-7]{24}|web\:(?:[A-Za-z0-9\-]+\.)+[A-Za-z0-9\-]+)$')]
class User(BaseModel):
did: DID
username: str
avatar_url: Union[str, None]
class JWTHeader(BaseModel):
typ: Literal['JWT']
alg: Literal['ES256', 'ES256K']
def check_current(v: int) -> int:
assert v > time.time(), f'{v} is too old'
return v
class JWTPayload(BaseModel):
exp: Annotated[int, AfterValidator(check_current)]
aud: Literal['did:web:feeds.goeo.lol']
iss: DID
async def get_current_user(bearer: Annotated[HTTPAuthorizationCredentials, Depends(security)]):
print(bearer.credentials)
auth = bearer.credentials.split('.')
if not len(auth) == 3:
raise HTTPException(status_code=400, detail='Bad Bearer token')
try:
header, payload, signature = (urlsafe_b64decode(x.ljust(len(x) + (len(x) % 4), '=')) for x in auth)
except binascii.Error:
raise HTTPException(status_code=400, detail='Bad JWT: could not base64 decode')
try:
header = JWTHeader.model_validate_json(header)
except ValidationError:
raise HTTPException(status_code=400, detail='Bad JWT header')
try:
payload = JWTPayload.model_validate_json(payload)
except ValidationError:
raise HTTPException(status_code=400, detail='Bad JWT payload')
try:
signing_key, handle, pds = await get_signing_key_handle_pds(payload.iss)
except InvalidDIDException:
raise HTTPException(status_code=400, detail='Bad iss did')
signing_key = multibase.decode(signing_key)
codec, signing_key = multicodec.unwrap(signing_key)
if header.alg == 'ES256':
if codec.name != 'p256-pub':
raise HTTPException(status_code=400, detail="Bad JWT: signing key doesn't match JWT typ")
signing_key = ec.EllipticCurvePublicKey.from_encoded_point(ec.SECP256R1(), signing_key)
else: # header.alg == 'ES256K':
if codec.name != 'secp256k1-pub':
raise HTTPException(status_code=400, detail="Bad JWT: signing key doesn't match JWT typ")
signing_key = ec.EllipticCurvePublicKey.from_encoded_point(ec.SECP256K1(), signing_key)
to_verify = '.'.join(auth[:2]).encode()
try:
signing_key.verify(
signature=encode_dss_signature(
int.from_bytes(signature[:32], 'big'),
int.from_bytes(signature[32:], 'big')
),
data=to_verify,
signature_algorithm=ec.ECDSA(hashes.SHA256())
)
except InvalidSignature:
raise HTTPException(status_code=400, detail='Bad JWT signature')
try:
resolved_handle = await resolve_handle(handle)
verified_handle = resolved_handle == payload.iss
except CannotResolveHandleException:
verified_handle = False
avatar_url = None
display_name = None
if pds and verified_handle:
profile = await client.get(
f'{pds}/xrpc/com.atproto.repo.getRecord?repo={payload.iss}&collection=app.bsky.actor.profile&rkey=self'
)
profile = profile.json()
avatar_blob = profile.get('value', {}).get('avatar', {}).get('ref', {}).get('$link')
if avatar_blob:
avatar_url = f'{pds}/xrpc/com.atproto.sync.getBlob?did={payload.iss}&cid={avatar_blob}'
display_name = profile.get('value', {}).get('displayName')
if display_name:
username = f'{display_name} ({handle})'
elif verified_handle:
username = handle
else:
username = f'invalid handle: {handle} ({payload.iss})'
return User(did=payload.iss, username=username, avatar_url=avatar_url)
@app.get('/.well-known/did.json')
async def did(request: Request):
return JSONResponse(content={"@context":["https://www.w3.org/ns/did/v1"],"id":"did:web:feeds.goeo.lol","alsoKnownAs":[],"authentication":None,"verificationMethod":[],"service":[{"id":"#bsky_fg","type":"BskyFeedGenerator","serviceEndpoint":"https://feeds.goeo.lol"}]})
@app.get('/xrpc/app.bsky.feed.getFeedSkeleton')
async def create_report(request: Request, current_user: Annotated[User, Depends(get_current_user)], feed: Literal['at://did:web:genco.me/app.bsky.feed.generator/misses'], limit: Annotated[int, Field(ge=1, le=100, default=50)], cursor: str | None = None):
orig_cursor = str(cursor)
to_serve = []
while True:
res = await client.get('https://public.api.bsky.app/xrpc/app.bsky.feed.getAuthorFeed', params={
"actor": current_user.did,
"limit": 100,
"filter": "posts_no_replies",
"cursor": cursor
})
res = res.json()
if len(res['feed']) == 0:
break
for post in res['feed']:
if post['post']['author']['did'] != current_user.did:
continue
if post['post']['likeCount'] == 0:
to_serve.append(post['post']['uri'])
cursor = post['post']['indexedAt']
if len(to_serve) >= limit:
break
if len(to_serve) >= limit:
break
cursor = res['feed'][-1]['post']['indexedAt']
res = {"feed": [{"post": post_uri} for post_uri in to_serve]}
if cursor:
res['cursor'] = cursor
return JSONResponse(content=res)
class CannotResolveHandleException(Exception):
pass
async def resolve_handle(handle):
res = await client.get(f'https://1.1.1.1/dns-query?name=_atproto.{handle}&type=TXT',
headers={'Accept': 'application/dns-json'})
res = res.json()
if 'Answer' in res and len(res['Answer']) > 0:
data = res['Answer'][0]['data']
assert data.startswith('"did=did:') and data.endswith('"')
return data[5:-1]
try:
res = await client.get(f'https://{handle}/.well-known/atproto-did')
except Exception as e:
print(e)
raise CannotResolveHandleException
assert res.text.startswith('did:')
return res.text.strip()
class InvalidDIDException(Exception):
pass
async def get_signing_key_handle_pds(did):
if did.startswith('did:plc:'):
doc = (await client.get(f'https://plc.directory/{did}')).json()
elif did.startswith('did:web:'):
doc = (await client.get(f'https://{did[8:]}/.well-known/did.json')).json()
else:
raise InvalidDIDException
handle = None
for aka in doc.get('alsoKnownAs', []):
if aka.startswith('at://'):
handle = aka[5:]
break
pds = None
for service in doc.get('service', []):
if service.get('id') == '#atproto_pds' and service.get('type') == 'AtprotoPersonalDataServer':
pds = service.get('serviceEndpoint')
break
signing_key = None
for verification_method in doc.get('verificationMethod', []):
if (
verification_method.get('id') == f'{did}#atproto' and
verification_method.get('type') == 'Multikey' and
verification_method.get('controller') == did
):
signing_key = verification_method.get('publicKeyMultibase')
break
if not signing_key:
raise InvalidDIDException
return signing_key, handle, pds
if __name__ == '__main__':
uvicorn.run(app='main:app', reload=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment