Skip to content

Instantly share code, notes, and snippets.

@magdiel01
Last active August 29, 2015 14:06
Show Gist options
  • Save magdiel01/75006cab1c4416c62999 to your computer and use it in GitHub Desktop.
Save magdiel01/75006cab1c4416c62999 to your computer and use it in GitHub Desktop.
import csv
import datetime
import os
import sys
def _headers_count(csv_reader):
return len(list(csv_reader)[0])
def _get_headers(csv_reader):
return list(csv_reader)[0]
def _records_count(csv_reader):
return len(list(csv_reader))
def _header_is_repeated(csv_reader):
# print 'csv_reader: ', csv_reader
return len(list(csv_reader)[0]) != len(set(list(csv_reader)[0]))
def _identifier_is_repeated(csv_reader):
identifiers = [e[0] for e in list(csv_reader)[1:]]
return len(identifiers) != len( set(identifiers) )
def parse(csv_reader):
    """Collect every row produced by ``csv_reader`` into a new list."""
    return [row for row in csv_reader]
def compare_csv(file1, file2, generated_file_path):
    """Compare two CSV inputs and write the identifiers they have in common.

    Both inputs are parsed with ``,`` as the delimiter and ``|`` as the
    quote character. The first row of each input is treated as the header
    and the first column as the row identifier.

    The inputs are validated before anything is written:

    * neither input may be empty,
    * header names must be unique within each input,
    * identifiers must be unique within each input,
    * both inputs must contain the same number of rows.

    When validation passes, a ``results-<UTC timestamp>.csv`` file is
    created in ``generated_file_path``. It contains the first header cell
    of ``file1`` followed by one single-cell row per identifier of
    ``file1`` that also appears in ``file2``, in ``file1`` order.

    Args:
        file1: Open file (or any iterable of lines) with the first CSV.
        file2: Open file (or any iterable of lines) with the second CSV.
        generated_file_path: Directory in which the results file is created.

    Returns:
        On success, a dict with the keys ``'data'`` (the rows written),
        ``'file'`` (the generated file name, without the directory) and
        ``'errors'`` (always ``None``). When validation fails, the number
        of failed checks is printed, no file is created and ``False`` is
        returned.
    """
    csv_content1 = parse(csv.reader(file1, delimiter=',', quotechar='|'))
    csv_content2 = parse(csv.reader(file2, delimiter=',', quotechar='|'))

    # ---------------
    # Run Tests
    # ---------------
    errors_count = 0
    if not csv_content1 or not csv_content2:
        # An empty input has no header row, so the header and identifier
        # checks below cannot be evaluated; count it as one failure.
        errors_count += 1
    else:
        # Checks if each header name is unique
        if (_header_is_repeated(csv_content1)
                or _header_is_repeated(csv_content2)):
            errors_count += 1
        # Checks if an identifier is repeated in the first column
        if (_identifier_is_repeated(csv_content1)
                or _identifier_is_repeated(csv_content2)):
            errors_count += 1
    # Checks if the row count of CSV1 matches the row count of CSV2
    if _records_count(csv_content1) != _records_count(csv_content2):
        errors_count += 1

    if errors_count:
        # Single pre-formatted argument so the call prints the same text
        # under both the Python 2 print statement and the Python 3 function.
        print('errors_count:  %d' % errors_count)
        return False

    # Index the second file by identifier once, instead of rescanning all of
    # its rows for every row of the first file. Identifiers were verified to
    # be unique above, so no row is shadowed. Blank rows ([]) are skipped.
    rows2_by_id = {row[0]: row for row in csv_content2[1:] if row}

    # Header row: only the first (identifier) column is reported.
    result_list = [_get_headers(csv_content1)[0:1]]
    for item in csv_content1[1:]:
        if not item:
            continue
        match = rows2_by_id.get(item[0])
        if match is not None:
            result_list.append(match[0:1])

    # NOTE: str() of a datetime contains a space and colons, so this name
    # is not valid on filesystems that reject ':' (e.g. Windows). The format
    # is kept unchanged so existing consumers of 'file' are unaffected.
    generated_file_name = 'results-{timestamp}.csv'.format(
        timestamp=str(datetime.datetime.utcnow()))
    generated_path = os.path.join(generated_file_path, generated_file_name)

    # The csv module wants a binary file on Python 2 and a text file opened
    # with newline='' on Python 3.
    if sys.version_info[0] >= 3:
        generated_file = open(generated_path, 'w', newline='')
    else:
        generated_file = open(generated_path, 'wb')
    # The file is only opened once validation has passed, and the context
    # manager guarantees it is flushed and closed.
    with generated_file:
        writer = csv.writer(generated_file, delimiter=',',
                            quotechar='|', quoting=csv.QUOTE_MINIMAL)
        writer.writerows(result_list)

    return {
        'data': result_list,
        'file': generated_file_name,
        'errors': None
    }
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment