Skip to content

Instantly share code, notes, and snippets.

@rmcgibbo
Last active August 29, 2015 14:06
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rmcgibbo/32ca6845c5d415b8e784 to your computer and use it in GitHub Desktop.
Save rmcgibbo/32ca6845c5d415b8e784 to your computer and use it in GitHub Desktop.
from __future__ import print_function
import os
import sys
import json
import argparse
import traceback
from fnmatch import fnmatch
import mdtraj as md

# Re-open stdout unbuffered so progress messages appear immediately,
# even when output is piped to a file or another process.
# NOTE(review): buffering=0 on a text-mode stream is Python 2 only --
# Python 3 raises ValueError here. Confirm the target interpreter.
sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 0)

# Prefer the C-accelerated scandir walk when installed; fall back to
# os.walk (same interface) with a one-time performance warning.
try:
    from scandir import walk as _walk
except ImportError:
    from os import walk as _walk
    import warnings
    warnings.warn('Get `scandir` for faster performance: https://github.com/benhoyt/scandir')
# Module usage text, also used as the argparse description.
# (Fixed typo: "patern" -> "pattern" -- this string is shown to users
# in --help output, so the misspelling was user-visible.)
__doc__ = '''Convert an MD dataset into a more standard format

This script will walk down the filesystem, starting in ``root``, looking
for directories which contain one or more files matching ``pattern`` using
shell-style "glob" formatting. In each of these directories, the matching
files will be sorted by filename (natural order), interpreted as chunks
of a single contiguous MD trajectory, and loaded.

[This script assumes that trajectory files in the same leaf directory
are chunks of a contiguous MD trajectory. If that's not the case for your
dataset, this is the WRONG script for you.]

The concatenated trajectory will be saved to disk inside the ``outdir``
directory, under a filename set by the ``outfmt`` format string.

A record of conversion will be saved inside the ``metadata`` JSON file,
which contains a mapping from output trajectory name to a list of the
filenames of the chunks it was converted from.
'''
def keynat(string):
'''A natural sort helper function for sort() and sorted()
without using regular expression.
>>> items = ('Z', 'a', '10', '1', '9')
>>> sorted(items)
['1', '10', '9', 'Z', 'a']
>>> sorted(items, key=keynat)
['1', '9', '10', 'Z', 'a']
'''
r = []
for c in string:
try:
c = int(c)
try:
r[-1] = r[-1] * 10 + c
except:
r.append(c)
except:
r.append(c)
return r
def walk_project(root, pattern):
    '''Walk the tree under ``root``, yielding one naturally-sorted list
    of full paths per directory that contains files matching the
    shell-glob ``pattern``. Directories with no matches are skipped.
    '''
    for dirpath, _dirnames, names in _walk(root):
        matches = [os.path.join(dirpath, name)
                   for name in names
                   if fnmatch(name, pattern)]
        if matches:
            yield sorted(matches, key=keynat)
def load_chunks(chunk_fns, top, discard_first=True):
    '''Load every chunk file and join them into a single trajectory.

    Parameters
    ----------
    chunk_fns : list of str
        Paths to the trajectory chunk files, already in order.
    top : topology
        Topology passed through to ``md.load``.
    discard_first : bool, default=True
        If True, drop the first frame of each chunk (e.g. when each
        chunk repeats the last frame of the previous one).
    '''
    chunks = []
    for filename in chunk_fns:
        chunk = md.load(filename, top=top)
        chunks.append(chunk[1:] if discard_first else chunk)
    return chunks[0].join(chunks[1:])
def parse_args():
    '''Parse command-line arguments and return a prepared namespace.

    Side effects: creates ``outdir`` if it does not exist, and parses
    the topology file once (stored as ``top``) so callers don't re-read
    it per trajectory.
    '''
    parser = argparse.ArgumentParser(description=__doc__,
        formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument('root', help='Root of the directory structure containing '
        'the MD trajectories to convert (filesystem path)')
    # nargs='?' so the documented default actually applies when the
    # argument is omitted (argparse ignores ``default`` on a plain
    # required positional).
    parser.add_argument('outdir', nargs='?', default='trajectories',
        help='Output directory in which to save '
        'converted trajectories. default="trajectories/"')
    parser.add_argument('--outfmt', help='Output format. default="traj-%%08d.dcd"',
        default='traj-%08d.dcd')
    parser.add_argument('--pattern', help='Glob pattern for trajectory chunks (example: \'frame*.xtc\')',
        required=True)
    # Not required: the help text promises a default, and marking it
    # required=True made that default unreachable.
    parser.add_argument('--metadata', help='Path to metadata file. default="trajectories.json"',
        default='trajectories.json')
    parser.add_argument('--discard-first', help='Flag to discard the initial frame '
        'in each chunk before concatenating trajectories',
        action='store_true')
    parser.add_argument('--topology', help='Path to system topology file (e.g. pdb/prmtop/psf)',
        required=True)
    parser.add_argument('--dry-run', help='Trace the execution, without '
        'actually running any actions', action='store_true')
    # Parse exactly once (the original called parse_args() twice and
    # discarded the first result).
    args = parser.parse_args()
    if not os.path.exists(args.outdir):
        os.makedirs(args.outdir)
    return argparse.Namespace(
        root=args.root,
        outdir=args.outdir,
        outfmt=args.outfmt,
        pattern=args.pattern,
        metadata=args.metadata,
        discard_first=args.discard_first,
        top=md.core.trajectory._parse_topology(args.topology),
        dry_run=args.dry_run)
def main():
    '''Drive the conversion: walk the project tree, convert each
    not-yet-processed leaf directory of chunks, and record every
    conversion in the metadata JSON file so re-runs can resume.'''
    args = parse_args()

    # Resume support: reload the record of previous conversions, if any.
    if os.path.exists(args.metadata):
        with open(args.metadata) as f:
            metadata = json.load(f)
    else:
        metadata = {}

    for chunk_fns in walk_project(args.root, args.pattern):
        if chunk_fns in metadata.values():
            print('Skipping %s. Already processed' % os.path.dirname(chunk_fns[0]))
            continue
        print('Loading %s: %d files...' % (os.path.dirname(chunk_fns[0]), len(chunk_fns)))
        try:
            traj = load_chunks(chunk_fns, args.top, discard_first=args.discard_first)
        except ValueError:
            # One bad leaf directory should not abort the whole run.
            # The entire error report goes to stderr -- previously the
            # '-' * 60 separators went to stdout, splitting the report
            # across streams.
            print('======= Error loading chunks! Skipping ==========', file=sys.stderr)
            print('-' * 60, file=sys.stderr)
            traceback.print_exc(file=sys.stderr)
            print('-' * 60, file=sys.stderr)
            continue

        # Output names are numbered by how many trajectories are already
        # recorded, so numbering continues correctly across resumed runs.
        out_filename = args.outfmt % len(metadata)
        assert out_filename not in metadata
        print('Saving... ', end=' ')
        if not args.dry_run:
            traj.save(os.path.join(args.outdir, out_filename))
        print('%s: [%s]' % (out_filename, ', '.join(os.path.basename(e) for e in chunk_fns)))
        metadata[out_filename] = chunk_fns

        # sync it back to disk after every trajectory, so an interrupted
        # run loses at most the current leaf directory
        if not args.dry_run:
            with open(args.metadata, 'w') as f:
                json.dump(metadata, f)


if __name__ == '__main__':
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment