Skip to content

Instantly share code, notes, and snippets.

@aok1425
Created December 25, 2014 23:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save aok1425/126cc9c6f96c56960633 to your computer and use it in GitHub Desktop.
Save aok1425/126cc9c6f96c56960633 to your computer and use it in GitHub Desktop.
Excel from Rima to Access for BPHC
# filenames MUST start w COD or DOR, for MCMOnlyUnit()
import pandas as pd
import pypyodbc
import os
from datetime import datetime
from math import isnan
from shutil import copy as copy_file
from numpy import float64
log_file = []
filename1 = "../e2Boston_RsrPlus-empty.mdb"
time = datetime.now().strftime("%Y%m%d_%H%M")
filename2 = "../utilization_for_E2Boston" + time + ".mdb"
def append_text(text):
"""Only 'write' to log_file if it's not the same as the last element there."""
if log_file == []:
print text
log_file.append(text)
else:
if text != log_file[-1]:
print text
log_file.append(text)
def write_log_file():
with open('../log' + time + '.txt', 'a') as the_file:
for line in log_file:
the_file.write(line + '\n')
try:
copy_file(filename1, filename2)
except IOError:
append_text("Program can't find e2Boston_RsrPlus-empty.mdb. This file should be in the folder 'above' the program folder.")
write_log_file()
db = pypyodbc.win_connect_mdb(filename2)
c = db.cursor()
def transform_MRN(x):
"""For .map(). Adds 0s when inputted an MRN as a float converted to a str.
Assumes that all MRNs will start w 0s.
A hack bc .read_excel() in Pandas 0.15 doesn't have dtype argument like .read_csv()."""
if len(x) == 6:
return '0000' + x[:4]
elif len(x) == 7:
return '000' + x[:5]
elif len(x) == 8:
return '00' + x[:6]
elif len(x) == 9:
return '0' + x[:7]
else:
raise Exception('Something wrong w MRN')
def make_initials(row_num, df):
pre_join = [df.FirstInitial[row_num], df.FirstInitial3[row_num], df.LastInitial[row_num], df.LastInitial3[row_num]]
for i in range(len(pre_join)):
if type(pre_join[i]) == float:
if isnan(pre_join[i]):
pre_join[i] = ' '
return ''.join(pre_join)
def convert_nan_to_none(a_tuple):
"""For ClientReportService"""
temp_list = list(a_tuple)
for i in range(len(temp_list)):
if type(temp_list[i]) == float64 or type(temp_list[i]) == float:
if isnan(temp_list[i]):
temp_list[i] = None
return tuple(temp_list)
def check_blank_SSN(df):
"""Input a DataFrame, and find the MRNs for pts w blank SSNs."""
indexes_of_blank_SSNs = df.Last4SSN[df.Last4SSN.notnull() == False].index
if len(indexes_of_blank_SSNs) == 0:
text = 'Checking for SSN errors: No errors.'
append_text(text)
else:
for row_num in indexes_of_blank_SSNs:
initials = make_initials(row_num, df)
df.loc[row_num, 'Last4SSN'] = '9999'
text = "SSN for {}/{} is blank. Changed it to '9999' for Access. Please update SSN in CPS.".format(initials, df.MRN[row_num])
append_text(text)
def check_MCMOnlyUnit(df, filename):
if filename == 'COD':
indexes = df.MCMOnlyUnit[df.MCMOnlyUnit != 1].index
column = 'MCMOnlyUnit'
elif filename == 'DOR':
indexes = df.MCMwithTransOnlyUnit[df.MCMwithTransOnlyUnit != 1].index
column = 'MCMwithTransOnlyUnit'
if len(indexes) == 0:
text = 'Checking for MCMOnlyUnit/MCMwithTransOnlyUnit errors: No errors.'.format(column)
append_text(text)
else:
for row_num in indexes:
text = "{} for {} is not 1. Changing it to 1.".format(column, df.MRN[row_num])
df.loc[row_num, column] = 1
append_text(text)
def write_to_ClientReportService( df, filename):
"""Input a DataFrame, and write to the empty Access template in the same folder."""
if filename == 'DOR':
for i in df.index:
entry = df.MRN[i], df.ServiceDate[i], df.ServiceTime[i], df.Employee[i], df.ServiceID[i], df.SubserviceID[i], df.GrantID[i], df.DrugAssistanceDateReimburse[i], df.DrugAssistanceUnit[i], df.FoodUnit[i], df.NutritionalAssessment[i], df.NutritionalCounselingTime[i], df.MedicalNutritionalTherapyUnit[i], df.MedicalNutritionalTherapyAssessment[i], df.MedicalNutritionalTherapyCounselingTime[i], df.RentalAssistanceUnit[i], df.UtilityAssistanceUnit[i], df.HousingAdvocacyUnit[i], df.HousingAdvocacyPlacementUnit[i], df.MCMUnitTime[i], df.MCMOnlyUnit[i], df.MCMwithTransUnitTime[i], df.MCMwithTransOnlyUnit[i]
entry = convert_nan_to_none(entry)
c.execute('''INSERT INTO ClientReportService(ClientReportID, ServiceDate, ServiceTime, Employee, ServiceID, SubserviceID, GrantID, DrugAssistanceDateReimbursed, DrugAssistanceUnit, FoodUnit, NutritionalAssessment, NutritionalCounselingTime, MedicalNutritionalTherapyUnit, MedicalNutritionalTherapyAssessment, MedicalNutritionalTherapyCounselingTime, RentalAssistanceUnit, UtilityAssistanceUnit, HousingAdvocacyUnit, HousingAdvocacyPlacementUnit, MCMUnitTime, MCMOnlyUnit, MCMwithTransUnitTime, MCMwithTransOnlyUnit) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)''', entry)
elif filename == 'COD':
for i in df.index:
entry = df.MRN[i], df.ServiceDate[i], df.ServiceTime[i], df.Employee[i], df.ServiceID[i], df.SubserviceID[i], df.GrantID[i], df.DrugAssistanceDateReimburse[i], df.DrugAssistanceUnit[i], df.FoodUnit[i], df.NutritionalAssessment[i], df.NutritionalCounselingTime[i], df.MedicalNutritionalTherapyUnit[i], df.MedicalNutritionalTherapyAssessment[i], df.MedicalNutritionalTherapyCounselingTime[i], df.RentalAssistanceUnit[i], df.UtilityAssistanceUnit[i], df.HousingAdvocacyUnit[i], df.HousingAdvocacyPlacementUnit[i], df.MCMUnitTime[i], df.MCMOnlyUnit[i]
entry = convert_nan_to_none(entry)
c.execute('''INSERT INTO ClientReportService(ClientReportID, ServiceDate, ServiceTime, Employee, ServiceID, SubserviceID, GrantID, DrugAssistanceDateReimbursed, DrugAssistanceUnit, FoodUnit, NutritionalAssessment, NutritionalCounselingTime, MedicalNutritionalTherapyUnit, MedicalNutritionalTherapyAssessment, MedicalNutritionalTherapyCounselingTime, RentalAssistanceUnit, UtilityAssistanceUnit, HousingAdvocacyUnit, HousingAdvocacyPlacementUnit, MCMUnitTime, MCMOnlyUnit) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)''', entry)
c.commit()
append_text('Writing to ClientReportService table: OK.')
def write_to_ClientReport(df):
"""Input a DataFrame, and write to the empty Access template in the same folder."""
df = df.ix[:,'MRN':'GenderID'].drop_duplicates()
for i in df.index:
entry = df.MRN[i], df.FirstInitial[i], df.FirstInitial3[i], df.LastInitial[i], df.LastInitial3[i], df.BirthDate[i], df.Last4SSN[i], df.MothersFirstName[i], df.BirthGenderID[i], df.GenderID[i]
entry = convert_nan_to_none(entry)
c.execute('''INSERT INTO ClientReport(ID, FirstInitial, FirstInitial3, LastInitial, LastInitial3, BirthDate, Last4SSN, MothersFirstName, BirthGenderID, GenderID) VALUES(?,?,?,?,?,?,?,?,?,?)''', entry)
c.commit()
append_text('Writing to ClientReport table: OK.')
class ValidationCheckerExcelToAccess(object):
"""Takes in XLS, does various validation checks, and writes to new Access file."""
def __init__(self, filename):
self.header_rows = 3 # num of rows on top of file until you get to the column names
self.df = pd.io.excel.read_excel(filename, header=self.header_rows)
self.filename = filename
def preprocess(self):
"""Takes out header and footer, and transforms type in MRN column."""
self.df = self.df.dropna(how='all') # trying to delete the tail blank rows
if 'Page -1 of 1' in self.df.tail(1).values[0]: # deleting last row, which in all my tests contains 'Page -1 of 1'
self.df = self.df.ix[:self.df.shape[0] - 2]
self.df.MRN = self.df.MRN.astype(str).map(transform_MRN)
def check_moms_name(self, df):
"""Input the DataFrame, and find those rows for which the mom's name contains a number. For those rows, write to file."""
names_of_moms = df.MothersFirstName.map(lambda x: ('0' in x) or ('1' in x) or ('2' in x) or ('3' in x) or ('4' in x) or ('5' in x) or ('6' in x) or ('7' in x) or ('8' in x) or ('9' in x))
indexes_of_names_of_moms = names_of_moms[names_of_moms].index # filtering those which are True
if len(indexes_of_names_of_moms) == 0:
text = "Checking for errors in moms' names: No errors."
append_text(text)
else:
for row_num in indexes_of_names_of_moms:
initials = make_initials(row_num, df)
text = "Mom's name for {}/{} contains number: {}. Please update CPS and Excel file for {} row #{}.".format(initials, df.MRN[row_num], df.MothersFirstName[row_num], self.filename[3:6], row_num + self.header_rows + 2)
append_text(text)
write_log_file()
raise Exception(text)
def run(self):
self.preprocess()
# self.df = self.df.head(2) # for testing
check_MCMOnlyUnit(self.df, self.filename[3:6])
check_blank_SSN(self.df)
self.check_moms_name(self.df)
write_to_ClientReport(self.df)
write_to_ClientReportService(self.df, self.filename[3:6])
if __name__ == '__main__':
for file_ in os.listdir('..'):
if 'COD Service Utilization Report' in file_:
append_text('For Codman: \n')
ValidationCheckerExcelToAccess('../' + file_).run()
elif 'DOR Service Utilization Report' in file_:
append_text('\nFor Dorchester House: \n')
ValidationCheckerExcelToAccess('../' + file_).run()
db.close()
write_log_file()
raw_input("\nDone! Press any key to exit.")
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment