Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Replacement for csv.reader that reads encoded input into a dict based on column definitions.
import csv
class StreamedDataConverter(object):
    """Convert rows (sequences of values) into dicts keyed by configured names.

    Subclasses (or callers) must provide a ``cols`` attribute, either:

    * a list of column names -- every input column whose header matches one
      of them is emitted under that name; or
    * a dict mapping an output name to an options dict.  The options read
      here are ``column`` (the input header to take the value from; defaults
      to the output name) and ``convert`` (a callable applied to the value).

    ``map_fieldnames`` must be called with the header row before iterating.
    Headers and values that expose a ``decode`` method (byte strings) are
    decoded with ``encoding``; everything else passes through unchanged.

    >>> class MyConverter(StreamedDataConverter):
    ...     cols = ['a', 'b', 'c']
    ...
    >>> c = MyConverter(iter([[4,5,6],[6,7,8]]))
    >>> c.map_fieldnames(['a','b','c'])
    >>> list(c) == [{'a': 4, 'b': 5, 'c': 6}, {'a': 6, 'b': 7, 'c': 8}]
    True
    >>> class MyConverter(StreamedDataConverter):
    ...     cols = dict(
    ...         a=dict(convert=int),
    ...         q=dict(column='b', convert=int),
    ...         c=dict(convert=lambda z: 'nope'),
    ...     )
    ...
    >>> c = MyConverter(iter([[4,5,6],[6,7,8]]))
    >>> c.map_fieldnames(['a','b','c'])
    >>> list(c) == [{'a': 4, 'q': 5, 'c': 'nope'}, {'a': 6, 'q': 7, 'c': 'nope'}]
    True
    """

    # When true, values from input columns that matched no configured column
    # (they are gathered under the ``None`` key) are removed from each row.
    drop_blank = True
    # Codec used for byte-string headers and values.
    encoding = 'utf-8-sig'

    def __init__(self, data_iterator, encoding=None):
        """Wrap ``data_iterator``, an iterator yielding one row per item.

        ``encoding``, when given and truthy, overrides the class default.
        """
        if encoding:
            self.encoding = encoding
        self.data_iter = data_iterator
        # Input line counter used in error messages; the header is line 1.
        self.i = 1

    def __iter__(self):
        self.i = 1
        return self

    def _decode(self, value):
        """Decode ``value`` with the configured encoding if it can be decoded."""
        if hasattr(value, 'decode'):
            return value.decode(self.encoding)
        return value

    @staticmethod
    def _header_candidates(header):
        """Yield ``header`` in progressively more normalised spellings."""
        yield header
        header = header.lower()
        yield header
        # Trim the ends and collapse internal runs of whitespace.
        header = ' '.join(header.split())
        yield header
        yield header.replace(' ', '_')

    def map_fieldnames(self, fields):
        """Match the header row ``fields`` against ``cols``.

        Each header is tried verbatim, then lower-cased, then with whitespace
        collapsed, then with spaces replaced by underscores.  Headers that
        match nothing are recorded as ``None``.

        Raises:
            ValueError: if any configured column has no matching header.
        """
        headers = [self._decode(x) for x in fields]

        # Source header -> output name.  Built without touching the option
        # dicts in ``cols``: they live on the class and may be shared between
        # several columns, so writing into them would cross-wire the names.
        if hasattr(self.cols, 'items'):
            pending = {}
            for colname, opts in self.cols.items():
                pending[opts.get('column', colname)] = colname
        else:
            pending = {x: x for x in self.cols}

        mapped = []
        for header in headers:
            for candidate in self._header_candidates(header):
                if candidate in pending:
                    # pop() so that each configured column is matched once.
                    mapped.append(pending.pop(candidate))
                    break
            else:
                mapped.append(None)

        if pending:
            raise ValueError("These columns were not in the input file: %s" % (','.join(pending)))
        self._fieldnames = [self._decode(x) for x in mapped]

    def __next__(self):
        """Return the next input row as a dict of (converted) values.

        Raises:
            StopIteration: when the underlying iterator is exhausted.
            ValueError: if a value cannot be decoded, or a ``convert``
                callable raises; the message carries the input line number.
        """
        try:
            row = next(self.data_iter)
        except StopIteration:
            self.i = 0
            raise
        self.i += 1

        try:
            values = [self._decode(x) for x in row]
        except UnicodeDecodeError as e:
            raise ValueError("Encoding error on input line %d: %s" % (self.i, e))
        ret = dict(zip(self._fieldnames, values))

        if self.drop_blank:
            # Unmapped input columns were all zipped onto the None key.
            ret.pop(None, None)

        if not hasattr(self.cols, 'items'):
            return ret

        # dict with options: run the per-column converters.
        for colname, opts in self.cols.items():
            if 'convert' not in opts:
                continue
            param = ret[colname]
            try:
                res = opts['convert'](param)
            except Exception as e:
                # TODO: raise exception relating to the actual column
                raise ValueError("Problem converting column '%s' on input line %d: %s" % (colname, self.i, e))
            ret[colname] = res
        return ret

    # Python 2 iterator protocol.
    next = __next__
class NormalDictReader(object):
    r"""A csv.DictReader replacement driven by column definitions on a subclass.

    The first row of the input is read as the header and matched against the
    subclass's ``cols`` attribute; every following row is returned as a dict.
    Rows are parsed with ``csv.reader`` and handed to a
    ``StreamedDataConverter``, which does the decoding and conversion.

    Examples:

        class MyReader(NormalDictReader):
            cols = ['a', 'b', 'c']

    >>> class MyReader(NormalDictReader):
    ...     cols = dict(
    ...         a = dict(column='a', convert=int),
    ...         q = dict(column='b'),
    ...         c = dict(column='c', convert=lambda x: int(x)),
    ...     )
    ...
    >>> try:
    ...     from StringIO import StringIO
    ... except ImportError:
    ...     from io import StringIO
    ...
    >>> rows = list(MyReader(StringIO("a,b,c\n1,2,3\n4,5,6")))
    >>> rows == [{'a': 1, 'q': '2', 'c': 3}, {'a': 4, 'q': '5', 'c': 6}]
    True
    """

    # Forwarded to the underlying converter's attribute of the same name.
    drop_blank = True
    # Codec handed to the underlying converter.
    encoding = 'utf-8-sig'

    def __init__(self, openfile, encoding=None, *args, **kwargs):
        """Read the header row from ``openfile`` and prepare the converter.

        Args:
            openfile: object passed to ``csv.reader`` as its input.
            encoding: optional override for the class-level ``encoding``.
            *args, **kwargs: passed through to ``csv.reader``.

        Raises:
            ValueError: if the input yields no header row.
        """
        if encoding:
            self.encoding = encoding
        self._r = csv.reader(openfile, *args, **kwargs)
        self._conv = StreamedDataConverter(self._r, encoding=self.encoding)
        self._conv.cols = self.cols
        # Without this the reader's drop_blank setting would be ignored.
        self._conv.drop_blank = self.drop_blank
        try:
            header = next(self._r)
        except StopIteration:
            # A bare StopIteration escaping a constructor can silently end
            # an enclosing iteration; report the real problem instead.
            raise ValueError("Input contains no header row")
        self._conv.map_fieldnames(header)

    def __iter__(self):
        return self

    def __next__(self):
        """Return the next data row as a dict."""
        return next(self._conv)

    # Python 2 iterator protocol.
    next = __next__
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.