Created
December 25, 2014 23:48
-
-
Save aok1425/126cc9c6f96c56960633 to your computer and use it in GitHub Desktop.
Excel from Rima to Access for BPHC
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# filenames MUST start w COD or DOR, for MCMOnlyUnit() | |
import pandas as pd | |
import pypyodbc | |
import os | |
from datetime import datetime | |
from math import isnan | |
from shutil import copy as copy_file | |
from numpy import float64 | |
log_file = [] | |
filename1 = "../e2Boston_RsrPlus-empty.mdb" | |
time = datetime.now().strftime("%Y%m%d_%H%M") | |
filename2 = "../utilization_for_E2Boston" + time + ".mdb" | |
def append_text(text): | |
"""Only 'write' to log_file if it's not the same as the last element there.""" | |
if log_file == []: | |
print text | |
log_file.append(text) | |
else: | |
if text != log_file[-1]: | |
print text | |
log_file.append(text) | |
def write_log_file(): | |
with open('../log' + time + '.txt', 'a') as the_file: | |
for line in log_file: | |
the_file.write(line + '\n') | |
try: | |
copy_file(filename1, filename2) | |
except IOError: | |
append_text("Program can't find e2Boston_RsrPlus-empty.mdb. This file should be in the folder 'above' the program folder.") | |
write_log_file() | |
db = pypyodbc.win_connect_mdb(filename2) | |
c = db.cursor() | |
def transform_MRN(x): | |
"""For .map(). Adds 0s when inputted an MRN as a float converted to a str. | |
Assumes that all MRNs will start w 0s. | |
A hack bc .read_excel() in Pandas 0.15 doesn't have dtype argument like .read_csv().""" | |
if len(x) == 6: | |
return '0000' + x[:4] | |
elif len(x) == 7: | |
return '000' + x[:5] | |
elif len(x) == 8: | |
return '00' + x[:6] | |
elif len(x) == 9: | |
return '0' + x[:7] | |
else: | |
raise Exception('Something wrong w MRN') | |
def make_initials(row_num, df): | |
pre_join = [df.FirstInitial[row_num], df.FirstInitial3[row_num], df.LastInitial[row_num], df.LastInitial3[row_num]] | |
for i in range(len(pre_join)): | |
if type(pre_join[i]) == float: | |
if isnan(pre_join[i]): | |
pre_join[i] = ' ' | |
return ''.join(pre_join) | |
def convert_nan_to_none(a_tuple): | |
"""For ClientReportService""" | |
temp_list = list(a_tuple) | |
for i in range(len(temp_list)): | |
if type(temp_list[i]) == float64 or type(temp_list[i]) == float: | |
if isnan(temp_list[i]): | |
temp_list[i] = None | |
return tuple(temp_list) | |
def check_blank_SSN(df): | |
"""Input a DataFrame, and find the MRNs for pts w blank SSNs.""" | |
indexes_of_blank_SSNs = df.Last4SSN[df.Last4SSN.notnull() == False].index | |
if len(indexes_of_blank_SSNs) == 0: | |
text = 'Checking for SSN errors: No errors.' | |
append_text(text) | |
else: | |
for row_num in indexes_of_blank_SSNs: | |
initials = make_initials(row_num, df) | |
df.loc[row_num, 'Last4SSN'] = '9999' | |
text = "SSN for {}/{} is blank. Changed it to '9999' for Access. Please update SSN in CPS.".format(initials, df.MRN[row_num]) | |
append_text(text) | |
def check_MCMOnlyUnit(df, filename): | |
if filename == 'COD': | |
indexes = df.MCMOnlyUnit[df.MCMOnlyUnit != 1].index | |
column = 'MCMOnlyUnit' | |
elif filename == 'DOR': | |
indexes = df.MCMwithTransOnlyUnit[df.MCMwithTransOnlyUnit != 1].index | |
column = 'MCMwithTransOnlyUnit' | |
if len(indexes) == 0: | |
text = 'Checking for MCMOnlyUnit/MCMwithTransOnlyUnit errors: No errors.'.format(column) | |
append_text(text) | |
else: | |
for row_num in indexes: | |
text = "{} for {} is not 1. Changing it to 1.".format(column, df.MRN[row_num]) | |
df.loc[row_num, column] = 1 | |
append_text(text) | |
def write_to_ClientReportService( df, filename): | |
"""Input a DataFrame, and write to the empty Access template in the same folder.""" | |
if filename == 'DOR': | |
for i in df.index: | |
entry = df.MRN[i], df.ServiceDate[i], df.ServiceTime[i], df.Employee[i], df.ServiceID[i], df.SubserviceID[i], df.GrantID[i], df.DrugAssistanceDateReimburse[i], df.DrugAssistanceUnit[i], df.FoodUnit[i], df.NutritionalAssessment[i], df.NutritionalCounselingTime[i], df.MedicalNutritionalTherapyUnit[i], df.MedicalNutritionalTherapyAssessment[i], df.MedicalNutritionalTherapyCounselingTime[i], df.RentalAssistanceUnit[i], df.UtilityAssistanceUnit[i], df.HousingAdvocacyUnit[i], df.HousingAdvocacyPlacementUnit[i], df.MCMUnitTime[i], df.MCMOnlyUnit[i], df.MCMwithTransUnitTime[i], df.MCMwithTransOnlyUnit[i] | |
entry = convert_nan_to_none(entry) | |
c.execute('''INSERT INTO ClientReportService(ClientReportID, ServiceDate, ServiceTime, Employee, ServiceID, SubserviceID, GrantID, DrugAssistanceDateReimbursed, DrugAssistanceUnit, FoodUnit, NutritionalAssessment, NutritionalCounselingTime, MedicalNutritionalTherapyUnit, MedicalNutritionalTherapyAssessment, MedicalNutritionalTherapyCounselingTime, RentalAssistanceUnit, UtilityAssistanceUnit, HousingAdvocacyUnit, HousingAdvocacyPlacementUnit, MCMUnitTime, MCMOnlyUnit, MCMwithTransUnitTime, MCMwithTransOnlyUnit) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)''', entry) | |
elif filename == 'COD': | |
for i in df.index: | |
entry = df.MRN[i], df.ServiceDate[i], df.ServiceTime[i], df.Employee[i], df.ServiceID[i], df.SubserviceID[i], df.GrantID[i], df.DrugAssistanceDateReimburse[i], df.DrugAssistanceUnit[i], df.FoodUnit[i], df.NutritionalAssessment[i], df.NutritionalCounselingTime[i], df.MedicalNutritionalTherapyUnit[i], df.MedicalNutritionalTherapyAssessment[i], df.MedicalNutritionalTherapyCounselingTime[i], df.RentalAssistanceUnit[i], df.UtilityAssistanceUnit[i], df.HousingAdvocacyUnit[i], df.HousingAdvocacyPlacementUnit[i], df.MCMUnitTime[i], df.MCMOnlyUnit[i] | |
entry = convert_nan_to_none(entry) | |
c.execute('''INSERT INTO ClientReportService(ClientReportID, ServiceDate, ServiceTime, Employee, ServiceID, SubserviceID, GrantID, DrugAssistanceDateReimbursed, DrugAssistanceUnit, FoodUnit, NutritionalAssessment, NutritionalCounselingTime, MedicalNutritionalTherapyUnit, MedicalNutritionalTherapyAssessment, MedicalNutritionalTherapyCounselingTime, RentalAssistanceUnit, UtilityAssistanceUnit, HousingAdvocacyUnit, HousingAdvocacyPlacementUnit, MCMUnitTime, MCMOnlyUnit) VALUES(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)''', entry) | |
c.commit() | |
append_text('Writing to ClientReportService table: OK.') | |
def write_to_ClientReport(df): | |
"""Input a DataFrame, and write to the empty Access template in the same folder.""" | |
df = df.ix[:,'MRN':'GenderID'].drop_duplicates() | |
for i in df.index: | |
entry = df.MRN[i], df.FirstInitial[i], df.FirstInitial3[i], df.LastInitial[i], df.LastInitial3[i], df.BirthDate[i], df.Last4SSN[i], df.MothersFirstName[i], df.BirthGenderID[i], df.GenderID[i] | |
entry = convert_nan_to_none(entry) | |
c.execute('''INSERT INTO ClientReport(ID, FirstInitial, FirstInitial3, LastInitial, LastInitial3, BirthDate, Last4SSN, MothersFirstName, BirthGenderID, GenderID) VALUES(?,?,?,?,?,?,?,?,?,?)''', entry) | |
c.commit() | |
append_text('Writing to ClientReport table: OK.') | |
class ValidationCheckerExcelToAccess(object): | |
"""Takes in XLS, does various validation checks, and writes to new Access file.""" | |
def __init__(self, filename): | |
self.header_rows = 3 # num of rows on top of file until you get to the column names | |
self.df = pd.io.excel.read_excel(filename, header=self.header_rows) | |
self.filename = filename | |
def preprocess(self): | |
"""Takes out header and footer, and transforms type in MRN column.""" | |
self.df = self.df.dropna(how='all') # trying to delete the tail blank rows | |
if 'Page -1 of 1' in self.df.tail(1).values[0]: # deleting last row, which in all my tests contains 'Page -1 of 1' | |
self.df = self.df.ix[:self.df.shape[0] - 2] | |
self.df.MRN = self.df.MRN.astype(str).map(transform_MRN) | |
def check_moms_name(self, df): | |
"""Input the DataFrame, and find those rows for which the mom's name contains a number. For those rows, write to file.""" | |
names_of_moms = df.MothersFirstName.map(lambda x: ('0' in x) or ('1' in x) or ('2' in x) or ('3' in x) or ('4' in x) or ('5' in x) or ('6' in x) or ('7' in x) or ('8' in x) or ('9' in x)) | |
indexes_of_names_of_moms = names_of_moms[names_of_moms].index # filtering those which are True | |
if len(indexes_of_names_of_moms) == 0: | |
text = "Checking for errors in moms' names: No errors." | |
append_text(text) | |
else: | |
for row_num in indexes_of_names_of_moms: | |
initials = make_initials(row_num, df) | |
text = "Mom's name for {}/{} contains number: {}. Please update CPS and Excel file for {} row #{}.".format(initials, df.MRN[row_num], df.MothersFirstName[row_num], self.filename[3:6], row_num + self.header_rows + 2) | |
append_text(text) | |
write_log_file() | |
raise Exception(text) | |
def run(self): | |
self.preprocess() | |
# self.df = self.df.head(2) # for testing | |
check_MCMOnlyUnit(self.df, self.filename[3:6]) | |
check_blank_SSN(self.df) | |
self.check_moms_name(self.df) | |
write_to_ClientReport(self.df) | |
write_to_ClientReportService(self.df, self.filename[3:6]) | |
if __name__ == '__main__': | |
for file_ in os.listdir('..'): | |
if 'COD Service Utilization Report' in file_: | |
append_text('For Codman: \n') | |
ValidationCheckerExcelToAccess('../' + file_).run() | |
elif 'DOR Service Utilization Report' in file_: | |
append_text('\nFor Dorchester House: \n') | |
ValidationCheckerExcelToAccess('../' + file_).run() | |
db.close() | |
write_log_file() | |
raw_input("\nDone! Press any key to exit.") |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment