@myui
Forked from MLnick/sklearn-lr-spark.py
Last active December 20, 2015 15:19
Forked to deal with sparse and large datasets on HDFS.

import sys
import copy

from pyspark.context import SparkContext
from numpy import array, random as np_random
from sklearn import linear_model as lm
from scipy import sparse as sp

# size of the sparse feature space (feature ids are column indices)
#MAX_FEATURES = 1000
MAX_FEATURES = 16777216
ITERATIONS = 5

np_random.seed(seed=42)
def train(iterator, sgd):
    # one SGD pass over this partition; yield a single model per partition
    for x in iterator:
        label = x[0]
        features = x[1]
        sgd.partial_fit(features, label, classes=array([0, 1]))
    yield sgd
def merge(left, right):
    # sum two partition models; avg_model() later divides by the count
    new = copy.deepcopy(left)
    new.coef_ += right.coef_
    new.intercept_ += right.intercept_
    new.count += right.count
    return new
def avg_model(sgd):
    # divide the summed weights by the number of merged partition models
    slices = sgd.count
    sgd.coef_ /= slices
    sgd.intercept_ /= slices
    return sgd
def parse_line(line):
    # each line is: <label>\t<comma-separated feature indices>
    items = line.split('\t')
    # wrap the label so partial_fit sees y with one entry per sample
    label = array([int(items[0])])
    features = sp.lil_matrix((1, MAX_FEATURES))
    for f in items[1].split(','):
        features[0, int(f)] = 1.0
    return [label, features]
if __name__ == "__main__":
    if len(sys.argv) < 3:
        print >> sys.stderr, \
            "Usage: PythonLR_HDFS <master> <input_dir> [<iterations>]"
        sys.exit(-1)

    sc = SparkContext(sys.argv[1], "PythonLR_HDFS")
    input_path = sys.argv[2]
    ITERATIONS = int(sys.argv[3]) if len(sys.argv) >= 4 else ITERATIONS

    data = sc.textFile(input_path).map(parse_line)

    # init stochastic gradient descent
    sgd = lm.SGDClassifier(loss='log')

    # training: fit each partition independently, sum the models with
    # merge(), then average the weight vectors => iterative parameter mixtures
    for ii in range(ITERATIONS):
        sgd.count = 1  # each partition model starts with a count of 1
        sgd = data.mapPartitions(lambda x: train(x, sgd)) \
                  .reduce(lambda x, y: merge(x, y))
        sgd = avg_model(sgd)  # averaging weight vector => iterative parameter mixtures

        print "Iteration %d:" % (ii + 1)
        print "Model: "
        print sgd.coef_
        print sgd.intercept_
        print ""
myui commented Aug 5, 2013

Usage: ./pyspark work/sklearn-sparselr-spark-hdfs.py spark://masternode:7077 hdfs://namenode:8020/project/spark/tmp/test01.data 5

Data format (a tab separates the label from a comma-separated list of feature indices):

target\tfeatures
1       1,2,3,9,10
0       1,5,4
1       1,2,7
0       1,2,8,4
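
For example, parse_line turns the first row above into a (label, features) pair like this (a standalone sketch of the same logic, using the script's MAX_FEATURES):

from numpy import array
from scipy import sparse as sp

MAX_FEATURES = 16777216

items = "1\t1,2,3,9,10".split('\t')
label = array([int(items[0])])            # array([1])
features = sp.lil_matrix((1, MAX_FEATURES))
for f in items[1].split(','):
    features[0, int(f)] = 1.0             # columns 1,2,3,9,10 set to 1.0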
