import numpy as np

from splearn.rdd import ArrayRDD, DictRDD
from splearn.feature_extraction.text import (SparkCountVectorizer,
                                             SparkTfidfTransformer)
from splearn.grid_search import SparkGridSearchCV
from splearn.naive_bayes import SparkMultinomialNB
from splearn.pipeline import SparkPipeline
from splearn.svm import SparkLinearSVC
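# The pipelines below tokenize with a user-defined `mecab_analyzer`; a minimal
# sketch, assuming the mecab-python binding is installed (any callable mapping
# a document to a token list works as a CountVectorizer analyzer):
import MeCab

def mecab_analyzer(text):
    # Create the tagger inside the function so Spark can pickle the closure.
    tagger = MeCab.Tagger("-Owakati")
    # Split Japanese text into surface-form tokens with MeCab.
    return tagger.parse(text).split()

# Example Pipeline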
X = ["a b c d e spark", "b d", "spark f g h", "hadoop mapreduce"]
X_rdd = sc.parallelize(X, 2)
y = [1, 0, 1, 0]
y_rdd = sc.parallelize(y, 2)
Z = DictRDD((X_rdd, y_rdd), columns=('X', 'y'), dtype=[np.ndarray, np.ndarray])
dist_pipeline = SparkPipeline((
    ('vect', SparkCountVectorizer(analyzer=mecab_analyzer)),
    ('tfidf', SparkTfidfTransformer()),
    ('clf', SparkLinearSVC())
    # ('clf', SparkMultinomialNB())  # alternative classifier
))
dist_pipeline.fit(Z, clf__classes=np.unique(y))
y_pred_dist = dist_pipeline.predict(Z[:, 'X'])  # predict on the training documents
y_pred_new = dist_pipeline.predict(
    ArrayRDD(sc.parallelize(['spark', 'hadoop', 'spark hadoop'])))  # unseen documents
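# Quick sanity check of the distributed predictions; a sketch assuming
# ArrayRDD.toarray() gathers the block-distributed result into one local array:
print(np.mean(y_pred_dist.toarray() == np.array(y)))  # training accuracy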
# Example GridSearch
X = ["a b c d e spark", "b d", "spark f g h", "hadoop mapreduce"] * 4  # 16 documents
X_rdd = sc.parallelize(X, 8)
y = [1, 0, 1, 0] * 4
y_rdd = sc.parallelize(y, 8)
Z = DictRDD((X_rdd, y_rdd), columns=('X', 'y'), dtype=[np.ndarray, np.ndarray])
dist_pipeline = SparkPipeline((
    ('vect', SparkCountVectorizer(analyzer=mecab_analyzer)),
    ('tfidf', SparkTfidfTransformer()),
    ('clf', SparkMultinomialNB())
))
parameters = {'clf__alpha': [1., 1.5]}
fit_params = {'clf__classes': np.array([0, 1])}
grid = SparkGridSearchCV(cv=4, estimator=dist_pipeline,
                         param_grid=parameters,
                         fit_params=fit_params)
grid.fit(Z)
grid.best_score_      # best mean cross-validated score
grid.best_params_     # parameter setting that achieved it
grid.best_estimator_  # pipeline refit with the best parameters
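# Per-candidate scores, not just the winner; a sketch assuming SparkGridSearchCV
# exposes the 2015-era scikit-learn `grid_scores_` attribute:
for entry in grid.grid_scores_:
    print(entry)  # one (parameters, mean CV score, per-fold scores) entry per candidate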
# Fit & Predict from HDFS
pos_text = sc.wholeTextFiles("hdfs://hdp1.containers.dev:9000/user/root/data/binary_clf/small/1")
neg_text = sc.wholeTextFiles("hdfs://hdp1.containers.dev:9000/user/root/data/binary_clf/small/0")
xy = pos_text.map(lambda x: (x[1], 1)).union(neg_text.map(lambda x: (x[1], 0)))
# Shuffle once by sorting on a random key, then drop the key.
xy = xy.map(lambda x: (x, np.random.rand())).sortBy(lambda x: x[1]).map(lambda x: x[0])
train_xy, test_xy = xy.randomSplit([7, 3], 17)
train_x = train_xy.map(lambda x: x[0])
train_y = train_xy.map(lambda x: x[1])
train_x = ArrayRDD(train_x)
train_y = ArrayRDD(train_y)
Z = DictRDD((train_x, train_y), columns=('X', 'y'), dtype=[np.ndarray, np.ndarray])
dist_pipeline = SparkPipeline((
    ('vect', SparkCountVectorizer(analyzer=mecab_analyzer)),
    ('tfidf', SparkTfidfTransformer()),
    # ('clf', SparkLinearSVC())  # alternative classifier
    ('clf', SparkMultinomialNB())
))
dist_pipeline.fit(Z, clf__classes=np.array([0, 1]))
y_pred_dist = dist_pipeline.predict(Z[:, 'X'])
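# The split above never touches test_xy; a sketch of held-out evaluation,
# assuming ArrayRDD.toarray() collects the predictions locally:
test_x = ArrayRDD(test_xy.map(lambda x: x[0]))
test_y = np.array(test_xy.map(lambda x: x[1]).collect())
print(np.mean(dist_pipeline.predict(test_x).toarray() == test_y))  # held-out accuracy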
# GridSearch from HDFS
pos_text = sc.wholeTextFiles("hdfs://hdp1.containers.dev:9000/user/root/data/binary_clf/small/1")
neg_text = sc.wholeTextFiles("hdfs://hdp1.containers.dev:9000/user/root/data/binary_clf/small/0")
xy = pos_text.map(lambda x: (x[1], 1)).union(neg_text.map(lambda x: (x[1], 0)))
xy = xy.map(lambda x: (x, np.random.rand())).sortBy(lambda x: x[1]).map(lambda x: x[0])
train_xy, test_xy = xy.randomSplit([7, 3], 17)
train_x = train_xy.map(lambda x: x[0])
train_y = train_xy.map(lambda x: x[1])
train_x = ArrayRDD(train_x)
train_y = ArrayRDD(train_y)
Z = DictRDD((train_x, train_y), columns=('X', 'y'), dtype=[np.ndarray, np.ndarray])
dist_pipeline = SparkPipeline((
    ('vect', SparkCountVectorizer(analyzer=mecab_analyzer)),
    ('tfidf', SparkTfidfTransformer()),
    ('clf', SparkMultinomialNB())
))
parameters = {'clf__alpha': [1., 1.5]}
fit_params = {'clf__classes': np.array([0, 1])}
grid = SparkGridSearchCV(estimator=dist_pipeline,
                         param_grid=parameters,
                         fit_params=fit_params)
grid.fit(Z)
grid.best_score_
grid.best_params_
grid.best_estimator_
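# As above, the held-out split can score the refit winner; a sketch assuming
# ArrayRDD.toarray() is available:
test_x = ArrayRDD(test_xy.map(lambda x: x[0]))
test_y = np.array(test_xy.map(lambda x: x[1]).collect())
print(np.mean(grid.best_estimator_.predict(test_x).toarray() == test_y))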