Last active
December 20, 2015 12:29
-
-
Save lecram/6131282 to your computer and use it in GitHub Desktop.
CSV read/write/print/join.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import codecs | |
import collections | |
def join(t1, t2, pivots=None, keys=None):
    """Left-join two tables of namedtuple rows.

    Every row of ``t1`` appears exactly once in the result.  It is merged
    with the first row of ``t2`` whose pivot values are all equal; values
    from ``t2`` overwrite values from ``t1`` for fields they share.

    Args:
        t1: Sequence of namedtuple rows (the left table).
        t2: Sequence of namedtuple rows (the right table).
        pivots: Field names that must match for two rows to be merged.
            Defaults to the fields common to both tables.
        keys: Field names (and their order) of the resulting rows.
            Defaults to the fields of ``t1`` followed by the fields of
            ``t2`` that ``t1`` does not have.

    Returns:
        A list of ``Row`` namedtuples.  Fields that a merged row has no
        value for are filled with ``""``.  An empty ``t1`` gives ``[]``.
    """
    if not t1:
        return []
    keys1 = t1[0]._fields
    keys2 = t2[0]._fields if t2 else ()
    # Derive the defaults from the field tuples instead of sets so the
    # column order is deterministic from run to run.
    if pivots is None:
        pivots = [key for key in keys1 if key in keys2]
    if keys is None:
        keys = list(keys1) + [key for key in keys2 if key not in keys1]
    Row = collections.namedtuple("Row", keys)
    # The pivot values of the right-hand rows do not depend on the left-hand
    # row being processed, so compute them once up front.
    right = [([getattr(r2, key, "") for key in pivots], r2) for r2 in t2]
    table = []
    for r1 in t1:
        id1 = [getattr(r1, key, "") for key in pivots]
        merged = r1._asdict()
        for id2, r2 in right:
            if id1 == id2:
                merged.update(r2._asdict())
                break  # only the first matching row is used
        table.append(Row(*(merged.get(key, "") for key in keys)))
    return table
def getone(table, key, query):
    """Return the first row whose field ``key`` equals ``query``.

    Rows are examined in order and the search stops at the first match.
    ``None`` is returned when no row matches.  A KeyError is raised if an
    examined row has no field named ``key``.
    """
    matches = (
        candidate
        for candidate in table
        if candidate._asdict()[key] == query
    )
    return next(matches, None)
def getall(table, key, query):
    """Lazily yield every row whose field ``key`` equals ``query``.

    Rows are yielded in their original order.  A KeyError is raised during
    iteration if an examined row has no field named ``key``.
    """
    for candidate in table:
        value = candidate._asdict()[key]
        if value == query:
            yield candidate
def addkey(table, key, func):
    """Lazily yield the rows of ``table`` extended with a computed field.

    Args:
        table: Any iterable of namedtuple rows.  It does not need to be
            indexable, so the output of another generator can be passed in.
        key: Name of the field to append after the existing fields.
        func: Callable that receives the original row and returns the value
            of the new field.

    Yields:
        ``Row`` namedtuples holding the original values followed by
        ``func(row)``.  An empty table yields nothing.
    """
    Row = None
    for row in table:
        if Row is None:
            # The first row defines the layout, so the table is never
            # indexed and an empty table is handled gracefully.
            Row = collections.namedtuple("Row", row._fields + (key,))
        yield Row(*(tuple(row) + (func(row),)))
def delkeys(table, keys_to_del):
    """Lazily yield the rows of ``table`` without the given fields.

    Args:
        table: Any iterable of namedtuple rows.  It does not need to be
            indexable, so the output of another generator can be passed in.
        keys_to_del: Container of field names to drop; names that the rows
            do not have are ignored.

    Yields:
        ``Row`` namedtuples holding the remaining fields in their original
        order.  An empty table yields nothing.
    """
    Row = None
    kept = ()
    for row in table:
        if Row is None:
            # The first row defines the layout; remember the positions to
            # keep so later rows are sliced without building a dict.
            kept = [
                index
                for index, name in enumerate(row._fields)
                if name not in keys_to_del
            ]
            Row = collections.namedtuple(
                "Row", [row._fields[index] for index in kept]
            )
        yield Row(*[row[index] for index in kept])
def readfile(path, keys=None, sep=',', comm='#'):
    """Read a UTF-8 delimited text file into a list of namedtuple rows.

    On every line, everything from the first ``comm`` marker onwards is
    discarded and the remainder is stripped; lines that end up empty are
    skipped.  The first remaining line is the header naming the columns.

    Args:
        path: Path of the file to read.
        keys: Column names to load, in the order they should appear in the
            rows.  Defaults to every column of the header, in file order.
        sep: Field separator.
        comm: Comment marker.

    Returns:
        A list of ``Row`` namedtuples with string values.  A file with no
        header (empty, or containing only comments) gives ``[]``.

    Raises:
        ValueError: If ``keys`` names a column the header does not have.
    """
    table = []
    with codecs.open(path, "r", "utf8") as f:
        # Find the header: the first line with content outside comments.
        while True:
            raw = f.readline()
            if not raw:
                # End of file before any header: stop instead of looping
                # forever on the empty strings readline() keeps returning.
                return table
            line = raw.split(comm)[0].strip()
            if line:
                break
        fkeys = [name.strip() for name in line.split(sep)]
        if keys is None:
            keys = fkeys
        missing = [key for key in keys if key not in fkeys]
        if missing:
            raise ValueError(
                "columns not found in %s: %s" % (path, ", ".join(missing))
            )
        # Look each requested key up in the header so values follow the
        # order of ``keys`` rather than the column order of the file.
        indexes = [fkeys.index(key) for key in keys]
        Row = collections.namedtuple("Row", keys)
        for raw in f:
            line = raw.split(comm)[0].strip()
            if not line:
                continue
            fvalues = line.split(sep)
            table.append(Row(*[fvalues[i] for i in indexes]))
    return table
def writefile(path, table, keys=None, sep=','):
    """Write a table of namedtuple rows to a UTF-8 delimited text file.

    The file starts with a header line listing ``keys``, followed by one
    line per row.  Values that are not strings are converted with ``str``.

    Args:
        path: Path of the file to create or overwrite.
        table: Any iterable of namedtuple rows (generators are accepted).
        keys: Names of the fields to write, in column order.  Defaults to
            the fields of the first row.
        sep: Field separator.

    If ``table`` is empty and ``keys`` is not given there is no header to
    derive, so an empty file is written.
    """
    # Materialise once so generators work and the first row can be peeked.
    rows = list(table)
    if keys is None and rows:
        keys = rows[0]._fields
    with codecs.open(path, "w", "utf8") as f:
        if keys is None:
            return
        f.write(sep.join(keys) + '\n')
        for row in rows:
            record = row._asdict()
            f.write(sep.join(str(record[key]) for key in keys) + '\n')
def pprint(table, keys=None):
    """Print a table of namedtuple rows as aligned text columns.

    The output is a header line, a ``-+-`` separator line and one line per
    row.  Each column is left-aligned and as wide as its longest cell or
    its name, whichever is longer; columns are joined with ``" | "``.

    Args:
        table: Any iterable of namedtuple rows (generators are accepted).
        keys: Names of the fields to print, in column order.  Defaults to
            the fields of the first row.

    Cell values may be of any type; widths are measured on their ``str``
    form.  If ``table`` is empty and ``keys`` is not given, nothing is
    printed.
    """
    # Materialise once so generators work and the first row can be peeked.
    rows = list(table)
    if keys is None:
        if not rows:
            return
        keys = rows[0]._fields
    # Convert each row to a dict once instead of once per cell.
    records = [row._asdict() for row in rows]
    lens = [
        max([len(k)] + [len(str(record[k])) for record in records])
        for k in keys
    ]
    pattern = " | ".join("%%-%ds" % length for length in lens)
    separator = "-+-".join('-' * length for length in lens)
    print(pattern % tuple(keys))
    print(separator)
    for record in records:
        print(pattern % tuple(record[k] for k in keys))
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment