-
-
Save goeo-/b985fe05945ba8c76555d2e9633c6294 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import binascii | |
import time | |
import datetime | |
import io | |
from base64 import urlsafe_b64decode | |
from typing import Annotated, Union, Literal | |
import httpx | |
import uvicorn | |
from fastapi import FastAPI, Depends, Request | |
from fastapi.responses import JSONResponse | |
from fastapi.exceptions import HTTPException | |
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials | |
from fastapi.staticfiles import StaticFiles | |
from cryptography.exceptions import InvalidSignature | |
from cryptography.hazmat.primitives import hashes | |
from cryptography.hazmat.primitives.asymmetric import ec | |
from cryptography.hazmat.primitives.asymmetric.utils import encode_dss_signature | |
from multiformats import multibase, multicodec | |
from pydantic import BaseModel, Field, ValidationError | |
from pydantic.functional_validators import AfterValidator | |
# ASGI application object; the route decorators below register handlers on it.
app = FastAPI()

# Bearer-token extractor, used as a dependency by get_current_user.
security = HTTPBearer()

# Single asynchronous HTTP client shared by every outbound request in this module.
client = httpx.AsyncClient()

# A DID string limited to two methods: ``did:plc:`` followed by 24 characters
# from [a-z2-7], or ``did:web:`` followed by a dotted host name.
DID = Annotated[
    str,
    Field(pattern=r'^did\:(?:plc\:[a-z2-7]{24}|web\:(?:[A-Za-z0-9\-]+\.)+[A-Za-z0-9\-]+)$'),
]
class User(BaseModel):
    """Identity produced by the authentication dependency.

    ``avatar_url`` has no default, so it must always be supplied, even when
    the value is ``None``.
    """

    # DID of the authenticated account.
    did: DID
    # Human-readable label built from the display name and/or handle.
    username: str
    # Blob URL of the avatar, or None when no avatar was found.
    avatar_url: str | None
class JWTHeader(BaseModel):
    """Schema applied to the decoded JWT header segment."""

    # Only the literal type tag "JWT" validates.
    typ: Literal['JWT']
    # The two signature algorithms this service accepts.
    alg: Literal['ES256', 'ES256K']
def check_current(v: int) -> int:
    """Validate that a JWT ``exp`` timestamp lies in the future.

    Args:
        v: Expiry time, compared against ``time.time()`` (seconds since the
            Unix epoch).

    Returns:
        ``v`` unchanged when it is later than the current time.

    Raises:
        ValueError: If ``v`` is not later than the current time.
    """
    # Raise explicitly instead of using ``assert``: assertions are removed when
    # Python runs with -O, which would silently let expired tokens through.
    if v <= time.time():
        raise ValueError(f'{v} is too old')
    return v
class JWTPayload(BaseModel):
    """Schema for the claims this service reads from the JWT payload."""

    # Expiry timestamp; check_current runs on it after integer validation.
    exp: Annotated[int, AfterValidator(check_current)]
    # Audience: must be exactly this literal DID.
    aud: Literal['did:web:feeds.goeo.lol']
    # Issuer: must satisfy the DID pattern.
    iss: DID
def _b64url_decode(segment: str) -> bytes:
    """Decode one base64url JWT segment, restoring the padding JWTs omit."""
    # ``-len % 4`` yields exactly the 0-3 '=' characters needed to reach a
    # multiple of four.
    return urlsafe_b64decode(segment + '=' * (-len(segment) % 4))


def _load_public_key(multikey: str, alg: str):
    """Build the EC public key required by ``alg`` from a multibase Multikey.

    Raises HTTPException(400) when the key's multicodec does not belong to
    the curve that ``alg`` requires.
    """
    codec, encoded_point = multicodec.unwrap(multibase.decode(multikey))
    if alg == 'ES256':
        expected_codec, curve = 'p256-pub', ec.SECP256R1()
    else:  # alg == 'ES256K'
        expected_codec, curve = 'secp256k1-pub', ec.SECP256K1()
    if codec.name != expected_codec:
        raise HTTPException(status_code=400, detail="Bad JWT: signing key doesn't match JWT alg")
    return ec.EllipticCurvePublicKey.from_encoded_point(curve, encoded_point)


async def get_current_user(bearer: Annotated[HTTPAuthorizationCredentials, Depends(security)]) -> User:
    """Authenticate a request from its bearer JWT and describe the caller.

    Steps: split and base64url-decode the token, validate header and payload
    against ``JWTHeader`` / ``JWTPayload``, fetch the issuer's signing key via
    ``get_signing_key_handle_pds``, verify the ECDSA signature over the first
    two token segments, then try to confirm the issuer's handle and fetch its
    profile record for display purposes.

    Args:
        bearer: Credentials extracted by the ``security`` dependency.

    Returns:
        A ``User`` carrying the issuer DID, a display label, and an avatar
        URL (or ``None``).

    Raises:
        HTTPException: Status 400 for a malformed token, invalid header or
            payload, unusable issuer DID or signing key, or bad signature.
    """
    # The raw token is deliberately not logged: it is a usable credential.
    segments = bearer.credentials.split('.')
    if len(segments) != 3:
        raise HTTPException(status_code=400, detail='Bad Bearer token')

    try:
        raw_header, raw_payload, signature = [_b64url_decode(part) for part in segments]
    except ValueError as exc:  # binascii.Error is a ValueError subclass
        raise HTTPException(status_code=400, detail='Bad JWT: could not base64 decode') from exc

    try:
        header = JWTHeader.model_validate_json(raw_header)
    except ValidationError as exc:
        raise HTTPException(status_code=400, detail='Bad JWT header') from exc

    try:
        payload = JWTPayload.model_validate_json(raw_payload)
    except ValidationError as exc:
        raise HTTPException(status_code=400, detail='Bad JWT payload') from exc

    try:
        signing_key, handle, pds = await get_signing_key_handle_pds(payload.iss)
    except InvalidDIDException as exc:
        raise HTTPException(status_code=400, detail='Bad iss did') from exc

    try:
        public_key = _load_public_key(signing_key, header.alg)
    except (TypeError, ValueError, KeyError) as exc:
        # NOTE(review): assumes multiformats' decoding errors derive from these
        # built-in exception types; confirm against the installed version.
        raise HTTPException(status_code=400, detail='Bad iss did') from exc

    # The signature is the fixed-width concatenation r || s, 32 bytes each.
    # Any other length is malformed and must not be silently re-split.
    if len(signature) != 64:
        raise HTTPException(status_code=400, detail='Bad JWT signature')

    signed_bytes = '.'.join(segments[:2]).encode()
    try:
        public_key.verify(
            signature=encode_dss_signature(
                int.from_bytes(signature[:32], 'big'),
                int.from_bytes(signature[32:], 'big'),
            ),
            data=signed_bytes,
            signature_algorithm=ec.ECDSA(hashes.SHA256()),
        )
    except InvalidSignature as exc:
        raise HTTPException(status_code=400, detail='Bad JWT signature') from exc

    # The handle counts as verified only if it resolves back to the issuer.
    verified_handle = False
    if handle:
        try:
            verified_handle = (await resolve_handle(handle)) == payload.iss
        except CannotResolveHandleException:
            verified_handle = False

    avatar_url = None
    display_name = None
    if pds and verified_handle:
        # The profile only decorates the result, so a failed or malformed
        # lookup must not fail an otherwise valid authentication.
        try:
            response = await client.get(
                f'{pds}/xrpc/com.atproto.repo.getRecord?repo={payload.iss}&collection=app.bsky.actor.profile&rkey=self'
            )
            body = response.json()
        except (httpx.HTTPError, ValueError):
            body = None
        record = body.get('value') if isinstance(body, dict) else None
        if isinstance(record, dict):
            avatar = record.get('avatar')
            ref = avatar.get('ref') if isinstance(avatar, dict) else None
            avatar_blob = ref.get('$link') if isinstance(ref, dict) else None
            if avatar_blob:
                avatar_url = f'{pds}/xrpc/com.atproto.sync.getBlob?did={payload.iss}&cid={avatar_blob}'
            display_name = record.get('displayName')

    if display_name:
        username = f'{display_name} ({handle})'
    elif verified_handle:
        username = handle
    else:
        username = f'invalid handle: {handle} ({payload.iss})'
    return User(did=payload.iss, username=username, avatar_url=avatar_url)
@app.get('/.well-known/did.json')
async def did(request: Request):
    """Serve the static did:web document for ``did:web:feeds.goeo.lol``."""
    feed_generator_service = {
        "id": "#bsky_fg",
        "type": "BskyFeedGenerator",
        "serviceEndpoint": "https://feeds.goeo.lol",
    }
    document = {
        "@context": ["https://www.w3.org/ns/did/v1"],
        "id": "did:web:feeds.goeo.lol",
        "alsoKnownAs": [],
        "authentication": None,
        "verificationMethod": [],
        "service": [feed_generator_service],
    }
    return JSONResponse(content=document)
@app.get('/xrpc/app.bsky.feed.getFeedSkeleton')
async def create_report(
    request: Request,
    current_user: Annotated[User, Depends(get_current_user)],
    feed: Literal['at://did:web:genco.me/app.bsky.feed.generator/misses'],
    # The default is given with ``=`` rather than inside ``Field``; FastAPI
    # asks for that form when the constraint lives in ``Annotated``.
    limit: Annotated[int, Field(ge=1, le=100)] = 50,
    cursor: str | None = None,
):
    """Return a feed skeleton of the caller's own posts that have zero likes.

    Pages through the caller's author feed (replies excluded), keeping posts
    authored by the caller whose ``likeCount`` is 0, until ``limit`` posts are
    collected or the author feed runs out.

    Args:
        request: The incoming request (unused).
        current_user: The authenticated caller.
        feed: AT-URI of the requested feed; only one literal value validates.
        limit: Maximum number of posts to return (1-100, default 50).
        cursor: Pagination cursor from a previous response, if any.

    Returns:
        A JSON response ``{"feed": [{"post": uri}, ...]}``, plus ``"cursor"``
        when more posts may follow.

    Raises:
        HTTPException: Status 502 when the upstream author-feed request fails
            or returns a body that is not JSON.
    """
    to_serve: list[str] = []
    exhausted = False
    while len(to_serve) < limit:
        query = {
            "actor": current_user.did,
            "limit": 100,
            "filter": "posts_no_replies",
        }
        # Only send a cursor when there is one; otherwise an empty
        # ``cursor=`` parameter would be transmitted upstream.
        if cursor:
            query["cursor"] = cursor
        upstream = await client.get(
            'https://public.api.bsky.app/xrpc/app.bsky.feed.getAuthorFeed',
            params=query,
        )
        if upstream.status_code != 200:
            raise HTTPException(status_code=502, detail='Upstream getAuthorFeed request failed')
        try:
            page = upstream.json().get('feed') or []
        except ValueError as exc:
            raise HTTPException(status_code=502, detail='Upstream getAuthorFeed request failed') from exc
        if not page:
            exhausted = True
            break
        for item in page:
            post = item['post']
            # Skip entries authored by someone else (e.g. reposts).
            if post['author']['did'] != current_user.did:
                continue
            # A post without a like count is treated as having zero likes.
            if post.get('likeCount', 0) == 0:
                to_serve.append(post['uri'])
                # NOTE(review): assumes getAuthorFeed accepts a post's
                # ``indexedAt`` timestamp as its cursor; confirm.
                cursor = post['indexedAt']
                if len(to_serve) >= limit:
                    break
        else:
            # Whole page consumed without reaching the limit: continue after
            # its last entry.
            cursor = page[-1]['post']['indexedAt']

    skeleton = {"feed": [{"post": post_uri} for post_uri in to_serve]}
    # No cursor once the author feed is exhausted, so clients stop paging.
    if cursor and not exhausted:
        skeleton['cursor'] = cursor
    return JSONResponse(content=skeleton)
class CannotResolveHandleException(Exception):
    """Raised by resolve_handle when a handle cannot be resolved."""
async def resolve_handle(handle):
    """Resolve a handle to the DID it claims.

    Looks first for a ``did=...`` TXT record at ``_atproto.<handle>`` through
    the DNS-over-HTTPS JSON endpoint at 1.1.1.1, then falls back to fetching
    ``https://<handle>/.well-known/atproto-did``.

    Args:
        handle: The handle (a domain name) to resolve.

    Returns:
        The DID string found for the handle.

    Raises:
        CannotResolveHandleException: If the handle is not a syntactically
            valid domain name, or neither lookup yields a DID.
    """
    import re

    # The handle comes from a remote DID document and is interpolated into
    # URLs below, so anything that is not a plain domain name is refused.
    # NOTE(review): pattern follows the AT Protocol handle syntax; confirm it
    # matches the current specification.
    handle_pattern = (
        r'([a-zA-Z0-9]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?\.)+'
        r'[a-zA-Z]([a-zA-Z0-9-]{0,61}[a-zA-Z0-9])?'
    )
    if not isinstance(handle, str) or not re.fullmatch(handle_pattern, handle):
        raise CannotResolveHandleException

    # First attempt: DNS TXT record. A failed lookup is not fatal; the HTTPS
    # method below is still tried.
    try:
        dns_response = await client.get(
            'https://1.1.1.1/dns-query',
            params={'name': f'_atproto.{handle}', 'type': 'TXT'},
            headers={'Accept': 'application/dns-json'},
        )
        dns_body = dns_response.json()
    except (httpx.HTTPError, ValueError):
        dns_body = None
    answers = dns_body.get('Answer') if isinstance(dns_body, dict) else None
    # Scan every answer: the name may carry TXT records other than the DID.
    for answer in answers or []:
        data = answer.get('data') if isinstance(answer, dict) else None
        if isinstance(data, str) and data.startswith('"did=did:') and data.endswith('"'):
            # Drop the leading '"did=' and the trailing quote.
            return data[5:-1]

    # Second attempt: well-known HTTPS endpoint.
    try:
        well_known = await client.get(f'https://{handle}/.well-known/atproto-did')
    except (httpx.HTTPError, httpx.InvalidURL) as exc:
        raise CannotResolveHandleException from exc
    resolved = well_known.text.strip()
    if well_known.status_code != 200 or not resolved.startswith('did:'):
        raise CannotResolveHandleException
    return resolved
class InvalidDIDException(Exception):
    """Raised by get_signing_key_handle_pds when a DID cannot be used."""
async def get_signing_key_handle_pds(did):
    """Fetch a DID document and extract signing key, handle, and PDS endpoint.

    ``did:plc:`` documents are fetched from plc.directory; ``did:web:``
    documents from ``/.well-known/did.json`` on the DID's host.

    Args:
        did: The DID to look up.

    Returns:
        A tuple ``(signing_key, handle, pds)``: the ``publicKeyMultibase`` of
        the ``#atproto`` Multikey verification method, the first ``at://``
        entry of ``alsoKnownAs`` without its scheme (or ``None``), and the
        ``#atproto_pds`` service endpoint (or ``None``).

    Raises:
        InvalidDIDException: If the DID method is unsupported, the document
            cannot be fetched or is not a JSON object, or no matching signing
            key is present.
    """
    if did.startswith('did:plc:'):
        document_url = f'https://plc.directory/{did}'
    elif did.startswith('did:web:'):
        document_url = f"https://{did.removeprefix('did:web:')}/.well-known/did.json"
    else:
        raise InvalidDIDException

    # An unreachable host or a non-JSON body means the DID cannot be resolved;
    # report that as an invalid DID rather than letting it escape.
    try:
        doc = (await client.get(document_url)).json()
    except (httpx.HTTPError, httpx.InvalidURL, ValueError) as exc:
        raise InvalidDIDException from exc
    if not isinstance(doc, dict):
        raise InvalidDIDException

    handle = next(
        (
            aka[5:]  # strip the 'at://' scheme
            for aka in (doc.get('alsoKnownAs') or [])
            if isinstance(aka, str) and aka.startswith('at://')
        ),
        None,
    )

    pds = next(
        (
            service.get('serviceEndpoint')
            for service in (doc.get('service') or [])
            if isinstance(service, dict)
            and service.get('id') == '#atproto_pds'
            and service.get('type') == 'AtprotoPersonalDataServer'
        ),
        None,
    )

    signing_key = next(
        (
            method.get('publicKeyMultibase')
            for method in (doc.get('verificationMethod') or [])
            if isinstance(method, dict)
            and method.get('id') == f'{did}#atproto'
            and method.get('type') == 'Multikey'
            and method.get('controller') == did
        ),
        None,
    )
    if not signing_key or not isinstance(signing_key, str):
        raise InvalidDIDException
    return signing_key, handle, pds
if __name__ == '__main__':
    # Script entry point: serve the ``app`` object of module ``main`` with
    # reload enabled.
    uvicorn.run('main:app', reload=True)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment