Skip to content

Instantly share code, notes, and snippets.

@chatcannon
Last active December 30, 2015 07:19
Show Gist options
  • Save chatcannon/7795048 to your computer and use it in GitHub Desktop.
Save chatcannon/7795048 to your computer and use it in GitHub Desktop.
Parser for the SPEC file format used for x-ray scattering data
"""SPEC.py - code for reading files in the SPEC data format
Copyright (c) Christopher Kerr 2013
You may use and redistribute this file under the terms of the
GNU General Public Licence (GPL), version 3 or, at your option,
any later version."""
__all__ = ['SPECfile']
import re
import itertools
import time
import sys
from warnings import warn
import numpy as np
def takewhile_dropwhile(pred, seq):
    """Split *seq* at the first element for which *pred* is false.

    Like takewhile and dropwhile from itertools but returns both sequences.
    Reads all the takewhile elements into a list but leaves dropwhile as an
    iterator.

    Parameters:
        pred -- one-argument callable applied to each element in turn
        seq  -- any iterable (iterator or re-iterable container)

    Returns a tuple ``(taken, rest)``:
        taken -- list of the leading elements for which ``pred`` was true
        rest  -- an iterator that yields the first failing element followed
                 by everything after it, or the empty list ``[]`` when no
                 element failed the predicate
    """
    # Work on one explicit iterator so that the remainder handed back carries
    # on from where the scan stopped.  Chaining the original ``seq`` would
    # restart from the beginning whenever ``seq`` is a container like a list.
    remaining = iter(seq)
    taken = []
    for elem in remaining:
        if not pred(elem):
            # use itertools.chain to fake pushing the element back on the
            # front of the remaining input
            return taken, itertools.chain([elem], remaining)
        taken.append(elem)
    # no elements fail the predicate
    return taken, []
def not_s_line(x):
    """Return True unless the line *x* starts with the scan marker "#S "."""
    return not x.startswith("#S ")
def is_empty_list(x):
    """Return True only when *x* is a list that has no elements.

    Anything that is not a list (including other empty containers and
    exhausted iterators) gives False.
    """
    return isinstance(x, list) and not x
def filter_comments(lines):
    """Separate #C comments and #U user blocks from the other lines.

    Parameters:
        lines -- iterable of text lines

    Returns a tuple ``(filtered, comments, Ublocks)``:
        filtered -- list of the lines that are neither #C nor #U lines,
                    unchanged and in their original order
        comments -- list with the stripped text of every #C line
        Ublocks  -- dict mapping each #U block name (the non-space characters
                    directly after "#U") to the text of the block, with the
                    indentation shared by its lines removed and the lines
                    joined by newlines

    Raises ValueError if a #U block name starts a second, separate block.
    The only exception is a later "#UO *** END OF DATA ***" line, which is
    silently dropped.
    """
    filtered = []
    comments = []
    Ublocks = dict()
    # Name of the #U block whose continuation lines are currently being
    # collected, or None when the previous line was not part of a #U block.
    current_U_name = None
    for line in lines:
        if current_U_name is not None:
            continuation = re.match(
                r'#U%s(\s+.*)$' % re.escape(current_U_name), line)
            if continuation:
                Ublocks[current_U_name].append(continuation.group(1))
                continue
            current_U_name = None
        # No 'else' here because we want to catch one U block immediately
        # following another
        U_match = re.match(r"#U(\S+)(\s+.*)$", line)
        if U_match:
            current_U_name = U_match.group(1)
            if current_U_name in Ublocks:
                if (current_U_name == 'O'
                        and U_match.group(2).strip() == '*** END OF DATA ***'):
                    continue  # Don't throw an error when seeing "END OF DATA"
                raise ValueError(
                    "Duplicate #U block: %s\n"
                    "Current line: '%s'\n"
                    "Previous block: '''%s'''"
                    % (current_U_name, line.strip(),
                       '\n'.join(Ublocks[current_U_name])))
            Ublocks[current_U_name] = [U_match.group(2)]
        elif line.startswith('#C'):
            comments.append(line[2:].strip())
        else:
            filtered.append(line)
    # Remove shared indentation from blocks.  Iterate over a snapshot of the
    # items because the values are replaced while looping.
    for name, block in list(Ublocks.items()):
        min_indent = min(len(text) - len(text.lstrip()) for text in block)
        Ublocks[name] = '\n'.join(text[min_indent:].rstrip() for text in block)
    return filtered, comments, Ublocks
