# https://towardsdatascience.com/identifying-hate-speech-with-bert-and-cnn-b7aa2cddd60d
# should also try this one? https://github.com/bentrevett/pytorch-sentiment-analysis/blob/master/6%20-%20Transformers%20for%20Sentiment%20Analysis.ipynb
from transformers import BertTokenizer, BertModel

import logging
import time
#import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import sklearn
import torch
import torch.nn as nn
import torch.nn.functional as F
import transformers
from sklearn.metrics import roc_auc_score
from torch.autograd import Variable

MAX_SEQ = 100
def tokenize_text(texts, max_seq=MAX_SEQ):
    return [tokenizer.encode(text, add_special_tokens=True)[:max_seq]
            for text in texts]


def pad_text(tokenized_text, max_seq=MAX_SEQ):
    return np.array([el + [0] * (max_seq - len(el)) for el in tokenized_text])


def tokenize_and_pad_text(texts, max_seq=MAX_SEQ):
    tokenized_text = tokenize_text(texts, max_seq)
    padded_text = pad_text(tokenized_text, max_seq)
    return torch.tensor(padded_text)
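
# For example (assuming the tokenizer loaded below), tokenize_and_pad_text(["Dit is een kop"])
# should yield a LongTensor of shape (1, MAX_SEQ): the token ids followed by zero padding.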
def sentiment_to_tensor(sentiment, dim=1):
    """Encode sentiment labels {-1, 0, 1} as float targets for BCELoss."""
    if dim == 1:
        return torch.tensor([[x] for x in sentiment], dtype=torch.float).to('cuda')
    elif dim == 2:
        # [polarity, subjectivity]: -1 -> [0, 1], 0 -> [0.5, 0], 1 -> [1, 1]
        return torch.tensor([[(x + 1) / 2, int(x != 0)] for x in sentiment], dtype=torch.float).to('cuda')
def tensor_to_sentiment(output):
    """Convert NN output to {-1, 0, 1}"""
    dim = output.size(1)
    if dim == 1:
        return [1 if x[0] > .33 else (-1 if x[0] < -.33 else 0) for x in output]
    if dim == 2:
        # second column = subjectivity, first column = polarity
        return [0 if x[1] < .5 else (-1 if x[0] < .5 else 1) for x in output]
def chunks(my_list, n):
    return [my_list[i * n:(i + 1) * n] for i in range((len(my_list) + n - 1) // n)]
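
# E.g. chunks(list(range(5)), 2) -> [[0, 1], [2, 3], [4]]; slicing also works on tensors,
# so this is used below to run BERT over the padded token tensor in pieces.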
def get_bert(texts):
    """Run the (frozen) BERT model over texts and return the last hidden states."""
    result = None
    tokens = tokenize_and_pad_text(texts).to('cuda')
    # Note: no attention_mask is passed, so BERT also attends to the zero padding
    with torch.no_grad():
        for i, chunk in enumerate(chunks(tokens, 100)):
            logging.info(f"Chunk {i}: {chunk.size()}")
            vectors = bert_model(chunk)[0]
            if result is None:
                result = vectors
            else:
                result = torch.cat((result, vectors), 0)
    return result
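
# The result should be a float tensor of shape (n_texts, MAX_SEQ, hidden_size)
# (hidden_size is 768 for a base-sized BERT model), kept on the GPU.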
class KimCNN(nn.Module):
    def __init__(self, embed_num, embed_dim, class_num, kernel_num, kernel_sizes, dropout, static):
        super().__init__()
        V = embed_num
        D = embed_dim
        C = class_num
        Co = kernel_num
        Ks = kernel_sizes  # note: only the single fixed kernel size K below is actually used
        K = 3
        self.static = static
        self.embed = nn.Embedding(V, D)  # unused: the inputs are already BERT embeddings
        #self.convs1 = nn.ModuleList([nn.Conv2d(1, Co, (K, D)) for K in Ks])
        # single 2d convolution with Co output channels, so that its output matches self.dense below
        self.conv = nn.Conv2d(1, Co, (K, D))
        self.dropout = nn.Dropout(dropout)
        self.dense = nn.Linear(Co, 64)
        self.fc1 = nn.Linear(64, C)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        if self.static:
            x = Variable(x)
        x = x.unsqueeze(1)  # (N, 1, W, D)
        x = F.relu(self.conv(x)).squeeze(3)  # (N, Co, W - K + 1)
        x = F.max_pool1d(x, x.size(2)).squeeze(2)  # (N, Co)
        #x = torch.cat(x, 1)
        x = self.dense(x)  # (N, 64)
        x = self.dropout(x)
        logit = self.fc1(x)  # (N, C)
        output = self.sigmoid(logit)
        return output
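
# Quick shape sanity check (illustrative only; MAX_SEQ and the hidden size of 768 are assumptions):
#   m = KimCNN(embed_num=MAX_SEQ, embed_dim=768, class_num=2, kernel_num=3,
#              kernel_sizes=[2, 3, 4], dropout=0.2, static=True)
#   m(torch.zeros(4, MAX_SEQ, 768)).shape  # -> torch.Size([4, 2])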
def generate_batch_data(x, y, batch_size):
    """Yield (x_batch, y_batch, batch_number) slices, including a final partial batch."""
    i, batch = 0, 0
    for batch, i in enumerate(range(0, len(x) - batch_size, batch_size), 1):
        x_batch = x[i : i + batch_size]
        y_batch = y[i : i + batch_size]
        yield x_batch, y_batch, batch
    if i + batch_size < len(x):
        yield x[i + batch_size :], y[i + batch_size :], batch + 1
    if batch == 0:
        yield x, y, 1
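
# E.g. with len(x) == 25 and batch_size == 10 this yields batches of 10, 10 and 5 items
# numbered 1, 2, 3; if len(x) <= batch_size everything is yielded as a single batch 1.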
logging.basicConfig(level=logging.INFO, format='[%(asctime)s %(name)-12s %(levelname)-5s] %(message)s')

logging.info("Loading data")
h = pd.read_csv("/home/wva/ecosent/data/intermediate/sentences_ml.csv")
np.random.seed(42)
h = h.sample(frac=1).reset_index(drop=True)
df_test = h[h.gold == True].reset_index(drop=True)
df_train = h[h.gold == False].reset_index(drop=True)
print(f"Test shape: {df_test.shape}")
print(f"Train shape: {df_train.shape}")
logging.info("Loading BERT models") | |
tokenizer = BertTokenizer.from_pretrained("bert-base-dutch-cased") | |
bert_model = BertModel.from_pretrained("bert-base-dutch-cased") | |
bert_model.eval() | |
bert_model = bert_model.to('cuda') | |
logging.info("Applying BERT model to texts") | |
x_train=get_bert(df_train.headline.values) | |
x_test=get_bert(df_test.headline.values) | |
y_train = sentiment_to_tensor(df_train.tone.values, dim=2) | |
y_test = sentiment_to_tensor(df_test.tone.values, dim=2) | |
model = KimCNN(embed_num=x_train.shape[1],
               embed_dim=x_train.shape[2],
               class_num=y_train.shape[1],
               kernel_num=3,
               kernel_sizes=[2, 3, 4],
               dropout=0.2,
               static=True).to('cuda')
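# Here embed_num is the padded sequence length MAX_SEQ and embed_dim the BERT hidden size;
# kernel_sizes is passed through but, as noted above, only the fixed K=3 kernel is used.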

n_epochs = 20
batch_size = 10
lr = 0.001
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
loss_fn = nn.BCELoss()
for epoch in range(n_epochs):
    train_loss = 0
    n_correct = 0
    model.train(True)
    for x_batch, y_batch, batch in generate_batch_data(x_train, y_train, batch_size):
        y_pred = model(x_batch)
        optimizer.zero_grad()
        loss = loss_fn(y_pred, y_batch)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
        pred = tensor_to_sentiment(y_pred)
        actual = tensor_to_sentiment(y_batch)
        n_correct += sum([x == y for (x, y) in zip(pred, actual)])
    logging.info(f"[{epoch}/{n_epochs}] Loss: {train_loss / batch}, acc: {n_correct / y_train.size(0)}")
preds = []
model.eval()  # disable dropout for deterministic output
with torch.no_grad():  # deactivate autograd engine to reduce memory usage and speed up computations
    for x_batch, y_batch, batch in generate_batch_data(x_test, y_test, batch_size):
        y_pred = model(x_batch)
        pred = tensor_to_sentiment(y_pred)
        preds += pred

actual = tensor_to_sentiment(y_test)
n_correct = sum([x == y for (x, y) in zip(preds, actual)])
print(f"Accuracy on test set: {n_correct / len(actual)}")