Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Plaintext table parser
#! /usr/local/bin/python
from pyparsing import *
import StringIO
from decimal import Decimal
import string
import logging
logger = logging.getLogger(__name__)
# Sample input in the table format handled by this module: a "[people]"
# section with "#h" (headings), "#t" (types) and "#d" (defaults) lines,
# followed by comma-separated data rows, with plain "#" comment lines in
# between.  Row 5 leaves its third field empty.
# NOTE(review): in the visible part of the file this is only referenced by
# the commented-out parse_stream example at the bottom.
testme = """
# comment
# ----------------------------------------
[people]
#h id , name , size , active , type
#t integer, word , decimal, boolean, word
#d 0 , bob , 0.00 , f , NA
# ----------------------------------------
1 , frank , 2.63 , t , none
2 , dave , 1 , FALSE , tree
3 , jon , 4 , 1 , list
4 , mike , 1.85 , 0 , tree
5 , tim , , trUE , goat
# ----------------------------------------
"""
# A comma-separated file format with separate sections.
# Comments start with the '#' character.
# Sections have named headers as well as data types
# and default values.
class ToBoolean(TokenConverter):
    """Converter that turns a matched boolean literal into True or False."""

    # Accepted spellings; matching is case-insensitive.
    _TRUE_WORDS = ("t", "true", "1")
    _FALSE_WORDS = ("f", "false", "0")

    def postParse(self, instring, loc, tokenlist):
        """Convert the first token to a boolean.

        Returns True for "t", "true" or "1" and False for "f", "false" or
        "0", ignoring case.

        Raises:
            ValueError: if the first token is none of the accepted spellings.
        """
        # str.lower() works on both Python 2 and 3; the string.lower()
        # module function used previously only exists on Python 2.
        tok = tokenlist[0].lower()
        if tok in self._TRUE_WORDS:
            return True
        if tok in self._FALSE_WORDS:
            return False
        raise ValueError("not a boolean literal: %r" % (tokenlist[0],))
class ToInteger(TokenConverter):
    """Token converter that yields a Python int."""

    def postParse(self, instring, loc, tokenlist):
        """Return the first matched token passed through int()."""
        first = tokenlist[0]
        return int(first)
class ToDecimal(TokenConverter):
    """Token converter that yields a decimal.Decimal (not a float)."""

    def postParse(self, instring, loc, tokenlist):
        """Return the first matched token passed through Decimal()."""
        first = tokenlist[0]
        return Decimal(first)
# --- Shared building blocks for the value grammars -------------------------

# Separator between the integer and fractional digits of a decimal.
decimal_sep = "."
# Punctuation allowed inside a bare word after its first character.
symbols = "_-."
# Optional leading sign for the numeric grammars.
sign = oneOf("+ -")

# Boolean literals; Or() picks the longest matching alternative, so "true"
# wins over "t" when both match.
_true_alternatives = [CaselessLiteral("true"), CaselessLiteral("t"), Literal("1")]
_false_alternatives = [CaselessLiteral("false"), CaselessLiteral("f"), Literal("0")]
bool_true = Or(_true_alternatives)
bool_false = Or(_false_alternatives)
boolean = ToBoolean(Or([bool_true, bool_false]))

# Optionally signed run of digits, converted to int.
_signed_digits = Combine(Optional(sign) + Word(nums))
integer = ToInteger(_signed_digits)

# Optionally signed digits, optional fractional part, optional exponent;
# converted to Decimal.  At least one digit is required before the separator.
_mantissa = Optional(sign) + Word(nums) + Optional(decimal_sep + Word(nums))
_exponent = Optional(oneOf("E e") + Optional(sign) + Word(nums))
decimal = ToDecimal(Combine(_mantissa + _exponent))

# Bare word: starts with an alphanumeric, may continue with alphanumerics
# or any of the characters in `symbols`.
word = Word(alphanums, alphanums + symbols)

# Single- or double-quoted string.
qstring = sglQuotedString | dblQuotedString
def parse_stream(stream):
    """Parse a sectioned, comma-separated table file into a JBFile.

    The grammar accepts optional '#' comment lines followed by one or more
    sections.  A section is a "[name]" title line, optionally a "#h"
    headings line (which may be followed by a "#t" types line, which may in
    turn be followed by a "#d" defaults line), and then one or more
    comma-separated data lines interleaved with comment lines.  Each
    recognised line is pushed into the JBFile through a parse action as it
    is matched.

    Example:
        >>> thefile = parse_stream(open("myfile.jb"))
        >>> thefile.getData("phonebook", 0, "area-code")

    Args:
        stream: the input, handed unchanged to pyparsing's parseFile().

    Returns:
        The populated JBFile.

    Errors raised by pyparsing or by the JBFile callbacks propagate to the
    caller.
    """
    logger.debug("Parsing stream: %s", stream)
    newfile = JBFile()

    # A value is either a bare word or a quoted string.  Or() takes ONE list
    # of alternatives; passing the alternatives as separate positional
    # arguments silently drops everything after the first.
    permittedvalue = Or([Word(alphanums + symbols), qstring])

    comment = Group(Literal('#') + restOfLine).suppress()
    commentlines = ZeroOrMore(comment)
    datatypenames = oneOf("decimal word integer boolean qstring")
    datatypes = Or([permittedvalue])

    titleline = Literal("[").suppress() + word + Literal("]").suppress()
    titleline.setParseAction(newfile.nextSection)

    headingsline = Literal("#h").suppress() + delimitedList(word)
    headingsline.setParseAction(newfile.addHeading)

    # An omitted type defaults to "word".
    typeline = Literal("#t").suppress() + delimitedList(Optional(datatypenames, default="word"))
    typeline.setParseAction(newfile.addTypes)

    # An omitted default or data value is reported as None.
    defaultsline = Literal("#d").suppress() + delimitedList(Optional(datatypes, default=None))
    defaultsline.setParseAction(newfile.addDefaults)

    csvbody = delimitedList(Optional(permittedvalue, default=None))
    csvbody.setParseAction(newfile.addLine)

    # The info lines must be tried before the generic comment rule, because
    # they also start with '#'.
    infolines = Optional(headingsline + Optional(typeline + Optional(defaultsline)))
    csvlines = OneOrMore(commentlines + csvbody)
    parser = commentlines + OneOrMore(titleline + infolines + csvlines) + commentlines

    logger.debug("Grammar built, starting parse")
    parser.parseFile(stream)
    logger.debug("Parse finished")
    return newfile
class JBSection(object):
    """One named section of a table file.

    Holds the column headings, the per-column type names, the per-column
    default values and the converted data rows.  Recognised type names are
    "decimal", "integer", "boolean", "qstring" and "word".
    """

    def __init__(self, name):
        """Create an empty section called *name*."""
        self.name = name
        self.data = []
        # Filled in by addHeading/addTypes/addDefaults.  They start as None
        # so that a section without the optional "#t"/"#d" lines can still
        # be used instead of failing with AttributeError.
        self.headings = None
        self.num_columns = None
        self.type_name = None
        self.defaults = None

    def _check_width(self, row, what):
        """Raise AssertionError unless *row* has one entry per heading."""
        # Raised explicitly (rather than via `assert`) so the check is not
        # stripped when Python runs with -O.
        if self.num_columns is None:
            raise AssertionError("%s given before any headings" % what)
        if len(row) != self.num_columns:
            raise AssertionError(
                "%s has %d columns, expected %d"
                % (what, len(row), self.num_columns))

    def _parser_for(self, type_name):
        """Return the module-level grammar that parses values of *type_name*.

        Raises:
            ValueError: if *type_name* is not a known type.
        """
        # Built on demand so the module-level grammars are only looked up
        # when a value actually needs converting.
        grammars = {
            "decimal": decimal,
            "integer": integer,
            "boolean": boolean,
            "qstring": qstring,
            "word": word,
        }
        try:
            return grammars[type_name]
        except KeyError:
            raise ValueError("unknown column type: %r" % (type_name,))

    def addHeading(self, headings):
        """Record the column headings; this fixes the section's column count."""
        # Copy into a plain list so that list methods such as .index() are
        # available whatever sequence type the caller passed in.
        self.headings = list(headings)
        self.num_columns = len(self.headings)
        logger.debug("headings %s", self.headings)

    def addTypes(self, types):
        """Record one type name per column."""
        logger.debug("types %s", types)
        self._check_width(types, "types line")
        self.type_name = list(types)

    def convert(self, column, item):
        """Convert the string *item* to the type declared for *column*.

        None and the empty string convert to None.  When no types were
        declared every column is treated as "word", which is also the
        default the grammar uses for an omitted type.

        Raises:
            ValueError: if the column's type name is not recognised.
        """
        if item is None or item == "":
            return None
        newtype = "word" if self.type_name is None else self.type_name[column]
        # Anchor at both ends so the whole string must match the grammar.
        full_value = StringStart() + self._parser_for(newtype) + StringEnd()
        return full_value.parseString(str(item))[0]

    def ConvertTypes(self, line):
        """Convert *line* from a list of str to a list of typed values."""
        self._check_width(line, "line")
        return [self.convert(column, item) for column, item in enumerate(line)]

    def FillInDefaults(self, line):
        """Return a copy of *line* with None entries replaced by defaults.

        Only None counts as missing; empty strings are left alone.  If no
        defaults were declared the copy is returned unchanged.
        """
        self._check_width(line, "line")
        if self.defaults is None:
            return list(line)
        return [self.defaults[column] if item is None else item
                for column, item in enumerate(line)]

    def addDefaults(self, defaults):
        """Record the per-column defaults, converted to the column types."""
        logger.debug("defaults %s", defaults)
        self._check_width(defaults, "defaults line")
        self.defaults = self.ConvertTypes(defaults)

    def addLine(self, line):
        """Append a data row after filling in defaults and converting types."""
        logger.debug("line %s", line)
        self._check_width(line, "data line")
        self.data.append(self.ConvertTypes(self.FillInDefaults(line)))

    def getData(self, row, column):
        """Return the value at *row* and *column*.

        *column* is either an integer index or a heading name.

        Raises:
            TypeError: if *column* is neither an int nor a str.
            ValueError: if *column* is a name and the section has no
                headings, or the name is not one of them.
        """
        if isinstance(column, int):
            return self.data[row][column]
        if isinstance(column, str):
            if self.headings is None:
                raise ValueError(
                    "section %r has no headings to look up %r"
                    % (self.name, column))
            return self.data[row][self.headings.index(column)]
        raise TypeError(
            "column must be an int index or a heading name, got %r" % (column,))
def test_JBSection():
    """Smoke-test JBSection with four integer columns.

    Builds a section by hand (no parsing of a file), adds one fully
    populated row and one row with empty strings, prints the results and
    checks them.
    """
    news = JBSection("news")
    news.addHeading("a b c d".split())
    news.addTypes("integer integer integer integer".split())
    news.addDefaults("0 123 -0 -1".split())
    news.addLine(["1", "1", "1", "1"])
    news.addLine(["", "1", "", ""])

    first_row, second_row = news.data
    # Single-argument print(...) behaves the same on Python 2 and 3.
    print(first_row)
    print(second_row)
    print(news.getData(0, "a"))

    assert first_row == [1, 1, 1, 1]
    # Empty strings are not None, so no defaults are substituted for them;
    # they convert to None instead.
    assert second_row == [None, 1, None, None]
    assert news.getData(0, "a") == 1


# Run the smoke test only when executed as a script, not on every import.
if __name__ == "__main__":
    test_JBSection()
class JBFile(object):
    """A parsed table file: a mapping of section name to JBSection.

    The add*/nextSection methods take a token sequence and forward it to
    the section most recently opened with nextSection(); parse_stream
    installs them as parse actions.
    """

    def __init__(self):
        """Create a file with no sections."""
        self.sections = {}
        # Set by nextSection(); None until the first section is opened.
        self.current_section = None

    def nextSection(self, tokens):
        """Open a new section named tokens[0] and make it current.

        Raises:
            AssertionError: if a section with that name already exists.
        """
        name = tokens[0]
        logger.debug("Pushing section: %s", name)
        # Raised explicitly (rather than via `assert`) so duplicates are
        # still rejected when Python runs with -O.
        if name in self.sections:
            raise AssertionError("more than one section called " + name)
        section = JBSection(name)
        self.sections[name] = section
        self.current_section = section

    def addHeading(self, tokens):
        """Forward a headings line to the current section."""
        self.current_section.addHeading(tokens)

    def addTypes(self, tokens):
        """Forward a types line to the current section."""
        self.current_section.addTypes(tokens)

    def addDefaults(self, tokens):
        """Forward a defaults line to the current section."""
        self.current_section.addDefaults(tokens)

    def addLine(self, tokens):
        """Forward a data line to the current section.

        A line consisting of a single None token (one omitted value, i.e.
        an empty line) is ignored.
        """
        if len(tokens) == 1 and tokens[0] is None:
            return
        self.current_section.addLine(tokens)

    def getData(self, section_name, row, column):
        """Return the value at *row*/*column* of the section *section_name*.

        Raises:
            KeyError: if there is no section called *section_name*.
        """
        section = self.sections[section_name]
        return section.getData(row, column)
# stuff = parse_stream(StringIO.StringIO(testme))
# print stuff.getData("people", 3, "name")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.