Skip to content

Instantly share code, notes, and snippets.

@danielballan
Created January 13, 2024 13:29
Show Gist options
  • Save danielballan/2850c1d19bc6d73827942252cc367622 to your computer and use it in GitHub Desktop.
Save danielballan/2850c1d19bc6d73827942252cc367622 to your computer and use it in GitHub Desktop.
dask h5py lifecycle issue
import dask
import dask.array as da
import distributed
from distributed.protocol.serialize import dask_deserialize, dask_serialize
from dask.distributed import Client
import h5py
import numpy as np
import logging
# Module-level logger. Obtained through getLogger() so it is registered with
# the logging manager and participates in the normal logger hierarchy
# (instantiating logging.Logger directly bypasses both).
logger = logging.getLogger(__name__)
def register_custom_deserializer():
    """Register Dask deserializers for h5py objects that track open files.

    Resets ``dask._h5py_files`` to an empty dict and registers two
    deserializers with ``dask_deserialize``:

    * one for ``h5py.File``, which opens ``header["filename"]`` read-only and
      stashes the handle in ``dask._h5py_files`` keyed by filename;
    * one for ``h5py.Group`` / ``h5py.Dataset``, which obtains the file the
      same way and returns ``file[header["name"]]``.

    The stash lets the handles be closed explicitly later instead of relying
    on garbage collection.
    """
    # Registry of open handles for this process, keyed by filename.
    dask._h5py_files = {}

    @dask_deserialize.register(h5py.File)
    def deserialize_h5py_file(header, frames):
        """Return an open read-only ``h5py.File`` for ``header["filename"]``.

        ``frames`` is accepted to match the deserializer signature but is
        not used.
        """
        # Local import kept from the original; presumably so the function is
        # self-contained when shipped to another process -- TODO confirm.
        import h5py

        filename = header["filename"]
        file = dask._h5py_files.get(filename)
        # An h5py File is falsy once it has been closed, so this covers both
        # "never opened" and "opened earlier but closed since". Reusing a
        # live handle avoids overwriting (and thereby orphaning) a previously
        # stashed handle that would then never be closed explicitly.
        if not file:
            file = h5py.File(filename, mode="r")
            dask._h5py_files[filename] = file
            print('stashed', dask._h5py_files)
        return file

    @dask_deserialize.register((h5py.Group, h5py.Dataset))
    def deserialize_h5py_dataset(header, frames):
        """Return the object stored at ``header["name"]`` in the tracked file."""
        file = deserialize_h5py_file(header, frames)
        print('deserialized dataset')
        return file[header["name"]]

    print('registered')
def close_all_files():
    """Close and forget every h5py file stashed in ``dask._h5py_files``.

    Entries are popped from the dict one at a time and closed, so the dict
    is empty afterwards. If the stash attribute does not exist in this
    process (the deserializers were never registered here), there is
    nothing to close and the function does not raise.
    """
    import time

    # Tolerate a process where the stash was never created instead of
    # failing with AttributeError.
    open_files = getattr(dask, "_h5py_files", None)
    if open_files is None:
        open_files = {}

    print("CLOSING!!", open_files)
    while open_files:
        _filename, file = open_files.popitem()
        file.close()

    # Brief pause after closing; presumably gives the underlying library a
    # moment to release the file before the caller reopens it -- TODO confirm.
    time.sleep(0.1)
if __name__ == "__main__":
    # Reproduction script: write an HDF5 file, reduce it through a Dask
    # distributed cluster, ask the workers to close their handles, then check
    # whether the file can be reopened for writing.
    print(f"Version of Dask: {dask.__version__}")
    print(f"Version of Distributed: {distributed.__version__}")
    print("===============================")

    # Create HDF5 file
    print("Creating HDF5 file")
    fln = "test.h5"
    with h5py.File(fln, "w") as f:
        f.create_dataset(
            "data",
            data=np.random.random(size=(100, 100)),
            chunks=(10, 10),
            dtype="float64",
        )

    print("Creating client")
    client = Client()
    try:
        # Install the tracking deserializers in every worker process.
        client.run(register_custom_deserializer)

        # Process the file
        print("Loading and processing data")
        with h5py.File(fln, "r") as f:
            data = da.from_array(f["data"], chunks=(10, 10))
            sm_fut = da.sum(data, axis=0).persist(scheduler=client)
            sm = sm_fut.compute(scheduler=client)
        print(f"sm={sm}")

        # Ask every worker to close the handles it opened while deserializing.
        client.run(close_all_files)

        # Try to open file for writing
        print("Attempting to open file for writing")
        try:
            with h5py.File(fln, "r+") as f:
                print("File was opened for writing !!!")
        except OSError as ex:
            logger.exception("Failed to open file for writing: %s", ex)
    finally:
        # Always shut the client down, even if a step above raised.
        print("Closing client")
        client.close()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment