# Skip to content
#
# Instantly share code, notes, and snippets.
#
# Created May 8, 2018
# What would you like to do?
import netCDF4 as nc
import ocgis as ESMF
import numpy as np
from ocgis.vmachine.mpi import rank_print
# Message tag passed to the matching Isend / Irecv calls in ESMF_IOHandle.Run.
# NOTE(review): NELEMPERPET and NTIMESTEPS are referenced further down in this
# file, but no definition of either is visible here -- confirm where they are
# supposed to be defined before running the script.
IO_TAG = 111
class ESMF_IOHandle(object):
    """Funnel array data from source PETs to a single netCDF file.

    Source PETs post non-blocking sends of their local data to the first
    destination PET (``dstPetList[0]``). That PET posts the matching receives
    and writes each received chunk into the output file.

    NOTE(review): the original text had lost its indentation and several
    tokens/lines. The nesting and the restored names (``dimension.name``,
    ``array.name``) are reconstructed from context -- please confirm.
    """

    def __init__(self, arrayBundle, filename, srcPetList=None, dstPetList=None):
        """Create the handle and the output file skeleton.

        :param arrayBundle: Collection of variables; its values provide
            ``dimensions``, ``dtype``, ``dimension_names`` and ``attrs``, and
            the bundle itself provides global ``attrs``.
        :param filename: Path of the netCDF file to create.
        :param srcPetList: Ranks that produce data. Defaults to ``[0]``.
        :param dstPetList: Ranks that perform IO. Defaults to ``[0]``.
        """
        # ``None`` sentinels replace the mutable ``[0]`` defaults; the effective
        # default value is unchanged.
        self.filename = filename
        self.srcPetList = [0] if srcPetList is None else srcPetList
        self.dstPetList = [0] if dstPetList is None else dstPetList
        self.localRoot = self.dstPetList[0]
        self.subcommName = '__io_handle_{}__'.format(self.localRoot)
        self.fileHandle = None
        self.is_dst = ESMF.vm.rank in self.dstPetList
        self.sends = []
        self.recvs = []

        ESMF.vm.create_subcomm(self.subcommName, self.dstPetList)
        with ESMF.vm.scoped_by_name(self.subcommName):
            # Only rank 0 of the destination sub-communicator creates the file.
            if not ESMF.vm.is_null and ESMF.vm.rank == 0:
                self._create_file(arrayBundle)

    def _create_file(self, arrayBundle):
        """Write dimensions, variables and attributes to a new netCDF file."""
        # The context manager guarantees the dataset is closed (and flushed)
        # even if one of the definitions below raises.
        with nc.Dataset(self.filename, mode='w') as ds:
            for array in arrayBundle.values():
                for dimension in array.dimensions:
                    # Variables may share dimensions; create each only once.
                    if dimension.name not in ds.dimensions:
                        ds.createDimension(dimension.name, dimension.size)
                ncvar = ds.createVariable(array.name, array.dtype, array.dimension_names)
                for k, v in array.attrs.items():
                    ncvar.setncattr(k, v)
            for k, v in arrayBundle.attrs.items():
                ds.setncattr(k, v)

    def Run(self, arrayBundle, slc=None):
        """Send local data (source PETs) or receive and write it (root IO PET).

        Only the variable named ``'array1'`` is transferred.

        :param arrayBundle: Collection holding the ``'array1'`` variable.
        :param slc: Either ``None`` (all entries along the first axis), a
            single ``slice`` for the first axis, or a sequence whose first
            element selects along the first axis of the output variable.
        """
        # The original defaulted to ``slice(None)`` and then indexed ``slc[0]``,
        # which raises TypeError; normalise to the first-axis selector instead.
        if slc is None:
            first_axis = slice(None)
        elif isinstance(slc, slice):
            first_axis = slc
        else:
            first_axis = slc[0]

        localPet = ESMF.vm.rank

        if localPet in self.srcPetList:
            # Back-log control: do not post a new send until all previous
            # sends have completed.
            while self.sends:
                if all(entry[1].Test() for entry in self.sends):
                    self.sends = []
            buf = arrayBundle['array1'].v().copy()
            req = ESMF.vm.comm.Isend([buf, 'i'], dest=self.dstPetList[0], tag=IO_TAG)
            self.sends.append((localPet, req))

        # NOTE(review): the guard for the receive side was lost from the
        # original text. Sends only target ``dstPetList[0]`` (== localRoot), so
        # only that rank posts receives here -- confirm against the intended
        # design.
        if localPet == self.localRoot:
            for pet in self.srcPetList:
                buf = np.zeros(NELEMPERPET, 'i')
                req = ESMF.vm.comm.Irecv([buf, 'i'], source=pet, tag=IO_TAG)
                self.recvs.append((pet, req, buf))

            pending = list(self.recvs)
            while pending:
                still_waiting = []
                for entry in pending:
                    pet, req, buf = entry
                    if not req.Test():
                        still_waiting.append(entry)
                        continue
                    # Each completed receive is written exactly once; the
                    # source rank determines the offset along the second axis.
                    start = pet * NELEMPERPET
                    stop = start + NELEMPERPET
                    with nc.Dataset(self.filename, mode='a') as ds:
                        ds.variables['array1'][first_axis, start:stop] = buf
                pending = still_waiting
            self.recvs = []

    def Finalize(self):
        """Close the retained file handle, if any, on destination PETs."""
        if self.is_dst and self.fileHandle is not None:
            self.fileHandle.close()
            self.fileHandle = None
def model_run(timestep, arrayBundle, srcPetList):
    """Fill every array in the bundle with a rank- and timestep-dependent value.

    On ranks listed in ``srcPetList`` each array is overwritten with the
    constant ``(rank + 1) + (timestep + 1) * 100``; other ranks leave the
    bundle untouched.

    :param timestep: Zero-based index of the current model step.
    :param arrayBundle: Collection whose values expose a writable ``v()``.
    :param srcPetList: Ranks that produce data.
    :return: The same ``arrayBundle`` object that was passed in.
    """
    rank = ESMF.vm.rank
    if rank not in srcPetList:
        return arrayBundle

    fill = (rank + 1) + (timestep + 1) * 100
    # One row of NELEMPERPET elements, matching the (1, -1) reshape contract.
    row = np.full((1, NELEMPERPET), fill, dtype='i')
    for target in arrayBundle.values():
        target.v()[:] = row
    return arrayBundle
def main():
    """Run the example: compute PETs produce data, an IO PET writes it to disk.

    NOTE(review): the original text had lost its indentation, the ``else``
    between the two decompositions, the output filename, the ``Finalize``
    call and an argument of the final ``print``. They are reconstructed here
    from the surrounding comments -- please confirm.
    """
    # Get local rank for the executing process.
    localPet = ESMF.vm.rank

    # Computational PET(s) producing field data.
    pets_compute = [0, 1, 2]
    # Worker PET(s) writing data to disk.
    pets_io = [3]

    # Attributes to attach to the output array.
    array_attrs = {'array1_attr': 55}

    if localPet in pets_compute:
        # Array decomposition on compute nodes.
        dimensions = [ESMF.Dimension('time', size_current=1),
                      ESMF.Dimension('dim_array1', NELEMPERPET)]
    else:
        # Array decomposition on IO nodes. This is the actual shape of the final output data.
        dimensions = [ESMF.Dimension('time', size=3),
                      ESMF.Dimension('dim_array1', size=len(pets_compute) * NELEMPERPET)]

    # Create the array on the compute and IO nodes. The decomposition is different for compute v. io.
    array1 = ESMF.Variable(name='array1', attrs=array_attrs, dimensions=dimensions, dtype='i')

    # Create an array bundle and add some global attributes.
    arrayBundle = ESMF.VariableCollection(variables=[array1])
    arrayBundle.attrs['something_global'] = 180.

    # Create the IO "handle". A handle writes (or reads) from a single file in this example. Multi-files could be
    # handled via an overload.
    # NOTE(review): the original filename was an empty string, which cannot be
    # opened for writing; this name is a placeholder -- substitute the
    # intended path.
    output_path = 'esmf_io_handle_example.nc'
    ioHandle = ESMF_IOHandle(arrayBundle, filename=output_path, srcPetList=pets_compute, dstPetList=pets_io)

    # Here is the time loop for the model. The model run updates data in the array bundle on compute nodes.
    for timestep in range(NTIMESTEPS):
        model_run(timestep, arrayBundle, pets_compute)

        # This is a slice / indexing object that carries the information about where in the array bundle data was
        # updated.
        slc = [slice(timestep, timestep + 1), slice(None)]

        # Run the IO handle to persist the portion of the array bundle that was updated to disk. The writing happens
        # asynchronously, and there are numerous ways to control the backlog. Right now, the compute PETs check if
        # their previous send has completed before sending a new batch.
        ioHandle.Run(arrayBundle, slc=slc)

    # Finalize the IO handle and free any resources.
    ioHandle.Finalize()

    # Print the output file to make sure everything looks okay.
    # NOTE(review): no synchronization between the writing PET and the
    # inspecting PET is visible here -- confirm the file is complete before
    # it is read.
    with ESMF.vm.scoped('inspect', [0]):
        if not ESMF.vm.is_null:
            rd = ESMF.RequestDataset(ioHandle.filename, crs=None)
            for ovar in rd.get().values():
                print('ovar value for {}=\n'.format(ovar.name), ovar.get_value())
# Script entry point: the guard had no body in the original text, which is a
# syntax error; calling main() is the evident intent.
if __name__ == '__main__':
    main()
# Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment