@pantaray
Last active October 4, 2022 09:26
Proposed feature addition for ACME
import h5py
import numpy as np
from acme import ParallelMap

# Consider the following function
def arr_func(x, y, z=3):
    return x * (y + z)
# We want to evaluate `arr_func` for three different values of `x` and the same `y`:
xList = [np.array([1, 2, 3]), np.array([-1, -2, -3]), np.array([-1, 2, -3])]
y = np.array([4, 5, 6])
# Firing off `ParallelMap` and collecting results in memory yields
with ParallelMap(arr_func, xList, y, z=0.5, write_worker_results=False) as pmap:
    res = pmap.compute()
>>> res
[array([ 7, 14, 21]), array([ -8, -16, -24]), array([ -9, 18, -27])]
# Now, with `write_worker_results = True` (the default) ACME currently
# creates `n_inputs` HDF5 containers:
with ParallelMap(arr_func, xList, y, z=0.5) as pmap:
    res = pmap.compute()
>>> res
['/cs/home/username/ACME_20220906-1135-448825/arr_func_0.h5',
'/cs/home/username/ACME_20220906-1135-448825/arr_func_1.h5',
'/cs/home/username/ACME_20220906-1135-448825/arr_func_2.h5']
# How about ACME, as a new default, (additionally) generating a single HDF5
# container that collects all worker results:
>>> res
'/cs/home/username/ACME_20220906-1135-448825/arr_func.h5'
# Each worker's result can then be accessed via its own HDF5 group:
>>> h5f = h5py.File('/cs/home/username/ACME_20220906-1135-448825/arr_func.h5', 'r')
>>> h5f.keys()
<KeysViewHDF5 ['worker_0', 'worker_1', 'worker_2']>
>>> h5f['worker_0'].keys()
<KeysViewHDF5 ['result_0']>
>>> h5f['worker_1'].keys()
<KeysViewHDF5 ['result_0']>
>>> h5f['worker_2'].keys()
<KeysViewHDF5 ['result_0']>
# The actual results can then be accessed directly from the single container
>>> h5f['worker_0']['result_0'][()]
array([ 7, 14, 21])
>>> h5f['worker_1']['result_0'][()]
array([ -8, -16, -24])
>>> h5f['worker_2']['result_0'][()]
array([ -9, 18, -27])
# In addition, ACME could (optionally) support an `outShape` keyword to not
# store results in separate HDF5 groups/datasets but instead collect everything
# in a single array-like structure. Specifying `outShape = (None, 3)` tells
# ACME to stack all resulting 3-element arrays along the first dimension:
with ParallelMap(arr_func, xList, y, z=0.5, outShape=(None, 3)) as pmap:
    res = pmap.compute()
>>> h5f = h5py.File('/cs/home/username/ACME_20220906-1135-448825/arr_func.h5', 'r')
>>> h5f['result_0'][()]
array([[  7,  14,  21],
       [ -8, -16, -24],
       [ -9,  18, -27]])
@timnaher

timnaher commented Sep 6, 2022

Great ideas!

I am a bit confused by the per-worker results. Does it make sense to split the output among workers? At the moment I do not see the point in accessing what a single worker did, particularly if you recycle the workers. I assume that you use njobs = nworkers here, so it makes sense in the example. If you use 5 workers for 10 jobs, I think it should be possible to index the HDF5 container per job and not per worker.

Also, would this file (e.g. in Mathias's case) not blow up? I'm not so familiar with HDF5, so I'm sorry if this is obvious. I assume that in line 32 you load the whole file into memory, making the outcome similar to write_worker_results=False, but with previously saving a version to disk?

I love the idea of being more flexible with the output of ACME. Particularly the last idea is great. Nevertheless, I think the basic functionality is already nice. Mathias's computation would lead to something like 800_000_000 results which are written to disk. It could also be an option to create fewer files than that. The user might specify the level of concatenation, e.g., merge 1000 results into 1 HDF5 file. This might be easier on local memory when loading these files... I usually do some operations while I load the files as well, which could be more manageable with this approach. E.g., I save a cross-spectral density for all frequencies to disk, but when I load the data I only take out a certain frequency band. Nevertheless, I want to save the whole spectrum to disk in the first place to avoid doing the parallel computation twice.
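For illustration, a rough sketch of what I mean (the file name csd.h5 and the dataset layout (n_freqs, n_channels, n_channels) are just made up for this example):

import h5py

# Hypothetical container holding a cross-spectral density for all frequencies,
# stored as a single dataset of shape (n_freqs, n_channels, n_channels)
with h5py.File("csd.h5", "r") as h5f:
    # Only the requested frequency band is read from disk; the rest of the
    # spectrum stays in the file
    band = h5f["csd"][8:13, :, :]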

@pantaray
Author

pantaray commented Sep 6, 2022

Thanks for the quick feedback!

Does it make sense to split the output among workers? At the moment I do not see the point in accessing what a single worker did, particularly if you recycle the workers. I assume that you use njobs = nworkers here, so it makes sense in the example. If you use 5 workers for 10 jobs, I think it should be possible to index the HDF5 container per job and not per worker.

Ahem, yes, the worker/job dichotomy again. You're absolutely right, I meant jobs, i.e., it should be

h5f['job_0']['result_0'][()]

etc. However, I'm not sure that's a great name either (since the 5 "workers" are actually 5 SLURM jobs). Maybe h5f['run_0'][...]? What do you think?

Also, would this file (e.g. in Mathias's case) not blow up? I'm not so familiar with HDF5, so I'm sorry if this is obvious. I assume that in line 32 you load the whole file into memory, making the outcome similar to write_worker_results=False, but with previously saving a version to disk?

Line 32 does not load anything into memory - it just opens a file handle to the container. In the example provided above, however, the entire dataset is loaded as soon as the [()] operator is used (Line 57, for instance). Still, an arbitrarily large HDF5 dataset can be indexed without overflowing local memory using regular array slicing (e.g., h5f['result_0'][:, 0] only reads the first column from disk).
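To make the difference explicit, a minimal sketch (using the container from the outShape example above):

import h5py

h5f = h5py.File('/cs/home/username/ACME_20220906-1135-448825/arr_func.h5', 'r')

# Grabbing the dataset object is cheap: no data has been read at this point
dset = h5f['result_0']

# The [()] operator materializes the *entire* dataset in memory ...
everything = dset[()]

# ... whereas slicing only pulls the requested part from disk
first_column = dset[:, 0]

h5f.close()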

Mathias's computation would lead to something like 800_000_000 results which are written to disk. It could also be an option to create fewer files than that.

The actual collection of the results could be implemented using virtual HDF5 datasets. This way the single all-encompassing results container is just a collection of pointers to the individual files generated by the workers. In cases where lots and lots of files (like 800 million) would still be generated, a separate option (e.g., single_file = True) could be included that forces workers to actually write to the same file using a distributed lock (as currently implemented in Mathias's case). This would avoid the creation of a bazillion very small files that might put heavy strain on the underlying file system. In our preliminary tests, the write performance using such a lock was surprisingly good, so this might be an even easier option than collecting subsets of results in dedicated files, I think.
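To give an idea of how such a virtual collection might look with h5py (just a sketch, assuming each per-worker file stores its return value in a dataset called result_0, analogous to the layout above; not the actual ACME implementation):

import h5py
import numpy as np

# Per-worker files written by ACME (cf. the example above), each assumed to
# hold a 3-element dataset named "result_0"
srcFiles = ["/cs/home/username/ACME_20220906-1135-448825/arr_func_{}.h5".format(k)
            for k in range(3)]

# Virtual layout stacking the three results along the first dimension
layout = h5py.VirtualLayout(shape=(3, 3), dtype=np.int64)
for k, fname in enumerate(srcFiles):
    layout[k] = h5py.VirtualSource(fname, "result_0", shape=(3,))

# The collecting container only stores pointers to the per-worker files
with h5py.File("/cs/home/username/ACME_20220906-1135-448825/arr_func.h5", "w") as h5f:
    h5f.create_virtual_dataset("result_0", layout, fillvalue=0)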

@timnaher

timnaher commented Sep 6, 2022

Haha, yes I agree it's all so confusing... How about calling them computations instead? comp_0, comp_1, etc

Line 32 does not load anything into memory - it just opens a file handle to the container. In the example provided above, however, the entire dataset is loaded as soon as the [()] operator is used (Line 57, for instance). Still, an arbitrarily large HDF5 dataset can be indexed without overflowing local memory using regular array slicing (e.g., h5f['result_0'][:, 0] only reads the first column from disk).

Great! Then I think it makes a lot of sense to throw them all in 1 container! I still like the single_file = True option though, just to be flexible.

I'm excited about the new features, sounds all great!

@pantaray
Author

pantaray commented Sep 7, 2022

Great! Thanks for the quick feedback!

Haha, yes I agree it's all so confusing... How about calling them computations instead? comp_0, comp_1, etc

Sold! comp_* it is :)

@KatharineShapcott

Sorry I'm a bit late to reply here, I really like the idea! comp_* seems like a good naming convention too. Just a small worry: what will happen if not all the computations complete? Will the whole file be corrupted, or only the parts that didn't return?

@pantaray
Author

pantaray commented Oct 4, 2022

Hi Katharine! Thanks for taking a look at this and the feedback!

In the prototype implementation (results_collection branch in the repo) crash handling is more or less unchanged: if some computations crash, the "symlinks" in the results file that point to the incomplete calculations simply point to nothing. If your "emergency pickling" mechanism kicks in, the results file is removed (since HDF5 files cannot contain links to pickled data) and it's basically back to the current state (ACME generates a bunch of .pickle and .h5 files).
Only the case single_file=True (i.e., all workers really write to the same file, so the results container really is just one regular file, not a collection of symlinks) might run into trouble: if a worker dies while writing to the file, the file might end up corrupted and unreadable - I've included a warning message for this case.
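For what it's worth, dangling links in the virtual-dataset case can at least be detected after the fact; a small sketch (assuming the container exposes one comp_* group per computation, each holding a virtual result_0 dataset, as discussed above):

import os
import h5py

with h5py.File("/cs/home/username/ACME_20220906-1135-448825/arr_func.h5", "r") as h5f:
    for comp in h5f.keys():
        dset = h5f[comp]["result_0"]
        if dset.is_virtual:
            # Source files of crashed computations never got written, so the
            # corresponding links point to non-existent files
            missing = [vs.file_name for vs in dset.virtual_sources()
                       if not os.path.exists(vs.file_name)]
            if missing:
                print("{} is incomplete, missing source(s): {}".format(comp, missing))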
