trying to parallelize embedding on modal
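The script below bakes the nomic-ai/nomic-embed-text-v1.5 weights into a Modal image, fans batches of FineWeb-Edu documents out across up to 100 A100 containers via `model.embed.map(...)`, checkpoints the resulting (id, text, embedding) table to a Modal Volume, and optionally uploads it to the Hugging Face Hub.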
import os
import json
import time
import asyncio
import subprocess
from modal import App, Image, Secret, Volume, build, enter, exit, gpu, method
# We first set our configuration variables for our script.
## Embedding Containers Configuration
# GPU_CONCURRENCY = 100
MODEL_ID = "nomic-ai/nomic-embed-text-v1.5"
MODEL_SLUG = MODEL_ID.split("/")[-1]
MODEL_DIR = "/model"
MODEL_REVISION = "main"
GPU_CONCURRENCY = 100
# GPU_CONFIG = gpu.A100(size="80GB")
GPU_CONFIG = gpu.A100(size="40GB")
# GPU_CONFIG = gpu.H100()
# BATCH_SIZE = 512
BATCH_SIZE = 64
# BATCH_SIZE = 128
# MAX_TOKENS = 8192
MAX_TOKENS = 2048

## Dataset-Specific Configuration
DATASET_READ_VOLUME = Volume.from_name(
    "embedding-fineweb-edu", create_if_missing=True
)
EMBEDDING_CHECKPOINT_VOLUME = Volume.from_name(
    "checkpoint", create_if_missing=True
)
DATASET_DIR = "/data"
# DATASET_SAVE = "fineweb-edu-sample-10BT"
DATASET_SAVE = "fineweb-edu-sample-10BT-100k"
CHECKPOINT_DIR = "/checkpoint"
SAVE_TO_DISK = True

## Upload-Specific Configuration
# DATASET_HF_UPLOAD_REPO_NAME = "enjalot/fineweb-edu-sample-10BT"
DATASET_HF_UPLOAD_REPO_NAME = f"enjalot/{DATASET_SAVE}"
UPLOAD_TO_HF = True
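
# With the settings above, full_job() sends batches of BATCH_SIZE (64) documents,
# each pre-truncated to MAX_TOKENS (2048) tokens, to up to GPU_CONCURRENCY (100)
# A100-40GB containers in parallel.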
def download_model_to_image(model_dir, model_name, model_revision):
    from huggingface_hub import snapshot_download
    from transformers.utils import move_cache

    os.makedirs(model_dir, exist_ok=True)
    snapshot_download(
        repo_id=model_name,
        revision=model_revision,
        local_dir=model_dir,
        ignore_patterns=["*.pt", "*.bin"],  # Using safetensors
    )
    move_cache()
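
# The image build below calls download_model_to_image via run_function, so the
# model weights are baked into the container image at build time and the GPU
# containers never have to download them at runtime.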
st_image = (
    Image.debian_slim(python_version="3.10")
    .pip_install(
        "torch==2.1.2",
        "numpy==1.26.3",
        "transformers==4.39.3",
        "hf-transfer==0.1.6",
        "huggingface_hub==0.22.2",
        "einops==0.7.0",
    )
    .env({"HF_HUB_ENABLE_HF_TRANSFER": "1"})
    .run_function(
        download_model_to_image,
        timeout=60 * 20,
        kwargs={
            "model_dir": MODEL_DIR,
            "model_name": MODEL_ID,
            "model_revision": MODEL_REVISION,
        },
        secrets=[Secret.from_name("huggingface-secret")],
    )
)
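
# Imports inside st_image.imports() only run in containers built from this image,
# so the local client doesn't need torch or transformers installed.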
with st_image.imports():
    import numpy as np
    import torch
    from torch.cuda.amp import autocast
    from transformers import AutoTokenizer, AutoModel

app = App("fineweb-embeddings-st")
@app.cls(
    gpu=GPU_CONFIG,
    # cpu=16,
    concurrency_limit=GPU_CONCURRENCY,
    timeout=60 * 10,
    container_idle_timeout=60 * 10,
    allow_concurrent_inputs=1,
    image=st_image,
)
class TransformerModel:
    @enter()
    def start_engine(self):
        # import torch
        # from transformers import AutoTokenizer, AutoModel
        self.device = torch.device("cuda")

        print("🥶 cold starting inference")
        start = time.monotonic_ns()
        self.model = AutoModel.from_pretrained(
            MODEL_ID, trust_remote_code=True, safe_serialization=True
        )  # , rotary_scaling_factor=2
        self.tokenizer = AutoTokenizer.from_pretrained(
            "bert-base-uncased", model_max_length=MAX_TOKENS
        )
        self.model.to(self.device)
        self.model.eval()
        print(f"CUDA memory allocated: {torch.cuda.memory_allocated() / 1e6} MB")
        duration_s = (time.monotonic_ns() - start) / 1e9
        print(f"🏎️ engine started in {duration_s:.0f}s")
    @method()
    def embed(self, inputs):
        # import numpy as np
        # import torch
        tok = self.tokenizer
        # TODO: better understanding of how this gets called
        print("inputs", len(inputs))

        start = time.monotonic_ns()
        texts = [x[1] for x in inputs]
        texts = [
            t if len(t) <= 8000 else tok.decode(tok.encode(t)[:MAX_TOKENS])
            for t in texts
        ]
        print("truncated in", (time.monotonic_ns() - start) / 1e9)
        print("texts", len(texts))

        # print(f"CUDA memory allocated before encoding: {torch.cuda.memory_allocated() / 1e6} MB")
        start = time.monotonic_ns()
        encoded_input = tok(texts, padding=True, truncation=True, return_tensors="pt")
        print("encoded in", (time.monotonic_ns() - start) / 1e9)

        start = time.monotonic_ns()
        # print("moving to device")
        encoded_input = {key: value.to(self.device) for key, value in encoded_input.items()}
        # print("moved to device", (time.monotonic_ns() - start) / 1e9)
        # print("encoded input size", encoded_input['input_ids'].nelement() * encoded_input['input_ids'].element_size() / 1e6, "MB")
        # print(f"CUDA memory allocated after encoding: {torch.cuda.memory_allocated() / 1e6} MB")

        start = time.monotonic_ns()
        # print(torch.cuda.memory_summary(device=None, abbreviated=False))
        with torch.no_grad(), autocast():
            print(f"CUDA memory allocated before embedding: {torch.cuda.memory_allocated() / 1e6} MB")
            model_output = self.model(**encoded_input)
            print(f"CUDA memory allocated after model output: {torch.cuda.memory_allocated() / 1e6} MB")
            # print(f"model output size: {model_output.nelement() * model_output.element_size() / 1e6} MB")
            embeddings = model_output[0][:, 0]
            # print(f"Embedding size: {embeddings.nelement() * embeddings.element_size() / 1e6} MB")
            # print(f"CUDA memory allocated after embedding: {torch.cuda.memory_allocated() / 1e6} MB")
            normalized_embeddings = torch.nn.functional.normalize(embeddings, p=2, dim=1)
            normalized_embeddings_cpu = normalized_embeddings.cpu().numpy()

        # Clean up torch memory
        del encoded_input
        del model_output
        del embeddings
        del normalized_embeddings
        torch.cuda.empty_cache()

        duration_s = (time.monotonic_ns() - start) / 1e9
        print(f"embedding took {duration_s:.0f}s")
        return inputs, normalized_embeddings_cpu
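
# TransformerModel.embed returns the original (id, text) tuples alongside the
# L2-normalized CLS-token embeddings as a numpy array; embed_dataset below
# accumulates both before checkpointing.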
def generate_chunks_from_dataset(xs, max_tokens: int):
    """
    Generate (id, text) chunks from a dataset.

    Args:
        xs (list): The dataset containing dictionaries with "id" and "text" keys.
        max_tokens (int): Maximum tokens per chunk (currently unused; truncation
            happens later in TransformerModel.embed).

    Yields:
        tuple: A tuple containing the id and the text prefixed with the
        "clustering: " task prefix expected by nomic-embed-text.
    """
    for data in xs:
        yield (data["id"], "clustering: " + data["text"])
def generate_batches(xs, batch_size):
    batch = []
    for x in xs:
        batch.append(x)
        if len(batch) == batch_size:
            yield batch
            batch = []
    if batch:
        yield batch
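
# For example (hypothetical input): list(generate_batches(range(5), batch_size=2))
# yields [[0, 1], [2, 3], [4]] -- the final partial batch is still emitted.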
def load_dataset_from_disk():
    """
    Load the dataset from the Modal volume on disk.

    Returns:
        Dataset: The loaded dataset.
    """
    import time
    from datasets import load_from_disk

    start = time.perf_counter()
    # Load the dataset as a Hugging Face dataset
    print(f"Loading dataset from {DATASET_DIR}/{DATASET_SAVE}")
    dataset = load_from_disk(f"{DATASET_DIR}/{DATASET_SAVE}")
    print(f"Dataset loaded in {time.perf_counter()-start:.2f} seconds")
    # return dataset["train"]
    # TODO: have the 100k subset be a proper subset
    return dataset  # ["train"]
def save_dataset_to_intermediate_checkpoint(acc_chunks, embeddings, batch_size):
    """Saves the dataset to an intermediate checkpoint.

    Args:
        acc_chunks (list): Accumulated chunks
        embeddings (list): Accumulated embeddings
        batch_size (int): Batch size
    """
    import pyarrow as pa
    from datasets import Dataset

    table = pa.Table.from_arrays(
        [
            pa.array([chunk[0] for chunk in acc_chunks]),  # id
            pa.array([chunk[1] for chunk in acc_chunks]),  # text
            pa.array(embeddings),
        ],
        names=["id", "text", "embedding"],
    )
    path_parent_folder = f"{CHECKPOINT_DIR}/{DATASET_SAVE}/{MODEL_SLUG}-{batch_size}"
    dataset = Dataset(table)
    dataset.save_to_disk(path_parent_folder)
    EMBEDDING_CHECKPOINT_VOLUME.commit()
    print(f"Saved checkpoint at {path_parent_folder}")
def upload_result_to_hf(batch_size: int) -> None:
    """
    Uploads the result to the Hugging Face Hub.

    Args:
        batch_size (int): The batch size for the model.

    Returns:
        None
    """
    import os
    import time
    from huggingface_hub import HfApi

    path_parent_folder = f"{CHECKPOINT_DIR}/{DATASET_SAVE}/{MODEL_SLUG}-{batch_size}"
    api = HfApi(token=os.environ["HUGGINGFACE_TOKEN"])
    api.create_repo(
        repo_id=DATASET_HF_UPLOAD_REPO_NAME,
        private=False,
        repo_type="dataset",
        exist_ok=True,
    )

    print(f"Pushing to hub {DATASET_HF_UPLOAD_REPO_NAME}")
    start = time.perf_counter()
    api.upload_folder(
        folder_path=path_parent_folder,
        repo_id=DATASET_HF_UPLOAD_REPO_NAME,
        repo_type="dataset",
        multi_commits=True,
        multi_commits_verbose=True,
    )
    end = time.perf_counter()
    print(f"Uploaded in {end-start}s")
@app.function(
    # cpu=1
    image=Image.debian_slim().pip_install(
        "datasets", "pyarrow", "hf_transfer", "huggingface_hub", "transformers"
    ),
    volumes={
        DATASET_DIR: DATASET_READ_VOLUME,
        CHECKPOINT_DIR: EMBEDDING_CHECKPOINT_VOLUME,
    },
    timeout=86400,
    secrets=[Secret.from_name("huggingface-secret")],
)
def embed_dataset(batch_size: int = 512 * 50):
    """
    Embeds a dataset with the TransformerModel containers.

    Args:
        batch_size (int): The batch size to use. Defaults to 512 * 50.

    Returns:
        dict: A dictionary containing the benchmark results.
    """
    import datetime
    import time

    if UPLOAD_TO_HF and not SAVE_TO_DISK:
        raise ValueError(
            "Uploading to HF requires SAVE_TO_DISK to be set to true in case of intermediate failure."
        )

    data = load_dataset_from_disk()
    model = TransformerModel()

    start = time.perf_counter()
    print("generating chunks")
    text_chunks = generate_chunks_from_dataset(data, max_tokens=MAX_TOKENS)
    print("generated chunks", time.perf_counter() - start)

    start = time.perf_counter()
    print("generating batches")
    batches = generate_batches(text_chunks, batch_size=batch_size)
    print("generated batches", time.perf_counter() - start)

    start = time.perf_counter()
    acc_chunks = []
    embeddings = []
    print("BATCHES", len(data) / batch_size)
    i = 0
    for resp in model.embed.map(batches, order_outputs=False, return_exceptions=True):
        if isinstance(resp, Exception):
            print(f"Exception: {resp}")
            # continue
            return
        batch_chunks, batch_embeddings = resp
        acc_chunks.extend(batch_chunks)
        embeddings.extend(batch_embeddings)
        print("done with batch", i)
        i += 1

    end = time.perf_counter()
    duration = end - start

    resp = {
        "batch_size": batch_size,
        "n_gpu": GPU_CONCURRENCY,
        "duration_mins": duration / 60,
    }

    if SAVE_TO_DISK:
        save_dataset_to_intermediate_checkpoint(acc_chunks, embeddings, batch_size)
    if UPLOAD_TO_HF:
        upload_result_to_hf(batch_size)
    return resp
@app.local_entrypoint()
def full_job():
    batch_size = BATCH_SIZE
    with open("benchmarks.json", "a") as f:
        benchmark = embed_dataset.remote(batch_size=batch_size)
        f.write(json.dumps(benchmark, indent=2) + "\n")
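
# To launch the job (assuming the Modal CLI is configured and this file is saved
# as, say, embed_fineweb.py -- the filename is just an example):
#   modal run embed_fineweb.py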