Created
June 21, 2023 23:50
-
-
Save dangunter/46649864e5f438efff3b1ae9da16e2fd to your computer and use it in GitHub Desktop.
CSV-ish
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
""" | |
Csvish is a line-oriented format that combines JSON metadata and CSV data. | |
Format rules: | |
- Pure whitespace lines are ignored | |
- Lines starting with '##' are comments and ignored | |
- Lines starting with '%%' start a new section. The whitespace-stripped characters | |
after the '%%' are the section name. A dot (".") in the section name creates | |
a sub-section, e.g. "animal.dog.metadata". The section data type is inferred. | |
- Lines after '%%' starting with '{' are parsed as JSON until the start of the | |
next section (or EOF). Comment and blank lines are not included in the JSON. | |
- Lines after '%%' not starting with '{' are parsed as CSV until the start of the | |
next section (or EOF). Comment and blank lines are not included in the CSV. | |
""" | |
from collections import namedtuple | |
import csv | |
import json | |
from os import linesep | |
from pathlib import Path | |
from typing import Union, TextIO, Iterable, Dict | |
# Leaf of the section tree: `type` is the detected kind of the section
# ("json" or "csv") and `value` is the parsed content.
Section = namedtuple("Section", "type value")
class States:
    """String constants naming the parser states.

    ``csv`` and ``json`` are also stored as the ``type`` of a parsed
    :class:`Section`.
    """

    # Before the first section header / directly after a section header.
    start, section = "start", "section"
    # Inside the body of a CSV or JSON section.
    csv, json = "csv", "json"
class FormatError(ValueError):
    """Error in Csvish input, reported together with a line number."""

    def __init__(self, n, msg):
        """Build the error.

        Args:
            n: Line number the problem is reported at.
            msg: Description of the problem.
        """
        message = f"Line {n}: {msg}"
        super().__init__(message)
class Csvish:
    """Read and parse Csvish data.

    The parsed result is available as :attr:`sections` once the constructor
    returns.
    """

    def __init__(self, input: Union[TextIO, Path, Iterable, str]):
        """Constructor.

        Args:
            input: Input data in Csvish format. A :class:`~pathlib.Path` is
                opened (UTF-8) and read; a ``str`` is treated as the data
                itself (not a file name) and each of its lines is stripped
                of surrounding whitespace; an open text stream or any other
                iterable of lines is consumed as-is.

        Attributes:
            sections (dict): Tree of sections, with string names in the nodes
                and :class:`Section` objects at the leaves

        Raises:
            FormatError: for a format error in the input
        """
        self.sections = {}
        if isinstance(input, Path):
            # We opened the file, so we must close it -- also when parsing
            # raises.
            with input.open("r", encoding="utf-8") as stream:
                self._parse(stream)
        elif isinstance(input, str):
            self._parse(s.strip() for s in input.split("\n"))
        else:
            # Open text streams and arbitrary iterables of lines need no
            # preparation; both are simply iterated line by line.
            self._parse(input)

    def _parse(self, input: Iterable):
        """Run the line-based state machine over `input`.

        On success the resulting tree replaces ``self.sections``; on failure
        ``self.sections`` is left untouched.

        Args:
            input: Iterable of text lines (with or without line endings).

        Raises:
            FormatError: if the input is empty, contains data before the
                first section header, contains an empty section, or a
                section body is not valid JSON/CSV.
        """
        s, e = States, FormatError
        state = s.start
        sections = {}
        current_section = None
        line_buffer = []

        def flush(lineno):
            # Parse and store the buffered body of the section being closed.
            if state == s.json:
                self._save_json(line_buffer, current_section, sections, lineno=lineno)
            elif state == s.csv:
                self._save_csv(line_buffer, current_section, sections, lineno=lineno)

        n = 0  # remains 0 if the input yields no lines at all
        for n, line in enumerate(input, start=1):
            # Comments are ignored in every state.
            if line.startswith("##"):
                continue

            if line.startswith("%%"):
                if state == s.section:
                    # Two headers in a row: the previous section has no body.
                    raise e(n, f"Empty section '{self._dotted(current_section)}'")
                flush(n)
                current_section, state = self._new_section(line), s.section
                continue

            blank = not line.strip()
            if state == s.start:
                if blank:
                    continue
                raise e(n, f"Expected comment or section, got:\n{line}")
            elif state == s.section:
                if blank:
                    raise e(
                        n,
                        f"First line in section '{self._dotted(current_section)}' was empty",
                    )
                # The first data line decides the section type. It is
                # right-stripped like every following line so that all rows
                # of a section are treated alike.
                state = s.json if line.startswith("{") else s.csv
                line_buffer = [line.rstrip()]
            elif state == s.csv and blank:
                continue
            else:
                # JSON keeps blank lines (they are insignificant whitespace).
                line_buffer.append(line.rstrip())

        # EOF
        if state == s.start:
            raise e(n, "Empty file")
        if state == s.section:
            raise e(n, f"File ends with empty section '{self._dotted(current_section)}'")
        flush(n)
        self.sections = sections

    @staticmethod
    def _dotted(parts):
        """Return the section name tuple `parts` as its dotted string form."""
        return ".".join(parts)

    @staticmethod
    def _new_section(line):
        """Return the section name of header `line` as a tuple of its parts.

        The leading two characters (the '%%' marker) are dropped, the rest is
        stripped of whitespace and split on ".".
        """
        name = line[2:].strip()
        return tuple(name.split("."))

    @classmethod
    def _save_json(cls, line_buffer, current_section, sections, lineno=0):
        """Parse `line_buffer` as JSON and store it under `current_section`.

        Args:
            line_buffer: Lines making up the JSON document.
            current_section: Tuple of section name parts.
            sections: Section tree to store the result in (modified in place).
            lineno: Line number to report in errors.

        Raises:
            FormatError: if the lines are not valid JSON, or the section
                cannot be placed in the tree.
        """
        try:
            value = json.loads(linesep.join(line_buffer))
        except json.JSONDecodeError as err:
            raise FormatError(
                lineno, f"Bad JSON for section '{cls._dotted(current_section)}': {err}"
            ) from err
        cls._set_section(sections, current_section, Section(States.json, value), lineno)

    @classmethod
    def _save_csv(cls, line_buffer, current_section, sections, lineno=0):
        """Parse `line_buffer` as CSV and store it under `current_section`.

        The stored value is a list of rows, each row a list of strings.

        Args:
            line_buffer: Lines making up the CSV data.
            current_section: Tuple of section name parts.
            sections: Section tree to store the result in (modified in place).
            lineno: Line number to report in errors.

        Raises:
            FormatError: if the lines are not valid CSV, or the section
                cannot be placed in the tree.
        """
        try:
            rows = list(csv.reader(line_buffer))
        except csv.Error as err:
            raise FormatError(
                lineno, f"Bad CSV for section '{cls._dotted(current_section)}': {err}"
            ) from err
        cls._set_section(sections, current_section, Section(States.csv, rows), lineno)

    @staticmethod
    def _set_section(sections, parts, value, lineno):
        """Store `value` in the `sections` tree at the path given by `parts`.

        Intermediate nodes are created as needed. An existing entry at the
        final position is replaced.

        Args:
            sections: Root dict of the section tree (modified in place).
            parts: Non-empty tuple of section name parts.
            value: Object to store at the leaf.
            lineno: Line number to report in errors.

        Raises:
            FormatError: if any parent on the path already holds a leaf
                rather than a dict, so no sub-section can be added below it.
        """
        leaf = parts[-1]
        cur = sections
        for p in parts[:-1]:
            cur = cur.setdefault(p, {})
            # Check at every level: descending into a non-dict would fail
            # with an unrelated error instead of a FormatError.
            if not isinstance(cur, dict):
                raise FormatError(lineno, f"Unexpected subsection '{leaf}'")
        cur[leaf] = value
def example_usage():
    """Demonstrate the parser on sample data and print the results.

    The same sample is parsed once through a temporary file and once directly
    from a string; then a small subclass shows how to build on the parsed
    section tree.
    """
    import pprint
    import tempfile

    raw = """
## Example file
%% poem.meta
{
"metadata": 123
}
%% poem.data
name,color
roses,red
violets,blue
## More data below
%% Section 3
{
"metadata": 345
}
"""

    # File input is not stripped by the parser, so remove leading whitespace
    # from each line while writing the sample out.
    with tempfile.TemporaryFile("w+", encoding="utf-8") as handle:
        handle.writelines(piece.lstrip() + "\n" for piece in raw.split("\n"))
        handle.seek(0)
        from_file = Csvish(handle)
    print("-- file --")
    pprint.pprint(from_file.sections)

    print("-- string --")
    from_string = Csvish(raw)
    pprint.pprint(from_string.sections)

    # Example class building on the interface
    class Albums(Csvish):
        Album = namedtuple("Album", "name info tracks")

        def __init__(self, input_data):
            super().__init__(input_data)
            # One Album per top-level section, built from its "info" and
            # "tracks" sub-sections.
            self.albums = [
                self.Album(
                    name=key,
                    info=group["info"].value,
                    tracks=group["tracks"].value,
                )
                for key, group in self.sections.items()
            ]

    raw = """
%% revolver.info
{
"title": "Revolver",
"artist": "The Beatles",
"year": 1966
}
%% revolver.tracks
track,length,title
1,"2:36","Taxman"
2,"2:11","Eleanor Rigby"
3,"3:07","Love You To"
%% let-it-bleed.info
{
"title": "Let It Bleed",
"artist": "The Rolling Stones",
"year": 1969
}
%% let-it-bleed.tracks
track,length,title
1,"4:31","Gimme Shelter"
2,"4:19","Love in Vain"
3,"3:09","Country Honk"
"""
    albums = Albums(raw)
    print("-- albums --")
    for album in albums.albums:
        details = album.info
        print(f"Title: {details['title']}")
        print(f"Year: {details['year']}")
        # Every row (including the CSV header row) is: track, length, title.
        for number, length, title in album.tracks:
            print(f"{number} - {title} ({length})")
# Run the demonstration only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    example_usage()
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment