Transformers | BERT Quantile Regression Example
import os
import numpy as np
import torch
from scipy import stats
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
from transformers import AutoTokenizer, AutoConfig, AutoModel, DistilBertModel, TrainingArguments, Trainer
from transformers.modeling_outputs import SequenceClassifierOutput
import wandb

PROJECT = "transformer_quantile_gist"
RUN_NAME = "transformer_quantile_regression"
MODEL_TYPE = "distilbert-base-uncased"
OUTPUT_DIR = "./"

wandb.login()
wandb.init(project=PROJECT)
# //////////////////////////////////////////////
# // DEFINE DATASET ////////////////////////////
# //////////////////////////////////////////////
class FreeTextRegressionDataset(torch.utils.data.Dataset):
    """Wraps tokenized free text, numeric features and optional labels."""
    def __init__(self, text, features, labels=None, tokenizer=None):
        self.encodings = tokenizer(text, truncation=True, padding=True)
        self.features = features
        self.labels = labels

    def __getitem__(self, idx):
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        item["features"] = torch.tensor(self.features[idx]).float()
        if self.labels is not None:
            item["labels"] = torch.tensor(self.labels[idx]).float()
        return item

    def __len__(self):
        return self.features.shape[0]

tokenizer = AutoTokenizer.from_pretrained(MODEL_TYPE, use_fast=True)
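# --- Hypothetical smoke test (not part of the original gist): verify the dataset
# yields tensors of the expected shapes. The toy strings, 2-column feature array
# and single-column labels below are illustrative assumptions.
_toy_text = ["first example", "second example"]
_toy_features = np.array([[0.1, 0.2], [0.3, 0.4]])
_toy_labels = np.array([[1.0], [2.0]])
_toy_ds = FreeTextRegressionDataset(_toy_text, _toy_features, _toy_labels, tokenizer)
_item = _toy_ds[0]
assert _item["features"].shape == (2,) and _item["labels"].shape == (1,)
assert len(_toy_ds) == 2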
# //////////////////////////////////////////////
# // DEFINE FEATURE ENGINEERING ////////////////
# //////////////////////////////////////////////
TARGET = [""]
TEXT_OR_CAT_COLUMNS = []

def process_raw_features(x):
    """Insert logic to process raw features."""
    return x

def create_text_feature(df):
    """Create a single text feature by joining free text and categorical columns."""
    df = df.loc[:, TEXT_OR_CAT_COLUMNS]
    if len(TEXT_OR_CAT_COLUMNS) > 1:
        df = df.apply(lambda x: ' [CLS] '.join(map(str, x)), axis=1)
    else:
        df = df.iloc[:, 0].astype(str)
    return df.tolist()
# df is assumed to be a pandas DataFrame loaded elsewhere, containing the target,
# the free text / categorical columns and the numeric feature columns
labels = df[TARGET]
features = process_raw_features(df.drop(columns=TARGET + TEXT_OR_CAT_COLUMNS))
text = create_text_feature(df)

# TEST TEXT TOKENIZATION
test = tokenizer(text[0])
print(test)
print(tokenizer.decode(test["input_ids"]))
# TRAIN TEST SPLIT
train_texts, val_texts, train_features, val_features, train_labels, val_labels = train_test_split(
    text, features.values, labels.values, test_size=0.2
)

# FEATURE & LABEL SCALING
feature_scaler, label_scaler = MinMaxScaler().fit(train_features), MinMaxScaler().fit(train_labels)
train_features, val_features = feature_scaler.transform(train_features), feature_scaler.transform(val_features)
train_labels, val_labels = label_scaler.transform(train_labels), label_scaler.transform(val_labels)

# INITIATE DATASETS
train_dataset = FreeTextRegressionDataset(train_texts, train_features, train_labels, tokenizer)
val_dataset = FreeTextRegressionDataset(val_texts, val_features, val_labels, tokenizer)
# //////////////////////////////////////////////
# // DEFINE MODEL //////////////////////////////
# //////////////////////////////////////////////
config = AutoConfig.from_pretrained(MODEL_TYPE, num_labels=1, name_or_path=MODEL_TYPE)
config.num_features = features.shape[1]
config.quantiles = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]
class QuantileLoss(torch.nn.Module):
    """Pinball (quantile) loss, summed over quantiles and averaged over the batch."""
    def __init__(self, quantiles):
        super().__init__()
        self.quantiles = quantiles

    def forward(self, preds, target):
        assert not target.requires_grad
        assert preds.size(0) == target.size(0)
        losses = []
        for i, q in enumerate(self.quantiles):
            errors = target - preds[:, i]
            # pinball loss: q * error when error >= 0, (q - 1) * error otherwise
            losses.append(torch.max((q - 1) * errors, q * errors).unsqueeze(1))
        loss = torch.mean(torch.sum(torch.cat(losses, dim=1), dim=1))
        return loss
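# --- Hypothetical sanity check (illustrative, not part of the original gist):
# the pinball loss should be exactly zero when every quantile prediction
# matches the target.
_q = [0.1, 0.5, 0.9]
_loss_fn = QuantileLoss(_q)
_target = torch.zeros(4)
_perfect = torch.zeros(4, len(_q))
assert _loss_fn(_perfect, _target).item() == 0.0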
class TransformerQuantileRegression(DistilBertModel):
    """A customisation of Transformers to apply quantile regression on the pooled
    text embedding concatenated with additional numeric features."""
    def __init__(self, config):
        super().__init__(config)
        # self.bert carries the pretrained weights; the classifier head emits one
        # column per (label, quantile) pair
        self.bert = AutoModel.from_pretrained(config.name_or_path)
        self.pre_classifier = torch.nn.Linear(config.dim, config.dim)
        self.classifier = torch.nn.Linear(config.dim + config.num_features, config.num_labels * len(config.quantiles))
        self.dropout = torch.nn.Dropout(config.seq_classif_dropout)
    def forward(
        self,
        input_ids=None,
        attention_mask=None,
        head_mask=None,
        inputs_embeds=None,
        output_attentions=None,
        output_hidden_states=None,
        return_dict=None,
        labels=None,
        features=None,
    ):
        # get the hidden state of the last layer
        outputs = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        hidden_state = outputs[0]           # (bs, seq_len, dim)
        pooled_output = hidden_state[:, 0]  # grab the first token (CLS) - (bs, dim)
        pooled_output = self.pre_classifier(torch.nn.ReLU()(pooled_output))  # (bs, dim)
        cat = torch.cat([pooled_output, features], dim=-1)
        logits = self.classifier(self.dropout(cat))  # (bs, num_labels * num_quantiles)
        loss = None
        if labels is not None:
            loss_fct = QuantileLoss(self.config.quantiles)
            if self.config.num_labels == 1:
                loss = loss_fct(logits.squeeze(), labels.squeeze())
            else:
                loss = loss_fct(logits, labels)
        return SequenceClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )
model = TransformerQuantileRegression(config=config)
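# --- Hypothetical forward-pass check (illustrative, not part of the original gist):
# push a dummy batch through the untrained model to confirm the output has one
# column per quantile. The test sentence is an assumption.
_enc = tokenizer(["a short test sentence"], return_tensors="pt")
_feat = torch.zeros(1, config.num_features)
with torch.no_grad():
    _out = model(input_ids=_enc["input_ids"], attention_mask=_enc["attention_mask"], features=_feat)
assert _out.logits.shape == (1, len(config.quantiles))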
# //////////////////////////////////////////////
# // DEFINE TRAINING OBJECT & METRICS //////////
# //////////////////////////////////////////////
def quantile_r2_loss(output, target, quantiles):
    """Pseudo R2 for quantile regression, adapted from statsmodels
    - np.where switched for np.maximum
    """
    outputs = []
    for i, q in enumerate(quantiles):
        error = target - output[:, i]
        ss_res = np.sum(np.maximum((q - 1) * error, q * error))
        ered = target - stats.scoreatpercentile(target, q * 100)
        ss_tot = np.sum(np.maximum((q - 1) * ered, q * ered))
        r2 = 1 - ss_res / ss_tot
        outputs.append(r2)
    return np.array(outputs).mean()
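# --- Hypothetical check (illustrative, not part of the original gist): the pseudo
# R2 should be exactly 1.0 when every quantile column equals the target.
_t = np.random.rand(100)
_p = np.tile(_t[:, None], (1, len(config.quantiles)))
assert abs(quantile_r2_loss(_p, _t, config.quantiles) - 1.0) < 1e-9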
def compute_metrics(eval_pred):
    return {
        "r2": quantile_r2_loss(eval_pred.predictions, eval_pred.label_ids, config.quantiles)
    }
training_args = TrainingArguments(
    output_dir=OUTPUT_DIR,            # output directory
    num_train_epochs=3,               # total number of training epochs
    per_device_train_batch_size=16,   # batch size per device during training
    per_device_eval_batch_size=64,    # batch size for evaluation
    warmup_steps=500,                 # number of warmup steps for learning rate scheduler
    weight_decay=0.01,                # strength of weight decay
    logging_dir=f"{OUTPUT_DIR}/logs", # directory for storing logs
    logging_steps=10,
    evaluation_strategy="steps",
    load_best_model_at_end=True,
    run_name=RUN_NAME
)
trainer = Trainer(
    model=model,                  # the instantiated 🤗 Transformers model to be trained
    args=training_args,           # training arguments, defined above
    train_dataset=train_dataset,  # training dataset
    eval_dataset=val_dataset,     # evaluation dataset
    compute_metrics=compute_metrics
)
# ///////////////////////////////////////
# /// RUN TRAINING & EVALUATION LOOP ////
# ///////////////////////////////////////
trainer.train()
trainer.evaluate()
trainer.save_model(OUTPUT_DIR)
# ///////////////////////////////////////
# /// RUN PREDICTIONS ///////////////////
# ///////////////////////////////////////
def load_test_data():
    """A function to load test / prediction data"""
    df = None
    return df
model = TransformerQuantileRegression.from_pretrained(OUTPUT_DIR)
trainer = Trainer(model=model)
df = load_test_data()
test_text = create_text_feature(df)
# apply the same scaling that was fitted on the training features
test_features = feature_scaler.transform(process_raw_features(df.drop(columns=TARGET + TEXT_OR_CAT_COLUMNS)))
test_dataset = FreeTextRegressionDataset(test_text, test_features, labels=None, tokenizer=tokenizer)
output = trainer.predict(test_dataset)
# label_scaler was fitted on a single target column, so rescale each quantile column back
prediction = label_scaler.inverse_transform(output.predictions.reshape(-1, 1)).reshape(output.predictions.shape)
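# --- Illustrative post-processing (assumption, not part of the original gist):
# pull a single quantile out of the (n_samples, n_quantiles) prediction matrix,
# e.g. the median (q = 0.5).
median_idx = config.quantiles.index(0.5)
median_prediction = prediction[:, median_idx]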