Skip to content

Instantly share code, notes, and snippets.

@funsim
Created November 19, 2013 13:24
Show Gist options
  • Save funsim/7545305 to your computer and use it in GitHub Desktop.
Save funsim/7545305 to your computer and use it in GitHub Desktop.
from opentidalfarm import *
class ADDolfinExpression(object):
def element_order(self, V):
''' Returns the element polynomial order '''
return TestFunction(V).element().degree()
def __init__(self, f):
self.f = f
order = self.element_order(f.function_space())
if order < 2:
raise ValueError, "For AD, the dolfin function must be of polynomial degree > 1"
# The gradient is always in DG and of one order less
V = dolfin.VectorFunctionSpace(self.f.function_space().mesh(),
"DG", order - 1)
self.first_deriv = dolfin.project(dolfin.grad(self.f), V)
# split to get df/dx and df/dy
dfdx, dfdy = self.first_deriv.split()
# get second derivative in x and y
second_deriv_x = dolfin.project(dolfin.grad(dfdx), V)
second_deriv_y = dolfin.project(dolfin.grad(dfdy), V)
# get cross derivative; [df/(dxdy) = df/(dydx)]
self.cross_deriv = second_deriv_x.split()[1]
# split second derivatives to avoid cross products
self.second_deriv_x = second_deriv_x.split()[0]
self.second_deriv_y = second_deriv_y.split()[1]
def __call__(self, x):
if isinstance(x[0], ad.ADF) and isinstance(x[1], ad.ADF):
ad_funcs = list(map(ad.to_auto_diff,x))
val = self.f(x)
variables = ad_funcs[0]._get_variables(ad_funcs)
lc_wrt_args = self.first_deriv(x)
qc_wrt_args = numpy.array([self.second_deriv_x(x), self.second_deriv_y(x)])
cp_wrt_args = 0.#self.cross_deriv(x)
lc_wrt_vars, qc_wrt_vars, cp_wrt_vars = ad._apply_chain_rule(
ad_funcs, variables, lc_wrt_args, qc_wrt_args, cp_wrt_args)
return ad.ADF(val, lc_wrt_vars, qc_wrt_vars, cp_wrt_vars)
else:
return self.f(x)
def make_wake():
import dolfin_adjoint
# mimic a SteadyConfiguration but change a few things along the way
config = DefaultConfiguration()
config.params['steady_state'] = True
config.params['initial_condition'] = ConstantFlowInitialCondition(config)
config.params['include_advection'] = True
config.params['include_diffusion'] = True
config.params['diffusion_coef'] = 3.0
config.params['quadratic_friction'] = True
config.params['newton_solver'] = True
config.params['friction'] = Constant(0.0025)
config.params['theta'] = 1.0
config.params["dump_period"] = 0
mesh = RectangleMesh(-50, -50, 50, 50, 100, 100)
V, H = config.finite_element(mesh)
config.function_space = MixedFunctionSpace([V, H])
config.turbine_function_space = FunctionSpace(mesh, 'CG', 2)
class Domain(object):
"""
Domain object used for setting boundary conditions etc later on
"""
def __init__(self, mesh):
class InFlow(SubDomain):
def inside(self, x, on_boundary):
return near(x[0], -50)
class OutFlow(SubDomain):
def inside(self, x, on_boundary):
return near(x[0], 50)
inflow = InFlow()
outflow = OutFlow()
self.mesh = mesh
self.boundaries = FacetFunction("size_t", mesh)
self.boundaries.set_all(0)
inflow.mark(self.boundaries, 1)
outflow.mark(self.boundaries, 2)
self.ds = Measure('ds')[self.boundaries]
config.domain = Domain(mesh)
config.set_domain(config.domain, warning=False)
# Boundary conditions
bc = DirichletBCSet(config)
bc.add_constant_flow(1, 1.0 + 1e-10)
bc.add_zero_eta(2)
config.params['bctype'] = 'strong_dirichlet'
config.params['strong_bc'] = bc
config.params['free_slip_on_sides'] = True
# Optimisation settings
config.params['functional_final_time_only'] = True
config.params['automatic_scaling'] = False
# Turbine settings
config.params['turbine_pos'] = []
config.params['turbine_friction'] = []
config.params['turbine_x'] = 20.
config.params['turbine_y'] = 20.
config.params['controls'] = ['turbine_pos']
# Finally set some DOLFIN optimisation flags
parameters['form_compiler']['cpp_optimize'] = True
parameters['form_compiler']['cpp_optimize_flags'] = '-O3'
parameters['form_compiler']['optimize'] = True
# place a turbine with default friction
turbine = [(0., 0.)]
config.set_turbine_pos(turbine)
# solve the shallow water equations
rf = ReducedFunctional(config)
info_blue("Generating the wake model...")
rf.j(rf.initial_control())
# get state
state = rf.last_state
V = VectorFunctionSpace(config.function_space.mesh(),
"CG", 2, dim=2)
u_out = TrialFunction(V)
v_out = TestFunction(V)
M_out = assemble(inner(v_out, u_out)*dx)
out_state = Function(V)
rhs = dolfin_adjoint.assemble(inner(v_out, state.split()[0]) *dx)
dolfin_adjoint.solve(M_out, out_state.vector(), rhs, "cg", "sor", annotate=False)
info_green("Wake model generated.")
return out_state
if __name__=='__main__':
import numpy
import ad
# generate some wake
some_wake = make_wake()
# take the x-component as it dominates -- made it "AD-able"
wake = ADDolfinExpression(some_wake.split()[0])
# test point
x0 = numpy.array((0.,0.))
x0 = ad.adnumber(x0)
def j(x):
return wake(x)
def dj(x, forget):
return wake(x).gradient(x)
conv = helpers.test_gradient_array(j, dj, x0, seed = 1.)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment