# Gist by @vanatteveldt, created February 25, 2020
# https://towardsdatascience.com/identifying-hate-speech-with-bert-and-cnn-b7aa2cddd60d
# should also try this one? https://github.com/bentrevett/pytorch-sentiment-analysis/blob/master/6%20-%20Transformers%20for%20Sentiment%20Analysis.ipynb
from transformers import BertTokenizer, BertModel
import logging
import time
#import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import sklearn
import torch
import torch.nn as nn
import torch.nn.functional as F
import transformers
from sklearn.metrics import roc_auc_score
from torch.autograd import Variable
MAX_SEQ = 100
def tokenize_text(texts, max_seq=MAX_SEQ):
    return [tokenizer.encode(text, add_special_tokens=True)[:max_seq]
            for text in texts]


def pad_text(tokenized_text, max_seq=MAX_SEQ):
    return np.array([el + [0] * (max_seq - len(el)) for el in tokenized_text])


def tokenize_and_pad_text(texts, max_seq=MAX_SEQ):
    tokenized_text = tokenize_text(texts, max_seq)
    padded_text = pad_text(tokenized_text, max_seq)
    return torch.tensor(padded_text)
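
# Illustrative note (not part of the original gist): tokenize_and_pad_text turns a list of
# strings into a LongTensor of shape (n_texts, MAX_SEQ) of wordpiece ids, truncated to
# MAX_SEQ tokens and right-padded with 0 (assumed here to be the [PAD] id of this tokenizer):
#   ids = tokenize_and_pad_text(["Dit is een kop", "Nog een kop"])
#   ids.shape  # torch.Size([2, 100])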
def sentiment_to_tensor(sentiment, dim=1):
    if dim == 1:
        return torch.tensor([[x] for x in sentiment]).to('cuda')
    elif dim == 2:
        return torch.tensor([[(x + 1) / 2, int(x != 0)] for x in sentiment]).to('cuda')
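
# For dim=2, each tone in {-1, 0, 1} is encoded as [(x + 1) / 2, int(x != 0)], i.e. a
# (polarity, subjectivity) pair: -1 -> [0.0, 1], 0 -> [0.5, 0], 1 -> [1.0, 1].
# tensor_to_sentiment below inverts this with 0.5 thresholds (and +/-0.33 for dim=1).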
def tensor_to_sentiment(output):
    """Convert NN output to {-1, 0, 1}"""
    dim = output.size(1)
    if dim == 1:
        return [1 if x[0] > .33 else (-1 if x[0] < -.33 else 0) for x in output]
    if dim == 2:
        return [0 if x[1] < .5 else (-1 if x[0] < .5 else 1) for x in output]
def chunks(my_list, n):
    return [my_list[i * n:(i + 1) * n] for i in range((len(my_list) + n - 1) // n)]
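
# Illustrative example: chunks splits a sequence into pieces of at most n items, e.g.
#   chunks(list(range(5)), 2)  # [[0, 1], [2, 3], [4]]
# It also works on tensors, slicing along the first dimension, which is how get_bert uses it.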
def get_bert(texts):
    result = None
    tokens = tokenize_and_pad_text(texts).to('cuda')
    with torch.no_grad():
        for i, chunk in enumerate(chunks(tokens, 100)):
            logging.info(f"Chunk {i}: {chunk.size()}")
            vectors = bert_model(chunk)[0]
            if result is None:
                result = vectors
            else:
                result = torch.cat((result, vectors), 0)
    return result
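
# get_bert returns the last hidden states of the (frozen) BERT encoder for all texts, as a
# tensor of shape (n_texts, MAX_SEQ, hidden_size) on the GPU (hidden_size is 768 for a
# BERT-base model). Texts are encoded in chunks of 100 to bound per-forward-pass memory;
# note that the full result still accumulates on the GPU.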
class KimCNN(nn.Module):
    def __init__(self, embed_num, embed_dim, class_num, kernel_num, kernel_sizes, dropout, static):
        super().__init__()
        V = embed_num
        D = embed_dim
        C = class_num
        Co = kernel_num
        Ks = kernel_sizes
        K = 3
        self.static = static
        self.embed = nn.Embedding(V, D)  # unused here: the inputs are already BERT embeddings
        # multi-kernel version of the original KimCNN:
        # self.convs1 = nn.ModuleList([nn.Conv2d(1, Co, (K, D)) for K in Ks])
        # Single convolution with Co output channels; the gist had nn.Conv1d(1, 1, (K, D)),
        # which does not match the shape comments below or the Linear(Co, 64) layer.
        self.conv = nn.Conv2d(1, Co, (K, D))
        self.dropout = nn.Dropout(dropout)
        self.dense = nn.Linear(Co, 64)
        self.fc1 = nn.Linear(64, C)
        self.sigmoid = nn.Sigmoid()

    def forward(self, x):
        if self.static:
            x = Variable(x)
        x = x.unsqueeze(1)                          # (N, Ci, W, D)
        x = F.relu(self.conv(x)).squeeze(3)         # (N, Co, W)
        x = F.max_pool1d(x, x.size(2)).squeeze(2)   # (N, Co)
        # x = torch.cat(x, 1)
        x = self.dense(x)
        x = self.dropout(x)
        logit = self.fc1(x)                         # (N, C)
        output = self.sigmoid(logit)
        return output
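
# Shape flow through the model as written above, for input of shape (N, W, D)
# (N headlines, W = MAX_SEQ tokens, D = BERT hidden size):
#   unsqueeze -> (N, 1, W, D); conv + relu -> (N, Co, W - K + 1); global max pool -> (N, Co);
#   dense -> (N, 64); fc1 -> (N, C); sigmoid -> per-class scores in [0, 1].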
def generate_batch_data(x, y, batch_size):
    i, batch = 0, 0
    for batch, i in enumerate(range(0, len(x) - batch_size, batch_size), 1):
        x_batch = x[i : i + batch_size]
        y_batch = y[i : i + batch_size]
        yield x_batch, y_batch, batch
    if i + batch_size < len(x):
        yield x[i + batch_size :], y[i + batch_size :], batch + 1
    if batch == 0:
        yield x, y, 1
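
# generate_batch_data yields (x_batch, y_batch, batch_number) triples, including a final
# partial batch, and a single batch when len(x) <= batch_size; the last batch_number is
# used below to average the training loss over batches.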
logging.basicConfig(level=logging.INFO, format='[%(asctime)s %(name)-12s %(levelname)-5s] %(message)s')
logging.info("Loading data")
h = pd.read_csv("/home/wva/ecosent/data/intermediate/sentences_ml.csv")
np.random.seed(42)
h = h.sample(frac=1).reset_index(drop=True)
df_test = h[h.gold == True].reset_index(drop=True)
df_train = h[h.gold == False].reset_index(drop=True)
print(f"Test shape: {df_test.shape}")
print(f"Train shape: {df_train.shape}")
logging.info("Loading BERT models")
tokenizer = BertTokenizer.from_pretrained("bert-base-dutch-cased")
bert_model = BertModel.from_pretrained("bert-base-dutch-cased")
bert_model.eval()
bert_model = bert_model.to('cuda')
logging.info("Applying BERT model to texts")
x_train = get_bert(df_train.headline.values)
x_test = get_bert(df_test.headline.values)
y_train = sentiment_to_tensor(df_train.tone.values, dim=2)
y_test = sentiment_to_tensor(df_test.tone.values, dim=2)
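
# At this point x_train/x_test are BERT feature tensors of shape (n, MAX_SEQ, hidden_size)
# and y_train/y_test are (n, 2) float targets, all on the GPU, so the CNN below trains on
# fixed (static) BERT embeddings rather than fine-tuning BERT itself.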
model = KimCNN(embed_num=x_train.shape[1],
               embed_dim=x_train.shape[2],
               class_num=y_train.shape[1],
               kernel_num=3,
               kernel_sizes=[2, 3, 4],
               dropout=0.2,
               static=True).to('cuda')
n_epochs = 20
batch_size = 10
lr = 0.001
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
loss_fn = nn.BCELoss()
for epoch in range(n_epochs):
    train_loss = 0
    n_correct = 0
    model.train(True)
    for x_batch, y_batch, batch in generate_batch_data(x_train, y_train, batch_size):
        y_pred = model(x_batch)
        optimizer.zero_grad()
        loss = loss_fn(y_pred, y_batch)
        loss.backward()
        optimizer.step()
        train_loss += loss.item()
        pred = tensor_to_sentiment(y_pred)
        actual = tensor_to_sentiment(y_batch)
        n_correct += sum([x == y for (x, y) in zip(pred, actual)])
    logging.info(f"[{epoch}/{n_epochs}] Loss: {train_loss/batch}, acc: {n_correct/y_train.size(0)}")
preds = []
model.eval()  # disable dropout for deterministic output
with torch.no_grad():  # deactivate autograd engine to reduce memory usage and speed up computations
    batch = 0
    for x_batch, y_batch, batch in generate_batch_data(x_test, y_test, batch_size):
        y_pred = model(x_batch)
        pred = tensor_to_sentiment(y_pred)
        preds += pred
actual = tensor_to_sentiment(y_test)
n_correct = sum([x == y for (x, y) in zip(preds, actual)])
print(f"Accuracy on test set: {n_correct/len(actual)}")