Skip to content

Instantly share code, notes, and snippets.

@sourcesimian
Created June 9, 2016 20:16
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save sourcesimian/2131c9ab8f3900984349a9489e592391 to your computer and use it in GitHub Desktop.
Save sourcesimian/2131c9ab8f3900984349a9489e592391 to your computer and use it in GitHub Desktop.
Parser to interpret tabulated text
import re
class TextTable(object):
    """
    Parser to interpret tabulated (space-aligned, fixed-width) text.

    Column boundaries are inferred from the text itself: every character
    position that holds a non-space character on at least one retained line
    belongs to a column, and each maximal run of such positions is one
    column.  Cells that are blank on a given line are reported as ``None``.
    All values are returned as strings; no numeric conversion is done.

    >>> text = '''-----------------------------------
    ...    PRES   HGHT   TEMP   DWPT   RELH
    ...     hPa      m      C      C      %
    ... -----------------------------------
    ...  1006.0     61   29.4   10.4     31
    ...             77   29.0    9.7     30
    ...  1000.0    108   28.2            28
    ...   896.0   1064                   36
    ...           4582   -4.1  -31.1     10'''
    >>> t = TextTable(text, heading_lines=2, ignore=('^-+$',), keys=lambda h: "%s (%s)" % (h[0], h[1]))
    >>> print(t.keys())
    ['PRES (hPa)', 'HGHT (m)', 'TEMP (C)', 'DWPT (C)', 'RELH (%)']
    >>> print(t.headings(0))
    ['PRES', 'HGHT', 'TEMP', 'DWPT', 'RELH']
    >>> print(t.headings(1))
    ['hPa', 'm', 'C', 'C', '%']
    >>> print(t.row(2))
    ['1000.0', '108', '28.2', None, '28']
    >>> print(t.column('TEMP (C)'))
    ['29.4', '29.0', '28.2', None, '-4.1']
    >>> for row in t.rows():
    ...     print(row)
    ...     break
    (0, ['1006.0', '61', '29.4', '10.4', '31'])
    >>> for col in t.columns():
    ...     print(col)
    ...     break
    ('PRES (hPa)', ['1006.0', None, '1000.0', '896.0', None])
    """

    # List of per-column dicts ({'heading': [...], 'values': [...], 'key': ...}),
    # in left-to-right order; populated by _parse_table().
    _data = None

    def __init__(self, text, heading_lines=None, keys=None, ignore=None):
        """
        Parse ``text`` into columns.

        :param text: the table as a single string, one table row per line.
        :param heading_lines: number of leading (non-ignored) lines that are
            headings rather than data.  ``None`` or ``0`` means 1.
        :param keys: callable that receives a column's list of heading
            strings and returns the key used to address that column.
            Defaults to joining the headings with a single space.
        :param ignore: iterable of regular expressions; any line matched
            (with ``re.match``) by one of them is discarded before parsing.
        """
        lines = self._split_lines(text, ignore or ())
        self._setup_column_ranges(lines)
        self._parse_table(lines,
                          heading_lines or 1,
                          keys or (lambda h: ' '.join(h)))

    def keys(self):
        """Return the column keys in left-to-right order."""
        return [col['key'] for col in self._data]

    def headings(self, i):
        """Return the ``i``-th heading line's cell for every column."""
        return [col['heading'][i] for col in self._data]

    def row(self, index):
        """Return data row ``index`` as a list with one cell per column."""
        return [col['values'][index] for col in self._data]

    def rows(self):
        """Yield ``(index, cells)`` for every data row, top to bottom."""
        # zip(*columns) transposes the column-major storage into rows.
        columns = [col['values'] for col in self._data]
        for i, cells in enumerate(zip(*columns)):
            yield i, list(cells)

    def column(self, key):
        """
        Return the list of values of the column addressed by ``key``.

        Raises ``KeyError`` if no column has that key.
        """
        return self._keys[key]['values']

    def columns(self):
        """Yield ``(key, values)`` for every column, left to right."""
        for col in self._data:
            yield col['key'], col['values']

    def _parse_table(self, lines, heading_lines, keys):
        """Fill ``self._data`` / ``self._keys`` from the retained lines."""
        cols = [{'heading': [], 'values': []} for _ in self._ranges]

        for line in lines[:heading_lines]:
            for col, cell in zip(cols, self._split_columns(line)):
                col['heading'].append(cell)

        for line in lines[heading_lines:]:
            for col, cell in zip(cols, self._split_columns(line)):
                col['values'].append(cell)

        for col in cols:
            col['key'] = keys(col['heading'])

        self._data = cols
        # If two columns produce the same key, the right-most one wins here.
        self._keys = {col['key']: col for col in cols}

    def _split_lines(self, text, ignore):
        """
        Split ``text`` into lines, dropping blank lines at either end and
        every line matched by one of the ``ignore`` patterns.
        """
        skip = [re.compile(pattern) for pattern in ignore]
        lines = text.splitlines()

        # Only whole blank lines are trimmed.  Stripping the text itself
        # would remove the first line's leading spaces and shift it out of
        # alignment with the remaining lines.
        first, last = 0, len(lines)
        while first < last and not lines[first].strip():
            first += 1
        while last > first and not lines[last - 1].strip():
            last -= 1

        return [line for line in lines[first:last]
                if not any(s.match(line) for s in skip)]

    def _setup_column_ranges(self, lines):
        """
        Compute ``self._ranges``: a list of ``(begin, end)`` character
        offsets (end exclusive), one per detected column.
        """
        width = max(len(line) for line in lines) if lines else 0

        # mask[i] is True when any line has a non-space character at i.
        mask = [False] * width
        for line in lines:
            for i, ch in enumerate(line):
                if ch != ' ':
                    mask[i] = True

        ranges = []
        begin = None
        for i, used in enumerate(mask):
            if used:
                if begin is None:
                    begin = i
            elif begin is not None:  # 'is not None': a column may start at 0
                ranges.append((begin, i))
                begin = None
        # A column that runs up to the end of the widest line has no
        # trailing gap to close it, so close it explicitly.
        if begin is not None:
            ranges.append((begin, width))

        self._ranges = ranges

    def _split_columns(self, line):
        """
        Cut ``line`` at the column ranges and return one stripped cell per
        column, using ``None`` for cells that are empty on this line.
        """
        return [line[begin:end].strip() or None for begin, end in self._ranges]
# When executed as a script, run the doctest examples embedded in this module.
if __name__ == "__main__":
    from doctest import testmod
    testmod()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment