# config.py
# we define all the configuration here
MAX_LEN = 128
TRAIN_BATCH_SIZE = 16
VALID_BATCH_SIZE = 8
EPOCHS = 10
# EMB CNN LSTM
import torch
import torch.nn as nn


class LSTMMalware_Model(torch.nn.Module):
    def __init__(self, input_dim=0, embedding_dim=700, hidden_dim=100, output_dim=1,
                 batch_size=8, num_layers=2, bidirectional=False, dropout=0):
        super().__init__()
        self.input_dim = input_dim
        self.embedding_dim = embedding_dim
        self.hidden_dim = hidden_dim
        self.output_dim = output_dim
        self.batch_size = batch_size
        self.num_layers = num_layers
        self.bidirectional = bidirectional
        self.dropout = dropout
        self.fc_hidden_dim = self.hidden_dim
        if self.bidirectional:
            self.fc_hidden_dim = self.hidden_dim * 2
        self.embedding = nn.Embedding(self.input_dim, self.embedding_dim)
        self.conv1D1 = nn.Conv1d(embedding_dim, 32, 5)
        self.dropout1 = nn.Dropout(0.3)
        self.maxp1 = nn.MaxPool1d(5)
        self.dropout2 = nn.Dropout(0.4)
        self.conv1D2 = nn.Conv1d(32, 64, 5)
        self.maxp2 = nn.MaxPool1d(5)
        self.dropout3 = nn.Dropout(0.4)
        # LSTM
        self.lstm = nn.LSTM(64, 32)
        self.dropout4 = nn.Dropout(0.3)
        # linear
        self.dense = nn.Linear(864, 1)

    def forward(self, opcode):
        embedded = self.embedding(opcode)
        # CNN
        x = nn.Dropout(0.3)(embedded)
        # Conv1d expects (batch, channels, length), so move the embedding
        # dimension into the channel axis before the first convolution
        cnn_x = torch.transpose(x, 1, 2)
        cnn_x = self.conv1D1(cnn_x)
        cnn_x = torch.tanh(cnn_x)
        cnn_x = self.dropout1(cnn_x)
        cnn_x = self.maxp1(cnn_x)
        cnn_x = self.dropout2(cnn_x)
        cnn_x = self.conv1D2(cnn_x)
        cnn_x = torch.tanh(cnn_x)
        cnn_x = self.maxp2(cnn_x)
        cnn_x = torch.transpose(cnn_x, 1, 2)
        # LSTM
        lstm_out, _ = self.lstm(cnn_x)
        lstm_out = self.dropout4(torch.transpose(lstm_out, 1, 2).squeeze(2))
        # linear
        lstm_out = lstm_out.reshape((lstm_out.shape[0], -1, 1)).squeeze()
        # single output unit, so squash with a sigmoid
        # (a softmax over a single logit is always 1.0)
        cnn_lstm_out = torch.sigmoid(self.dense(lstm_out))
        return cnn_lstm_out
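# Hedged sanity check (assumed values, not part of the original gist): the
# hard-coded nn.Linear(864, 1) appears to assume an opcode sequence length of
# roughly 700, since 700 -> conv(5) -> pool(5) -> conv(5) -> pool(5) leaves
# 27 time steps and 27 * 32 LSTM units = 864. The vocabulary size below is a
# placeholder.
_vocab_size = 5000   # assumed opcode vocabulary size
_seq_len = 700       # assumed opcode sequence length
_model = LSTMMalware_Model(input_dim=_vocab_size)
_opcodes = torch.randint(0, _vocab_size, (4, _seq_len))
print(_model(_opcodes).shape)  # expected: torch.Size([4, 1])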
# dataset.py
import torch


class IMDBDataset:
    def __init__(self, reviews, targets):
        """
        :param reviews: this is a numpy array
        :param targets: a vector, numpy array
        """
        self.reviews = reviews
        self.target = targets

    def __len__(self):
        # returns length of the dataset
        return len(self.reviews)

    def __getitem__(self, item):
        # for any given item, which is an int,
        # return review and targets as torch tensors
        # item is the index of the item in concern
        review = self.reviews[item, :]
        target = self.target[item]
        return {
            "review": torch.tensor(review, dtype=torch.long),
            "target": torch.tensor(target, dtype=torch.float)
        }
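# Hedged usage sketch (assumed toy data, not part of the original gist): each
# item comes back as a dict of tensors, which is what the training loop in
# engine.py expects from the DataLoader batches.
import numpy as np

_toy_reviews = np.array([[24, 27, 0, 0], [5, 9, 13, 2]])  # padded token ids
_toy_targets = np.array([0, 1])
_toy_ds = IMDBDataset(reviews=_toy_reviews, targets=_toy_targets)
print(_toy_ds[0])  # {'review': tensor([24, 27, 0, 0]), 'target': tensor(0.)}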
# lstm.py
import torch
import torch.nn as nn


class LSTM(nn.Module):
    def __init__(self, embedding_matrix):
        """
        :param embedding_matrix: numpy array with vectors for all words
        """
        super(LSTM, self).__init__()
        # number of words = number of rows in embedding matrix
        num_words = embedding_matrix.shape[0]
        # dimension of embedding is number of columns in the matrix
        embed_dim = embedding_matrix.shape[1]
        # we define an input embedding layer
        self.embedding = nn.Embedding(
            num_embeddings=num_words,
            embedding_dim=embed_dim
        )
        # embedding matrix is used as weights of
        # the embedding layer
        self.embedding.weight = nn.Parameter(
            torch.tensor(
                embedding_matrix,
                dtype=torch.float32
            )
        )
        # we don't want to train the pretrained embeddings
        self.embedding.weight.requires_grad = False
        # a simple bidirectional LSTM with
        # hidden size of 128
        self.lstm = nn.LSTM(
            embed_dim,
            128,
            bidirectional=True,
            batch_first=True,
        )
        # output layer which is a linear layer
        # we have only one output
        # input (512) = 256 for mean pooling + 256 for max pooling
        self.out = nn.Linear(512, 1)

    def forward(self, x):
        # pass data through embedding layer
        # the input is just the tokens
        x = self.embedding(x)
        # move embedding output to lstm
        x, _ = self.lstm(x)
        # apply mean and max pooling on lstm output
        avg_pool = torch.mean(x, 1)
        max_pool, _ = torch.max(x, 1)
        # concatenate mean and max pooling
        # this is why size is 512
        # 128 for each direction = 256
        # avg_pool = 256 and max_pool = 256
        out = torch.cat((avg_pool, max_pool), 1)
        # pass through the output layer and return the output
        out = self.out(out)
        # return linear output
        return out
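# Hedged shape check (random toy weights, not part of the original gist): the
# bidirectional LSTM emits 2 * 128 = 256 features per time step, and
# concatenating mean pooling and max pooling over time gives the 512 inputs
# expected by nn.Linear(512, 1).
import numpy as np

_emb = np.random.rand(100, 300)            # assumed toy vocab of 100 words
_m = LSTM(_emb)
_tokens = torch.randint(0, 100, (4, 128))  # batch of 4, MAX_LEN = 128
print(_m(_tokens).shape)                   # expected: torch.Size([4, 1])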
# engine.py
import torch
import torch.nn as nn
from torch.optim.lr_scheduler import CosineAnnealingWarmRestarts, CosineAnnealingLR, ReduceLROnPlateau


def get_scheduler(optimizer, scheduler):
    if scheduler == 'ReduceLROnPlateau':
        scheduler = ReduceLROnPlateau(optimizer, mode='min', factor=0.2, patience=4, verbose=True, eps=1e-6)
    elif scheduler == 'CosineAnnealingLR':
        scheduler = CosineAnnealingLR(optimizer, T_max=10, eta_min=1e-6, last_epoch=-1)
    elif scheduler == 'CosineAnnealingWarmRestarts':
        scheduler = CosineAnnealingWarmRestarts(optimizer, T_0=10, T_mult=1, eta_min=1e-6, last_epoch=-1)
    return scheduler
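# Hedged usage note (assumption about intent, not part of the original gist):
# train() below calls scheduler.step() once per batch, which works for the
# two cosine schedulers but not for ReduceLROnPlateau, whose step() expects
# the monitored metric and is usually called after each validation pass, e.g.:
#
#     scheduler = get_scheduler(optimizer, 'ReduceLROnPlateau')
#     ...
#     scheduler.step(val_loss)  # val_loss is a hypothetical validation metric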
def train(data_loader, model, optimizer, device, scheduler):
    """
    This is the main training function that trains model
    for one epoch
    :param data_loader: this is the torch dataloader
    :param model: model (lstm model)
    :param optimizer: torch optimizer, e.g. adam, sgd, etc.
    :param device: this can be "cuda" or "cpu"
    :param scheduler: learning rate scheduler from get_scheduler
    """
    # set model to training mode
    model.train()
    # go through batches of data in data loader
    for data in data_loader:
        # fetch review and target from the dict
        reviews = data["review"]
        targets = data["target"]
        # move the data to device that we want to use
        reviews = reviews.to(device, dtype=torch.long)
        targets = targets.to(device, dtype=torch.float)
        # clear the gradients
        optimizer.zero_grad()
        # make predictions from the model
        predictions = model(reviews)
        # calculate the loss
        loss = nn.BCEWithLogitsLoss()(
            predictions,
            targets.view(-1, 1)
        )
        # compute gradient of loss w.r.t.
        # all parameters of the model that are trainable
        loss.backward()
        # single optimization step
        optimizer.step()
        # step the scheduler once per batch
        scheduler.step()


def evaluate(data_loader, model, device):
    # initialize empty lists to store predictions
    # and targets
    final_predictions = []
    final_targets = []
    # put the model in eval mode
    model.eval()
    # disable gradient calculation
    with torch.no_grad():
        for data in data_loader:
            reviews = data["review"]
            targets = data["target"]
            reviews = reviews.to(device, dtype=torch.long)
            targets = targets.to(device, dtype=torch.float)
            # make predictions
            predictions = model(reviews)
            # move predictions and targets to lists
            # we need to move predictions and targets to cpu too
            predictions = predictions.cpu().numpy().tolist()
            targets = data["target"].cpu().numpy().tolist()
            final_predictions.extend(predictions)
            final_targets.extend(targets)
    # return final predictions and targets
    return final_predictions, final_targets
# train.py
import io
import torch
import numpy as np
import pandas as pd
# yes, we use tensorflow
# but not for training the model!
import tensorflow as tf
from sklearn import metrics

import config
import dataset
import engine
import lstm


def load_vectors(fname):
    # taken from: https://fasttext.cc/docs/en/english-vectors.html
    fin = io.open(
        fname,
        'r',
        encoding='utf-8',
        newline='\n',
        errors='ignore'
    )
    n, d = map(int, fin.readline().split())
    data = {}
    for line in fin:
        tokens = line.rstrip().split(' ')
        data[tokens[0]] = list(map(float, tokens[1:]))
    return data
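# Hedged format note (illustrative values, not part of the original gist): a
# fastText .vec file starts with a header line giving the vocabulary size and
# the vector dimension, which is what the
# `n, d = map(int, fin.readline().split())` line above consumes, followed by
# one whitespace-separated line per word, e.g.:
#
#     2000000 300
#     the 0.0461 0.2148 -0.0068 ...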
def create_embedding_matrix(word_index, embedding_dict):
    """
    This function creates the embedding matrix.
    :param word_index: a dictionary with word:index_value
    :param embedding_dict: a dictionary with word:embedding_vector
    :return: a numpy array with embedding vectors for all known words
    """
    # initialize matrix with zeros
    embedding_matrix = np.zeros((len(word_index) + 1, 300))
    # loop over all the words
    for word, i in word_index.items():
        # if word is found in pre-trained embeddings,
        # update the matrix. if the word is not found,
        # the vector is zeros!
        if word in embedding_dict:
            embedding_matrix[i] = embedding_dict[word]
    # return embedding matrix
    return embedding_matrix
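# Hedged illustration (toy values, not part of the original gist): the keras
# Tokenizer indexes words from 1, so the matrix gets len(word_index) + 1 rows
# and row 0 stays all zeros for the padding token.
_toy_word_index = {"bad": 1, "movie": 2}
_toy_vectors = {"bad": [0.1] * 300, "movie": [0.2] * 300}
_toy_matrix = create_embedding_matrix(_toy_word_index, _toy_vectors)
print(_toy_matrix.shape)   # (3, 300); row 0 is the padding row
print(_toy_matrix[1][:3])  # [0.1 0.1 0.1]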
def load_embeddings(word_index, embedding_file, vector_length=300):
    """
    A general function to create embedding matrix
    :param word_index: word:index dictionary
    :param embedding_file: path to embeddings file
    :param vector_length: length of vector
    """
    max_features = len(word_index) + 1
    words_to_find = list(word_index.keys())
    more_words_to_find = []
    for wtf in words_to_find:
        more_words_to_find.append(wtf)
        more_words_to_find.append(str(wtf).capitalize())
    more_words_to_find = set(more_words_to_find)

    def get_coefs(word, *arr):
        return word, np.asarray(arr, dtype='float32')

    embeddings_index = dict(
        get_coefs(*o.strip().split(" "))
        for o in open(embedding_file)
        if o.split(" ")[0] in more_words_to_find
        and len(o) > 100
    )
    embedding_matrix = np.zeros((max_features, vector_length))
    for word, i in word_index.items():
        if i >= max_features:
            continue
        embedding_vector = embeddings_index.get(word)
        if embedding_vector is None:
            embedding_vector = embeddings_index.get(
                str(word).capitalize()
            )
        if embedding_vector is None:
            embedding_vector = embeddings_index.get(
                str(word).upper()
            )
        if (embedding_vector is not None
                and len(embedding_vector) == vector_length):
            embedding_matrix[i] = embedding_vector
    return embedding_matrix
def run(df, fold):
    """
    Run training and validation for a given fold
    and dataset
    :param df: pandas dataframe with kfold column
    :param fold: current fold, int
    """
    # fetch training dataframe
    train_df = df[df.kfold != fold].reset_index(drop=True)
    # fetch validation dataframe
    valid_df = df[df.kfold == fold].reset_index(drop=True)
    print("Fitting tokenizer")
    # we use tf.keras for tokenization
    # you can use your own tokenizer and then you can
    # get rid of tensorflow
    tokenizer = tf.keras.preprocessing.text.Tokenizer()
    tokenizer.fit_on_texts(df.review.values.tolist())
    # convert training data to sequences
    # for example: "bad movie" gets converted to
    # [24, 27] where 24 is the index for bad and 27 is the
    # index for movie
    xtrain = tokenizer.texts_to_sequences(train_df.review.values)
    # similarly convert validation data to
    # sequences
    xtest = tokenizer.texts_to_sequences(valid_df.review.values)
    # zero pad the training sequences given the maximum length
    # this padding is done on left hand side
    # if sequence is > MAX_LEN, it is truncated on left hand side too
    xtrain = tf.keras.preprocessing.sequence.pad_sequences(
        xtrain, maxlen=config.MAX_LEN
    )
    # zero pad the validation sequences
    xtest = tf.keras.preprocessing.sequence.pad_sequences(
        xtest, maxlen=config.MAX_LEN
    )
    # initialize dataset class for training
    train_dataset = dataset.IMDBDataset(
        reviews=xtrain,
        targets=train_df.sentiment.values
    )
    # create torch dataloader for training
    # torch dataloader loads the data using dataset
    # class in batches specified by batch size
    train_data_loader = torch.utils.data.DataLoader(
        train_dataset,
        batch_size=config.TRAIN_BATCH_SIZE,
        num_workers=2
    )
    # initialize dataset class for validation
    valid_dataset = dataset.IMDBDataset(
        reviews=xtest,
        targets=valid_df.sentiment.values
    )
    # create torch dataloader for validation
    valid_data_loader = torch.utils.data.DataLoader(
        valid_dataset,
        batch_size=config.VALID_BATCH_SIZE,
        num_workers=1
    )
    print("Loading embeddings")
    # load embeddings as shown previously
    embedding_dict = load_vectors("../input/crawl-300d-2M.vec")
    embedding_matrix = create_embedding_matrix(
        tokenizer.word_index, embedding_dict
    )
    # create torch device, since we use gpu, we are using cuda
    device = torch.device("cuda")
    # fetch our LSTM model
    model = lstm.LSTM(embedding_matrix)
    # send model to device
    model.to(device)
    # initialize Adam optimizer
    optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
    scheduler = engine.get_scheduler(optimizer, 'CosineAnnealingWarmRestarts')
    print("Training Model")
    # set best accuracy to zero
    best_accuracy = 0
    # set early stopping counter to zero
    early_stopping_counter = 0
    # train and validate for all epochs
    for epoch in range(config.EPOCHS):
        # train one epoch
        engine.train(train_data_loader, model, optimizer, device, scheduler)
        # validate
        outputs, targets = engine.evaluate(
            valid_data_loader, model, device
        )
        # the model returns raw logits (no sigmoid), so a probability
        # threshold of 0.5 is the same as a logit threshold of 0
        outputs = np.array(outputs) >= 0
        # calculate accuracy
        accuracy = metrics.accuracy_score(targets, outputs)
        print(
            f"FOLD:{fold}, Epoch: {epoch}, Accuracy Score = {accuracy}"
        )
        # simple early stopping
        if accuracy > best_accuracy:
            best_accuracy = accuracy
        else:
            early_stopping_counter += 1
        if early_stopping_counter > 2:
            break
import random
import os

# metrics.py
import numpy as np
from sklearn import metrics


def multi_class_roc_auc(true, pred_probs_arr, labels):
    auc_all = []
    for label_number in labels:
        true_labels = true.loc[:, label_number].copy()
        pred_probs = pred_probs_arr.loc[:, label_number].copy()
        # AUROC (sliding across multiple decision thresholds)
        fpr, tpr, thresholds = metrics.roc_curve(y_true=true_labels,
                                                 y_score=pred_probs,
                                                 pos_label=1)
        auc = metrics.auc(fpr, tpr)
        auc_all.append(auc)
    print(f'AUC of each class: {auc_all}')
    return np.mean(auc_all)
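# Hedged usage sketch (toy DataFrames, not part of the original gist): the
# function indexes columns with .loc, so it expects the ground truth and the
# predicted probabilities as pandas DataFrames with one column per label.
_true = pd.DataFrame({"pos": [1, 0, 1, 0], "neg": [0, 1, 0, 1]})
_probs = pd.DataFrame({"pos": [0.9, 0.2, 0.8, 0.3], "neg": [0.1, 0.8, 0.2, 0.7]})
print(multi_class_roc_auc(_true, _probs, labels=["pos", "neg"]))  # 1.0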
if __name__ == "__main__":
    def seed_torch(seed=42):
        random.seed(seed)
        os.environ['PYTHONHASHSEED'] = str(seed)
        np.random.seed(seed)
        torch.manual_seed(seed)
        torch.cuda.manual_seed(seed)
        torch.backends.cudnn.deterministic = True

    # seed everything for reproducibility
    seed_torch(seed=42)
    # load data
    df = pd.read_csv("../input/imdb_folds.csv")
    # train for all folds
    run(df, fold=0)
    run(df, fold=1)
    run(df, fold=2)
    run(df, fold=3)
    run(df, fold=4)