def spec_strptime(datestring):
    """Parse a date string of the form "Thu Dec  5 12:34:56 2013".

    A timezone may appear either before or after the year.  On Python 3 the
    names CET and CEST are rewritten as numeric offsets so that ``%z`` can
    read them; on Python 2 every upper-case timezone word is removed before
    parsing.  If none of the known layouts fits, a warning is issued and the
    string is parsed once more with any upper-case timezone word removed.

    Parameters:
        datestring -- the date text (runs of whitespace are collapsed)

    Returns the result of ``time.strptime``.
    Raises ValueError if the string cannot be parsed even without a timezone.
    """
    base_format = "%a %b %d %H:%M:%S %Y"
    # Normalise spaces
    datestring = re.sub(r'\s+', ' ', datestring.strip())
    if sys.version_info.major >= 3:
        # time.strptime is useless with timezone names, so turn the known
        # ones into numeric offsets that %z understands
        datestring = re.sub(r'\bCET\b', '+0100', datestring)
        datestring = re.sub(r'\bCEST\b', '+0200', datestring)
    else:
        # time.strptime can't handle %z
        datestring = re.sub(r"\s+[A-Z]+\b", '', datestring)
    candidate_formats = (
        base_format,
        "%a %b %d %H:%M:%S %Y %z",
        "%a %b %d %H:%M:%S %z %Y",
    )
    for fmt in candidate_formats:
        try:
            return time.strptime(datestring, fmt)
        except ValueError:
            pass
    warn("Ignoring timezone from date '%s'" % datestring)
    # Drop the unrecognised timezone word before the final attempt; parsing
    # the unchanged string with the base format has already failed above.
    without_zone = re.sub(r"\s+[A-Z]+\b", '', datestring)
    return time.strptime(without_zone, base_format)
class SPECblock:
    """A data block from a file in the SPEC format

    Attributes set by the constructor:
        scan_num -- integer scan number taken from the #S line
        name     -- the rest of the #S line after the scan number, stripped
        comments -- list of #C comment strings found in the block
        Ublocks  -- dict of #U user blocks found in the block
        headers  -- list of column names from the #L line
        data     -- 2-D numpy array loaded from the lines after #L
    """

    def __init__(self, s_line, s_block_body):
        """Parse one scan block.

        Parameters:
            s_line       -- the "#S <number> <name>" line that opens the block
            s_block_body -- iterable of the lines that follow it; once #C and
                            #U lines are removed, the first line must be #N,
                            the second #L and the remainder numeric data

        Raises ValueError if the #S, #N or #L line is malformed or missing,
        or if the number of headers or data columns differs from #N.
        """
        s_match = re.match(r"#S +(\d+) +(.+)$", s_line)
        if s_match is None:
            raise ValueError("Malformed #S line: '%s'" % s_line.strip())
        self.scan_num = int(s_match.group(1))
        self.name = s_match.group(2).strip()

        filtered, self.comments, self.Ublocks = filter_comments(s_block_body)
        if len(filtered) < 2:
            raise ValueError("Scan %d is missing its #N and #L lines"
                             % self.scan_num)

        n_match = re.match(r"#N +(\d+)$", filtered[0])
        if n_match is None:
            raise ValueError("Expected a #N line but found: '%s'"
                             % filtered[0].strip())
        N = int(n_match.group(1))

        l_match = re.match("#L (.+)$", filtered[1])
        if l_match is None:
            raise ValueError("Expected a #L line but found: '%s'"
                             % filtered[1].strip())
        self.headers = re.split(' +', l_match.group(1).strip())
        if len(self.headers) != N:
            raise ValueError(
                "Number of #L headers does not equal number from #N\n"
                "Number expected: %d\n"
                "Header: '%s'" % (N, self.headers))

        # ndmin=2 keeps a single-row scan two-dimensional
        self.data = np.loadtxt(filtered[2:], ndmin=2)
        if self.data.shape[1] != N:
            raise ValueError(
                "Number of data columns does not equal number from #N\n"
                "Number expected: %d\n"
                "Data shape: %s" % (N, tuple(self.data.shape)))

    def __getitem__(self, key):
        """Return a data column by header name, or index the data array.

        A string key selects the column whose header equals the key and
        raises KeyError if there is no such header.  Any other key is passed
        straight to the underlying numpy array.
        """
        if not isinstance(key, str):
            return self.data[key]
        if key not in self.headers:
            raise KeyError("%s is not in the headers" % key)
        return self.data[:, self.headers.index(key)]
class SPECfile:
    """A file in the SPEC format

    Attributes set by the constructor:
        name             -- file name taken from the #F line
        comments         -- list of #C comment strings from the file header
        Ublocks          -- dict of #U user blocks from the file header
        date             -- parsed #D header date, or None if there is none
        blocks           -- list of SPECblock objects in file order
        blocks_by_name   -- dict mapping scan name to its SPECblock
        blocks_by_number -- dict mapping scan number to a list of SPECblocks
    """

    def __init__(self, file_or_path):
        """Read a SPEC file.

        Parameters:
            file_or_path -- a path string, or an iterable of text lines such
                            as an open file object

        Raises ValueError if the #F line is missing or malformed, or if two
        scan blocks share the same name.
        """
        if isinstance(file_or_path, str):
            # The file is opened here, so it is also closed here
            with open(file_or_path, 'r') as handle:
                self._parse(handle)
        else:
            self._parse(file_or_path)

    def _parse(self, lines):
        """Fill in all attributes from an iterable of lines."""
        # A single iterator is consumed step by step by the calls below
        lines = iter(lines)
        fblock, rest = takewhile_dropwhile(not_s_line, lines)
        if not fblock:
            raise ValueError("No #F line found before the first #S block")
        namematch = re.match("#F +(.+)$", fblock[0])
        if namematch is None:
            raise ValueError("First line is not a valid #F line: '%s'"
                             % fblock[0].strip())
        self.name = namematch.group(1).strip()

        hdrs, self.comments, self.Ublocks = filter_comments(fblock[1:])
        self.date = None
        for hdr in hdrs:
            if hdr.startswith('#D '):
                self.date = spec_strptime(hdr[3:])
            elif hdr.strip() in ('', '#'):
                pass  # Ignore blank lines
            else:
                warn("Unrecognised header: '%s'" % hdr)

        self.blocks = []
        self.blocks_by_name = dict()
        self.blocks_by_number = dict()
        # takewhile_dropwhile hands back an empty list once no further
        # "#S " line is left, which ends the loop
        while not is_empty_list(rest):
            s_line = next(rest)
            sblock_body, rest = takewhile_dropwhile(not_s_line, rest)
            sblock = SPECblock(s_line, sblock_body)
            self.blocks.append(sblock)
            if sblock.name in self.blocks_by_name:
                raise ValueError("Duplicate S block name: %s" % sblock.name)
            self.blocks_by_name[sblock.name] = sblock
            # Scan numbers may repeat, so each number maps to a list
            self.blocks_by_number.setdefault(sblock.scan_num, []).append(sblock)

    def __getitem__(self, key):
        """Look up scans by name (str) or by scan number (int).

        A string key returns the single block with that name; an int key
        returns the list of blocks with that scan number.  Raises KeyError
        for an unknown key or a key of any other type.
        """
        if isinstance(key, str):
            return self.blocks_by_name[key]
        if isinstance(key, int):
            return self.blocks_by_number[key]
        raise KeyError("Key must be either an int or a string")

    def __iter__(self):
        """Iterate over the scan blocks in file order."""
        return iter(self.blocks)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment