"""A basic active learning example with margin, uncertainty, and random sampling."""
from typing import Dict, List

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
from sklearn.base import clone
from sklearn.ensemble import RandomForestClassifier
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import classification_report
from sklearn.pipeline import Pipeline
# Apply seaborn's default theme to the matplotlib figures drawn below.
sns.set()
def uncertainty_sampling(df: pd.DataFrame, preds: np.ndarray, n: int) -> pd.DataFrame:
    """Return the ``n`` rows the classifier is least confident about.

    Confidence is the highest class probability in each row of ``preds``;
    the rows with the lowest confidence are returned first.

    Args:
        df: Candidate rows. It is not modified.
        preds: Class probabilities, one row per row of ``df`` (same order),
            one column per class.
        n: Maximum number of rows to return.

    Returns:
        A new DataFrame with at most ``n`` rows of ``df``, ordered from the
        least to the most confident prediction.
    """
    confidence = np.max(preds, axis=1)
    # Rank positionally instead of writing a scratch column into ``df``:
    # the caller's frame stays untouched and a pre-existing "preds" column
    # can never be clobbered.
    least_confident = np.argsort(confidence, kind="stable")[:n]
    return df.iloc[least_confident]
def random_sampling(df: pd.DataFrame, preds: np.ndarray, n: int) -> pd.DataFrame:
    """Return up to ``n`` rows of ``df`` chosen uniformly at random.

    Args:
        df: Candidate rows.
        preds: Unused; accepted so every sampling strategy shares the same
            signature.
        n: Number of rows to draw without replacement.

    Returns:
        A DataFrame of ``min(n, len(df))`` randomly chosen rows.
    """
    # Cap the request at the pool size so an almost-exhausted pool yields
    # fewer rows instead of raising, matching the head(n)-style samplers.
    return df.sample(min(n, len(df)))
def margin_sampling(df: pd.DataFrame, preds: np.ndarray, n: int) -> pd.DataFrame:
    """Return the ``n`` rows with the smallest top-two probability margin.

    The margin of a row is the difference between its most probable and
    second most probable class; a small margin means the classifier can
    barely separate the two classes.

    Args:
        df: Candidate rows. It is not modified.
        preds: Class probabilities, one row per row of ``df`` (same order),
            one column per class.
        n: Maximum number of rows to return.

    Returns:
        A new DataFrame with at most ``n`` rows of ``df``, ordered from the
        smallest to the largest margin. When ``preds`` has a single column
        no margin exists and a random sample is returned instead.
    """
    # Sort each row's probabilities in increasing order.
    sorted_preds = np.sort(preds, axis=1)
    # A classifier that has only seen one class gives no second-best
    # probability to compare against, so fall back to random selection.
    if sorted_preds.shape[1] == 1:
        return df.sample(min(n, len(df)))
    margin = sorted_preds[:, -1] - sorted_preds[:, -2]
    # Rank positionally rather than adding a scratch column to ``df`` so
    # the caller's frame is left untouched.
    smallest_margin = np.argsort(margin, kind="stable")[:n]
    return df.iloc[smallest_margin]
def combined_sampling(df: pd.DataFrame, preds: np.ndarray, n: int, weights: List[float] = None) -> pd.DataFrame:
    """Return a mix of margin, uncertainty, and random samples.

    Args:
        df: Candidate rows. It is not modified.
        preds: Class probabilities, one row per row of ``df`` (same order).
        n: Total number of rows requested.
        weights: Share of ``n`` given to each strategy, in the order
            ``[margin, uncertainty, random]``. Defaults to ``[.4, .4, .2]``.

    Returns:
        A DataFrame of at most ``n`` distinct rows of ``df``. Fewer rows are
        returned when the margin and uncertainty strategies pick the same
        points or when the pool is too small.
    """
    if weights is None:
        weights = [.4, .4, .2]
    # Each sampler gets a shallow copy so that any scratch column a sampler
    # adds while ranking never leaks into ``df`` or into the other samples.
    margin_points = margin_sampling(df.copy(deep=False), preds, round(n * weights[0]))
    uncertainty_points = uncertainty_sampling(df.copy(deep=False), preds, round(n * weights[1]))
    # Draw the random share only from rows no other strategy has chosen.
    already_chosen = df.index.isin(margin_points.index) | df.index.isin(uncertainty_points.index)
    remaining_df = df.loc[~already_chosen]
    # The random share is the third weight (the first is the margin share).
    n_random = min(round(n * weights[2]), len(remaining_df))
    # ``preds`` is forwarded only to satisfy the shared sampler signature;
    # it is not re-aligned with ``remaining_df``.
    random_points = random_sampling(remaining_df, preds, n_random)
    combined = pd.concat([random_points, uncertainty_points, margin_points])
    # De-duplicate on the index: a point picked by two strategies is the same
    # row, while distinct rows that merely share content are both kept.
    return combined[~combined.index.duplicated(keep="first")].head(n)
def evaluate_model_improvement(model,
                               train_df: pd.DataFrame,
                               test_df: pd.DataFrame,
                               sample_func,
                               n: int,
                               label_col: str,
                               data_col: str,
                               random_state: int = 1234,
                               num_iterations: int = 30
                               ) -> List[Dict[str, Dict[str, float]]]:
    """Simulate active learning and score the model after every round.

    Starting from ``n`` random rows of ``train_df``, each round fits a fresh
    clone of ``model`` on the rows labelled so far, scores it on ``test_df``,
    and lets ``sample_func`` pick ``n`` more rows from the unused pool.

    Args:
        model: Unfitted estimator supporting ``fit``, ``predict`` and
            ``predict_proba``; it is cloned before every fit.
        train_df: Pool of labelled rows to draw training data from.
        test_df: Held-out rows used for scoring.
        sample_func: Callable ``(df, preds, n) -> DataFrame`` that selects
            the next rows to add to the training data.
        n: Size of the initial sample and of each later batch.
        label_col: Name of the label column.
        data_col: Name of the feature column passed to the model.
        random_state: Seed for the initial random sample only.
        num_iterations: Upper bound of the round counter; exactly
            ``num_iterations - 1`` rounds are run.

    Returns:
        One ``classification_report(..., output_dict=True)`` dict per round.
        The list is shorter than ``num_iterations - 1`` only when the pool
        runs out of unused rows.
    """
    train_data = train_df.sample(n, random_state=random_state)
    scores = []
    # The counter starts at 1, so this runs num_iterations - 1 rounds; the
    # count is kept as-is for backward compatibility with existing callers.
    for _ in range(1, num_iterations):
        # Clone the model to make sure no fitted state carries over.
        model = clone(model)
        model.fit(train_data[data_col], train_data[label_col])
        # Score the model at the current amount of training data.
        preds = model.predict(test_df[data_col])
        scores.append(classification_report(test_df[label_col], preds, output_dict=True))
        # All rows of the pool that have not been used for training yet.
        remaining_df = train_df.loc[~train_df.index.isin(train_data.index)]
        if remaining_df.empty:
            # Nothing left to sample; predict_proba would fail on no rows.
            break
        new_samples = sample_func(remaining_df, model.predict_proba(remaining_df[data_col]), n)
        train_data = pd.concat([train_data, new_samples])
    return scores
# Load in the data
df = pd.read_csv("IMDB_Dataset.csv")
# Randomly assign roughly 70% of the rows to train and 30% to test. The
# probabilities must be passed as ``p=``: the third positional argument of
# np.random.choice is ``replace``, which would leave the split at 50/50.
df["split"] = np.random.choice(["train", "test"], df.shape[0], p=[.7, .3])
x_train = df[df["split"] == "train"]
y_train = x_train["sentiment"]
x_test = df[df["split"] == "test"]
y_test = x_test["sentiment"]
# NOTE(review): the original snippet used `model`, `rand_state` and `x_val`
# without defining them. The seed below mirrors the default `random_state`
# of evaluate_model_improvement, the TF-IDF + random-forest pipeline is
# assumed from the file's imports, and `x_test` is assumed to be the intended
# evaluation split -- confirm all three.
rand_state = 1234
model = Pipeline([
    ("tfidf", TfidfVectorizer()),
    ("clf", RandomForestClassifier(random_state=rand_state)),
])
# Run the same simulation once per sampling strategy.
uncertainty_scores = evaluate_model_improvement(model, x_train, x_test, uncertainty_sampling, 5,
                                                "sentiment", "review", rand_state, num_iterations=100)
random_scores = evaluate_model_improvement(model, x_train, x_test, random_sampling, 5,
                                           "sentiment", "review", rand_state, num_iterations=100)
margin_scores = evaluate_model_improvement(model, x_train, x_test, margin_sampling, 5,
                                           "sentiment", "review", rand_state, num_iterations=100)
combined_scores = evaluate_model_improvement(model, x_train, x_test, combined_sampling, 5,
                                             "sentiment", "review", rand_state, num_iterations=100)
# Plot the F1 score of one class against the amount of training data used.
fig, ax = plt.subplots(1, 1, figsize=(10, 10))
key = "positive"
# Must equal the `n` passed to evaluate_model_improvement: round k was
# trained on k * batch_size rows.
batch_size = 5
score_histories = {
    "uncertainty": uncertainty_scores,
    "random": random_scores,
    "margin": margin_scores,
    "combined": combined_scores,
}
for label, history in score_histories.items():
    f1_scores = np.array([report[key]["f1-score"] for report in history])
    # Derive the x-axis from the history itself instead of hard-coding the
    # number of rounds, so the plot stays valid if the run length changes.
    x_points = batch_size * np.arange(1, len(f1_scores) + 1)
    ax.plot(x_points, f1_scores, label=label)
ax.legend()
ax.set_title("Performance with Increasing Data", fontsize=25)
ax.set_xlabel("Number of Data Points", fontsize=15)
ax.set_ylabel("F1", fontsize=15)
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment