Skip to content

Instantly share code, notes, and snippets.

@rdhyee
Created March 25, 2015 22:49
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save rdhyee/3ee2841d411ee21aab68 to your computer and use it in GitHub Desktop.
Save rdhyee/3ee2841d411ee21aab68 to your computer and use it in GitHub Desktop.
A little module I wrote a while back to work with Unicode and CSV. (I'm hoping that I won't need it, because I can use https://github.com/jdunck/python-unicodecsv instead.)
# -*- coding: utf-8 -*-
"""
unicode_csv.py -- wrap the csv module to allow for the handling of CSV files with unicode
"""
#http://docs.python.org/library/csv.html#examples
import csv
import codecs
import difflib
import StringIO
import unittest
class Recoder(object):
"""
Iterator that reads an encoded stream and reencodes the input to the input encoding (default UTF-8)
"""
def __init__(self, f, encoding="UTF-8"):
self.encoding = encoding
self.reader = codecs.getreader(encoding)(f)
def __iter__(self):
return self
def next(self):
return self.reader.next().encode(self.encoding)
class UnicodeReader(object):
    """
    CSV reader over the file-like object "f", whose bytes are in the given
    encoding. Iterating yields one list of unicode strings per CSV row.
    """

    def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
        self.encoding = encoding
        # csv.reader consumes byte-string lines; Recoder supplies them.
        recoded_lines = Recoder(f, self.encoding)
        self.reader = csv.reader(recoded_lines, dialect=dialect, **kwds)

    def __iter__(self):
        return self

    def next(self):
        # Decode every cell of the next raw row into a unicode object.
        decoded_cells = []
        for cell in self.reader.next():
            decoded_cells.append(unicode(cell, self.encoding))
        return decoded_cells
class UnicodeWriter(object):
"""
A CSV writer which will write rows to CSV file "f",
which is encoded in the given encoding.
"""
def __init__(self, f, dialect=csv.excel, encoding="utf-8", **kwds):
# Redirect output to a queue
self.queue = StringIO.StringIO()
self.writer = csv.writer(self.queue, dialect=dialect, **kwds)
self.stream = f
self.encoder = codecs.getincrementalencoder(encoding)()
def writerow(self, row):
self.writer.writerow([s.encode("utf-8") for s in row])
# Fetch UTF-8 output from the queue ...
data = self.queue.getvalue()
data = data.decode("utf-8")
# ... and reencode it into the target encoding
data = self.encoder.encode(data)
# write to the target stream
self.stream.write(data)
# empty queue
self.queue.truncate(0)
def writerows(self, rows):
for row in rows:
self.writerow(row)
class UnicodeDictReader(object):
    """
    A CSV DictReader which will iterate over rows of the CSV file "f",
    which is encoded in the given encoding, yielding one dict per row with
    unicode keys and values.

    fieldnames, restkey, restval and dialect are forwarded unchanged to
    csv.DictReader; any extra keyword arguments are forwarded as well.
    """

    def __init__(self, f, fieldnames=None, restkey=None, restval=None, dialect=csv.excel, encoding="utf-8", **kwds):
        self.encoding = encoding
        f = Recoder(f, self.encoding)
        # Forward the caller's options instead of hard-coded defaults.
        self.dictreader = csv.DictReader(f, fieldnames=fieldnames, restkey=restkey, restval=restval,
                                         dialect=dialect, **kwds)

    def _decode(self, value):
        """Decode byte strings (also inside lists); return anything else as is."""
        if isinstance(value, str):
            return unicode(value, self.encoding)
        if isinstance(value, list):
            # Surplus fields collected under restkey arrive as a list.
            return [self._decode(item) for item in value]
        # None (default restkey/restval), unicode fieldnames supplied by the
        # caller, and other non-byte-string objects cannot be decoded.
        return value

    def next(self):
        """Return the next row as a dict with decoded keys and values."""
        row = self.dictreader.next()
        return dict((self._decode(k), self._decode(v)) for (k, v) in row.items())

    @property
    def dialect(self):
        """The dialect attribute of the wrapped csv.DictReader."""
        return self.dictreader.dialect

    @property
    def fieldnames(self):
        """The fieldnames of the wrapped csv.DictReader (not decoded)."""
        return self.dictreader.fieldnames

    @property
    def line_num(self):
        """The line_num attribute of the wrapped csv.DictReader."""
        return self.dictreader.line_num

    def __iter__(self):
        return self
class UnicodeDictWriter(object):
"""
A CSV DictWriter which will write rows to CSV file "f",
which is encoded in the given encoding.
"""
def __init__(self, f, fieldnames, restval='', extrasaction='raise', dialect='excel', encoding="utf-8", *args, **kwds):
# Redirect output to a queue
self.queue = StringIO.StringIO()
self.writer = csv.DictWriter(self.queue, fieldnames=fieldnames, restval=restval, extrasaction=extrasaction,
dialect=dialect, *args, **kwds)
self.stream = f
self.encoder = codecs.getincrementalencoder(encoding)()
def writerow(self, row):
self.writer.writerow(dict([(k.encode("utf-8"),v.encode("utf-8")) for (k,v) in row.items()]))
# Fetch UTF-8 output from the queue ...
data = self.queue.getvalue()
data = data.decode("utf-8")
# ... and reencode it into the target encoding
data = self.encoder.encode(data)
# write to the target stream
self.stream.write(data)
# empty queue
self.queue.truncate(0)
def writerows(self, rows):
for row in rows:
self.writerow(row)
def output_to_csv(f, headers, rows, write_header=True, convert_values_to_unicode=True):
    """
    Write rows (an iterable of dicts keyed by the given headers) to f as CSV
    through a UnicodeDictWriter, then return f.

    When write_header is true, a first line holding the header names is
    written. When convert_values_to_unicode is true, each value is passed
    through unicode() before its row is written.
    """
    cw = UnicodeDictWriter(f, headers)
    if write_header:
        # The header line is a row that maps every column name to itself.
        cw.writerow(dict((name, name) for name in headers))
    for record in rows:
        outgoing = record
        if convert_values_to_unicode:
            outgoing = dict((key, unicode(value)) for key, value in record.items())
        cw.writerow(outgoing)
    return f
# Unit tests
class TestUnicodeCsv(unittest.TestCase):
    """Checks for UnicodeReader, UnicodeDictReader and UnicodeWriter."""

    # UTF-8 bytes of a small CSV document: a header line and two data lines.
    SAMPLE_CSV = (u"word,language\n"
                  u"中文,Chinese\n"
                  u"français,French").encode("UTF-8")

    def setUp(self):
        pass

    def tearDown(self):
        pass

    def test_unicodereader(self):
        source = StringIO.StringIO(self.SAMPLE_CSV)
        parsed_rows = list(UnicodeReader(source))
        wanted = [
            [u'word', u'language'],
            [u"中文", u"Chinese"],
            [u"français", u"French"],
        ]
        self.assertEqual(parsed_rows, wanted)

    def test_UnicodeDictReader(self):
        source = StringIO.StringIO(self.SAMPLE_CSV)
        parsed_rows = list(UnicodeDictReader(source))
        wanted = [
            {u'word': u"中文", u'language': u"Chinese"},
            {u'word': u"français", u'language': u"French"},
        ]
        self.assertEqual(parsed_rows, wanted)

    def test_unicodewriter(self):
        table = [
            [u'word', u'language'],
            [u"中文", u"Chinese"],
            [u"français", u"French"],
        ]
        sink = StringIO.StringIO()
        uwriter = UnicodeWriter(sink)
        for record in table:
            uwriter.writerow(record)
        # Each row should come out as comma-joined UTF-8 cells ending in CRLF.
        expected_value = "".join(
            ",".join(cell.encode("UTF-8") for cell in record) + "\r\n"
            for record in table
        )
        self.assertEqual(sink.getvalue(), expected_value)
def test_unicodedictwriter():
    """
    Manual smoke test: write a header line and two rows to "test_name.csv"
    with UnicodeDictWriter. Nothing is asserted; inspect the file by hand.
    """
    headers = ['word', 'language']
    # Binary mode, as the Python 2 csv module expects, so line endings are
    # not translated; the with-block closes the file even if a write fails.
    with open("test_name.csv", "wb") as output_file:
        uwriter = UnicodeDictWriter(output_file, headers)
        uwriter.writerow(dict([(h, h) for h in headers]))
        uwriter.writerow({"word": u"中文", "language": "Chinese"})
        uwriter.writerow({"word": u"français", "language": "French"})
if __name__ == "__main__":
    # Executed as a script: hand control to the unittest runner.
    unittest.main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment