Skip to content

Instantly share code, notes, and snippets.

@westonpace
Last active April 17, 2024 21:28
Show Gist options
  • Save westonpace/46bb3ad9d077f06ae920e490cb526a9f to your computer and use it in GitHub Desktop.
bulk_upsert_ingest_lancedb
from datetime import datetime, timedelta
import numpy as np
import pyarrow as pa
import lancedb
IDS = np.arange(13 * 1024 * 1024)
def make_initial_table(offset):
print(f"Making initial table with offset {offset}")
return pa.table({
"id": np.arange(offset, offset + (1024*1024), dtype=np.uint64),
"payload": np.arange(offset, offset + (1024*1024), dtype=np.double)
})
def make_update_table():
ids = np.random.choice(IDS, size=1024, replace=False)
payloads = np.random.rand(1024)
return pa.table({
"id": ids,
"payload": payloads
})
conn = lancedb.connect("/tmp/my_big_data")
offset = 0
tbl = conn.create_table("my_table", mode="overwrite", data=make_initial_table(offset))
for _ in range(12):
offset += 1024*1024
tbl.add(make_initial_table(offset))
tbl.create_scalar_index("id")
start = datetime.now()
intermediate = start
for batch_num in range(1024*1024):
if (batch_num + 1) % 10 == 0:
end = datetime.now()
duration = end - intermediate
intermediate = end
row_count = tbl.count_rows()
print(f"Compacting after inserting {batch_num} batches ({duration} seconds) [{row_count} rows]")
tbl.compact_files()
tbl.to_lance().optimize.optimize_indices()
tbl.cleanup_old_versions(timedelta(seconds=0))
data = make_update_table()
tbl.merge_insert("id").when_matched_update_all().when_not_matched_insert_all().execute(data)
end = datetime.now()
duration = end - start
print(f"Total duration: {duration} seconds")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment