Skip to content

Instantly share code, notes, and snippets.

@0asa
Created January 27, 2015 11:12
Show Gist options
  • Star 17 You must be signed in to star a gist
  • Fork 5 You must be signed in to fork a gist
  • Save 0asa/5dcd725d464858e9082f to your computer and use it in GitHub Desktop.
Save 0asa/5dcd725d464858e9082f to your computer and use it in GitHub Desktop.
Run a Scikit-Learn algorithm on top of Spark with PySpark
from pyspark import SparkConf, SparkContext
from sklearn.datasets import make_classification
from sklearn.ensemble import ExtraTreesClassifier
import pandas as pd
import numpy as np
conf = (SparkConf()
.setMaster("local[*]")
.setAppName("My app")
.set("spark.executor.memory", "1g"))
sc = SparkContext(conf = conf)
# Build a classification task using 3 informative features
X, y = make_classification(n_samples=12000,
n_features=10,
n_informative=3,
n_redundant=0,
n_repeated=0,
n_classes=2,
random_state=0,
shuffle=False)
# or read from a file (for instance)
#df = pd.read_csv('data.csv', sep=' ', header=None)
#X = df[[1,2,3,4,5,6,7,8,9,10]].as_matrix()
#y = df[[0]][0].tolist()
# Partition data
def dataPart(X, y, start, stop): return dict(X=X[start:stop, :], y=y[start:stop])
def train(data):
X = data['X']
y = data['y']
return ExtraTreesClassifier(n_estimators=100,random_state=0).fit(X,y)
# Merge 2 Models
from sklearn.base import copy
def merge(left,right):
new = copy.deepcopy(left)
new.estimators_ += right.estimators_
new.n_estimators = len(new.estimators_)
return new
data = [dataPart(X, y, 0, 4000), dataPart(X,y,4000,8000), dataPart(X,y,8000,12000)]
forest = sc.parallelize(data).map(train).reduce(merge)
importances = forest.feature_importances_
std = np.std([tree.feature_importances_ for tree in forest.estimators_],
axis=0)
indices = np.argsort(importances)[::-1]
# Print the feature ranking
print("Feature ranking:")
for f in range(10):
print("%d. feature %d (%f)" % (f + 1, indices[f], importances[indices[f]]))
@auvipy
Copy link

auvipy commented Dec 8, 2015

any scikit-image integration example?

@tvmanikandan
Copy link

i would like to run SGD classifier using scikit + spark. Please post some examples.

@javadba
Copy link

javadba commented Jun 4, 2017

Works great! One small improvement is to permit the above code to be safely run either from spark-submit OR from inside ipython via %run -i sklearn-pyspark.py

if not 'sc' in locals():
    conf = (SparkConf()
             .setMaster("local[*]")
             .setAppName("My app")
             .set("spark.executor.memory", "1g"))
    sc = SparkContext(conf = conf)

@sagnik-rzt
Copy link

Great! This helped me parallelize an XGBoost model, thanks man.

@mshih2
Copy link

mshih2 commented Jul 5, 2018

sagnik-rzt, I dont think you can parallelize XGBoost model since the natural of the model can not be parallelized

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment