@dapangmao
Last active April 5, 2016 15:57
Spark example

### Transform RDD to DataFrame in Spark

from pyspark.sql import Row

# Read a whitespace-delimited text file (assumes a SparkContext `sc`,
# as in the PySpark shell)
rdd = sc.textFile('C:/Users/chao.huang.ctr/spark-playground/class.txt')

def transform(x):
    # Split each line and cast the fields to their column types
    args = x.split()
    funcs = [str, str, int, float, float]
    return [z(y) for z, y in zip(funcs, args)]

# Use a Row template to name the columns, then convert the RDD to a DataFrame
varnames = Row("name", "sex", "age", "height", "weight")
df = rdd.map(transform).map(lambda x: varnames(*x)).toDF()

for x in df.collect():
    print(x)
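
For comparison, the same conversion can be written without the Row template by passing an explicit schema to createDataFrame. This is a minimal sketch, assuming the same whitespace-delimited class.txt layout and a SQLContext named sqlCtx (created the same way as in the pipeline script below):

from pyspark.sql import SQLContext
from pyspark.sql.types import (StructType, StructField,
                               StringType, DoubleType, IntegerType)

sqlCtx = SQLContext(sc)

# Explicit schema for the (name, sex, age, height, weight) columns
schema = StructType([
    StructField("name", StringType()),
    StructField("sex", StringType()),
    StructField("age", IntegerType()),
    StructField("height", DoubleType()),
    StructField("weight", DoubleType()),
])

df2 = sqlCtx.createDataFrame(rdd.map(transform), schema)
df2.show()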


"""
A simple text classification pipeline that recognizes "spark" from
input text. This is to show how to create and configure a Spark ML
pipeline in Python. Run with:

  bin/spark-submit examples/src/main/python/ml/simple_text_classification_pipeline.py
"""
from pyspark import SparkContext
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row, SQLContext

sc = SparkContext(appName="SimpleTextClassificationPipeline")
sqlCtx = SQLContext(sc)

# Prepare training documents, which are labeled.
LabeledDocument = Row("id", "text", "label")
training = sc.parallelize([(0, "a b c d e spark", 1.0),
                           (1, "b d", 0.0),
                           (2, "spark f g h", 1.0),
                           (3, "hadoop mapreduce", 0.0)]) \
    .map(lambda x: LabeledDocument(*x)).toDF()

# Configure an ML pipeline, which consists of three stages: tokenizer, hashingTF, and lr.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

# Fit the pipeline to training documents.
model = pipeline.fit(training)

# Prepare test documents, which are unlabeled.
Document = Row("id", "text")
test = sc.parallelize([(4, "spark i j k"),
                       (5, "l m n"),
                       (6, "mapreduce spark"),
                       (7, "apache hadoop")]) \
    .map(lambda x: Document(*x)).toDF()

# Make predictions on test documents and print columns of interest.
prediction = model.transform(test)
selected = prediction.select("id", "text", "prediction")
for row in selected.collect():
    print(row)

sc.stop()
@krishnaiitd

Does the logistic regression model in PySpark MLlib work for multiclass classification?
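
For what it's worth: the DataFrame-based pyspark.ml API gained multinomial (softmax) logistic regression in Spark 2.1 via the family parameter, and the older RDD-based pyspark.mllib API supports multiclass through LogisticRegressionWithLBFGS's numClasses argument. Below is a minimal multinomial sketch with toy three-class data; the app name and feature values are purely illustrative:

from pyspark.sql import SparkSession
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.linalg import Vectors

spark = SparkSession.builder.appName("MulticlassLRSketch").getOrCreate()

# Toy three-class training data: (label, features)
train = spark.createDataFrame([
    (0.0, Vectors.dense(0.0, 1.1)),
    (0.0, Vectors.dense(0.5, 1.0)),
    (1.0, Vectors.dense(2.0, 1.0)),
    (1.0, Vectors.dense(2.2, 0.9)),
    (2.0, Vectors.dense(3.0, 0.5)),
    (2.0, Vectors.dense(3.1, 0.4)),
], ["label", "features"])

# family="multinomial" fits one coefficient vector per class (softmax)
lr = LogisticRegression(maxIter=10, regParam=0.01, family="multinomial")
model = lr.fit(train)
model.transform(train).select("label", "prediction").show()

spark.stop()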

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment