Created
March 11, 2010 23:22
-
-
Save atiw003/329825 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/python
import codecs
import csv

try:
    from cStringIO import StringIO
except ImportError:
    try:
        from StringIO import StringIO
    except ImportError:
        # Neither Python 2 module is available; fall back to the io module.
        from io import StringIO
class ReaderCol(object):
    """
    Column-oriented, in-memory view of a csv file.

    The file is parsed with the csv module; the first row (or an explicit
    ``fieldnames`` argument) supplies the column names, and every column is
    stored as a list of cell values keyed by its name. There are some
    limitations, eg:

    - The csv file must be rectangular (each row must have same number of
      columns, and vice versa)
    - The column names must be unique

    Column names are kept in a plain dict, so ``fieldnames`` follows that
    dict's iteration order; ``write_file`` sorts the names when no explicit
    order is given.

    This depends on the csv, (C)StringIO / io, and codecs modules.

    Sample usage:

        reader = ReaderCol()
        reader.read_filename(in_filename, encoding="utf-8")
    """

    def __init__(self):
        # Maps column name -> list of cell values; all lists share one length.
        self._data = {}

    @property
    def fieldnames(self):
        "Return the column names as a new list"
        # list() rather than keys() so the result is indexable everywhere.
        return list(self._data)

    def __validate_fieldnames(self, fieldnames):
        "Raise ValueError if a name occurs more than once in fieldnames"
        if len(set(fieldnames)) != len(fieldnames):
            raise ValueError("Fieldnames must be unique")

    def __validate_row(self, row_data):
        """
        Raise ValueError unless the keys of row_data are exactly the stored
        column names. When no columns are stored yet, any row is accepted.
        """
        if not self._data:
            # empty data, good to go
            return
        if len(self._data) != len(row_data):
            raise ValueError("row_data is of different length from stored")
        # Set comparison avoids having to order the keys.
        if set(row_data) != set(self._data):
            raise ValueError("row_data has different keys from stored")

    def __validate_col(self, col_data):
        """
        Raise ValueError unless col_data is as long as the stored columns.
        A col_length of -1 (no columns stored) accepts any length.
        """
        if self.col_length not in (len(col_data), -1):
            raise ValueError("col_data is of different length from stored")

    def read_filename(self, filename, encoding='ascii', fieldnames=None,
                      dialect='excel', *args, **kwargs):
        """
        Effectively, invoke read_file with specified encoding.

        The file named by filename is decoded with ``encoding``, buffered in
        memory and handed to read_file together with fieldnames, dialect and
        any extra arguments.
        """
        # The context manager closes the handle even if decoding fails.
        with codecs.open(filename, 'r', encoding) as fd:
            text = fd.read()
        if str is bytes:
            # Python 2: the csv module works on byte strings, so re-encode.
            text = text.encode(encoding)
        buf = StringIO()
        buf.write(text)
        buf.seek(0)
        # Positional forwarding keeps extra *args from colliding with the
        # fieldnames/dialect parameters of read_file.
        self.read_file(buf, fieldnames, dialect, *args, **kwargs)

    def read_file(self, file, fieldnames=None, dialect='excel', *args, **kwargs):
        """
        Read a file object into the reader instance, replacing any columns
        it already holds. All optional arguments are passed to a
        csv.DictReader (extra positional arguments follow fieldnames). Note
        that using restkey and restval will produce unknown results and is
        not recommended. On Python 2 the DictReader does not deal with
        non-ascii characters properly; if you need to read non-ascii
        characters, use read_filename, which should handle the specified
        encoding properly.

        Raises ValueError if the column names are not unique; in that case
        the previously stored data is left untouched.
        """
        reader = csv.DictReader(file, fieldnames, *args,
                                dialect=dialect, **kwargs)
        # reader.fieldnames is None when the file has no header row at all.
        header = reader.fieldnames or []
        self.__validate_fieldnames(header)
        columns = dict((name, []) for name in header)
        for row in reader:
            for name in header:
                columns[name].append(row[name])
        # Swap in the new table only once the whole file has been parsed.
        self._data = columns

    def write_filename(self, filename, fieldnames=None, encoding='ascii',
                       dialect='excel', *args, **kwargs):
        """
        Effectively, invoke write_file on specified filename with specified
        encoding.

        The csv text is produced in memory first, so the target file is not
        opened when write_file rejects the fieldnames.
        """
        buf = StringIO()
        self.write_file(buf, fieldnames, dialect, *args, **kwargs)
        payload = buf.getvalue()
        if isinstance(payload, bytes):
            # Python 2: csv produced a byte string; decode before the codec
            # writer encodes it again on the way out.
            payload = payload.decode(encoding)
        # The context manager flushes and closes the handle.
        with codecs.open(filename, 'wb', encoding) as fd:
            fd.write(payload)

    def write_file(self, file, fieldnames=None, dialect='excel', *args, **kwargs):
        """
        Write the current data payload to the file-like object. Arguments are
        passed to a csv.DictWriter instance (extra positional arguments
        follow fieldnames). If fieldnames is not specified, the column names
        are written in sorted order instead of failing like csv.DictWriter.
        Note that on Python 2 the DictWriter does not deal with non-ascii
        characters nicely. If you need to open a file with a non-ascii
        encoding, use write_filename.

        Raises ValueError if fieldnames is given but does not name exactly
        the stored columns.
        """
        if not fieldnames:
            fieldnames = sorted(self.fieldnames)
        elif sorted(fieldnames) != sorted(self.fieldnames):
            raise ValueError("Fieldnames do not match internal")
        writer = csv.DictWriter(file, fieldnames, *args,
                                dialect=dialect, **kwargs)
        # Header row: every column name mapped to itself.
        writer.writerow(dict(zip(fieldnames, fieldnames)))
        for row in self.iter_rows():
            writer.writerow(row)

    def to_string(self, fieldnames=None, dialect='excel', *args, **kwargs):
        """
        Return a string representation of the csv file in its current state.
        See write_file.
        """
        buf = StringIO()
        self.write_file(buf, fieldnames, dialect, *args, **kwargs)
        return buf.getvalue()

    @property
    def col_length(self):
        "Return the number of rows, or -1 when no columns are stored"
        for column in self._data.values():
            # Every column has the same length, so the first one will do.
            return len(column)
        # if no fieldnames, no data, any length for new data
        return -1

    def get_col(self, col_name):
        "Return the named column (the stored list itself, not a copy)"
        return self._data[col_name]

    def add_col(self, col_name, col_data, allow_collision=False):
        """
        Add a column with the specified data. Note that the data must be the
        same length as the previously existing columns. If the column name
        already exists and allow_collision is not set to True, a ValueError
        will be raised notifying you of the naming collision.

        col_data may be any iterable; its items are copied into a new list.
        """
        # Materialise first so that iterators can be measured as well.
        values = list(col_data)
        self.__validate_col(values)
        if not allow_collision and col_name in self._data:
            raise ValueError("Fieldname already exists")
        self._data[col_name] = values

    def replace_col(self, col_name, col_data):
        """
        Like add_col, but allows collisions always
        """
        self.add_col(col_name, col_data, True)

    def rename_col(self, old_col, new_col):
        """
        Rename a column from old_col to new_col. Renaming a column to its own
        name leaves it unchanged; an existing column called new_col is
        overwritten. Raises KeyError if old_col does not exist.
        """
        # pop-then-assign stays correct when both names are the same.
        self._data[new_col] = self._data.pop(old_col)

    def del_col(self, col_name):
        "Delete column named col_name"
        del self._data[col_name]

    def iter_cols(self):
        """
        Returns an iterator that returns each column as a dict with the key
        being the column name and the value a list with the column data.
        """
        for name in self.fieldnames:
            yield {name: self.get_col(name)}

    @property
    def row_length(self):
        "Return the number of columns"
        return len(self._data)

    def get_row(self, row_num):
        """
        Return the row as a dict with the keys the column names and the values
        from the given row index (starting with 0).
        """
        return dict((name, column[row_num])
                    for name, column in self._data.items())

    def del_row(self, row_num):
        "Delete the row with the index specified by row_num"
        for column in self._data.values():
            del column[row_num]

    def append_row(self, row_data):
        """
        add a row of data. row_data must be a dict with keys matching
        fieldnames. When no columns exist yet, the keys of row_data become
        the columns.
        """
        self.__validate_row(row_data)
        for name, value in row_data.items():
            # setdefault only creates a column when the table was empty;
            # otherwise validation has already matched the keys.
            self._data.setdefault(name, []).append(value)

    def replace_row(self, row_num, row_data):
        """
        replace a row at row index (row_num) with the specified data,
        row_data must be a dict with keys matching fieldnames
        """
        self.__validate_row(row_data)
        for name, value in row_data.items():
            self._data[name][row_num] = value

    def insert_row(self, row_num, row_data):
        """
        insert a row before row index (row_num) with the specified data,
        row_data must be a dict with keys matching fieldnames. When no
        columns exist yet, the keys of row_data become the columns.
        """
        self.__validate_row(row_data)
        for name, value in row_data.items():
            self._data.setdefault(name, []).insert(row_num, value)

    def iter_rows(self):
        """
        returns a generator that returns a row as a dict, with the keys as
        fieldnames
        """
        # col_length is -1 for an empty table, which yields no rows.
        for index in range(self.col_length):
            yield self.get_row(index)

    def edit_cell(self, col_name, row_num, data):
        """
        Change data in column col_name at row index row_num with the 'data'
        """
        self._data[col_name][row_num] = data
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment