Skip to content

Instantly share code, notes, and snippets.

@bekozi
Created May 8, 2018
Embed
What would you like to do?
import netCDF4 as nc
import ocgis as ESMF
import numpy as np
from ocgis.vmachine.mpi import rank_print
NTIMESTEPS = 3
NELEMPERPET = 4
IO_TAG = 111
class ESMF_IOHandle(object):
def __init__(self, arrayBundle, filename, srcPetList=[0], dstPetList=[0]):
self.filename = filename
self.srcPetList = srcPetList
self.dstPetList = dstPetList
self.localRoot = self.dstPetList[0]
self.subcommName = '__io_handle_{}__'.format(self.localRoot)
self.fileHandle = None
self.is_dst = ESMF.vm.rank in dstPetList
self.sends = []
self.recvs = []
ESMF.vm.create_subcomm(self.subcommName, dstPetList)
with ESMF.vm.scoped_by_name(self.subcommName):
if not ESMF.vm.is_null:
if ESMF.vm.rank == 0:
ds = nc.Dataset(self.filename, mode='w')
for array in arrayBundle.values():
for dimension in array.dimensions:
if dimension.name not in ds.dimensions:
ds.createDimension(dimension.name, dimension.size)
ncvar = ds.createVariable(array.name, array.dtype, array.dimension_names)
for k, v in array.attrs.items():
ncvar.setncattr(k, v)
for k, v in arrayBundle.attrs.items():
ds.setncattr(k, v)
ds.close()
ESMF.vm.barrier()
def Run(self, arrayBundle, slc=None):
if slc is None:
slc = slice(None)
localPet = ESMF.vm.rank
if localPet in self.srcPetList:
while len(self.sends) > 0:
if not all(ii[1].Test() for ii in self.sends):
continue
else:
self.sends = []
buf = arrayBundle['array1'].v().copy()
req = ESMF.vm.comm.Isend([buf, 'i'], dest=self.dstPetList[0], tag=IO_TAG)
self.sends.append((localPet, req))
else:
for pet in self.srcPetList:
buf = np.zeros(NELEMPERPET, 'i')
req = ESMF.vm.comm.Irecv([buf, 'i'], source=pet, tag=IO_TAG)
self.recvs.append((pet, req, buf))
finished = [False] * len(self.recvs)
while len(self.recvs) > 0:
for ii, recv in enumerate(self.recvs):
if recv[1].Test():
start = recv[0] * NELEMPERPET
stop = start + NELEMPERPET
ds = nc.Dataset(self.filename, mode='a')
ds.variables['array1'][slc[0], start:stop] = recv[2]
ds.close()
finished[ii] = True
if all(finished):
self.recvs = []
def Finalize(self):
if self.is_dst:
if self.fileHandle is not None:
self.fileHandle.close()
ESMF.vm.free_subcomm(name=self.subcommName)
def model_run(timestep, arrayBundle, srcPetList):
localPet = ESMF.vm.rank
if localPet in srcPetList:
for srcArray in arrayBundle.values():
value = np.ones(NELEMPERPET, dtype='i') * (localPet + 1) + (timestep + 1) * 100
value = value.reshape(1, -1)
srcArray.v()[:] = value
return arrayBundle
def main():
# Get local rank for the executing process.
localPet = ESMF.vm.rank
# Computational PET(s) producing field data.
pets_compute = [0, 1, 2]
# Worker PET(s) writing data to disk.
pets_io = [3]
# Attributes to attach to the output array.
array_attrs = {'array1_attr': 55}
if localPet in pets_compute:
# Array decomposition on compute nodes.
dimensions = [ESMF.Dimension('time', size_current=1),
ESMF.Dimension('dim_array1', NELEMPERPET)]
else:
# Array decomposition on IO nodes. This is the actual shape of the final output data.
dimensions = [ESMF.Dimension('time', size=3),
ESMF.Dimension('dim_array1', size=len(pets_compute) * NELEMPERPET)]
# Create the array on the compute and IO nodes. The decomposition is different for compute v. io.
array1 = ESMF.Variable(name='array1', attrs=array_attrs, dimensions=dimensions, dtype='i')
# Create an array bundle and add some global attributes.
arrayBundle = ESMF.VariableCollection(variables=[array1])
arrayBundle.attrs['something_global'] = 180.
# Create the IO "handle". A handle writes (or reads) from a single file in this example. Multi-files could be
# handled via an overload.
ioHandle = ESMF_IOHandle(arrayBundle, filename='tmp_iofoo_test.nc', srcPetList=pets_compute, dstPetList=pets_io)
# Here is the time loop for the model. The model run updates data in the array bundle on compute nodes.
for timestep in range(NTIMESTEPS):
model_run(timestep, arrayBundle, pets_compute)
# This is a slice / indexing object that carries the information about where in the array bundle data was
# updated.
slc = [slice(timestep, timestep + 1), slice(None)]
# Run the IO handle to persist the portion of the array bundle that was updated to disk. The writing happens
# asynchronously, and there are numerous ways to control the backlog. Right now, the compute PETs check if the
# data has been written to disk before sending a new batch.
ioHandle.Run(arrayBundle, slc=slc)
# Finalize the IO handle and free any resources.
ioHandle.Finalize()
# Print the output file to make sure everything looks okay.
ESMF.vm.barrier()
with ESMF.vm.scoped('inspect', [0]):
if not ESMF.vm.is_null:
rd = ESMF.RequestDataset(ioHandle.filename, crs=None)
rd.inspect()
for ovar in rd.get().values():
print('ovar value for {}=\n'.format(ovar.name), ovar.get_value())
if __name__ == '__main__':
main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment