# Pipeline (spark.ml): binary text classification with Tokenizer -> HashingTF -> LogisticRegression
import numpy as np

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.sql import Row
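
# The snippets below assume a SparkContext `sc` (e.g. from the pyspark shell)
# and a `mecab_analyzer` function that tokenizes Japanese text. The gist never
# defines it; a minimal sketch using MeCab's word-splitting ("wakati") output
# mode could look like this:
import MeCab

_wakati_tagger = MeCab.Tagger("-Owakati")

def mecab_analyzer(text):
    # The classic MeCab binding expects bytes under Python 2.
    return _wakati_tagger.parse(text.encode("utf-8")).split()

# Load positive (label 1.0) and negative (label 0.0) documents as (path, text) pairs.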
pos_files = sc.wholeTextFiles("hdfs://hdp1.containers.dev:9000/user/root/data/binary_clf/small/1")
neg_files = sc.wholeTextFiles("hdfs://hdp1.containers.dev:9000/user/root/data/binary_clf/small/0")
# Tokenize each document with MeCab and pair it with its label.
pos_xy = pos_files.map(lambda x: (" ".join(mecab_analyzer(x[1])), 1.0))
neg_xy = neg_files.map(lambda x: (" ".join(mecab_analyzer(x[1])), 0.0))
xy = pos_xy.union(neg_xy)
# Shuffle by sorting on a random key, then split 70/30 (randomSplit
# normalizes the [7, 3] weights; 17 is the seed).
xy = xy.map(lambda x: (x, np.random.rand())).sortBy(lambda x: x[1]).map(lambda x: x[0])
train_xy, test_xy = xy.randomSplit([7, 3], 17)
LabeledDocument = Row("text", "label")
train_df = train_xy.map(lambda x: LabeledDocument(*x)).toDF()
tokenizer = Tokenizer(inputCol="text", outputCol="words")
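# HashingTF maps each token to a vector index with the hashing trick, so no
# vocabulary has to be built or broadcast.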
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
model = pipeline.fit(train_df)
# Score the held-out documents; labels are dropped to mimic unseen data.
Document = Row("text")
test_df = test_xy.map(lambda x: Document(x[0])).toDF()
prediction = model.transform(test_df)
selected = prediction.select("text", "prediction")
for row in selected.collect():
    print row.prediction, row.text
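
# The test DataFrame above deliberately drops the labels. To sanity-check the
# predictions you could keep them and measure accuracy (a hypothetical
# variant, not part of the original gist):
labeled_test_df = test_xy.map(lambda x: LabeledDocument(*x)).toDF()
labeled_pred = model.transform(labeled_test_df)
n_correct = labeled_pred.filter(labeled_pred.label == labeled_pred.prediction).count()
print n_correct / float(labeled_pred.count())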
# Cross-validation (spark.ml): grid search over the same pipeline
# Reload and re-split the data exactly as above.
pos_files = sc.wholeTextFiles("hdfs://hdp1.containers.dev:9000/user/root/data/binary_clf/small/1")
neg_files = sc.wholeTextFiles("hdfs://hdp1.containers.dev:9000/user/root/data/binary_clf/small/0")
pos_xy = pos_files.map(lambda x: (" ".join(mecab_analyzer(x[1])), 1.0))
neg_xy = neg_files.map(lambda x: (" ".join(mecab_analyzer(x[1])), 0.0))
xy = pos_xy.union(neg_xy)
xy = xy.map(lambda x: (x, np.random.rand())).sortBy(lambda x: x[1]).map(lambda x: x[0])
train_xy, test_xy = xy.randomSplit([7, 3], 17)
LabeledDocument = Row("text", "label")
train_df = train_xy.map(lambda x: LabeledDocument(*x)).toDF()
# This time keep the labels on the test split so the evaluator can score it.
test_df = test_xy.map(lambda x: LabeledDocument(*x)).toDF()
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression()
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])
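# 2 x 2 x 2 = 8 parameter combinations; with the default 3 folds,
# CrossValidator fits 24 models and then refits the best settings on the
# full training set.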
grid = (ParamGridBuilder()
        .addGrid(hashingTF.numFeatures, [10, 100])
        .addGrid(lr.maxIter, [0, 1])
        .addGrid(lr.regParam, [0.1, 0.01])
        .build())
evaluator = BinaryClassificationEvaluator()
cv = CrossValidator(estimator=pipeline, estimatorParamMaps=grid, evaluator=evaluator)
cvModel = cv.fit(train_df)
# Default metric for BinaryClassificationEvaluator is areaUnderROC.
print evaluator.evaluate(cvModel.transform(train_df))
for r in cvModel.transform(test_df).select("text", "probability", "prediction").collect():
    print r.prediction, r.text, r.probability
# Test-set scores under both supported metrics.
print evaluator.evaluate(cvModel.transform(test_df), {evaluator.metricName: "areaUnderPR"})
print evaluator.evaluate(cvModel.transform(test_df), {evaluator.metricName: "areaUnderROC"})
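
# cvModel.transform() already uses the best pipeline found by the search; it
# is also exposed directly, e.g. to inspect the winning stages (a sketch based
# on the documented pyspark.ml.tuning attributes):
best_pipeline = cvModel.bestModel
print best_pipeline.stages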