chromadb-sum
"""
Filename: /home/dirk/dev/iCUE/iCUE-SmartScribe/langchain_sum.py
Path: /home/dirk/dev/iCUE/iCUE-SmartScribe
Created Date: Friday, April 21st 2023, 2:20:37 pm
Author: Dirk Liebich
Copyright (c) 2023 iCUE Solutions GmbH
"""
import os
import glob

import streamlit as st
from langchain.llms import OpenAI
from langchain.prompts import PromptTemplate
from langchain.text_splitter import RecursiveCharacterTextSplitter, SpacyTextSplitter
from langchain.chains.summarize import load_summarize_chain
from langchain.chains import RetrievalQA
from langchain.docstore.document import Document
from langchain.embeddings.openai import OpenAIEmbeddings
from langchain.vectorstores import Chroma
from langchain.document_loaders import DirectoryLoader

# from chromadb.config import Settings
# client = Chroma.Client(Settings(anonymized_telemetry=False))

# Set up the OpenAI API credentials (key redacted -- supply your own)
os.environ["OPENAI_API_KEY"] = "sk-..."

llm = OpenAI(temperature=0)
davinci = OpenAI(model_name="text-davinci-003")


def tag_txt_processed(file_path):
    """Rename a processed transcript so it is skipped on the next run."""
    new_name = file_path.replace(".txt", ".processed.txt")
    os.rename(file_path, new_name)


def get_unprocessed_txt_files(directory):
    """Return all .txt files in `directory` that have not been tagged as processed."""
    txt_files = []
    for file_name in os.listdir(directory):
        if file_name.endswith(".txt") and ".processed.txt" not in file_name:
            txt_files.append(os.path.join(directory, file_name))
    print(f"text files: {txt_files}")
    return txt_files


# util
def process_text_files(input_dir, output_dir="../summary"):
    # Create output directory if it doesn't exist
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    # Get all text files from the input directory, skipping those already
    # tagged with .processed.txt in the name
    # text_files = glob.glob(os.path.join(input_dir, "*.txt"))
    text_files = get_unprocessed_txt_files(input_dir)
    print(f"input_dir: {text_files}")
    st.write(text_files)

    for text_file in text_files:
        # Read the contents of the text file (kept for inspection; the
        # summarizers below read the file themselves)
        with open(text_file, "r", encoding="utf-8") as file:
            file_contents = file.read()
        print(f"Processing: {text_file}")

        # Process the file contents with the two given functions
        summary = load_split_summarize(text_file)
        keywords = load_split_create_keywords(text_file)

        # Combine the output of the two functions
        combined_output = f"SUMMARY\n\n{summary}\n\nKEYWORDS\n\n{keywords}"
        st.write(combined_output)

        # Write the combined output to a new text file in the output directory
        output_file_name = (
            os.path.splitext(os.path.basename(text_file))[0] + "_summary_keywords.txt"
        )
        output_file_path = os.path.join(output_dir, output_file_name)
        st.write(output_file_path)
        with open(output_file_path, "w", encoding="utf-8") as output_file:
            output_file.write(combined_output)

        # Tag the processed file so it is not reused on the next run
        tag_txt_processed(text_file)


def load_split_summarize(file_in, chunks=3000):
    # use spaCy to split the text into ~3000-character chunks
    text_splitter = SpacyTextSplitter(chunk_size=chunks)

    # grab a text
    with open(file_in) as f:
        text = f.read()
    texts = text_splitter.split_text(text)
    docs = [Document(page_content=t) for t in texts]

    # here comes our prompt
    prompt_template = """Extract all key information and write an 800-word concise summary of the following:
{text}
Start by stating the name of the company and the names of the individuals talking.
At the end of the summary, mention positive and negative mentions and general topics that were discussed in more depth. List at least 3 but no more than 6 positive and negative mentions.
Make sure to use complete sentences and proper punctuation. Do not start any sentence with a leading dot or blank."""
    PROMPT = PromptTemplate(template=prompt_template, input_variables=["text"])

    chain = load_summarize_chain(
        OpenAI(temperature=0),
        chain_type="map_reduce",
        return_intermediate_steps=False,
        map_prompt=PROMPT,
        combine_prompt=PROMPT,
    )
    res = chain({"input_documents": docs}, return_only_outputs=True)
    return res["output_text"].replace("\n", "")
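
# Usage sketch (not part of the original gist; the file path is illustrative):
# summary = load_split_summarize("../trans/interview.txt")
# print(summary)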


def load_split_create_keywords(file_in, chunks=3000):
    # use spaCy to split the text into ~3000-character chunks
    text_splitter = SpacyTextSplitter(chunk_size=chunks)

    # grab a text
    with open(file_in) as f:
        text = f.read()
    texts = text_splitter.split_text(text)
    docs = [Document(page_content=t) for t in texts]

    #
    # BEGIN
    #
    prompt_template = """You are a sales manager who has just reviewed the transcript of the following sales call:
{text}
Analyze the transcript and provide the following information:
1. All speaker names, their positions and talking time percentages for each speaker.
2. Top 6 sentiments (3 positive, 3 negative) with context.
3. Top 10 keywords.
4. A list of all entities such as dates and monetary amounts.
Format the output using complete sentences, proper punctuation, and Markdown."""
    #
    # END
    #
    PROMPT = PromptTemplate(template=prompt_template, input_variables=["text"])

    chain = load_summarize_chain(
        OpenAI(temperature=0),
        chain_type="map_reduce",  # could try "refine" as well
        return_intermediate_steps=False,
        map_prompt=PROMPT,
        combine_prompt=PROMPT,
    )
    res = chain({"input_documents": docs}, return_only_outputs=True)
    return res["output_text"]


#
# use process_text_files("../trans", "../summary")
# output is in ../summary
#


def dir_loader(path, chunk_size=1000):
    # Load every .txt file under `path` and split it into fixed-size chunks
    loader = DirectoryLoader(path, glob="**/*.txt")
    documents = loader.load()
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=chunk_size,
        chunk_overlap=0,
        length_function=len,
    )
    return text_splitter.split_documents(documents)
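
# Usage sketch (illustrative path): split all transcripts into 1000-character
# chunks ready for embedding into the Chroma store.
# chunks = dir_loader("../transcript", chunk_size=1000)
# print(f"{len(chunks)} chunks")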


def prepare_prompt():
    prompt_template = """Use the following pieces of context to answer the question at the end. If you don't know the answer, just say that you don't know, don't try to make up an answer.
{context}
Question: {question}
Answer:"""
    PROMPT = PromptTemplate(
        template=prompt_template, input_variables=["context", "question"]
    )
    return PROMPT


def qa_doc(question, path="../transcript"):
    PROMPT = prepare_prompt()

    # for now only OpenAI
    embeddings = OpenAIEmbeddings()

    # check if we already have a persisted vectorstore
    persist_directory = "db"
    if not os.path.exists(persist_directory):
        print("Creating vectorstore...")
        # read all content
        text_chunk_list = dir_loader(path)
        # create embeddings
        docsearch = Chroma.from_documents(
            text_chunk_list, embeddings, persist_directory=persist_directory
        )
        docsearch.persist()
        docsearch._client_settings.anonymized_telemetry = False
    else:
        print("Loading vectorstore...")
        docsearch = Chroma(persist_directory=persist_directory, embedding_function=embeddings)

    # prepare retrieval -- search_kwargs={"top_k": 6}
    # https://towardsdatascience.com/4-ways-of-question-answering-in-langchain-188c6707cc5a
    chain_type_kwargs = {"prompt": PROMPT}
    qa = RetrievalQA.from_chain_type(
        llm=OpenAI(),
        chain_type="stuff",
        retriever=docsearch.as_retriever(search_type="similarity", search_kwargs={"k": 16}),
        return_source_documents=True,
        chain_type_kwargs=chain_type_kwargs,
    )

    # get answer
    result = qa({"query": question})
    # print(result["result"])
    # # print sources
    # for res in result["source_documents"]:
    #     print(f" Answers extracted from {res.metadata}")
    return result["result"], result["source_documents"]


# a, s = qa_doc("List top 10 talking points.", "../transcript")
# print("")
# print(a)
# print("")
# for res in s:
#     print(f" Answers extracted from {res.metadata}")
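
# Minimal Streamlit entry point -- a sketch only, not part of the original gist.
# It assumes the ../trans, ../summary and ../transcript directories used above exist.
# if __name__ == "__main__":
#     process_text_files("../trans", "../summary")
#     question = st.text_input("Ask a question about the transcripts")
#     if question:
#         answer, sources = qa_doc(question, "../transcript")
#         st.write(answer)
#         for doc in sources:
#             st.write(f"Answer extracted from {doc.metadata}")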