Skip to content

Instantly share code, notes, and snippets.

@thearn
Created December 30, 2014 17:00
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thearn/36a1c2b8dc7b967750a8 to your computer and use it in GitHub Desktop.
Save thearn/36a1c2b8dc7b967750a8 to your computer and use it in GitHub Desktop.
from openmdao.main.api import Component
from openmdao.lib.datatypes.api import Float, Array, Str
from rk4 import RK4
import numpy as np
import pylab
# http://www.math.psu.edu/tseng/class/Math251/Notes-Predator-Prey.pdf
class PredPrey(RK4):
# initial condition
y0 = Array(np.array([3.0, 1.0]), iotype="in")
# external (i.e. independent variable)
t = Array(np.zeros((1,10)), iotype="in")
# state (i.e. solution)
# 2D array, because we could make a system of eqs
y = Array(np.zeros((2,10)), iotype="out")
a = Float(4.0, iotype="in")
b = Float(2.0, iotype="in")
c = Float(2.0, iotype="in")
d = Float(1.0, iotype="in")
r = Array(np.array([2.0]), iotype="in")
def __init__(self, start, stop, num_points):
super(PredPrey, self).__init__()
# get the right time step, etc
self.h = (stop - start) / float(num_points)
self.t = np.linspace(start, stop, num_points).reshape((1,num_points))
self.y = np.zeros((2, num_points))
self.state_var = "y"
self.init_state_var = "y0"
self.external_vars = ["t"]
self.fixed_external_vars = ["r",]
def f_dot(self, external, state):
x, y = state
t, r = external
dx = self.a*x - self.b*x*y
dy = - self.c*y + self.d*x*y
return np.array([dx, dy])
def list_deriv_vars(self):
return ("t", "y0",), ("y",)
def df_dy(self, external, state):
J = np.zeros((2, 2))
x, y = state
t, r = external
J[0,0] = self.a - self.b*y
J[0,1] = - self.b*x
J[1,0] = self.d*y
J[1,1] = - self.c + self.d*x
return J
def df_dx(self, external, state):
J = np.zeros((2, 2))
return J
if __name__ == "__main__":
c = PredPrey(0, 10, 5)
c.run()
pylab.subplot(211)
pylab.plot(c.y[0], c.y[1], 'k', linewidth=.5)
pylab.xlabel("x")
pylab.ylabel("y")
pylab.subplot(212)
pylab.plot(c.t[0], c.y[0], label="sheep")
pylab.plot(c.t[0], c.y[1], label="wolves")
pylab.xlabel("t")
pylab.legend()
pylab.show()
""" RK4 time integration component """
import numpy as np
import scipy.sparse
import scipy.sparse.linalg
from openmdao.main.api import Component
from openmdao.lib.datatypes.api import Float, Array, Str
# Allow non-standard variable names for scientific calc
# pylint: disable-msg=C0103
class RK4(Component):
"""Inherit from this component to use.
State variable dimension: (num_states, num_time_points)
External input dimension: (input width, num_time_points)
"""
h = Float(.01, units="s", iotype="in",
desc="Time step used for RK4 integration")
state_var = Str("", iotype="in",
desc="Name of the variable to be used for time "
"integration")
init_state_var = Str("", iotype="in",
desc="Name of the variable to be used for initial "
"conditions")
external_vars = Array([], iotype="in", dtype=str,
desc="List of names of variables that are external "
"to the system but DO vary with time.")
fixed_external_vars = Array([], iotype="in", dtype=str,
desc="List of names of variables that are "
"external to the system but DO NOT "
"vary with time.")
def initialize(self):
"""Set up dimensions and other data structures."""
self._y = self.get(self.state_var)
self._y0 = self.get(self.init_state_var)
self.n_states, self.n = self._y.shape
self.ny = self.n_states*self.n
self.nJ = self.n_states*(self.n + self.n_states*(self.n-1))
ext = []
self.ext_index_map = {}
for e in self.external_vars:
var = self.get(e)
self.ext_index_map[e] = len(ext)
#TODO: Check that shape[-1]==self.n
ext.extend(var.reshape(-1, self.n))
for e in self.fixed_external_vars:
var = self.get(e)
self.ext_index_map[e] = len(ext)
flat_var = var.flatten()
#create n copies of the var
ext.extend(np.tile(flat_var,(self.n, 1)).T)
self.external = np.array(ext)
#TODO
#check that len(y0) = self.n_states
self.n_external = len(ext)
self.reverse_name_map = {
self.state_var:'y',
self.init_state_var:'y0'
}
e_vars = np.hstack((self.external_vars, self.fixed_external_vars))
for i, var in enumerate(e_vars):
self.reverse_name_map[var] = i
self.name_map = dict([(v, k) for k, v in
self.reverse_name_map.iteritems()])
#TODO
# check that all ext arrays of of shape (self.n, )
#TODO
#check that length of state var and external
# vars are the same length
def f_dot(self, external, state):
"""Time rate of change of state variables.
external: array or external variables for a single time step
state: array of state variables for a single time step.
This must be overridden in derived classes.
"""
raise NotImplementedError
def df_dy(self, external, state):
"""Derivatives of states with respect to states.
external: array or external variables for a single time step
state: array of state variables for a single time step.
This must be overridden in derived classes.
"""
raise NotImplementedError
def df_dx(self, external, state):
"""derivatives of states with respect to external vars
external: array or external variables for a single time step
state: array of state variables for a single time step.
This must be overridden in derived classes.
"""
raise NotImplementedError
def eigs(self):
"""
Computes the eigenvalues of the Jacobian at the last time point
"""
external = self.external[:,-1]
state = self.get(state_var)[:,-1]
local_jacobian = self.df_dy(external, state)
eigs,ev = np.linalg.eig(local_jacobian)
return eigs
def execute(self):
"""Solve for the states at all time integration points."""
self.initialize()
n_state = self.n_states
n_time = self.n
h = self.h
# Copy initial state into state array for t=0
self._y = self._y.reshape((self.ny, ))
self._y[0:n_state] = self._y0
# Cache f_dot for use in linearize()
size = (n_state, self.n)
self._a = np.zeros(size)
self._b = np.zeros(size)
self._c = np.zeros(size)
self._d = np.zeros(size)
for k in xrange(0, n_time-1):
k1 = (k)*n_state
k2 = (k+1)*n_state
# Next state a function of current input
ex = self.external[:, k] if self.external.shape[0] \
else np.array([])
# Next state a function of previous state
y = self._y[k1:k2]
self._a[:, k] = a = self.f_dot(ex, y)
self._b[:, k] = b = self.f_dot(ex + h/2., y + h/2.*a)
self._c[:, k] = c = self.f_dot(ex + h/2., y + h/2.*b)
self._d[:, k] = d = self.f_dot(ex + h, y + h*c)
self._y[n_state+k1:n_state+k2] = \
y + h/6.*(a + 2*(b + c) + d)
state_var_name = self.name_map['y']
setattr(self, state_var_name,
self._y.T.reshape((n_time, n_state)).T)
#print "executed", self.name
def provideJ(self):
"""Linearize about current point."""
n_state = self.n_states
n_time = self.n
h = self.h
I = np.eye(n_state)
# Sparse Jacobian with respect to states
#self.Ja = np.zeros((self.nJ, ))
#self.Ji = np.zeros((self.nJ, ))
#self.Jj = np.zeros((self.nJ, ))
# Full Jacobian with respect to states
self.Jy = np.zeros((self.n, self.n_states, self.n_states))
# Full Jacobian with respect to inputs
self.Jx = np.zeros((self.n, self.n_external, self.n_states))
#self.Ja[:self.ny] = np.ones((self.ny, ))
#self.Ji[:self.ny] = np.arange(self.ny)
#self.Jj[:self.ny] = np.arange(self.ny)
for k in xrange(0, n_time-1):
k1 = k*n_state
k2 = k1 + n_state
ex = self.external[:, k] if self.external.shape[0] \
else np.array([])
y = self._y[k1:k2]
a = self._a[:, k]
b = self._b[:, k]
c = self._c[:, k]
# State vars
df_dy = self.df_dy(ex, y)
dg_dy = self.df_dy(ex + h/2., y + h/2.*a)
dh_dy = self.df_dy(ex + h/2., y + h/2.*b)
di_dy = self.df_dy(ex + h, y + h*c)
da_dy = df_dy
db_dy = dg_dy + dg_dy.dot(h/2.*da_dy)
dc_dy = dh_dy + dh_dy.dot(h/2.*db_dy)
dd_dy = di_dy + di_dy.dot(h*dc_dy)
dR_dy = -I - self.h/6.*(da_dy + 2*(db_dy + dc_dy) + dd_dy)
self.Jy[k, :, :] = dR_dy
#for i in xrange(n_state):
#for j in xrange(n_state):
#iJ = self.ny + i + n_state*(j + k1)
#self.Ja[iJ] = dR_dy[i, j]
##self.Ji[iJ] = k2 + i
##self.Jj[iJ] = k1 + j
#self.Ji[iJ] = i*n_time + k + 1
#self.Jj[iJ] = j*n_time + k
##print self.Ji[iJ], self.Jj[iJ], self.Ja[iJ]
# External vars (Inputs)
df_dx = self.df_dx(ex, y)
dg_dx = self.df_dx(ex + h/2., y + h/2.*a)
dh_dx = self.df_dx(ex + h/2., y + h/2.*b)
di_dx = self.df_dx(ex + h, y + h*c)
da_dx = df_dx
db_dx = dg_dx + dg_dy.dot(h/2*da_dx)
dc_dx = dh_dx + dh_dy.dot(h/2*db_dx)
dd_dx = di_dx + di_dy.dot(h*dc_dx)
# Input-State Jacobian at each time point.
# No Jacobian with respect to previous time points.
self.Jx[k+1, :, :] = h/6*(da_dx + 2*(db_dx + dc_dx) + dd_dx).T
#self.J = scipy.sparse.csc_matrix((self.Ja, (self.Ji, self.Jj)),
#shape=(self.ny, self.ny))
#self.JT = self.J.transpose()
#self.Minv = scipy.sparse.linalg.splu(self.J).solve
def apply_deriv(self, arg, result):
""" Matrix-vector product with the Jacobian. """
#result = self._applyJint(arg, result)
result_ext = self._applyJext(arg)
svar = self.state_var
if svar in result:
result[svar] += result_ext
else:
result[svar] = result_ext
# TODO - Uncommment this when it is supported in OpenMDAO.
#def applyMinv(self, arg, result):
#"""Apply derivatives with respect to state variables."""
#state = self.state_var
#if self.state_var in arg:
#flat_y = arg[state].flatten()
#result[state] = self.Minv(flat_y).reshape((self.n_states, self.n))
#return result
#def _applyMinvT(self, arg, result):
#"""Apply derivatives with respect to state variables."""
#state = self.state_var
#z = result.copy()
#if self.state_var in arg:
#flat_y = arg[state].flatten()
#result[state] = self.Minv(flat_y, 'T').reshape((self.n_states, self.n))
#return result
def _applyJint(self, arg, result):
"""Apply derivatives with respect to state variables."""
res1 = dict([(self.reverse_name_map[k], v)
for k, v in result.iteritems()])
state = self.state_var
if state in arg:
flat_y = arg[state].reshape((self.n_states*self.n))
result["y"] = self.J.dot(flat_y).reshape((self.n_states, self.n))
res1 = dict([(self.name_map[k],v) for k, v in res1.iteritems()])
return res1
def _applyJext(self, arg):
"""Apply derivatives with respect to inputs"""
#Jx --> (n_times, n_external, n_states)
n_state = self.n_states
n_time = self.n
result = np.zeros((n_state, n_time))
# Time-varying inputs
for name in self.external_vars:
if name not in arg:
continue
# take advantage of fact that arg is often pretty sparse
if len(np.nonzero(arg[name])[0]) == 0:
continue
# Collapse incoming a*b*...*c*n down to (ab...c)*n
var = self.get(name)
shape = var.shape
arg[name] = arg[name].reshape((np.prod(shape[:-1]),
shape[-1]))
i_ext = self.ext_index_map[name]
ext_length = np.prod(arg[name][:, 0].shape)
for j in xrange(n_time-1):
Jsub = self.Jx[j+1, i_ext:i_ext+ext_length, :]
J_arg = Jsub.T.dot(arg[name][:, j])
result[:, j+1:n_time] += np.tile(J_arg, (n_time-j-1, 1)).T
# Time-invariant inputs
for name in self.fixed_external_vars:
if name not in arg:
continue
# take advantage of fact that arg is often pretty sparse
if len(np.nonzero(arg[name])[0]) == 0:
continue
ext_var = getattr(self, name)
if len(ext_var) > 1:
arg[name] = arg[name].flatten()
i_ext = self.ext_index_map[name]
ext_length = np.prod(ext_var.shape)
for j in xrange(n_time-1):
Jsub = self.Jx[j+1, i_ext:i_ext+ext_length, :]
J_arg = Jsub.T.dot(arg[name])
result[:, j+1:n_time] += np.tile(J_arg, (n_time-j-1, 1)).T
# Initial State
name = self.init_state_var
if name in arg:
# take advantage of fact that arg is often pretty sparse
if len(np.nonzero(arg[name])[0]) > 0:
fact = np.eye(self.n_states)
result[:, 0] = arg[name]
for j in xrange(1, n_time):
fact = fact.dot(-self.Jy[j-1, :, :])
result[:, j] += fact.dot(arg[name])
return result
def apply_derivT(self, arg, result):
""" Matrix-vector product with the transpose of the Jacobian. """
mode = 'Ken'
if mode == 'Ken':
r2 = self._applyJextT_limited(arg, result)
for k, v in r2.iteritems():
if k in result and result[k] is not None:
result[k] += v
else:
result[k] = v
elif mode == 'John':
r2 = self._applyJextT(arg, result)
r1 = self._applyJintT(arg, result)
for k, v in r2.iteritems():
if k in result and result[k] is not None:
result[k] += v
else:
result[k] = v
if self.state_var in r1:
result[self.state_var] = r1[self.state_var]
if self.init_state_var in r1:
result[self.init_state_var] = r1[self.init_state_var]
else:
raise RuntimeError('Pick Ken or John')
def _applyJintT(self, arg, required_results):
"""Apply derivatives with respect to state variables."""
result = {}
state = self.state_var
init_state = self.init_state_var
if state in arg:
if state in required_results:
flat_y = arg[state].flatten()
result[state] = -self.JT.dot(flat_y).reshape((self.n_states, self.n))
if init_state in required_results:
result[init_state] = -result[state][:, 0]
for j in xrange(1, self.n):
result[init_state] -= result[state][:, j]
#print self.J
#print 'arg', arg, 'result', result
return result
def _applyJextT(self, arg, required_results):
"""Apply derivatives with respect to inputs. Ignore all contributions
from past time points and let them come in via previous states
instead."""
#Jx --> (n_times, n_external, n_states)
n_time = self.n
result = {}
if self.state_var in arg:
argsv = arg[self.state_var].T
# Use this when we incorporate state deriv
# Time-varying inputs
for name in self.external_vars:
if name not in required_results:
continue
ext_var = getattr(self, name)
i_ext = self.ext_index_map[name]
ext_length = np.prod(ext_var.shape)/n_time
result[name] = np.zeros((ext_length, n_time))
for k in xrange(n_time-1):
# argsum is often sparse, so check it first
if len(np.nonzero(argsv[k+1, :])[0]) > 0:
Jsub = self.Jx[k+1, i_ext:i_ext+ext_length, :]
result[name][:, k] += Jsub.dot(argsv[k+1, :])
# Use this when we incorporate state deriv
# Time-invariant inputs
for name in self.fixed_external_vars:
if name not in required_results:
continue
ext_var = getattr(self, name)
i_ext = self.ext_index_map[name]
ext_length = np.prod(ext_var.shape)
result[name] = np.zeros((ext_length))
for k in xrange(n_time-1):
# argsum is often sparse, so check it first
if len(np.nonzero(argsv[k+1, :])[0]) > 0:
Jsub = self.Jx[k+1, i_ext:i_ext+ext_length, :]
result[name] += Jsub.dot(argsv[k+1, :])
for k, v in result.iteritems():
ext_var = getattr(self, k)
result[k] = v.reshape(ext_var.shape)
return result
def _applyJextT_limited(self, arg, required_results):
"""Apply derivatives with respect to inputs"""
# Jx --> (n_times, n_external, n_states)
n_time = self.n
result = {}
if self.state_var in arg:
argsv = arg[self.state_var].T
argsum = np.zeros(argsv.shape)
# Calculate these once, and use for every output
for k in xrange(n_time - 1):
argsum[k, :] = np.sum(argsv[k + 1:, :], 0)
# argsum is often sparse, so save indices.
nonzero_k = np.unique(argsum.nonzero()[0])
# Time-varying inputs
for name in self.external_vars:
if name not in required_results:
continue
ext_var = getattr(self, name)
i_ext = self.ext_index_map[name]
ext_length = np.prod(ext_var.shape) / n_time
result[name] = np.zeros((ext_length, n_time))
i_ext_end = i_ext + ext_length
for k in nonzero_k:
Jsub = self.Jx[k + 1, i_ext:i_ext_end, :]
result[name][:, k] += Jsub.dot(argsum[k, :])
# Time-invariant inputs
for name in self.fixed_external_vars:
if name not in required_results:
continue
ext_var = getattr(self, name)
i_ext = self.ext_index_map[name]
ext_length = np.prod(ext_var.shape)
result[name] = np.zeros((ext_length))
i_ext_end = i_ext + ext_length
for k in nonzero_k:
Jsub = self.Jx[k + 1, i_ext:i_ext_end, :]
result[name] += Jsub.dot(argsum[k, :])
# Initial State
name = self.init_state_var
if name in required_results:
fact = -self.Jy[0, :, :].T
result[name] = argsv[0, :] + fact.dot(argsv[1, :])
for k in xrange(1, n_time-1):
fact = fact.dot(-self.Jy[k, :, :].T)
result[name] += fact.dot(argsv[k+1, :])
for k, v in result.iteritems():
ext_var = getattr(self, k)
result[k] = v.reshape(ext_var.shape)
return result
def _applyJextT_limited_old(self, arg, required_results):
"""Apply derivatives with respect to inputs"""
# Jx --> (n_times, n_external, n_states)
n_time = self.n
result = {}
if self.state_var in arg:
argsv = arg[self.state_var].T
argsum = np.zeros(argsv.shape)
# Calculate these once, and use for every output
for k in xrange(n_time - 1):
argsum[k, :] = np.sum(argsv[k + 1:, :], 0)
# argsum is often sparse, so save indices.
nonzero_k = np.unique(argsum.nonzero()[0])
# Time-varying inputs
for name in self.external_vars:
if name not in required_results:
continue
ext_var = getattr(self, name)
i_ext = self.ext_index_map[name]
ext_length = np.prod(ext_var.shape) / n_time
result[name] = np.zeros((ext_length, n_time))
i_ext_end = i_ext + ext_length
for k in nonzero_k:
Jsub = self.Jx[k + 1, i_ext:i_ext_end, :]
result[name][:, k] += Jsub.dot(argsum[k, :])
# Time-invariant inputs
for name in self.fixed_external_vars:
if name not in required_results:
continue
ext_var = getattr(self, name)
i_ext = self.ext_index_map[name]
ext_length = np.prod(ext_var.shape)
result[name] = np.zeros((ext_length))
i_ext_end = i_ext + ext_length
for k in nonzero_k:
Jsub = self.Jx[k + 1, i_ext:i_ext_end, :]
result[name] += Jsub.dot(argsum[k, :])
# Initial State
name = self.init_state_var
if name in required_results:
result[name] = argsv[0, :] + argsum[0, :]
for k, v in result.iteritems():
ext_var = getattr(self, k)
result[k] = v.reshape(ext_var.shape)
return result
from openmdao.main.api import Component
from openmdao.lib.datatypes.api import Float, Array, Str
from rk4 import RK4
import numpy as np
import pylab
# http://mathfaculty.fullerton.edu/mathews/n2003/rungekutta/RungeKuttaMod/Links/RungeKuttaMod_lnk_2.html
class simple(RK4):
# initial condition
y0 = Array(np.array([1]), iotype="in")
# external (i.e. independent variable)
t = Array(np.zeros((10)), iotype="in")
# state (i.e. solution)
# 2D array, because we could make a system of eqs
y = Array(np.zeros((1,10)), iotype="out")
def __init__(self, start, stop, num_points):
super(simple, self).__init__()
# get the right time step, etc
n = num_points
self.h = (stop - start) / float(num_points)
self.t = np.linspace(start, stop, num_points)
self.y = np.zeros((1, num_points))
self.state_var = "y"
self.init_state_var = "y0"
self.external_vars = ["t"]
def list_deriv_vars(self):
return ("t",), ("y",)
def f_dot(self, external, state):
y, t = state[0], external[0]
val = 1. - t*y
return val
def df_dy(self, external, state):
y, t = state[0], external[0]
val = np.array([[-t]])
return val
def df_dx(self, external, state):
y, t = state[0], external[0]
val = np.array([[-y]])
return val
c = simple(0, 5, 25)
c.h = 0.01
c.run()
pylab.plot(c.t, c.y[0])
pylab.show()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment