Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Mixins for working with CSV files in Python
from __future__ import absolute_import, print_function, unicode_literals
from collections import namedtuple
import io
import re
# python 3 backport of the stdlib csv module
# https://github.com/ryanhiebert/backports.csv
from backports import csv
def sniff_dialect(file_obj, chunk_size=1024):
    """ Tries to sniff the dialect of the csv file from its first
    `chunk_size` characters.

    The stream is rewound to the position it had on entry (when it
    supports `tell`/`seek`), so the same `file_obj` can be handed
    straight to a reader without losing the header row.

    :param file_obj: text stream positioned at the start of the csv data.
    :param chunk_size: maximum number of characters to sample.
    :returns: the dialect class guessed by `csv.Sniffer`.
    :raises csv.Error: if the dialect cannot be determined, e.g. for a
        malformed file_obj.
    """
    try:
        start = file_obj.tell()
    except (AttributeError, IOError, OSError):
        # Non-seekable stream (pipe, socket, ...): nothing to restore.
        start = None
    chunk = file_obj.read(chunk_size)
    try:
        return csv.Sniffer().sniff(chunk)
    finally:
        # Rewind even when sniffing fails so the caller can still fall
        # back to a default dialect and read the data from the start.
        if start is not None:
            file_obj.seek(start)
class StripWhitespaceDictReader(csv.DictReader):
    """ Strips leading and trailing whitespace from reader values.

    Values that are not strings are handled gracefully: the `restval`
    filler used for short rows (`None` by default) is passed through
    untouched, and the list of surplus values stored under `restkey`
    for long rows has each of its items stripped.
    """

    @staticmethod
    def _strip(value):
        """ Returns `value` stripped when it supports `strip()`, a list
        with every item stripped when it is a list, else `value` as is.
        """
        if isinstance(value, list):
            return [StripWhitespaceDictReader._strip(item) for item in value]
        # Duck-typed so both py2 `unicode` and py3 `str` are covered.
        strip = getattr(value, 'strip', None)
        return strip() if strip is not None else value

    def __next__(self):
        row = super(StripWhitespaceDictReader, self).__next__()
        return {key: self._strip(value) for key, value in row.items()}

    # python 2 iterator protocol
    next = __next__
class SkipBlankDictReader(csv.DictReader):
    """ Returns rows that have at least one non-empty string value.

    Rows whose values are all falsy (e.g. a line such as `,,`) are
    skipped. Iteration ends with `StopIteration` as usual once the
    underlying reader is exhausted.
    """

    def __next__(self):
        # Loop instead of recursing: a long run of blank rows must not
        # exhaust the call stack, and subclass overrides of __next__
        # must not be re-entered (and re-applied) for every skipped row.
        while True:
            row = super(SkipBlankDictReader, self).__next__()
            if any(row.values()):
                return row

    # python 2 iterator protocol
    next = __next__
class NamedTupleReader(csv.DictReader):
    """ Returns csv rows as named tuples.

    The tuple class (named `Row`) is created lazily from `fieldnames`
    when the first row is read. Fieldnames that are not valid, unique
    python identifiers (spaces, keywords, leading digits, duplicates)
    are replaced by positional names such as `_0`, `_1`, ... instead of
    raising `ValueError`; values stay in column order either way.

    Short rows are padded with `restval`. Rows with more values than
    fieldnames raise `TypeError`, since a fixed-width tuple has no slot
    for the surplus values.
    """

    def __next__(self):
        row = super(NamedTupleReader, self).__next__()
        fieldnames = self.fieldnames
        model = getattr(self, '_model', None)
        if model is None:
            # rename=True keeps headers like "First Name" from blowing up.
            model = self._model = namedtuple('Row', fieldnames, rename=True)
        if self.restkey not in fieldnames and self.restkey in row:
            raise TypeError(
                'row has more values than fieldnames: %r'
                % (row[self.restkey],)
            )
        # Build positionally: renamed fields no longer match the row keys.
        return model(*(row[name] for name in fieldnames))

    # python 2 iterator protocol
    next = __next__
class NormFieldsDictReader(csv.DictReader):
    """ Normalizes fieldnames by applying self.sanitize_fieldname(...)
    to each string.

    The untouched header is kept in `self.original_fieldnames`, which
    is set the first time `fieldnames` is successfully resolved.
    """

    @property
    def fieldnames(self):
        """ The sanitized, unique fieldnames, or None when no header is
        available (e.g. an empty file), mirroring `csv.DictReader`.
        """
        if not hasattr(self, '_norm_fieldnames'):
            fieldnames = csv.DictReader.fieldnames.fget(self)
            if fieldnames is None:
                # No header row could be read; don't cache so a later
                # access can retry, and let iteration end normally.
                return None
            self.original_fieldnames = list(fieldnames)
            self._norm_fieldnames = self.sanitize_fieldnames(fieldnames)
        return self._norm_fieldnames

    def sanitize_fieldnames(self, fieldnames):
        """ Calls sanitize_fieldname on each fieldname, and ensures
        name uniqueness by appending `_1`, `_2`, ... to repeated names.

        Returns a list of names in the same order as `fieldnames`.
        """
        clean_names = []
        seen = set()
        for raw_name in fieldnames:
            name = self.sanitize_fieldname(raw_name)
            clean_name = name
            num = 0
            while clean_name in seen:
                num += 1
                # Re-sanitize so the suffix is joined the same way as
                # any other word in the name.
                clean_name = self.sanitize_fieldname(
                    '{} {}'.format(name, num)
                )
            seen.add(clean_name)
            clean_names.append(clean_name)
        return clean_names

    def sanitize_fieldname(self, fieldname):
        """ Lowercase the name, and replace runs of non-alphanumeric
        characters with a single underscore. Leading and trailing
        separators are dropped.
        """
        words = re.split(r'[\W_]+', fieldname.lower(), flags=re.UNICODE)
        return '_'.join(word for word in words if word)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.