Skip to content

Instantly share code, notes, and snippets.

@lundberg
Last active August 29, 2015 14:07
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save lundberg/fd41fa748585e5108db8 to your computer and use it in GitHub Desktop.
Save lundberg/fd41fa748585e5108db8 to your computer and use it in GitHub Desktop.
Python csv io router
import codecs
import csv
try:
from cStringIO import StringIO
except ImportError:
from io import StringIO
class CSVReadWriter(object):
    """File-like reader that lazily renders an iterable of rows as encoded CSV.

    Rows are pulled from ``rows_generator`` on demand, formatted with
    ``csv.writer`` and encoded with the requested encoding.  ``read`` and
    ``readline`` return byte strings (``str`` on Python 2, ``bytes`` on
    Python 3) and an empty byte string once the rows are exhausted.

    Attributes:
        eof: True once the row source has raised ``StopIteration``.
        rows: Iterator the rows are pulled from.
        buffer: Scratch ``StringIO`` the csv writer formats one row into.
        csv: The ``csv.writer`` bound to ``buffer``.
        encoder: Incremental encoder for the target encoding.
    """

    def __init__(self, rows_generator, dialect=csv.excel, encoding='utf-8', **kwargs):
        """Set up the csv writer and the encoder.

        Args:
            rows_generator: Iterable (generator, list, ...) yielding rows,
                each row being a sequence of cell values.
            dialect: csv dialect handed to ``csv.writer``.
            encoding: Name of the codec used for the returned bytes.
            **kwargs: Extra formatting parameters handed to ``csv.writer``.
        """
        self.eof = False
        # iter() lets plain sequences work too; an iterator is returned as is.
        self.rows = iter(rows_generator)
        self.buffer = StringIO()
        self.csv = csv.writer(self.buffer, dialect=dialect, **kwargs)
        self.encoder = codecs.getincrementalencoder(encoding)()
        # Encoded bytes already produced but not yet handed to the caller.
        self._pending = b''

    def reset(self, buffer=None):
        """Discard buffered output, optionally replacing it with ``buffer``.

        Args:
            buffer: Already encoded bytes to serve first on the next read,
                or None/empty to start with nothing buffered.
        """
        # io.StringIO keeps its position on truncate(); rewind first so the
        # next write does not get padded with NUL characters.
        self.buffer.seek(0)
        self.buffer.truncate(0)
        self._pending = buffer if buffer else b''

    def next(self):
        """Return the next raw row; raises StopIteration when exhausted."""
        return next(self.rows)

    def _format_row(self, row):
        """Format one row as CSV and return it encoded in the target encoding."""
        text_type = type(u'')
        if str is bytes:
            # Python 2: the csv module only handles byte strings, so text
            # cells travel through it as UTF-8.
            row = [c.encode('utf-8') if isinstance(c, text_type) else c
                   for c in row]
        self.csv.writerow(row)
        line = self.buffer.getvalue()
        # Empty the scratch buffer (rewind + truncate works for both
        # cStringIO and io.StringIO).
        self.buffer.seek(0)
        self.buffer.truncate(0)
        if not isinstance(line, text_type):
            line = line.decode('utf-8')
        # Encode the whole formatted line so delimiters and line endings
        # use the target encoding as well.
        return self.encoder.encode(line)

    def read(self, blocksize=-1):
        """Return up to ``blocksize`` bytes of CSV data.

        Args:
            blocksize: Maximum number of bytes to return.  A negative value
                or None returns everything that is left.  Zero returns
                whatever is already buffered without fetching new rows.

        Returns:
            Encoded CSV bytes; an empty byte string when nothing is left.
            Calling again after EOF keeps returning an empty byte string.
        """
        if blocksize is None:
            blocksize = -1
        chunks = [self._pending]
        size = len(self._pending)
        # Pull rows until the request can be satisfied or the rows run out.
        while not self.eof and (blocksize < 0 or size < blocksize):
            try:
                row = self.next()
            except StopIteration:
                self.eof = True
            else:
                encoded = self._format_row(row)
                chunks.append(encoded)
                size += len(encoded)
        data = b''.join(chunks)
        if 0 < blocksize < len(data):
            # Keep the surplus for the next call.
            self._pending = data[blocksize:]
            return data[:blocksize]
        self._pending = b''
        return data

    def readline(self, blocksize=-1):
        """Return the next CSV formatted row.

        Args:
            blocksize: When positive, the call is routed to
                ``read(blocksize)`` and returns at most that many bytes,
                regardless of row boundaries.  Otherwise (default) exactly
                one row is returned.

        Returns:
            Encoded CSV bytes for one row.  If an earlier ``read`` stopped
            in the middle of a row, the remainder of that row is returned
            first.  An empty byte string signals EOF.
        """
        # Only a positive size counts as "given"; the default -1 must not
        # fall through to read(-1), which would return the whole output.
        if blocksize is not None and blocksize > 0:
            return self.read(blocksize)
        if self._pending:
            data, self._pending = self._pending, b''
            return data
        try:
            row = self.next()
        except StopIteration:
            self.eof = True
            return b''
        return self._format_row(row)
# rows_gen = (r for r in rows)
# writer = CSVReadWriter(rows_gen)
# writer.readline()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment