Skip to content

Instantly share code, notes, and snippets.

@dangunter
Created June 21, 2023 23:50
Show Gist options
  • Save dangunter/46649864e5f438efff3b1ae9da16e2fd to your computer and use it in GitHub Desktop.
Save dangunter/46649864e5f438efff3b1ae9da16e2fd to your computer and use it in GitHub Desktop.
CSV-ish
"""
Csvish is a line-oriented format that combines JSON metadata and CSV data.
Format rules:
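- Whitespace-only lines are ignored, except between a '%%' line and the first data
line of its section, where a blank line is a format error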
- Lines starting with '##' are comments and ignored
- Lines starting with '%%' start a new section. The whitespace-stripped characters
after the '%%' are the section name. A dot (".") in the section name creates
a sub-section, e.g. "animal.dog.metadata". The section data type is inferred.
- If the first non-comment line after '%%' starts with '{', the section is parsed as
JSON until the start of the next section (or EOF). Comment lines are not included
in the JSON; blank lines are kept but are insignificant to the JSON parser.
- Otherwise the section is parsed as CSV until the start of the next section (or EOF).
Comment and blank lines are not included in the CSV.
"""
from collections import namedtuple
import csv
import json
from os import linesep
from pathlib import Path
from typing import Union, TextIO, Iterable, Dict
# Leaf of the section tree: `type` is the section kind (States.csv or
# States.json) and `value` is the parsed content of that section.
Section = namedtuple("Section", ("type", "value"))
class States:
    """String constants naming the parser states.

    The ``csv`` and ``json`` values are also stored as the ``type`` of the
    parsed sections.
    """

    start, section, csv, json = "start", "section", "csv", "json"
class FormatError(ValueError):
    """Error in Csvish input, reported together with a line number."""

    def __init__(self, n, msg):
        """Build the error message.

        Args:
            n: Line number the problem was detected at
            msg: Description of the problem
        """
        located = f"Line {n}: {msg}"
        super().__init__(located)
class Csvish:
    """Read and parse Csvish data.

    The parsed result is available in the ``sections`` attribute: a tree of
    dicts keyed by section-name parts, with :class:`Section` objects at the
    leaves.
    """

    def __init__(self, input: Union[TextIO, Path, Iterable, str]):
        """Constructor.

        Args:
            input: Input data in Csvish format. A ``str`` is taken as the
                Csvish text itself (split on newlines, each line stripped);
                a ``Path`` is opened as UTF-8 text and closed again once
                parsing is done; anything else (open file, list of lines,
                generator, ...) is iterated line by line as-is.

        Attributes:
            sections (dict): Tree of sections, with string names in the nodes
                and :class:`Section` objects at the leaves

        Raises:
            FormatError: for a format error in the input
        """
        self.sections = {}
        if isinstance(input, Path):
            # Use a context manager so the file handle is not leaked, also
            # when parsing fails with a FormatError.
            with input.open("r", encoding="utf-8") as infile:
                self._parse(infile)
        elif isinstance(input, str):
            self._parse(s.strip() for s in input.split("\n"))
        else:
            # Open files and any other iterable of lines are consumed directly.
            self._parse(input)

    def _parse(self, input: Iterable):
        """Run the line-oriented state machine and store the result.

        Args:
            input: Iterable yielding one line of text at a time

        Raises:
            FormatError: if the input is empty, a section has no content, a
                section's content is not valid JSON/CSV, or unexpected text
                appears before the first section
        """
        s, e = States, FormatError
        state = s.start
        sections = {}
        current_section = None
        line_buffer = []

        n = 0  # stays 0 when the input yields no lines at all
        for n, line in enumerate(input, 1):
            # Comment lines are dropped in every state.
            if line.startswith("##"):
                continue
            is_header = line.startswith("%%")
            is_blank = line.strip() == ""

            if state == s.start:
                if is_header:
                    current_section, state = self._new_section(line), s.section
                elif not is_blank:
                    raise e(n, f"Expected comment or section, got:\n{line}")
            elif state == s.section:
                # The first content line decides the section's data type.
                if is_header:
                    raise e(n, f"Empty section '{current_section}'")
                if is_blank:
                    raise e(n, f"First line in section '{current_section}' was empty")
                state = s.json if line.startswith("{") else s.csv
                line_buffer = [line]
            elif state == s.json:
                if is_header:
                    self._save_json(line_buffer, current_section, sections, lineno=n)
                    current_section, state = self._new_section(line), s.section
                else:
                    # Blank lines are kept: they are harmless inside JSON.
                    line_buffer.append(line.rstrip())
            elif state == s.csv:
                if is_header:
                    self._save_csv(line_buffer, current_section, sections, lineno=n)
                    current_section, state = self._new_section(line), s.section
                elif not is_blank:
                    line_buffer.append(line.rstrip())

        # EOF: flush the section in progress, or report why there is none
        if state == s.start:
            raise e(n, "Empty file")
        elif state == s.section:
            raise e(n, f"File ends with empty section '{current_section}'")
        elif state == s.csv:
            self._save_csv(line_buffer, current_section, sections, lineno=n)
        elif state == s.json:
            self._save_json(line_buffer, current_section, sections, lineno=n)
        self.sections = sections

    @staticmethod
    def _new_section(line):
        """Return the section name of a '%%' line as a tuple of its dot-separated parts."""
        name = line[2:].strip()
        return tuple(name.split("."))

    @classmethod
    def _save_json(cls, line_buffer, current_section, sections, lineno=0):
        """Parse the buffered lines as JSON and store them under `current_section`.

        Args:
            line_buffer: Lines making up the JSON document
            current_section: Tuple of section-name parts
            sections: Section tree to update in place
            lineno: Line number used in error messages

        Raises:
            FormatError: if the buffered text is not valid JSON, or the
                section cannot be placed in the tree
        """
        data = linesep.join(line_buffer)
        try:
            parsed = json.loads(data)
        except json.JSONDecodeError as err:
            raise FormatError(
                lineno, f"Bad JSON for section '{current_section}': {err}"
            ) from err
        cls._set_section(sections, current_section, Section(States.json, parsed), lineno)

    @classmethod
    def _save_csv(cls, line_buffer, current_section, sections, lineno=0):
        """Parse the buffered lines as CSV rows and store them under `current_section`.

        Args:
            line_buffer: Lines making up the CSV data
            current_section: Tuple of section-name parts
            sections: Section tree to update in place
            lineno: Line number used in error messages

        Raises:
            FormatError: if the csv module rejects the buffered text, or the
                section cannot be placed in the tree
        """
        try:
            # The reader is lazy, so it must be consumed inside the try.
            rows = list(csv.reader(line_buffer))
        except csv.Error as err:
            raise FormatError(
                lineno, f"Bad CSV for section '{current_section}': {err}"
            ) from err
        cls._set_section(sections, current_section, Section(States.csv, rows), lineno)

    @staticmethod
    def _set_section(sections, parts, value, lineno):
        """Store `value` in the `sections` tree at the path given by `parts`.

        Intermediate dicts are created as needed. An existing entry at the
        final position is replaced.

        Args:
            sections: Section tree (nested dicts) to update in place
            parts: Tuple of section-name parts; the last one is the leaf key
            value: Object to store at the leaf
            lineno: Line number used in error messages

        Raises:
            FormatError: if a part of the path before the leaf is already
                occupied by something that is not a dict (i.e. a sub-section
                is requested below an existing leaf section)
        """
        cur = sections
        for depth, p in enumerate(parts[:-1]):
            cur = cur.setdefault(p, {})
            # Check at every level, not only the last one: descending into a
            # leaf would otherwise fail with a TypeError instead of a
            # FormatError.
            if not isinstance(cur, dict):
                raise FormatError(lineno, f"Unexpected subsection '{parts[depth + 1]}'")
        cur[parts[-1]] = value
def example_usage():
    """Demo: parse the same Csvish text from a file and from a string, then
    show a small subclass that turns parsed sections into album records.

    Everything is printed to standard output.
    """
    import pprint
    import tempfile

    raw = """
## Example file
%% poem.meta
{
"metadata": 123
}
%% poem.data
name,color
roses,red
violets,blue
## More data below
%% Section 3
{
"metadata": 345
}
"""

    # File input: lines are left-stripped when written because the parser
    # does not strip lines read from a file itself.
    with tempfile.TemporaryFile("w+", encoding="utf-8") as f:
        f.writelines(text.lstrip() + "\n" for text in raw.split("\n"))
        f.seek(0)
        from_file = Csvish(f)
    print("-- file --")
    pprint.pprint(from_file.sections)

    # String input: the text is passed directly.
    print("-- string --")
    from_string = Csvish(raw)
    pprint.pprint(from_string.sections)

    # Example class building on the interface
    class Albums(Csvish):
        Album = namedtuple("Album", "name info tracks")

        def __init__(self, input_data):
            super().__init__(input_data)
            # One Album per top-level section, built from its "info" (JSON)
            # and "tracks" (CSV) sub-sections.
            collected = []
            for key, entry in self.sections.items():
                record = self.Album(
                    name=key,
                    info=entry["info"].value,
                    tracks=entry["tracks"].value,
                )
                collected.append(record)
            self.albums = collected

    raw = """
%% revolver.info
{
"title": "Revolver",
"artist": "The Beatles",
"year": 1966
}
%% revolver.tracks
track,length,title
1,"2:36","Taxman"
2,"2:11","Eleanor Rigby"
3,"3:07","Love You To"
%% let-it-bleed.info
{
"title": "Let It Bleed",
"artist": "The Rolling Stones",
"year": 1969
}
%% let-it-bleed.tracks
track,length,title
1,"4:31","Gimme Shelter"
2,"4:19","Love in Vain"
3,"3:09","Country Honk"
"""
    albums = Albums(raw)
    print("-- albums --")
    for album in albums.albums:
        print(f"Title: {album.info['title']}")
        print(f"Year: {album.info['year']}")
        # Rows include the CSV header line, which is printed like any other row.
        for track in album.tracks:
            print(f"{track[0]} - {track[2]} ({track[1]})")
# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    example_usage()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment