|
# Langchain for Dev |
|
import os |
|
|
|
## Data |
|
from langchain_community.document_loaders import TextLoader,PyPDFLoader, Docx2txtLoader |
|
from langchain_text_splitters import CharacterTextSplitter, RecursiveCharacterTextSplitter |
|
from langchain_core.documents.base import Document |
|
|
|
|
|
## DB |
|
from langchain_community.vectorstores import Milvus |
|
|
|
## LLM |
|
from langchain_openai import OpenAIEmbeddings |
|
from langchain.prompts import ChatPromptTemplate |
|
from langchain_openai import ChatOpenAI |
|
|
|
## Memory |
|
from langchain.memory import ConversationBufferMemory, ConversationBufferWindowMemory, VectorStoreRetrieverMemory |
|
|
|
## Chain |
|
from langchain.schema.runnable import RunnablePassthrough, RunnableParallel, RunnableLambda |
|
from langchain.chains import ConversationChain |
|
|
|
|
|
|
|
# Langchain for Ops |
|
from langchain import hub # Prompt managing from the langchainhub site |
|
from dotenv import load_dotenv # Lading environment variables from a file |
|
from pathlib import Path # Intuitive path management regardless of OS |
|
|
|
# Milvus |
|
from pymilvus import Collection,connections,MilvusClient |
|
import uuid |
|
|
|
|
|
# Variables |
|
# Load environment variables from a local .env file (if present) before
# reading any configuration below.
load_dotenv()


# Fail fast with a clear message when CHUNK_SIZE is missing: the original
# bare int(os.getenv("CHUNK_SIZE")) raised an opaque
# "TypeError: int() argument must be a string..." at import time.
_chunk_size_raw = os.getenv("CHUNK_SIZE")
if _chunk_size_raw is None:
    raise RuntimeError("CHUNK_SIZE environment variable is not set")
CHUNK_SIZE = int(_chunk_size_raw)

MODEL_NAME = os.getenv("MODEL_NAME")                      # chat model name for ChatOpenAI
EMBEDDING_MODEL_NAME = os.getenv("EMBEDDING_MODEL_NAME")  # model name for OpenAIEmbeddings
BASE_FILE_PATH = os.getenv("BASE_FILE_PATH")              # path to the base-knowledge template file

# Milvus connection settings; CONNECTION_ARGS is passed verbatim to the
# langchain Milvus wrapper.
MILVUS_TOKEN = os.getenv("MILVUS_TOKEN")
MILVUS_URI = os.getenv("MILVUS_URI")
CONNECTION_ARGS = {'uri': MILVUS_URI, 'token': MILVUS_TOKEN}
COLLECTION_NAME = os.getenv("COLLECTION_NAME")
|
|
|
|
|
# Class Milvus |
|
class MilvusMemory:
    """Session-scoped memory stored in a Milvus collection.

    Each stored row has the fields ``source`` (a session id or a file
    path), ``text`` (the raw content) and ``vector`` (its embedding),
    as built in :meth:`memory_insert`.
    """

    def __init__(self, embeddings, uri, token, collection_name, connection_args=CONNECTION_ARGS):
        """Open both a raw pymilvus client and a LangChain vector-store wrapper.

        embeddings: an embeddings object exposing ``embed_query`` (used below).
        uri/token: Milvus endpoint credentials.
        collection_name: collection the LangChain wrapper operates on.
        connection_args: dict passed to the LangChain Milvus wrapper;
            defaults to the module-level CONNECTION_ARGS.
        """
        # Superseded by MilvusClient below; kept for reference.
        #connections.connect("default", uri=uri, token=token)
        # NOTE(review): despite the name, this is a MilvusClient, not a
        # pymilvus Collection. It is used for row-level insert/delete.
        self.collection = MilvusClient(uri=uri, token=token)
        # LangChain wrapper over the same backend, used for pk lookups and
        # retrieval. drop_old=False keeps existing data.
        self.vectorstore = Milvus(
            embedding_function=embeddings,
            connection_args=connection_args,
            collection_name=collection_name,
            drop_old=False,
            auto_id=True
        )
        self.embeddings = embeddings

    def memory_insert(self, query, session=""):
        """Embed *query* and insert it as one row under *session*.

        query: a plain string or a Document-like object (``page_content``
            is used in that case).
        session: when it names an existing file, the path is normalised to
            POSIX form and any rows already stored for that source are
            deleted first (replace semantics); otherwise a fresh uuid1
            session id is generated.
        Returns the session id actually used, so callers can thread it
        through subsequent calls.
        """
        if isinstance(query, str):
            text_to_embed = query
        else:
            # Assumes a LangChain Document; no other types are handled.
            text_to_embed = query.page_content

        if not session.strip() or session == '.' or not Path(session).is_file():
            # No usable file path: start a brand-new session.
            # NOTE(review): uuid1 embeds host MAC address and timestamp;
            # uuid4 would be opaque — confirm whether that matters here.
            session = str(uuid.uuid1())
        else:
            # File-backed session: replace previously stored rows for the
            # same source before inserting the new one.
            session = Path(session).as_posix()
            expr = f"source == '{session}'"
            # Drop any bytes that cannot round-trip through UTF-8.
            expr = expr.encode('utf-8', 'ignore').decode('utf-8')
            pks = self.vectorstore.get_pks(expr)
            if pks:
                # Deletes go through the raw client; note this targets the
                # module-level COLLECTION_NAME, not the name given to
                # __init__ — TODO confirm they always match.
                self.collection.delete(collection_name=COLLECTION_NAME, ids=pks)

        vector = self.embeddings.embed_query(text_to_embed)
        data = {"source": session, "text": text_to_embed, "vector": vector}

        self.collection.insert(collection_name=COLLECTION_NAME, data=data)
        return session

    def update_entity(self, file_path, vectorstore):
        """Upsert the contents of *file_path* into *vectorstore*.

        Loads the file, splits it into CHUNK_SIZE chunks, deletes any rows
        whose ``source`` equals *file_path*, then adds the new chunks.
        Progress is reported via print statements. Always returns None.
        """
        print("-----------upsert start-----------")
        expr = f"source == '{file_path}'"
        pks = vectorstore.get_pks(expr)

        # Load and split the new documents to be inserted.
        loader = TextLoader(file_path, encoding='utf-8')
        documents = loader.load()
        text_splitter = RecursiveCharacterTextSplitter(chunk_size=CHUNK_SIZE, chunk_overlap=0)
        docs = text_splitter.split_documents(documents)

        if pks:
            # Restrict retrieval to the first existing pk so we can show
            # what is being replaced.
            retrieverOptions = {"expr": f"pk == {pks[0]}"}
            retriever = vectorstore.as_retriever(search_kwargs=retrieverOptions)
            # NOTE(review): the filter expression is passed as the *query*
            # text here; retrieval is effectively driven by the expr filter.
            existing_docs = retriever.get_relevant_documents(expr)
            print(existing_docs)

            # Report the pre-upsert content, if any was found.
            if existing_docs:
                print(f'existing_docs : {existing_docs}')
                existing_doc = existing_docs[0]
                print(f"upsert before: {existing_doc.page_content}")
            else:
                print("No existing text content found.")

            # Delete the outdated rows, then insert the fresh chunks.
            vectorstore.delete(pks)

            print(f'docs : {docs}, docs_type: {type(docs)}')
            vectorstore.add_documents(docs)

            # Fetch the primary keys of the rows just created.
            new_pks = vectorstore.get_pks(expr)

            print(f"Entity with pk={pks} deleted and new entity created with pk={new_pks}.")
        else:
            # Nothing stored for this file yet: plain insert.
            print(f"No entity found for {file_path}. Creating entity...")
            vectorstore.add_documents(docs)
            print("New entity created.")

        print("-----------Upsert finished-----------")
        return None

    def create_or_update_collection(self, splits_path='./', chunk_size=CHUNK_SIZE):
        """Walk *splits_path* for Markdown files and store every chunk.

        Each ``*.md`` file (searched recursively) is loaded, split into
        *chunk_size* chunks, and each chunk is inserted via memory_insert
        with the file's POSIX path as the session/source key.
        """
        splits_path = Path(splits_path)

        for file_path in splits_path.rglob('*.md'):
            # POSIX-style path doubles as the session/source identifier.
            session = file_path.as_posix()

            print(f'after as_posix session: {session}')
            loader = TextLoader(session)
            documents = loader.load()
            text_splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size)
            splits = text_splitter.split_documents(documents)

            # memory_insert deletes prior rows for this source on the first
            # chunk, giving replace semantics per file.
            for split in splits:
                self.memory_insert(query=split, session=session)

    def Milvus_chain(self, query, llm, prompt_template, session='', file_path_session=''):
        """Answer *query* with RAG over the stored memory, then persist the turn.

        When *file_path_session* is a non-trivial string (len > 2), that
        file's stored history is retrieved first and appended to the query
        as extra context. The main retrieval then runs against *session*,
        and the resulting HUMAN/AI exchange is written back via
        memory_insert.

        Returns (history, question, answer, session) — session may be a
        newly generated id when none was supplied.
        """
        print(f"befre invoke from milvus_chain session : {session} ")
        print(f"befre invoke from milvus_chain file_path_session : {file_path_session} ")
        if len(file_path_session) > 2:
            # First pass: pull context stored under the file-path source.
            file_history, file_question, file_answer = invoke_from_retriever(query, llm, prompt_template, self.vectorstore, file_path_session)
            file_info = f"\nInformation: {file_path_session}\n{file_history}"

            print(f'file_history : {file_history}')
            print(f'file_question : {file_question}')
            print(f'file_answer : {file_answer}')

            print(f"before file_path query : {query}")
            # Augment the user query with the file-derived context.
            query = query + file_info
            print(f"after file_path query : {query}")

        # Second pass: answer against the conversational session history.
        history, question, answer = invoke_from_retriever(query, llm, prompt_template, self.vectorstore, session)
        print(f"after invoke milvus_chain question : {question} ")
        print(f"after invoke milvus_chain answer : {answer} ")
        # Persist the whole exchange so the next turn can retrieve it.
        session = self.memory_insert(history + "\nHUMAN:" + question + "\nAI:" + answer, session=session)
        return history, question, answer, session
|
|
|
|
|
def invoke_from_retriever(query, llm, prompt_template, vectorstore , uuid=''):
    """Run the RAG chain for a single session and return its pieces.

    query: the user question.
    llm / prompt_template: chat model and prompt joined into the chain.
    vectorstore: Milvus wrapper used for session-history retrieval.
    uuid: session/source identifier used to filter stored history.
        NOTE(review): the parameter name shadows the imported ``uuid``
        module inside this function; kept as-is because callers pass it
        by keyword.

    Returns (history, query, answer).
    """
    source_filter = f"source == '{uuid}'"
    matching_pks = vectorstore.get_pks(source_filter)
    retriever = vectorstore.as_retriever(
        search_kwargs={"expr": source_filter, "k": 1}
    )

    print(f'pks: {matching_pks}')
    print(f'retriever : {retriever}')

    # Only attempt retrieval when rows for this source actually exist.
    history = (
        retriever.get_relevant_documents(query)[0].page_content + "\n"
        if matching_pks
        else ""
    )

    # Fan the three prompt inputs out in parallel: static base knowledge,
    # the retrieved history, and the raw question.
    setup_and_retrieval = RunnableParallel(
        Library_base_knowledge=RunnableLambda(lambda _: load_base_template(BASE_FILE_PATH)),
        history_conversation=RunnableLambda(lambda _: history),
        input=RunnablePassthrough(),
    )

    # Compose and invoke in one expression; strip a trailing "\nNone"
    # artifact from the model output.
    response = (setup_and_retrieval | prompt_template | llm).invoke(query)
    answer = response.content.rstrip("\nNone")

    return history, query, answer
|
|
|
|
|
def load_base_template(file_path):
    """Return the UTF-8 text content of *file_path*, or "" if unreadable.

    file_path: path to the base-knowledge template file; may be None/empty
        (e.g. when the BASE_FILE_PATH env variable is unset).
    Returns the file content, or the empty string when the path is falsy
    or any OS-level read error occurs.
    """
    # Guard: Path(None) would raise TypeError before the except clause.
    if not file_path:
        return ""
    try:
        return Path(file_path).read_text(encoding='utf-8')
    # OSError also covers PermissionError / IsADirectoryError, which the
    # original FileNotFoundError-only handler let propagate.
    except OSError:
        return ""
|
|
|
|
|
def split_multiple_documents(current_path, chunk_size: int):
    """Load every .txt/.md file under *current_path* and split into chunks.

    current_path: directory walked recursively.
    chunk_size: chunk size handed to RecursiveCharacterTextSplitter.
    Returns the list of split Document chunks.
    """
    loaded_docs = []

    # Recursively collect text/markdown files beneath current_path.
    for dirpath, _dirnames, filenames in os.walk(current_path):
        for filename in filenames:
            if filename.endswith(('.txt', '.md')):
                full_path = os.path.join(dirpath, filename)
                loaded_docs.extend(TextLoader(full_path).load())

    splitter = RecursiveCharacterTextSplitter(chunk_size=chunk_size)
    return splitter.split_documents(loaded_docs)
|
|
|
|
|
def extract_path(query, keyword='file_path: '):
    """Extract a Markdown file path that follows *keyword* in *query*.

    query: free text possibly containing e.g. "file_path: 200/210/x.md".
    keyword: marker preceding the path (default 'file_path: ').
    Returns the extracted path (up to and including '.md') prefixed with
    '../../', or '' when the keyword or the '.md' suffix is absent.

    Bug fix: the original implicitly returned None when '.md' was missing,
    which crashed callers doing len() checks on the result; it now returns
    '' consistently in every not-found case.
    """
    if keyword not in query:
        return ''

    prefix = '../../'
    # keyword is guaranteed present, so find() cannot return -1 here
    # (the original re-checked it redundantly).
    start_index = query.find(keyword) + len(keyword)
    remainder = query[start_index:]

    # Locate the '.md' suffix that terminates the path.
    end_index = remainder.find('.md')
    if end_index == -1:
        print("'.md' not found.")
        return ''

    # Include the '.md' suffix in the extracted path.
    return prefix + remainder[:end_index + len('.md')]
|
|
|
|
|
def create_collection(collection_name=COLLECTION_NAME, connection_args= CONNECTION_ARGS, embeddings= '',splits_path ='./'):
    """(Re)create a Milvus collection from the documents under *splits_path*.

    collection_name / connection_args: target collection and connection
        settings (default to the module-level env-derived values).
    embeddings: embeddings object; '' (the default) builds a fresh
        OpenAIEmbeddings with EMBEDDING_MODEL_NAME.
    splits_path: directory scanned for .txt/.md files; a falsy value falls
        back to './'.
    Always returns None.
    """
    # Bug fix: the original `... if splits_path else './'` assigned the
    # *string* './' to splits when splits_path was falsy, and that string
    # was then passed to from_documents as if it were a document list.
    splits = split_multiple_documents(splits_path or './', CHUNK_SIZE)

    if embeddings == '':
        embeddings = OpenAIEmbeddings(model=EMBEDDING_MODEL_NAME)

    print(f"----------Embedding Starting from {splits_path} to Milvus----------")
    # from_documents is a classmethod; the original instantiated a Milvus
    # first (creating the collection once) and then called from_documents
    # on the instance (creating it again). drop_old=True preserves the
    # original net effect of replacing any existing collection.
    Milvus.from_documents(
        splits,
        embedding=embeddings,
        collection_name=collection_name,
        connection_args=connection_args,
        drop_old=True,
    )
    print(f"----------Embedding has Finished {splits_path} files into Milvus----------")
    return None
|
|
|
|
|
def vectorstore_milvus(embeddings , connection_args=CONNECTION_ARGS, collection_name=COLLECTION_NAME ):
    """Build and return a LangChain Milvus vector-store handle.

    embeddings: embedding function for the store.
    connection_args / collection_name: connection settings and target
        collection, defaulting to the module-level env-derived values.
    drop_old=False preserves any data already in the collection.
    """
    return Milvus(
        embedding_function=embeddings,
        connection_args=connection_args,
        collection_name=collection_name,
        drop_old=False,
        auto_id=True,
    )
|
|
|
|
|
# Module-level wiring: NOTE(review) these run on *import*, performing disk
# walks and network calls (LangChain Hub, OpenAI, Milvus) as side effects.
docs_splits = split_multiple_documents('./', CHUNK_SIZE)  # chunks of all .txt/.md under cwd

prompt_template = hub.pull("murphy/librarian_guide")  # prompt fetched from LangChain Hub

embeddings = OpenAIEmbeddings(model=EMBEDDING_MODEL_NAME)

llm = ChatOpenAI(model_name=MODEL_NAME, temperature=0)  # temperature=0: deterministic answers

vectorstore = vectorstore_milvus(embeddings)  # shared Milvus handle for the helpers above
|
|
|
# history , query , answer = invoke_from_retriever("HUMAN:Hello we talk about gitlab AI:Based on the information available in Murphy's library, here is a relevant file: - **File Path**: 200/210/210.20/210.20 a.md - **Description**: The solution about Gitlab. GitLab is one devsecops solution a" +"What is about detailed?", llm, prompt_template, vectorstore , uuid='../../200/210/210.20/210.20 a.md') |
|
# print(f'history: {history}') |
|
# print(f'query : {query}') |
|
# print(f'answer :{answer}') |
|
|
|
|
|
# def extract_pattern(string): |
|
# prefix = "../../" |
|
# if string.startswith(prefix): |
|
# return string[len(prefix):] |
|
# else: |
|
# return string |
|
|
|
|
|
# create_collection(splits_path='../../200') |
|
|
|
|