Skip to content

Instantly share code, notes, and snippets.

@jalbertbowden
Last active February 9, 2021 01:57
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jalbertbowden/882a50c92ca2ea7530195c28c380d7da to your computer and use it in GitHub Desktop.
Save jalbertbowden/882a50c92ca2ea7530195c28c380d7da to your computer and use it in GitHub Desktop.
COVID-19 Virginia LTCF Dataops - Python script for cleaning up Virginia LTCF COVID-19 data.
from csv import DictReader, DictWriter
import csv
# --- Configuration: file-name components for the input/output CSV paths ---
# To process a different dataset, edit file_date (the publication date in the
# downloaded file's name) and file_row_report_date (the US-format MM/DD/YYYY
# report date to filter rows on).
file_name_base = 'vdh-covid-19-public-use-dataset-outbreaks-in-selected-exposure-settings-'
file_date = '2021-01-29'
file_row_report_date = '01/29/2021'
file_format = '.csv'
file_dataset = '-ltcf'
file_step_1 = '-fixed'
file_step_2 = '-updated'
# Derived file names for each cleaning step.
file_csv_1 = file_name_base + file_date + file_format
file_csv_2 = file_name_base + file_date + file_dataset + file_step_1 + file_format
csv_file_step_3 = file_name_base + file_date + file_dataset + file_step_1 + file_step_2
file_csv_3 = file_name_base + file_date + file_dataset + file_step_1 + file_step_2 + file_format
def rewrite_asterisks(src=None, dst=None):
    """Replace suppressed '*' values with 1 in the Cases/Deaths/Active columns.

    VDH masks small counts with '*'; downstream numeric processing needs a
    number, so each '*' is rewritten as 1 (the smallest count it could mask).

    src: input CSV path; defaults to the downloaded file (file_csv_1).
    dst: output CSV path; defaults to the '-fixed' file (file_csv_2).
    """
    # Defaults resolved at call time so the module-level names stay editable.
    if src is None:
        src = file_csv_1
    if dst is None:
        dst = file_csv_2
    # newline='' is required so the csv module controls line endings itself.
    with open(src, newline='') as f:
        rows = list(DictReader(f))
    for row in rows:
        print(row['Cases'], row['Deaths'], row['Active'])
        for key in ('Cases', 'Deaths', 'Active'):
            if row[key] == '*':
                row[key] = 1
    with open(dst, 'w', newline='') as o:
        csvwriter = DictWriter(o, fieldnames=rows[0].keys())
        csvwriter.writeheader()
        csvwriter.writerows(rows)
# Step 1: produce the '-fixed' CSV with '*' placeholders replaced by 1.
rewrite_asterisks()

# step 2: keep only LTCF rows for the requested report date and convert the
# US-format dates (MM/DD/YYYY) to ISO-style YYYY-MM-DD.
headers_fixed = ['Locality', 'Facility', 'Facility Type', 'Status', 'Date VDH Notified','FIPS', 'Report Date', 'Active', 'Cases', 'Deaths']
new_rows = []
csv_input = file_csv_2
facility_types = ['Assisted Living','Multicare','Nursing Home']
with open(csv_input) as fin:
    csvin = csv.DictReader(fin)
    for row in csvin:
        if row['Facility Type'] in facility_types:
            print(row['Report Date'])
            if row['Report Date'] == file_row_report_date:
                print('matchmaker')
                print(row)
                # Split MM/DD/YYYY on '/' and reassemble as YYYY-MM-DD.
                date_intl_report = row['Report Date'].split('/')
                date_intl_notified = row['Date VDH Notified'].split('/')
                date_pretty_report = date_intl_report[2] + '-' + date_intl_report[0] + '-' + date_intl_report[1]
                date_pretty_notified = date_intl_notified[2] + '-' + date_intl_notified[0] + '-' + date_intl_notified[1]
                new_rows.append({
                    "Locality": row['Locality'],
                    "Facility": row['Facility'],
                    "Facility Type": row['Facility Type'],
                    "Status": row['Status'],
                    "Date VDH Notified": date_pretty_notified,
                    "FIPS": row['FIPS'],
                    "Report Date": date_pretty_report,
                    "Active": row['Active'],
                    "Cases": row['Cases'],
                    "Deaths": row['Deaths']
                })
def write_csv_data(new_rows, headers_fixed, csv_input):
    """Write rows to '<stem>-updated.csv', where stem is csv_input up to its
    first '.'.

    new_rows: list of dicts keyed by the names in headers_fixed.
    headers_fixed: output column order.
    csv_input: file name the output name is derived from (not read here).
    """
    stem = csv_input.split('.')[0]
    csv_output = stem + '-updated.csv'
    # newline='' is required so the csv module controls line endings itself.
    with open(csv_output, 'w', newline='') as csvfile:
        csv_writer = csv.DictWriter(csvfile, fieldnames=headers_fixed)
        csv_writer.writeheader()
        csv_writer.writerows(new_rows)
# Write the filtered, date-normalised LTCF rows to the '-updated' CSV.
write_csv_data(new_rows, headers_fixed, csv_input)
# step 3: split facilities by type
def split_csv_by_column(base_name=None):
    """Split the step-3 CSV into one file per distinct 'Facility Type' value.

    Each facility type gets its own '<base>-<type-slug>.csv' with the same
    header row; rows are appended in the order they are read.

    base_name: file-name stem (without '.csv'); defaults to csv_file_step_3.
    """
    if base_name is None:
        base_name = csv_file_step_3
    with open(base_name + '.csv') as fin:
        csvin = csv.DictReader(fin)
        outputs = {}
        try:
            for row in csvin:
                cat = row['Facility Type']
                # Slug: spaces -> dashes, drop '/', collapse '--', '_' -> '-', lowercase.
                slug = cat.replace(' ', '-').replace('/', '').replace('--', '-').replace('_', '-').lower()
                if cat not in outputs:
                    # newline='' so the csv module controls line endings itself.
                    fout = open(base_name + '-{}.csv'.format(slug), 'w', newline='')
                    dw = csv.DictWriter(fout, fieldnames=csvin.fieldnames)
                    dw.writeheader()
                    outputs[cat] = fout, dw
                outputs[cat][1].writerow(row)
        finally:
            # Close every per-category file even if a row raised mid-loop.
            for fout, _ in outputs.values():
                fout.close()
split_csv_by_column()
# step 4: add facility count column to all four csvs
# Step-4 output column order: headers_fixed plus a leading row counter.
headers_fixed_counted = ['Facility Number', 'Locality', 'Facility', 'Facility Type', 'Status', 'Date VDH Notified','FIPS', 'Report Date', 'Active', 'Cases', 'Deaths']
def csv_add_column():
    """Prepend a 1-based 'Facility Number' column to each step-3 CSV.

    Processes the three per-facility-type files plus the combined file; each
    result is written via write_csv_data, so output names gain '-updated'.
    """
    csv_name_vars = ['-assisted-living.csv', '-multicare.csv', '-nursing-home.csv', '.csv']
    for name_csv in csv_name_vars:
        this_csv = csv_file_step_3 + name_csv
        with open(this_csv) as fin:
            csvin = csv.DictReader(fin)
            new_csv_rows = []
            # enumerate replaces the hand-maintained row counter.
            for row_count, row in enumerate(csvin, start=1):
                new_csv_rows.append({
                    "Facility Number": row_count,
                    "Locality": row['Locality'],
                    "Facility": row['Facility'],
                    "Facility Type": row['Facility Type'],
                    "Status": row['Status'],
                    "Date VDH Notified": row['Date VDH Notified'],
                    "FIPS": row['FIPS'],
                    "Report Date": row['Report Date'],
                    "Active": row['Active'],
                    "Cases": row['Cases'],
                    "Deaths": row['Deaths']
                })
        write_csv_data(new_csv_rows, headers_fixed_counted, this_csv)
# Run step 4 over all four step-3 CSVs.
csv_add_column()
@jalbertbowden
Copy link
Author

To run, open this script and edit these two vars: file_date and file_row_report_date, setting them to the date(s) of the dataset that needs cleaning. file_date is the date the data was published; file_row_report_date is the date you want the script to output. In this case, it's the same date for both, because we want the latest published version and the latest available date within it.

Sincere apologies for the difference in date formatting: the non-international date format in file_row_report_date is there because it's matching the format used within the data itself.
That's not a legit excuse, but it works.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment