Skip to content

Instantly share code, notes, and snippets.

@andersx
Created September 22, 2020 06:35
Show Gist options
  • Save andersx/d16e3daa982d4b74e25ed5f2b3a72fda to your computer and use it in GitHub Desktop.
Save andersx/d16e3daa982d4b74e25ed5f2b3a72fda to your computer and use it in GitHub Desktop.
GPR MD ASE example
#!/usr/bin/env python3
import sys
from copy import deepcopy
import numpy as np
import scipy.linalg
from scipy.linalg import norm
from ase import Atoms
from ase.io import read, write
from ase.optimize import *
from ase import units
from ase.md.velocitydistribution import MaxwellBoltzmannDistribution
from ase.md.langevin import Langevin
from ase.md.verlet import VelocityVerlet
from ase.io.trajectory import Trajectory
from os.path import splitext
from ase.visualize import view
# from ase_util import transform_to_eckart_frame
import uuid
import ase
from ase import Atoms
from ase.optimize import BFGS
from ase.vibrations import Vibrations
import qml
from qml.representations import generate_fchl_acsf
from qml.math import cho_solve
from tqdm import tqdm
import numpy as np
import time
from ase import Atoms
from ase.calculators.calculator import Calculator, all_changes
from qml.kernels import get_gp_kernel
from qml.kernels import get_symmetric_gp_kernel
from qml.representations import generate_fchl_acsf
new_cut = 8.0
cut_parameters = {
"rcut": new_cut,
"acut": new_cut,
"nRs2": int(24 * new_cut / 8.0),
"nRs3": int(20 * new_cut / 8.0),
}
def gen_representations(data):
nuclear_charges = []
print(list(data.keys()))
print(data["Z"])
max_atoms = 4
elements = [1,6,8]
print("max_atoms", max_atoms)
print("elements", elements)
reps = []
dreps = []
for i in tqdm(range(len(data["E"]))):
x, dx = generate_fchl_acsf(data["Z"][i], data["R"][i], elements=elements, gradients=True,
**cut_parameters)
reps.append(x)
dreps.append(dx)
energies = data["E"].flatten()
nuclear_charges = data["Z"].tolist()
reps = np.array(reps)
dreps = np.array(dreps)
return reps, dreps, nuclear_charges, energies
class QMLCalculatorGPR(Calculator):
name = 'QMLCalculator'
implemented_properties = ['energy', 'forces']
def __init__(self, parameters, representations, deriv_reps, charges, alphas, reducer=None, **kwargs):
super().__init__(**kwargs)
# unpack parameters
offset = parameters["offset"]
sigma = parameters["sigma"]
self.set_model(alphas, representations, deriv_reps, charges, offset, sigma)
self.energy = 0.0
self.reducer = reducer
self.last_atoms = -1
if reducer is not None:
self.repr = np.einsum("ijk,kl->ijl", representations, reducer)
self.drepr = np.einsum("ijkmn,kl->ijlmn", deriv_reps, reducer)
def calculate(self, atoms: Atoms = None, properties=('energy', 'forces'),
system_changes=all_changes):
if atoms is None:
atoms = self.atoms
if atoms is None:
raise ValueError(
'No ASE atoms supplied to calculation, and no ASE atoms supplied with initialisation.')
self.query(atoms)
if 'energy' in properties:
self.results['energy'] = self.energy
if 'forces' in properties:
self.results['forces'] = self.forces
return
def set_model(self, alphas, representations, deriv_reps, charges, offset, sigma):
self.alphas = alphas
self.repr = representations
self.drepr = deriv_reps
self.charges = charges
# Offset from training
self.offset = offset
# Hyper-parameters
self.sigma = sigma
self.max_atoms = self.repr.shape[1]
self.n_atoms = len(charges[0])
return
def query(self, atoms=None, print_time=True):
if print_time:
start = time.time()
# kcal/mol til ev
# kcal/mol/aangstrom til ev / aangstorm
conv_energy = 1.0 #0.0433635093659
conv_force = 1.0 # 0.0433635093659
coordinates = atoms.get_positions()
nuclear_charges = atoms.get_atomic_numbers()
n_atoms = coordinates.shape[0]
rep_start = time.time()
rep, drep = generate_fchl_acsf(
nuclear_charges,
coordinates,
gradients=True,
elements=[1,6,8],
# pad=self.max_atoms,
**cut_parameters)
rep_end = time.time()
einsum_start = time.time()
# Put data into arrays
Qs = [nuclear_charges]
Xs = np.array([rep], order="F")
dXs = np.array([drep], order="F")
if self.reducer is not None:
Xs = np.einsum("ijk,kl->ijl", Xs, self.reducer)
dXs = np.einsum("ijkmn,kl->ijlmn", dXs, self.reducer)
einsum_end = time.time()
kernel_start = time.time()
Ks = get_gp_kernel(self.repr, Xs, self.drepr, dXs, self.charges, Qs, self.sigma)
kernel_end = time.time()
pred_start = time.time()
# Energy prediction
energy_predicted = np.dot(Ks[:1,:], self.alphas)[0] + self.offset
self.energy = energy_predicted * conv_energy
# Force prediction
forces_predicted = np.dot(Ks[1:,:], self.alphas).reshape((n_atoms, 3))
self.forces = forces_predicted * conv_force
pred_end = time.time()
if print_time:
end = time.time()
# print("rep ", rep_end - rep_start)
# print("einsum ", einsum_end - einsum_start)
# print("kernel ", kernel_end - kernel_start)
# print("prediciton ", pred_end - pred_start)
print("qml query {:7.3f}s {:10.3f} ".format(end-start, energy_predicted))
return
def calculation_required(atoms, quantities):
print("TEST")
if atoms == self.last_atoms:
print("Not required")
return False
else:
print("Required")
return True
def get_potential_energy(self, atoms=None, force_consistent=False):
energy = self.energy
return energy
def get_forces(self, atoms=None):
# print(atoms.get_positions())
# print(self.last_atoms)
do_query = True
try:
D = atoms.get_positions() - self.last_atoms
if norm(D) < 1e-12:
do_query = False
except:
do_query = True
if do_query:
self.last_atoms = atoms.get_positions()
self.query(atoms=atoms)
forces = self.forces
return forces
def train_alphas(reps, dreps, nuclear_charges, E, F, train_idx, parameters):
kernel_name = "../scratch/K_gpr_%f.npy" % parameters["sigma"]
K = np.load(kernel_name)
print(reps.shape)
all_idx = np.array(list(range(4001)))
test_idx = np.array([i for i in all_idx if i not in train_idx])
print(train_idx)
print(test_idx)
natoms = 4
nmols = 4001
atoms = np.array([i for i in range(natoms*3)])
train_idx_force = np.array([atoms + (3*natoms)*j + nmols for j in train_idx]).flatten()
test_idx_force = np.array([atoms + (3*natoms)*j + nmols for j in test_idx]).flatten()
idx = np.concatenate((train_idx, train_idx_force))
C = deepcopy(K[idx[:,np.newaxis], idx])
eKs = deepcopy(K[test_idx[:,np.newaxis],idx])
fKs = deepcopy(K[test_idx_force[:,np.newaxis],idx])
Y = np.concatenate((E[train_idx], F[train_idx].flatten()))
alphas = cho_solve(C, Y, l2reg=llambda, destructive=True)
eYs = deepcopy(E[test_idx])
fYs = deepcopy(F[test_idx]).flatten()
eYss = np.dot(eKs, alphas)
fYss = np.dot(fKs, alphas)
ermse_test = np.sqrt(np.mean(np.square(eYss - eYs)))
emae_test = np.mean(np.abs(eYss - eYs))
frmse_test = np.sqrt(np.mean(np.square(fYss - fYs)))
fmae_test = np.mean(np.abs(fYss - fYs))
schnet_score = 0.01 * sum(np.square(eYss - eYs))
schnet_score += sum(np.square(fYss - fYs)) / natoms
print("TEST %5.2f %.2E %6.4e %10.8f %10.8f %10.8f %10.8f" % \
(parameters["sigma"], parameters["llambda"], schnet_score, emae_test, ermse_test, fmae_test, frmse_test))
repsize = reps.shape[2]
n_train = len(train_idx)
n_test = len(test_idx)
X = reps[train_idx]
Xs = reps[test_idx]
dX = dreps[train_idx]
dXs = dreps[test_idx]
Q = nuclear_charges[train_idx]
Qs = nuclear_charges[test_idx]
print(X.shape)
Xpca = X.reshape((len(train_idx)*natoms,repsize))
eigen_vecs, eigen_vals, Vh = np.linalg.svd(Xpca.T, full_matrices=False, compute_uv=True)
print(eigen_vecs.shape)
NPCA = 80
print(eigen_vals[NPCA-1])
cev = 100 - (np.sum(eigen_vals) - np.sum(eigen_vals[:NPCA])) / np.sum(eigen_vals) * 100
print("Cumulative explained variance = %f %%" % cev )
reducer = eigen_vecs[:,:NPCA]
print(reducer.shape)
print(X.shape)
X = np.einsum("ijk,kl->ijl", X, reducer)
print(X.shape)
print(Xs.shape)
Xs = np.einsum("ijk,kl->ijl", Xs, reducer)
print(Xs.shape)
print(dX.shape)
dX = np.einsum("ijkmn,kl->ijlmn", dX, reducer)
print(dX.shape)
print(dXs.shape)
dXs = np.einsum("ijkmn,kl->ijlmn", dXs, reducer)
print(dXs.shape)
# C = deepcopy(K[idx[:,np.newaxis], idx])
C = get_symmetric_gp_kernel(X, dX, Q, parameters["sigma"])
Ks = get_gp_kernel(X, Xs, dX, dXs, Q, Qs, parameters["sigma"])
# eKs = deepcopy(K[test_idx[:,np.newaxis],idx])
# fKs = deepcopy(K[test_idx_force[:,np.newaxis],idx])
eKs = deepcopy(Ks[:n_test])
fKs = deepcopy(Ks[n_test:])
print(C.shape)
print(eKs.shape)
print(fKs.shape)
Y = np.concatenate((E[train_idx], F[train_idx].flatten()))
alphas = cho_solve(C, Y, l2reg=llambda, destructive=True)
eYs = deepcopy(E[test_idx])
fYs = deepcopy(F[test_idx]).flatten()
eYss = np.dot(eKs, alphas)
fYss = np.dot(fKs, alphas)
ermse_test = np.sqrt(np.mean(np.square(eYss - eYs)))
emae_test = np.mean(np.abs(eYss - eYs))
frmse_test = np.sqrt(np.mean(np.square(fYss - fYs)))
fmae_test = np.mean(np.abs(fYss - fYs))
schnet_score = 0.01 * sum(np.square(eYss - eYs))
schnet_score += sum(np.square(fYss - fYs)) / natoms
print("TEST %5.2f %.2E %6.4e %10.8f %10.8f %10.8f %10.8f" % \
(parameters["sigma"], parameters["llambda"], schnet_score, emae_test, ermse_test, fmae_test, frmse_test))
return alphas, reducer
if __name__ == "__main__":
temperature = 300
timestep = 0.5
friction = 0.01
data = np.load(sys.argv[1])
train_idx = sorted(np.loadtxt(sys.argv[2], dtype=int))
sigma = float(sys.argv[3])
llambda = float(sys.argv[4])
E = data["E"] # * 23.06035 # * 627.51
F = data["F"] # * 23.06035 # * 627.51 # / 0.52917721092
# reps = np.load("../learning_curves_new_splits/npy_data/X.npy")
# dreps = np.load("../learning_curves_new_splits/npy_data/dX.npy")
reps, dreps, nuclear_charges, energies = gen_representations(data)
nuclear_charges = np.load("../learning_curves_new_splits/npy_data/Q.npy")
parameters = {
"sigma": sigma,
"offset": 0.0,
"llambda": llambda,
}
alphas, reducer = train_alphas(reps, dreps, nuclear_charges, E, F, train_idx, parameters)
np.save("alphas.npy", alphas)
# alphas = np.load("npy_data/alphas_gpr_%f.npy" % parameters["sigma"])
calculator = QMLCalculatorGPR(parameters, reps[train_idx], dreps[train_idx], nuclear_charges[train_idx],
alphas, reducer=reducer)
coordinates = np.array([
[ 0.20123873, -0.39602112, -0.32005839],
[-0.23296217, 0.48683071, 0.37425374],
[ 0.6071696 , -0.20890494, -1.3476949 ],
[ 0.22926032, -1.46543064, 0.01371154]])
nuclear_charges = np.array([6, 8, 1, 1,])
molecule = ase.Atoms(nuclear_charges, coordinates)
molecule.set_calculator(calculator)
BFGS(molecule).run(fmax=0.00001)
vib = Vibrations(molecule, nfree=4)
vib.run()
vib.summary()
# Set the momenta corresponding to a temperature T
MaxwellBoltzmannDistribution(molecule, temperature * units.kB)
# define the algorithm for MD: here Langevin alg. with with a time step of 0.1 fs,
# the temperature T and the friction coefficient to 0.02 atomic units.
dyn = Langevin(molecule, timestep * units.fs, temperature * units.kB, friction)
R_list = []
P_list = []
# Equilibrate in NVT ensemble for 100'000 steps (=50ps)
for i in range(100000):
dyn.run(1)
print("NVT equilibration at step: " + str(i))
pos = molecule.get_positions()
mom = molecule.get_momenta()
R_list.append(pos)
P_list.append(mom)
R_list = np.array(R_list)
P_list = np.array(R_list)
np.save("pos_equil.npy", R_list)
np.save("mom_equil.npy", P_list)
#change to NVE ensemble
dyn = VelocityVerlet(molecule, timestep * units.fs)
R_list = []
P_list = []
E_list = []
F_list = []
for i in range(400000):
dyn.run(1)
# pos = atoms.get_positions()
# mom = atoms.get_momenta()
pos = molecule.get_positions()
mom = molecule.get_momenta()
R_list.append(pos)
P_list.append(mom)
R_list = np.array(R_list)
P_list = np.array(R_list)
np.save("pos.npy", R_list)
np.save("mom.npy", P_list)
#print("running md simulation at step: " + str(i))
# epot = atoms.get_potential_energy()
# ekin = atoms.get_kinetic_energy()
# etot = epot + ekin
#
# for i in range(args.steps):
# dyn.run(1)
# if i%args.interval == 0: # run md now
#
# qi = atoms.get_charges()
# pos = atoms.get_positions()
#
# R_list.append(pos)
# q_list.append(qi)
# p_list.append(np.dot(qi, pos))
#
# #print("running md simulation at step: " + str(i))
# # traj.write()
# # time += 1
# mom_last_frame = atoms.get_momenta()
# #print(results)
# positions = np.asarray(R_list)
# charges = np.asarray(q_list)
# dipole = np.asarray(p_list)
# etot = np.asarray(etot)
# mom_last_frame = np.asarray(mom_last_frame)
#
# np.savez("ir_md_"+filename +'.npz', positions= positions, charges=charges, dipole = dipole, total_energy = etot, mom_last_frame = mom_last_frame)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment