Last active
December 18, 2023 09:14
-
-
Save erny/2569d6555bd9349e2afc110a63ffca1a to your computer and use it in GitHub Desktop.
FastAPI Simple content negotiation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Simple content negotiation | |
Usage: | |
Example 1: | |
from renderers import render | |
@router.post("/myapp/items/") | |
async def api_login(item: Item, accept: Optional[str] = Header(default='application/jwt')): | |
... | |
stored_item = store.save(item) | |
return render(stored_item, accept, status_code=201) | |
Example 2: | |
from renderers import JSONRenderer, XMLRenderer, PlainTextRenderer, render | |
class MyItemRenderer(PlainTextRenderer): | |
def render( | |
self, value: Any, status_code: int = 200, | |
headers: Optional[Dict[str, str]] = None, media_type: Optional[str] = None, | |
): | |
try: | |
value = "\n".join([f"{fld}: {getattr(value, fld)}" for fld in value.__fields__]) | |
except AttributeError: | |
value = str(value) | |
return super().render(value, status_code=status_code, headers=headers, media_type=media_type) | |
@router.post("/myapp/items/") | |
async def api_login(item: Item, accept: Optional[str] = Header(default='application/jwt')): | |
... | |
stored_item = store.save(item) | |
return render( | |
stored_item, accept, status_code=201, | |
renderers=[JSONRenderer, XMLRenderer, MyItemRenderer]) | |
""" | |
from typing import Any, ClassVar, Dict, List, Optional, Tuple, Type | |
import json | |
from fastapi.responses import JSONResponse, PlainTextResponse | |
class Renderer: | |
media_types: ClassVar[Tuple[str, ...]] = ('application/jwt', 'text/plain') | |
def render( | |
self, value: Any, status_code: int = 200, | |
headers: Optional[Dict[str, str]] = None, media_type: Optional[str] = None, | |
): | |
if not isinstance(value, str): | |
value = str(value) | |
return PlainTextResponse(value, status_code=status_code, headers=headers, media_type=media_type) | |
PlainTextRenderer = Renderer | |
class JSONRenderer(Renderer): | |
media_types = ('application/json', ) | |
def render( | |
self, value: Any, status_code: int = 200, | |
headers: Optional[Dict[str, str]] = None, media_type: Optional[str] = None, | |
): | |
return JSONResponse(value, status_code=status_code, headers=headers, media_type=media_type) | |
class XMLRenderer(Renderer): | |
media_types = ('application/xml', 'text/xml') | |
def render( | |
self, value: Any, status_code: int = 200, | |
headers: Optional[Dict[str, str]] = None, media_type: Optional[str] = None, | |
): | |
try: | |
value = value.json() | |
except AttributeError: | |
value = json.dumps(value) | |
import json2xml | |
value = json2xml.Json2xml(value).to_xml() | |
return PlainTextResponse(value, status_code=status_code, headers=headers, media_type=media_type) | |
def render( | |
value: Any, | |
accept: Optional[str], | |
status_code: Optional[int], | |
headers: Optional[Dict[str, str]], | |
renderers: Optional[List[Type]] = None, | |
): | |
"""Render response taking into accout the requested media type in 'accept'""" | |
renderers = renderers or [JSONRenderer, PlainTextRenderer, XMLRenderer] | |
if accept: | |
for media_type in accept.split(','): | |
media_type = media_type.split(';')[0].strip() | |
for renderer in renderers: | |
if media_type in renderer.media_types: | |
return renderer().render(value, status_code=status_code, headers=headers, media_type=media_type) | |
renderer = renderers[0] | |
media_type = renderer.media_types[0] | |
return renderer.render(value, status_code=status_code, headers=headers, media_type=media_type) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment