Skip to content

Instantly share code, notes, and snippets.

@JoaoRodrigues
Last active October 24, 2016 23:42
Show Gist options
  • Save JoaoRodrigues/2a6773d962d6fba4530a940a07c8f51f to your computer and use it in GitHub Desktop.
Save JoaoRodrigues/2a6773d962d6fba4530a940a07c8f51f to your computer and use it in GitHub Desktop.
Smooths a trajectory using mdtraj and an weighted adaptive window algorithm.
#!/usr/bin/env python
"""
Smooths a trajectory using mdtraj and an weighted adaptive window algorithm.
"""
from __future__ import print_function, division
import argparse
import os
import sys
import numpy as np
import mdtraj as md
# Functions
def adaptive_smoothing(trajectory, window=5):
"""
Smooths a trajectory using an adaptive window that narrows at the edges
of the trajectory and weights the frames depending on their distance to
the central frame.
"""
n_frames = trajectory.n_frames
msg = 'Running adaptive smoothing on {} frames (window={})'
print(msg.format(n_frames, window))
for frame_no, frame_xyz in enumerate(trajectory.xyz):
# Smooth an equal number of frames in each direction
if frame_no < cmd.window:
# Adapt window size
# Keep symmetry
# [0], [0,1,0], [0,0,1,0,0], ...
beg_frame = 0
end_frame = (frame_no*2) + 1 # 0: [0,1[, 1: [0, 1, 2, 3[, ...
half_window_size = frame_no + 1
elif n_frames - frame_no <= window:
# Not enough remaining frames to average normally
# Reduce averaging window symmetrically
end_frame = n_frames
beg_frame = frame_no - (n_frames - frame_no) + 1
half_window_size = n_frames - frame_no
else:
beg_frame = frame_no - window # 15: 10
end_frame = frame_no + window + 1 # 15: 21
half_window_size = window + 1
window_frames = trajectory.xyz[beg_frame: end_frame]
# Define weights based on distance to central frame
_w = [x+1 for x in range(half_window_size)] # avoid zero weights
_raw_w = _w + _w[::-1][1:]
try:
weights = map(lambda x: x/max(_raw_w), _raw_w)
except ZeroDivisionError: # First/Last frame
weights = [1]
# Average coordinates
average_xyz = np.average(window_frames, weights=weights, axis=0)
# Update trajectory
trajectory.xyz[frame_no] = average_xyz
return trajectory
def halt(message):
"""Writes a message to STDERR and exists the program"""
print(message, file=sys.stderr)
sys.exit(1)
# Main
if __name__ == '__main__':
_filters = {'adaptive': adaptive_smoothing}
ap = argparse.ArgumentParser(description=__doc__)
ap_io = ap.add_argument_group('I/O Options')
ap_io.add_argument('trajectory', type=str, help='Input Trajectory')
ap_io.add_argument('topology', type=str, help='Input Topology')
ap_io.add_argument('--selection', type=str,
help='Filter atoms in input trajectory (e.g. backbone)')
ap_io.add_argument('--reference', type=str,
help='Reference PDB structure to fit trajectory to')
ap_io.add_argument('--read_from', type=int,
help='Frame number to start reading trajectory data')
ap_io.add_argument('--read_until', type=int,
help='Frame number to stop reading trajectory data')
ap_io.add_argument('-o', '--output', type=str, help='Output Trajectory')
ap_io.add_argument('--downsample', type=int,
help='Downsample output trajectory (1 every N frames)')
ap_smooth = ap.add_argument_group('Smoothing Options')
ap_smooth.add_argument('-w', '--window', type=int, default=5,
help='Number of frames to average')
ap_smooth.add_argument('-s', '--smooth_function',
help='Type of smoothing function',
choices=_filters.keys(), default='adaptive')
ap_smooth.add_argument('-p', '--pad', action='store_true',
help='Pad ends of trajectory with start/end frames')
cmd = ap.parse_args()
# Check input files exist
if not os.path.isfile(cmd.trajectory):
msg = 'Trajectory input file not found: {}'.format(cmd.trajectory)
halt(msg)
if not os.path.isfile(cmd.topology):
msg = 'Topology input file not found: {}'.format(cmd.topology)
halt(msg)
# Read trajectory
print('== Started ==')
trj = md.load(cmd.trajectory, top=cmd.topology)
if cmd.read_from:
trj = trj[cmd.read_from-1:]
if cmd.read_until:
trj = trj[:cmd.read_until]
print('Loaded {0.n_atoms} atoms / {0.n_frames} frames'.format(trj))
# Strip by selection
if cmd.selection:
atomsel = trj.top.select(cmd.selection)
trj.atom_slice(atomsel, inplace=True)
if cmd.pad:
print('Padding trajectory with {} frames'.format(cmd.window))
for _ in range(cmd.window):
trj = trj[0] + trj + trj[-1]
# Align trajectory to reference or first frame
if cmd.reference:
print('Aligning trajectory to {}'.format(cmd.reference))
refe = md.load_pdb(cmd.reference)
else:
print('Aligning trajectory to first frame')
refe = trj
atomsel = trj.top.select('protein and name CA and not (resname ACE or resname NME)')
trj.superpose(refe, 0, atom_indices=atomsel, ref_atom_indices=atomsel)
# Smooth frames
smooth_function = _filters[cmd.smooth_function]
trj = smooth_function(trj, window=cmd.window)
# Downsample
if cmd.downsample:
print('Downsampling (1 every {} frames)'.format(cmd.downsample))
trj = trj[::cmd.downsample]
# Output smoothed trajectory
if not cmd.output:
cmd.output = 'smooth.pdb'
print('Saving smoothed trajectory to: {}'.format(cmd.output))
trj.save(cmd.output)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment