Skip to content

Instantly share code, notes, and snippets.

@rkhullar
Last active August 1, 2023 01:37
Show Gist options
  • Save rkhullar/5f47b00b9d90edc3ae81702246d93dc7 to your computer and use it in GitHub Desktop.
Save rkhullar/5f47b00b9d90edc3ae81702246d93dc7 to your computer and use it in GitHub Desktop.
tutorials/fastapi-aws-mongodb-okta
# .envrc
source_up
source_env local.env
source_env local-dev.env
# api/config.py
import os
from pydantic_settings import BaseSettings
class ProjectSettings(BaseSettings):
    """Project-level settings loaded from the environment by pydantic-settings.

    Each field is populated from the environment variable of the same name
    when the class is instantiated. Reading ``os.environ`` in the class body
    is unnecessary and made a missing variable a ``KeyError`` at import time;
    it is now reported as a validation error on instantiation instead.
    """

    # ENVIRONMENT: required, no default.
    environment: str
    # RELOAD_FASTAPI: optional; the variable's value is parsed as a boolean.
    reload_fastapi: bool = False
class NetworkSettings(BaseSettings):
    """Bind address for the development server.

    Values come from SERVICE_HOST / SERVICE_PORT when set; pydantic-settings
    performs the lookup and the ``int`` conversion at instantiation, so the
    defaults are declared as plain literals rather than via ``os.getenv``.
    """

    service_host: str = 'localhost'
    service_port: int = 8000
class OktaSettings(BaseSettings):
    """Okta connection settings.

    Both fields are required and are read from OKTA_HOST / OKTA_CLIENT_ID by
    pydantic-settings at instantiation instead of from ``os.environ`` while
    the module is being imported.
    """

    okta_host: str
    okta_client_id: str
class MongoSettings(BaseSettings):
    """MongoDB Atlas connection settings.

    ``atlas_host`` is required and is read from ATLAS_HOST by
    pydantic-settings at instantiation instead of from ``os.environ`` while
    the module is being imported.
    """

    atlas_host: str
class Settings(ProjectSettings, NetworkSettings, OktaSettings, MongoSettings):
    """Aggregate of every settings group; declares no fields of its own."""
# api/depends.py
import os
from typing import Annotated
import httpx
from fastapi import Depends
from .schema.user import OktaUser, User
from .util.okta_flow import OktaAuthCodeBearer
# Okta domain, read once at import; a missing OKTA_HOST raises KeyError here.
okta_host = os.environ['OKTA_HOST']
# Security scheme used as the token dependency of `get_user`.
auth_scheme = OktaAuthCodeBearer(domain=okta_host)
# Module-level async HTTP client reused by every `get_user` call.
httpx_client = httpx.AsyncClient()
async def get_user(access_token: str = Depends(auth_scheme)) -> OktaUser:
    """Resolve the bearer token into an ``OktaUser`` via the userinfo endpoint.

    Args:
        access_token: bearer token extracted by ``auth_scheme``.

    Returns:
        The user built from the claims returned by the userinfo endpoint.

    Raises:
        fastapi.HTTPException: 401 when the userinfo endpoint rejects the
            token, so the client receives an authentication error rather
            than a 500 from an unhandled ``httpx.HTTPStatusError``.
        httpx.HTTPStatusError: for any other non-success response.
    """
    # Imported locally so this function does not depend on edits to the
    # module's import block.
    from fastapi import HTTPException

    response = await httpx_client.get(
        auth_scheme.metadata['userinfo_endpoint'],
        headers={'Authorization': f'Bearer {access_token}'},
    )
    if response.status_code == 401:
        raise HTTPException(
            status_code=401,
            detail='invalid or expired access token',
            headers={'WWW-Authenticate': 'Bearer'},
        )
    response.raise_for_status()
    identity_token_data = response.json()
    return OktaUser.from_token_data(identity_token_data)
# Annotated alias: a route parameter typed `GetUser` is resolved through `get_user`.
GetUser = Annotated[User, Depends(get_user)]
# api/depends.py
from typing import Annotated
from fastapi import Depends, Request
from pymongo import MongoClient
from pymongo.database import Collection
def atlas(name: str, database: str = 'default'):
    """Build an ``Annotated`` dependency type that resolves to a collection.

    Args:
        name: collection name.
        database: database name; defaults to ``'default'``.

    Returns:
        ``Annotated[Collection, Depends(...)]`` for use as a route parameter type.
    """

    def dependency(request: Request) -> Collection:
        # The client is read from the application's `extra` mapping under 'atlas'.
        client: MongoClient = request.app.extra['atlas']
        db = client.get_database(database)
        return db.get_collection(name)

    return Annotated[Collection, Depends(dependency)]
# api/factory.py
from fastapi import FastAPI
from .config import Settings
from .router import router as api_router
def create_app(settings: Settings) -> FastAPI:
    """Create the FastAPI application and mount the API router.

    The Swagger UI OAuth options use the Okta client id from ``settings``
    with PKCE enabled for the authorization-code grant.
    """
    oauth_scopes = ('openid', 'profile', 'email')
    swagger_oauth = {
        'clientId': settings.okta_client_id,
        'usePkceWithAuthorizationCodeGrant': True,
        'scopes': ' '.join(oauth_scopes),
    }
    application = FastAPI(settings=settings, swagger_ui_init_oauth=swagger_oauth)
    application.include_router(api_router)
    return application
# api/factory.py
from fastapi import FastAPI
from pymongo import MongoClient
from .config import Settings
from .router import router as api_router
def build_atlas_client(atlas_host: str) -> MongoClient:
    """Return a ``MongoClient`` for ``atlas_host`` using the MONGODB-AWS mechanism.

    The connection string targets ``mongodb+srv://<atlas_host>/`` and the
    client is created with ``connect=True``.
    """
    options = 'authSource=%24external&authMechanism=MONGODB-AWS&retryWrites=true&w=majority'
    return MongoClient(f'mongodb+srv://{atlas_host}/?{options}', connect=True)
def create_app(settings: Settings) -> FastAPI:
    """Create the FastAPI application, mount the router and attach the Atlas client.

    The Mongo client is stored in ``app.extra['atlas']`` for later lookup.
    """
    oauth_scopes = ('openid', 'profile', 'email')
    swagger_oauth = {
        'clientId': settings.okta_client_id,
        'usePkceWithAuthorizationCodeGrant': True,
        'scopes': ' '.join(oauth_scopes),
    }
    application = FastAPI(settings=settings, swagger_ui_init_oauth=swagger_oauth)
    application.include_router(api_router)
    atlas_client = build_atlas_client(atlas_host=settings.atlas_host)
    application.extra['atlas'] = atlas_client
    return application
#!/usr/bin/env sh
# Package the application sources into local/dist/package.zip.
set -x

here=$(dirname "$(realpath "$0")")
cd "${here}" || exit

# Start from empty build and dist directories.
rm -rf local/build local/dist
mkdir -p local/build local/dist

# Stage the top-level modules and the api package; server.py is dropped
# from the staged copy.
cp ./*.py local/build
cp -r ./api local/build
rm -f local/build/server.py

# -prune keeps find from descending into a directory that is being removed,
# which otherwise makes it report "No such file or directory".
find local/build -type d -name '__pycache__' -prune -exec rm -rf {} +
find local/build -type f -name 'test_*.py' -exec rm -f {} +

# Zip from inside the stage directory; the subshell leaves the working
# directory of the script unchanged.
(cd local/build && zip -r9 ../dist/package.zip ./*)

rm -rf local/build
from fastapi import FastAPI
from mangum import Mangum
from api.config import Settings
from api.factory import create_app
# Settings are constructed once, at import.
settings: Settings = Settings()
app: FastAPI = create_app(settings)
# Mangum adapter wrapping the FastAPI app.
lambda_handler: Mangum = Mangum(app)
#!/usr/bin/env sh
# Install the Pipfile's locked dependencies into build/python and zip the
# build directory into dist/package.zip.
set -x

here=$(dirname "$(realpath "$0")")
build_path="${here}/build"
build_python_path="${build_path}/python"
dist_path="${here}/dist"

cd "${here}" || exit

# Resolve the Pipfile into a requirements.txt.
pip install pipenv
pipenv lock
pipenv requirements > requirements.txt

mkdir -p "${build_python_path}" "${dist_path}"
# Use the path variables defined above instead of repeating relative paths.
pip install -r requirements.txt -t "${build_python_path}"

cd "${build_python_path}" || exit
rm -rf ./*.dist-info ./*.egg-info
# find -exec copes with whitespace in paths, unlike `find | grep | xargs`.
find . -type d -name '__pycache__' -prune -exec rm -rf {} +

cd "${build_path}" || exit
zip -r9 "${dist_path}/package.zip" ./*
version: '3'
services:
  # Builds the image from the local Dockerfile, then copies the zip files
  # from dist/ into the bind-mounted ./out directory.
  builder:
    build: .
    platform: linux/arm64
    volumes:
      - ./out:/var/task/out:rw
    entrypoint:
      - /bin/sh
      - -c
    command:
      - cp dist/*.zip out/
[[source]]
url = "https://pypi.org/simple"
verify_ssl = true
name = "pypi"
[packages]
fastapi = "~=0.100.0"
httpx = "~=0.24.0"
mangum = "~=0.17.0"
pydantic-settings = "~=2.0.0"
pymongo = {version = "~=4.4.0", extras = ["aws", "srv"]}
[requires]
python_version = "3.11"
# Python version suffix used to select the build image.
ARG PYTHON_VERSION=3.11
FROM public.ecr.aws/sam/build-python${PYTHON_VERSION}
# Only the dependency manifest and the build script are copied into the image.
COPY Pipfile build.sh ./
# The build script runs at image build time.
RUN ./build.sh
ENTRYPOINT ["/bin/sh"]
# local-dev.env
export ENVIRONMENT=dev
export ATLAS_HOST=REDACTED
# local.env
export RELOAD_FASTAPI=1
export OKTA_HOST=REDACTED
export OKTA_CLIENT_ID=REDACTED
# api/model/document.py
from __future__ import annotations
from typing import TypeVar
from pydantic import BaseModel
from .object_id import PydanticObjectId
class Document(BaseModel):
    """Base model for MongoDB documents; the ``_id`` key is exposed as ``id``."""

    id: PydanticObjectId | str

    @classmethod
    def from_pymongo(cls, doc: dict) -> DocumentType:
        """Build a model instance from a raw pymongo document.

        The ``_id`` key is mapped onto the ``id`` field. The mapping is copied
        first so the caller's dictionary is not modified.

        Raises:
            KeyError: if ``doc`` has no ``_id`` key.
        """
        fields = dict(doc)
        _id = fields.pop('_id')
        return cls(id=_id, **fields)


# Type variable for generics parameterised over Document subclasses.
DocumentType = TypeVar('DocumentType', bound=Document)
# api/model/message.py
import datetime as dt
from pydantic import Field
from .document import Document
class Message(Document):
    """Message document model."""

    # When not supplied, defaults to the value of `datetime.utcnow()` at creation.
    created: dt.datetime = Field(default_factory=dt.datetime.utcnow)
    user_id: str
    message: str
# api/model/object_id.py
from typing import Annotated
from bson.objectid import ObjectId as BsonObjectId
from pydantic import GetPydanticSchema, PlainSerializer, WithJsonSchema
from pydantic_core import core_schema
# Annotated wrapper letting pydantic models carry bson ObjectId values.
PydanticObjectId = Annotated[
    BsonObjectId,
    # Validation: the value must already be a BsonObjectId instance.
    GetPydanticSchema(lambda source_type, handler: core_schema.is_instance_schema(BsonObjectId)),
    # Serialization: emitted as its string form.
    PlainSerializer(lambda data: str(data), return_type=str),
    # JSON schema: advertised as a string.
    WithJsonSchema({'type': 'string', 'example': 'ObjectId()'})
]
# api/util/okta_flow.py
from functools import cached_property
import httpx
from fastapi.security import OAuth2AuthorizationCodeBearer
class OktaAuthCodeBearer(OAuth2AuthorizationCodeBearer):
    """Authorization-code bearer scheme configured from Okta's OpenID metadata."""

    def __init__(self, domain: str, issuer: str = 'default'):
        """Fetch the issuer's metadata and configure the scheme's endpoints.

        Args:
            domain: Okta host name.
            issuer: authorization server id used in the metadata URL.
        """
        self.domain = domain
        self.issuer = issuer
        # First access performs the HTTP request; the result is cached.
        discovery = self.metadata
        scope_names = ('openid', 'email', 'profile')
        super().__init__(
            authorizationUrl=discovery['authorization_endpoint'],
            tokenUrl=discovery['token_endpoint'],
            scopes=dict(zip(scope_names, scope_names)),
        )

    @property
    def metadata_url(self) -> str:
        """URL of the issuer's ``openid-configuration`` document."""
        host, issuer = self.domain, self.issuer
        return f'https://{host}/oauth2/{issuer}/.well-known/openid-configuration'

    @cached_property
    def metadata(self) -> dict:
        """Parsed metadata document; raises on a non-success HTTP status."""
        reply = httpx.get(self.metadata_url)
        reply.raise_for_status()
        return reply.json()
# api/routes/message.py
import pymongo
from fastapi import APIRouter
from ..depends import GetUser, atlas
from ..schema.message import CreateMessage
router = APIRouter()
# Dependency type resolving to the 'message' collection.
MessageAdapter = atlas(name='message')
@router.get('')
async def list_messages(user: GetUser, collection: MessageAdapter):
    """Return the caller's messages, newest first, as plain dictionaries."""
    query = {'user_id': user.id}
    results = []
    for doc in collection.find(query).sort('created', pymongo.DESCENDING):
        # Swap the `_id` value for its string form under the key `id`.
        _id = str(doc.pop('_id'))
        results.append(dict(id=_id, **doc))
    return results
@router.post('')
async def create_message(user: GetUser, collection: MessageAdapter, create_object: CreateMessage):
    """Insert a message for the caller and return the stored fields."""
    document = create_object.to_pymongo(user_id=user.id)
    result = collection.insert_one(document)
    return dict(
        id=str(result.inserted_id),
        created=document['created'],
        user_id=document['user_id'],
        message=create_object.message,
    )
# api/routes/message.py
import pymongo
from fastapi import APIRouter
from ..depends import GetUser, atlas
from ..model.message import Message
from ..schema.crud import CreateResponse, ListResponse
from ..schema.message import CreateMessage
router = APIRouter()
# Dependency type resolving to the 'message' collection.
MessageAdapter = atlas(name='message')
@router.get('', response_model=ListResponse[Message])
async def list_messages(user: GetUser, collection: MessageAdapter):
    """Return the caller's messages, newest first, wrapped in a ``ListResponse``."""
    newest_first = collection.find({'user_id': user.id}).sort('created', pymongo.DESCENDING)
    messages = list(map(Message.from_pymongo, newest_first))
    return ListResponse(data=messages)
@router.post('', response_model=CreateResponse[Message])
async def create_message(user: GetUser, collection: MessageAdapter, create_object: CreateMessage):
    """Insert a message for the caller and return it as a ``Message``."""
    document = create_object.to_pymongo(user_id=user.id)
    result = collection.insert_one(document)
    fields = {
        'id': result.inserted_id,
        'created': document['created'],
        'user_id': document['user_id'],
        'message': create_object.message,
    }
    return Message(**fields)
# api/router.py
from fastapi import APIRouter
router = APIRouter()
@router.get('/hello-world')
async def hello_world():
    """Return a fixed greeting payload."""
    return {'message': 'hello world'}
# api/router.py
from fastapi import APIRouter
from .depends import GetUser
router = APIRouter()
@router.get('/hello-world')
async def hello_world(user: GetUser):
    """Return a greeting addressed to the authenticated user by name."""
    greeting = f'hello {user.name}'
    return {'message': greeting}
# api/router.py
from fastapi import APIRouter
from .routes import message
# Top-level API router; the message routes are mounted under /message.
router = APIRouter()
router.include_router(message.router, prefix='/message', tags=['message'])
from typing import Generic
from pydantic import BaseModel, RootModel
from ..model.document import DocumentType
class CreateResponse(RootModel[DocumentType], Generic[DocumentType]):
    """Response model for create endpoints; the document itself is the root value."""

    root: DocumentType
class ListResponse(BaseModel, Generic[DocumentType]):
    """Response model for list endpoints; documents are carried under ``data``."""

    data: list[DocumentType]
# api/schema/message.py
import datetime as dt
from pydantic import BaseModel, constr
class CreateMessage(BaseModel):
    """Request body for creating a message."""

    # At least three characters; surrounding whitespace is not stripped.
    message: constr(strip_whitespace=False, min_length=3)

    def to_pymongo(self, user_id: str) -> dict:
        """Return the document to insert, stamped with the current ``utcnow()`` time."""
        timestamp = dt.datetime.utcnow()
        return {'created': timestamp, 'user_id': user_id, 'message': self.message}
# api/schema/user.py
from pydantic import BaseModel
class User(BaseModel):
    """Identity fields of an authenticated user."""

    id: str
    name: str
    email: str
class OktaUser(User):
    """User populated from token claim data."""

    @classmethod
    def from_token_data(cls, data: dict) -> 'OktaUser':
        """Map the ``sub``, ``name`` and ``email`` claims onto the user fields.

        Raises:
            KeyError: if any of the three claims is missing from ``data``.
        """
        claim_to_field = {'sub': 'id', 'name': 'name', 'email': 'email'}
        return cls(**{field: data[claim] for claim, field in claim_to_field.items()})
# server.py
from api.config import Settings
from api.factory import create_app
from fastapi import FastAPI
# Settings and app are built at import so `server:app` can be resolved by name.
settings: Settings = Settings()
app: FastAPI = create_app(settings)
if __name__ == '__main__':
    # uvicorn is imported only when this file is run as a script.
    import uvicorn
    uvicorn.run('server:app', host=settings.service_host, port=settings.service_port, reload=settings.reload_fastapi)
# Create and activate a virtual environment in ./venv.
python -m venv venv
. venv/bin/activate
pip install -U pip setuptools
pip install pipenv
# Runtime dependencies.
pipenv install fastapi 'pymongo[aws,srv]' httpx mangum pydantic-settings
# Development-only dependency.
pipenv install --dev uvicorn
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment