Skip to content

Instantly share code, notes, and snippets.

@lecram
Last active December 20, 2015 12:29
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lecram/6131282 to your computer and use it in GitHub Desktop.
Save lecram/6131282 to your computer and use it in GitHub Desktop.
CSV read/write/print/join.
import codecs
import collections
def join(t1, t2, pivots=None, keys=None):
keys1 = t1[0]._fields
keys2 = t2[0]._fields
if pivots is None:
pivots = list(set(keys1) & set(keys2))
if keys is None:
keys = list(set(keys1) | set(keys2))
Row = collections.namedtuple("Row", keys)
table = []
for r1 in t1:
id1 = [getattr(r1, key, "") for key in pivots]
d = r1._asdict()
for r2 in t2:
id2 = [getattr(r2, key, "") for key in pivots]
if id1 == id2:
d.update(r2._asdict())
break
table.append(Row(*(d.get(key, "") for key in keys)))
return table
def getone(table, key, query):
for row in table:
if row._asdict()[key] == query:
return row
def getall(table, key, query):
for row in table:
if row._asdict()[key] == query:
yield row
def addkey(table, key, func):
newkeys = table[0]._fields + (key,)
Row = collections.namedtuple("Row", newkeys)
for row in table:
d = row._asdict()
d[key] = func(row)
newrow = Row(*(d[key] for key in newkeys))
yield newrow
def delkeys(table, keys_to_del):
newkeys = tuple(filter(lambda i: i not in keys_to_del, table[0]._fields))
Row = collections.namedtuple("Row", newkeys)
for row in table:
d = row._asdict()
newrow = Row(*(d[key] for key in newkeys))
yield newrow
def readfile(path, keys=None, sep=',', comm='#'):
table = []
with codecs.open(path, "r", "utf8") as f:
line = ""
while not line:
line = f.readline().split(comm)[0].strip()
fkeys = line.split(sep)
if keys is None:
keys = fkeys
indexes = [i for i in range(len(fkeys)) if fkeys[i] in keys]
Row = collections.namedtuple("Row", keys)
for line in f:
line = line.split(comm)[0].strip()
if not line:
continue
fvalues = line.split(sep)
values = [fvalues[i] for i in indexes]
table.append(Row(*values))
return table
def writefile(path, table, keys=None, sep=','):
if keys is None:
keys = table[0]._fields
with codecs.open(path, "w", "utf8") as f:
f.write(sep.join(keys) + '\n')
for row in table:
f.write(sep.join(row._asdict()[key] for key in keys) + '\n')
def pprint(table, keys=None):
if keys is None:
keys = table[0]._fields
lens = []
for k in keys:
lens.append(len(max([x._asdict()[k] for x in table] + [k], key=len)))
formats = ["%%-%ds" % length for length in lens]
pattern = " | ".join(formats)
separator = "-+-".join(['-' * n for n in lens])
print(pattern % tuple(keys))
print(separator)
for line in table:
print(pattern % tuple(line._asdict()[k] for k in keys))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment