Last active
November 1, 2021 06:45
-
-
Save dwhswenson/932c0903a7b302e9f332ffa491a6fd0d to your computer and use it in GitHub Desktop.
Simple OpenMM-based MD from Gromacs files
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
# coding: utf-8 | |
########################################################################## | |
# this script was generated by openmm-builder. to customize it further, | |
# you can save the file to disk and edit it with your favorite editor. | |
########################################################################## | |
# REQUIRES: | |
# * OpenMM (core dynamics) | |
# * mdtraj (h5 reporter -- with velocities) | |
# * parmed (progress reporter) | |
# * openmmtools (VVVR integrator) | |
from __future__ import print_function | |
from simtk.openmm import app | |
import simtk.openmm as mm | |
import mdtraj as md | |
import parmed | |
import re | |
import openmmtools.integrators | |
from simtk import unit as u | |
from sys import stdout | |
import logging | |
logger = logging.getLogger(__name__) | |
class GMXSimulation(object): | |
""" | |
Create a basic OpenMM simulation from Gromacs files. | |
The resulting simulation has sane defaults for biomolecular simulation. | |
There's only limited customizability -- if you want more | |
customizability, just use OpenMM directly! | |
""" | |
def __init__(self, gro_file, top_file, platform_parameters, subset=None, | |
temperature=300*u.kelvin, pressure=1.0*u.atmosphere, | |
timestep=2.0*u.femtosecond, | |
time_per_frame=50*u.picosecond, | |
output_basename=None, output_index=None): | |
self.timestep = timestep | |
# load the stuff from the files | |
gro = app.GromacsGroFile(gro_file) | |
box_vectors = mm.Vec3(*gro.getPeriodicBoxVectors()) | |
top = app.GromacsTopFile(top_file, periodicBoxVectors=box_vectors) | |
# set up the integrator | |
integrator = self.build_integrator(temperature, pressure, timestep) | |
system = self.build_system(top) | |
system.addForce(mm.MonteCarloBarostat(pressure, temperature, 25)) | |
platform = platform_parameters['platform'] | |
self.system = system | |
self.integrator = integrator | |
self.simulation = self.build_simulation(top, system, integrator, | |
platform_parameters) | |
# set up the reporters | |
if output_basename is None: | |
output_basename = re.sub('\.gro$', '', gro_file) | |
if output_index is not None: | |
output_basename += str(output_index) # make 0-padded? | |
self.n_steps_per_frame = int(time_per_frame / timestep) | |
# log reporter (stdout) | |
log_reporter = app.StateDataReporter( | |
stdout, | |
reportInterval=self.n_steps_per_frame, | |
potentialEnergy=True, | |
kineticEnergy=True, | |
totalEnergy=False, | |
temperature=True, | |
volume=True | |
) | |
self.simulation.reporters.append(log_reporter) | |
# full reporter, including velocities | |
full_reporter = md.reporters.HDF5Reporter( | |
output_basename + ".h5", | |
reportInterval=self.n_steps_per_frame, | |
coordinates=True, | |
time=True, | |
cell=True, | |
potentialEnergy=False, #True, | |
kineticEnergy=False, #True, | |
velocities=True | |
) | |
# subset reporter (no H2O) | |
if subset is not None: | |
subset_reporter = md.reporters.DCDReporter( | |
output_basename + "_dry.dcd", | |
reportInterval=self.n_steps_per_frame, | |
atomSubset=subset | |
) | |
self.simulation.reporters.append(full_reporter) | |
if subset is not None: | |
self.simulation.reporters.append(subset_reporter) | |
self.output_basename = output_basename | |
def run_position_restrained(self, time, output_file=None): | |
pass | |
def _debug_state(self): | |
state = self.simulation.context.getState(getPositions=True, | |
getVelocities=True, | |
getEnergy=True) | |
coords = state.getPositions(asNumpy=True) | |
box = state.getPeriodicBoxVectors(asNumpy=True) | |
velocities = state.getVelocities(asNumpy=True) | |
potential = state.getPotentialEnergy() | |
kinetic = state.getKineticEnergy() | |
print(coords) | |
print(box) | |
print(velocities) | |
print(potential, kinetic) | |
def run(self, time): | |
n_steps = int(time / self.timestep) + 1 | |
prog_reporter = parmed.openmm.ProgressReporter( | |
f=self.output_basename + ".progress", | |
reportInterval=self.n_steps_per_frame, | |
totalSteps=n_steps, | |
potentialEnergy=True, | |
kineticEnergy=True, | |
totalEnergy=False, | |
temperature=True, | |
volume=True | |
) | |
self.simulation.reporters.append(prog_reporter) | |
self.simulation.step(n_steps) | |
def build_integrator(self, temperature, pressure, timestep): | |
integrator = openmmtools.integrators.VVVRIntegrator( | |
temperature=temperature, | |
collision_rate=1.0 / u.picoseconds, | |
timestep=timestep | |
) | |
integrator.setConstraintTolerance(0.00001) | |
return integrator | |
def build_system(self, topology): | |
system = topology.createSystem( | |
nonbondedMethod=app.PME, | |
nonbondedCutoff=1.0*u.nanometers, | |
constraints=app.HBonds, | |
rigidWater=True, | |
ewaldErrorTolerance=0.0005 | |
) | |
return system | |
def build_simulation(self, topology, system, integrator, | |
platform_parameters): | |
platform_name = platform_parameters['platform'] | |
properties = platform_parameters['properties'] | |
platform = mm.Platform.getPlatformByName(platform_name) | |
simulation = app.Simulation(topology.topology, system, integrator, | |
platform, properties) | |
return simulation | |
def initialize_random_setup(self, gro_file, temperature): | |
gro = app.GromacsGroFile(gro_file) | |
self.simulation.context.setPositions(gro.positions) | |
self.simulation.context.setVelocitiesToTemperature(temperature) | |
def make_parser(): | |
from argparse import ArgumentParser | |
parser = ArgumentParser() | |
parser.add_argument('-t', '--topology') | |
parser.add_argument('--subset', default="not water") | |
parser.add_argument('-T', '--temperature', default=300.0, type=float, | |
help='Simulation temperature (K)') | |
parser.add_argument('-P', '--pressure', default=1.0, type=float, | |
help='Simulation pressure (atm)') | |
parser.add_argument('--time-per-frame', default=50.0, type=float, | |
help='Time between saved frames (ps)') | |
parser.add_argument('--timestep', default=2.0, type=float, | |
help='Timestep for integration (fs)') | |
parser.add_argument('--platform', default="OpenCL") | |
parser.add_argument('--device-index', default=0) | |
parser.add_argument('-n', '--nruns', default=1, type=int) | |
parser.add_argument("--start-run", default=0, type=int) | |
#parser.add_argument('--posre', default=0.0, | |
#help="Position-restrained run time (ns)") | |
#parser.add_argument('--equil', default=0.0, | |
#help="Equilibration time (ns)") | |
parser.add_argument('--run', default=0.0, | |
help="Production run time (ns)") | |
parser.add_argument('--debug', action='store_true') | |
parser.add_argument('--log', default=None) | |
parser.add_argument('gro_file') | |
return parser | |
def prepare_logging(logfile, debug): | |
logger.setLevel(logging.INFO) | |
if debug: | |
logger.setLever(logging.DEBUG) | |
if logfile is None: | |
handler = logging.StreamHandler() | |
else: | |
handler = logging.FileHandler(logfile) | |
formatter = logging.Formatter( | |
'%(asctime)s - %(name)s - %(levelname)s - %(message)s' | |
) | |
handler.setFormatter(formatter) | |
logger.addHandler(handler) | |
if __name__ == "__main__": | |
parser = make_parser() | |
opts = parser.parse_args() | |
prepare_logging(opts.log, opts.debug) | |
mdtraj_topol = md.load(opts.gro_file).topology | |
if opts.subset.lower() == "none": | |
subset = None | |
else: | |
subset = mdtraj_topol.select(opts.subset) | |
if opts.platform == "OpenCL": | |
properties = { | |
'OpenCLPrecision': 'mixed', | |
'OpenCLDeviceIndex': opts.device_index | |
} | |
elif opts.platform == "CUDA": | |
properties = { | |
'CudaPrecision': 'mixed', | |
'CudaDeviceIndex': opts.device_index | |
} | |
else: | |
properties = None | |
platform_parameters = { | |
'platform': opts.platform, | |
'properties': properties | |
} | |
run_time = float(opts.run) * u.nanoseconds | |
for run_num in range(opts.start_run, opts.start_run + opts.nruns): | |
if opts.start_run == 0 and opts.nruns == 1: | |
output_index = None | |
else: | |
output_index = run_num | |
if output_index is not None: | |
logger.info("Starting run " + str(output_index)) | |
simulation = GMXSimulation( | |
gro_file=opts.gro_file, | |
top_file=opts.topology, | |
platform_parameters=platform_parameters, | |
subset=subset, | |
temperature=opts.temperature*u.kelvin, | |
pressure=opts.pressure*u.atmosphere, | |
timestep=opts.timestep*u.femtoseconds, | |
time_per_frame=opts.time_per_frame*u.picoseconds, | |
output_index=output_index | |
) | |
simulation.initialize_random_setup(opts.gro_file, | |
opts.temperature * u.kelvin) | |
if opts.debug: | |
simulation._debug_state() | |
simulation.simulation.step(1) | |
simulation._debug_state() | |
simulation.run(run_time) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment