import os
import pandas as pd
import numpy as np
from tqdm import tqdm
import concurrent.futures as cf
from itertools import zip_longest
GITHUB = os.environ.get("GITHUB")
facilities = pd.read_csv('../data/raw/facilities.csv')
accounts = pd.read_csv('../data/raw/accounts.csv')
users = pd.read_csv('../data/raw/users.csv')
# rename ParentId in facilities to FacilityId
facilities = facilities.rename(columns={'ParentId': 'FacilityId'})
# rename Id in users to CreatedById (to make merging more clean)
users = users.rename(columns={'Id': 'CreatedById'})
# merge the user data and create a Boolean column to represent if the field edit was a mass update or not
# (mass updates are generated by Anthony Zirilli, Steven Han, or Developer, so I identified their user Ids)
def merge_user_data(df):
    df = df.merge(users, on='CreatedById', how='left')
    df['MassUpdated'] = np.where(df['CreatedById'].isin(['005E0000007XSBHIA4', '005E0000007OrqlIAC', '005E0000007QaBkIAK']), True, False)
    return df
facilities = merge_user_data(facilities)
accounts = merge_user_data(accounts)
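# Hypothetical illustration of what merge_user_data adds (the second user Id and
# the Name column from users.csv are assumptions for clarity, not real data):
#   CreatedById          Name              MassUpdated
#   005E0000007XSBHIA4   Anthony Zirilli   True
#   005E0000001AAAAXXX   (any other user)  False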
# Convert date to a timestamp
facilities['CreatedDate'] = pd.to_datetime(facilities['CreatedDate'])
accounts['CreatedDate'] = pd.to_datetime(accounts['CreatedDate'])
# create a Boolean column to represent if the field edit was a "true change":
# if OldValue at first timestamp = NewValue at last timestamp then False, else True
# partition by FacilityId, Field order by CreatedDate
facilities['EditOrder'] = facilities.groupby(['FacilityId', 'Field'])['CreatedDate'].rank(method='first')
accounts['EditOrder'] = accounts.groupby(['AccountId', 'Field'])['CreatedDate'].rank(method='first')
facilities_uniqs = facilities.groupby(['FacilityId', 'Field'])['CreatedDate'].count().reset_index()
facilities_uniqs = facilities_uniqs[facilities_uniqs['CreatedDate'] > 1]
accounts_uniqs = accounts.groupby(['AccountId', 'Field'])['CreatedDate'].count().reset_index()
accounts_uniqs = accounts_uniqs[accounts_uniqs['CreatedDate'] > 1]
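# Worked illustration of the EditOrder / TrueChange logic on a hypothetical field
# history (values below are made up for clarity, not taken from the data):
#   FacilityId  Field  CreatedDate  OldValue  NewValue  EditOrder
#   F1          Phone  2019-01-01   555-0100  555-0199  1.0
#   F1          Phone  2019-01-05   555-0199  555-0100  2.0
# OldValue at EditOrder 1 (555-0100) equals NewValue at the last EditOrder
# (555-0100), so the field ended up back at its original value and TrueChange
# should be False; any other combination counts as a true change.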
def get_truechange(Id, Field, table):
    # returns a single boolean representing whether the Field for the given Id
    # (facility or account id) has truly changed from its original value
    # inputs are strings for the Id, Field, and table ('facilities' or 'accounts')
    if table == 'facilities':
        idCol = 'FacilityId'
        df = facilities
    else:
        idCol = 'AccountId'
        df = accounts
    df_sub = df[(df[idCol] == Id) & (df['Field'] == Field)]
    orig_val = df_sub[df_sub['EditOrder'] == 1]['OldValue'].values[0]
    latest_val = df_sub[df_sub['EditOrder'] == df_sub['EditOrder'].max()]['NewValue'].values[0]
    return (orig_val != latest_val)
get_true_change = np.vectorize(get_truechange)
#facilities['TrueChange'] = get_true_change(facilities_uniqs['FacilityId'].values, facilities_uniqs['Field'].values, 'facilities')
def tqdm_parallel_map(executor, fn, *iterables, **kwargs):
    """
    Equivalent to executor.map(fn, *iterables), but displays a tqdm-based progress bar.
    Does not support timeout or chunksize because executor.submit is used internally.
    **kwargs is passed to tqdm.
    """
    futures_list = []
    args = iterables[0]
    # expand the single dict of column arrays into one kwargs dict per row
    final_args = [dict(zip(args.keys(), values)) for values in zip_longest(*args.values())]
    for arg in final_args:
        futures_list += [executor.submit(fn, arg['Id'], arg['Field'], arg['table'])]
    for f in tqdm(cf.as_completed(futures_list), total=len(futures_list), **kwargs):
        pass
    # collect results in submission order so they align with the input rows
    return [f.result() for f in futures_list]
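# Sketch of how tqdm_parallel_map expands its dict-of-arrays argument into per-row
# kwargs (the _demo dict below is an illustrative assumption, not real data):
#   _demo = {'Id': ['F1', 'F2'], 'Field': ['Phone', 'Fax'], 'table': ['facilities', 'facilities']}
#   [dict(zip(_demo.keys(), values)) for values in zip_longest(*_demo.values())]
#   -> [{'Id': 'F1', 'Field': 'Phone', 'table': 'facilities'},
#       {'Id': 'F2', 'Field': 'Fax', 'table': 'facilities'}]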
print("Running facilities...")
facilities_uniqs['TrueChange'] = tqdm_parallel_map(cf.ProcessPoolExecutor(), get_truechange, {'Id': facilities_uniqs['FacilityId'].values, 'Field': facilities_uniqs['Field'].values, 'table': ['facilities'] * len(facilities_uniqs['Field'].values)})
facilities_final = facilities.merge(facilities_uniqs, on=['FacilityId','Field'], how='left')
print(facilities_final.head())
facilities_final.to_csv('../data/interim/facilities.csv',index=False)
print("Running accounts...")
accounts_uniqs['TrueChange'] = tqdm_parallel_map(cf.ProcessPoolExecutor(), get_truechange, {'Id': accounts_uniqs['AccountId'].values, 'Field': accounts_uniqs['Field'].values, 'table': ['accounts'] * len(accounts_uniqs['Field'].values)})
accounts_final = accounts.merge(accounts_uniqs, on=['AccountId','Field'], how='left')
accounts_final.to_csv('../data/interim/accounts.csv',index=False)