Created
December 8, 2015 23:17
-
-
Save markdewing/28223759c2dbe24e1147 to your computer and use it in GitHub Desktop.
Visualization of molecular dynamics using VisPy
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import print_function, division | |
import simflat | |
import initatoms | |
import time | |
import constants | |
import argparse | |
from multiprocessing import Process, Queue | |
from Queue import Empty | |
import collections | |
import numpy as np | |
try: | |
import imageio | |
except ImportError: | |
print("To enable saving screenshots, install `imageio` package") | |
from vispy import scene, app | |
from vispy.visuals.transforms import STTransform | |
# Translation of CoMD to Python | |
def advanceVelocity(atoms, dt): | |
for i in range(atoms.nAtoms): | |
atoms.p[i,0] += dt*atoms.f[i,0] | |
atoms.p[i,1] += dt*atoms.f[i,1] | |
atoms.p[i,2] += dt*atoms.f[i,2] | |
def advancePosition(sim, atoms, dt): | |
for i in range(atoms.nAtoms): | |
iSpecies = atoms.species[i] | |
invMass = 1.0/sim.species[iSpecies].mass | |
atoms.r[i,0] += dt*atoms.p[i,0]*invMass | |
atoms.r[i,1] += dt*atoms.p[i,1]*invMass | |
atoms.r[i,2] += dt*atoms.p[i,2]*invMass | |
def timestep(sim, nSteps, dt): | |
for ii in range(nSteps): | |
advanceVelocity(sim.atoms, 0.5*dt) | |
advancePosition(sim, sim.atoms, dt) | |
# redistributeAtoms | |
sim.ePot = sim.pot.computeForce(sim.atoms) | |
advanceVelocity(sim.atoms, 0.5*dt) | |
#print('ke,pe = ',initatoms.kineticEnergy(sim)/sim.atoms.nAtoms,sim.ePot/sim.atoms.nAtoms) | |
sim.eKinetic = initatoms.kineticEnergy(sim) | |
def initValidate(sim): | |
ke = initatoms.kineticEnergy(sim) | |
pe = sim.ePot | |
print("Initial PE (cohesive energy) : ",pe/sim.atoms.nAtoms) | |
print("Initial energy : ",(ke+pe)/sim.atoms.nAtoms," atom count : ",sim.atoms.nAtoms) | |
firstCall = True | |
iStepPrev = -1 | |
def printInfo(sim, iStep, elapsedTime): | |
global firstCall, iStepPrev | |
if firstCall: | |
firstCall = False | |
#print("# Loop Time(fs) Total Energy Potential Energy Kinetic Energy Temperature Performance (us/atom) # Atoms") | |
print("#{:>100s}".format("Performance")) | |
print("#{:>6s} {:>10s} {:>18s} {:>18s} {:>18s} {:>12s} {:^12s} {:^12s}".\ | |
format("Loop", "Time(fs)", "Total Energy", "Potential Energy", "Kinetic Energy", "Temperature", "(us/atom)", "# Atoms")) | |
nEval = iStep - iStepPrev | |
iStepPrev = iStep | |
simtime = iStep * sim.dt | |
n = sim.atoms.nAtoms | |
eTotal = (sim.ePot + sim.eKinetic)/n | |
eK = sim.eKinetic / n | |
eU = sim.ePot / n | |
Temp = (sim.eKinetic/n)/(constants.kB_eV * 1.5) | |
timePerAtom = 1.0e6 * elapsedTime/(n*nEval) | |
print(" {:6d} {:10.2f} {:18.12f} {:18.12f} {:18.12f} {:12.4f} {:10.4f} {:12d}".format(iStep, simtime, eTotal, eU, eK, Temp, timePerAtom, n)) | |
#print(iStep, simtime, eTotal, eU, eK, Temp, timePerAtom, n) | |
def printPerformanceResults(sim, elapsedTime): | |
n = sim.atoms.nAtoms | |
nEval = sim.nSteps - sim.nSkip * sim.printRate | |
timePerAtom = 1.0e6 * elapsedTime/(n*nEval) | |
print() | |
print("Average all atom update rate: %10.2f us/atom" % timePerAtom) | |
print() | |
def parseCommandLine(): | |
parser = argparse.ArgumentParser() | |
parser.add_argument("-x","--nx", type=int, default=4, help="number of unit cells in x") | |
parser.add_argument("-y","--ny", type=int, default=4, help="number of unit cells in y") | |
parser.add_argument("-z","--nz", type=int, default=4, help="number of unit cells in z") | |
parser.add_argument("-N","--nSteps", type=int, default=100, help="total number of time steps") | |
parser.add_argument("-n","--printRate", type=int, default=10, help="number of steps between output") | |
parser.add_argument("--skip", type=int, default=1, help="number of blocks to skip in averaged output") | |
parser.add_argument("-D","--dt", type=float, default=1, help="time step (in fs)") | |
parser.add_argument("-l","--lat", type=float, default=-1, help="lattice parameter (Angstroms)") | |
parser.add_argument("-T","--temp", type=float, default=600, help="initial temperature (K)") | |
args = parser.parse_args() | |
return args | |
def run_comd(pos_queue): | |
cmd = parseCommandLine() | |
# init subsystems | |
sim = simflat.initSimulation(cmd) | |
print('nAtoms = ',sim.atoms.nAtoms) | |
#for i in range(min(sim.atoms.nAtoms, 10)): | |
#for i in range(sim.atoms.nAtoms): | |
# print i,sim.atoms.r[i,:] | |
pos_queue.put(sim.atoms.r) | |
sim.ePot = sim.pot.computeForce(sim.atoms) | |
sim.eKinetic = initatoms.kineticEnergy(sim) | |
initValidate(sim) | |
timestepTime = 0.0 | |
timestepTimeOneIteration = 0.0 | |
iStep = 0 | |
for jStep in range(0, sim.nSteps, sim.printRate): | |
printInfo(sim, iStep, timestepTimeOneIteration) | |
start = time.clock() | |
timestep(sim, sim.printRate, sim.dt) | |
end = time.clock() | |
timestepTimeOneIteration = end - start | |
if jStep >= sim.nSkip * sim.printRate: | |
timestepTime += timestepTimeOneIteration | |
pos_queue.put(sim.atoms.r) | |
iStep += sim.printRate | |
printInfo(sim, iStep, timestepTimeOneIteration) | |
printPerformanceResults(sim, timestepTime) | |
# validate result | |
# | |
# Start of visualization code | |
# | |
class AtomCanvas(scene.SceneCanvas): | |
def __init__(self): | |
scene.SceneCanvas.__init__(self, keys='interactive', bgcolor='black', | |
size=(800, 600), show=True) | |
self.unfreeze() | |
self.view = self.central_widget.add_view() | |
# Try different cameras | |
#self.view.camera = 'fly' | |
#self.view.camera = 'turntable' | |
#self.view.camera = 'panzoom' | |
#self.view.camera = 'arcball' | |
self.view.camera = scene.cameras.TurntableCamera(fov=0.0, elevation=10.0, azimuth=40.0, scale_factor=30.0) | |
self.axis = scene.visuals.XYZAxis(parent=self.view.scene) | |
# Number of previous positions to keep in the visualization | |
# Set to 1 to see only the current position | |
self.ntrails = 50 | |
self.pos = collections.deque(maxlen=self.ntrails) | |
self.particles = collections.deque(maxlen=self.ntrails) | |
self.freeze() | |
def add_positions(self, pos): | |
if len(self.particles) < self.ntrails: | |
self.particles.appendleft(scene.visuals.Markers()) | |
self.view.add(self.particles[0]) | |
else: | |
self.particles.rotate() | |
self.pos.appendleft(pos) | |
alpha = 0.9 # transparency of the current position | |
for i in range(len(self.particles)): | |
self.particles[i].set_data(self.pos[i], edge_color=None, face_color=(1,1,1,alpha), size=10) | |
# Adjust the decay of transparency for the trails | |
alpha*= 0.95 | |
def screenshot(): | |
global app_obj | |
writer = imageio.get_writer('screenshot.png') | |
im = app_obj.render() | |
writer.append_data(im) | |
writer.close() | |
# Save animation clips. | |
# Using global variables not the best, but okay for a simple script. | |
save_animation = False | |
animation_timer = None | |
animation_writer = None | |
def toggle_saving_animation(): | |
global save_animation, animation_timer, animation_writer | |
if save_animation: | |
save_animation = False | |
animation_timer.stop() | |
animation_writer.close() | |
animation_writer = None | |
else: | |
animation_writer = imageio.get_writer('animation.gif', duration=0.1) | |
animation_timer.start() | |
save_animation = True | |
def save_animation_frame(event): | |
global app_obj, animation_writer | |
if animation_writer: | |
im = app_obj.render() | |
animation_writer.append_data(im) | |
def check_queue(event): | |
global app_obj, pos_queue | |
try: | |
pos = pos_queue.get(False) | |
app_obj.add_positions(pos) | |
except Empty: | |
pass | |
if __name__ == '__main__': | |
global app_obj, pos_queue | |
# Use separate processes for the display loop and the simulation. | |
# Communicate positions from the simulation to the display via a queue | |
pos_queue = Queue() | |
canvas = AtomCanvas() | |
app_obj = canvas | |
@canvas.connect | |
def on_key_press(ev): | |
if ev.key.name == 'S': | |
screenshot() | |
if ev.key.name == 'A': | |
toggle_saving_animation() | |
# Set a timer to check the queue for new positions | |
timer = app.Timer(connect=check_queue) | |
timer.start() | |
# For saving animation clips | |
animation_timer = app.Timer(0.1,connect=save_animation_frame) | |
# Start simulation | |
p = Process(target=run_comd, args=(pos_queue,)) | |
p.start() | |
# Start GUI event loop | |
canvas.app.run() | |
p.join() |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment