Navigation Menu

Skip to content

Instantly share code, notes, and snippets.

@mattkatz
Created June 3, 2016 16:54
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save mattkatz/a10b79768a3e090caf2da42628910668 to your computer and use it in GitHub Desktop.
Save mattkatz/a10b79768a3e090caf2da42628910668 to your computer and use it in GitHub Desktop.
PETL example
from pathlib import Path
from pathlib import WindowsPath

import petl as etl
class DataMasseuse:
    """Clean a directory of quarterly CSV exports and merge them into one CSV.

    Each raw file has its first line removed and is written, under the same
    name, into ``new_data_files``.  Every cleaned file is then loaded with
    petl, given constant ``Quarter`` and ``Year`` columns taken from its file
    name, and appended to ``final_file``.
    """

    # Raw strings are required here: in an ordinary literal "\f" is the
    # form-feed escape, so "s:\files" would not contain the text "\files".
    # ``Path`` resolves to ``WindowsPath`` on Windows while still letting the
    # module be imported on other platforms.
    data_files = Path(r"s:\files\QTRLY")
    new_data_files = Path(r"s:\files\QTRLY\cleaned")
    final_file = new_data_files / "final_file.csv"

    def main(self):
        """Run the full pipeline: reset the output, clean, then merge."""
        # Let's start fresh: delete the previous result.  EAFP avoids the
        # window between an ``exists()`` check and the ``unlink()`` call.
        try:
            self.final_file.unlink()
        except FileNotFoundError:
            pass
        self.clean_data_files(self.data_files)
        self.process_clean_data_files(self.new_data_files)

    def clean_data_files(self, data_path):
        """Clean every regular file directly inside *data_path*.

        The cleaned copies are written to ``self.new_data_files``, which is
        created first so that a fresh directory tree (or an empty input
        directory) does not make the later steps fail.
        """
        self.new_data_files.mkdir(parents=True, exist_ok=True)
        for file_path in data_path.iterdir():
            # Ignore directories and such (this also skips the output
            # directory when it is nested inside *data_path*).
            if file_path.is_file():
                self.clean_data_file(file_path, self.new_data_files)

    def clean_data_file(self, file_path, new_files_path):
        """Copy *file_path* into *new_files_path*, dropping its first line.

        The copy keeps the original file name.  *new_files_path* is created
        if it does not exist yet.
        """
        new_files_path.mkdir(parents=True, exist_ok=True)
        target_path = new_files_path / file_path.name
        with file_path.open() as dirty_file, target_path.open("w") as clean_file:
            # Skip the first line, then stream the remainder across.
            dirty_file.readline()
            clean_file.writelines(dirty_file)

    def process_clean_data_files(self, files_path):
        """Merge every cleaned file in *files_path* into ``self.final_file``.

        Files are handled in sorted order so the merged output does not
        depend on directory listing order.  Non-files and the merged output
        itself (which lives in the same directory by default) are skipped so
        the result is never fed back in as input.
        """
        for file_path in sorted(files_path.iterdir()):
            if not file_path.is_file() or file_path == self.final_file:
                continue
            self.process_clean_data_file(file_path)

    def process_clean_data_file(self, file_path):
        """Append one cleaned file to ``self.final_file`` with Quarter/Year columns.

        The quarter and year are taken from the file name via
        ``get_quarter`` / ``get_year``.  The first file processed creates the
        output with ``tocsv``; later files are added with ``appendcsv``.
        """
        quarter = self.get_quarter(file_path.name)
        year = self.get_year(file_path.name)
        # Paths are passed to petl as strings, matching the tocsv/appendcsv
        # calls below.
        # NOTE: a ``.cutout('')`` step could be added after ``fromcsv`` to
        # remove a blank column name - this happens when someone saves a csv
        # after clicking outside the original fields.
        table = (
            etl.fromcsv(str(file_path))  # let's treat this file as a table
            .addfield("Quarter", quarter)
            .addfield("Year", year)
        )
        if self.final_file.exists():
            table.appendcsv(str(self.final_file))
        else:
            table.tocsv(str(self.final_file))

    def get_quarter(self, file_name):
        """Return the text before the first underscore in *file_name*."""
        return file_name.split("_")[0]

    def get_year(self, file_name):
        """Return the second underscore-separated token of *file_name*.

        The token is returned verbatim, so for a two-part name such as
        ``"Q1_2016.csv"`` it includes the extension.  Raises ``IndexError``
        when *file_name* contains no underscore.
        """
        return file_name.split("_")[1]
if __name__ == '__main__':
    # Executed as a script rather than imported: build the masseuse and run
    # the whole clean-and-merge pipeline in one go.
    DataMasseuse().main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment