Skip to content

Instantly share code, notes, and snippets.

@bbrelje
Last active July 23, 2020 22:05
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save bbrelje/f8a16ee42000f385973b85dd45ed6107 to your computer and use it in GitHub Desktop.
Save bbrelje/f8a16ee42000f385973b85dd45ed6107 to your computer and use it in GitHub Desktop.
import openmdao.api as om
import numpy as np
from openconcept.utilities.math.integrals import NewIntegrator
import warnings
# OpenConcept PhaseGroup will be used to hold analysis phases with time integration
# =============== These will go in OpenConcept Core ===============#
def find_integrators_in_model(system, abs_namespace, timevars, states):
durationvar = system._problem_meta['oc_time_var']
# check if we are a group or not
if isinstance(system, om.Group):
for subsys in system._subsystems_allprocs:
if not abs_namespace:
next_namespace = subsys.name
else:
next_namespace = abs_namespace + '.' + subsys.name
find_integrators_in_model(subsys, next_namespace, timevars, states)
else:
# if the duration variable shows up we need to add its absolute path to timevars
if isinstance(system, NewIntegrator):
for varname in system._var_rel_names['input']:
if varname == durationvar:
timevars.append(abs_namespace + '.' + varname)
for state in system._state_vars.keys():
states.append(abs_namespace + '.' + state)
class PhaseGroup(om.Group):
def __init__(self, **kwargs):
num_nodes = kwargs.get('num_nodes', 11)
super(PhaseGroup, self).__init__(**kwargs)
self._oc_time_var_name = 'duration'
self._oc_num_nodes = num_nodes
def _setup_procs(self, pathname, comm, mode, prob_meta):
# need to pass down the name of the duration variable via prob_meta
prob_meta.update({'oc_time_var': self._oc_time_var_name})
prob_meta.update({'oc_num_nodes': self._oc_num_nodes})
super(PhaseGroup, self)._setup_procs(pathname, comm, mode, prob_meta)
def _configure(self):
super(PhaseGroup, self)._configure()
# check child subsys for variables to be integrated and add them all
timevars = []
states = []
find_integrators_in_model(self, '', timevars, states)
# make connections from duration to integrated vars automatically
for var_abs_address in timevars:
self.connect(self._oc_time_var_name, var_abs_address)
self._oc_states_list = states
class IntegratorGroup(om.Group):
def _setup_procs(self, pathname, comm, mode, prob_meta):
# TODO allow non-second duration units
num_nodes = prob_meta['oc_num_nodes']
self.add_subsystem('ode_integ', NewIntegrator(time_setup='duration', diff_units='s', num_nodes=num_nodes))
super(IntegratorGroup, self)._setup_procs(pathname, comm, mode, prob_meta)
def _configure(self):
super(IntegratorGroup, self)._configure()
for subsys in self._subsystems_allprocs:
for var in subsys._var_rel_names['output']:
# check if there are any variables to integrate
tags = subsys._var_rel2meta[var]['tags']
if 'integrate' in tags:
state_name = None
state_units = None
state_val = 0.0
state_lower = -1e30
state_upper = 1e30
state_promotes = False
for tag in tags:
split_tag = tag.split(':')
if split_tag[0] == 'state_name':
state_name = split_tag[-1]
elif split_tag[0] == 'state_units':
state_units = split_tag[-1]
elif split_tag[0] == 'state_val':
state_val = eval(split_tag[-1])
elif split_tag[0] == 'state_lower':
state_lower = float(split_tag[-1])
elif split_tag[0] == 'state_upper':
state_upper = float(split_tag[-1])
elif split_tag[0] == 'state_promotes':
state_promotes = eval(split_tag[-1])
if state_name is None:
raise ValueError('Must provide a state_name tag for integrated variable '+subsys.name+'.'+var)
if state_units is None:
warnings.warn('OpenConcept integration variable '+subsys.name+'.'+var+' '+'has no units specified. This can be dangerous.')
self.ode_integ.add_integrand(state_name, rate_name=var, val=state_val,
units=state_units, lower=state_lower, upper=state_upper)
# make the rate connection
self.connect(subsys.name+'.'+var, 'ode_integ'+'.'+var)
if state_promotes:
self.ode_integ._var_promotes['output'].append(state_name)
class Trajectory(om.Group):
def __init__(self, **kwargs):
super(Trajectory, self).__init__(**kwargs)
self._oc_phases_to_link = []
def _configure(self):
super(Trajectory, self)._configure()
for linkage in self._oc_phases_to_link:
self._link_phases(linkage[0], linkage[1], linkage[2])
def _link_phases(self, phase1, phase2, states_to_skip=None):
# find all the states in each phase
# if they appear in both phase1 and phase2, connect them
# unless the state is in states_to_skip
# if they do not appear in both, do nothing or maybe raise an error message
# print a report of states linked
phase1_states = phase1._oc_states_list
phase2_states = phase2._oc_states_list
for state in phase1_states:
if state in phase2_states:
state_phase1_abs_name = phase1.name + '.' + state + '_final'
state_phase2_abs_name = phase2.name + '.' + state + '_initial'
self.connect(state_phase1_abs_name, state_phase2_abs_name)
def link_phases(self, phase1, phase2, states_to_skip=None):
# need to cache this because the data we need isn't ready yet
self._oc_phases_to_link.append((phase1, phase2, states_to_skip))
# --------------------- This is just an example
class NewtonSecondLaw(om.ExplicitComponent):
"A regular OpenMDAO component"
def initialize(self):
self.options.declare('num_nodes', default=1)
def setup(self):
num_nodes = self.options['num_nodes']
self.add_input('mass', val=2.0*np.ones((num_nodes,)), units='kg')
self.add_input('force', val=1.0*np.ones((num_nodes,)), units='N')
# mark the output variable for integration using openmdao tags
self.add_output('accel', val=0.5*np.ones((num_nodes,)), units='m/s**2', tags=['integrate',
'state_name:velocity',
'state_units:m/s',
'state_val:5.0',
'state_promotes:True'])
self.declare_partials(['*'], ['*'], method='cs')
def compute(self, inputs, outputs):
outputs['accel'] = inputs['force'] / inputs['mass']
class DragEquation(om.ExplicitComponent):
"Another regular OpenMDAO component that happens to take a state variable as input"
def initialize(self):
self.options.declare('num_nodes', default=1)
def setup(self):
num_nodes = self.options['num_nodes']
self.add_input('velocity', val=0.0*np.ones((num_nodes,)), units='m/s')
self.add_output('force', val=0.0*np.ones((num_nodes,)), units='N')
self.declare_partials(['*'], ['*'], method='cs')
def compute(self, inputs, outputs):
outputs['force'] = -0.10 * inputs['velocity'] ** 2
class VehicleModel(IntegratorGroup):
"""
A user wishing to integrate an ODE rate will need to subclass
this IntegratorGroup instead of the default OpenMDAO Group
but it behaves in all other ways exactly the same.
You can now incorporate this Vehicle Model in your model tree
using the regular Group. Only the direct parent of the rate
to be integrated has to use this special class.
"""
def initialize(self):
self.options.declare('num_nodes', default=11)
def setup(self):
num_nodes = self.options['num_nodes']
self.add_subsystem('nsl', NewtonSecondLaw(num_nodes=num_nodes))
self.add_subsystem('drag', DragEquation(num_nodes=num_nodes))
# velocity output is created automatically by the integrator
# if you want you can promote it, or you can connect it directly as here
self.connect('velocity', 'drag.velocity')
self.connect('drag.force','nsl.force')
class MyPhase(PhaseGroup):
"An OpenConcept Phase comprises part of a time-based trajectory and always needs to have a 'duration' defined"
def setup(self):
self.add_subsystem('ivc', om.IndepVarComp('duration', val=5.0, units='s'), promotes_outputs=['duration'])
self.add_subsystem('vm', VehicleModel())
class MyTraj(Trajectory):
"An OpenConcept Trajectory consists of one or more phases that may be linked together. This will often be a top-level model"
def setup(self):
self.add_subsystem('phase1', MyPhase())
self.add_subsystem('phase2', MyPhase())
# the link_phases directive ensures continuity of state variables across phase boundaries
self.link_phases(self.phase1, self.phase2)
if __name__ == "__main__":
prob = om.Problem(MyTraj())
prob.model.nonlinear_solver = om.NewtonSolver(iprint=2)
prob.model.linear_solver = om.DirectSolver()
prob.model.nonlinear_solver.options['solve_subsystems'] = True
prob.model.nonlinear_solver.options['maxiter'] = 20
prob.model.nonlinear_solver.options['atol'] = 1e-6
prob.model.nonlinear_solver.options['rtol'] = 1e-6
prob.setup()
# set the initial value of the state at the beginning of the trajectory
prob['phase1.vm.ode_integ.velocity_initial'] = 10.0
prob.run_model()
prob.model.list_outputs(print_arrays=True)
prob.model.list_inputs(print_arrays=True)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment