Skip to content

Instantly share code, notes, and snippets.

@YanSte
Created April 19, 2024 11:02
Show Gist options
  • Save YanSte/7be29bc93f21b010f64936fa334a185f to your computer and use it in GitHub Desktop.
Save YanSte/7be29bc93f21b010f64936fa334a185f to your computer and use it in GitHub Desktop.
Langchain FastAPI and Stream Response
# FastAPI
```
@router.get("/myExample")
async def mySpecialAPI(
    session_id: UUID,
    input: str = "Hello",
) -> StreamResponse:
    """Example endpoint that streams a chain's output back to the client.

    Args:
        session_id: Identifier supplied by the caller (not used by the code
            shown in this example).
        input: Text supplied by the caller; defaults to ``"Hello"``.

    Returns:
        A ``StreamResponse`` built from the un-awaited chain coroutine and
        the callback handler.
    """
    # Note: do NOT `await` here. StreamResponse needs the coroutine object
    # itself so that it can schedule it while the response is streaming.
    # `...` is a placeholder: pass your real chain input/config here.
    # NOTE(review): presumably the callback below must also be registered
    # with the chain call (e.g. through its callbacks config) - confirm.
    invoke = chain.ainvoke(...)
    # `...` is a placeholder for your handler's constructor arguments.
    callback = MyCallback(...)
    return StreamResponse(invoke, callback)
```
# Custom Stream Response
```
from __future__ import annotations
import asyncio
import typing
from typing import Any, AsyncIterable, Coroutine
from fastapi.responses import StreamingResponse as FastApiStreamingResponse
from starlette.background import BackgroundTask
class StreamResponse(FastApiStreamingResponse):
    """Streaming response that runs a coroutine and relays callback tokens.

    The coroutine (typically an un-awaited ``chain.ainvoke(...)``) is
    scheduled as a task when streaming starts, and every item produced by
    ``callback.aiter()`` is sent to the client as a chunk.
    """

    def __init__(
        self,
        invoke: Coroutine,
        callback: MyCustomAsyncIteratorCallbackHandler,
        status_code: int = 200,
        headers: typing.Mapping[str, str] | None = None,
        media_type: str | None = "text/event-stream",
        background: BackgroundTask | None = None,
    ) -> None:
        """Build the response.

        Args:
            invoke: Coroutine object to run while streaming. It must not
                have been awaited already.
            callback: Handler whose ``aiter()`` yields the chunks to send.
            status_code: HTTP status code of the response.
            headers: Optional extra response headers.
            media_type: Value for the ``Content-Type`` header.
            background: Optional background task forwarded to the parent
                response class.
        """
        super().__init__(
            content=StreamResponse.send_message(callback, invoke),
            status_code=status_code,
            headers=headers,
            media_type=media_type,
            background=background,
        )

    @staticmethod
    async def send_message(
        callback: MyCustomAsyncIteratorCallbackHandler, invoke: Coroutine
    ) -> AsyncIterable[str]:
        """Run ``invoke`` in the background and yield tokens from ``callback``.

        Args:
            callback: Handler whose ``aiter()`` produces the tokens.
            invoke: Coroutine object to schedule as a task.

        Yields:
            Each token produced by ``callback.aiter()``.

        Raises:
            Exception: Whatever ``invoke`` raised, re-raised once the token
                stream has ended so the failure is not silently dropped.
        """
        # Keep a strong reference: the event loop only holds weak references
        # to tasks, so an unreferenced task may be garbage-collected mid-run.
        task = asyncio.create_task(invoke)

        def _unblock_stream(_finished: asyncio.Task) -> None:
            # If the coroutine ends (normally, with an error, or cancelled)
            # without the handler having enqueued its end-of-stream sentinel,
            # the consumer below would wait on the queue forever. Push the
            # sentinel ourselves; an extra trailing None is harmless.
            queue = getattr(callback, "queue", None)
            if queue is not None:
                queue.put_nowait(None)

        task.add_done_callback(_unblock_stream)
        try:
            async for token in callback.aiter():
                yield token
            # Surface any exception raised by the coroutine.
            await task
        finally:
            # Reached on client disconnect / generator close as well: do not
            # leave the background work running once nobody is listening.
            if not task.done():
                task.cancel()
```
# My Custom Callback Handler
```
from __future__ import annotations
import asyncio
from typing import Any, AsyncIterator, List
class MyCustomAsyncIteratorCallbackHandler(AsyncCallbackHandler):
    """Callback handler that exposes received tokens as an async iterator.

    Tokens are pushed onto an ``asyncio.Queue`` by the ``on_llm_*`` hooks and
    drained by :meth:`aiter`. ``None`` is the end-of-stream sentinel.

    NOTE(review): ``AsyncCallbackHandler`` and ``LLMResult`` are not imported
    in this snippet; presumably they come from LangChain - confirm the import
    path for the installed version.
    """

    # Note: the item type can be a BaseModel rather than str.
    queue: asyncio.Queue[str | None]

    # Pass whatever constructor parameters you need.
    def __init__(self) -> None:
        """Create the handler with an empty, unbounded token queue."""
        self.queue = asyncio.Queue()

    async def on_llm_new_token(
        self,
        token: str,
        tags: List[str] | None = None,
        **kwargs: Any,
    ) -> None:
        """Enqueue a newly generated token for the consumer."""
        self.queue.put_nowait(token)

    async def on_llm_end(
        self,
        response: LLMResult,
        tags: List[str] | None = None,
        **kwargs: Any,
    ) -> None:
        """Enqueue the end-of-stream sentinel once the LLM run finishes."""
        self.queue.put_nowait(None)

    async def on_llm_error(
        self,
        error: BaseException,
        tags: List[str] | None = None,
        **kwargs: Any,
    ) -> None:
        """Enqueue the end-of-stream sentinel when the LLM run fails.

        Without this, a failing run never signals completion and the
        consumer of :meth:`aiter` would wait on the queue forever.
        """
        self.queue.put_nowait(None)

    async def aiter(self) -> AsyncIterator[str]:
        """Yield queued tokens until the ``None`` sentinel is received.

        Yields:
            Each ``str`` token in the order it was enqueued. Items that are
            neither ``str`` nor ``None`` are skipped.
        """
        while True:
            token = await self.queue.get()
            # Acknowledge every dequeued item (not only the sentinel) so the
            # queue's unfinished-task count stays balanced for queue.join().
            self.queue.task_done()
            if token is None:
                break
            if isinstance(token, str):
                yield token  # Note: or a BaseModel.model_dump_json() etc.
```
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment