Skip to content

Instantly share code, notes, and snippets.

@commonism
Created August 30, 2023 06:32
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save commonism/2ea36e85b29bcfa4a53a7326b584989b to your computer and use it in GitHub Desktop.
Save commonism/2ea36e85b29bcfa4a53a7326b584989b to your computer and use it in GitHub Desktop.
pagination parameter caching
# https://github.com/Dorthu/openapi3/issues/110
import asyncio
import dataclasses
import random
import sys
import string
import uuid
if sys.version_info >= (3, 9):
from typing import List, Annotated
else:
from typing import List
from typing_extensions import Annotated
from pathlib import Path
import uvloop
from hypercorn.asyncio import serve
from hypercorn.config import Config
import pydantic
from fastapi import FastAPI, Request, Response, Query, UploadFile, Body
from fastapi.responses import PlainTextResponse
import pytest
import pytest_asyncio
import aiopenapi3
# FastAPI application served by the `server` fixture; the clients in the tests
# below are built from its /openapi.json document.
app = FastAPI(
    version="1.0.0",
    title="pagination test",
    servers=[{"url": "/", "description": "Default, relative server"}],
)
@pytest.fixture(scope="session")
def config(unused_tcp_port_factory):
    """Create the Hypercorn ``Config`` shared by the whole test session.

    The single bind address is ``localhost`` on a port obtained from the
    ``unused_tcp_port_factory`` fixture.
    """
    settings = Config()
    settings.bind = [f"localhost:{unused_tcp_port_factory()}"]
    return settings
@pytest_asyncio.fixture(scope="session")
async def server(event_loop, config):
    """Serve ``app`` with Hypercorn in a background task for the test session.

    Yields the Hypercorn ``config`` so that tests can read the bind address
    from ``config.bind[0]``.

    On teardown the shutdown event is set, the serve task is awaited, and the
    event loop policy that was active before ``uvloop.install()`` is restored.
    """
    previous_policy = asyncio.get_event_loop_policy()
    uvloop.install()
    try:
        shutdown = asyncio.Event()
        # The serve task is only scheduled here; nothing waits for the socket
        # to be listening before the fixture yields.
        task = event_loop.create_task(
            serve(app, config, shutdown_trigger=shutdown.wait)
        )
        try:
            yield config
        finally:
            # Only reached once `shutdown` and `task` exist, so teardown can
            # never fail on an unbound local.
            shutdown.set()
            await task
    finally:
        # Restore the policy even if startup or shutdown of the server raised.
        asyncio.set_event_loop_policy(previous_policy)
@pytest_asyncio.fixture(scope="session")
async def client(event_loop, server):
    """Return an aiopenapi3 client loaded from the test server's ``/openapi.json``."""
    description_url = f"http://{server.bind[0]}/openapi.json"
    return await aiopenapi3.OpenAPI.load_async(description_url)
def _random_letters():
    # Sixteen characters, each drawn with random.choice from string.ascii_letters.
    return ''.join(random.choice(string.ascii_letters) for _ in range(16))


class Content(pydantic.BaseModel):
    # A single item on a page.
    # (Documented with comments rather than a docstring: pydantic would copy a
    # class docstring into the generated schema description.)

    # Payload text; defaults to a fresh random 16-letter string per instance.
    data: str = pydantic.Field(default_factory=_random_letters)
class Page(pydantic.BaseModel):
    # Response body of the /pagination endpoint.
    # (Documented with comments rather than a docstring: pydantic would copy a
    # class docstring into the generated schema description.)

    # Token to send back as the `next` query parameter to get the following
    # page; None on the last page.
    next: str | None
    # The entries that make up this page.
    items: List[Content]
@dataclasses.dataclass
class Session:
    """Server-side cursor state for one client paging through a result set."""

    # Total number of items in the result set.
    total: int
    # Number of items already handed out to the client.
    offset: int = dataclasses.field(default=0)
# In-memory store of active pagination sessions, keyed by the `next` token
# (a UUID string) that was handed to the client with the previous page.
SESSIONS = {}
@app.get("/pagination", operation_id="pagination")
def pagination(
    request: Request,
    response: Response,
    next: str | None = Query(None),
    perpage: int = Query(12),
) -> Page:
    # Token-based pagination over a randomly sized result set.
    # (Plain comments, not a docstring: FastAPI would publish a docstring as
    # the operation description in the generated OpenAPI document.)
    #
    # Without `next`, a new session holding between 3*perpage and 7*perpage
    # items is created and the first page is returned. With `next`, the
    # matching session is consumed and the following page is returned. Every
    # token is single-use; the last page carries `next=None` and holds only
    # the remaining items.
    #
    # Raises HTTPException(404) when `next` does not match a stored session,
    # so the client receives a client error rather than an unhandled
    # ValueError.
    from fastapi import HTTPException

    if next is None:
        # First request: the page returned below already accounts for
        # `perpage` items, hence the initial offset.
        token = str(uuid.uuid4())
        SESSIONS[token] = Session(
            total=random.randint(3 * perpage, 7 * perpage),
            offset=perpage,
        )
    else:
        # Tokens are single-use: remove the session whether or not it continues.
        session = SESSIONS.pop(next, None)
        if session is None:
            # Unknown or already-consumed token.
            raise HTTPException(status_code=404, detail="Not found next")
        if session.offset + perpage < session.total:
            # A full page now and more to come: advance and issue a new token.
            session.offset += perpage
            token = str(uuid.uuid4())
            SESSIONS[token] = session
        else:
            # Last page: return whatever is left and end the chain.
            perpage = session.total - session.offset
            token = None
    return Page(next=token, items=[Content() for _ in range(perpage)])
@pytest.mark.asyncio
async def test_aiopenapi3_pagination(event_loop, server, client):
    """Walk every page of ``/pagination`` with the async aiopenapi3 client."""
    query = {}
    size = 0
    while True:
        page = await client._.pagination(parameters=query)
        # Touch every item so each one is read from the parsed response.
        size += sum(len(entry.data) for entry in page.items)
        if page.next is None:
            break
        # Feed the returned token back in to request the following page.
        query["next"] = page.next
@pytest.mark.asyncio
@pytest.mark.skipif(sys.version_info < (3, 9), reason="requires asyncio.to_thread")
async def test_sync_pagination(event_loop, server):
    """Walk every page with the synchronous aiopenapi3 client.

    Both loading the description and each request run via
    ``asyncio.to_thread``, i.e. outside the event loop.
    """
    description_url = f"http://{server.bind[0]}/openapi.json"
    api = await asyncio.to_thread(aiopenapi3.OpenAPI.load_sync, description_url)
    query = {}
    size = 0
    while True:
        page = await asyncio.to_thread(api._.pagination, parameters=query)
        size += sum(len(entry.data) for entry in page.items)
        if page.next is None:
            break
        # Feed the returned token back in to request the following page.
        query["next"] = page.next
@pytest.mark.asyncio
@pytest.mark.skipif(sys.version_info < (3, 9), reason="requires asyncio.to_thread")
async def test_openapi3_pagination_to_thread(event_loop, server):
    """Walk every page with the openapi3 client, each call run via ``asyncio.to_thread``."""
    import openapi3
    import requests

    reply = await asyncio.to_thread(requests.get, f"http://{server.bind[0]}/openapi.json")
    spec = reply.json()
    # The app declares a relative server URL ("/"); replace it with the
    # absolute address of the test server before building the client.
    spec["servers"][0]["url"] = f"http://{server.bind[0]}"
    api = openapi3.OpenAPI(spec)

    query = {}
    size = 0
    while True:
        page = await asyncio.to_thread(api.call_pagination, parameters=query)
        size += sum(len(entry.data) for entry in page.items)
        if page.next is None:
            break
        # Feed the returned token back in to request the following page.
        query["next"] = page.next
@pytest.mark.asyncio
@pytest.mark.skipif(sys.version_info < (3, 9), reason="requires asyncio.to_thread")
async def test_openapi3_pagination_async(event_loop, server):
    """Walk every page with the openapi3 client, awaiting ``call_pagination`` directly."""
    import openapi3
    import requests

    reply = await asyncio.to_thread(requests.get, f"http://{server.bind[0]}/openapi.json")
    spec = reply.json()
    # The app declares a relative server URL ("/"); replace it with the
    # absolute address of the test server before building the client.
    spec["servers"][0]["url"] = f"http://{server.bind[0]}"
    api = openapi3.OpenAPI(spec)

    query = {}
    size = 0
    while True:
        # NOTE(review): unlike the to_thread variant, the call runs on the
        # event loop and its result is awaited directly -- confirm that
        # call_pagination returns an awaitable, or that this is the intended
        # reproduction for the linked issue.
        page = await api.call_pagination(parameters=query)
        size += sum(len(entry.data) for entry in page.items)
        if page.next is None:
            break
        # Feed the returned token back in to request the following page.
        query["next"] = page.next
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment