@0asa
Forked from wpm/spark_parallel_boost.py
Last active August 29, 2015 14:10
from pyspark import SparkContext
import numpy as np
from sklearn.cross_validation import train_test_split, Bootstrap
from sklearn.datasets import make_classification
from sklearn.metrics import accuracy_score
from sklearn.tree import DecisionTreeClassifier

def run(sc):
    def zero_matrix(n, m):
        return np.zeros(n * m, dtype=int).reshape(n, m)

    def vote_increment(y_est):
        # Test point x class matrix with 1s marking each estimator's prediction.
        increment = zero_matrix(y_est.size, n_ys)
        increment[np.arange(y_est.size), y_est] = 1
        return increment

    X, y = make_classification()
    X_train, X_test, y_train, y_test = train_test_split(X, y)
    n_test = X_test.shape[0]
    n_ys = np.unique(y_train).size

    model = DecisionTreeClassifier()
    # Partition the training data into random sub-samples with replacement.
    # Bootstrap indices are drawn from the training split only, so no test
    # points leak into the fitted models.
    samples = sc.parallelize(Bootstrap(y_train.size))
    # Train a model for each sub-sample and apply it to the test data.
    vote_tally = samples.map(lambda (index, _):
        model.fit(X_train[index], y_train[index]).predict(X_test)
    ).map(vote_increment).fold(zero_matrix(n_test, n_ys), np.add)
    # Take the learner majority vote.
    y_estimate_vote = np.argmax(vote_tally, axis=1)
    return accuracy_score(y_test, y_estimate_vote)

if __name__ == '__main__':
    print run(SparkContext("local", "Boost"))
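
For reference, the script above targets Python 2 and an old scikit-learn; sklearn.cross_validation.Bootstrap was removed in later releases. Below is a rough Python 3 sketch of the same majority-vote bagging scheme, assuming a current scikit-learn and a pip-installed PySpark, with the bootstrap index sets drawn via numpy.random.RandomState.choice instead (the n_samples=10 estimator count is an illustrative choice, not from the original).

from pyspark import SparkContext
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.datasets import make_classification
from sklearn.metrics import accuracy_score
from sklearn.tree import DecisionTreeClassifier


def run(sc, n_samples=10):
    X, y = make_classification()
    X_train, X_test, y_train, y_test = train_test_split(X, y)
    n_test = X_test.shape[0]
    n_ys = np.unique(y_train).size

    def vote_increment(y_est):
        # One-hot encode each prediction so folding with np.add tallies votes.
        increment = np.zeros((y_est.size, n_ys), dtype=int)
        increment[np.arange(y_est.size), y_est] = 1
        return increment

    # Draw bootstrap index sets (sampling with replacement) from the training split.
    rng = np.random.RandomState(0)
    bootstraps = [rng.choice(y_train.size, size=y_train.size, replace=True)
                  for _ in range(n_samples)]

    model = DecisionTreeClassifier()
    # Fit one tree per bootstrap sample on the executors, predict the test set,
    # and fold the one-hot predictions into a single vote tally.
    vote_tally = (sc.parallelize(bootstraps)
                    .map(lambda index: model.fit(X_train[index], y_train[index]).predict(X_test))
                    .map(vote_increment)
                    .fold(np.zeros((n_test, n_ys), dtype=int), np.add))
    y_estimate_vote = np.argmax(vote_tally, axis=1)
    return accuracy_score(y_test, y_estimate_vote)


if __name__ == '__main__':
    print(run(SparkContext("local", "Boost")))

Because the SparkContext is created with the "local" master, the sketch can be run with a plain python invocation (given a pip-installed PySpark) rather than through spark-submit.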