Last active
August 29, 2015 14:06
-
-
Save rmcgibbo/32ca6845c5d415b8e784 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import print_function | |
import os | |
import sys | |
import json | |
import argparse | |
import traceback | |
from fnmatch import fnmatch | |
import mdtraj as md | |
# Reopen stdout line-buffered so progress output appears immediately.
# (The original used buffering=0, which Python 3 rejects for text streams
# -- unbuffered is only valid in binary mode.)
sys.stdout = os.fdopen(sys.stdout.fileno(), 'w', 1)
# Use the third-party ``scandir`` package's walk when available (the warning
# below advertises it as the faster option); otherwise fall back to the
# stdlib os.walk under the same ``_walk`` alias.
try:
    from scandir import walk as _walk
except ImportError:
    from os import walk as _walk
    import warnings
    warnings.warn('Get `scandir` for faster performance: https://github.com/benhoyt/scandir')
__doc__ = '''Convert an MD dataset into a more standard format

This script will walk down the filesystem, starting in ``root``, looking
for directories which contain one or more files matching ``pattern`` using
shell-style "glob" formatting. In each of these directories, the matching
files will be sorted by filename (natural order), interpreted as chunks
of a single contiguous MD trajectory, and loaded.

[This script assumes that trajectory files in the same leaf directory
are chunks of a contiguous MD trajectory. If that's not the case for your
dataset, this is the WRONG script for you.]

The concatenated trajectory will be saved to disk inside the ``outdir``
directory, under a filename set by the ``outfmt`` format string.

A record of conversion will be saved inside the ``metadata`` JSON file,
which contains a mapping from output trajectory name to a list of the
filenames of the chunks it was converted from.
'''
def keynat(string):
    '''A natural sort helper function for sort() and sorted()
    without using regular expression.

    Each run of digits is collapsed into one number so that, e.g.,
    'frame9' sorts before 'frame10'.  Key elements are tagged tuples --
    ``(0, number)`` for digit runs and ``(1, char)`` for other characters
    -- so numbers always order before letters and mixed keys compare
    cleanly on Python 3 (bare int/str lists raise TypeError there).

    >>> items = ('Z', 'a', '10', '1', '9')
    >>> sorted(items)
    ['1', '10', '9', 'Z', 'a']
    >>> sorted(items, key=keynat)
    ['1', '9', '10', 'Z', 'a']
    '''
    key = []
    for ch in string:
        try:
            digit = int(ch)
        except ValueError:
            # Non-digit character: compare as a plain character.
            key.append((1, ch))
        else:
            if key and key[-1][0] == 0:
                # Extend the current run of digits into one number.
                key[-1] = (0, key[-1][1] * 10 + digit)
            else:
                key.append((0, digit))
    return key
def walk_project(root, pattern):
    '''Walk the tree under ``root`` and, for every directory holding at
    least one file that matches the shell glob ``pattern``, yield the
    matching paths in natural-sort order.  Directories without matches
    are skipped entirely.
    '''
    for dirpath, _dirnames, names in _walk(root):
        matches = [os.path.join(dirpath, name)
                   for name in names if fnmatch(name, pattern)]
        if matches:
            matches.sort(key=keynat)
            yield matches
def load_chunks(chunk_fns, top, discard_first=True):
    '''Load trajectory chunk files and concatenate them into one trajectory.

    Parameters
    ----------
    chunk_fns : list of str
        Chunk filenames, already in the desired order.
    top : topology
        Topology passed straight through to ``md.load``.
    discard_first : bool, default True
        If True, drop the first frame of every chunk before joining
        (matches the ``--discard-first`` command-line flag).

    Returns
    -------
    The chunks joined into a single trajectory.

    Raises
    ------
    ValueError
        If ``chunk_fns`` is empty.  (Previously this surfaced as an
        IndexError; ValueError is what main() catches to skip a bad
        directory.)
    '''
    if not chunk_fns:
        raise ValueError('chunk_fns must contain at least one filename')
    trajectories = []
    for fn in chunk_fns:
        t = md.load(fn, top=top)
        if discard_first:
            t = t[1:]
        trajectories.append(t)
    return trajectories[0].join(trajectories[1:])
def parse_args():
    '''Parse the command line and build the runtime configuration.

    Returns
    -------
    argparse.Namespace
        Fields: root, outdir, outfmt, pattern, metadata, discard_first,
        top (the topology object parsed by mdtraj), dry_run.

    Side effects: creates ``outdir`` if it does not already exist, unless
    ``--dry-run`` was given.
    '''
    parser = argparse.ArgumentParser(description=__doc__,
        formatter_class=argparse.RawTextHelpFormatter)
    parser.add_argument('root', help='Root of the directory structure containing '
                        'the MD trajectories to convert (filesystem path)')
    # nargs='?' makes the positional genuinely optional, so the documented
    # default is honored (argparse ignores a default on a bare positional).
    parser.add_argument('outdir', nargs='?', help='Output directory in which to save '
                        'converted trajectories. default="trajectories/"',
                        default='trajectories')
    parser.add_argument('--outfmt', help='Output format. default="traj-%%08d.dcd"',
                        default='traj-%08d.dcd')
    parser.add_argument('--pattern', help='Glob pattern for trajectory chunks (example: \'frame*.xtc\')',
                        required=True)
    # Not required: required=True contradicted (and disabled) the default.
    parser.add_argument('--metadata', help='Path to metadata file. default="trajectories.json"',
                        default='trajectories.json')
    parser.add_argument('--discard-first', help='Flag to discard the initial frame '
                        'in each chunk before concatenating trajectories',
                        action='store_true')
    parser.add_argument('--topology', help='Path to system topology file (e.g. pdb/prmtop/psf)',
                        required=True)
    parser.add_argument('--dry-run', help='Trace the execution, without '
                        'actually running any actions', action='store_true')
    # Parse exactly once (the original called parse_args() twice and threw
    # away the first result).
    args = parser.parse_args()
    # Don't touch the filesystem during a dry run.
    if not args.dry_run and not os.path.exists(args.outdir):
        os.makedirs(args.outdir)
    return argparse.Namespace(
        root=args.root,
        outdir=args.outdir,
        outfmt=args.outfmt,
        pattern=args.pattern,
        metadata=args.metadata,
        discard_first=args.discard_first,
        # Parse the topology once up front so every chunk load reuses it.
        top=md.core.trajectory._parse_topology(args.topology),
        dry_run=args.dry_run)
def main():
    '''Walk the project tree, convert each directory of chunk files into a
    single output trajectory, and record every conversion in the metadata
    JSON file (output filename -> list of source chunk filenames).
    '''
    args = parse_args()
    # Resume support: reload the record of previous conversions if present.
    if os.path.exists(args.metadata):
        with open(args.metadata) as f:
            metadata = json.load(f)
    else:
        metadata = {}
    for chunk_fns in walk_project(args.root, args.pattern):
        # Chunk lists already recorded as metadata values were converted by
        # an earlier run; skip them.
        if chunk_fns in metadata.values():
            print('Skipping %s. Already processed' % os.path.dirname(chunk_fns[0]))
            continue
        print('Loading %s: %d files...' % (os.path.dirname(chunk_fns[0]), len(chunk_fns)))
        try:
            traj = load_chunks(chunk_fns, args.top, discard_first=args.discard_first)
        except ValueError:
            # A directory that fails to load is reported and skipped, not fatal.
            print('======= Error loading chunks! Skipping ==========', file=sys.stderr)
            print('-' * 60)
            traceback.print_exc(file=sys.stderr)
            print('-' * 60)
            continue
        # Output names are numbered by how many conversions are on record,
        # so numbering continues correctly across resumed runs.
        out_filename = args.outfmt % len(metadata)
        assert out_filename not in metadata
        print('Saving... ', end=' ')
        if not args.dry_run:
            traj.save(os.path.join(args.outdir, out_filename))
        print('%s: [%s]' % (out_filename, ', '.join(os.path.basename(e) for e in chunk_fns)))
        metadata[out_filename] = chunk_fns
        # sync it back to disk after every trajectory, so a crash loses at
        # most the conversion in progress
        if not args.dry_run:
            with open(args.metadata, 'w') as f:
                json.dump(metadata, f)
# Script entry point.
if __name__ == '__main__':
    main()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment