#!/usr/bin/env python
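# Gist by @johnkerl (May 15, 2024).
# Copies a TileDB-SOMA SparseNDArray from input_path to output_path in
# Arrow-table batches, timing each read and write, with optional TileDB
# stats dumps around the writes.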
import sys
import os
import shutil
import time
import pyarrow as pa
import tiledb
import tiledbsoma
# ----------------------------------------------------------------
input_path = 'norm4'
output_path = 'bpout'
compression_level = 3
batch_stop = 1
cpu_count = 16
soma_init_buffer_bytes = 1024**3
do_stats = False
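# Optional positional args override the defaults above:
#   <script> [batch_stop [soma_init_buffer_bytes [output_path]]]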
if len(sys.argv) >= 2:
    batch_stop = int(sys.argv[1])
if len(sys.argv) >= 3:
    soma_init_buffer_bytes = int(sys.argv[2])
if len(sys.argv) >= 4:
    output_path = sys.argv[3]
print(f"INPUT_PATH {input_path}")
print(f"OUTPUT_PATH {output_path}")
print(f"COMPRESSION_LEVEL {compression_level}")
print(f"BUFFER_SIZE {soma_init_buffer_bytes}")
print(f"BATCH_OUT {batch_stop}")
if os.path.exists(output_path):
    print()
    print(f"REMOVING {output_path}")
    shutil.rmtree(output_path)
    print(f"REMOVED {output_path}")
print()
# ----------------------------------------------------------------
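# Context tuning: soma.init_buffer_bytes bounds the size of each read batch,
# and the sm.*_concurrency_level settings pin TileDB's compute and I/O
# thread pools to cpu_count.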
context = tiledbsoma.SOMATileDBContext(tiledb_config={
    "py.init_buffer_bytes": 4 * 1024**3,
    "py.deduplicate": "true",
    "soma.init_buffer_bytes": soma_init_buffer_bytes,
    "sm.mem.reader.sparse_global_order.ratio_array_data": 0.3,
    "sm.consolidation.total_buffer_size": 4 * 1024**3,
    "sm.compute_concurrency_level": cpu_count,
    "sm.io_concurrency_level": cpu_count,
})
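# Create-time schema: 64K-cell tile capacity, 2048-element space tiles, and
# ZSTD at compression_level on both dims and the data attribute, with a
# byte-shuffle filter ahead of ZSTD on soma_dim_1 and soma_data.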
platform_config = {
    "tiledb": {
        "create": {
            "capacity": 2**16,
            "dims": {
                "soma_dim_0": {"tile": 2048, "filters": [{"_type": "ZstdFilter", "level": compression_level}]},
                "soma_dim_1": {"tile": 2048, "filters": ["ByteShuffleFilter", {"_type": "ZstdFilter", "level": compression_level}]},
            },
            "attrs": {"soma_data": {"filters": ["ByteShuffleFilter", {"_type": "ZstdFilter", "level": compression_level}]}},
            "cell_order": "row-major",
            "tile_order": "row-major",
            "allows_duplicates": True,
            "sort_coords": True,
            # "sort_coords": False,
        },
    },
}
print()
print(f"OPENING {input_path}")
t1 = time.time()
input = tiledbsoma.SparseNDArray.open(input_path, context=context, platform_config=platform_config)
t2 = time.time()
print(f"OPENED {input_path} SECONDS = %.3f" % (t2-t1))
print()
print(f"CREATING {output_path}")
t1 = time.time()
output = tiledbsoma.SparseNDArray.create(
    output_path,
    type=pa.float32(),
    shape=input.shape,
    platform_config=platform_config,
    context=context,
)
t2 = time.time()
print(f"CREATED {output_path} SECONDS = %.3f" % (t2-t1))
# With the memory buffer sizes I've set above,
# I observe ~300 batches of ~500M entries each
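# input.read().tables() streams the data as pyarrow.Table chunks, sized by
# the soma.init_buffer_bytes setting in the context above.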
i = 0
t1 = time.time()
print()
print(f"READING {input_path} batch {i}")
for batch in input.read().tables():
    t2 = time.time()
    print(f"READ {input_path} batch {i} SECONDS = {t2-t1:.3f}")
    if do_stats:
        tiledbsoma.pytiledbsoma.tiledbsoma_stats_enable()
        tiledbsoma.pytiledbsoma.tiledbsoma_stats_reset()
        tiledb.stats_enable()
        tiledb.stats_reset()
    print()
    print(f"WRITING {output_path} batch {i}")
    t1 = time.time()
    _ = output.write(batch, platform_config=platform_config)
    t2 = time.time()
    print(f"WROTE {output_path} batch {i} SECONDS = {t2-t1:.3f}")
    print()
    print("----------------------------------------------------------------")
    print("STATS FROM WRITE ONLY")
    if do_stats:
        tiledbsoma.pytiledbsoma.tiledbsoma_stats_dump()
        tiledb.stats_dump()
    i += 1
    if batch_stop is not None and i >= batch_stop:
        print("BREAKING BATCH")
        break
    print()
    print(f"READING {input_path} batch {i}")
    t1 = time.time()