Transformers | BERT Quantile Regression Example
import torch
import numpy as np
from scipy import stats
from transformers import AutoTokenizer, AutoConfig, AutoModel, DistilBertModel, TrainingArguments, Trainer
from transformers.modeling_outputs import SequenceClassifierOutput
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler
import wandb
import os
PROJECT = "transformer_quantile_gist"
RUN_NAME = "transformer_quantile_regression"
MODEL_TYPE = "distilbert-base-uncased"
OUTPUT_DIR = './'
wandb.login()
wandb.init(project=PROJECT)
# //////////////////////////////////////////////
# // DEFINE DATASET ////////////////////////////
# //////////////////////////////////////////////
class FreeTextRegressionDataset(torch.utils.data.Dataset):
    def __init__(self, text, features, labels=None, tokenizer=None):
        self.encodings = tokenizer(text, truncation=True, padding=True)
        self.features = features
        self.labels = labels

    def __getitem__(self, idx):
        item = {key: torch.tensor(val[idx]) for key, val in self.encodings.items()}
        item["features"] = torch.tensor(self.features[idx]).float()
        if self.labels is not None:
            item["labels"] = torch.tensor(self.labels[idx]).float()
        return item

    def __len__(self):
        return self.features.shape[0]
tokenizer = AutoTokenizer.from_pretrained(MODEL_TYPE, use_fast=True)
# //////////////////////////////////////////////
# // DEFINE FEATURE ENGINEERING ////////////////
# //////////////////////////////////////////////
TARGET = [""]
TEXT_OR_CAT_COLUMNS = []
def process_raw_features(x):
    """Insert logic to process raw features."""
    return x

def create_text_feature(df):
    """Create a text feature using free text columns and categorical data."""
    df = df.loc[:, TEXT_OR_CAT_COLUMNS]
    if len(TEXT_OR_CAT_COLUMNS) > 1:
        df = df.apply(lambda x: ' [CLS] '.join(map(str, x)), axis=1)
    else:
        df = df.iloc[:, 0].astype(str)  # single column: keep it as a Series of strings
    return df.tolist()
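# NOTE: a pandas DataFrame `df` is assumed to exist at this point (the gist leaves
# data loading to the reader); a hypothetical loading step could be:
# import pandas as pd
# df = pd.read_csv("data.csv")  # placeholder path, not part of the original gist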
labels = df[TARGET]
features = process_raw_features(df.drop(columns=TARGET + TEXT_OR_CAT_COLUMNS))
text = create_text_feature(df)
# TEST TEXT TOKENIZATION
test = tokenizer(text[0])
print(test)
print(tokenizer.decode(test["input_ids"]))
# TRAIN TEST SPLIT
train_texts, val_texts, train_features, val_features, train_labels, val_labels = train_test_split(text, features.values, labels.values, test_size=.2)
# FEATURE & LABEL SCALING
feature_scaler, label_scaler = MinMaxScaler().fit(train_features), MinMaxScaler().fit(train_labels)
train_features, val_features = feature_scaler.transform(train_features), feature_scaler.transform(val_features)
train_labels, val_labels = label_scaler.transform(train_labels), label_scaler.transform(val_labels)
# INITIATE DATASETS
train_dataset = FreeTextRegressionDataset(train_texts, train_features, train_labels, tokenizer)
val_dataset = FreeTextRegressionDataset(val_texts, val_features, val_labels, tokenizer)
# //////////////////////////////////////////////
# // DEFINE MODEL //////////////////////////////
# //////////////////////////////////////////////
config = AutoConfig.from_pretrained(MODEL_TYPE, num_labels=1, name_or_path=MODEL_TYPE)
config.num_features = features.shape[1]
config.quantiles = [0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9]
class QuantileLoss(torch.nn.Module):
    def __init__(self, quantiles):
        super().__init__()
        self.quantiles = quantiles

    def forward(self, preds, target):
        assert not target.requires_grad
        assert preds.size(0) == target.size(0)
        losses = []
        for i, q in enumerate(self.quantiles):
            errors = target - preds[:, i]
            losses.append(torch.max((q - 1) * errors, q * errors).unsqueeze(1))
        loss = torch.mean(torch.sum(torch.cat(losses, dim=1), dim=1))
        return loss
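# Optional sanity check of the pinball loss (a minimal sketch, not part of the original
# gist): with zero predictions and unit targets the loss reduces to sum(quantiles) = 4.5.
_check = QuantileLoss(config.quantiles)(torch.zeros(4, len(config.quantiles)), torch.ones(4))
print("quantile loss sanity check:", _check.item())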
class TransformerQuantileRegression(DistilBertModel):
    """A customisation of DistilBERT that applies quantile regression on text plus additional numeric features."""

    def __init__(self, config):
        super().__init__(config)
        self.bert = AutoModel.from_pretrained(config.name_or_path)
        self.pre_classifier = torch.nn.Linear(config.dim, config.dim)
        self.classifier = torch.nn.Linear(config.dim + config.num_features, config.num_labels * len(config.quantiles))
        self.dropout = torch.nn.Dropout(config.seq_classif_dropout)

    def forward(
        self,
        input_ids=None,
        attention_mask=None,
        head_mask=None,
        inputs_embeds=None,
        output_attentions=None,
        output_hidden_states=None,
        return_dict=None,
        labels=None,
        features=None,
    ):
        # get the hidden state of the last layer
        outputs = self.bert(
            input_ids=input_ids,
            attention_mask=attention_mask,
            head_mask=head_mask,
            inputs_embeds=inputs_embeds,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )
        # last_hidden = outputs.last_hidden_state[:, 0, :]  # grab the first token (CLS) - shape: (batch, token, embedding)
        hidden_state = outputs[0]                                             # (bs, seq_len, dim)
        pooled_output = hidden_state[:, 0]                                    # (bs, dim)
        pooled_output = self.pre_classifier(torch.nn.ReLU()(pooled_output))   # (bs, dim)
        cat = torch.cat([pooled_output, features], dim=-1)                    # (bs, dim + num_features)
        logits = self.classifier(self.dropout(cat))                           # (bs, num_labels * num_quantiles)
        loss = None
        if labels is not None:
            loss_fct = QuantileLoss(self.config.quantiles)
            if self.config.num_labels == 1:
                loss = loss_fct(logits.squeeze(), labels.squeeze())
            else:
                loss = loss_fct(logits, labels)
        return SequenceClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )
model = TransformerQuantileRegression(config=config)
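# Optional smoke test (a sketch under the assumption that the dataset columns above
# have been filled in; uncomment to run): one forward pass on a single training example
# to confirm logits come out with shape (1, num_labels * len(config.quantiles)).
# sample = train_dataset[0]
# with torch.no_grad():
#     out = model(
#         input_ids=sample["input_ids"].unsqueeze(0),
#         attention_mask=sample["attention_mask"].unsqueeze(0),
#         features=sample["features"].unsqueeze(0),
#         labels=sample["labels"].unsqueeze(0),
#     )
# print(out.logits.shape)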
# //////////////////////////////////////////////
# // DEFINE TRAINING OBJECT & METRICS //////////
# //////////////////////////////////////////////
def quantile_r2_loss(output, target, quantiles):
    """Pseudo R2 for quantile regression, adapted from statsmodels' QuantRegResults.prsquared
    (the np.where form is rewritten as an elementwise np.maximum, i.e. the pinball loss).
    """
    target = np.asarray(target).reshape(-1)  # labels arrive as (n, 1); flatten before broadcasting
    outputs = []
    for i, q in enumerate(quantiles):
        error = target - output[:, i]
        ss_res = np.sum(np.maximum(q * error, (q - 1) * error))
        ered = target - stats.scoreatpercentile(target, q * 100)
        ss_tot = np.sum(np.maximum(q * ered, (q - 1) * ered))
        r2 = 1 - ss_res / ss_tot
        outputs.append(r2)
    return np.array(outputs).mean()
def compute_metrics(eval_pred):
    return {
        "r2": quantile_r2_loss(eval_pred.predictions, eval_pred.label_ids, config.quantiles)
    }
training_args = TrainingArguments(
    output_dir=OUTPUT_DIR,              # output directory
    num_train_epochs=3,                 # total number of training epochs
    per_device_train_batch_size=16,     # batch size per device during training
    per_device_eval_batch_size=64,      # batch size for evaluation
    warmup_steps=500,                   # number of warmup steps for learning rate scheduler
    weight_decay=0.01,                  # strength of weight decay
    logging_dir=f"{OUTPUT_DIR}/logs",   # directory for storing logs
    logging_steps=10,
    evaluation_strategy="steps",
    load_best_model_at_end=True,
    run_name=RUN_NAME,
)
trainer = Trainer(
    model=model,                    # the instantiated 🤗 Transformers model to be trained
    args=training_args,             # training arguments, defined above
    train_dataset=train_dataset,    # training dataset
    eval_dataset=val_dataset,       # evaluation dataset
    compute_metrics=compute_metrics,
)
# ///////////////////////////////////////
# /// RUN TRAINING & EVALUATION LOOP ////
# ///////////////////////////////////////
trainer.train()
trainer.evaluate()
trainer.save_model(OUTPUT_DIR)
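# Note: the fitted MinMaxScaler objects live only in this session; a hypothetical way to
# persist them alongside the model (joblib is an assumption, not part of the original gist):
# import joblib
# joblib.dump(feature_scaler, f"{OUTPUT_DIR}/feature_scaler.joblib")
# joblib.dump(label_scaler, f"{OUTPUT_DIR}/label_scaler.joblib")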
# ///////////////////////////////////////
# /// RUN PREDICTIONS ///////////////////
# ///////////////////////////////////////
def load_test_data():
    """A function to load test / prediction data."""
    df = None
    return df
model = TransformerQuantileRegression.from_pretrained(OUTPUT_DIR)
trainer = Trainer(model=model)
df = load_test_data()
test_text = create_text_feature(df)
test_features = feature_scaler.transform(process_raw_features(df.drop(columns=TARGET + TEXT_OR_CAT_COLUMNS)))  # scale with the scaler fitted on the training features
test_dataset = FreeTextRegressionDataset(test_text, test_features, labels=None, tokenizer=tokenizer)
output = trainer.predict(test_dataset)
prediction = label_scaler.inverse_transform(output.predictions)
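# `label_scaler.inverse_transform` broadcasts its single-label scale across all nine quantile
# columns, so each column of `prediction` is one quantile of the target; assuming
# num_labels == 1, the median forecast is the 0.5-quantile column from the config above.
median_prediction = prediction[:, config.quantiles.index(0.5)]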