chromadb-sum
""" | |
Filename: /home/dirk/dev/iCUE/iCUE-SmartScribe/langchain_sum.py | |
Path: /home/dirk/dev/iCUE/iCUE-SmartScribe | |
Created Date: Friday, April 21st 2023, 2:20:37 pm | |
Author: Dirk Liebich | |
Copyright (c) 2023 iCUE Solutions GmbH | |
""" | |
import os
import glob

import streamlit as st

from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
from langchain.text_splitter import RecursiveCharacterTextSplitter, SpacyTextSplitter
from langchain.chains.summarize import load_summarize_chain
from langchain.chains import RetrievalQA
from langchain.docstore.document import Document
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from langchain.document_loaders import DirectoryLoader

# from chromadb.config import Settings
# client = Chroma.Client(Settings(anonymized_telemetry=False))

# Set up the OpenAI API credentials; expects OPENAI_API_KEY in the
# environment rather than a key hard-coded in source
assert os.getenv("OPENAI_API_KEY"), "Set the OPENAI_API_KEY environment variable"

llm = OpenAI(temperature=0)
davinci = OpenAI(model_name="text-davinci-003")
def tag_txt_processed(file_path):
    """Rename a processed transcript from *.txt to *.processed.txt."""
    # Renaming only the given file (rather than every .txt in the directory)
    # keeps files that are still queued for processing untouched
    new_name = file_path.replace(".txt", ".processed.txt")
    os.rename(file_path, new_name)
def get_unprocessed_txt_files(directory):
    txt_files = []
    for file_name in os.listdir(directory):
        if file_name.endswith(".txt") and not file_name.endswith(".processed.txt"):
            txt_files.append(os.path.join(directory, file_name))
    print(f"text files: {txt_files}")
    return txt_files
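# Illustration of the tagging convention (file names are hypothetical):
#   ../trans/call_01.txt            -> returned, not yet summarized
#   ../trans/call_02.processed.txt  -> skipped, already tagged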
# util
def process_text_files(input_dir, output_dir="../summary"):
    # Create the output directory if it doesn't exist
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # Get all text files from the input directory, skipping those already
    # tagged with .processed.txt
    # text_files = glob.glob(os.path.join(input_dir, "*.txt"))
    text_files = get_unprocessed_txt_files(input_dir)
    print(f"input_dir: {text_files}")
    st.write(text_files)

    for text_file in text_files:
        print(f"Processing: {text_file}")

        # Process the file with the two given functions
        # (each one opens and reads the file itself)
        summary = load_split_summarize(text_file)
        keywords = load_split_create_keywords(text_file)

        # Tag the processed file so it is not picked up again
        tag_txt_processed(text_file)

        # Combine the output of the two functions
        combined_output = f"SUMMARY\n\n{summary}\n\nKEYWORDS\n\n{keywords}"
        st.write(combined_output)

        # Write the combined output to a new text file in the output directory
        output_file_name = (
            os.path.splitext(os.path.basename(text_file))[0] + "_summary_keywords.txt"
        )
        output_file_path = os.path.join(output_dir, output_file_name)
        st.write(output_file_path)
        with open(output_file_path, "w", encoding="utf-8") as output_file:
            output_file.write(combined_output)
def load_split_summarize(file_in, chunks=3000):
    # Use spaCy to split the text into chunks of ~3000 characters
    text_splitter = SpacyTextSplitter(chunk_size=chunks)

    # Read the transcript
    with open(file_in) as f:
        text = f.read()
    texts = text_splitter.split_text(text)
    docs = [Document(page_content=t) for t in texts]

    # Here comes our prompt
    prompt_template = """Extract all key information and write an 800-word concise summary of the following:

{text}

Start by stating the name of the company and the names of the individuals talking.
At the end of the summary, mention positive and negative mentions and the general topics that were discussed most intensively. List at least 3 but no more than 6 positive and negative mentions.
Always write complete sentences with proper punctuation. Do not start any sentence with a leading dot or blank."""
    PROMPT = PromptTemplate(template=prompt_template, input_variables=["text"])
    chain = load_summarize_chain(
        OpenAI(temperature=0),
        chain_type="map_reduce",
        return_intermediate_steps=False,
        map_prompt=PROMPT,
        combine_prompt=PROMPT,
    )
    res = chain({"input_documents": docs}, return_only_outputs=True)
    return res["output_text"].replace("\n", "")
def load_split_create_keywords(file_in, chunks=3000):
    # Use spaCy to split the text into chunks of ~3000 characters
    text_splitter = SpacyTextSplitter(chunk_size=chunks)

    # Read the transcript
    with open(file_in) as f:
        text = f.read()
    texts = text_splitter.split_text(text)
    docs = [Document(page_content=t) for t in texts]

    # The analysis prompt
    prompt_template = """You are a sales manager who has just reviewed the transcript of the following sales call:

{text}

Analyze the transcript and provide the following information:
1. All speaker names, their positions and talking time percentages for each speaker.
2. Top 6 sentiments (3 positive, 3 negative) with context.
3. Top 10 keywords.
4. List all entities like dates, monetary amounts.
Format the output using complete sentences, proper punctuation, and Markdown."""
    PROMPT = PromptTemplate(template=prompt_template, input_variables=["text"])
    chain = load_summarize_chain(
        OpenAI(temperature=0),
        chain_type="map_reduce",  # could also try the "refine" chain type
        return_intermediate_steps=False,
        map_prompt=PROMPT,
        combine_prompt=PROMPT,
    )
    res = chain({"input_documents": docs}, return_only_outputs=True)
    return res["output_text"]
#
# Usage: process_text_files("../trans", "../summary")
# Output is written to ../summary
#
def dir_loader(path, chunk_size=1000):
    loader = DirectoryLoader(path, glob="**/*.txt")
    documents = loader.load()
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=0,
        length_function=len,
    )
    return text_splitter.split_documents(documents)
def prepare_prompt():
    prompt_template = """Use the following pieces of context to answer the question at the end. If you don't know the answer, just say that you don't know; don't try to make up an answer.

{context}

Question: {question}
Answer:"""
    PROMPT = PromptTemplate(
        template=prompt_template, input_variables=["context", "question"]
    )
    return PROMPT
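# For reference, the chain fills the returned template roughly like this
# (illustrative values, not part of the pipeline):
# prepare_prompt().format(context="<retrieved chunks>", question="Who attended the call?")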
def qa_doc(question, path="../transcript"):
    PROMPT = prepare_prompt()

    # For now, OpenAI embeddings only
    embeddings = OpenAIEmbeddings()

    # Check whether we already have a persisted vectorstore
    persist_directory = "db"
    if not os.path.exists(persist_directory):
        print("Creating vectorstore...")
        # Read and chunk all transcripts
        text_chunk_list = dir_loader(path)
        # Create the embeddings and persist them
        docsearch = Chroma.from_documents(
            text_chunk_list, embeddings, persist_directory=persist_directory
        )
        docsearch.persist()
        docsearch._client_settings.anonymized_telemetry = False
    else:
        print("Loading vectorstore...")
        docsearch = Chroma(
            persist_directory=persist_directory, embedding_function=embeddings
        )

    # Prepare the retrieval QA chain
    # https://towardsdatascience.com/4-ways-of-question-answering-in-langchain-188c6707cc5a
    chain_type_kwargs = {"prompt": PROMPT}
    qa = RetrievalQA.from_chain_type(
        llm=OpenAI(),
        chain_type="stuff",
        retriever=docsearch.as_retriever(search_type="similarity", search_kwargs={"k": 16}),
        return_source_documents=True,
        chain_type_kwargs=chain_type_kwargs,
    )

    # Get the answer
    result = qa({"query": question})
    # print(result["result"])
    # # Print sources
    # for res in result["source_documents"]:
    #     print(f"  Answers extracted from {res.metadata}")
    return result["result"], result["source_documents"]
# a, s = qa_doc("List top 10 talking points.", "../transcript")
# print("")
# print(a)
# print("")
# for res in s:
#     print(f"  Answers extracted from {res.metadata}")