Last active
March 5, 2020 16:33
-
-
Save wiederm/7ac5c29e5a0dea9d17ef16dda93fe02d to your computer and use it in GitHub Desktop.
BAR & FEP forward, reverse & Non-equ work values for MM to ANI in vacuum
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import torchani | |
import torch | |
model = torchani.models.ANI1ccx() | |
import neutromeratio | |
import numpy as np | |
import mdtraj as md | |
import nglview | |
from tqdm import tqdm | |
import pickle | |
from collections import defaultdict | |
# openff imports | |
import openforcefield | |
from openforcefield.topology import Molecule | |
from openforcefield.typing.engines.smirnoff import ForceField | |
forcefield = ForceField('openff_unconstrained-1.0.0.offxml') | |
print(openforcefield._version.get_versions()) | |
# openmm imports | |
from simtk import openmm as mm | |
from simtk.openmm import LangevinIntegrator | |
from simtk.openmm.app import Simulation | |
# simulation settings | |
from simtk import unit | |
from neutromeratio.constants import temperature, exclude_set | |
from openmmtools.constants import kB | |
import matplotlib.pyplot as plt | |
# analysis imports | |
from pymbar import MBAR, EXP | |
import simtk | |
from neutromeratio.constants import device, platform, nm_to_angstroms, hartree_to_kJ_mol, mass_dict_in_daltons, speed_unit, distance_unit | |
import random | |
kBT = kB * temperature | |
stepsize = 1 * unit.femtosecond | |
collision_rate = 1 / unit.picosecond | |
class ANI1_force_and_energy(object): | |
def __init__(self, | |
model: torchani.models.ANI1ccx, | |
atoms: str, | |
): | |
""" | |
Performs energy and force calculations. | |
Parameters | |
---------- | |
model: | |
atoms: str | |
a string of atoms in the indexed order | |
""" | |
self.device = device | |
self.model = model | |
self.atoms = atoms | |
self.species = self.model.species_to_tensor(atoms).to(device).unsqueeze(0) | |
self.platform = platform | |
def minimize(self, coords: simtk.unit.quantity.Quantity, maxiter: int = 1000, | |
show_plot: bool = False): | |
""" | |
Minimizes the molecule. | |
Parameters | |
---------- | |
coords:simtk.unit.quantity.Quantity | |
maxiter: int | |
Maximum number of minimization steps performed. | |
lambda_value: float | |
indicate position in lambda protocoll to control how dummy atoms are handled | |
Returns | |
------- | |
coords:simtk.unit.quantity.Quantity | |
""" | |
from scipy import optimize | |
assert(type(coords) == unit.Quantity) | |
x = coords.value_in_unit(unit.angstrom) | |
self.memory_of_energy = [] | |
print("Begin minimizing...") | |
f = optimize.minimize(self._traget_energy_function, x, method='BFGS', | |
jac=True, options={'maxiter': maxiter, 'disp': True}) | |
logger.critical(f"Minimization status: {f.success}") | |
memory_of_energy = copy.deepcopy(self.memory_of_energy) | |
self.memory_of_energy = [] | |
return f.x.reshape(-1, 3) * unit.angstrom, memory_of_energy | |
def calculate_force(self, x: simtk.unit.quantity.Quantity, lambda_value: float = 0.0) -> (simtk.unit.quantity.Quantity, simtk.unit.quantity.Quantity): | |
""" | |
Given a coordinate set the forces with respect to the coordinates are calculated. | |
Parameters | |
---------- | |
x : array of floats, unit'd (distance unit) | |
initial configuration | |
lambda_value : float | |
between 0.0 and 1.0 - at zero contributions of alchemical atoms are zero | |
Returns | |
------- | |
F : float, unit'd | |
E : float, unit'd | |
""" | |
assert(type(x) == unit.Quantity) | |
assert(float(lambda_value) <= 1.0 and float(lambda_value) >= 0.0) | |
coordinates = torch.tensor([x.value_in_unit(unit.nanometer)], | |
requires_grad=True, device=self.device, dtype=torch.float32) | |
energy_in_kJ_mol, = self._calculate_energy(coordinates) | |
# derivative of E (in kJ/mol) w.r.t. coordinates (in nm) | |
derivative = torch.autograd.grad((energy_in_kJ_mol).sum(), coordinates)[0] | |
if self.platform == 'cpu': | |
F = - np.array(derivative)[0] | |
elif self.platform == 'cuda': | |
F = - np.array(derivative.cpu())[0] | |
else: | |
raise RuntimeError('Platform needs to be specified. Either CPU or CUDA.') | |
return (F * (unit.kilojoule_per_mole / unit.nanometer), | |
energy_in_kJ_mol.item() * unit.kilojoule_per_mole) | |
def _calculate_energy(self, coordinates: torch.tensor): | |
""" | |
Helpter function to return energies as tensor. | |
Given a coordinate set the energy is calculated. | |
Parameters | |
---------- | |
coordinates : torch.tensor | |
coordinates in nanometer without units attached | |
lambda_value : float | |
between 0.0 and 1.0 | |
Returns | |
------- | |
energy_in_kJ_mol : torch.tensor | |
return the energy with restraints added | |
""" | |
stddev_in_hartree = torch.tensor(0.0,device=self.device, dtype=torch.float64) | |
_, energy_in_hartree = self.model((self.species, coordinates * nm_to_angstroms)) | |
# convert energy from hartrees to kJ/mol | |
return energy_in_hartree * hartree_to_kJ_mol | |
def _traget_energy_function(self, x) -> float: | |
""" | |
Given a coordinate set (x) the energy is calculated in kJ/mol. | |
Parameters | |
---------- | |
x : array of floats, unit'd (distance unit) | |
initial configuration | |
lambda_value : float | |
between 0.0 and 1.0 - at zero contributions of alchemical atoms are zero | |
Returns | |
------- | |
E : float, unit'd | |
""" | |
x = x.reshape(-1, 3) * unit.angstrom | |
F, E = self.calculate_force(x) | |
F_flat = -np.array(F.value_in_unit(unit.kilojoule_per_mole/unit.angstrom).flatten(), dtype=np.float64) | |
self.memory_of_energy.append(E) | |
return E.value_in_unit(unit.kilojoule_per_mole), F_flat | |
def calculate_energy(self, x: simtk.unit.quantity.Quantity, lambda_value: float = 0.0): | |
""" | |
Given a coordinate set (x) the energy is calculated in kJ/mol. | |
Parameters | |
---------- | |
x : array of floats, unit'd (distance unit) | |
initial configuration | |
lambda_value : float | |
between 0.0 and 1.0 - at zero contributions of alchemical atoms are zero | |
Returns | |
------- | |
energy : unit.Quantity | |
energy in kJ/mol | |
""" | |
assert(type(x) == unit.Quantity) | |
coordinates = torch.tensor([x.value_in_unit(unit.nanometer)], | |
requires_grad=True, device=self.device, dtype=torch.float32) | |
energy_in_kJ_mol, = self._calculate_energy(coordinates) | |
energy = energy_in_kJ_mol.item() * unit.kilojoule_per_mole | |
return energy | |
def create_sim(molecule): | |
"""Create vacuum simulation system""" | |
platform = mm.Platform.getPlatformByName('Reference') | |
integrator = LangevinIntegrator(temperature, collision_rate, stepsize) | |
topology = molecule.to_topology() | |
system = forcefield.create_openmm_system(topology) | |
sim = Simulation(topology, system, integrator, platform=platform) | |
molecule.generate_conformers() | |
sim.context.setPositions(molecule.conformers[0]) | |
sim.minimizeEnergy() | |
sim.context.setVelocitiesToTemperature(temperature) | |
return sim | |
def get_positions(sim): | |
"""get position of system in a state""" | |
return sim.context.getState(getPositions=True).getPositions(asNumpy=True) | |
def collect_samples_mm(sim, n_samples=100, n_steps_per_sample=1000): | |
"""generate samples using a classical FF""" | |
samples = [] | |
for _ in tqdm(range(n_samples)): | |
sim.step(n_steps_per_sample) | |
samples.append(get_positions(sim)) | |
return samples | |
def compute_mm_energy(sim, positions): | |
"""compute mm energy for given positions""" | |
sim.context.setPositions(positions) | |
return sim.context.getState(getEnergy=True).getPotentialEnergy() | |
def compute_mm_force(sim, positions): | |
"""compute mm forces given a position""" | |
sim.context.setPositions(positions) | |
return sim.context.getState(getForces=True).getForces(asNumpy=True) | |
def collect_samples_ani(mol, x0, n_steps=100): | |
"""generate samples using ANIccx""" | |
model = neutromeratio.ani.PureANI1ccx() | |
species_string = ''.join([a.element.symbol for a in mol.atoms]) | |
species = model.species_to_tensor(species_string).unsqueeze(0) | |
energy_function = neutromeratio.ANI1_force_and_energy(model = model, | |
atoms = species_string, | |
mol = None) | |
energy_and_force = lambda x : energy_function.calculate_force(x) | |
langevin = neutromeratio.LangevinDynamics(atoms = species_string, | |
energy_and_force= energy_and_force) | |
trajectory = langevin.run_dynamics(x0, n_steps, stepsize=1.0 * unit.femtosecond, progress_bar=True) | |
return trajectory | |
def compute_ani_energy(mol, samples): | |
"""compute ani energy given a position""" | |
species_string = ''.join([a.element.symbol for a in mol.atoms]) | |
species = model.species_to_tensor(species_string).unsqueeze(0) | |
coordinates = torch.tensor([sample / unit.angstrom for sample in samples], dtype=torch.float32) | |
energy = model((torch.stack([species[0]] * len(samples)), coordinates)) | |
return energy.energies.detach().numpy() * 627.5 * unit.kilocalorie_per_mole # convert from hartree to kcal/mol | |
def staged_free_energy(sim, mol): | |
"""Generate samples using ANI and MM and the MBAR estimater to calculate | |
the free energy difference between ANI and MM""" | |
print('Explicit sampling of endstates') | |
# collect sampls from MM | |
samples_mm = collect_samples_mm(sim, n_samples=1000) | |
# collect samples from ANI | |
samples_ani = collect_samples_ani(mol, get_positions(sim), n_steps=1000)[0][::10] | |
# combine samples from MM and ANI | |
snapshots = samples_mm + samples_ani | |
# compute ANI energy | |
e_ani = compute_ani_energy(mol, snapshots)/kBT | |
# compute MM energy | |
e_mm = [compute_mm_energy(sim, snap)/kBT for snap in snapshots] | |
N_k = [len(samples_mm), len(samples_ani)] | |
U_kn = np.array([e_mm, e_ani]) | |
bar = MBAR(U_kn, N_k) | |
D_f = bar.getFreeEnergyDifferences(return_dict=True)['Delta_f'][0][-1] | |
dD_f = bar.getFreeEnergyDifferences(return_dict=True)['dDelta_f'][0][-1] | |
return D_f, dD_f | |
def one_sided_exp_mm_to_ani(sim, mol): | |
"""Use FEP to calculate work values of instantaneous switching from MM to ANI on MM sampels. | |
forward (MM/MM to ANI/MM) | |
""" | |
print('FEP forward') | |
# get mm samples and calculate energy and calculate MM energies | |
samples_mm = collect_samples_mm(sim, n_samples=100) | |
u_mm_mm = [compute_mm_energy(sim, snap)/kBT for snap in samples_mm] | |
# calculate ANI energy on the MM samples | |
u_ani_mm = compute_ani_energy(mol, samples_mm)/kBT | |
# calculate work | |
endpoint_reduced_works = (u_ani_mm - u_mm_mm) | |
# DeltaG(t1_MM --> t1_ANI) | |
exp_endpoint = EXP(endpoint_reduced_works, return_dict=True) | |
D_f, dD_f = (exp_endpoint['Delta_f']), exp_endpoint['dDelta_f'] | |
stddev_w = np.std(endpoint_reduced_works) | |
return D_f, dD_f, stddev_w, endpoint_reduced_works | |
def one_sided_exp_ani_to_mm(sim, mol): | |
"""Use FEP to calculate work values instantaneous switching from ANI to MM on ANI sampels. | |
reverse (ANI/ANI to MM/ANI) | |
""" | |
print('FEP reverse') | |
# get ani samples and calculate ANI energies | |
samples_ani = collect_samples_ani(mol, get_positions(sim), n_steps=500)[0][::5] | |
u_ani_ani = compute_ani_energy(mol, samples_ani)/kBT | |
# calculate mm energy on the samples | |
u_mm_ani = [compute_mm_energy(sim, snap)/kBT for snap in samples_ani] | |
# calculate work | |
endpoint_reduced_works = (u_mm_ani- u_ani_ani) | |
# DeltaG(t1_ANI --> t1_MM) | |
exp_endpoint = EXP(endpoint_reduced_works, return_dict=True) | |
D_f, dD_f = (exp_endpoint['Delta_f']), exp_endpoint['dDelta_f'] | |
stddev_w = np.std(endpoint_reduced_works) | |
return D_f, dD_f, stddev_w, endpoint_reduced_works | |
def mixing(a, b, lambda_value): | |
return ((1.-lambda_value) * a) + (lambda_value * b) | |
def noneq_sampling_forward(sim, mol): | |
"""Use nonequ switching to calculate work values from MM to ANI starting with MM sampels. | |
forward (MM/MM to ANI/MM) | |
""" | |
print('Noneq sampling forward') | |
atoms = ''.join([a.element.symbol for a in mol.atoms]) | |
f_e = ANI1_force_and_energy(model=model, atoms=atoms) | |
# generate mass arrays | |
masses = np.array([mass_dict_in_daltons[a] for a in atoms]) * unit.daltons | |
sigma_v = np.array([unit.sqrt(kB * temperature / m) / speed_unit for m in masses]) * speed_unit | |
v0 = np.random.randn(len(sigma_v), 3) * sigma_v[:, None] | |
# convert initial state numpy arrays with correct attached units | |
v = np.array(v0.value_in_unit(speed_unit)) * speed_unit | |
# dimensionless scalars | |
a = np.exp(- collision_rate * stepsize) | |
b = np.sqrt(1 - np.exp(-2 * collision_rate * stepsize)) | |
# MM starting samples | |
samples_mm = collect_samples_mm(sim, n_samples=1000) | |
# traj is accumulated as a list of arrays with attached units | |
traj = [] | |
w_list = [] | |
for _ in tqdm(range(50)): | |
# select starting conformations | |
x = np.array(random.choice(samples_mm).value_in_unit(distance_unit)) * distance_unit | |
# initial force | |
f_mm = compute_mm_force(sim, x) | |
F = f_mm | |
w = 0.0 | |
lam_values = (np.linspace(0,1,21)) | |
for idx in range(1, len(lam_values)): | |
# v | |
v += (stepsize * 0.5) * F / masses[:, None] | |
# r | |
x += (stepsize * 0.5) * v | |
# o | |
v = (a * v) + (b * sigma_v[:, None] * np.random.randn(*x.shape)) | |
# r | |
x += (stepsize * 0.5) * v | |
# calculate F | |
f_ani = f_e.calculate_force(x)[0] | |
f_mm = compute_mm_force(sim, x) | |
F = mixing(f_mm, f_ani, lam_values[idx]) | |
# v | |
v += (stepsize * 0.5) * F / masses[:, None] | |
traj.append(x) | |
# calculate work | |
# evaluate u_t(x_t) - u_{t-1}(x_t) | |
u_mm = compute_mm_energy(sim, x)/kBT | |
u_ani = f_e.calculate_energy(x)/kBT | |
u_now = mixing(u_mm, u_ani, lam_values[idx]) | |
u_before = mixing(u_mm, u_ani, lam_values[idx-1]) | |
w += (u_now - u_before) | |
w_list.append(w) | |
offset = min(w_list) | |
w_unitless_offset = [(w - offset) for w in w_list] | |
avgsumexp_unitless_offset = np.average(np.exp(w_unitless_offset)) | |
logavgsumexp = np.log(avgsumexp_unitless_offset) + offset | |
stddev_w = np.std(w_unitless_offset) | |
return logavgsumexp, stddev_w, w_list | |
def noneq_sampling_reverse(sim, mol): | |
"""Use nonequ switching to calculate work values from MM to ANI starting with MM sampels. | |
reverse (ANI/ANI to ANI/MM) | |
""" | |
print('Noneq sampling reverse') | |
atoms = ''.join([a.element.symbol for a in mol.atoms]) | |
f_e = ANI1_force_and_energy(model=model, atoms=atoms) | |
# generate mass arrays | |
masses = np.array([mass_dict_in_daltons[a] for a in atoms]) * unit.daltons | |
sigma_v = np.array([unit.sqrt(kB * temperature / m) / speed_unit for m in masses]) * speed_unit | |
v0 = np.random.randn(len(sigma_v), 3) * sigma_v[:, None] | |
# convert initial state numpy arrays with correct attached units | |
v = np.array(v0.value_in_unit(speed_unit)) * speed_unit | |
# dimensionless scalars | |
a = np.exp(- collision_rate * stepsize) | |
b = np.sqrt(1 - np.exp(-2 * collision_rate * stepsize)) | |
samples_ani = collect_samples_ani(mol, get_positions(sim), n_steps=200)[0][::2] | |
# traj is accumulated as a list of arrays with attached units | |
traj = [] | |
w_list = [] | |
for _ in tqdm(range(50)): | |
# select starting conformations | |
x = np.array(random.choice(samples_ani).value_in_unit(distance_unit)) * distance_unit | |
# initial force | |
f_ani = f_e.calculate_force(x)[0] | |
F = f_ani | |
# initial work | |
w = 0.0 | |
lam_values = (np.linspace(0,1,21)) | |
for idx in range(1, len(lam_values)): | |
# v | |
v += (stepsize * 0.5) * F / masses[:, None] | |
# r | |
x += (stepsize * 0.5) * v | |
# o | |
v = (a * v) + (b * sigma_v[:, None] * np.random.randn(*x.shape)) | |
# r | |
x += (stepsize * 0.5) * v | |
# calculate F | |
f_ani = f_e.calculate_force(x)[0] | |
f_mm = compute_mm_force(sim, x) | |
F = mixing(f_ani, f_mm, lam_values[idx]) | |
# v | |
v += (stepsize * 0.5) * F / masses[:, None] | |
traj.append(x) | |
# calculate work | |
# evaluate u_t(x_t) - u_{t-1}(x_t) | |
u_mm = compute_mm_energy(sim, x)/kBT | |
u_ani = f_e.calculate_energy(x)/kBT | |
u_now = mixing(u_ani, u_mm, lam_values[idx]) | |
u_before = mixing(u_ani, u_mm, lam_values[idx-1]) | |
w += (u_now - u_before) | |
w_list.append(w) | |
offset = min(w_list) | |
w_unitless_offset = [(w - offset) for w in w_list] | |
avgsumexp_unitless_offset = np.average(np.exp(w_unitless_offset)) | |
logavgsumexp = np.log(avgsumexp_unitless_offset) + offset | |
stddev_w = np.std(w_unitless_offset) | |
return logavgsumexp, stddev_w, w_list | |
# loop over all tautomer pairs, calculate BAR & FEP forward, reverse & Non-equ work values forward/reverse | |
exp_results = pickle.load(open('../data/exp_results.pickle', 'rb')) | |
r = defaultdict(list) | |
for name in exp_results: | |
if name in exclude_set: | |
continue | |
t1_smiles = exp_results[name]['t1-smiles'] | |
t2_smiles = exp_results[name]['t2-smiles'] | |
t1 = Molecule.from_smiles(t1_smiles, hydrogens_are_explicit=False) | |
t2 = Molecule.from_smiles(t2_smiles, hydrogens_are_explicit=False) | |
t1_sim = create_sim(t1) | |
t2_sim = create_sim(t2) | |
tmp = {} | |
for mol, sim in zip([t1, t2], [t1_sim, t2_sim]): | |
tmp['explicitSampling'] = staged_free_energy(sim, mol) | |
tmp['FEP_forward'] = one_sided_exp_mm_to_ani(sim, mol) | |
tmp['FEP_reverse'] = one_sided_exp_ani_to_mm(sim, mol) | |
tmp['nonequ_forward'] = noneq_sampling_forward(sim, mol) | |
tmp['nonequ_reverse'] = noneq_sampling_reverse(sim, mol) | |
r[name].append(tmp) | |
staged_dG = [] | |
staged_ddG = [] | |
fep_forward_dG = [] | |
fep_forward_ddG = [] | |
fep_reverse_dG = [] | |
fep_reverse_ddG = [] | |
nonequ_forw_dG = [] | |
nonequ_forw_ddG = [] | |
nonequ_rev_dG = [] | |
nonequ_rev_ddG = [] | |
for name in r: | |
e1 = r[name][0]['explicitSampling'][0] | |
e2 = r[name][0]['FEP_forward'][0] | |
e3 = (-1) *r[name][0]['FEP_reverse'][0] | |
e4 = r[name][0]['nonequ_forward'][0] | |
e5 = (-1) * r[name][0]['nonequ_reverse'][0] | |
e_all = [e1,e2,e3,e4,e5] | |
# offset the values to make it possible to compare different molecules | |
staged_dG.append(e1 - min(e_all)) | |
staged_ddG.append(r[name][0]['explicitSampling'][1]) | |
fep_forward_dG.append(e2 - min(e_all)) | |
fep_forward_ddG.append(r[name][0]['FEP_forward'][2]) | |
fep_reverse_dG.append((e3 - min(e_all))) | |
fep_reverse_ddG.append(r[name][0]['FEP_reverse'][2]) | |
nonequ_forw_dG.append((e4 - min(e_all))) | |
nonequ_forw_ddG.append(r[name][0]['nonequ_forward'][1]) | |
nonequ_rev_dG.append((e5 - min(e_all))) | |
nonequ_rev_ddG.append(r[name][0]['nonequ_reverse'][1]) | |
ax = plt.gca() | |
ax.errorbar([i for i in range(len(staged_dG))], staged_dG, yerr=staged_ddG, ms=5, fmt='-ob', alpha=0.9, ecolor='b', capthick=2, label='staged') | |
ax.errorbar([i for i in range(len(staged_dG))], fep_forward_dG, yerr=fep_forward_ddG, ms=5, fmt='-og', alpha=0.9, ecolor='g', capthick=2, label='instantaneous forward') | |
ax.errorbar([i for i in range(len(staged_dG))], fep_reverse_dG, yerr=fep_reverse_ddG, ms=5, fmt='-oy', alpha=0.9, ecolor='y', capthick=2, label='instantaneous reverse') | |
ax.errorbar([i for i in range(len(staged_dG))], nonequ_forw_dG, yerr=nonequ_forw_ddG, ms=5, fmt='-oc', alpha=0.9, ecolor='c', capthick=2, label='nonequ forward') | |
ax.errorbar([i for i in range(len(staged_dG))], nonequ_rev_dG, yerr=nonequ_rev_ddG, ms=5, fmt='-om', alpha=0.9, ecolor='m', capthick=2, label='nonequ reverse') | |
plt.xlabel('# molecules') | |
#plt.ylim(-5,50) | |
plt.ylabel('relative offset ddG [kcal/mol]') | |
plt.legend() | |
plt.plot() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment