@oneryalcin
Last active March 22, 2024 14:36
Server-Sent Events (SSE) with FastAPI and (partially) Langchain
# I couldn't get generators back from chains, so I had to do a bit of low-level SSE. Hope this is useful.
# You will probably use another vector store instead of OpenSearch, but if you want to mimic what I did here,
# please use the fork of `OpenSearchVectorSearch` at https://github.com/oneryalcin/langchain
import json
import os
import logging
from typing import List, Generator
import requests
import sseclient
from fastapi import FastAPI
from fastapi.responses import StreamingResponse
from langchain.vectorstores import OpenSearchVectorSearch
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.text_splitter import Document

OPENSEARCH_URL = '<VECTOR STORE URL>'
OPENSEARCH_INDEX = '<VECTOR STORE INDEX NAME>'
# NOTE: the OpenAI key must also be exported as the OPENAI_API_KEY environment variable,
# since OpenAIEmbeddings and streaming_request() below read it from the environment.
OPENAI_API_KEY = '<OPENAI KEY>'
OPENAI_COMPLETIONS_V1_URL = "https://api.openai.com/v1/completions"

logger = logging.getLogger(__name__)

app = FastAPI(
    title='Streaming Langchain with Vectorsearch',
    description='Example streaming API endpoint for VectorSearch and OpenAI',
    version='0.0.1',
)


@app.on_event("startup")
async def startup_event():
    global docsearch
    logger.info("Loading vector store")
    docsearch = OpenSearchVectorSearch(
        opensearch_url=OPENSEARCH_URL,
        index_name=OPENSEARCH_INDEX,
        embedding_function=OpenAIEmbeddings()
    )


def streaming_request(prompt: str, model: str = 'text-davinci-003') -> Generator[str, None, None]:
    """Generator yielding each chunk received from OpenAI as a response

    :param prompt: User prompt
    :param model: OpenAI model name
    :return: generator of streamed response chunks from OpenAI
    """
    req_headers = {
        'Accept': 'text/event-stream',
        'Authorization': 'Bearer ' + os.getenv('OPENAI_API_KEY')
    }
    req_body = {
        'model': model,
        'prompt': prompt,
        'max_tokens': 400,
        'temperature': 0,
        'stream': True,
    }
    response = requests.post(url=OPENAI_COMPLETIONS_V1_URL, stream=True, headers=req_headers, json=req_body)
    client = sseclient.SSEClient(response)
    for event in client.events():
        if event.data != '[DONE]':
            text = json.loads(event.data)['choices'][0]['text']
            yield json.dumps({'data': text})
        else:
            yield '[DONE]'


def gen_prompt(docs: List[Document], query: str) -> str:
    """Build the question prompt

    :param docs: Documents returned from the vector search
    :param query: User question
    :return: Prompt string combining the retrieved context and the question
    """
    return f"""To answer the question please only use the Context given, nothing else. Do not make up an answer,
simply say 'I don't know' if you are not sure.
Question: {query}
Context: {[doc.page_content for doc in docs]}
Answer:
"""


@app.post('/streaming/ask')
async def main(query: str) -> StreamingResponse:
    """Streaming API; this endpoint uses Server-Sent Events

    :param query: User question
    :return: Streaming response with chunks from OpenAI
    """
    docs = docsearch.similarity_search(query, k=4)
    prompt = gen_prompt(docs, query)
    return StreamingResponse(streaming_request(prompt), media_type="application/json")


if __name__ == '__main__':
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000, log_level="debug")
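
Note that although the upstream OpenAI call is consumed as Server-Sent Events, the FastAPI endpoint streams the yielded strings back verbatim: a client sees a sequence of JSON chunks such as {"data": " Apple"} followed by a final [DONE] marker, rather than "data:"-prefixed SSE frames. The Python sketch after the curl example below reads the stream accordingly.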
@oneryalcin (Author):
Check streaming with curl or Postman:

curl -X POST --no-buffer http://127.0.0.1:8000/streaming/ask\?query\=what%20is%20Apple%20doing%3F
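
The stream can also be consumed programmatically. Below is a minimal Python client sketch; it assumes the server above is running locally on port 8000 and uses an example question, and it simply prints each chunk as it arrives until the final [DONE] marker.

import requests

with requests.post(
    'http://127.0.0.1:8000/streaming/ask',
    params={'query': 'what is Apple doing?'},
    stream=True,
) as resp:
    resp.raise_for_status()
    # Each chunk is a JSON string such as {"data": "..."}; the stream ends with [DONE]
    for chunk in resp.iter_content(chunk_size=None):
        print(chunk.decode('utf-8'), flush=True)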
