Created
January 23, 2018 21:40
-
-
Save blackrobot/7934596aebc4d19f3de478821723d27c to your computer and use it in GitHub Desktop.
Mixins for working with CSV files in Python
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
from __future__ import absolute_import, print_function, unicode_literals | |
from collections import namedtuple | |
import io | |
import re | |
# python 3 backport of the stdlib csv module | |
# https://github.com/ryanhiebert/backports.csv | |
from backports import csv | |
def sniff_dialect(file_obj, chunk_size=1024):
    """Detect the csv dialect of ``file_obj`` from a leading sample.

    Up to ``chunk_size`` units are read from the current position of
    ``file_obj`` and handed to ``csv.Sniffer``. When the stream supports
    ``tell()``/``seek()``, its position is restored afterwards so that a
    reader created on the same object still sees the sampled data
    (including the header row).

    :param file_obj: A readable file-like object positioned at the start
        of the csv data.
    :param chunk_size: Size of the sample passed to ``read()``.
    :returns: The dialect class produced by ``csv.Sniffer().sniff``.
    :raises csv.Error: If the dialect cannot be determined from the
        sample (e.g. a malformed or empty ``file_obj``).
    """
    # Remember where we started; streams that cannot report a position
    # (pipes, sockets, minimal file-likes) are simply left consumed.
    try:
        start = file_obj.tell()
    except (AttributeError, IOError, OSError, ValueError):
        start = None

    chunk = file_obj.read(chunk_size)

    if start is not None:
        try:
            file_obj.seek(start)
        except (AttributeError, IOError, OSError, ValueError):
            # Deliberate best effort: rewinding is a convenience, the
            # sniffed sample is still valid if the stream cannot seek.
            pass

    return csv.Sniffer().sniff(chunk)
class StripWhitespaceDictReader(csv.DictReader):
    """ Strips leading and trailing whitespace from reader values. """

    @staticmethod
    def _strip(value):
        """ Return ``value`` with surrounding whitespace removed.

        Lists (the extra columns ``DictReader`` collects under
        ``restkey`` for over-long rows) have each item stripped. Values
        without a ``strip`` method, such as the default ``restval`` of
        ``None`` used for short rows, are returned unchanged.
        """
        if isinstance(value, list):
            return [StripWhitespaceDictReader._strip(item) for item in value]
        try:
            return value.strip()
        except AttributeError:
            return value

    def __next__(self):
        """ Return the next row as a plain dict with stripped values. """
        row = super(StripWhitespaceDictReader, self).__next__()
        return {key: self._strip(value) for key, value in row.items()}

    # Python 2 iterator protocol alias.
    next = __next__
class SkipBlankDictReader(csv.DictReader):
    """ Returns rows that have at least one non-empty string value. """

    def __next__(self):
        """ Return the next row containing at least one truthy value.

        Rows whose values are all falsy (e.g. a line of only
        delimiters) are discarded. ``StopIteration`` from the parent
        reader propagates once the input is exhausted.
        """
        # Loop on the parent implementation instead of recursing through
        # self.__next__(): this keeps the stack flat for long runs of
        # blank rows and avoids re-entering subclass/mixin overrides,
        # which would otherwise post-process the same row twice.
        while True:
            row = super(SkipBlankDictReader, self).__next__()
            if any(row.values()):
                return row

    # Python 2 iterator protocol alias.
    next = __next__
class NamedTupleReader(csv.DictReader):
    """ Yields each csv row as an instance of a ``Row`` named tuple. """

    def __next__(self):
        """ Return the next row converted to the ``Row`` named tuple.

        The tuple class is built from ``self.fieldnames`` the first time
        a row is produced and stored on the instance as ``_model``; the
        row dict is passed to it as keyword arguments.
        """
        record = super(NamedTupleReader, self).__next__()

        if hasattr(self, '_model'):
            return self._model(**record)

        # First row: create the tuple class once and reuse it afterwards.
        self._model = namedtuple('Row', self.fieldnames)
        return self._model(**record)

    # Python 2 iterator protocol alias.
    next = __next__
class NormFieldsDictReader(csv.DictReader):
    """ Normalizes fieldnames by applying self.sanitize_fieldname(...)
    to each string.

    The names as they appear in the input are kept on
    ``self.original_fieldnames`` once the header has been read.
    """

    @property
    def fieldnames(self):
        """ The normalized, unique fieldnames, or ``None`` when no header
        is available (e.g. an empty file).
        """
        if not hasattr(self, '_norm_fieldnames'):
            fieldnames = csv.DictReader.fieldnames.fget(self)
            if fieldnames is None:
                # The parent found no header row. Report that as-is (and
                # do not cache it) so iterating an empty file ends with
                # StopIteration rather than failing on list(None).
                return None
            self.original_fieldnames = list(fieldnames)
            self._norm_fieldnames = self.sanitize_fieldnames(fieldnames)
        return self._norm_fieldnames

    @fieldnames.setter
    def fieldnames(self, value):
        """ Set the raw fieldnames, as the parent class allows; they are
        re-normalized on the next read of ``fieldnames``.
        """
        csv.DictReader.fieldnames.fset(self, value)
        # Drop any cached normalization computed from the old names.
        self.__dict__.pop('_norm_fieldnames', None)

    def sanitize_fieldnames(self, fieldnames):
        """ Calls sanitize_fieldname on each fieldname, and ensures
        name uniqueness.

        A name that collides with an earlier one gets an increasing
        numeric suffix (``name``, ``name_1``, ``name_2``, ...) until it
        is unique. Returns the new names as a list, in input order.
        """
        clean_names = []
        seen = set()  # O(1) membership test for the uniqueness check

        for raw_name in fieldnames:
            name = self.sanitize_fieldname(raw_name)
            clean_name = name
            num = 0
            while clean_name in seen:
                num += 1
                clean_name = self.sanitize_fieldname(
                    '{} {}'.format(name, num)
                )
            seen.add(clean_name)
            clean_names.append(clean_name)

        return clean_names

    def sanitize_fieldname(self, fieldname):
        """ Lowercase the name, and replace non-alphanumeric characters
        with underscores.

        Runs of non-word characters and underscores collapse to a single
        underscore, and leading/trailing separators are dropped.
        """
        words = re.split(r'[\W_]+', fieldname.lower(), flags=re.UNICODE)
        return '_'.join(word for word in words if word)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment