Last active
November 12, 2017 13:17
-
-
Save gmyrianthous/d17b8b527a80cdcedbcf117a31f6c0b4 to your computer and use it in GitHub Desktop.
Classification of movie reviews
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# Text Classification with Perceptron
# Dataset: Review polarity - Cornell University
# Author: Giorgos Myrianthous
# February, 2017
import sys, os, random | |
import numpy as np | |
# np.set_printoptions(threshold=np.inf) | |
from collections import Counter | |
from nltk.corpus import stopwords | |
import matplotlib.pyplot as plt | |
import collections | |
# features' vectorisation. | |
def vectorise_features(data):
    """Turn the raw text of one document into a bag-of-words Counter.

    Args:
        data: Full text of a single document.

    Returns:
        A Counter holding the features produced by ``features_all`` for the
        document's tokens, with the ``"bias"`` entry incremented by one.
    """
    # Tokenisation - split on any run of whitespace. ``split(" ")`` only
    # breaks on single spaces, which produces empty-string tokens and leaves
    # words glued together across newlines and tabs.
    tokens = data.split()
    counter = Counter()
    counter += features_all(tokens)
    # Bias term
    counter["bias"] += 1
    return counter
def features_all(tokens):
    """Build binary presence features for the tokens that are not stop words.

    Args:
        tokens: Iterable of string tokens.

    Returns:
        A Counter mapping each distinct non-stop-word token to 1.
    """
    # Build a set once: membership is tested for every token, and a set
    # lookup is O(1) whereas scanning the stop-word list is linear.
    stop_words = set(stopwords.words("english"))
    # Stop-word removal; each surviving token is recorded as present (1),
    # regardless of how many times it occurs.
    return Counter({token: 1 for token in tokens if token not in stop_words})
def read_data(directory, train_per_class=800):
    """Load the review corpus, index its vocabulary and split train/test.

    Documents are read from the ``pos`` and ``neg`` subdirectories of
    ``directory``, lower-cased and vectorised with ``vectorise_features``.
    Every feature is then given an integer index, ordered by how many
    documents' counters contributed to it (most common first).

    Args:
        directory: Path containing the ``pos`` and ``neg`` subdirectories.
        train_per_class: Maximum number of documents per label placed in the
            training set; the remainder go to the testing set. The default
            of 800 reproduces the 1600/400 split on a 1000+1000 corpus.

    Returns:
        A tuple ``(training, testing, feature_names, labels)`` where
        ``training`` and ``testing`` map filename -> OrderedDict of
        {feature index: 1}, ``feature_names`` lists the features by index and
        ``labels`` maps filename -> ``"pos"`` or ``"neg"``.
    """
    total_counts = Counter()
    documents = collections.OrderedDict()  # (filename:counts)
    labels = collections.OrderedDict()     # (filename:label)

    # NOTE(review): documents are keyed by bare filename, so a name that
    # appears in both pos/ and neg/ would overwrite the earlier entry --
    # confirm the corpus never reuses a filename across the two folders.
    for label in ["pos", "neg"]:
        label_directory = os.path.join(directory, label)
        # Sort the listing: os.listdir returns entries in arbitrary order,
        # which would make the train/test split differ between machines.
        for filename in sorted(os.listdir(label_directory)):
            full_path = os.path.join(label_directory, filename)
            # Context manager so the file handle is closed promptly.
            with open(full_path) as handle:
                data = handle.read().lower()
            counter = vectorise_features(data)
            total_counts += counter
            documents[filename] = counter
            labels[filename] = label

    # Assign feature indices in most-common-first order.
    feature_names = [word for word, _ in total_counts.most_common(1000000)]
    hashed = {word: index for index, word in enumerate(feature_names)}

    # Re-express every document as a sparse binary vector {index: 1}.
    docs_tmp = collections.OrderedDict()
    for filename, counter in documents.items():
        sparse = collections.OrderedDict()
        for word in counter:
            if word in hashed:
                sparse[hashed[word]] = 1
        docs_tmp[filename] = sparse

    # Split the dataset, keeping it balanced: each label contributes at most
    # ``train_per_class`` training documents. The quota is tracked per
    # label; the original code bumped the positive counter in the negative
    # branch, so the negative quota was never enforced.
    training = collections.OrderedDict()
    testing = collections.OrderedDict()
    training_per_label = Counter()
    for filename, sparse in docs_tmp.items():
        label = labels[filename]
        if training_per_label[label] < train_per_class:
            training[filename] = sparse
            training_per_label[label] += 1
        else:
            testing[filename] = sparse
    return training, testing, feature_names, labels
def dot_product(a, b, filename):
    """Sparse dot product between a weight vector and one stored document.

    Args:
        a: Indexable collection of weights, looked up by feature index.
        b: Mapping of filename -> {feature index: value}.
        filename: Key selecting which document of ``b`` to use.

    Returns:
        The sum of ``a[i] * b[filename][i]`` over the document's indices
        (0 when the document has no entries).
    """
    sparse_vector = b[filename]
    return sum(a[index] * sparse_vector[index] for index in sparse_vector)
def shuffle(data):
    """Return a copy of ``data`` as an OrderedDict with its keys permuted.

    The module-level random generator is re-seeded with 26 on every call, so
    the permutation applied to a given key order is always the same.

    Args:
        data: Mapping to reorder; it is not modified.

    Returns:
        A new ``collections.OrderedDict`` with the same items in shuffled
        key order.
    """
    order = list(data.keys())
    # Fixed seed keeps runs reproducible.
    random.seed(26)
    random.shuffle(order)
    return collections.OrderedDict((key, data[key]) for key in order)
# Plot: uncomment plt.show(), comment out plt.savefig() | |
# Save: uncomment plt.savefig(), comment out plt.show() | |
def plot_learning_progress(training_accuracies):
    """Plot the training accuracy per pass and save it to disk.

    The chart is written to ``Learning_progress.png`` in the current working
    directory.

    Args:
        training_accuracies: Sequence of accuracy values, one per pass over
            the training data.
    """
    # Draw on a fresh figure: pyplot otherwise reuses its current figure, so
    # a second call would overlay its curve on top of the previous one.
    figure = plt.figure()
    plt.plot(training_accuracies)
    plt.title("Learning progress during multiple passes")
    plt.ylabel('Training accuracy')
    plt.xlabel("Iterations")
    #plt.show()
    plt.savefig('Learning_progress.png')
    # Release the figure so repeated calls do not pile up open figures.
    plt.close(figure)
def train(data, features, labels, max_epochs=100):
    """Train a two-class perceptron and return per-pass averaged weights.

    One weight vector is kept per class (``w_0`` for -1/neg, ``w_1`` for
    +1/pos). A document is predicted as -1 only when its score under ``w_0``
    is strictly greater than under ``w_1``. On a mistake the document's
    features are added to the correct class's weights and subtracted from the
    wrong class's weights. Training stops early once a full pass classifies
    every document correctly. The learning curve is plotted via
    ``plot_learning_progress`` before returning.

    Args:
        data: Mapping filename -> sparse vector {feature index: value}.
        features: Sequence of feature names; only its length is used, to size
            the weight vectors.
        labels: Mapping filename -> ``"pos"`` or ``"neg"``.
        max_epochs: Maximum number of passes over ``data`` (default 100).

    Returns:
        A tuple ``(w_0_averaged, w_1_averaged, classes)``: the two weight
        vectors averaged over the completed passes, and the OrderedDict
        mapping ``"pos"`` -> 1 and ``"neg"`` -> -1.

    Raises:
        ZeroDivisionError: If ``data`` is empty (accuracy is undefined).
    """
    # Initialise weights
    w_0 = np.zeros(len(features))
    w_1 = np.zeros(len(features))
    classes = collections.OrderedDict()
    classes["pos"] = 1
    classes["neg"] = -1

    # Running sums of the weights after each pass. Accumulating values
    # (rather than storing the arrays themselves) matters: w_0 / w_1 are
    # updated in place, so a list of references would hold the same final
    # array many times over instead of one snapshot per pass.
    w_0_sum = np.zeros(len(features))
    w_1_sum = np.zeros(len(features))
    epochs_run = 0
    training_accuracy_list = []

    for _ in range(max_epochs):
        correctly_classified = 0
        data = shuffle(data)
        for filename in data:
            dot_0 = dot_product(w_0, data, filename)
            dot_1 = dot_product(w_1, data, filename)
            # Predict label y_hat (ties go to the positive class)
            y_hat = -1 if dot_0 > dot_1 else 1
            y = classes[labels[filename]]
            if y_hat == y:
                correctly_classified += 1
                continue
            # Mistake: add f(x) to the weights of the correct label and
            # subtract it from the weights of the incorrect one. Since y is
            # +1 for pos and -1 for neg, the sign of y selects the direction.
            for index, value in data[filename].items():
                w_1[index] += y * value
                w_0[index] -= y * value

        # Compute the training accuracy
        training_accuracy_list.append(correctly_classified / len(data) * 100)
        # Fold this pass's weights into the running average
        w_0_sum += w_0
        w_1_sum += w_1
        epochs_run += 1
        # Repeat until convergence.
        if correctly_classified == len(data):
            break

    # Plot learning progress
    plot_learning_progress(training_accuracy_list)
    # Average over the passes actually run (guard against max_epochs == 0).
    divisor = max(epochs_run, 1)
    return w_0_sum / divisor, w_1_sum / divisor, classes
def test(data, w_0, w_1, values, labels):
    """Evaluate the classifier on ``data`` and print its metrics.

    Each document is predicted as -1 when its score under ``w_0`` is strictly
    greater than under ``w_1``, and as 1 otherwise. Accuracy, precision,
    recall and F-measure (with 1 as the positive class) are printed to
    standard output. A metric whose denominator is zero is reported as 0.0.

    Args:
        data: Mapping filename -> sparse vector {feature index: value}.
        w_0: Weight vector for class -1.
        w_1: Weight vector for class 1.
        values: Mapping from label string to its numeric class value.
        labels: Mapping filename -> label string.

    Returns:
        None.
    """
    correctly_classified = 0
    TP = 0
    FN = 0
    FP = 0
    data = shuffle(data)
    for filename in data:
        dot_0 = dot_product(w_0, data, filename)
        dot_1 = dot_product(w_1, data, filename)
        # Predict the label y_hat (argmax)
        y_hat = -1 if dot_0 > dot_1 else 1
        y = values[labels[filename]]
        # Compare the prediction with the true label.
        if y == y_hat:
            correctly_classified += 1
        # Update TP, FN and FP
        if y == 1 and y_hat == 1:
            TP += 1
        # Predictions are only ever 1 or -1, so a missed positive is
        # y_hat == -1 (comparing against 0 would never match).
        if y == 1 and y_hat == -1:
            FN += 1
        if y == -1 and y_hat == 1:
            FP += 1

    # Guard every ratio against an empty denominator.
    total = len(data)
    accuracy = correctly_classified / total * 100 if total else 0.0
    precision = TP / (TP + FP) if (TP + FP) else 0.0
    recall = TP / (TP + FN) if (TP + FN) else 0.0
    if precision + recall:
        f_measure = 2 * precision * recall / (precision + recall)
    else:
        f_measure = 0.0

    # testing accuracy.
    print("===================================================================")
    print("Testing accuracy: ", accuracy)
    print("Precision: ", precision)
    print("Recall: ", recall)
    print("F-measure: ", f_measure)
    print("===================================================================")
def report_most_positively_weighted_terms(w_0, w_1, features, top_n=10):
    """Print the highest-weighted features of each class.

    The positive class (weights ``w_1``) is reported first, then the negative
    class (weights ``w_0``). Within a class, terms are listed by descending
    weight; equal weights keep their order in ``features``.

    Args:
        w_0: Weights of class -1 (neg), aligned with ``features``.
        w_1: Weights of class 1 (pos), aligned with ``features``.
        features: Feature names, one per weight.
        top_n: How many terms to print per class (default 10).

    Returns:
        None.
    """
    sections = (
        ("Most positively weighted terms for Class = 1 (pos):", w_1),
        ("Most positively weighted terms for Class = -1 (neg):", w_0),
    )
    for heading, weights in sections:
        ranked = sorted(zip(weights, features), key=lambda t: t[0], reverse=True)
        print("\n===================================================================")
        print(heading)
        print("===================================================================")
        for (weight, word) in ranked[:top_n]:
            # Arguments follow the placeholders: the word first, then its
            # weight (they were previously passed the other way round).
            print("Word: {}, Weight: {}".format(word, weight))
if __name__ == '__main__':
    # Load the corpus; the script expects pos/ and neg/ folders next to it.
    train_set, test_set, vocabulary, gold_labels = read_data("./")

    # Fit the perceptron (one weight vector per class), then score it on the
    # held-out documents.
    neg_weights, pos_weights, class_values = train(train_set, vocabulary, gold_labels)
    test(test_set, neg_weights, pos_weights, class_values, gold_labels)

    # Show which terms carry the most weight for each class.
    report_most_positively_weighted_terms(neg_weights, pos_weights, vocabulary)

    print("\n===================================================================")
    print("Learning progress graph has been generated as Learning_progress.png")
    print("===================================================================")
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment