Skip to content

Instantly share code, notes, and snippets.

@fnauman
Last active March 11, 2019 07:43
Show Gist options
  • Save fnauman/e105bdf901616345b70145c6ded7c963 to your computer and use it in GitHub Desktop.
Save fnauman/e105bdf901616345b70145c6ded7c963 to your computer and use it in GitHub Desktop.
mhd with shenfun
from mpi4py import MPI
import numpy as np
from shenfun import *
nu = 0.000625
eta = 0.01
end_time = 0.1
dt = 0.01 # no adaptive time stepping
comm = MPI.COMM_WORLD
N = (2**5, 2**5, 2**5) # 32^3
L = [2*np.pi,4*np.pi,6*np.pi] # from TGMHD.py demo from spectralDNS
dim = 3
# Define basis/tensor product spaces
V0 = Basis(N[0], 'F', dtype='D') # Complex-Complex
V1 = Basis(N[1], 'F', dtype='D') # Complex-Complex
V2 = Basis(N[2], 'F', dtype='d') # Real-Complex
T = TensorProductSpace(comm, (V0, V1, V2), **{'planner_effort': 'FFTW_MEASURE'}) # x,y,z now Fourier basis
TV = VectorTensorProductSpace(T) # Vector implies any function "u" will now have 3 components
VM = MixedTensorProductSpace([T]*2*dim)
#u = TrialFunction(T) # Galerkin method uses the 'weak/variational' forumation
#v = TestFunction(T)
# Mesh variables
X = T.local_mesh(True)
K = T.local_wavenumbers(scaled=True)
for i in range(dim):
X[i] = X[i].astype(float)
K[i] = K[i].astype(float)
K2 = np.zeros(T.local_shape(True), dtype=float)
for i in range(dim):
K2 += K[i]*K[i]
K_over_K2 = np.zeros(TV.local_shape(), dtype=float) # TV = vector
for i in range(dim):
K_over_K2[i] = K[i] / np.where(K2 == 0, 1, K2)
# Dealiased grid
kw = {'padding_factor': 1,
'dealias_direct': '2/3-rule'}
dtypes = ['D','D','d']
Vp = [Basis(N[i], 'F', domain=(0, L[i]),
dtype=dtypes[i], **kw) for i in range(dim)]
Tp = TensorProductSpace(comm, Vp, dtype=float,
collapse_fourier=True,
**{'planner_effort': 'FFTW_MEASURE'})
VTp = VectorTensorProductSpace(Tp)
VMp = MixedTensorProductSpace([Tp]*2*dim)
ub_dealias = Array(VMp)
UB = Array(VM) # Both V and B vectors: 6 components
P = Array(T) # Pressure scalar: 1 component
curl = Array(TV) # Vector: 3 components
UB_hat = Function(VM) # K-space V,B vectors
P_hat = Function(T) # K-space Pressure scalar
#dU = Function(VM)
#Source = Array(VM)
ZZ_hat = np.zeros((3, 3) + Tp.local_shape(True), dtype=complex) # Work array
# Create views into large data structures
U, B = UB[:3], UB[3:]
U_hat, B_hat = UB_hat[:3], UB_hat[3:]
# Primary variable
#u = UB_hat
def set_Elsasser(c, ZZ, K):
c[:3] = -1j*(K[0]*(ZZ[:, 0] + ZZ[0, :])
+ K[1]*(ZZ[:, 1] + ZZ[1, :])
+ K[2]*(ZZ[:, 2] + ZZ[2, :]))/2.0
c[3:] = 1j*(K[0]*(ZZ[0, :] - ZZ[:, 0])
+ K[1]*(ZZ[1, :] - ZZ[:, 1])
+ K[2]*(ZZ[2, :] - ZZ[:, 2]))/2.0
return c
def divergenceConvection(c, z0, z1, Tp, K, ZZ_hat):
"""Divergence convection using Elsasser variables
z0=U+B
z1=U-B
"""
for i in range(3):
for j in range(3):
ZZ_hat[i, j] = Tp.forward(z0[i]*z1[j], ZZ_hat[i, j])
c = set_Elsasser(c, ZZ_hat, K)
return c
def NonlinearRHS(self, ub_hat, **params): # called getConvection in MHD.py
global ub_dealias, Tp, VMp, K, ZZ_hat
ub_dealias = VMp.backward(ub_hat, ub_dealias)
u_dealias = ub_dealias[:3]
b_dealias = ub_dealias[3:]
# Compute convective term and place in dU
Nlin = divergenceConvection(Nlin, u_dealias+b_dealias, u_dealias-b_dealias,
Tp, K, ZZ_hat)
return Nlin
def LinearRHS(self, **params): # called pressure diffusion in MHD.py
"""Add contributions from pressure and diffusion to the rhs"""
global TV, UB_hat, P_hat, K, K_over_K2
u_hat = UB_hat[:3]
b_hat = UB_hat[3:]
# Compute pressure (To get actual pressure multiply by 1j)
P_hat = np.sum(Lin[:3]*K_over_K2, 0, out=P_hat)
# Add pressure gradient
for i in range(3):
Lin[i] = -P_hat*K[i]
# Add contribution from diffusion
Lin[:3] -= nu*K2*u_hat
Lin[3:] -= eta*K2*b_hat
return Lin
if __name__ == '__main__':
for integrator in (RK4, ETDRK4):
# Initialization
U[0] = np.sin(X[0])*np.cos(X[1])*np.cos(X[2])
U[1] = -np.cos(X[0])*np.sin(X[1])*np.cos(X[2])
U[2] = 0
B[0] = np.sin(X[0])*np.sin(X[1])*np.cos(X[2])
B[1] = np.cos(X[0])*np.cos(X[1])*np.cos(X[2])
B[2] = 0
UB[:3], UB[3:] = U, B
UB_hat = UB.forward(UB_hat)
# Solve
integ = integrator(VMp, Lin=LinearRHS, Nlin=NonlinearRHS)
integ.setup(dt)
UB_hat = integ.solve(UB, UB_hat, dt, (0, end_time))
# Check accuracy
UB = UB_hat.backward(UB)
U,B = UB[:3], UB[3:]
k = comm.reduce(0.5*np.sum(U.astype(np.float64)*U.astype(np.float64))/np.prod(np.array(N)))
b = comm.reduce(0.5*np.sum(B.astype(np.float64)*B.astype(np.float64))/np.prod(np.array(N)))
if comm.Get_rank() == 0:
print(k,b)
assert np.round(k - 0.124565408177, 7) == 0
assert np.round(b - 0.124637762143, 7) == 0
@mikaem
Copy link

mikaem commented Mar 11, 2019

You have to use L=LinearRHS, N=NonlinearRHS. See IntegratorBase class

@mikaem
Copy link

mikaem commented Mar 11, 2019

The pressure gradient is not linear in UB. It needs to go into the NonlinearRHS

@mikaem
Copy link

mikaem commented Mar 11, 2019

Use def NonlinearRHS(self, UB, UB_hat, dU **params):, where dU is the rhs array.

@mikaem
Copy link

mikaem commented Mar 11, 2019

This is actually a bit tricky since there are different coefficients for the first three items (U/nu) and the last three (B/eta). You can implement it as

def LinearRHS(self, **params): # called pressure diffusion in MHD.py
    L = inner(div(grad(u)), v).scale # Really just -K2, but code is clearer
    L = np.broadcast_to(L[np.newaxis, ...], (6,)+L.shape).copy()
    L[:3] *= nu
    L[3:] *= eta
    return L  # Now this L will be multiplied with UB_hat inside the integrator since it is linear in UB_hat

@mikaem
Copy link

mikaem commented Mar 11, 2019

This gist should work

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment