# Created June 27, 2024 22:41
#
# Set these environment variables in the shell before running this file:
#   export CLIENT_ID=
#   export SECRET_KEY=''
#   export GITHUB_PAT=''
import datetime  # to calculate expiration of the JWT
import os

import httpx
from fastapi import FastAPI, Depends, HTTPException, Security, Request
from fastapi.responses import RedirectResponse
from fastapi.security import APIKeyCookie  # this is the part that puts the lock icon to the docs
from fastapi_sso.sso.base import OpenID
from fastapi_sso.sso.github import GithubSSO
from github import Github
from jose import JWTError, jwt  # pip install python-jose[cryptography]
# Key used to sign and verify the session JWTs (HS256).
SECRET_KEY = os.environ["SECRET_KEY"]
CLIENT_ID = os.environ["CLIENT_ID"]
# NOTE(review): the GithubSSO(...) call had no arguments and no closing
# parenthesis in this file, so the arguments below are a reconstruction.
# Confirm that SECRET_KEY is really meant to double as the OAuth client
# secret, and set REDIRECT_URI to the callback URL registered with the GitHub
# OAuth app (presumably required for the login redirect to work).
sso = GithubSSO(
    client_id=CLIENT_ID,
    client_secret=SECRET_KEY,
    redirect_uri=os.environ.get("REDIRECT_URI"),
)
# This is a personal access token (classic) with the `read:org` scope
PAT = os.environ["GITHUB_PAT"]
# GitHub organization whose membership is checked for logged-in users.
ORG = "simonsobs"
app = FastAPI()
async def optional_get_logged_user(dataset_id: int, request: Request):
    """Resolve the logged-in user unless the requested dataset is public.

    Dataset ``0`` needs no authentication, so ``None`` is returned for it
    without inspecting the request. For every other dataset the result of
    ``get_logged_user(request)`` is returned.
    """
    needs_login = dataset_id != 0
    if needs_login:
        return await get_logged_user(request)
    return None
async def get_logged_user(request: Request) -> OpenID:
    """Get user's JWT stored in cookie 'token', parse it and return the user's OpenID.

    The token is verified with ``SECRET_KEY`` using HS256, and its ``pld``
    claim is expanded into the ``OpenID`` constructor.

    Raises:
        HTTPException: 401 when the token cannot be decoded or verified, or
            when its ``pld`` claim is missing or cannot build an ``OpenID``.
    """
    # Read the cookie outside the try block so that whatever APIKeyCookie
    # itself raises (e.g. for a missing cookie) propagates unchanged.
    cookie = await APIKeyCookie(name="token")(request)
    try:
        claims = jwt.decode(cookie, key=SECRET_KEY, algorithms=["HS256"])
        return OpenID(**claims["pld"])
    except (JWTError, KeyError, TypeError, ValueError) as error:
        raise HTTPException(status_code=401, detail="Invalid authentication credentials") from error
def check_user_in_org(github_username):
    """Return whether ``github_username`` is a member of the ``ORG`` organization.

    A GitHub client is created from ``PAT``; the organization is fetched
    first, then the user, and the organization's ``has_in_members`` result is
    returned.
    """
    client = Github(PAT)
    return client.get_organization(ORG).has_in_members(client.get_user(github_username))
async def protected_endpoint(dataset_id: int, request: Request, user: OpenID = Depends(optional_get_logged_user)):
    """Say hello to the logged user and report their organization membership.

    Dataset ``0`` is public: a fixed message is returned and ``user`` is not
    used. For any other dataset, ``user`` comes from
    ``optional_get_logged_user``, which delegates to ``get_logged_user``; an
    authentication failure is raised there, before this body runs.

    Returns:
        A dict with a single ``"message"`` key.
    """
    # NOTE(review): no route decorator is visible for this handler in this
    # file; it presumably has to be registered on `app` under a path with a
    # `dataset_id` parameter (login_callback redirects to "/protected/1").
    if dataset_id == 0:
        return {"message": "This should be public and not require authentication."}
    not_member = "" if check_user_in_org(user.display_name) else "not "
    # NOTE(review): the greeting's placeholder was empty in this file;
    # display_name is used because it is the identifier already passed to the
    # membership check. Confirm the intended field.
    return {
        "message": (
            f"You are very welcome, {user.display_name}! "
            f"You are {not_member}a member of the {ORG} organization."
        )
    }
async def login():
    """Start the sign-in flow by redirecting the user to the Github login page."""
    with sso:
        github_redirect = await sso.get_login_redirect()
    return github_redirect
async def logout():
    """Forget the user's session.

    Removes the ``token`` cookie that ``get_logged_user`` reads and redirects
    the browser to ``/protected``.
    """
    response = RedirectResponse(url="/protected")
    # The redirect alone would leave the session cookie in place, so the user
    # would still be treated as logged in.
    response.delete_cookie(key="token")
    return response
async def login_callback(request: Request):
    """Process login and redirect the user to the protected endpoint.

    Verifies the callback request with the SSO provider, stores the resulting
    OpenID in a JWT signed with ``SECRET_KEY`` (HS256), and sets that JWT as
    the ``token`` cookie on a redirect to ``/protected/1``.

    Raises:
        HTTPException: 401 when the provider returns no OpenID.
    """
    with sso:
        openid = await sso.verify_and_process(request)
        if not openid:
            raise HTTPException(status_code=401, detail="Authentication failed")
    # Create a JWT with the user's OpenID.
    # The expiry is an absolute, timezone-aware moment one day from now; it is
    # shared by the JWT "exp" claim and the cookie so both lapse together.
    expiration = datetime.datetime.now(tz=datetime.timezone.utc) + datetime.timedelta(days=1)
    # NOTE(review): the "sub" value was missing in this file; the provider's
    # user id is assumed here. Confirm the intended subject.
    token = jwt.encode(
        {"pld": openid.dict(), "exp": expiration, "sub": openid.id},
        key=SECRET_KEY,
        algorithm="HS256",
    )
    response = RedirectResponse(url="/protected/1")
    response.set_cookie(
        key="token", value=token, expires=expiration
    )  # This cookie will make sure /protected knows the user
    return response
if __name__ == "__main__":
    # Imported here so uvicorn is only needed when this file is run directly.
    import uvicorn

    # NOTE(review): host is an empty string in this file; that presumably
    # makes the server listen on all interfaces. Confirm the intended bind
    # address.
    uvicorn.run(app, host="", port=5000)
