Skip to content

Instantly share code, notes, and snippets.

@blackrobot
Created January 23, 2018 21:40
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save blackrobot/7934596aebc4d19f3de478821723d27c to your computer and use it in GitHub Desktop.
Save blackrobot/7934596aebc4d19f3de478821723d27c to your computer and use it in GitHub Desktop.
Mixins for working with CSV files in Python
from __future__ import absolute_import, print_function, unicode_literals
from collections import namedtuple
import io
import re
# Backport of the Python 3 stdlib csv module (for Python 2 compatibility)
# https://github.com/ryanhiebert/backports.csv
from backports import csv
def sniff_dialect(file_obj, chunk_size=1024):
    """ Tries to sniff the dialect of the csv file from its first
    `chunk_size` characters.

    `file_obj` must be a text-mode file-like object. When the stream
    supports `tell()`/`seek()`, its position is restored afterwards so
    that a reader created on the same object still sees the header row.
    Non-seekable streams are left advanced past the sampled chunk.

    Returns the dialect guessed by `csv.Sniffer`. This may raise a
    `csv.Error` exception for a malformed file_obj.
    """
    # IOError and OSError are listed separately because they are only
    # aliases of each other on Python 3.
    position_errors = (AttributeError, IOError, OSError, ValueError)

    try:
        start = file_obj.tell()
    except position_errors:
        # Not seekable (pipe, socket wrapper, ...): nothing to restore.
        start = None

    chunk = file_obj.read(chunk_size)

    try:
        return csv.Sniffer().sniff(chunk)
    finally:
        # Rewind even when sniffing fails, so callers can fall back to a
        # default dialect and still read the file from where it started.
        if start is not None:
            try:
                file_obj.seek(start)
            except position_errors:
                pass
class StripWhitespaceDictReader(csv.DictReader):
    """ Strips leading and trailing whitespace from reader values.

    Values that cannot be stripped are passed through unchanged. This
    covers the `restval` filler used for rows shorter than the header
    (`None` by default). Rows longer than the header carry their extra
    fields as a list under `restkey`; each element of that list is
    stripped individually.
    """

    @staticmethod
    def _strip_value(value):
        """ Returns `value` with surrounding whitespace removed, when
        that operation is meaningful for its type.
        """
        if isinstance(value, list):
            # Surplus fields collected under `restkey`.
            return [
                StripWhitespaceDictReader._strip_value(item)
                for item in value
            ]
        strip = getattr(value, 'strip', None)
        if callable(strip):
            return strip()
        # e.g. None from `restval`: nothing to strip.
        return value

    def __next__(self):
        row = super(StripWhitespaceDictReader, self).__next__()
        return {
            key: self._strip_value(value) for key, value in row.items()
        }

    # Python 2 iterator protocol.
    next = __next__
class SkipBlankDictReader(csv.DictReader):
    """ Returns rows that have at least one non-empty string value.

    Rows whose values are all falsy (for example a line of bare
    delimiters such as `,,`) are skipped. Iteration ends with
    `StopIteration` once the underlying reader is exhausted.
    """

    def __next__(self):
        fetch_row = super(SkipBlankDictReader, self).__next__
        # Loop instead of recursing so that a long run of blank rows
        # cannot exhaust the interpreter's recursion limit.
        while True:
            row = fetch_row()
            if any(row.values()):
                return row

    # Python 2 iterator protocol.
    next = __next__
class NamedTupleReader(csv.DictReader):
    """ Yields each csv row as a `Row` named tuple instead of a dict.

    The `Row` type is created from `self.fieldnames` when the first
    row is read and is then reused for every later row.
    """

    def __next__(self):
        record = super(NamedTupleReader, self).__next__()
        try:
            row_type = self._model
        except AttributeError:
            # First row: the header is known by now, build the type.
            row_type = self._model = namedtuple('Row', self.fieldnames)
        return row_type(**record)

    # Python 2 iterator protocol.
    next = __next__
class NormFieldsDictReader(csv.DictReader):
    """ Normalizes fieldnames by applying self.sanitize_fieldname(...)
    to each string.

    The names as they came from the base reader are kept in
    `self.original_fieldnames`, which is set the first time
    `fieldnames` resolves to an actual header.
    """

    @property
    def fieldnames(self):
        """ The sanitized, de-duplicated fieldnames.

        Returns None when the base reader has no fieldnames to offer
        (empty input), mirroring `csv.DictReader`, so that iterating an
        empty file simply stops instead of raising.
        """
        if not hasattr(self, '_norm_fieldnames'):
            # Call the base class property getter directly; this
            # property shadows it on the subclass.
            fieldnames = csv.DictReader.fieldnames.fget(self)
            if fieldnames is None:
                # Nothing to normalize; do not cache.
                return None
            self.original_fieldnames = list(fieldnames)
            self._norm_fieldnames = self.sanitize_fieldnames(fieldnames)
        return self._norm_fieldnames

    def sanitize_fieldnames(self, fieldnames):
        """ Calls sanitize_fieldname on each fieldname, and ensures
        name uniqueness.

        A name that collides with an earlier one gets the first free
        numeric suffix (`name_1`, `name_2`, ...). Returns a list in the
        same order as the input.
        """
        fieldnames = (
            self.sanitize_fieldname(name) for name in fieldnames
        )
        clean_names = []
        # Set mirror of clean_names for constant-time collision checks.
        taken = set()
        for name in fieldnames:
            num = 0
            clean_name = name
            while clean_name in taken:
                num += 1
                # Re-sanitize so the space before the counter becomes
                # an underscore like every other separator.
                clean_name = self.sanitize_fieldname(
                    '{} {}'.format(name, num)
                )
            taken.add(clean_name)
            clean_names.append(clean_name)
        return clean_names

    def sanitize_fieldname(self, fieldname):
        """ Lowercase the name, and replace non-alphanumeric characters
        with underscores.

        Runs of separators collapse into a single underscore, and
        leading/trailing separators are dropped.
        """
        words = re.split(r'[\W_]+', fieldname.lower(), flags=re.UNICODE)
        # Splitting leaves empty strings at the edges; drop them.
        return '_'.join(word for word in words if word)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment