@bbrelje
Created May 9, 2020 21:18
OpenMDAO Parallel Load Balancing and Speedup
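The script below builds an om.ParallelGroup containing a randomly chosen number of DelayComp instances, each of which sleeps for a random wall time inside compute. Each component's delay is passed to add_subsystem as proc_weight so OpenMDAO can load-balance the components across the MPI ranks. After run_model, the measured wall time is compared against the ideal time (the sum of all delays divided by the number of processes) to report scaling efficiency and speedup.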
import numpy as np
import openmdao.api as om
import time
from mpi4py import MPI
import unittest
from openmdao.utils.assert_utils import assert_near_equal
from openmdao.core.total_jac import _TotalJacInfo
import random


class DelayComp(om.ExplicitComponent):
    """Component that sleeps for a configurable wall time in compute and in the matrix-free derivative calls."""

    def initialize(self):
        self.counter = 0
        self.options.declare('time', default=3.0)
        self.options.declare('size', default=1)

    def setup(self):
        size = self.options['size']
        self.add_input('x', shape=size)
        self.add_output('y', shape=size)
        self.add_output('y2', shape=size)
        self.declare_partials('y', 'x')
        self.declare_partials('y2', 'x')

    def compute(self, inputs, outputs):
        waittime = self.options['time']
        size = self.options['size']
        if not inputs._under_complex_step:
            time.sleep(waittime)
        outputs['y'] = np.linspace(3, 10, size) * inputs['x']
        outputs['y2'] = np.linspace(2, 4, size) * inputs['x']

    def compute_jacvec_product(self, inputs, d_inputs, d_outputs, mode):
        waittime = self.options['time']
        size = self.options['size']
        if mode == 'fwd':
            time.sleep(waittime)
            if 'x' in d_inputs:
                self.counter += 1
                if 'y' in d_outputs:
                    d_outputs['y'] += np.linspace(3, 10, size) * d_inputs['x']
                if 'y2' in d_outputs:
                    d_outputs['y2'] += np.linspace(2, 4, size) * d_inputs['x']
        elif mode == 'rev':
            if 'x' in d_inputs:
                self.counter += 1
                time.sleep(waittime)
                if 'y' in d_outputs:
                    d_inputs['x'] += np.linspace(3, 10, size) * d_outputs['y']
                if 'y2' in d_outputs:
                    d_inputs['x'] += np.linspace(2, 4, size) * d_outputs['y2']


model = om.Group()
iv = om.IndepVarComp()
mysize = 500
iv.add_output('x', val=3.0 * np.ones((mysize, )))
model.add_subsystem('iv', iv)
pg = model.add_subsystem('pg', om.ParallelGroup(), promotes=['*'])

# create a bunch of components with random execution times
comm = MPI.COMM_WORLD
N_PROCS = comm.size
N_PARALLEL_COMPS = comm.bcast(random.randint(20 * N_PROCS, 27 * N_PROCS), 0)
print(N_PARALLEL_COMPS)

TOTAL_TIME = 0.0
for i_comp in range(N_PARALLEL_COMPS):
    delay_time = random.random() * 0.3  # random runtime between 0 and 0.3 seconds
    delay_time = comm.bcast(delay_time, 0)  # broadcast so every rank builds an identical model
    comp_name = 'dc_' + str(i_comp)
    TOTAL_TIME += delay_time
    pg.add_subsystem(comp_name, DelayComp(size=mysize, time=delay_time), proc_weight=delay_time)
    model.connect('iv.x', comp_name + '.x')

perfect_scaling_time = TOTAL_TIME / N_PROCS

prob = om.Problem(model=model)
prob.setup()
time1 = time.time()
prob.run_model()
time2 = time.time()
runtime = time2 - time1

scaling_efficiency = perfect_scaling_time / runtime
speedup = TOTAL_TIME / runtime
print(scaling_efficiency * 100)  # parallel scaling efficiency, in percent
print(speedup)                   # speedup relative to running every component serially
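To try different process counts, launch the script under MPI in the usual mpi4py way; the filename here is just a placeholder for wherever you save the gist:

    mpiexec -n 4 python parallel_speedup.py

With a single process the reported speedup should come out close to 1. With more ranks, how close the efficiency gets to 100% depends on how evenly the proc_weight-based partitioning can split the randomly drawn component runtimes across the processes.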