Created
October 14, 2020 14:14
-
-
Save abdelhai/cfe51382c83efa35a3ea747467b0029b to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
from typing import Optional | |
from urllib.parse import urlencode | |
import httpx | |
from fastapi import Cookie, FastAPI | |
from fastapi.responses import HTMLResponse, RedirectResponse | |
# make sure to set these env vars | |
config = { | |
"client_id": os.getenv("GITHUB_CIENT_ID"), | |
"client_secret": os.getenv("GITHUB_CLIENT_SECRET"), | |
"allowed_usernames": os.getenv("ALLOWED_USERS", "").split(","), | |
} | |
app = FastAPI(debug=True) | |
def make_url(path): | |
# this function returns domain + path for APIs hosted on Deta | |
# You could remove it if hosting somewhere else. Or keep it, it's not going to affect your logic. | |
if "DETA_PATH" not in os.environ: | |
return path | |
return f"https://{os.getenv('DETA_PATH')}.deta.dev{path}" | |
async def get_user(token): | |
r = httpx.get( | |
"https://api.github.com/user", headers={"Authorization": f"token {token}"} | |
) | |
user = r.json() | |
return {"username": user["login"], "id": user["id"]} | |
def auth_view(): | |
url = "https://github.com/login/oauth/authorize" | |
params = { | |
"client_id": config["client_id"], | |
"redirect_uri": make_url("/auth"), | |
"scope": "user", | |
} | |
return HTMLResponse( | |
f""" | |
You are not logged in. Please <a href="{url}?{urlencode(params)}">login</a> | |
""" | |
) | |
def auth_fail_view(): | |
return HTMLResponse( | |
f""" | |
An error happened, see logs. | |
""" | |
) | |
def secret_view(username): | |
return HTMLResponse( | |
f""" | |
Hello, {username}!</br> | |
Die Katze ist aus dem Sack.</br> | |
<a href="/logout">Logout</a> | |
""" | |
) | |
def wrong_user_view(username): | |
return HTMLResponse( | |
f""" | |
Goodbye, {username}!</br> | |
You shall not pass. | |
""" | |
) | |
@app.get("/") | |
async def root(token: Optional[str] = Cookie(None)): | |
if token: | |
try: | |
username = (await get_user(token))["username"] | |
if username in config["allowed_usernames"]: | |
return secret_view(username) | |
return wrong_user_view(username) | |
except Exception as e: | |
print(e) | |
return auth_fail_view() | |
return auth_view() | |
@app.get("/auth") | |
def auth(code: str): | |
params = { | |
"client_id": config["client_id"], | |
"client_secret": config["client_secret"], | |
"code": code, | |
} | |
r = httpx.post( | |
"https://github.com/login/oauth/access_token", | |
data=params, | |
headers={"Accept": "application/json"}, | |
) | |
token_data = r.json() | |
resp = RedirectResponse(make_url("/")) | |
resp.set_cookie( | |
"token", | |
token_data.get("access_token"), | |
expires=token_data.get("expires_in"), | |
secure=True, | |
httponly=True, | |
) | |
return resp | |
@app.get("/logout") | |
def logout(): | |
resp = RedirectResponse(make_url("/")) | |
resp.delete_cookie("token") | |
return resp |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment