Skip to content

Instantly share code, notes, and snippets.

@tamtam-fitness
Last active August 27, 2023 10:02
Show Gist options
  • Save tamtam-fitness/feecd2c8cd761a4f74b304d682bf2660 to your computer and use it in GitHub Desktop.
Save tamtam-fitness/feecd2c8cd761a4f74b304d682bf2660 to your computer and use it in GitHub Desktop.
FastAPIを用いたSSEの実装のサンプル(OpenAIに回答させる機能付き)
# Imports for the SSE demo server: OpenAI client, FastAPI app framework,
# sse-starlette for the EventSourceResponse helper, pydantic for the
# request model, and uvicorn as the ASGI server.
import os
import openai
from fastapi import FastAPI
from sse_starlette.sse import EventSourceResponse
from pydantic import BaseModel
import uvicorn
# Read the API key from the environment so the credential is never hard-coded.
openai.api_key = os.getenv("OPENAI_API_KEY")
class AskRequest(BaseModel):
    """Request body for POST /streaming/ask.

    Carries a single free-form question string to forward to the LLM.
    """

    query: str
async def ask_llm_stream(query: str):
    """Yield SSE event dicts containing incremental LLM output for *query*.

    Streams chunks from the OpenAI chat completion API and finishes with a
    sentinel ``{"data": "[DONE]"}`` event so clients know the stream ended.
    Each yielded dict is converted to an SSE frame by sse-starlette.
    """
    # LangChain's streaming callback plumbing is comparatively complex,
    # so the openai client is used directly here.
    response = openai.ChatCompletion.create(
        model="gpt-3.5-turbo",
        stream=True,  # request server-side streaming (chunked deltas)
        messages=[
            {
                "role": "user",
                "content": f"{query}",
            }
        ],
    )
    for item in response:
        try:
            content = item['choices'][0]['delta']['content']
        except (KeyError, IndexError):
            # Role-only first chunks and the final stop chunk carry no
            # "content" key; emit an empty string for those. A narrow
            # except (instead of the original bare `except:`) avoids
            # hiding unrelated errors such as network failures.
            content = ""
        # Yielding a dict lets EventSourceResponse format the SSE frame.
        yield {"data": content}
    # Sentinel so the client can detect end-of-stream.
    yield {"data": "[DONE]"}
app = FastAPI()


# A GET variant that accepts the question as a query parameter would also work.
@app.post("/streaming/ask")
async def ask_stream(ask_req: AskRequest) -> EventSourceResponse:
    """Stream the LLM's answer to the posted question as server-sent events."""
    # EventSourceResponse accepts any (async) iterable of event dicts.
    return EventSourceResponse(ask_llm_stream(ask_req.query))
# Allow running the server directly with `python <this file>`.
if __name__ == "__main__":
    uvicorn.run(app, host="0.0.0.0", port=8080)
@tamtam-fitness
Copy link
Author

tamtam-fitness commented Aug 26, 2023

curl -N -X POST  \
 -H "Content-Type: application/json" \
 -d '{"query":"FastAPIとは何ですか?"}' \
 http://localhost:8080/streaming/ask

@tamtam-fitness
Copy link
Author

tamtam-fitness commented Aug 26, 2023

# Sample SSE client for the /streaming/ask endpoint using the
# mpetazzoni/sseclient library on top of a streaming requests POST.
import sseclient
import requests

url = 'http://0.0.0.0:8080/streaming/ask'
query = {"query": "FastAPIとは何ですか?"}
# Advertise SSE support; the server responds with text/event-stream.
headers = {'Accept': 'text/event-stream'}
# https://github.com/mpetazzoni/sseclient
# stream=True keeps the HTTP connection open so events arrive incrementally.
response = requests.post(url, stream=True, headers=headers, json=query)
client = sseclient.SSEClient(response)

# Print each incremental chunk as it arrives (ends with the "[DONE]" sentinel).
for event in client.events():
    print(event.data)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment