Skip to content

Instantly share code, notes, and snippets.

@vaibhavmule
Created November 9, 2017 16:01
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save vaibhavmule/e650a83b125ca8e006f0e0e1f0bb1aec to your computer and use it in GitHub Desktop.
Save vaibhavmule/e650a83b125ca8e006f0e0e1f0bb1aec to your computer and use it in GitHub Desktop.
"""
Develop an ETL system that will Extract data from a CSV file,
apply some transformation and output the transformed
data to another CSV.
ETL stands for Extract Transform and Load - this system should be divided into these modules:
Extract module - should be able to retrieve data from a CSV file
Transform module - should accept input stream from Extract module and
for each column adds an additional column
indicating the data type of the column
Load module - should load this transformed data to another CSV file
Expected output CSV:
Name, Name datatype, Age, Age datatype, Join Date, Join Date datatype, Bank Balance,
Bank Balance datatype
Ramesh, str, 32, int, 2017-07-21, date, 26790.60, float
Ashok, str, 37, int, 2017-01-19, date, 132870.48, float
"""
import csv
import datetime
def extract(filename):
    """Yield every record of the CSV file *filename* as a list of strings.

    This is a generator: nothing happens, including the file-name check,
    until the first row is requested.

    Args:
        filename: Path of the CSV file to read. It must contain ``.csv``.

    Yields:
        One list of raw (unstripped) field strings per CSV record.

    Raises:
        ValueError: If *filename* does not contain ``.csv``.
        OSError: If the file cannot be opened.
    """
    if '.csv' not in filename:
        raise ValueError('Please use correct file!')
    # newline='' hands newline handling to the csv module, so line breaks
    # embedded in quoted fields reach the reader untranslated.
    with open(filename, newline='') as f:
        yield from csv.reader(f)
def transform(extracted_data):
    """Yield rows with a data-type column added after every original column.

    The first row of *extracted_data* is treated as the header: each stripped
    column name is followed by ``"<name> datatype"``. In every later row each
    stripped value is followed by the label returned by ``get_type`` for it.

    Args:
        extracted_data: Any iterable of rows, each row an iterable of strings.

    Yields:
        One list per input row, twice as long as the input row.
        Nothing is yielded when *extracted_data* is empty.
    """
    rows = iter(extracted_data)
    # A bare next() here would leak StopIteration out of the generator, which
    # Python turns into RuntimeError; use a default and stop cleanly instead.
    header = next(rows, None)
    if header is None:
        return

    first_row = []
    for column in header:
        name = column.strip()
        first_row.extend((name, '{} datatype'.format(name)))
    yield first_row

    for row in rows:
        row_to_yield = []
        for column in row:
            value = column.strip()
            row_to_yield.extend((value, get_type(value)))
        yield row_to_yield
def get_type(column):
    """Return the type label for *column*: 'int', 'float', 'date' or 'str'.

    The checks run in that order and the first one that reports a truthy
    result wins; 'str' is the fallback when none of them does.
    """
    checks = (
        (is_integer, 'int'),
        (is_float, 'float'),
        (is_date, 'date'),
    )
    for predicate, label in checks:
        if predicate(column):
            return label
    return 'str'
def is_integer(s):
    """Return True if ``int(s)`` succeeds, otherwise False.

    Args:
        s: The value to test, normally a string taken from a CSV field.

    Returns:
        bool: True when *s* converts with ``int()``, False when the
        conversion raises ``ValueError`` or ``TypeError`` (e.g. ``None``).
    """
    try:
        int(s)
    except (TypeError, ValueError):
        return False
    # Return an explicit True rather than the parsed number: int("0") is 0,
    # which is falsy and would make "0" look like a non-integer.
    return True
def is_float(s):
    """Return True if ``float(s)`` succeeds, otherwise False.

    Args:
        s: The value to test, normally a string taken from a CSV field.

    Returns:
        bool: True when *s* converts with ``float()``, False when the
        conversion raises ``ValueError`` or ``TypeError`` (e.g. ``None``).
    """
    # NOTE(review): float() also accepts spellings such as "nan" and "inf",
    # so those strings count as floats here - confirm that is acceptable.
    try:
        float(s)
    except (TypeError, ValueError):
        return False
    # Return an explicit True rather than the parsed number: float("0.0") is
    # 0.0, which is falsy and would make zero look like a non-float.
    return True
def is_date(s):
    """Return a ``datetime.date`` if *s* is a ``year-month-day`` string.

    Args:
        s: The value to test, normally a string taken from a CSV field.

    Returns:
        The ``datetime.date`` built from the three integer parts (always
        truthy) when *s* has exactly three ``-``-separated parts that form a
        valid calendar date; otherwise ``False``.
    """
    try:
        # Unpacking demands exactly three parts, so trailing pieces such as
        # "2017-07-21-99" are rejected instead of being silently ignored.
        year, month, day = s.split('-')
        return datetime.date(
            year=int(year),
            month=int(month),
            day=int(day))
    except (AttributeError, TypeError, ValueError, OverflowError):
        # AttributeError/TypeError: *s* is not a string; ValueError: wrong
        # part count, non-numeric part or impossible date; OverflowError:
        # a part too large for datetime.date to accept.
        return False
def load_to_csv(transformed_data, out_filename):
    """Write every row of *transformed_data* to the CSV file *out_filename*.

    The file is created, or truncated if it already exists.

    Args:
        transformed_data: Iterable of rows, each row an iterable of values.
        out_filename: Path of the CSV file to write.

    Raises:
        OSError: If the file cannot be opened for writing.
    """
    # newline='' stops the text layer from translating the line terminators
    # the csv writer emits, which would otherwise double them on platforms
    # whose native line ending is "\r\n". Plain 'w' suffices: nothing is read.
    with open(out_filename, 'w', newline='') as f:
        csv.writer(f).writerows(transformed_data)
def _prompt_for_csv_name(prompt):
    """Ask with *prompt* until the reply ends with ``.csv`` and return it."""
    while True:
        name = input(prompt)
        # Test the actual extension; a substring test would also accept
        # names such as "data.csv.bak".
        if name.endswith('.csv'):
            return name
        print('Please correct extension of the file')


def main():
    """Run the ETL interactively.

    Prompts for an input and an output CSV file name, then streams the rows
    through extract -> transform -> load_to_csv and reports the output file.
    """
    filename = _prompt_for_csv_name('Enter file name: ')
    output_filename = _prompt_for_csv_name('Enter output file name: ')

    extracted_data = extract(filename)
    transformed_data = transform(extracted_data)
    load_to_csv(transformed_data, output_filename)
    print("Extracted and transformed data to", output_filename)
# Start the interactive ETL only when this file is executed as a script,
# not when it is imported as a module.
if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment