@djs-basil-sys
Last active September 12, 2022 19:50
import pandas as pd
from datetime import datetime
from itertools import combinations
from math import comb
from sys import argv
from time import monotonic
from typing import List
MAX_FAMILY_TOKENS = 6
GROUP_SIZES = 2
GROUPS = comb(MAX_FAMILY_TOKENS, GROUP_SIZES)
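# With MAX_FAMILY_TOKENS = 6 and GROUP_SIZES = 2, GROUPS = comb(6, 2) = 15, i.e. up to
# 15 two-token combinations per family name.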

class Constants:
    TIMESTAMP: str = datetime.utcnow().strftime("%Y%m%dT%H%M%S")
    MINIMUM_TOKEN_LENGTH: int = 2
    DEFAULT_SEPR: str = "|"
    THRESHOLD: float = 0.75

    @staticmethod
    def return_ts() -> str:
        return datetime.utcnow().strftime("%Y-%m-%dT%H.%M.%S")

def get_permutations(string: str) -> List[str]:
    """
    Get the sorted token combinations of size `GROUP_SIZES` for a given string,
    padded with `None` up to `GROUPS` entries.
    """
    tokens = str(string).split(" ")
    combos = combinations(tokens, GROUP_SIZES)
    combos = [" ".join(sorted(combo)) for combo in combos]
    if not combos:
        combos = [string]
    combos = combos + [None] * (GROUPS - len(combos))
    return [string] + combos
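
# Illustrative example (not from the original gist): with GROUP_SIZES = 2,
# get_permutations("CARDINAL HEALTH SUTURE") returns
# ["CARDINAL HEALTH SUTURE", "CARDINAL HEALTH", "CARDINAL SUTURE", "HEALTH SUTURE",
#  None, None, ...], padded out to 1 + GROUPS entries.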

def make_permutations_matrix(family_names: List[str]) -> pd.DataFrame:
    """
    Apply `get_permutations` to a `list` of family names in order to build a
    DataFrame with one column per combination slot.
    """
    data = [get_permutations(family_name) for family_name in family_names]
    cols = [f"permutation_{i}" for i in range(1, GROUPS + 1)]
    df_cols = ["__family_name__"] + cols
    df = pd.DataFrame(data, columns=df_cols)
    # Fill empty combination slots from the previous column so every permutation
    # column holds a token (the wrap-around at i == 0 is a no-op, since
    # permutation_1 is never missing).
    for i in range(len(cols)):
        df[cols[i]] = df[cols[i]].fillna(df[cols[i - 1]])
    return df
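
# Illustrative shape (not from the original gist): for ["CARDINAL HEALTH SUTURE"],
# make_permutations_matrix returns one row with columns __family_name__,
# permutation_1, ..., permutation_15, where slots beyond the real combinations
# repeat the last one ("HEALTH SUTURE").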

def get_counts(permutation_df: pd.DataFrame, cols: List[str]) -> pd.DataFrame:
    """
    Create a column of `counts` for each token of size `GROUP_SIZES`.
    """
    # Stack every permutation column into one Series of tokens, then count how
    # often each token appears across the whole data set.
    tokens = permutation_df[cols].stack().reset_index()
    tokens = tokens.iloc[:, -1]
    counts = tokens.value_counts().reset_index()
    counts.columns = ["tokens", "counts"]
    return counts
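
# Illustrative result (not from the original gist): `counts` has one row per distinct
# token combination, with columns ["tokens", "counts"] where "counts" is how many
# permutation cells the token fills across all names (repeated fill-in slots included).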

def apply_counts(df: pd.DataFrame, col_name: str) -> pd.DataFrame:
    """
    Count the most popular tokens across all columns, then apply the counts to
    each column as well (so that we can later "pick" the most popular column).
    """
    families = df[col_name].unique().tolist()
    permutation_df = make_permutations_matrix(families)
    p_cols = [col for col in permutation_df.columns if col.startswith("permutation")]
    counts = get_counts(permutation_df, p_cols)
    # Merge the global counts onto every permutation column; each merge swaps the
    # original permutation column for a "tokens"/"counts" pair from `counts`.
    for col in p_cols:
        permutation_df = permutation_df.merge(
            counts, left_on=[col], right_on=["tokens"], suffixes=["", "_count"]
        )
        permutation_df = permutation_df.drop(columns=[col])
    # Interleave token and count column names: permutation_i, permutation_i_count, ...
    c_cols = [f"{col}_count" for col in p_cols]
    permutation_col = [None] * (len(p_cols) + len(c_cols))
    permutation_col[::2] = p_cols
    permutation_col[1::2] = c_cols
    permutation_col = [col_name] + permutation_col
    permutation_df.columns = permutation_col
    return permutation_df
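
# Illustrative layout (not from the original gist): apply_counts returns a frame with
# columns [<col_name>, permutation_1, permutation_1_count, ..., permutation_15,
# permutation_15_count], i.e. each token combination paired with its global count.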

def get_most_frequent_value(df: pd.DataFrame) -> pd.DataFrame:
    """
    Pick the column that has the highest "count"; in other words, pick the
    most popular "token" for a given `name`.
    """
    c_cols = [
        col
        for col in df.columns
        if col.startswith("permutation") and col.endswith("count")
    ]
    df["max_col"] = df[c_cols].idxmax(axis=1)
    df["max_col"] = df["max_col"].str.replace("_count", "")
    idx_max_series = df["max_col"]
    df["max_val"] = df.lookup(idx_max_series.index, idx_max_series.values)
    return df

def main(filepath: str, data_type: str, col_name: str):
    """
    Expected pipe-separated input, for example:

    id|name|brand_name|device_name
    1|CARDINAL HEALTH SUTURE|CARDINAL|SUTURE
    2|DREAMSTATION|DREAMSTATION|
    3|ADHESIVE BANDAGES||ADHESIVE BANDAGES
    """
    # `data_type` is accepted on the command line but not used below.
    start = monotonic()
    df = pd.read_csv(filepath, sep="|", dtype=str)
    end = monotonic() - start
    print(f"Loading data took: {end}")

    start = monotonic()
    counts = apply_counts(df, col_name)
    counts = get_most_frequent_value(counts)
    print(f"Counts: {counts.shape}")
    end = monotonic() - start
    print(f"Getting most frequent values took: {end}")

    # For each most-popular token, pick the shortest, simplest name that carries it
    # as the canonical ("new") spelling.
    temp = counts[[col_name, "max_val"]]
    temp = temp.merge(
        df, left_on=col_name, right_on=col_name, suffixes=["", "_"], how="left"
    )
    temp = temp[[col_name, "max_val"]]
    temp.columns = ["new", "max_val"]
    temp = temp.drop_duplicates()
    temp["spaces"] = temp["new"].str.count(" ")
    temp["length"] = temp["new"].str.len()
    temp = temp.sort_values(by=["spaces", "length", "new"])
    common = temp.groupby("max_val").first().reset_index()

    start = monotonic()
    print(f"Common: {common.shape}")
    counts = counts.merge(
        common,
        left_on=["max_val"],
        right_on=["max_val"],
        suffixes=["", "_"],
        how="left",
    )
    end = monotonic() - start
    print(f"Making patterns took: {end}")

    start = monotonic()
    end = monotonic() - start
    print(f"Matching data took: {end}")

    # Attach the canonical name back onto the original rows and write the result.
    df = df.merge(
        counts[[col_name, "max_val", "new"]],
        left_on=[col_name],
        right_on=[col_name],
        how="left",
    )
    df["new"] = df["new"].fillna(df[col_name]).str.upper()
    df.to_csv(f"{filepath}.{Constants.return_ts()}.tokens", sep="|", index=False)


if __name__ == "__main__":
    main(argv[1], argv[2], argv[3])
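
# Illustrative invocation (argument names taken from the `main` signature):
#     python <this_script>.py <filepath> <data_type> <col_name>
# e.g. pointing <filepath> at a pipe-separated file like the one in the `main`
# docstring and <col_name> at "name"; output is written to
# <filepath>.<timestamp>.tokens alongside the input.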