Skip to content

Instantly share code, notes, and snippets.

@austospumanto
Created July 23, 2019 02:36
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save austospumanto/93eb920cf377e9770bce9c51094eaa2e to your computer and use it in GitHub Desktop.
Save austospumanto/93eb920cf377e9770bce9c51094eaa2e to your computer and use it in GitHub Desktop.
Use Numba to Quickly Calculate Pearson and Cramer's Correlation Coefficients
"""
pip install \
numpy \
pandas \
numba
"""
import time
from dataclasses import dataclass
from functools import wraps
from typing import List
import numpy as np
import pandas as pd
# noinspection PyUnresolvedReferences
from numba import njit, prange
def timeit():
def outer(func):
@wraps(func)
def inner(*args, **kwargs):
start_time = time.time()
res = func(*args, **kwargs)
interval = time.time() - start_time
print("Time for '%s': %0.3f seconds" % (func.__qualname__, interval))
return res
return inner
return outer
@njit(fastmath=True, parallel=True)
def mean1(a):
n = len(a)
b = np.empty(n)
for i in prange(n):
b[i] = a[i].mean()
return b
@njit(fastmath=True, parallel=True)
def std1(a):
n = len(a)
b = np.empty(n)
for i in prange(n):
b[i] = a[i].std()
return b
@njit(fastmath=False, parallel=True)
def numba_corrcoef(a):
""" Correlation """
n, k = a.shape
mu_a = mean1(a)
sig_a = std1(a)
for i in prange(n):
a[i] = (a[i] - mu_a[i]) / sig_a[i]
ak = a / k
out = np.empty((n, n))
for i in prange(n):
out[i, i] = 1.0
for j in prange(i + 1, n):
out[i, j] = ak[i] @ a[j]
out[j, i] = out[i, j]
return out
@timeit()
def pd_corr_numba(
feature_matrix: pd.DataFrame, dtype=np.float64, copy: bool = False
) -> pd.DataFrame:
return pd.DataFrame(
numba_corrcoef(feature_matrix.to_numpy(dtype=dtype, copy=copy).T),
index=feature_matrix.columns,
columns=feature_matrix.columns,
)
@timeit()
def drop_correlated_numeric_columns(
feature_matrix: pd.DataFrame, thresh: float
) -> pd.DataFrame:
if thresh is None or thresh == 0:
return feature_matrix
corr = pd_corr_numba(feature_matrix=feature_matrix, dtype=np.float32)
return drop_correlated_columns(corr, feature_matrix, thresh)
@timeit()
def drop_correlated_categorical_columns(
feature_matrix: pd.DataFrame, thresh: float
) -> pd.DataFrame:
if thresh is None or thresh == 0:
return feature_matrix
corr = pd_cramers(df=feature_matrix)
return drop_correlated_columns(corr, feature_matrix, thresh)
def drop_correlated_columns(corr, df: pd.DataFrame, thresh) -> pd.DataFrame:
prefix = f"In {drop_correlated_columns.__qualname__} --"
to_drop = get_correlated_columns_to_drop(corr=corr, thresh=thresh)
print(f"{prefix} Before: feats_n_labs.shape={df.shape}")
df = df.drop(columns=to_drop)
print(f"{prefix} After: df.shape={df.shape}")
return df
@dataclass
class CorrFilterResult:
df: pd.DataFrame
corr: pd.DataFrame
thresh: float
dropped: List[str]
def print_features_most_correlated_with_label(corr: pd.DataFrame) -> None:
prefix = f"In {print_features_most_correlated_with_label.__qualname__} --"
label_correlations: pd.Series = corr["label"].sort_values()
label_correlations: pd.Series = label_correlations[~pd.isna(label_correlations)]
print(f"{prefix} label_correlations.to_dict()={label_correlations.to_dict()}")
@timeit()
def get_correlated_columns_to_drop(corr: pd.DataFrame, thresh: float) -> List[str]:
corr = corr.abs()
# Subset to the upper triangle of correlation matrix
upper_tri_idxs = np.triu(np.ones(corr.shape), k=1).astype(np.bool)
corr_upper_tri = corr.where(upper_tri_idxs)
# Identify names of columns with correlation above threshold
return [
column
for column in corr_upper_tri.columns
if (corr_upper_tri[column] >= thresh).any() and column != "label"
]
def pd_cramers(df: pd.DataFrame) -> pd.DataFrame:
for col in df.columns:
if df[col].nunique(dropna=False) == 1:
print(col)
raise ValueError(col)
dtype_is_cat = df.dtypes == "category"
print(dtype_is_cat)
# noinspection PyUnresolvedReferences
assert len(dtype_is_cat.drop_duplicates()) == 1
# noinspection PyUnresolvedReferences
assert dtype_is_cat.drop_duplicates().iloc[0] == True
ncols = df.shape[1]
nsamples = df.shape[0]
df_np = np.empty((ncols, nsamples), dtype=np.int32)
ncats = np.empty(ncols, dtype=np.int32)
for i in range(ncols):
col = df[df.columns[i]]
codes = col.cat.codes.to_numpy(np.int32)
if -1 in set(codes):
codes += 1 # So they're all indices
assert set(codes) == set(range(np.max(codes) + 1)), set(codes) - set(
range(np.max(codes) + 1)
)
df_np[i] = codes
ncats[i] = np.unique(codes).size
out = cramer_many(ncats, df_np)
return pd.DataFrame(data=out, index=df.columns, columns=df.columns)
@njit(parallel=True)
def cramer_many(ncats, df_np):
ncols = ncats.size
out = np.empty((ncols, ncols))
for i in prange(ncols):
out[i, i] = 1.0
col_i = df_np[i]
ncats_i = ncats[i]
for j in prange(i + 1, ncols):
col_j = df_np[j]
ncats_j = ncats[j]
out_ij = cramers_corrected_stat(ncats_i, ncats_j, col_i, col_j)
out[i, j] = out_ij
out[j, i] = out[i, j]
return out
@njit(parallel=True)
def nb_crosstab(ncats1, ncats2, col1, col2):
nsamples = col1.size
out = np.zeros((ncats1, ncats2), dtype=np.float32)
for i in prange(nsamples):
col1_cat = col1[i]
col2_cat = col2[i]
out[col1_cat, col2_cat] += 1.0
return out
@njit(parallel=True)
def calculate_chi2(ncats1, ncats2, f_obs):
sums1 = np.sum(f_obs, axis=1) # sum over each row. Has size=ncats1
sums2 = np.sum(f_obs, axis=0) # sum over each column. Has size=ncats2
total = np.sum(f_obs)
out = 0
for i in prange(ncats1):
p_i = sums1[i] / total
for j in prange(ncats2):
f_expected = p_i * sums2[j]
surprise = f_obs[i, j] - f_expected
out += (surprise ** 2) / f_expected
return out
@njit(parallel=False)
def cramers_corrected_stat(ncats1, ncats2, col1, col2):
cf = nb_crosstab(ncats1, ncats2, col1, col2)
chi2 = calculate_chi2(ncats1, ncats2, cf)
return _cramers_corrected_stat(chi2, cf)
@njit(parallel=True)
def _cramers_corrected_stat(chi2, confusion_matrix):
""" calculate Cramers V statistic for categorial-categorial association.
uses correction from Bergsma and Wicher,
Journal of the Korean Statistical Society 42 (2013): 323-328
"""
n = confusion_matrix.sum()
phi2 = chi2 / n
r, k = confusion_matrix.shape
phi2corr = max(0, phi2 - ((k - 1) * (r - 1)) / (n - 1))
rcorr = r - ((r - 1) ** 2) / (n - 1)
kcorr = k - ((k - 1) ** 2) / (n - 1)
return np.sqrt(phi2corr / min((kcorr - 1), (rcorr - 1)))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment