Created
November 19, 2013 13:24
-
-
Save funsim/7545305 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from opentidalfarm import * | |
class ADDolfinExpression(object): | |
def element_order(self, V): | |
''' Returns the element polynomial order ''' | |
return TestFunction(V).element().degree() | |
def __init__(self, f): | |
self.f = f | |
order = self.element_order(f.function_space()) | |
if order < 2: | |
raise ValueError, "For AD, the dolfin function must be of polynomial degree > 1" | |
# The gradient is always in DG and of one order less | |
V = dolfin.VectorFunctionSpace(self.f.function_space().mesh(), | |
"DG", order - 1) | |
self.first_deriv = dolfin.project(dolfin.grad(self.f), V) | |
# split to get df/dx and df/dy | |
dfdx, dfdy = self.first_deriv.split() | |
# get second derivative in x and y | |
second_deriv_x = dolfin.project(dolfin.grad(dfdx), V) | |
second_deriv_y = dolfin.project(dolfin.grad(dfdy), V) | |
# get cross derivative; [df/(dxdy) = df/(dydx)] | |
self.cross_deriv = second_deriv_x.split()[1] | |
# split second derivatives to avoid cross products | |
self.second_deriv_x = second_deriv_x.split()[0] | |
self.second_deriv_y = second_deriv_y.split()[1] | |
def __call__(self, x): | |
if isinstance(x[0], ad.ADF) and isinstance(x[1], ad.ADF): | |
ad_funcs = list(map(ad.to_auto_diff,x)) | |
val = self.f(x) | |
variables = ad_funcs[0]._get_variables(ad_funcs) | |
lc_wrt_args = self.first_deriv(x) | |
qc_wrt_args = numpy.array([self.second_deriv_x(x), self.second_deriv_y(x)]) | |
cp_wrt_args = 0.#self.cross_deriv(x) | |
lc_wrt_vars, qc_wrt_vars, cp_wrt_vars = ad._apply_chain_rule( | |
ad_funcs, variables, lc_wrt_args, qc_wrt_args, cp_wrt_args) | |
return ad.ADF(val, lc_wrt_vars, qc_wrt_vars, cp_wrt_vars) | |
else: | |
return self.f(x) | |
def make_wake(): | |
import dolfin_adjoint | |
# mimic a SteadyConfiguration but change a few things along the way | |
config = DefaultConfiguration() | |
config.params['steady_state'] = True | |
config.params['initial_condition'] = ConstantFlowInitialCondition(config) | |
config.params['include_advection'] = True | |
config.params['include_diffusion'] = True | |
config.params['diffusion_coef'] = 3.0 | |
config.params['quadratic_friction'] = True | |
config.params['newton_solver'] = True | |
config.params['friction'] = Constant(0.0025) | |
config.params['theta'] = 1.0 | |
config.params["dump_period"] = 0 | |
mesh = RectangleMesh(-50, -50, 50, 50, 100, 100) | |
V, H = config.finite_element(mesh) | |
config.function_space = MixedFunctionSpace([V, H]) | |
config.turbine_function_space = FunctionSpace(mesh, 'CG', 2) | |
class Domain(object): | |
""" | |
Domain object used for setting boundary conditions etc later on | |
""" | |
def __init__(self, mesh): | |
class InFlow(SubDomain): | |
def inside(self, x, on_boundary): | |
return near(x[0], -50) | |
class OutFlow(SubDomain): | |
def inside(self, x, on_boundary): | |
return near(x[0], 50) | |
inflow = InFlow() | |
outflow = OutFlow() | |
self.mesh = mesh | |
self.boundaries = FacetFunction("size_t", mesh) | |
self.boundaries.set_all(0) | |
inflow.mark(self.boundaries, 1) | |
outflow.mark(self.boundaries, 2) | |
self.ds = Measure('ds')[self.boundaries] | |
config.domain = Domain(mesh) | |
config.set_domain(config.domain, warning=False) | |
# Boundary conditions | |
bc = DirichletBCSet(config) | |
bc.add_constant_flow(1, 1.0 + 1e-10) | |
bc.add_zero_eta(2) | |
config.params['bctype'] = 'strong_dirichlet' | |
config.params['strong_bc'] = bc | |
config.params['free_slip_on_sides'] = True | |
# Optimisation settings | |
config.params['functional_final_time_only'] = True | |
config.params['automatic_scaling'] = False | |
# Turbine settings | |
config.params['turbine_pos'] = [] | |
config.params['turbine_friction'] = [] | |
config.params['turbine_x'] = 20. | |
config.params['turbine_y'] = 20. | |
config.params['controls'] = ['turbine_pos'] | |
# Finally set some DOLFIN optimisation flags | |
parameters['form_compiler']['cpp_optimize'] = True | |
parameters['form_compiler']['cpp_optimize_flags'] = '-O3' | |
parameters['form_compiler']['optimize'] = True | |
# place a turbine with default friction | |
turbine = [(0., 0.)] | |
config.set_turbine_pos(turbine) | |
# solve the shallow water equations | |
rf = ReducedFunctional(config) | |
info_blue("Generating the wake model...") | |
rf.j(rf.initial_control()) | |
# get state | |
state = rf.last_state | |
V = VectorFunctionSpace(config.function_space.mesh(), | |
"CG", 2, dim=2) | |
u_out = TrialFunction(V) | |
v_out = TestFunction(V) | |
M_out = assemble(inner(v_out, u_out)*dx) | |
out_state = Function(V) | |
rhs = dolfin_adjoint.assemble(inner(v_out, state.split()[0]) *dx) | |
dolfin_adjoint.solve(M_out, out_state.vector(), rhs, "cg", "sor", annotate=False) | |
info_green("Wake model generated.") | |
return out_state | |
if __name__=='__main__': | |
import numpy | |
import ad | |
# generate some wake | |
some_wake = make_wake() | |
# take the x-component as it dominates -- made it "AD-able" | |
wake = ADDolfinExpression(some_wake.split()[0]) | |
# test point | |
x0 = numpy.array((0.,0.)) | |
x0 = ad.adnumber(x0) | |
def j(x): | |
return wake(x) | |
def dj(x, forget): | |
return wake(x).gradient(x) | |
conv = helpers.test_gradient_array(j, dj, x0, seed = 1.) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment