# runnable-astream.py -- streaming conversational retrieval with LCEL.
# Forked from hwchase17/runnable-astream.py (gist Cdaprod/da5a3dd75d1173f9bc210ae8ff19c45c).
import asyncio
from typing import AsyncGenerator, List

import nest_asyncio
from langchain.chat_models import ChatOpenAI
from langchain.embeddings import OpenAIEmbeddings
from langchain.memory import ConversationBufferMemory
from langchain.prompts import ChatPromptTemplate
from langchain.prompts.prompt import PromptTemplate
from langchain.schema import format_document
from langchain.schema.messages import BaseMessage, HumanMessage
from langchain.schema.output_parser import StrOutputParser
from langchain.vectorstores import Chroma
# Create the retriever over a one-document in-memory Chroma store.
vectorstore = Chroma.from_texts(
    ["harrison worked at kensho"], embedding=OpenAIEmbeddings()
)
retriever = vectorstore.as_retriever()
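# Optional sanity check for the retriever (a sketch; requires OPENAI_API_KEY
# to be set, since embedding the query calls the OpenAI API):
#   retriever.invoke("where did harrison work")
#   -> [Document(page_content="harrison worked at kensho")]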
_template = """Given the following conversation and a follow up question, rephrase the follow up question to be a standalone question, in its original language.
Chat History:
{chat_history}
Follow Up Input: {question}
Standalone question:"""
CONDENSE_QUESTION_PROMPT = PromptTemplate.from_template(_template)
template = """Answer the question based only on the following context:
{context}
Question: {question}
"""
ANSWER_PROMPT = ChatPromptTemplate.from_template(template)
DEFAULT_DOCUMENT_PROMPT = PromptTemplate.from_template(template="{page_content}")
def _combine_documents(
    docs, document_prompt=DEFAULT_DOCUMENT_PROMPT, document_separator="\n\n"
):
    """Render each retrieved document with `document_prompt` and join the results."""
    doc_strings = [format_document(doc, document_prompt) for doc in docs]
    return document_separator.join(doc_strings)
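# Quick illustration of _combine_documents (no API calls; the Document import
# is only needed for this snippet):
#   from langchain.schema import Document
#   _combine_documents([Document(page_content="a"), Document(page_content="b")])
#   -> "a\n\nb"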
def _format_chat_history(chat_history: List[BaseMessage]) -> str:
    """Render stored messages as alternating Human/Assistant lines.

    ConversationBufferMemory(return_messages=True) yields message objects,
    not (human, ai) tuples, so we branch on the message type here.
    """
    buffer = ""
    for message in chat_history:
        role = "Human" if isinstance(message, HumanMessage) else "Assistant"
        buffer += f"\n{role}: {message.content}"
    return buffer
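# Example, matching the message-object signature above (no API calls):
#   from langchain.schema.messages import AIMessage
#   _format_chat_history([HumanMessage(content="hi"), AIMessage(content="hello")])
#   -> "\nHuman: hi\nAssistant: hello"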
memory = ConversationBufferMemory(
    return_messages=True, output_key="answer", input_key="question"
)
standalone_question_chain = (
    {
        "question": lambda x: x["question"],
        "chat_history": lambda x: _format_chat_history(x["chat_history"]),
    }
    | CONDENSE_QUESTION_PROMPT
    | ChatOpenAI(temperature=0)
    | StrOutputParser()
)
response_chain = ANSWER_PROMPT | ChatOpenAI()
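# The dict at the head of the pipeline is coerced by LCEL into a
# RunnableParallel that maps the input to prompt variables; the prompt, model,
# and parser then run in sequence. Each chain is independently invokable, e.g.
# (a sketch; hits the OpenAI API):
#   standalone_question_chain.invoke({"question": "where?", "chat_history": []})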
async def generate_response(message: str) -> AsyncGenerator[str, None]:
    """Condense the question, retrieve context, and stream the answer tokens."""
    _memory = memory.load_memory_variables({})
    # Pass the raw message list; the chain's lambda formats it exactly once.
    standalone_question = await standalone_question_chain.ainvoke({
        "question": message,
        "chat_history": _memory["history"],
    })
    retrieved_docs = await retriever.ainvoke(standalone_question)
    final_response = ""
    async for m in response_chain.astream({
        "question": standalone_question,
        "context": _combine_documents(retrieved_docs),
    }):
        final_response += m.content
        yield m.content
    # Memory is not wired into the chains, so the turn must be saved explicitly.
    memory.save_context({"question": message}, {"answer": final_response})
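# The generator can also be consumed directly wherever tokens should stream
# as they arrive, e.g.:
#   async for token in generate_response("where did harrison work"):
#       print(token, end="", flush=True)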
nest_asyncio.apply()  # needed only where an event loop is already running (e.g. Jupyter)


async def run_async(gen):
    return [item async for item in gen]


async def main():
    responses = await run_async(generate_response("where did harrison work"))
    for response in responses:
        print(response)


# Top-level `await main()` is only valid inside a notebook; asyncio.run works
# in a plain script and, with nest_asyncio applied, inside Jupyter as well.
asyncio.run(main())