Created
February 14, 2019 23:45
-
-
Save sierra073/6981fb1a692f5385d2fcd78c62aef580 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import os | |
import pandas as pd | |
import numpy as np | |
from tqdm import tqdm | |
import concurrent.futures as cf | |
from itertools import zip_longest | |
# Value of the GITHUB environment variable (None when it is not set).
GITHUB = os.environ.get("GITHUB")

# Load the raw input tables.
# ParentId in facilities is renamed to FacilityId.
facilities = pd.read_csv('../data/raw/facilities.csv').rename(
    columns={'ParentId': 'FacilityId'}
)
accounts = pd.read_csv('../data/raw/accounts.csv')
# Id in users is renamed to CreatedById so it can be merged on that key.
users = pd.read_csv('../data/raw/users.csv').rename(
    columns={'Id': 'CreatedById'}
)
# Merge the user data and add a Boolean column indicating whether the field edit was a mass update.
# (Mass updates are generated by Anthony Zirilli, Steven Han, or Developer, so their user Ids are used to identify them.)
def merge_user_data(df):
    """Attach user details to *df* and flag edits made by mass-update users.

    Left-joins the module-level ``users`` table onto *df* using the
    ``CreatedById`` column, then adds a boolean ``MassUpdated`` column that
    is True for rows whose ``CreatedById`` is one of three hard-coded user
    Ids. Returns the merged frame; *df* itself is not modified.
    """
    mass_update_user_ids = [
        '005E0000007XSBHIA4',
        '005E0000007OrqlIAC',
        '005E0000007QaBkIAK',
    ]
    merged = df.merge(users, how='left', on='CreatedById')
    # isin() already produces the True/False mask we want to store.
    merged['MassUpdated'] = merged['CreatedById'].isin(mass_update_user_ids)
    return merged
# The steps below feed the "true change" check: a (record, Field) pair is a
# true change when OldValue at its first edit differs from NewValue at its
# last edit. Each table is processed the same way:
#   1. merge in user data (adds MassUpdated),
#   2. convert CreatedDate to a timestamp,
#   3. number the edits chronologically within each (Id, Field) group,
#   4. keep the (Id, Field) pairs that were edited more than once.
# Note: in the *_uniqs frames the CreatedDate column holds the edit COUNT.

# --- facilities ---
facilities = merge_user_data(facilities)
facilities = facilities.assign(
    CreatedDate=pd.to_datetime(facilities['CreatedDate'])
)
facilities = facilities.assign(
    EditOrder=facilities.groupby(['FacilityId', 'Field'])['CreatedDate'].rank(method='first')
)
facilities_uniqs = (
    facilities.groupby(['FacilityId', 'Field'])['CreatedDate']
    .count()
    .reset_index()
    .loc[lambda counts: counts['CreatedDate'] > 1]
)

# --- accounts ---
accounts = merge_user_data(accounts)
accounts = accounts.assign(
    CreatedDate=pd.to_datetime(accounts['CreatedDate'])
)
accounts = accounts.assign(
    EditOrder=accounts.groupby(['AccountId', 'Field'])['CreatedDate'].rank(method='first')
)
accounts_uniqs = (
    accounts.groupby(['AccountId', 'Field'])['CreatedDate']
    .count()
    .reset_index()
    .loc[lambda counts: counts['CreatedDate'] > 1]
)
def get_truechange(Id, Field, table):
    """Return whether *Field* of record *Id* truly changed from its original value.

    Compares ``OldValue`` of the first edit (``EditOrder == 1``) with
    ``NewValue`` of the last edit (highest ``EditOrder``) for the given
    record and field, reading from the module-level ``facilities`` or
    ``accounts`` frame.

    Args:
        Id: facility or account id to look up.
        Field: name of the edited field.
        table: ``'facilities'`` selects the facilities frame (keyed by
            ``FacilityId``); any other value selects accounts (keyed by
            ``AccountId``).

    Returns:
        bool: True when the original and latest values differ. Two missing
        values (NaN/None) are treated as equal, so a field that started
        empty and ended empty is NOT reported as a true change.

    Raises:
        IndexError: if no edit rows match *Id* and *Field*.
    """
    if table == 'facilities':
        id_col, df = 'FacilityId', facilities
    else:
        id_col, df = 'AccountId', accounts

    df_sub = df[(df[id_col] == Id) & (df['Field'] == Field)]
    edit_order = df_sub['EditOrder']
    orig_val = df_sub.loc[edit_order == 1, 'OldValue'].values[0]
    latest_val = df_sub.loc[edit_order == edit_order.max(), 'NewValue'].values[0]

    # NaN != NaN evaluates to True, which would flag "empty -> ... -> empty"
    # as a change; handle missing values explicitly instead.
    orig_missing = pd.isna(orig_val)
    latest_missing = pd.isna(latest_val)
    if orig_missing or latest_missing:
        return bool(orig_missing != latest_missing)
    return bool(orig_val != latest_val)
# Vectorized wrapper so get_truechange can be called with arrays of Ids and
# Fields (np.vectorize calls the Python function once per element).
get_true_change = np.vectorize(pyfunc=get_truechange)
# Earlier, non-parallel call kept for reference:
#facilities['TrueChange'] = get_true_change(facilities_uniqs['FacilityId'].values, facilities_uniqs['Field'].values, 'facilities')
def tqdm_parallel_map(executor, fn, *iterables, **kwargs):
    """Run *fn* over the given arguments on *executor* with a tqdm progress bar.

    Intended as an ``executor.map``-style helper. Does not support timeout
    or chunksize because ``executor.submit`` is used internally.

    Args:
        executor: a ``concurrent.futures`` executor used to submit the calls.
        fn: callable invoked as ``fn(Id, Field, table)`` for each task.
        *iterables: only the first positional item is used. It must be a
            mapping with the keys ``'Id'``, ``'Field'`` and ``'table'``, each
            holding a sequence of argument values; the i-th task takes the
            i-th element of each sequence (shorter sequences are padded with
            None by ``zip_longest``).
        **kwargs: passed through to ``tqdm``.

    Returns:
        list: the return value of each call, in submission order (the same
        order as the input sequences), so it can be assigned directly to a
        DataFrame column.

    Raises:
        Any exception raised by *fn* in a worker is re-raised here when its
        result is collected.
    """
    args = iterables[0]
    keys = list(args.keys())

    futures_list = []
    for values in zip_longest(*args.values()):
        arg = dict(zip(keys, values))
        futures_list.append(
            executor.submit(fn, arg['Id'], arg['Field'], arg['table'])
        )

    # Drain as_completed purely to advance the progress bar; results are
    # collected afterwards so that submission order is preserved.
    for _ in tqdm(cf.as_completed(futures_list), total=len(futures_list), **kwargs):
        pass

    # Return the computed values, not the Future objects themselves.
    return [future.result() for future in futures_list]
# Compute TrueChange for every multi-edit (Id, Field) pair in parallel, merge
# the result back onto the full edit tables and write them to data/interim.
#
# The __main__ guard is required for process pools: under the "spawn" start
# method each worker re-imports this module, and without the guard it would
# try to start its own pool. The `with` blocks make sure the worker
# processes are shut down once each table is done.
#
# NOTE(review): *_uniqs also carries a CreatedDate column (the edit count),
# so the merged output contains CreatedDate_x / CreatedDate_y. Left as-is to
# keep the output schema unchanged.
if __name__ == "__main__":
    print("Running facilities...")
    with cf.ProcessPoolExecutor() as executor:
        facilities_uniqs['TrueChange'] = tqdm_parallel_map(
            executor,
            get_truechange,
            {
                'Id': facilities_uniqs['FacilityId'].values,
                'Field': facilities_uniqs['Field'].values,
                'table': ['facilities'] * len(facilities_uniqs['Field'].values),
            },
        )
    facilities_final = facilities.merge(facilities_uniqs, on=['FacilityId', 'Field'], how='left')
    print(facilities_final.head())
    facilities_final.to_csv('../data/interim/facilities.csv', index=False)

    print("Running accounts...")
    with cf.ProcessPoolExecutor() as executor:
        accounts_uniqs['TrueChange'] = tqdm_parallel_map(
            executor,
            get_truechange,
            {
                'Id': accounts_uniqs['AccountId'].values,
                'Field': accounts_uniqs['Field'].values,
                'table': ['accounts'] * len(accounts_uniqs['Field'].values),
            },
        )
    accounts_final = accounts.merge(accounts_uniqs, on=['AccountId', 'Field'], how='left')
    accounts_final.to_csv('../data/interim/accounts.csv', index=False)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment