import asyncio
import copy
import hashlib
import json
import os
import random
from dataclasses import dataclass
from typing import Any, Dict, List, Optional
import numpy as np
import requests
import torch
from neural_cherche import models, retrieve, train, utils
from openai import AsyncOpenAI
import wandb
TRAINING_BATCH_SIZE = 8
LR = 10e-5  # i.e. 1e-4
N_EPOCHS = 20
INFERENCE_BATCH_SIZE = 32
DATA_SAVE_PATH = "colbert-finetuning/data"
MODEL_SAVE_PATH = "colbert-finetuning/models"
CHECKPOINT_DIR = "colbert-finetuning/checkpoints"
LLM_MODEL = "gpt-3.5-turbo-0125"
EMBEDDING_MODEL = "text-embedding-3-large"
N2P_RATIO = 10

client = AsyncOpenAI(
    api_key=os.environ.get("OPENAI_API_KEY"),
)
# Boilerplate LLM wrapper with simple on-disk caching - replace in your own code
class LLM:
    model_name: str
    embeddings_cache_path: str = "embeddings_cache.json"
    generations_cache_path: str = "generations_cache.json"
    embeddings_cache: dict
    generations_cache: dict

    def __init__(self, model_name: str, embeddings_model: str, temperature: Optional[float] = 0, max_tokens: Optional[int] = None, top_p: Optional[float] = None, frequency_penalty: Optional[float] = None, presence_penalty: Optional[float] = None, stop: Optional[str] = None):
        self.model_name = model_name
        self.embeddings_model = embeddings_model
        self.temperature = temperature
        # Load caches
        self.embeddings_cache = self.load_cache(self.embeddings_cache_path)
        self.generations_cache = self.load_cache(self.generations_cache_path)

    def load_cache(self, path):
        if os.path.exists(path):
            with open(path, "r") as file:
                return json.load(file)
        return {}

    def save_cache(self, path, cache):
        with open(path, "w") as file:
            json.dump(cache, file)

    async def generate(self, system_prompt, user_prompt):
        cache_key = json.dumps({"system_prompt": system_prompt, "user_prompt": user_prompt})
        if cache_key in self.generations_cache:
            return self.generations_cache[cache_key]
        messages = [
            {"role": "system", "content": system_prompt},
            {"role": "user", "content": user_prompt},
        ]
        response = await client.chat.completions.create(
            messages=messages,
            model=self.model_name,
            temperature=self.temperature,
        )
        generated_content = response.choices[0].message.content
        self.generations_cache[cache_key] = generated_content
        self.save_cache(self.generations_cache_path, self.generations_cache)
        return generated_content

    async def embed(self, text):
        if text in self.embeddings_cache:
            return self.embeddings_cache[text]
        response = await client.embeddings.create(
            model=self.embeddings_model,
            input=text,
        )
        embedding = response.data[0].embedding
        self.embeddings_cache[text] = embedding
        self.save_cache(self.embeddings_cache_path, self.embeddings_cache)
        return embedding
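
# Example usage of the cached LLM wrapper (hypothetical prompts). Because both
# caches are keyed on the exact input, re-running the script reuses previous
# API responses instead of paying for them again:
# llm = LLM(LLM_MODEL, EMBEDDING_MODEL)
# answer = asyncio.run(llm.generate("You are a helpful assistant.", "What is ColBERT?"))
# vector = asyncio.run(llm.embed("late-interaction retrieval"))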

@dataclass
class ExampleDocument:
    text: str
    chunks: List[str]

class ColbertFinetuningDataBuilder:
    documents: List[ExampleDocument]
    llm: LLM
    data_save_path: str

    def __init__(self, documents_as_dictionaries: List[Dict[str, str]], data_save_path=DATA_SAVE_PATH):
        self.documents = [ExampleDocument(**doc) for doc in documents_as_dictionaries]
        self.llm = LLM(LLM_MODEL, EMBEDDING_MODEL)
        self.train_triplets = []
        self.test_triplets = []
        self.data_save_path = data_save_path

    # Mining hard negatives with embeddings improves over random negatives
    async def build_synthetic_triplets(self, n_positive: int = 20, ratio_negative_to_positive: int = N2P_RATIO, negative_type: str = "random"):
        if os.path.exists(self.data_save_path + "/train_triplets.json"):
            with open(self.data_save_path + "/train_triplets.json") as f:
                self.train_triplets = json.load(f)
            with open(self.data_save_path + "/test_triplets.json") as f:
                self.test_triplets = json.load(f)
            return
        triplets = []
        all_chunks = [chunk for document in self.documents for chunk in document.chunks]
        n_positives_remaining = n_positive
        while n_positives_remaining > 0:
            doc_index = random.randint(0, len(self.documents) - 1)
            random_document = self.documents[doc_index]
            random_positive_index = random.randint(0, len(random_document.chunks) - 1)
            random_positive = random_document.chunks[random_positive_index]
            query = await self.llm.generate(
                system_prompt="Given the following text chunk, formulate a question that is non-trivial to answer and that depends on the information in the text chunk",
                user_prompt=f"# Text Chunk\n{random_positive}\nYour question:",
            )
            if negative_type == "random":
                # Sample chunks at least 5 positions away from the positive, so
                # they are unlikely to contain the answer
                candidate_indices = [i for i in range(len(random_document.chunks)) if abs(i - random_positive_index) > 5]
                random_negative_indices = random.sample(candidate_indices, ratio_negative_to_positive)
                negatives = [random_document.chunks[i] for i in random_negative_indices]
            else:
                # Hard negatives: the chunks most similar to the query (by cosine
                # similarity) that are not the positive itself
                query_embedding = await self.llm.embed(query)
                candidates = [chunk for chunk in all_chunks if chunk != random_positive]
                candidate_embeddings = [await self.llm.embed(chunk) for chunk in candidates]
                similarities = [np.dot(query_embedding, emb) / (np.linalg.norm(query_embedding) * np.linalg.norm(emb)) for emb in candidate_embeddings]
                negatives = [candidates[i] for i in np.argsort(similarities)[::-1][:ratio_negative_to_positive]]
            for negative in negatives:
                triplets.append((query, random_positive, negative))
            n_positives_remaining -= 1
        # 50/50 train/test split over the flat triplet list
        self.train_triplets = triplets[: int(len(triplets) * 0.5)]
        self.test_triplets = triplets[int(len(triplets) * 0.5):]
        with open(self.data_save_path + "/train_triplets.json", "w") as file:
            json.dump(self.train_triplets, file)
        with open(self.data_save_path + "/test_triplets.json", "w") as file:
            json.dump(self.test_triplets, file)
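
# Sketch of how you might invoke the embedding-based hard-negative path
# (assumption: any negative_type other than "random" triggers it):
# builder = ColbertFinetuningDataBuilder([example_document], data_save_path=DATA_SAVE_PATH)
# asyncio.run(builder.build_synthetic_triplets(n_positive=50, negative_type="embedding"))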

class ColbertRetriever:
    def __init__(self, document: ExampleDocument, path_to_model: str = "raphaelsty/neural-cherche-colbert"):
        self.document = document
        device = "cuda" if torch.cuda.is_available() else ("mps" if torch.backends.mps.is_available() else "cpu")
        self.colbert_model = models.ColBERT(
            model_name_or_path=path_to_model,
            device=device,
        )
        self.retriever = retrieve.ColBERT(
            key="id",
            on=["chunk"],  # the field(s) to search on
            model=self.colbert_model,
        )
        documents_embeddings = self.retriever.encode_documents(
            documents=[{"id": i, "chunk": chunk} for i, chunk in enumerate(document.chunks)],
            batch_size=INFERENCE_BATCH_SIZE,
        )
        self.retriever = self.retriever.add(
            documents_embeddings=documents_embeddings,
        )

    def batch_retrieve(self, queries: List[str], k: int = 20) -> List[List[str]]:
        queries_embeddings = self.retriever.encode_queries(
            queries=queries,
            batch_size=INFERENCE_BATCH_SIZE,
        )
        chunk_scores_by_query: List[List[Dict[str, Any]]] = self.retriever(
            queries_embeddings=queries_embeddings,
            batch_size=INFERENCE_BATCH_SIZE,
            k=k,  # number of chunks to retrieve per query
        )
        # Return the top-k chunk texts for each query, preserving query order
        return [
            [self.document.chunks[chunk_score["id"]] for chunk_score in query_chunk_scores]
            for query_chunk_scores in chunk_scores_by_query
        ]
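
# Example (hypothetical): batch_retrieve returns one ranked list of chunk texts
# per input query, so len(result) == len(queries) and len(result[0]) <= k:
# retriever = ColbertRetriever(example_doc)
# top_chunks = retriever.batch_retrieve(["What did the author work on?"], k=5)[0]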

class OpenaiVectorRetriever:
    def __init__(self, document: ExampleDocument):
        self.document = document
        self.llm = LLM(LLM_MODEL, EMBEDDING_MODEL)
        self.chunk_embeddings = asyncio.run(self.batch_embed(self.document.chunks))

    async def batch_embed(self, chunks: List[str]) -> List[List[float]]:
        embeddings = []
        for chunk in chunks:
            embeddings.append(await self.llm.embed(chunk))
        return embeddings

    def cosine_similarity(self, a, b):
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))

    async def retrieve(self, query: str, k: int = 20) -> List[str]:
        query_embedding = await self.llm.embed(query)
        similarities = [self.cosine_similarity(query_embedding, chunk_embedding) for chunk_embedding in self.chunk_embeddings]
        retrieved = [chunk for chunk, _similarity in sorted(zip(self.document.chunks, similarities), key=lambda x: x[1], reverse=True)[:k]]
        return retrieved
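
# Quick sanity check of the cosine similarity used above (a minimal sketch):
# a, b = np.array([1.0, 0.0]), np.array([1.0, 1.0])
# np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))  # ~0.7071, i.e. cos(45 deg)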

class ColbertFineTuner:
    data_builder: ColbertFinetuningDataBuilder
    base_model: models.ColBERT
    optimizer: torch.optim.AdamW

    def __init__(
        self,
        documents: List[ExampleDocument],
        finetuning_databuilder: ColbertFinetuningDataBuilder,
        model_save_dir_path,
    ):
        self.data_builder = finetuning_databuilder
        self.base_model = models.ColBERT(
            model_name_or_path="raphaelsty/neural-cherche-colbert",
            device="cuda" if torch.cuda.is_available() else ("mps" if torch.backends.mps.is_available() else "cpu"),
        )
        self.optimizer = torch.optim.AdamW(self.base_model.parameters(), lr=LR)  # weight_decay=0.01
        self.documents = documents
        self.model_save_dir_path = model_save_dir_path

    def local_finetuning(self):
        if os.path.exists(self.model_save_dir_path + "/model.safetensors"):
            return
        wandb.init(project="colbert_finetuning", name="local_finetuning_run")
        # Reuse the model and optimizer constructed in __init__
        model = self.base_model
        optimizer = self.optimizer
        X = copy.deepcopy(self.data_builder.train_triplets)
        model_weight_sums = []
        for step, (anchor, positive, negative) in enumerate(
            utils.iter(
                X,
                epochs=N_EPOCHS,  # number of epochs
                batch_size=TRAINING_BATCH_SIZE,  # number of triplets per batch
                shuffle=True,
            )
        ):
            loss = train.train_colbert(
                model=model,
                optimizer=optimizer,
                anchor=anchor,
                positive=positive,
                negative=negative,
                step=step,
                gradient_accumulation_steps=50,
            )
            if step % 10 == 0:
                loss_f = loss["loss"].detach().cpu().numpy()
                wandb.log({"loss": loss_f, "step": step + 1})
                model_weight_sums.append(sum(param.data.cpu().sum() for param in model.parameters()))
            if step % 50 == 0:
                model.save_pretrained(CHECKPOINT_DIR)
        loss_f = loss["loss"].detach().cpu().numpy()
        print(f"Final Step {step + 1}: Loss = {loss_f:.4f}")
        model.save_pretrained(self.model_save_dir_path)
        wandb.finish()
        if torch.backends.mps.is_available():
            torch.mps.empty_cache()
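
    # save_pretrained() also checkpoints every 50 steps to CHECKPOINT_DIR, so a
    # crashed run could be resumed from there (a sketch, assuming the checkpoint
    # directory is a loadable model dir):
    # model = models.ColBERT(model_name_or_path=CHECKPOINT_DIR, device="cpu")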

    def benchmarking(self, k: int = 10):
        baseline_vector_hits = {}
        baseline_colbert_hits = {}
        finetuned_colbert_hits = {}
        base_vector_retriever = OpenaiVectorRetriever(self.data_builder.documents[0])
        base_retriever = ColbertRetriever(self.data_builder.documents[0])
        fine_tuned_retriever = ColbertRetriever(self.data_builder.documents[0], path_to_model=self.model_save_dir_path)
        for triplet in self.data_builder.test_triplets:
            query, positive, _ = triplet
            pair_hash = hashlib.sha256((query + positive).encode()).hexdigest()
            if pair_hash in baseline_vector_hits:
                continue
            # retrieve() returns a flat list of chunks for one query;
            # batch_retrieve() returns one list of chunks per query
            base_vector_retrieved = asyncio.run(base_vector_retriever.retrieve(query, k=k))
            base_colbert_retrieved = base_retriever.batch_retrieve([query], k=k)
            fine_tuned_retrieved = fine_tuned_retriever.batch_retrieve([query], k=k)
            baseline_vector_hits[pair_hash] = positive in base_vector_retrieved
            baseline_colbert_hits[pair_hash] = positive in base_colbert_retrieved[0]
            finetuned_colbert_hits[pair_hash] = positive in fine_tuned_retrieved[0]
            if baseline_vector_hits[pair_hash] != finetuned_colbert_hits[pair_hash]:
                print("Query: ", query)
                print("Positive: ", positive)
                print("Baseline Vector: ", base_vector_retrieved)
                print("Baseline Colbert: ", base_colbert_retrieved[0])
                print("Fine-tuned Colbert: ", fine_tuned_retrieved[0])
                print(baseline_vector_hits[pair_hash])
                print(baseline_colbert_hits[pair_hash])
        print(f"Mean top-{k} hit-rate for baseline Vector: ", sum(baseline_vector_hits.values()) / len(baseline_vector_hits))
        print(f"Mean top-{k} hit-rate for baseline Colbert: ", sum(baseline_colbert_hits.values()) / len(baseline_colbert_hits))
        print(f"Mean top-{k} hit-rate for fine-tuned Colbert: ", sum(finetuned_colbert_hits.values()) / len(finetuned_colbert_hits))

if __name__ == "__main__":
    os.makedirs("colbert-finetuning", exist_ok=True)
    os.makedirs("colbert-finetuning/data/pg", exist_ok=True)
    os.makedirs("colbert-finetuning/models/pg", exist_ok=True)
    # Ensure you source a .env with your OpenAI API key @ OPENAI_API_KEY
    # Ensure you have wandb installed and have run `wandb login`
    # Ensure you have the neural_cherche package installed: https://github.com/raphaelsty/neural-cherche
    # (pip install neural-cherche)
    ## REPLACE WITH YOUR OWN DOCUMENT(S). Admittedly, finetuning on a single document,
    ## as we do with the PG essay here, doesn't have a huge impact.
    ## This is llama-index's text file - please verify yourself :-)
    essay_url = "https://raw.githubusercontent.com/run-llama/llama_index/main/docs/examples/data/paul_graham/paul_graham_essay.txt"
    response = requests.get(essay_url)
    response.raise_for_status()
    document_text = response.text
    chunks = document_text.split("\n\n")
    example_document = {
        "text": document_text,
        "chunks": [chunk for chunk in chunks if len(chunk.strip()) > 1],
    }
    # Run the finetuning job
    N_POSITIVE_EXAMPLES = 300
    data_builder = ColbertFinetuningDataBuilder([example_document], data_save_path="colbert-finetuning/data/pg")
    asyncio.run(data_builder.build_synthetic_triplets(n_positive=N_POSITIVE_EXAMPLES))
    finetuner = ColbertFineTuner([example_document], data_builder, "colbert-finetuning/models/pg")
    finetuner.local_finetuning()
    # Evaluate the finetuned model vs baseline ColBERT vs baseline vector retriever
    TOP_K_CHUNKS_TO_RETRIEVE = 1
    finetuner.benchmarking(k=TOP_K_CHUNKS_TO_RETRIEVE)
# Spoiler: you might get something like this (with 200 examples and 20 epochs @ 3e-6 lr):
# Mean top-1 hit-rate for baseline Vector: 0.9259259259259259
# Mean top-1 hit-rate for baseline Colbert: 0.8271604938271605
# Mean top-1 hit-rate for fine-tuned Colbert: 0.8765432098765432
# After 400 examples and 25 epochs @ 5e-5 lr:
# Mean top-1 hit-rate for baseline Vector: 0.9152542372881356
# Mean top-1 hit-rate for baseline Colbert: 0.7796610169491526
# Mean top-1 hit-rate for fine-tuned Colbert: 0.9152542372881356
# After 400 examples and 40 epochs @ 10e-5 lr:
# Mean top-1 hit-rate for baseline Vector: 0.925531914893617
# Mean top-1 hit-rate for baseline Colbert: 0.8191489361702128
# Mean top-1 hit-rate for fine-tuned Colbert: 0.9361702127659575
# Fine-tuning works! Vector embedding is pretty good for this task, though.
# In general, I think you'll find that for harder tasks the fine-tuned ColBERT will
# outperform the vector embedding, and that longer training and more examples help.
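
# To use the fine-tuned weights outside this script, point ColbertRetriever at the
# saved directory (a sketch; paths assume the run above):
# retriever = ColbertRetriever(ExampleDocument(**example_document), path_to_model="colbert-finetuning/models/pg")
# print(retriever.batch_retrieve(["How did the author get into Lisp?"], k=3)[0])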