Skip to content

Instantly share code, notes, and snippets.

Last active March 11, 2019 07:43
Show Gist options
  • Save fnauman/e105bdf901616345b70145c6ded7c963 to your computer and use it in GitHub Desktop.
Save fnauman/e105bdf901616345b70145c6ded7c963 to your computer and use it in GitHub Desktop.
mhd with shenfun
from mpi4py import MPI
import numpy as np
from shenfun import *
nu = 0.000625
eta = 0.01
end_time = 0.1
dt = 0.01 # no adaptive time stepping
N = (2**5, 2**5, 2**5) # 32^3
L = [2*np.pi,4*np.pi,6*np.pi] # from demo from spectralDNS
dim = 3
# Define basis/tensor product spaces
V0 = Basis(N[0], 'F', dtype='D') # Complex-Complex
V1 = Basis(N[1], 'F', dtype='D') # Complex-Complex
V2 = Basis(N[2], 'F', dtype='d') # Real-Complex
T = TensorProductSpace(comm, (V0, V1, V2), **{'planner_effort': 'FFTW_MEASURE'}) # x,y,z now Fourier basis
TV = VectorTensorProductSpace(T) # Vector implies any function "u" will now have 3 components
VM = MixedTensorProductSpace([T]*2*dim)
#u = TrialFunction(T) # Galerkin method uses the 'weak/variational' forumation
#v = TestFunction(T)
# Mesh variables
X = T.local_mesh(True)
K = T.local_wavenumbers(scaled=True)
for i in range(dim):
X[i] = X[i].astype(float)
K[i] = K[i].astype(float)
K2 = np.zeros(T.local_shape(True), dtype=float)
for i in range(dim):
K2 += K[i]*K[i]
K_over_K2 = np.zeros(TV.local_shape(), dtype=float) # TV = vector
for i in range(dim):
K_over_K2[i] = K[i] / np.where(K2 == 0, 1, K2)
# Dealiased grid
kw = {'padding_factor': 1,
'dealias_direct': '2/3-rule'}
dtypes = ['D','D','d']
Vp = [Basis(N[i], 'F', domain=(0, L[i]),
dtype=dtypes[i], **kw) for i in range(dim)]
Tp = TensorProductSpace(comm, Vp, dtype=float,
**{'planner_effort': 'FFTW_MEASURE'})
VTp = VectorTensorProductSpace(Tp)
VMp = MixedTensorProductSpace([Tp]*2*dim)
ub_dealias = Array(VMp)
UB = Array(VM) # Both V and B vectors: 6 components
P = Array(T) # Pressure scalar: 1 component
curl = Array(TV) # Vector: 3 components
UB_hat = Function(VM) # K-space V,B vectors
P_hat = Function(T) # K-space Pressure scalar
#dU = Function(VM)
#Source = Array(VM)
ZZ_hat = np.zeros((3, 3) + Tp.local_shape(True), dtype=complex) # Work array
# Create views into large data structures
U, B = UB[:3], UB[3:]
U_hat, B_hat = UB_hat[:3], UB_hat[3:]
# Primary variable
#u = UB_hat
def set_Elsasser(c, ZZ, K):
c[:3] = -1j*(K[0]*(ZZ[:, 0] + ZZ[0, :])
+ K[1]*(ZZ[:, 1] + ZZ[1, :])
+ K[2]*(ZZ[:, 2] + ZZ[2, :]))/2.0
c[3:] = 1j*(K[0]*(ZZ[0, :] - ZZ[:, 0])
+ K[1]*(ZZ[1, :] - ZZ[:, 1])
+ K[2]*(ZZ[2, :] - ZZ[:, 2]))/2.0
return c
def divergenceConvection(c, z0, z1, Tp, K, ZZ_hat):
"""Divergence convection using Elsasser variables
for i in range(3):
for j in range(3):
ZZ_hat[i, j] = Tp.forward(z0[i]*z1[j], ZZ_hat[i, j])
c = set_Elsasser(c, ZZ_hat, K)
return c
def NonlinearRHS(self, ub_hat, **params): # called getConvection in
global ub_dealias, Tp, VMp, K, ZZ_hat
ub_dealias = VMp.backward(ub_hat, ub_dealias)
u_dealias = ub_dealias[:3]
b_dealias = ub_dealias[3:]
# Compute convective term and place in dU
Nlin = divergenceConvection(Nlin, u_dealias+b_dealias, u_dealias-b_dealias,
Tp, K, ZZ_hat)
return Nlin
def LinearRHS(self, **params): # called pressure diffusion in
"""Add contributions from pressure and diffusion to the rhs"""
global TV, UB_hat, P_hat, K, K_over_K2
u_hat = UB_hat[:3]
b_hat = UB_hat[3:]
# Compute pressure (To get actual pressure multiply by 1j)
P_hat = np.sum(Lin[:3]*K_over_K2, 0, out=P_hat)
# Add pressure gradient
for i in range(3):
Lin[i] = -P_hat*K[i]
# Add contribution from diffusion
Lin[:3] -= nu*K2*u_hat
Lin[3:] -= eta*K2*b_hat
return Lin
if __name__ == '__main__':
for integrator in (RK4, ETDRK4):
# Initialization
U[0] = np.sin(X[0])*np.cos(X[1])*np.cos(X[2])
U[1] = -np.cos(X[0])*np.sin(X[1])*np.cos(X[2])
U[2] = 0
B[0] = np.sin(X[0])*np.sin(X[1])*np.cos(X[2])
B[1] = np.cos(X[0])*np.cos(X[1])*np.cos(X[2])
B[2] = 0
UB[:3], UB[3:] = U, B
UB_hat = UB.forward(UB_hat)
# Solve
integ = integrator(VMp, Lin=LinearRHS, Nlin=NonlinearRHS)
UB_hat = integ.solve(UB, UB_hat, dt, (0, end_time))
# Check accuracy
UB = UB_hat.backward(UB)
U,B = UB[:3], UB[3:]
k = comm.reduce(0.5*np.sum(U.astype(np.float64)*U.astype(np.float64))/
b = comm.reduce(0.5*np.sum(B.astype(np.float64)*B.astype(np.float64))/
if comm.Get_rank() == 0:
assert np.round(k - 0.124565408177, 7) == 0
assert np.round(b - 0.124637762143, 7) == 0
Copy link

mikaem commented Mar 11, 2019

You have to use L=LinearRHS, N=NonlinearRHS. See IntegratorBase class

Copy link

mikaem commented Mar 11, 2019

The pressure gradient is not linear in UB. It needs to go into the NonlinearRHS

Copy link

mikaem commented Mar 11, 2019

Use def NonlinearRHS(self, UB, UB_hat, dU **params):, where dU is the rhs array.

Copy link

mikaem commented Mar 11, 2019

This is actually a bit tricky since there are different coefficients for the first three items (U/nu) and the last three (B/eta). You can implement it as

def LinearRHS(self, **params): # called pressure diffusion in
    L = inner(div(grad(u)), v).scale # Really just -K2, but code is clearer
    L = np.broadcast_to(L[np.newaxis, ...], (6,)+L.shape).copy()
    L[:3] *= nu
    L[3:] *= eta
    return L  # Now this L will be multiplied with UB_hat inside the integrator since it is linear in UB_hat

Copy link

mikaem commented Mar 11, 2019

This gist should work

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment