Skip to content

Instantly share code, notes, and snippets.

@thearn
Created March 8, 2016 18:09
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thearn/38fa31ab5a0712ebfcc8 to your computer and use it in GitHub Desktop.
Save thearn/38fa31ab5a0712ebfcc8 to your computer and use it in GitHub Desktop.
import numpy as np
from openmdao.core.component import Component
from pycycle.constants import R_UNIVERSAL_ENG, MIN_VALID_CONCENTRATION
from openmdao.core.options import OptionsDictionary
from openmdao.components.indep_var_comp import IndepVarComp
from ad import adnumber
from ad.admath import log
class ChemEq(Component):
""" Find the equilibirum composition for a given gaseous mixture """
def __init__(self, thermo, mode="T"):
super(ChemEq, self).__init__()
self.thermo = thermo
self.mode = mode
self.num_prod = num_prod = thermo.num_prod
self.num_element = num_element = thermo.num_element
self.fd_options['step_size'] = 1e-6
self.fd_options['step_type'] = "absolute"
self.solver_options = OptionsDictionary()
self.solver_options.add_option('tol', 1e-10, desc="internal solver tolerance")
self.solver_options.add_option('maxiter', 150, desc="internal solver maximum iteration")
# Input vars
self.add_param('init_prod_amounts', val=thermo.init_prod_amounts, shape=(num_prod,),
desc="initial mass fractions of products, before equilibrating")
self.add_param('P', val=1.0, units="bar", desc="Pressure")
self.state_meta = {}
if mode=="T":
self.add_param('T', val=284., units="degK", desc="Temperature")
elif mode=="h" or mode=="S":
self.state_meta['T'] = {'log':True, 'idx':num_prod+num_element, 'under_relax':False}
self.add_output('T', val=284., units="degK", desc="Temperature",
**self.state_meta['T'])
else:
raise ValueError('Only "T", "h", or "S" are allowed values for mode')
if mode == "h":
self.add_param('h', val=0., units="cal/g", description="Enthalpy")
elif mode == "S":
self.add_param('S', val=0., units="cal/(g*degK)", description="Entropy")
# State vars
self.n_init = np.ones(num_prod) / num_prod / 10 # thermo.init_prod_amounts
self.state_meta['n'] = {'log':True, 'idx':slice(0,num_prod), 'under_relax':True}
self.add_output('n', val=self.n_init, shape=num_prod,
desc="mole fractions of the mixture",
**self.state_meta['n'])
self.state_meta['pi'] = {'log':False, 'idx':slice(num_prod,num_prod+num_element), 'under_relax':False}
self.add_output('pi', val=np.zeros(num_element),
desc="modified lagrange multipliers from the Gibbs lagrangian",
**self.state_meta['pi'])
# Outputs
self.add_output('b0', shape=num_element, # when converged, b0=b
desc='assigned kg-atoms of element i per total kg of reactant for the initial prod amounts')
self.add_output('n_moles', shape=1, desc="1/molecular weight of gas")
# allocate the newton Jacobian
self.size = size = num_prod + num_element
if mode != "T":
size += 1 # added T as a state variable
self._dRdy = np.zeros((size, size))
self._R = np.zeros(size)
self.total_iters = 0
self.DEBUG=False
def _apply_nonlinear(self, params, unknowns, resids):
"""
Total derivs = dF/dX - dF/dy[(dR/dy)^-1 (dR/dx)]
assumed ordering in all arrays:
params:
mode var (T, h, or S), P, init_prod_amounts
states:
n, pi, T (if mode is h or S)
outputs:
b0, n_moles, n, pi, T (if mode is h or S)
"""
thermo = self.thermo
n = unknowns['n']
#n = abs(np.random.randn(n.size))
pi = unknowns['pi']
if self.mode != "T":
T = unknowns['T']
else:
T = params['T']
if self.mode == "h":
h = params['h']
h = adnumber(h)
if self.mode == "S":
S = params['S']
S = adnumber(S)
P = params['P'] + abs(np.random.randn(1)*10)
ipa = params['init_prod_amounts']
ipa = adnumber(ipa)
T = adnumber(T)
P = adnumber(P)
n = adnumber(n)
pi = adnumber(pi)
n_moles_old = unknowns['n_moles'].copy()
n_moles = unknowns['n_moles'] = np.sum(n)
resids['n_moles'] = n_moles - n_moles_old
self.H0_T = H0_T = thermo.H0(T)
self.S0_T = S0_T = thermo.S0(T)
self.mu = H0_T - S0_T + log(n) + log(P) - log(n_moles)
# residuals from the gibbs energy minimization
resids_n = self.mu - np.sum(pi*thermo.aij.T, axis=1)
resids['n'] = resids_n
b0_old = unknowns['b0'].copy()
b0 = unknowns['b0'] = np.sum(thermo.aij*ipa, axis=1) # vectorization for speedup
resids_b0 = b0 - b0_old
resids['b0'] = resids_b0
# residuals from the conservation of mass
resids_pi = np.sum(thermo.aij*n, axis=1) - b0
resids['pi'] = resids_pi
if self.mode == "h":
self.sum_n_H0_T = np.sum(n*H0_T)
resids_t = h - self.sum_n_H0_T*R_UNIVERSAL_ENG*T
resids['T'] = resids_t
elif self.mode == "S":
resids_t = S- R_UNIVERSAL_ENG*np.sum(n*(S0_T-log(n)+log(n_moles)-log(P)))
resids['T'] = resids_t
deriv = {}
deriv['n', 'T'] = thermo.H0_applyJ(T, 1.0) - thermo.S0_applyJ(T, 1.0)
deriv['n', 'P'] = np.ones(self.num_prod)/P
deriv['n', 'ipa'] = np.zeros(self.num_prod)
#[i.gradient(ipa)[0] for i in resids_n]
deriv['pi', 'T'] = np.zeros(self.num_element)
deriv['pi', 'P'] = np.zeros(self.num_element)
deriv['pi', 'ipa'] = -thermo.aij
deriv['T', 'P'] = 0.0
if self.mode == "S":
deriv['T', 'P'] = R_UNIVERSAL_ENG * 1/P * sum(n)
deriv['T', 'ipa'] = np.zeros(self.num_prod)
# VERIFY DR/DY
self._calc_dRdy(params, unknowns)
# # dn/dn
print "resid_n, n"
d1 = [i.gradient(n) for i in resids_n]
d2 = self._dRdy[:self.num_prod, :self.num_prod]
print np.linalg.norm(d1 - d2)
print "resid_n, pi"
d1 = [i.gradient(pi) for i in resids_n]
end_element = self.num_prod+self.num_element
d2 = self._dRdy[:self.num_prod, self.num_prod:end_element]
print np.linalg.norm(d1 - d2)
print "resid_pi, n"
d1 = [i.gradient(n) for i in resids_pi]
d2 = self._dRdy[self.num_prod:end_element, :self.num_prod]
print np.linalg.norm(d1 - d2)
print "resid_pi, pi"
d1 = [i.gradient(pi) for i in resids_pi]
d2 = self._dRdy[self.num_prod:end_element,self.num_prod:end_element]
print np.linalg.norm(d1 - d2)
print "resid_pi, P"
d1 = [i.gradient(P) for i in resids_pi]
print d1
print deriv['pi', 'P']
print
print "resids_n, P"
d1 = [i.gradient(P) for i in resids_n]
print d1
print deriv['n', 'P']
print
print "resids_t, P"
d1 = resids_t.gradient(P)
print d1
print deriv['T', 'P']
print
# # -------------
if self.mode != "T":
print "resid_t, t"
d1 = resids_t.gradient(T)
d2 = self._dRdy[-1, -1]
print np.linalg.norm(d1 - d2)
print "resid_t, n"
d1 = resids_t.gradient(n)
d2 = self._dRdy[-1, :self.num_prod]
print np.linalg.norm(d1 - d2)
print "resid_t, pi"
d1 = resids_t.gradient(pi)
d2 = self._dRdy[-1, self.num_prod:end_element]
print np.linalg.norm(d1 - d2)
print "resid_n, t"
d1 = [i.gradient(T)[0] for i in resids_n]
d2 = self._dRdy[:self.num_prod, -1]
print np.linalg.norm(d1 - d2)
print "resid_pi, t"
d1 = [i.gradient(T) for i in resids_pi]
d2 = self._dRdy[self.num_prod:end_element, -1]
print np.linalg.norm(d1 - d2)
# -----------------
#print d1
# VERIFY DR/DX
inputs = [T, P, ipa]
states = [resids_n, resids_pi]
inpn = ["T", "P", "ipa"]
stn = ["n", "pi"]
from pprint import pprint
s_ = 0
for s in states:
i_ = 0
for i in inputs:
print inpn[i_], stn[s_]
if inpn[i_] == "T" and stn[s_] == "n":
d1 = [k.gradient(i)[0] for k in s]
else:
d1 = [k.gradient(i) for k in s]
d2 = deriv[stn[s_], inpn[i_]]
print np.linalg.norm(d1 - d2)
print "-----"
i_ +=1
s_ +=1
quit()
def _calc_dRdy(self, params, unknowns):
""" computes the jacobian for the newton solver """
thermo = self.thermo
aij = thermo.aij
num_prod = self.num_prod
n = unknowns['n']
dRdy = self._dRdy
# dRgibbs_dn
dRdy[:num_prod, :num_prod] = -1/np.sum(n)
np.fill_diagonal(dRdy[:num_prod, :num_prod], 1/n - 1/np.sum(n))
mode = self.mode
if mode != "T":
# dRgibbs_dT
T = unknowns['T']
self.dH0_dT = thermo.H0_applyJ(T, 1)
self.dS0_dT = thermo.S0_applyJ(T, 1)
dRdy[:num_prod,-1] = self.dH0_dT - self.dS0_dT
# dRmass_dT = 0
if mode == "h":
# dRT_dn
dRdy[-1,:num_prod] = - R_UNIVERSAL_ENG*T*self.H0_T
# dRT_dT
dRdy[-1,-1] = -R_UNIVERSAL_ENG*(T*np.sum(n*self.dH0_dT) + self.sum_n_H0_T)
if mode == "S":
P = params['P']
n_moles = unknowns['n_moles']
# dRT_dn = 0
dRdy[-1,:num_prod] = -R_UNIVERSAL_ENG*(self.S0_T - np.log(n)+np.log(n_moles)-np.log(P))
# dRT_dT = 0
#uc*(S0_T + np.log(sum_nj) - np.log(P) - np.log(nj))
valid_products = n > MIN_VALID_CONCENTRATION
dRdy[-1,-1] = -R_UNIVERSAL_ENG*np.sum(n[valid_products]*self.dS0_dT[valid_products])
end_element = num_prod+self.num_element
# dRgibbs_dpi
dRdy[:num_prod, num_prod:end_element] = -aij.T
# dRmass_dn
dRdy[num_prod:end_element, :self.num_prod] = aij
return dRdy
def _build_R_vec(self, resids):
# assemble the residual vector for the solver
R = self._R
for s_var, s_meta in self.state_meta.items():
R[s_meta['idx']] = resids[s_var]
return R
def solve_nonlinear(self, params, unknowns, resids):
local_resids = resids.vec.copy()
R_gibbs = resids['n']
# R_mass = resids['pi']
n = unknowns['n']
# n[n<=0] = 1e-5 # hack!
self._apply_nonlinear(params, unknowns, resids)
R = self._build_R_vec(resids)
err = np.linalg.norm(R)
if err > 1e-1: #sometimes the last converged point is not a good guess
n = unknowns['n'] = self.n_init
self._apply_nonlinear(params, unknowns, resids)
R = self._build_R_vec(resids)
err = np.linalg.norm(R)
self.iter_count = 1
tol = self.solver_options['tol']
maxiter = self.solver_options['maxiter']
while err > tol and self.iter_count < maxiter:
if self.DEBUG:
print(self.iter_count, R, err)
# raw_input()
self._calc_dRdy(params, unknowns)
delta_y = np.linalg.solve(self._dRdy, -R)
valid_products = n > MIN_VALID_CONCENTRATION
R_max = abs(5*R_gibbs[-1])
if np.any(valid_products):
R_max = max(R_max, np.max(np.abs(R_gibbs[valid_products])))
# lambdaf is an under-relaxation factor that will be 1 except in very unconverged states
lambdaf = 2 / (R_max+1e-20)
if (lambdaf > 1):
lambdaf = 1
for s_var,s_meta in self.state_meta.items():
idx = s_meta['idx']
if s_meta['log']:
step = delta_y[idx]/unknowns[s_var]
if np.isscalar(step):
if step>1e4:
step = 1e4
elif step<-1e4:
step = -1e4
else:
step[step>1e4] = 1e4
step[step<-1e4] = -1e4
if s_meta['under_relax']:
delta_s = np.exp(lambdaf*step)
else:
delta_s = np.exp(step)
unknowns[s_var] *= delta_s
else:
delta_s = delta_y[idx]
unknowns[s_var] += delta_s
self._apply_nonlinear(params, unknowns, resids)
R = self._build_R_vec(resids)
err = np.linalg.norm(R)
self.iter_count += 1
resids.vec[:] = local_resids
# print "finished chem_eq solve", self.iter_count, err
def jacobian(self, params, unknowns, resids):
dRdy = self._dRdy
num_prod = self.num_prod
num_element = self.num_element
thermo = self.thermo
nb0 = num_element
nn = num_prod
npi = num_element
ninit = num_prod
# MODE = T
if self.mode == "T":
nF = num_prod + 2*num_element + 1
ny = num_prod + num_element
else:
nF = num_prod + 2*num_element + 2
ny = num_prod + num_element + 1
nx = num_prod + 2
"""
Total derivs = dF/dX - dF/dy[(dR/dy)^-1 (dR/dx)]
assumed ordering in all arrays:
params:
mode var (T, h, or S), P, init_prod_amounts
states:
n, pi, T (if mode is h or S)
outputs:
b0, n_moles, n, pi, T (if mode is h or S)
"""
# param index locations for dFdx, dydx, and total deriv j
mode_idx = 0
P_idx = 1
ipa_idx = slice(2, 2 + ninit)
# state index locations (not used here, just for reference)
# but is consistent with dRdy
state_n_idx = slice(0, nn)
state_pi_idx = slice(state_n_idx.stop, state_n_idx.stop + npi)
state_T_idx = state_pi_idx.stop
# output index locations for dFdx, dFdy, and total deriv j
b0_idx = slice(0, num_element)
n_moles_idx = b0_idx.stop
n_idx = slice(n_moles_idx, n_moles_idx + nn)
pi_idx = slice(n_idx.stop, n_idx.stop + npi)
t_idx = pi_idx.stop
## output wrt input
dFdx = np.zeros((nF, nx))
# db0/dinitprod
dFdx[b0_idx, ipa_idx] = self.thermo.aij
# dpi/d_initprod
dFdx[pi_idx, ipa_idx] = -self.thermo.aij
if self.mode == "T":
T = params['T']
# dn/dt
dFdx[n_idx, mode_idx] = thermo.H0_applyJ(T, 1) - thermo.S0_applyJ(T, 1)
else:
# dT/ds or dT/dh
dFdx[t_idx, mode_idx] = 1
## output wrt states
dFdy = np.zeros((nF, ny))
# dnmoles/dn
dFdy[n_moles_idx, :num_prod] = 1
# dn/dn
dFdy[n_idx, : num_prod] = 1
# dpi/dpi
dFdy[pi_idx, num_prod:] = 1
## residuals wrt inputs
dRdx = np.zeros((ny, nx))
dRdx[:nx,:] = np.eye(nx)
## states wrt inputs (matrix-matrix system)
dydx = np.linalg.solve(dRdy, dRdx)
## total derivs
j = dFdx - dFdy.dot(dydx)
J = {}
J['b0', self.mode] = j[b0_idx, mode_idx].reshape((nb0,1))
J['b0', 'P'] = j[b0_idx, P_idx].reshape((nb0,1))
J['b0', 'init_prod_amounts'] = j[b0_idx, ipa_idx].reshape((nb0,ninit))
J['n_moles', self.mode] = j[n_moles_idx, mode_idx].reshape((1,1))
J['n_moles', 'P'] = j[n_moles_idx, P_idx].reshape((1,1))
J['n_moles', 'init_prod_amounts'] = j[n_moles_idx, ipa_idx].reshape((1,ninit))
J['n', self.mode] = j[n_idx, mode_idx].reshape((nn,1))
J['n', 'P'] = j[n_idx, P_idx].reshape((nn,1))
J['n', 'init_prod_amounts'] = j[n_idx, ipa_idx].reshape((nn,ninit))
J['pi', self.mode] = j[pi_idx, mode_idx].reshape((npi,1))
J['pi', 'P'] = j[pi_idx, P_idx].reshape((npi,1))
J['pi', 'init_prod_amounts'] = j[pi_idx, ipa_idx].reshape((npi,ninit))
if self.mode == "S" or self.mode == "h":
J['T', self.mode] = j[t_idx, mode_idx].reshape((1,1))
J['T', 'P'] = j[t_idx, P_idx].reshape((1,1))
J['T', 'init_prod_amounts'] = j[t_idx, ipa_idx].reshape((1,ninit))
print J['T', 'P']
return J
if __name__ == "__main__":
from openmdao.api import Problem, Group
from pycycle import species_data
thermo = species_data.Thermo(species_data.janaf, {'H': 1, 'CO2': 1, 'Ar': 1, 'O': 1, 'N': 1})
ipa= np.array([ 3.15433423e-04, 0.00000000e+00, 8.69292039e-04,
0.00000000e+00, 1.07446088e-05, 3.82488497e-06,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
0.00000000e+00, 0.00000000e+00, 0.00000000e+00,
2.63003773e-02, 0.00000000e+00, 0.00000000e+00,
7.05560402e-03])
P= 31.026407819265
h= 106.9708225318558
T= 1626.1565398377627
def make():
prob = Problem()
prob.root = Group()
prob.root.add('chemeq', ChemEq(thermo, mode="h"), promotes=["*"])
params = (
('P', P, {'units':'psi'}),
('h', h, {'units':'cal/g'}),
('init_prod_amounts', ipa),
)
prob.root.add('des_vars', IndepVarComp(params), promotes=["*"])
prob.setup(check=False)
return prob
prob = make()
prob.run()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment