Created
May 8, 2018 20:21
-
-
Save bekozi/0164e7b7364ac7d2578276ab476ceaa7 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import netCDF4 as nc | |
import ocgis as ESMF | |
import numpy as np | |
from ocgis.vmachine.mpi import rank_print | |
# Number of model time steps the driver loops over.
NTIMESTEPS = 3
# Number of array elements exchanged per compute PET in each message.
NELEMPERPET = 4
# MPI message tag used for the compute-to-IO transfers.
IO_TAG = 111
class ESMF_IOHandle(object):
    """Funnel array data from compute PETs to a single IO PET that writes netCDF.

    Constructing the handle is collective: every PET creates the named
    sub-communicator, and rank 0 of that sub-communicator creates the output
    file with the dimensions, variables and attributes found in
    ``arrayBundle``.

    Only the variable named ``'array1'`` is transferred by :meth:`Run`. Each
    message carries ``NELEMPERPET`` integers and is written at column offset
    ``source_rank * NELEMPERPET`` of the output variable.

    :param arrayBundle: Collection of variables describing the output file.
    :param filename: Path of the netCDF file to create and append to.
    :param srcPetList: Ranks that produce data. Defaults to ``[0]``.
    :param dstPetList: Ranks forming the IO sub-communicator. The first entry
        is the rank that receives and writes. Defaults to ``[0]``.
    """

    def __init__(self, arrayBundle, filename, srcPetList=None, dstPetList=None):
        # ``None`` replaces the former ``[0]`` literals so that the default
        # lists are not shared between handle instances.
        if srcPetList is None:
            srcPetList = [0]
        if dstPetList is None:
            dstPetList = [0]

        self.filename = filename
        self.srcPetList = srcPetList
        self.dstPetList = dstPetList
        # All sends are addressed to this rank; it is the only writer.
        self.localRoot = self.dstPetList[0]
        self.subcommName = '__io_handle_{}__'.format(self.localRoot)
        self.fileHandle = None
        self.is_dst = ESMF.vm.rank in self.dstPetList
        # Outstanding requests as ``(rank, request)`` / ``(rank, request, buffer)``.
        self.sends = []
        self.recvs = []

        ESMF.vm.create_subcomm(self.subcommName, self.dstPetList)
        with ESMF.vm.scoped_by_name(self.subcommName):
            if not ESMF.vm.is_null:
                if ESMF.vm.rank == 0:
                    self._create_file(arrayBundle)
                # Hold the IO PETs until the file exists on disk.
                ESMF.vm.barrier()

    def _create_file(self, arrayBundle):
        """Create the output file and define its schema from ``arrayBundle``."""
        ds = nc.Dataset(self.filename, mode='w')
        try:
            for array in arrayBundle.values():
                for dimension in array.dimensions:
                    if dimension.name not in ds.dimensions:
                        ds.createDimension(dimension.name, dimension.size)
                ncvar = ds.createVariable(array.name, array.dtype, array.dimension_names)
                for k, v in array.attrs.items():
                    ncvar.setncattr(k, v)
            for k, v in arrayBundle.attrs.items():
                ds.setncattr(k, v)
        finally:
            # Close even when schema creation fails so the file is not left open.
            ds.close()

    def Run(self, arrayBundle, slc=None):
        """Send this PET's data (source PETs) and/or write received data (IO root).

        :param arrayBundle: Collection holding the ``'array1'`` variable to send.
        :param slc: Sequence of per-dimension slices locating the update; only
            the first entry (the leading dimension) is used. A bare slice is
            accepted and used directly. ``None`` selects the whole leading
            dimension.
        """
        if slc is None:
            slc = slice(None)
        # A bare slice (including the default) cannot be indexed with ``[0]``.
        leadIndex = slc[0] if isinstance(slc, (list, tuple)) else slc

        localPet = ESMF.vm.rank
        if localPet in self.srcPetList:
            self._send(arrayBundle, localPet)
        # Only the root receives: every send targets it, so any other PET
        # posting receives would wait forever. This is a separate ``if`` so a
        # PET that is both source and root also writes its own data.
        if localPet == self.localRoot:
            self._receive_and_write(leadIndex)

    def _send(self, arrayBundle, localPet):
        """Post a non-blocking send of ``'array1'`` after the previous one completes."""
        self._wait_for_sends()
        # Copy so the model may overwrite the array while the send is in flight.
        buf = arrayBundle['array1'].v().copy()
        req = ESMF.vm.comm.Isend([buf, 'i'], dest=self.localRoot, tag=IO_TAG)
        self.sends.append((localPet, req))

    def _wait_for_sends(self):
        """Block until every outstanding send request has completed."""
        for _, req in self.sends:
            req.Wait()
        self.sends = []

    def _receive_and_write(self, leadIndex):
        """Receive one message per source PET and write each to the file once."""
        for pet in self.srcPetList:
            buf = np.zeros(NELEMPERPET, 'i')
            req = ESMF.vm.comm.Irecv([buf, 'i'], source=pet, tag=IO_TAG)
            self.recvs.append((pet, req, buf))

        pending = list(self.recvs)
        while pending:
            stillWaiting = []
            for pet, req, buf in pending:
                if not req.Test():
                    stillWaiting.append((pet, req, buf))
                    continue
                # Column offset is derived from the source rank itself.
                start = pet * NELEMPERPET
                stop = start + NELEMPERPET
                ds = nc.Dataset(self.filename, mode='a')
                try:
                    ds.variables['array1'][leadIndex, start:stop] = buf
                finally:
                    ds.close()
            # Completed requests are dropped so their data is written only once.
            pending = stillWaiting
        self.recvs = []

    def Finalize(self):
        """Complete outstanding sends, close any open file and free the sub-communicator."""
        # Requests must be completed before the communicator is torn down.
        self._wait_for_sends()
        if self.is_dst and self.fileHandle is not None:
            self.fileHandle.close()
            self.fileHandle = None
        ESMF.vm.free_subcomm(name=self.subcommName)
def model_run(timestep, arrayBundle, srcPetList):
    """Fill every variable in ``arrayBundle`` with this step's synthetic data.

    On PETs listed in ``srcPetList`` each variable is overwritten with the
    constant ``(rank + 1) + (timestep + 1) * 100``, shaped ``(1, NELEMPERPET)``.
    Other PETs leave the bundle untouched.

    :param timestep: Zero-based index of the current model step.
    :param arrayBundle: Collection of variables to update in place.
    :param srcPetList: Ranks that act as compute PETs.
    :return: The same ``arrayBundle`` object that was passed in.
    """
    rank = ESMF.vm.rank
    if rank not in srcPetList:
        return arrayBundle

    for target in arrayBundle.values():
        fill = np.ones(NELEMPERPET, dtype='i') * (rank + 1) + (timestep + 1) * 100
        # Add the leading length-one axis before assigning into the variable.
        target.v()[:] = fill.reshape(1, -1)
    return arrayBundle
def main():
    """Drive the example: compute PETs produce data, an IO PET writes it, rank 0 prints it.

    The output file holds ``NTIMESTEPS`` records of
    ``len(pets_compute) * NELEMPERPET`` integers each.
    """
    # Get local rank for the executing process.
    localPet = ESMF.vm.rank

    # Computational PET(s) producing field data.
    pets_compute = [0, 1, 2]
    # Worker PET(s) writing data to disk.
    pets_io = [3]

    # Attributes to attach to the output array.
    array_attrs = {'array1_attr': 55}

    if localPet in pets_compute:
        # Array decomposition on compute nodes.
        dimensions = [ESMF.Dimension('time', size_current=1),
                      ESMF.Dimension('dim_array1', NELEMPERPET)]
    else:
        # Array decomposition on IO nodes. This is the actual shape of the final output data. The time length
        # follows NTIMESTEPS so the file always has room for every step the loop below writes.
        dimensions = [ESMF.Dimension('time', size=NTIMESTEPS),
                      ESMF.Dimension('dim_array1', size=len(pets_compute) * NELEMPERPET)]

    # Create the array on the compute and IO nodes. The decomposition is different for compute v. io.
    array1 = ESMF.Variable(name='array1', attrs=array_attrs, dimensions=dimensions, dtype='i')

    # Create an array bundle and add some global attributes.
    arrayBundle = ESMF.VariableCollection(variables=[array1])
    arrayBundle.attrs['something_global'] = 180.

    # Create the IO "handle". A handle writes (or reads) from a single file in this example. Multi-files could be
    # handled via an overload.
    ioHandle = ESMF_IOHandle(arrayBundle, filename='tmp_iofoo_test.nc', srcPetList=pets_compute, dstPetList=pets_io)

    # Here is the time loop for the model. The model run updates data in the array bundle on compute nodes.
    for timestep in range(NTIMESTEPS):
        model_run(timestep, arrayBundle, pets_compute)

        # This is a slice / indexing object that carries the information about where in the array bundle data was
        # updated.
        slc = [slice(timestep, timestep + 1), slice(None)]

        # Run the IO handle to persist the portion of the array bundle that was updated to disk. The transfer is
        # non-blocking on the compute PETs; each one waits for its previous send to complete before posting a new
        # batch.
        ioHandle.Run(arrayBundle, slc=slc)

    # Finalize the IO handle and free any resources.
    ioHandle.Finalize()

    # Print the output file to make sure everything looks okay. The barrier keeps rank 0 from reading before every
    # PET has finished with the handle.
    ESMF.vm.barrier()
    with ESMF.vm.scoped('inspect', [0]):
        if not ESMF.vm.is_null:
            rd = ESMF.RequestDataset(ioHandle.filename, crs=None)
            rd.inspect()
            for ovar in rd.get().values():
                print('ovar value for {}=\n'.format(ovar.name), ovar.get_value())
# Run the example only when executed as a script, not when imported.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment