Skip to content

Instantly share code, notes, and snippets.

@atiw003
Created March 11, 2010 23:22
Show Gist options
  • Save atiw003/329825 to your computer and use it in GitHub Desktop.
Save atiw003/329825 to your computer and use it in GitHub Desktop.
#!/usr/bin/python
import codecs
import csv

try:
    from cStringIO import StringIO
except ImportError:
    try:
        from StringIO import StringIO
    except ImportError:
        # Python 3: the (c)StringIO modules were folded into io.
        from io import StringIO
class ReaderCol(object):
    """
    Column-oriented, in-memory view of a csv file.

    The first row of the csv input supplies the fieldnames (unless they are
    passed explicitly); each column is stored as a list under its fieldname.
    There are some limitations, eg:

    - The csv file must be rectangular (each row must have same number of
      columns, and vice versa)
    - The column names must be unique

    This depends on the csv and codecs modules and on a module-level
    ``StringIO`` class.

    Sample usage:

        reader = ReaderCol()
        reader.read_filename(in_filename, encoding="utf-8")
    """

    def __init__(self):
        # Maps fieldname -> list of cell values, one entry per row.
        self._data = {}

    # ------------------------------------------------------------------
    # validation helpers
    # ------------------------------------------------------------------
    def __validate_fieldnames(self, fieldnames):
        """Raise ValueError if a name occurs more than once in fieldnames."""
        if len(set(fieldnames)) != len(fieldnames):
            raise ValueError("Fieldnames must be unique")

    def __validate_row(self, row_data):
        """
        Raise ValueError unless the keys of row_data match the stored
        fieldnames. Nothing is checked while no columns exist yet.
        """
        if not self.row_length:
            # empty data, good to go
            return
        if self.row_length != len(row_data):
            raise ValueError("row_data is of different length from stored")
        # Lengths are equal and dict keys are unique, so comparing sets is
        # equivalent to comparing sorted key lists, without requiring the
        # keys to be mutually orderable.
        if set(row_data) != set(self._data):
            raise ValueError("row_data has different keys from stored")

    def __validate_col(self, col_data):
        """
        Raise ValueError unless col_data is as long as the stored columns.
        Any length is accepted while no columns exist (col_length == -1).
        """
        if self.col_length not in (len(col_data), -1):
            raise ValueError("col_data is of different length from stored")

    # ------------------------------------------------------------------
    # reading
    # ------------------------------------------------------------------
    def read_filename(self, filename, encoding='ascii', fieldnames=None,
                      dialect='excel', *args, **kwargs):
        """
        Effectively, invoke read_file with specified encoding.

        The file is decoded with ``encoding`` and handed to read_file through
        an in-memory buffer; remaining arguments are forwarded to read_file.
        """
        # The context manager closes the handle even if decoding fails.
        with codecs.open(filename, 'r', encoding) as handle:
            text = handle.read()
        if str is bytes:
            # Python 2: the csv module works on byte strings, so re-encode
            # the decoded text before parsing.
            text = text.encode(encoding)
        buf = StringIO()
        buf.write(text)
        buf.seek(0)
        self.read_file(buf, fieldnames=fieldnames, dialect=dialect,
                       *args, **kwargs)

    def read_file(self, file, fieldnames=None, dialect='excel', *args,
                  **kwargs):
        """
        Read a file object into the reader instance, replacing any data
        already stored. All optional arguments are passed to a
        csv.DictReader. Note that using restkey and restval will produce
        unknown results and is not recommended. On Python 2 the DictReader
        does not deal with non-ascii characters properly; if you need to
        read non-ascii characters, use read_filename, which should handle
        the specified encoding properly.

        Raises ValueError if the fieldnames are not unique. Empty input
        (no header row) leaves the instance empty.
        """
        reader = csv.DictReader(file, fieldnames=fieldnames, dialect=dialect,
                                *args, **kwargs)
        # DictReader reports None when there is no header row to read.
        names = list(reader.fieldnames or [])
        self.__validate_fieldnames(names)
        # Fill a fresh mapping and swap it in at the end, so a failure while
        # parsing cannot leave the instance with half-read columns.
        columns = dict((name, []) for name in names)
        for row in reader:
            for name in names:
                columns[name].append(row[name])
        self._data = columns

    # ------------------------------------------------------------------
    # writing
    # ------------------------------------------------------------------
    def write_filename(self, filename, fieldnames=None, encoding='ascii',
                       dialect='excel', *args, **kwargs):
        """
        Effectively, invoke write_file on specified filename with specified
        encoding.
        """
        buf = StringIO()
        self.write_file(buf, fieldnames, dialect=dialect, *args, **kwargs)
        payload = buf.getvalue()
        if isinstance(payload, bytes):
            # Python 2 buffers hold byte strings; the codecs writer below
            # expects text and performs the encoding itself.
            payload = payload.decode(encoding)
        # Opened only once the payload is ready, and always closed.
        with codecs.open(filename, 'wb', encoding) as handle:
            handle.write(payload)

    def write_file(self, file, fieldnames=None, dialect='excel', *args,
                   **kwargs):
        """
        Write the current data payload to the file-like object. The dialect
        and any extra arguments are passed to a csv.DictWriter instance. If
        fieldnames is not specified, the columns are written in sorted order
        instead of failing like csv.DictWriter. Note that on Python 2 the
        DictWriter does not deal with non-ascii characters nicely. If you
        need to write a file with a non-ascii encoding, use write_filename.

        Raises ValueError if fieldnames is given but does not name exactly
        the stored columns.
        """
        if not fieldnames:
            fieldnames = sorted(self.fieldnames)
        else:
            # Materialise once so one-shot iterables survive the check.
            fieldnames = list(fieldnames)
            if sorted(fieldnames) != sorted(self.fieldnames):
                raise ValueError("Fieldnames do not match internal")
        writer = csv.DictWriter(file, fieldnames, dialect=dialect,
                                *args, **kwargs)
        # Header row: every fieldname mapped onto itself.
        writer.writerow(dict(zip(fieldnames, fieldnames)))
        for row in self.iter_rows():
            writer.writerow(row)

    def to_string(self, fieldnames=None, dialect='excel', *args, **kwargs):
        """
        Return a string representation of the csv file in its current state.
        See write_file.
        """
        buf = StringIO()
        self.write_file(buf, fieldnames=fieldnames, dialect=dialect,
                        *args, **kwargs)
        return buf.getvalue()

    # ------------------------------------------------------------------
    # columns
    # ------------------------------------------------------------------
    @property
    def fieldnames(self):
        """The column names, as a new list on every access."""
        # list() keeps the result indexable on Python 3, where dict.keys()
        # is a view rather than a list.
        return list(self._data)

    @property
    def col_length(self):
        """Number of rows in each column, or -1 when there are no columns."""
        names = self.fieldnames
        if not names:
            # if no fieldnames, no data, any length for new data
            return -1
        return len(self._data[names[0]])

    def get_col(self, col_name):
        "Return the named column (the stored list itself, not a copy)"
        return self._data[col_name]

    def add_col(self, col_name, col_data, allow_collision=False):
        """
        Add a column with the specified data. Note that the data must be the
        same length as the previously existing columns. If the column name
        already exists and allow_collision is not set to True, a ValueError
        will be raised notifying you of the naming collision.
        """
        self.__validate_col(col_data)
        if not allow_collision and col_name in self._data:
            raise ValueError("Fieldname already exists")
        # Copy so later changes to the caller's sequence do not leak in.
        self._data[col_name] = list(col_data)

    def replace_col(self, col_name, col_data):
        """
        Like add_col, but allows collisions always
        """
        self.add_col(col_name, col_data, True)

    def rename_col(self, old_col, new_col):
        """
        Rename a column from old_col to new_col. Raises KeyError if old_col
        does not exist. An existing column named new_col is overwritten.
        """
        # pop-then-assign keeps the data when old_col == new_col; assigning
        # first and deleting afterwards would drop the column in that case.
        self._data[new_col] = self._data.pop(old_col)

    def del_col(self, col_name):
        "Delete column named col_name"
        del self._data[col_name]

    def iter_cols(self):
        """
        Returns an iterator that returns each column as a dict with the key
        being the column name and the value a list with the column data.
        """
        for name in self.fieldnames:
            yield {name: self.get_col(name)}

    # ------------------------------------------------------------------
    # rows
    # ------------------------------------------------------------------
    @property
    def row_length(self):
        """Number of columns, i.e. the number of cells in one row."""
        return len(self._data)

    def get_row(self, row_num):
        """
        Return the row as a dict with the keys the column names and the values
        from the given row index (starting with 0).
        """
        return dict((name, column[row_num])
                    for name, column in self._data.items())

    def del_row(self, row_num):
        "Delete the row with the index specified by row_num"
        for column in self._data.values():
            del column[row_num]

    def append_row(self, row_data):
        """
        add a row of data. row_data must be a dict with keys matching
        fieldnames. On an empty instance the keys of row_data become the
        fieldnames.
        """
        self.__validate_row(row_data)
        for name, value in row_data.items():
            # setdefault creates the column when the instance is still empty.
            self._data.setdefault(name, []).append(value)

    def replace_row(self, row_num, row_data):
        """
        replace a row at row index (row_num) with the specified data,
        row_data must be a dict with keys matching fieldnames
        """
        self.__validate_row(row_data)
        for name, value in row_data.items():
            self._data[name][row_num] = value

    def insert_row(self, row_num, row_data):
        """
        insert a row before row index (row_num) with the specified data,
        row_data must be a dict with keys matching fieldnames. On an empty
        instance the keys of row_data become the fieldnames.
        """
        self.__validate_row(row_data)
        for name, value in row_data.items():
            # setdefault creates the column when the instance is still empty.
            self._data.setdefault(name, []).insert(row_num, value)

    def iter_rows(self):
        """
        returns a generator that returns a row as a dict, with the keys as
        fieldnames
        """
        # col_length is -1 for an empty instance, which gives an empty range.
        for index in range(self.col_length):
            yield self.get_row(index)

    def edit_cell(self, col_name, row_num, data):
        """
        Change data in column col_name at row index row_num with the 'data'
        """
        self._data[col_name][row_num] = data
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment