Skip to content

Instantly share code, notes, and snippets.

@Neah-Ko
Last active January 31, 2024 11:16
Show Gist options
  • Save Neah-Ko/2a90cf6da3f810a98e5f3485f0a031d4 to your computer and use it in GitHub Desktop.
Save Neah-Ko/2a90cf6da3f810a98e5f3485f0a031d4 to your computer and use it in GitHub Desktop.
anndata h5py profile
import functools
import inspect
import os
import sys
import time
from pathlib import Path

import anndata as ad
import h5py as hp
import psutil
from anndata.experimental import sparse_dataset
def elapsed_since(start):
    """Format the wall-clock time elapsed since ``start`` as a short string.

    ``start`` is a timestamp previously taken with ``time.time()``. The
    elapsed time is rounded to two decimals and suffixed with a unit chosen
    by magnitude: ``ms`` below one second, ``s`` below one minute, ``min``
    below one hour, and ``hrs`` otherwise.
    """
    seconds = time.time() - start
    if seconds < 1:
        amount, unit = seconds * 1000, "ms"
    elif seconds < 60:
        amount, unit = seconds, "s"
    elif seconds < 3600:
        amount, unit = seconds / 60, "min"
    else:
        amount, unit = seconds / 3600, "hrs"
    return f"{round(amount, 2)}{unit}"
def get_process_memory():
    """Return the current process's memory counters as ``(rss, vms, shared)``.

    The values are read from ``psutil.Process.memory_info()`` for this
    process's PID. psutil only exposes a ``shared`` field on Linux; on
    platforms where it is missing, ``0`` is returned in its place so callers
    can still unpack three values instead of hitting ``AttributeError``.
    """
    mem = psutil.Process(os.getpid()).memory_info()
    # ``shared`` is platform-specific, so fall back to 0 where it is absent.
    return mem.rss, mem.vms, getattr(mem, "shared", 0)
def format_bytes(bytes):
    """Render a (possibly negative) byte count using decimal SI units.

    Magnitudes below 1000 are printed verbatim with a ``B`` suffix. Larger
    ones are divided by the matching power of ten, rounded to two decimals
    and suffixed with ``kB``, ``MB`` or ``GB``. The sign of the input is
    kept, so memory deltas can be formatted as well.
    """
    magnitude = abs(bytes)
    if magnitude < 1000:
        return str(bytes) + "B"
    # (exclusive upper bound, divisor, unit) for the two middle ranges.
    for ceiling, divisor, suffix in ((1e6, 1e3, "kB"), (1e9, 1e6, "MB")):
        if magnitude < ceiling:
            return str(round(bytes / divisor, 2)) + suffix
    return str(round(bytes / 1e9, 2)) + "GB"
def profile(func, *args, **kwargs):
    """Report memory deltas, wall time and result size for calls to ``func``.

    Intended for use as a decorator (``@profile``). Every call of the
    returned wrapper prints one line with the change in RSS / VMS / shared
    memory, the elapsed time, and the return value formatted as a byte
    count, then hands back the wrapped callable's result unchanged. The
    wrapped callable is therefore expected to return a number of bytes.

    If ``func`` is a bound method, it is profiled immediately with
    ``*args`` / ``**kwargs`` and the result of that call is returned instead
    of a wrapper.
    """
    # Callables such as functools.partial have no __name__; fall back to repr.
    label = getattr(func, "__name__", repr(func))

    @functools.wraps(func)  # keep the profiled callable's name and docstring
    def wrapper(*call_args, **call_kwargs):
        rss_before, vms_before, shared_before = get_process_memory()
        start = time.time()
        result = func(*call_args, **call_kwargs)
        # Stop the clock before sampling memory so that sampling is not timed.
        elapsed_time = elapsed_since(start)
        rss_after, vms_after, shared_after = get_process_memory()
        print(
            "Profiling: {:>20} RSS: {:>8} | VMS: {:>8} | SHR {:>8} "
            "| time: {:>8} | result {:>8}".format(
                "<" + label + ">",
                format_bytes(rss_after - rss_before),
                format_bytes(vms_after - vms_before),
                format_bytes(shared_after - shared_before),
                elapsed_time,
                format_bytes(result),
            )
        )
        return result

    if inspect.ismethod(func):
        return wrapper(*args, **kwargs)
    # Plain functions and every other callable get the wrapper back, so a
    # decorated name is never silently rebound to None.
    return wrapper
def cs_to_bytes(X) -> int:
    """Return the combined ``nbytes`` of ``X.data``, ``X.indptr`` and ``X.indices``.

    ``X`` is any object exposing those three array attributes (presumably a
    compressed sparse matrix, going by the attribute names).
    """
    buffer_names = ("data", "indptr", "indices")
    return int(sum(getattr(X, name).nbytes for name in buffer_names))
def MB(n) -> int:
    """Convert a byte count to whole binary megabytes (floor of n / 1024**2)."""
    bytes_per_mb = 1024 * 1024
    return n // bytes_per_mb
@profile
def h5_size(filepath):
    """Sum the sparse-matrix buffer sizes of an h5ad file's top-level entries.

    Opens ``filepath`` read-only with h5py and looks up each of the listed
    top-level keys. Every entry that is present is wrapped with
    ``sparse_dataset``, and the byte size that ``cs_to_bytes`` reports for
    its ``_to_backed()`` object is added to the total.

    Returns the accumulated number of bytes as an ``int``.
    """
    count = 0
    # The context manager closes the HDF5 handle even if an entry fails to
    # load. The explicit "r" keeps this probe read-only whatever the default
    # mode of the installed h5py version is.
    with hp.File(filepath, "r") as hf:
        for sub_idx in ("X", "layers", "obs", "obsm", "obsp",
                        "raw", "uns", "var", "varm", "varp"):
            subarr = hf.get(sub_idx, None)
            if subarr:
                spd = sparse_dataset(subarr)
                count += cs_to_bytes(spd._to_backed()) if spd else 0
    return count
@profile
def anndata_size(filepath):
    """Return the size AnnData itself reports for the h5ad file at ``filepath``.

    The file is read in backed read-only mode and the value comes from the
    resulting object's ``__sizeof__(with_disk=True)``.
    """
    return ad.read_h5ad(filepath, backed="r").__sizeof__(with_disk=True)
def main():
    """Profile both size estimators on the file named on the command line.

    Expects the file path as the first command-line argument, echoes it, and
    then runs ``h5_size`` and ``anndata_size`` on it. When the argument is
    missing, the usage hint is written to stderr and the process exits with
    status 1.
    """
    if len(sys.argv) < 2:
        # sys.exit is always available, unlike the bare ``exit`` helper that
        # the ``site`` module injects. A string argument is printed to stderr
        # and yields a non-zero exit status.
        sys.exit("Pass filepath as first command argument")
    f = Path(sys.argv[1])
    print(f)
    h5_size(f)
    anndata_size(f)


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment