Skip to content

Instantly share code, notes, and snippets.

@dimart
Last active January 1, 2016 00:38
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save dimart/8073fbea2ad9fea1c6f4 to your computer and use it in GitHub Desktop.
#!/usr/local/bin/python
# -*- coding: utf-8 -*-
import codecs
import re
import string
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.neighbors import KNeighborsClassifier
def clean_data(data, lang):
    """Normalise raw corpus text for language identification.

    Strips URLs, digits, ASCII punctuation and assorted typographic
    symbols, and — for every language except uz/en/tt — ASCII letters
    as well (be/uk/kk keep 'i'/'I', which their alphabets use).
    Returns a lower-cased string with runs of whitespace collapsed
    to single spaces.
    """
    # URLs first, so their letters/digits don't survive as stray tokens.
    data = re.sub(r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+", " ", data)
    # Digits and ASCII punctuation become separators.
    data = re.sub("[" + string.digits + string.punctuation + "]", ' ', data)
    # Typographic/mathematical symbols observed in the corpus.
    data = re.sub(u"[‘—»₤«€₦→↓↗↑↔⇄·№■₴′=‑›…‰∫√×⊕≥█⊢⊂●♦⟨⟩⋀⋁⊥∞≠≡″≈•−一”‚‒†‐:’]", ' ', data)
    if lang not in ("uz", "en", "tt"):
        # Non-Latin-script language: drop ASCII letters, keeping i/I
        # for the alphabets that legitimately contain them.
        latin = string.ascii_letters
        if lang in ("be", "uk", "kk"):
            latin = latin.replace('i', '').replace('I', '')
        data = re.sub("[" + latin + "]", ' ', data)
    for ws in ('\t', '\n'):
        data = data.replace(ws, ' ')
    # Collapse runs of spaces, then lower-case.
    return re.sub(' +', ' ', data).lower()
def clean_content(text):
    """Clean a raw social-media post for classification.

    Removes URLs and e-mail addresses, turns digits, ASCII punctuation
    and stray typographic symbols into spaces, collapses whitespace and
    lower-cases the result.

    Bug fix: the e-mail pattern previously ran *after* the punctuation
    pass had already replaced '@' and '.' with spaces, so it could never
    match.  E-mails are now removed first.
    """
    data = re.sub(r"http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+", " ", text)
    # E-mails must go before the punctuation pass destroys '@' and '.'.
    data = re.sub(r'([\w\-\.]+@(\w[\w\-]+\.)+[\w\-]+)', ' ', data)
    data = re.sub("[" + string.digits + string.punctuation + "]", ' ', data)
    data = re.sub(u"[‘—»₤«€₦→↓↗↑↔⇄·№■₴′=‑›…‰∫√×⊕≥█⊢⊂●♦⟨⟩⋀⋁⊥∞≠≡″≈•−一”‚‒†‐:’]", ' ', data)
    data = data.replace('\t', ' ')
    data = data.replace('\n', ' ')
    data = re.sub(' +', ' ', data)
    return data.lower()
def get_corpus(mode, langs):
    """Load and chunk the corpus for each language.

    Reads ./data/<mode>/<mode>_<lang> (mode is "train" or "test"),
    cleans it with clean_data() and splits it into chunks of roughly
    120 characters.  Returns (corpus, y): the list of chunks and a
    parallel list of language labels.

    Fixes over the original: the word that triggered a chunk flush was
    discarded — it now starts the next chunk; the input file is closed
    via `with`; the `file` builtin is no longer shadowed.
    """
    corpus = []
    y = []
    for lang in langs:
        print("Getting " + mode + " corpus for " + lang)
        path = "./data/" + mode + "/" + mode + "_" + lang
        with codecs.open(path, 'r', encoding='utf-8') as fh:
            data = clean_data(fh.read(), lang)
        chunk = ""
        chunks = []
        for word in data.split(' '):
            if not word:
                continue
            if len(chunk) < 120:
                chunk += word if not chunk else " " + word
            else:
                chunks.append(chunk)
                chunk = word  # bug fix: keep the word that triggered the flush
        # NOTE(review): a trailing partial chunk is still dropped, matching
        # the original behaviour — possibly intentional (fixed-size samples);
        # confirm before appending it.
        corpus.extend(chunks)
        y.extend([lang] * len(chunks))
    return corpus, y
def tail(langs):
    """Build the test split: the last 1000 lines of each language's
    plain-text article dump, written to ./data/test/test_<lang>."""
    import os
    cmd = "tail -n 1000 ./data/train/%s/articles_in_plain_text.txt > ./data/test/test_%s"
    for code in langs:
        os.system(cmd % (code, code))
def head(langs):
    """Build the train split: the first 2000 lines of each language's
    plain-text article dump, written to ./data/train/train_<lang>."""
    import os
    template = "head -n 2000 ./data/train/{0}/articles_in_plain_text.txt > ./data/train/train_{0}"
    for code in langs:
        os.system(template.format(code))
def filter_ok_ds():
    """Split the Odnoklassniki dataset into Russian / non-Russian posts.

    Trains a character-trigram TF-IDF + kNN model on the Wikipedia train
    corpus, then streams ./data/test/test_content.csv in batches of
    10000 lines.  Lines whose predicted probability of being Russian is
    >= 0.98 go to ./result.txt, the rest to ./result_bad.txt.

    Fixes over the original: the bare `except: pass` is narrowed to the
    IndexError raised by short rows; the three files are closed via
    `with`; an all-filtered batch no longer crashes `tv.transform([])`.
    """
    languages = ['be', 'en', 'tg', 'kk', 'uz', 'ru', 'uk']
    head(languages)
    train_corpus, train_y = get_corpus("train", languages)
    tv = TfidfVectorizer(analyzer='char', ngram_range=(3, 3), norm='l1')
    print("fit_trans tv")
    train_freq = tv.fit_transform(train_corpus).toarray()
    knn = KNeighborsClassifier(n_neighbors=2, algorithm='brute')
    print("fit knn")
    knn.fit(train_freq, train_y)
    from itertools import islice
    step = 10000
    # `with` guarantees all three files are closed even if a batch raises.
    with codecs.open("./result.txt", 'w', encoding='utf-8') as output, \
         codecs.open("./result_bad.txt", 'w', encoding='utf-8') as output_bad, \
         codecs.open('./data/test/test_content.csv', 'r', encoding='utf-8') as f:
        lines = list(islice(f, step))
        while len(lines) > 0:
            print("Processing " + str(step))
            lines_clean = []
            lines_ = []
            for line in lines:
                try:
                    orig_text = line.split('\t')[3]
                except IndexError:
                    # Malformed row: fewer than four tab-separated columns.
                    continue
                text = clean_content(orig_text)
                if len(text) > 3:
                    lines_clean.append(text)
                    lines_.append(line)
            if lines_clean:  # transform() raises on an empty batch
                test_freq = tv.transform(lines_clean).toarray()
                pred = knn.predict_proba(test_freq)
                # Column 3 of predict_proba corresponds to 'ru':
                # knn.classes_ is sorted, and sorted(languages)[3] == 'ru'.
                for i, pr in enumerate(pred[:, 3]):
                    if pr < 0.98:
                        output_bad.write(lines_[i])
                    else:
                        output.write(lines_[i])
            lines = list(islice(f, step))
            # NOTE(review): separator printed per batch — the original's
            # stripped indentation makes its placement ambiguous.
            print("-" * 10)
def main():
    """Evaluate char-trigram TF-IDF + kNN language identification.

    Builds train/test splits from the per-language article dumps, fits a
    character-trigram TF-IDF vectorizer on ~120-char train chunks, and
    reports accuracy for several neighbour counts (Python 2 script —
    note the print statements).
    """
    languages = ['be', 'en', 'tg', 'kk', 'uz', 'ru', 'uk']
    # Regenerate the on-disk train/test splits before building corpora.
    head(languages)
    tail(languages)
    train_corpus, train_y = get_corpus("train", languages)
    # Character trigrams with L1-normalised TF-IDF weights.
    tv = TfidfVectorizer(analyzer='char', ngram_range=(3, 3), norm='l1')
    train_freq = tv.fit_transform(train_corpus).toarray()
    print "Train dataset size = ", len(train_freq)
    test_corpus, test_y = get_corpus("test", languages)
    print "Transforming"
    test_freq = tv.transform(test_corpus).toarray()
    print "Test dataset size = ", len(test_freq)
    # Compare accuracy across several values of k.
    for n in [1, 2, 5, 10]:
        knn = KNeighborsClassifier(n_neighbors=n)
        print "Fitting kNN"
        knn.fit(train_freq, train_y)
        print "Predicting"
        pred = knn.predict(test_freq)
        total = len(pred)
        c = 0  # count of correct predictions
        for i, p in enumerate(pred):
            real = test_y[i]
            if real == p:
                c += 1
            else:
                # Show each misclassified chunk with predicted vs true label.
                print test_corpus[i], p, real
        print "Acc ", n, c / float(total), "right =", c, "wrong =", total - c


if __name__ == "__main__":
    main()
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment