Skip to content

Instantly share code, notes, and snippets.

@romanodev
Created August 24, 2020 18:55
Show Gist options
  • Save romanodev/0c5ac5c98d97220cd53c78b69457d41d to your computer and use it in GitHub Desktop.
Save romanodev/0c5ac5c98d97220cd53c78b69457d41d to your computer and use it in GitHub Desktop.
from mpi4py import MPI
import numpy as np
comm = MPI.COMM_WORLD
def allocate_shared_dict(varss):
    """Allocate MPI shared-memory copies of the arrays in *varss*.

    Rank 0 passes a dict mapping names to numpy arrays (or lists); every
    other rank passes ``None``.  All ranks return a dict of numpy arrays
    backed by one shared-memory window per variable, filled with rank 0's
    data.  Only int64 and float64 data are supported.

    Example::

        allocate_shared_dict({'data_1': np.zeros(10),
                              'data_2': np.ones(10)} if comm.rank == 0 else None)

    Raises:
        TypeError: if a value's dtype is neither int64 nor float64.
    """
    dict_output = {}
    # Rank 0 inspects each array and broadcasts the metadata the other
    # ranks need to map the shared windows: (shape, dtype tag, itemsize, nbytes).
    if comm.Get_rank() == 0:
        var_meta = {}
        for var, value in varss.items():
            if isinstance(value, list):
                value = np.array(value)
            if value.dtype == np.int64:
                data_type = 0
                # BUGFIX: original used MPI.INT (4 bytes) for int64 data,
                # silently truncating values to int32.
                itemsize = MPI.INT64_T.Get_size()
            elif value.dtype == np.float64:
                data_type = 1
                itemsize = MPI.DOUBLE.Get_size()
            else:
                # Raise instead of quit() so callers can handle the error.
                raise TypeError('Data type for shared memory not supported for ' + var)
            nbytes = int(np.prod(value.shape)) * itemsize
            var_meta[var] = [value.shape, data_type, itemsize, nbytes]
    else:
        var_meta = None
    var_meta = comm.bcast(var_meta, root=0)
    # ALLOCATING MEMORY---------------
    for var, meta in var_meta.items():
        # Only rank 0 contributes the actual bytes; the other ranks attach
        # to rank 0's segment via Shared_query.  (The original allocated
        # nbytes on EVERY rank, using nranks * nbytes of memory.)
        local_nbytes = meta[3] if comm.Get_rank() == 0 else 0
        win = MPI.Win.Allocate_shared(local_nbytes, meta[2], comm=comm)
        buf, itemsize = win.Shared_query(0)
        assert itemsize == meta[2]
        dt = np.int64 if meta[1] == 0 else np.float64
        output = np.ndarray(buffer=buf, dtype=dt, shape=meta[0])
        if comm.rank == 0:
            output[:] = varss[var]
        dict_output[var] = output
    del varss
    # Ensure rank 0 has finished filling the windows before anyone reads.
    comm.Barrier()
    return dict_output
def shared_array(value):
    """Allocate a single MPI shared-memory array.

    Convenience wrapper around :func:`allocate_shared_dict` for one array.
    Rank 0 passes the numpy array; all other ranks pass ``None``.

    Example::

        sa = shared_array(np.zeros(10) if comm.rank == 0 else None)
    """
    # Wrap the single value in a one-entry dict and unwrap the result.
    return allocate_shared_dict({'dummy': value})['dummy']
if __name__ == '__main__':
    # Demo: rank 0 supplies the data, every other rank passes None.
    payload = {'data_1': np.zeros(10), 'data_2': np.ones(10)} if comm.rank == 0 else None
    shared_dict = allocate_shared_dict(payload)
    # For a single dataset
    # sa = shared_array(np.zeros(10) if comm.rank == 0 else None)
    if comm.rank == 0:
        # A write by rank 0 is visible to all ranks through shared memory.
        shared_dict['data_1'][0] = 1
    if comm.rank == 1:
        print(shared_dict['data_1'])
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment