Last active
January 31, 2024 11:16
-
-
Save Neah-Ko/2a90cf6da3f810a98e5f3485f0a031d4 to your computer and use it in GitHub Desktop.
anndata h5py profile
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import sys | |
import psutil | |
import inspect | |
import time | |
from pathlib import Path | |
import h5py as hp | |
import anndata as ad | |
from anndata.experimental import sparse_dataset | |
def elapsed_since(start):
    """Return the time elapsed since ``start`` as a short human-readable string.

    ``start`` is compared against ``time.time()``, so it must be a timestamp
    in seconds taken from the same clock. The unit is picked by magnitude:
    milliseconds below one second, seconds below one minute, minutes below
    one hour, and hours otherwise. The value is rounded to two decimals.
    """
    seconds = time.time() - start

    # Choose the scaled amount and its suffix first, then format once.
    if seconds < 1:
        amount, unit = seconds * 1000, "ms"
    elif seconds < 60:
        amount, unit = seconds, "s"
    elif seconds < 3600:
        amount, unit = seconds / 60, "min"
    else:
        amount, unit = seconds / 3600, "hrs"

    return str(round(amount, 2)) + unit
def get_process_memory():
    """Return the current process's memory counters as ``(rss, vms, shared)``.

    The values are read from ``psutil.Process(...).memory_info()`` for the
    PID of the running interpreter.

    psutil only exposes the ``shared`` field on Linux. On platforms where the
    field is absent, 0 is returned in its place so callers can always unpack
    three values instead of hitting an ``AttributeError``.
    """
    mem = psutil.Process(os.getpid()).memory_info()
    return mem.rss, mem.vms, getattr(mem, "shared", 0)
def format_bytes(bytes):
    """Render a byte count as a string with a decimal (power-of-1000) unit.

    Magnitudes below 1000 are printed unchanged with a ``B`` suffix; larger
    ones are divided by 1e3, 1e6 or 1e9, rounded to two decimals and suffixed
    with ``kB``, ``MB`` or ``GB``. The sign of the input is preserved, so
    negative deltas are formatted the same way.
    """
    magnitude = abs(bytes)
    if magnitude < 1000:
        return str(bytes) + "B"

    # (exclusive upper bound, divisor, suffix) for the scaled units below GB.
    for ceiling, divisor, suffix in ((1e6, 1e3, "kB"), (1e9, 1e6, "MB")):
        if magnitude < ceiling:
            return str(round(bytes / divisor, 2)) + suffix

    return str(round(bytes / 1e9, 2)) + "GB"
def profile(func, *args, **kwargs):
    """Print the memory and wall-clock cost of calling ``func``.

    Typical use is as a bare decorator (``@profile``). For any callable other
    than a bound method, a wrapper is returned that calls ``func``, prints one
    summary line (RSS / VMS / shared-memory deltas, elapsed time and the
    result), and returns the result of ``func`` unchanged.

    When ``func`` is a bound method it is profiled immediately: it is called
    with ``*args`` and ``**kwargs`` and its result is returned.

    The result column is rendered as a byte count when the result is numeric
    and as ``str(result)`` otherwise.
    """
    # Function-scope import keeps this block self-contained.
    import functools

    # Callable objects such as functools.partial have no __name__.
    label = getattr(func, "__name__", repr(func))

    @functools.wraps(func)  # keep __name__/__doc__ of the decorated callable
    def wrapper(*call_args, **call_kwargs):
        rss_before, vms_before, shared_before = get_process_memory()
        start = time.time()
        result = func(*call_args, **call_kwargs)
        elapsed_time = elapsed_since(start)
        rss_after, vms_after, shared_after = get_process_memory()

        try:
            result_text = format_bytes(result)
        except TypeError:
            # Non-numeric results (e.g. None) cannot be shown as a byte count.
            result_text = str(result)

        print("Profiling: {:>20} RSS: {:>8} | VMS: {:>8} | SHR {"
              ":>8} | time: {:>8} | result {:>8}"
              .format("<" + label + ">",
                      format_bytes(rss_after - rss_before),
                      format_bytes(vms_after - vms_before),
                      format_bytes(shared_after - shared_before),
                      elapsed_time,
                      result_text))
        return result

    if inspect.ismethod(func):
        return wrapper(*args, **kwargs)
    # Plain functions and every other callable (builtins, partials, ...) get
    # the wrapper back; previously non-function callables fell through and
    # were silently replaced by None.
    return wrapper
def cs_to_bytes(X) -> int:
    """Return the combined ``nbytes`` of ``X.data``, ``X.indptr`` and ``X.indices``.

    ``X`` must expose those three attributes, each with an ``nbytes``
    attribute (as SciPy CSR/CSC matrices do).
    """
    buffers = (X.data, X.indptr, X.indices)
    return int(sum(buf.nbytes for buf in buffers))
def MB(n) -> int:
    """Convert a byte count to whole binary megabytes (floor of ``n`` / 1024**2)."""
    bytes_per_mb = 1024 * 1024
    return n // bytes_per_mb
@profile
def h5_size(filepath):
    """Sum the byte sizes of the sparse top-level entries of an HDF5 file.

    Each of the fixed top-level keys below is looked up in the file. Entries
    that exist are passed to ``sparse_dataset``; when that yields a truthy
    object, the size of its backed matrix (see ``cs_to_bytes``) is added to
    the total. Missing entries and entries for which ``sparse_dataset``
    yields nothing contribute 0.

    The file is opened read-only and closed before returning, including when
    an error is raised while reading.
    """
    top_level_keys = ("X", "layers", "obs", "obsm", "obsp",
                      "raw", "uns", "var", "varm", "varp")
    total = 0
    # The context manager guarantees the handle is released; the original
    # left the file open for the rest of the process.
    with hp.File(filepath, "r") as hf:
        for key in top_level_keys:
            node = hf.get(key, None)
            if not node:
                continue
            spd = sparse_dataset(node)
            if spd:
                total += cs_to_bytes(spd._to_backed())
    return total
@profile
def anndata_size(filepath):
    """Return the size reported by AnnData for the ``.h5ad`` file at ``filepath``.

    The file is opened in backed read-only mode and measured with
    ``__sizeof__(with_disk=True)``. The backing file is closed before
    returning, even if the size computation raises.
    """
    adata = ad.read_h5ad(filepath, backed="r")
    try:
        return adata.__sizeof__(with_disk=True)
    finally:
        # Backed mode keeps an open HDF5 handle; release it deterministically.
        adata.file.close()
def main():
    """Command-line entry point: profile both size estimators on one file.

    Expects the path of the file as the first command-line argument. The path
    is printed, then ``h5_size`` and ``anndata_size`` are run on it; each
    prints its own profiling line.

    Raises:
        SystemExit: with the usage message (written to stderr, non-zero exit
            status) when no path argument was given.
    """
    if len(sys.argv) < 2:
        # sys.exit is always available, unlike the site-provided exit()
        # helper, and a string argument signals failure to the shell.
        sys.exit("Pass filepath as first command argument")

    f = Path(sys.argv[1])
    print(f)

    h5_size(f)
    anndata_size(f)


if __name__ == "__main__":
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment