# -*- coding: utf-8 -*-
# Binary text classification of Japanese documents with Spark MLlib.
# Meant to run in the pyspark shell, which provides the SparkContext as `sc`.
import numpy as np
from pyspark.mllib.feature import HashingTF
from pyspark.mllib.feature import IDF
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.classification import SVMWithSGD
from pyspark.mllib.classification import NaiveBayes
import MeCab

# Parts of speech to exclude: conjunctions, particles, auxiliary verbs, symbols.
default_stop_pos = ['接続詞', '助詞', '助動詞', '記号']
def mecab_analyzer(text, stop_pos=default_stop_pos):
    """Tokenize Japanese text with MeCab, dropping stop parts of speech and
    normalizing each token to its base form when one is available."""
    mecab = MeCab.Tagger('-Ochasen')
    encoded_text = text.encode('utf-8')  # MeCab expects UTF-8 bytes in Python 2
    node = mecab.parseToNode(encoded_text)
    node = node.next  # skip the BOS node
    words = []
    while node:
        surface = node.surface
        feature_array = node.feature.split(',')
        if feature_array[0] == 'BOS/EOS' or feature_array[0] in stop_pos:
            node = node.next
            continue
        # Prefer the base form (7th feature field); fall back to the surface form.
        if feature_array[6] == '*':
            w = surface
        else:
            w = feature_array[6]
        words.append(w.decode('utf-8'))
        node = node.next
    return words
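# A quick sanity check of the analyzer (the sample sentence is illustrative;
# the exact tokens depend on the installed MeCab dictionary):
#   >>> mecab_analyzer(u'庭には二羽鶏がいる')
#   [u'庭', u'二', u'羽', u'鶏', u'いる']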
# Classification (Spark MLlib)
# Load positive (1.0) and negative (0.0) documents from HDFS, tokenize them,
# then shuffle the union by sorting on a random key.
pos_files = sc.wholeTextFiles("hdfs://hdp1.containers.dev:9000/user/root/data/binary_clf/all/1")
neg_files = sc.wholeTextFiles("hdfs://hdp1.containers.dev:9000/user/root/data/binary_clf/all/0")
xy = pos_files.map(lambda x: (mecab_analyzer(x[1]), 1.0)).union(neg_files.map(lambda x: (mecab_analyzer(x[1]), 0.0)))
xy = xy.map(lambda x: (x, np.random.rand())).sortBy(lambda x: x[1]).map(lambda x: x[0])
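# Note: np.random.rand() runs per element on the workers, so this shuffle is
# not reproducible run to run. A seedable sketch (add_random_keys is a
# hypothetical helper) would seed each partition with its own index:
#   def add_random_keys(idx, it):
#       rng = np.random.RandomState(idx)
#       return ((x, rng.rand()) for x in it)
#   xy = xy.mapPartitionsWithIndex(add_random_keys).sortBy(lambda x: x[1]).map(lambda x: x[0])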
fold = 3
hashingTF = HashingTF()

# Split the shuffled data into `fold` chunks on the driver; the last chunk
# absorbs the remainder.
data = []
as_list = xy.collect()
size = len(as_list) / fold
for i in range(fold - 1):
    data.append(as_list[size * i:size * (i + 1)])
data.append(as_list[size * (fold - 1):])
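# The collect-and-slice above could also stay distributed with RDD.randomSplit
# (a sketch; fold sizes then become only approximately equal):
#   folds = xy.randomSplit([1.0] * fold, seed=42)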
# Cross validation
cv_accuracy = []
cv_precision = []
cv_recall = []
for i in range(fold):
    print 'iteration', i
    # Rotate the chunks: fold-1 chunks form the training set, one the test set.
    train = []
    for k in range(fold):
        data_idx = k + i
        if fold <= data_idx:
            data_idx -= fold
        if k < fold - 1:
            print data_idx, 'train data',
            train.extend(data[data_idx])
        else:
            print data_idx, 'test data'
            test = data[data_idx]

    ## HashingTF model: hash each token list into a sparse term-frequency vector.
    train_xy = sc.parallelize(train).map(lambda x: LabeledPoint(x[1], hashingTF.transform(x[0])))
    test_xy = sc.parallelize(test).map(lambda x: LabeledPoint(x[1], hashingTF.transform(x[0])))
    ## HashingTF + IDF model (kept commented out; the last two lines fail,
    ## see the note below).
    #train_xy = sc.parallelize(train).map(lambda x: (x[1], hashingTF.transform(x[0])))
    #test_xy = sc.parallelize(test).map(lambda x: (x[1], hashingTF.transform(x[0])))
    #x_tf = train_xy.map(lambda x: x[1]).union(test_xy.map(lambda x: x[1]))
    #idf_model = IDF().fit(x_tf)
    #tmp = []
    #for x in train_xy.collect():
    #    tmp.append((x[0], idf_model.transform(x[1])))
    #train_xy = sc.parallelize(tmp).map(lambda x: LabeledPoint(x[0], x[1]))
    #del tmp[:]
    #for x in test_xy.collect():
    #    tmp.append((x[0], idf_model.transform(x[1])))
    #test_xy = sc.parallelize(tmp).map(lambda x: LabeledPoint(x[0], x[1]))
    ### Applying IDF inside map fails with SPARK-5063: the IDFModel references
    ### the SparkContext, which can only be used on the driver, not in code
    ### that runs on the workers.
    ##train_xy = train_xy.map(lambda x: LabeledPoint(x[0], idf_model.transform(x[1])))
    ##test_xy = test_xy.map(lambda x: LabeledPoint(x[0], idf_model.transform(x[1])))
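    # A working TF-IDF variant (a sketch, untested here): IDFModel.transform
    # also accepts a whole RDD on the driver, so transform the vectors first
    # and zip the labels back on. zip is safe here because both sides derive
    # from the same parent RDD via map, so partitions align element for element.
    #train_tfidf = idf_model.transform(train_xy.map(lambda x: x[1]))
    #train_xy = train_xy.map(lambda x: x[0]).zip(train_tfidf).map(lambda (y, v): LabeledPoint(y, v))
    #test_tfidf = idf_model.transform(test_xy.map(lambda x: x[1]))
    #test_xy = test_xy.map(lambda x: x[0]).zip(test_tfidf).map(lambda (y, v): LabeledPoint(y, v))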
    # Train a linear SVM with SGD (Naive Bayes kept as an alternative).
    model = SVMWithSGD.train(train_xy, iterations=100)
    #model = NaiveBayes.train(train_xy, 0.05)

    # Evaluate on the held-out fold.
    labelsAndPreds = test_xy.map(lambda p: (p.label, model.predict(p.features)))
    data_size = test_xy.count()
    pos_size = float(test_xy.filter(lambda p: p.label == 1.0).count())
    neg_size = data_size - pos_size
    pos_pred_size = labelsAndPreds.filter(lambda (v, p): p == 1.0).count()
    neg_pred_size = data_size - pos_pred_size
    pos_acc_size = labelsAndPreds.filter(lambda (v, p): v == 1.0 and v == p).count()  # true positives
    neg_acc_size = labelsAndPreds.filter(lambda (v, p): v == 0.0 and v == p).count()  # true negatives
    acc_size = pos_acc_size + neg_acc_size
    # Per-fold metrics
    accuracy = acc_size / float(data_size)
    precision = pos_acc_size / float(pos_pred_size)
    recall = pos_acc_size / float(pos_size)
    f1 = 2 * (precision * recall) / (precision + recall)
    score = {'accuracy': accuracy, 'recall': recall, 'precision': precision, 'f1': f1}
    print score
    cv_accuracy.append(score['accuracy'])
    cv_precision.append(score['precision'])
    cv_recall.append(score['recall'])
# Average the per-fold metrics; F1 is recomputed from the averaged precision
# and recall rather than averaged per fold.
accuracy = sum(cv_accuracy) / len(cv_accuracy)
precision = sum(cv_precision) / len(cv_precision)
recall = sum(cv_recall) / len(cv_recall)
cv_score = {'accuracy': accuracy, 'precision': precision, 'recall': recall,
            'f1': 2 * (precision * recall) / (precision + recall)}
print cv_score
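# The hand-rolled counts could also be replaced with MLlib's built-in
# evaluators (a sketch, assuming Spark >= 1.4). Inside the loop, after calling
# model.clearThreshold() so predict returns raw margins instead of 0/1 labels,
# the per-fold labelsAndPreds RDD could feed BinaryClassificationMetrics:
#   from pyspark.mllib.evaluation import BinaryClassificationMetrics
#   metrics = BinaryClassificationMetrics(labelsAndPreds.map(lambda (v, p): (float(p), v)))
#   print metrics.areaUnderROC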