A basic active learning example with margin, uncertainty, and random sampling.
from typing import Dict, List

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from sklearn.base import clone
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import classification_report
from sklearn.pipeline import Pipeline

sns.set()
def uncertainty_sampling(df: pd.DataFrame, preds: np.ndarray, n: int) -> pd.DataFrame:
    """Samples the n points whose top predicted probability is lowest (least confident)."""
    # Assign on a copy so the caller's dataframe is not mutated.
    df = df.assign(preds=np.max(preds, axis=1))
    return df.sort_values("preds").head(n).drop("preds", axis=1)
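# Hedged sanity check (toy numbers, not part of the original gist): row "b" has
# the lowest top probability (0.55), so it is the point the model is least sure
# about and should be the one uncertainty sampling returns.
_u_df = pd.DataFrame({"review": ["a", "b", "c"]})
_u_preds = np.array([[0.9, 0.1], [0.55, 0.45], [0.8, 0.2]])
assert uncertainty_sampling(_u_df, _u_preds, 1).index[0] == 1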
def random_sampling(df: pd.DataFrame, preds: np.ndarray, n: int) -> pd.DataFrame:
    """Samples n points uniformly at random; preds is ignored but keeps the signature uniform."""
    return df.sample(n)
def margin_sampling(df: pd.DataFrame, preds: np.ndarray, n: int) -> pd.DataFrame:
    """Samples the n points with the smallest difference between the most and second most probable classes."""
    # Sort each row's predicted probabilities in increasing order.
    sorted_preds = np.sort(preds, axis=1)
    # If the classifier has only seen one class there is no margin to compute,
    # so fall back to random sampling.
    if sorted_preds.shape[1] == 1:
        return df.sample(n)
    # Subtract the second highest probability from the highest probability,
    # assigning on a copy so the caller's dataframe is not mutated.
    df = df.assign(margin=sorted_preds[:, -1] - sorted_preds[:, -2])
    return df.sort_values("margin").head(n).drop("margin", axis=1)
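# Hedged sanity check (toy numbers, not part of the original gist): the margins
# are 0.8, 0.1, and 0.6, so margin sampling should pick row "b", where the top
# two classes are hardest to tell apart.
_m_df = pd.DataFrame({"review": ["a", "b", "c"]})
_m_preds = np.array([[0.9, 0.1], [0.55, 0.45], [0.8, 0.2]])
assert margin_sampling(_m_df, _m_preds, 1).index[0] == 1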
def combined_sampling(df: pd.DataFrame, preds: np.ndarray, n: int, weights: List[float] = None) -> pd.DataFrame:
    """Weighted sample mixing margin, uncertainty, and random sampling."""
    if weights is None:
        weights = [.4, .4, .2]
    margin_points = margin_sampling(df, preds, round(n * weights[0]))
    uncertainty_points = uncertainty_sampling(df, preds, round(n * weights[1]))
    # Remove the points already chosen before drawing the random portion.
    remaining_df = df[~(df.index.isin(margin_points.index) | df.index.isin(uncertainty_points.index))]
    random_points = random_sampling(remaining_df, preds, round(n * weights[2]))
    return pd.concat([random_points, uncertainty_points, margin_points]).drop_duplicates().head(n)
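# A hedged usage sketch (toy data, not part of the original gist): with the
# default weights [.4, .4, .2] and n=10 the batch is roughly 4 margin points,
# 4 uncertainty points, and 2 random points; rounding and drop_duplicates can
# shrink the exact count, so only the upper bound is checked here.
_c_df = pd.DataFrame({"review": [f"doc{i}" for i in range(20)]})
_c_preds = np.random.rand(20, 2)
_c_preds = _c_preds / _c_preds.sum(axis=1, keepdims=True)  # normalize rows into probabilities
assert len(combined_sampling(_c_df, _c_preds, 10)) <= 10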
def evaluate_model_improvement(model,
                               train_df: pd.DataFrame,
                               test_df: pd.DataFrame,
                               sample_func,
                               n: int,
                               label_col: str,
                               data_col: str,
                               random_state: int = 1234,
                               num_iterations: int = 30
                               ) -> List[Dict[str, Dict[str, float]]]:
    """Repeatedly fits model, scores it on test_df, and grows the training set with sample_func."""
    train_data = train_df.sample(n, random_state=random_state)
    scores = []
    for _ in range(1, num_iterations):
        # Clone the model to make sure we don't reuse model state between iterations.
        model = clone(model)
        # Fit the model on the currently "labeled" data.
        model.fit(train_data[data_col], train_data[label_col])
        # Score the model on the test set at the current data level.
        preds = model.predict(test_df[data_col])
        scores.append(classification_report(test_df[label_col], preds, output_dict=True))
        # Get all points in the training pool that haven't been used yet.
        remaining_df = train_df[~train_df.index.isin(train_data.index)]
        # Ask the sampling strategy for the next n points and add them to the training set.
        new_samples = sample_func(remaining_df, model.predict_proba(remaining_df[data_col]), n)
        train_data = pd.concat([train_data, new_samples])
    return scores
# Load in the data and make a random 70/30 train/test split.
df = pd.read_csv("IMDB_Dataset.csv")
df["split"] = np.random.choice(["train", "test"], size=df.shape[0], p=[.7, .3])
x_train = df[df["split"] == "train"]
y_train = x_train["sentiment"]
x_test = df[df["split"] == "test"]
y_test = x_test["sentiment"]
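# Quick sanity check (not in the original gist): the split proportions should
# come out close to the requested 70/30.
print(df["split"].value_counts(normalize=True))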
# Build the model. The original snippet never defines `model`; a TF-IDF +
# random forest pipeline is assumed here since those are the pieces imported above.
model = Pipeline([("tfidf", TfidfVectorizer()), ("forest", RandomForestClassifier())])
rand_state = 1234

# Evaluate each sampling strategy, adding 5 points per iteration.
uncertainty_scores = evaluate_model_improvement(model, x_train, x_test, uncertainty_sampling, 5,
                                                "sentiment", "review", rand_state, num_iterations=100)
random_scores = evaluate_model_improvement(model, x_train, x_test, random_sampling, 5,
                                           "sentiment", "review", rand_state, num_iterations=100)
margin_scores = evaluate_model_improvement(model, x_train, x_test, margin_sampling, 5,
                                           "sentiment", "review", rand_state, num_iterations=100)
combined_scores = evaluate_model_improvement(model, x_train, x_test, combined_sampling, 5,
                                             "sentiment", "review", rand_state, num_iterations=100)
fig, ax = plt.subplots(1, 1, figsize=(10, 10))
key = "positive"
# Each iteration adds 5 points, so the x-axis is the cumulative number of labeled points.
x_points = np.cumsum([5] * len(uncertainty_scores))
ax.plot(x_points, np.array([x[key]["f1-score"] for x in uncertainty_scores]), label="uncertainty")
ax.plot(x_points, np.array([x[key]["f1-score"] for x in random_scores]), label="random")
ax.plot(x_points, np.array([x[key]["f1-score"] for x in margin_scores]), label="margin")
ax.plot(x_points, np.array([x[key]["f1-score"] for x in combined_scores]), label="combined")
handles, labels = ax.get_legend_handles_labels()
ax.legend(handles, labels)
ax.set_title("Performance with Increasing Data", fontsize=25)
ax.set_xlabel("Number of Data Points", fontsize=15)
ax.set_ylabel("F1", fontsize=15)
This code does not work.