@myui
Forked from MLnick/sklearn-lr-spark.py
Forked to deal with a large, dense dataset on HDFS.
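
The parse_line function below assumes each HDFS input line is an integer label, a tab, then comma-separated integer features. A hypothetical two-row input file would look like:

1	0,1,3,0,2
0	2,0,1,1,0
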
import sys
import copy

from pyspark.context import SparkContext
from numpy import array, random as np_random
from sklearn import linear_model as lm

ITERATIONS = 5
np_random.seed(seed=42)
def train(iterator, sgd):
    # fit the model on this partition's samples, one partial_fit per row
    for x in iterator:
        label = x[0]
        features = x[1]
        # partial_fit expects a 2-D feature matrix and a 1-D label array
        sgd.partial_fit(features.reshape(1, -1), label,
                        classes=array([0, 1]))
    # emit a single locally trained model per partition
    yield sgd
def merge(left, right):
    # sum coefficients, intercepts, and model counts of two local models
    new = copy.deepcopy(left)
    new.coef_ += right.coef_
    new.intercept_ += right.intercept_
    new.count += right.count
    return new
def avg_model(sgd):
    # divide the summed parameters by the number of merged models
    slices = sgd.count
    sgd.coef_ /= slices
    sgd.intercept_ /= slices
    return sgd
def parse_line(line):
    # '<label>\t<f1>,<f2>,...,<fn>' => [1-D label array, 1-D feature array]
    items = line.split('\t')
    label = array([int(items[0])])
    features = array([int(f) for f in items[1].split(',')])
    return [label, features]
if __name__ == "__main__":
    if len(sys.argv) < 3:
        print >> sys.stderr, \
            "Usage: PythonLR_HDFS <master> <input_dir> [<iterations>]"
        sys.exit(-1)
    sc = SparkContext(sys.argv[1], "PythonLR_HDFS")
    input_path = sys.argv[2]
    ITERATIONS = int(sys.argv[3]) if len(sys.argv) >= 4 else ITERATIONS

    data = sc.textFile(input_path).map(parse_line)

    # init stochastic gradient descent (logistic regression via log loss)
    sgd = lm.SGDClassifier(loss='log')
    # training
    for ii in range(ITERATIONS):
        sgd.count = 1
        # train one model per partition, then sum the models pairwise
        sgd = data.mapPartitions(lambda x: train(x, sgd)) \
                  .reduce(lambda x, y: merge(x, y))
        # average the summed weight vector => iterative parameter mixtures
        sgd = avg_model(sgd)
        print "Iteration %d:" % (ii + 1)
        print "Model: "
        print sgd.coef_
        print sgd.intercept_
        print ""