Skip to content

Instantly share code, notes, and snippets.

@wiederm
Last active March 5, 2020 16:33
Show Gist options
  • Save wiederm/7ac5c29e5a0dea9d17ef16dda93fe02d to your computer and use it in GitHub Desktop.
Save wiederm/7ac5c29e5a0dea9d17ef16dda93fe02d to your computer and use it in GitHub Desktop.
BAR & FEP forward, reverse & Non-equ work values for MM to ANI in vacuum
import torchani
import torch
model = torchani.models.ANI1ccx()
import neutromeratio
import numpy as np
import mdtraj as md
import nglview
from tqdm import tqdm
import pickle
from collections import defaultdict
# openff imports
import openforcefield
from openforcefield.topology import Molecule
from openforcefield.typing.engines.smirnoff import ForceField
forcefield = ForceField('openff_unconstrained-1.0.0.offxml')
print(openforcefield._version.get_versions())
# openmm imports
from simtk import openmm as mm
from simtk.openmm import LangevinIntegrator
from simtk.openmm.app import Simulation
# simulation settings
from simtk import unit
from neutromeratio.constants import temperature, exclude_set
from openmmtools.constants import kB
import matplotlib.pyplot as plt
# analysis imports
from pymbar import MBAR, EXP
import simtk
from neutromeratio.constants import device, platform, nm_to_angstroms, hartree_to_kJ_mol, mass_dict_in_daltons, speed_unit, distance_unit
import random
kBT = kB * temperature
stepsize = 1 * unit.femtosecond
collision_rate = 1 / unit.picosecond
class ANI1_force_and_energy(object):
def __init__(self,
model: torchani.models.ANI1ccx,
atoms: str,
):
"""
Performs energy and force calculations.
Parameters
----------
model:
atoms: str
a string of atoms in the indexed order
"""
self.device = device
self.model = model
self.atoms = atoms
self.species = self.model.species_to_tensor(atoms).to(device).unsqueeze(0)
self.platform = platform
def minimize(self, coords: simtk.unit.quantity.Quantity, maxiter: int = 1000,
show_plot: bool = False):
"""
Minimizes the molecule.
Parameters
----------
coords:simtk.unit.quantity.Quantity
maxiter: int
Maximum number of minimization steps performed.
lambda_value: float
indicate position in lambda protocoll to control how dummy atoms are handled
Returns
-------
coords:simtk.unit.quantity.Quantity
"""
from scipy import optimize
assert(type(coords) == unit.Quantity)
x = coords.value_in_unit(unit.angstrom)
self.memory_of_energy = []
print("Begin minimizing...")
f = optimize.minimize(self._traget_energy_function, x, method='BFGS',
jac=True, options={'maxiter': maxiter, 'disp': True})
logger.critical(f"Minimization status: {f.success}")
memory_of_energy = copy.deepcopy(self.memory_of_energy)
self.memory_of_energy = []
return f.x.reshape(-1, 3) * unit.angstrom, memory_of_energy
def calculate_force(self, x: simtk.unit.quantity.Quantity, lambda_value: float = 0.0) -> (simtk.unit.quantity.Quantity, simtk.unit.quantity.Quantity):
"""
Given a coordinate set the forces with respect to the coordinates are calculated.
Parameters
----------
x : array of floats, unit'd (distance unit)
initial configuration
lambda_value : float
between 0.0 and 1.0 - at zero contributions of alchemical atoms are zero
Returns
-------
F : float, unit'd
E : float, unit'd
"""
assert(type(x) == unit.Quantity)
assert(float(lambda_value) <= 1.0 and float(lambda_value) >= 0.0)
coordinates = torch.tensor([x.value_in_unit(unit.nanometer)],
requires_grad=True, device=self.device, dtype=torch.float32)
energy_in_kJ_mol, = self._calculate_energy(coordinates)
# derivative of E (in kJ/mol) w.r.t. coordinates (in nm)
derivative = torch.autograd.grad((energy_in_kJ_mol).sum(), coordinates)[0]
if self.platform == 'cpu':
F = - np.array(derivative)[0]
elif self.platform == 'cuda':
F = - np.array(derivative.cpu())[0]
else:
raise RuntimeError('Platform needs to be specified. Either CPU or CUDA.')
return (F * (unit.kilojoule_per_mole / unit.nanometer),
energy_in_kJ_mol.item() * unit.kilojoule_per_mole)
def _calculate_energy(self, coordinates: torch.tensor):
"""
Helpter function to return energies as tensor.
Given a coordinate set the energy is calculated.
Parameters
----------
coordinates : torch.tensor
coordinates in nanometer without units attached
lambda_value : float
between 0.0 and 1.0
Returns
-------
energy_in_kJ_mol : torch.tensor
return the energy with restraints added
"""
stddev_in_hartree = torch.tensor(0.0,device=self.device, dtype=torch.float64)
_, energy_in_hartree = self.model((self.species, coordinates * nm_to_angstroms))
# convert energy from hartrees to kJ/mol
return energy_in_hartree * hartree_to_kJ_mol
def _traget_energy_function(self, x) -> float:
"""
Given a coordinate set (x) the energy is calculated in kJ/mol.
Parameters
----------
x : array of floats, unit'd (distance unit)
initial configuration
lambda_value : float
between 0.0 and 1.0 - at zero contributions of alchemical atoms are zero
Returns
-------
E : float, unit'd
"""
x = x.reshape(-1, 3) * unit.angstrom
F, E = self.calculate_force(x)
F_flat = -np.array(F.value_in_unit(unit.kilojoule_per_mole/unit.angstrom).flatten(), dtype=np.float64)
self.memory_of_energy.append(E)
return E.value_in_unit(unit.kilojoule_per_mole), F_flat
def calculate_energy(self, x: simtk.unit.quantity.Quantity, lambda_value: float = 0.0):
"""
Given a coordinate set (x) the energy is calculated in kJ/mol.
Parameters
----------
x : array of floats, unit'd (distance unit)
initial configuration
lambda_value : float
between 0.0 and 1.0 - at zero contributions of alchemical atoms are zero
Returns
-------
energy : unit.Quantity
energy in kJ/mol
"""
assert(type(x) == unit.Quantity)
coordinates = torch.tensor([x.value_in_unit(unit.nanometer)],
requires_grad=True, device=self.device, dtype=torch.float32)
energy_in_kJ_mol, = self._calculate_energy(coordinates)
energy = energy_in_kJ_mol.item() * unit.kilojoule_per_mole
return energy
def create_sim(molecule):
"""Create vacuum simulation system"""
platform = mm.Platform.getPlatformByName('Reference')
integrator = LangevinIntegrator(temperature, collision_rate, stepsize)
topology = molecule.to_topology()
system = forcefield.create_openmm_system(topology)
sim = Simulation(topology, system, integrator, platform=platform)
molecule.generate_conformers()
sim.context.setPositions(molecule.conformers[0])
sim.minimizeEnergy()
sim.context.setVelocitiesToTemperature(temperature)
return sim
def get_positions(sim):
"""get position of system in a state"""
return sim.context.getState(getPositions=True).getPositions(asNumpy=True)
def collect_samples_mm(sim, n_samples=100, n_steps_per_sample=1000):
"""generate samples using a classical FF"""
samples = []
for _ in tqdm(range(n_samples)):
sim.step(n_steps_per_sample)
samples.append(get_positions(sim))
return samples
def compute_mm_energy(sim, positions):
"""compute mm energy for given positions"""
sim.context.setPositions(positions)
return sim.context.getState(getEnergy=True).getPotentialEnergy()
def compute_mm_force(sim, positions):
"""compute mm forces given a position"""
sim.context.setPositions(positions)
return sim.context.getState(getForces=True).getForces(asNumpy=True)
def collect_samples_ani(mol, x0, n_steps=100):
"""generate samples using ANIccx"""
model = neutromeratio.ani.PureANI1ccx()
species_string = ''.join([a.element.symbol for a in mol.atoms])
species = model.species_to_tensor(species_string).unsqueeze(0)
energy_function = neutromeratio.ANI1_force_and_energy(model = model,
atoms = species_string,
mol = None)
energy_and_force = lambda x : energy_function.calculate_force(x)
langevin = neutromeratio.LangevinDynamics(atoms = species_string,
energy_and_force= energy_and_force)
trajectory = langevin.run_dynamics(x0, n_steps, stepsize=1.0 * unit.femtosecond, progress_bar=True)
return trajectory
def compute_ani_energy(mol, samples):
"""compute ani energy given a position"""
species_string = ''.join([a.element.symbol for a in mol.atoms])
species = model.species_to_tensor(species_string).unsqueeze(0)
coordinates = torch.tensor([sample / unit.angstrom for sample in samples], dtype=torch.float32)
energy = model((torch.stack([species[0]] * len(samples)), coordinates))
return energy.energies.detach().numpy() * 627.5 * unit.kilocalorie_per_mole # convert from hartree to kcal/mol
def staged_free_energy(sim, mol):
"""Generate samples using ANI and MM and the MBAR estimater to calculate
the free energy difference between ANI and MM"""
print('Explicit sampling of endstates')
# collect sampls from MM
samples_mm = collect_samples_mm(sim, n_samples=1000)
# collect samples from ANI
samples_ani = collect_samples_ani(mol, get_positions(sim), n_steps=1000)[0][::10]
# combine samples from MM and ANI
snapshots = samples_mm + samples_ani
# compute ANI energy
e_ani = compute_ani_energy(mol, snapshots)/kBT
# compute MM energy
e_mm = [compute_mm_energy(sim, snap)/kBT for snap in snapshots]
N_k = [len(samples_mm), len(samples_ani)]
U_kn = np.array([e_mm, e_ani])
bar = MBAR(U_kn, N_k)
D_f = bar.getFreeEnergyDifferences(return_dict=True)['Delta_f'][0][-1]
dD_f = bar.getFreeEnergyDifferences(return_dict=True)['dDelta_f'][0][-1]
return D_f, dD_f
def one_sided_exp_mm_to_ani(sim, mol):
"""Use FEP to calculate work values of instantaneous switching from MM to ANI on MM sampels.
forward (MM/MM to ANI/MM)
"""
print('FEP forward')
# get mm samples and calculate energy and calculate MM energies
samples_mm = collect_samples_mm(sim, n_samples=100)
u_mm_mm = [compute_mm_energy(sim, snap)/kBT for snap in samples_mm]
# calculate ANI energy on the MM samples
u_ani_mm = compute_ani_energy(mol, samples_mm)/kBT
# calculate work
endpoint_reduced_works = (u_ani_mm - u_mm_mm)
# DeltaG(t1_MM --> t1_ANI)
exp_endpoint = EXP(endpoint_reduced_works, return_dict=True)
D_f, dD_f = (exp_endpoint['Delta_f']), exp_endpoint['dDelta_f']
stddev_w = np.std(endpoint_reduced_works)
return D_f, dD_f, stddev_w, endpoint_reduced_works
def one_sided_exp_ani_to_mm(sim, mol):
"""Use FEP to calculate work values instantaneous switching from ANI to MM on ANI sampels.
reverse (ANI/ANI to MM/ANI)
"""
print('FEP reverse')
# get ani samples and calculate ANI energies
samples_ani = collect_samples_ani(mol, get_positions(sim), n_steps=500)[0][::5]
u_ani_ani = compute_ani_energy(mol, samples_ani)/kBT
# calculate mm energy on the samples
u_mm_ani = [compute_mm_energy(sim, snap)/kBT for snap in samples_ani]
# calculate work
endpoint_reduced_works = (u_mm_ani- u_ani_ani)
# DeltaG(t1_ANI --> t1_MM)
exp_endpoint = EXP(endpoint_reduced_works, return_dict=True)
D_f, dD_f = (exp_endpoint['Delta_f']), exp_endpoint['dDelta_f']
stddev_w = np.std(endpoint_reduced_works)
return D_f, dD_f, stddev_w, endpoint_reduced_works
def mixing(a, b, lambda_value):
return ((1.-lambda_value) * a) + (lambda_value * b)
def noneq_sampling_forward(sim, mol):
"""Use nonequ switching to calculate work values from MM to ANI starting with MM sampels.
forward (MM/MM to ANI/MM)
"""
print('Noneq sampling forward')
atoms = ''.join([a.element.symbol for a in mol.atoms])
f_e = ANI1_force_and_energy(model=model, atoms=atoms)
# generate mass arrays
masses = np.array([mass_dict_in_daltons[a] for a in atoms]) * unit.daltons
sigma_v = np.array([unit.sqrt(kB * temperature / m) / speed_unit for m in masses]) * speed_unit
v0 = np.random.randn(len(sigma_v), 3) * sigma_v[:, None]
# convert initial state numpy arrays with correct attached units
v = np.array(v0.value_in_unit(speed_unit)) * speed_unit
# dimensionless scalars
a = np.exp(- collision_rate * stepsize)
b = np.sqrt(1 - np.exp(-2 * collision_rate * stepsize))
# MM starting samples
samples_mm = collect_samples_mm(sim, n_samples=1000)
# traj is accumulated as a list of arrays with attached units
traj = []
w_list = []
for _ in tqdm(range(50)):
# select starting conformations
x = np.array(random.choice(samples_mm).value_in_unit(distance_unit)) * distance_unit
# initial force
f_mm = compute_mm_force(sim, x)
F = f_mm
w = 0.0
lam_values = (np.linspace(0,1,21))
for idx in range(1, len(lam_values)):
# v
v += (stepsize * 0.5) * F / masses[:, None]
# r
x += (stepsize * 0.5) * v
# o
v = (a * v) + (b * sigma_v[:, None] * np.random.randn(*x.shape))
# r
x += (stepsize * 0.5) * v
# calculate F
f_ani = f_e.calculate_force(x)[0]
f_mm = compute_mm_force(sim, x)
F = mixing(f_mm, f_ani, lam_values[idx])
# v
v += (stepsize * 0.5) * F / masses[:, None]
traj.append(x)
# calculate work
# evaluate u_t(x_t) - u_{t-1}(x_t)
u_mm = compute_mm_energy(sim, x)/kBT
u_ani = f_e.calculate_energy(x)/kBT
u_now = mixing(u_mm, u_ani, lam_values[idx])
u_before = mixing(u_mm, u_ani, lam_values[idx-1])
w += (u_now - u_before)
w_list.append(w)
offset = min(w_list)
w_unitless_offset = [(w - offset) for w in w_list]
avgsumexp_unitless_offset = np.average(np.exp(w_unitless_offset))
logavgsumexp = np.log(avgsumexp_unitless_offset) + offset
stddev_w = np.std(w_unitless_offset)
return logavgsumexp, stddev_w, w_list
def noneq_sampling_reverse(sim, mol):
"""Use nonequ switching to calculate work values from MM to ANI starting with MM sampels.
reverse (ANI/ANI to ANI/MM)
"""
print('Noneq sampling reverse')
atoms = ''.join([a.element.symbol for a in mol.atoms])
f_e = ANI1_force_and_energy(model=model, atoms=atoms)
# generate mass arrays
masses = np.array([mass_dict_in_daltons[a] for a in atoms]) * unit.daltons
sigma_v = np.array([unit.sqrt(kB * temperature / m) / speed_unit for m in masses]) * speed_unit
v0 = np.random.randn(len(sigma_v), 3) * sigma_v[:, None]
# convert initial state numpy arrays with correct attached units
v = np.array(v0.value_in_unit(speed_unit)) * speed_unit
# dimensionless scalars
a = np.exp(- collision_rate * stepsize)
b = np.sqrt(1 - np.exp(-2 * collision_rate * stepsize))
samples_ani = collect_samples_ani(mol, get_positions(sim), n_steps=200)[0][::2]
# traj is accumulated as a list of arrays with attached units
traj = []
w_list = []
for _ in tqdm(range(50)):
# select starting conformations
x = np.array(random.choice(samples_ani).value_in_unit(distance_unit)) * distance_unit
# initial force
f_ani = f_e.calculate_force(x)[0]
F = f_ani
# initial work
w = 0.0
lam_values = (np.linspace(0,1,21))
for idx in range(1, len(lam_values)):
# v
v += (stepsize * 0.5) * F / masses[:, None]
# r
x += (stepsize * 0.5) * v
# o
v = (a * v) + (b * sigma_v[:, None] * np.random.randn(*x.shape))
# r
x += (stepsize * 0.5) * v
# calculate F
f_ani = f_e.calculate_force(x)[0]
f_mm = compute_mm_force(sim, x)
F = mixing(f_ani, f_mm, lam_values[idx])
# v
v += (stepsize * 0.5) * F / masses[:, None]
traj.append(x)
# calculate work
# evaluate u_t(x_t) - u_{t-1}(x_t)
u_mm = compute_mm_energy(sim, x)/kBT
u_ani = f_e.calculate_energy(x)/kBT
u_now = mixing(u_ani, u_mm, lam_values[idx])
u_before = mixing(u_ani, u_mm, lam_values[idx-1])
w += (u_now - u_before)
w_list.append(w)
offset = min(w_list)
w_unitless_offset = [(w - offset) for w in w_list]
avgsumexp_unitless_offset = np.average(np.exp(w_unitless_offset))
logavgsumexp = np.log(avgsumexp_unitless_offset) + offset
stddev_w = np.std(w_unitless_offset)
return logavgsumexp, stddev_w, w_list
# loop over all tautomer pairs, calculate BAR & FEP forward, reverse & Non-equ work values forward/reverse
exp_results = pickle.load(open('../data/exp_results.pickle', 'rb'))
r = defaultdict(list)
for name in exp_results:
if name in exclude_set:
continue
t1_smiles = exp_results[name]['t1-smiles']
t2_smiles = exp_results[name]['t2-smiles']
t1 = Molecule.from_smiles(t1_smiles, hydrogens_are_explicit=False)
t2 = Molecule.from_smiles(t2_smiles, hydrogens_are_explicit=False)
t1_sim = create_sim(t1)
t2_sim = create_sim(t2)
tmp = {}
for mol, sim in zip([t1, t2], [t1_sim, t2_sim]):
tmp['explicitSampling'] = staged_free_energy(sim, mol)
tmp['FEP_forward'] = one_sided_exp_mm_to_ani(sim, mol)
tmp['FEP_reverse'] = one_sided_exp_ani_to_mm(sim, mol)
tmp['nonequ_forward'] = noneq_sampling_forward(sim, mol)
tmp['nonequ_reverse'] = noneq_sampling_reverse(sim, mol)
r[name].append(tmp)
staged_dG = []
staged_ddG = []
fep_forward_dG = []
fep_forward_ddG = []
fep_reverse_dG = []
fep_reverse_ddG = []
nonequ_forw_dG = []
nonequ_forw_ddG = []
nonequ_rev_dG = []
nonequ_rev_ddG = []
for name in r:
e1 = r[name][0]['explicitSampling'][0]
e2 = r[name][0]['FEP_forward'][0]
e3 = (-1) *r[name][0]['FEP_reverse'][0]
e4 = r[name][0]['nonequ_forward'][0]
e5 = (-1) * r[name][0]['nonequ_reverse'][0]
e_all = [e1,e2,e3,e4,e5]
# offset the values to make it possible to compare different molecules
staged_dG.append(e1 - min(e_all))
staged_ddG.append(r[name][0]['explicitSampling'][1])
fep_forward_dG.append(e2 - min(e_all))
fep_forward_ddG.append(r[name][0]['FEP_forward'][2])
fep_reverse_dG.append((e3 - min(e_all)))
fep_reverse_ddG.append(r[name][0]['FEP_reverse'][2])
nonequ_forw_dG.append((e4 - min(e_all)))
nonequ_forw_ddG.append(r[name][0]['nonequ_forward'][1])
nonequ_rev_dG.append((e5 - min(e_all)))
nonequ_rev_ddG.append(r[name][0]['nonequ_reverse'][1])
ax = plt.gca()
ax.errorbar([i for i in range(len(staged_dG))], staged_dG, yerr=staged_ddG, ms=5, fmt='-ob', alpha=0.9, ecolor='b', capthick=2, label='staged')
ax.errorbar([i for i in range(len(staged_dG))], fep_forward_dG, yerr=fep_forward_ddG, ms=5, fmt='-og', alpha=0.9, ecolor='g', capthick=2, label='instantaneous forward')
ax.errorbar([i for i in range(len(staged_dG))], fep_reverse_dG, yerr=fep_reverse_ddG, ms=5, fmt='-oy', alpha=0.9, ecolor='y', capthick=2, label='instantaneous reverse')
ax.errorbar([i for i in range(len(staged_dG))], nonequ_forw_dG, yerr=nonequ_forw_ddG, ms=5, fmt='-oc', alpha=0.9, ecolor='c', capthick=2, label='nonequ forward')
ax.errorbar([i for i in range(len(staged_dG))], nonequ_rev_dG, yerr=nonequ_rev_ddG, ms=5, fmt='-om', alpha=0.9, ecolor='m', capthick=2, label='nonequ reverse')
plt.xlabel('# molecules')
#plt.ylim(-5,50)
plt.ylabel('relative offset ddG [kcal/mol]')
plt.legend()
plt.plot()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment