Skip to content

Instantly share code, notes, and snippets.

@leelasd
Forked from dwhswenson/run_md.py
Created August 10, 2020 20:44
Show Gist options
  • Save leelasd/fc60168ecf5a4ccfb63c6b104a088b5f to your computer and use it in GitHub Desktop.
Save leelasd/fc60168ecf5a4ccfb63c6b104a088b5f to your computer and use it in GitHub Desktop.
Simple OpenMM-based MD from Gromacs files
#!/usr/bin/env python
# coding: utf-8
##########################################################################
# this script was generated by openmm-builder. to customize it further,
# you can save the file to disk and edit it with your favorite editor.
##########################################################################
# REQUIRES:
# * OpenMM (core dynamics)
# * mdtraj (h5 reporter -- with velocities)
# * parmed (progress reporter)
# * openmmtools (VVVR integrator)
from __future__ import print_function
from simtk.openmm import app
import simtk.openmm as mm
import mdtraj as md
import parmed
import re
import openmmtools.integrators
from simtk import unit as u
from sys import stdout
import logging
logger = logging.getLogger(__name__)
class GMXSimulation(object):
"""
Create a basic OpenMM simulation from Gromacs files.
The resulting simulation has sane defaults for biomolecular simulation.
There's only limited customizability -- if you want more
customizability, just use OpenMM directly!
"""
def __init__(self, gro_file, top_file, platform_parameters, subset=None,
temperature=300*u.kelvin, pressure=1.0*u.atmosphere,
timestep=2.0*u.femtosecond,
time_per_frame=50*u.picosecond,
output_basename=None, output_index=None):
self.timestep = timestep
# load the stuff from the files
gro = app.GromacsGroFile(gro_file)
box_vectors = mm.Vec3(*gro.getPeriodicBoxVectors())
top = app.GromacsTopFile(top_file, periodicBoxVectors=box_vectors)
# set up the integrator
integrator = self.build_integrator(temperature, pressure, timestep)
system = self.build_system(top)
system.addForce(mm.MonteCarloBarostat(pressure, temperature, 25))
platform = platform_parameters['platform']
self.system = system
self.integrator = integrator
self.simulation = self.build_simulation(top, system, integrator,
platform_parameters)
# set up the reporters
if output_basename is None:
output_basename = re.sub('\.gro$', '', gro_file)
if output_index is not None:
output_basename += str(output_index) # make 0-padded?
self.n_steps_per_frame = int(time_per_frame / timestep)
# log reporter (stdout)
log_reporter = app.StateDataReporter(
stdout,
reportInterval=self.n_steps_per_frame,
potentialEnergy=True,
kineticEnergy=True,
totalEnergy=False,
temperature=True,
volume=True
)
self.simulation.reporters.append(log_reporter)
# full reporter, including velocities
full_reporter = md.reporters.HDF5Reporter(
output_basename + ".h5",
reportInterval=self.n_steps_per_frame,
coordinates=True,
time=True,
cell=True,
potentialEnergy=False, #True,
kineticEnergy=False, #True,
velocities=True
)
# subset reporter (no H2O)
if subset is not None:
subset_reporter = md.reporters.DCDReporter(
output_basename + "_dry.dcd",
reportInterval=self.n_steps_per_frame,
atomSubset=subset
)
self.simulation.reporters.append(full_reporter)
if subset is not None:
self.simulation.reporters.append(subset_reporter)
self.output_basename = output_basename
def run_position_restrained(self, time, output_file=None):
pass
def _debug_state(self):
state = self.simulation.context.getState(getPositions=True,
getVelocities=True,
getEnergy=True)
coords = state.getPositions(asNumpy=True)
box = state.getPeriodicBoxVectors(asNumpy=True)
velocities = state.getVelocities(asNumpy=True)
potential = state.getPotentialEnergy()
kinetic = state.getKineticEnergy()
print(coords)
print(box)
print(velocities)
print(potential, kinetic)
def run(self, time):
n_steps = int(time / self.timestep) + 1
prog_reporter = parmed.openmm.ProgressReporter(
f=self.output_basename + ".progress",
reportInterval=self.n_steps_per_frame,
totalSteps=n_steps,
potentialEnergy=True,
kineticEnergy=True,
totalEnergy=False,
temperature=True,
volume=True
)
self.simulation.reporters.append(prog_reporter)
self.simulation.step(n_steps)
def build_integrator(self, temperature, pressure, timestep):
integrator = openmmtools.integrators.VVVRIntegrator(
temperature=temperature,
collision_rate=1.0 / u.picoseconds,
timestep=timestep
)
integrator.setConstraintTolerance(0.00001)
return integrator
def build_system(self, topology):
system = topology.createSystem(
nonbondedMethod=app.PME,
nonbondedCutoff=1.0*u.nanometers,
constraints=app.HBonds,
rigidWater=True,
ewaldErrorTolerance=0.0005
)
return system
def build_simulation(self, topology, system, integrator,
platform_parameters):
platform_name = platform_parameters['platform']
properties = platform_parameters['properties']
platform = mm.Platform.getPlatformByName(platform_name)
simulation = app.Simulation(topology.topology, system, integrator,
platform, properties)
return simulation
def initialize_random_setup(self, gro_file, temperature):
gro = app.GromacsGroFile(gro_file)
self.simulation.context.setPositions(gro.positions)
self.simulation.context.setVelocitiesToTemperature(temperature)
def make_parser():
from argparse import ArgumentParser
parser = ArgumentParser()
parser.add_argument('-t', '--topology')
parser.add_argument('--subset', default="not water")
parser.add_argument('-T', '--temperature', default=300.0, type=float,
help='Simulation temperature (K)')
parser.add_argument('-P', '--pressure', default=1.0, type=float,
help='Simulation pressure (atm)')
parser.add_argument('--time-per-frame', default=50.0, type=float,
help='Time between saved frames (ps)')
parser.add_argument('--timestep', default=2.0, type=float,
help='Timestep for integration (fs)')
parser.add_argument('--platform', default="OpenCL")
parser.add_argument('--device-index', default=0)
parser.add_argument('-n', '--nruns', default=1, type=int)
parser.add_argument("--start-run", default=0, type=int)
#parser.add_argument('--posre', default=0.0,
#help="Position-restrained run time (ns)")
#parser.add_argument('--equil', default=0.0,
#help="Equilibration time (ns)")
parser.add_argument('--run', default=0.0,
help="Production run time (ns)")
parser.add_argument('--debug', action='store_true')
parser.add_argument('--log', default=None)
parser.add_argument('gro_file')
return parser
def prepare_logging(logfile, debug):
logger.setLevel(logging.INFO)
if debug:
logger.setLever(logging.DEBUG)
if logfile is None:
handler = logging.StreamHandler()
else:
handler = logging.FileHandler(logfile)
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
handler.setFormatter(formatter)
logger.addHandler(handler)
if __name__ == "__main__":
parser = make_parser()
opts = parser.parse_args()
prepare_logging(opts.log, opts.debug)
mdtraj_topol = md.load(opts.gro_file).topology
if opts.subset.lower() == "none":
subset = None
else:
subset = mdtraj_topol.select(opts.subset)
if opts.platform == "OpenCL":
properties = {
'OpenCLPrecision': 'mixed',
'OpenCLDeviceIndex': opts.device_index
}
elif opts.platform == "CUDA":
properties = {
'CudaPrecision': 'mixed',
'CudaDeviceIndex': opts.device_index
}
else:
properties = None
platform_parameters = {
'platform': opts.platform,
'properties': properties
}
run_time = float(opts.run) * u.nanoseconds
for run_num in range(opts.start_run, opts.start_run + opts.nruns):
if opts.start_run == 0 and opts.nruns == 1:
output_index = None
else:
output_index = run_num
if output_index is not None:
logger.info("Starting run " + str(output_index))
simulation = GMXSimulation(
gro_file=opts.gro_file,
top_file=opts.topology,
platform_parameters=platform_parameters,
subset=subset,
temperature=opts.temperature*u.kelvin,
pressure=opts.pressure*u.atmosphere,
timestep=opts.timestep*u.femtoseconds,
time_per_frame=opts.time_per_frame*u.picoseconds,
output_index=output_index
)
simulation.initialize_random_setup(opts.gro_file,
opts.temperature * u.kelvin)
if opts.debug:
simulation._debug_state()
simulation.simulation.step(1)
simulation._debug_state()
simulation.run(run_time)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment