@MLnick
Last active December 30, 2015 20:29
PySpark: svmlight Hadoop text files -> RDD[sparse vectors]
13/12/09 23:01:02 INFO spark.SparkContext: Job finished: runJob at PythonRDD.scala:288, took 0.175237 s
Count raw: 683; Count svml: 683
Raw sample: [u'2.000000 1:1000025.000000 2:5.000000 3:1.000000 4:1.000000 5:1.000000 6:2.000000 7:1.000000 8:3.000000 9:1.000000 10:1.000000', u'2.000000 1:1002945.000000 2:5.000000 3:4.000000 4:4.000000 5:5.000000 6:7.000000 7:10.000000 8:3.000000 9:2.000000 10:1.000000', u'2.000000 1:1015425.000000 2:3.000000 3:1.000000 4:1.000000 5:1.000000 6:2.000000 7:2.000000 8:3.000000 9:1.000000 10:1.000000', u'2.000000 1:1016277.000000 2:6.000000 3:8.000000 4:8.000000 5:1.000000 6:3.000000 7:4.000000 8:3.000000 9:7.000000 10:1.000000', u'2.000000 1:1017023.000000 2:4.000000 3:1.000000 4:1.000000 5:3.000000 6:2.000000 7:1.000000 8:3.000000 9:1.000000 10:1.000000', u'4.000000 1:1017122.000000 2:8.000000 3:10.000000 4:10.000000 5:8.000000 6:7.000000 7:10.000000 8:9.000000 9:7.000000 10:1.000000', u'2.000000 1:1018099.000000 2:1.000000 3:1.000000 4:1.000000 5:1.000000 6:2.000000 7:10.000000 8:3.000000 9:1.000000 10:1.000000', u'2.000000 1:1018561.000000 2:2.000000 3:1.000000 4:2.000000 5:1.000000 6:2.000000 7:1.000000 8:3.000000 9:1.000000 10:1.000000', u'2.000000 1:1033078.000000 2:2.000000 3:1.000000 4:1.000000 5:1.000000 6:2.000000 7:1.000000 8:1.000000 9:1.000000 10:5.000000', u'2.000000 1:1033078.000000 2:4.000000 3:2.000000 4:1.000000 5:1.000000 6:2.000000 7:1.000000 8:2.000000 9:1.000000 10:1.000000']
SVML sample: [(<1x10 sparse matrix of type '<type 'numpy.float64'>'
with 10 stored elements in Compressed Sparse Row format>, array([ 2.])), (<1x10 sparse matrix of type '<type 'numpy.float64'>'
with 10 stored elements in Compressed Sparse Row format>, array([ 2.])), (<1x10 sparse matrix of type '<type 'numpy.float64'>'
with 10 stored elements in Compressed Sparse Row format>, array([ 2.])), (<1x10 sparse matrix of type '<type 'numpy.float64'>'
with 10 stored elements in Compressed Sparse Row format>, array([ 2.])), (<1x10 sparse matrix of type '<type 'numpy.float64'>'
with 10 stored elements in Compressed Sparse Row format>, array([ 2.])), (<1x10 sparse matrix of type '<type 'numpy.float64'>'
with 10 stored elements in Compressed Sparse Row format>, array([ 4.])), (<1x10 sparse matrix of type '<type 'numpy.float64'>'
with 10 stored elements in Compressed Sparse Row format>, array([ 2.])), (<1x10 sparse matrix of type '<type 'numpy.float64'>'
with 10 stored elements in Compressed Sparse Row format>, array([ 2.])), (<1x10 sparse matrix of type '<type 'numpy.float64'>'
with 10 stored elements in Compressed Sparse Row format>, array([ 2.])), (<1x10 sparse matrix of type '<type 'numpy.float64'>'
with 10 stored elements in Compressed Sparse Row format>, array([ 2.]))]
from sklearn.datasets._svmlight_format import _load_svmlight_file
import numpy as np
import scipy.sparse as sp
from pyspark.context import SparkContext
'''
Run from spark-0.8.0-incubating-bin-hadoop1/python directory:
$ SPARK_HOME=../ python spark-svmlight.py
A better place to change the code would be in sklearn's datasets/svmlight_format.py, but the
approach below worked without needing to re-build sklearn.

Something like this should work (in datasets/svmlight_format.py):

def _open_and_load(f, dtype, multilabel, zero_based, query_id):
    # need to import collections
    if hasattr(f, "read") or isinstance(f, collections.Iterable):
        return _load_svmlight_file(f, dtype, multilabel, zero_based, query_id)
    # XXX remove closing when Python 2.7+/3.1+ required
    with closing(_gen_open(f)) as f:
        return _load_svmlight_file(f, dtype, multilabel, zero_based, query_id)
'''
def parse_svmlight_string(s, n_features=None, dtype=np.float64,
                          multilabel=False, zero_based="auto", query_id=False):
    # Parse a single svmlight-formatted line into a (csr_matrix, target) pair,
    # following the logic of sklearn.datasets.load_svmlight_file.
    r = [_load_svmlight_file([s], dtype, multilabel, bool(zero_based), bool(query_id))]
    # Shift 1-based svmlight indices to 0-based unless the data is already zero-based.
    if (zero_based is False
            or zero_based == "auto" and all(np.min(tmp[1]) > 0 for tmp in r)):
        for ind in r:
            indices = ind[1]
            indices -= 1
    n_f = max(ind[1].max() for ind in r) + 1
    if n_features is None:
        n_features = n_f
    elif n_features < n_f:
        raise ValueError("n_features was set to {},"
                         " but input file contains {} features"
                         .format(n_features, n_f))
    result = []
    for data, indices, indptr, y, query_values in r:
        shape = (indptr.shape[0] - 1, n_features)
        X = sp.csr_matrix((data, indices, indptr), shape)
        X.sort_indices()
        result += X, y
        if query_id:
            result.append(query_values)
    return (result[0], result[1])
'''
This maps line-by-line and returns an RDD[(csr_matrix, ndarray)] (effectively an
RDD[(sparse feature vector, target)]).
It may be more efficient to use mapPartitions and parse each partition as a whole,
producing a single csr_matrix X and target array y per partition; a rough sketch of
that approach follows right after this docstring.
'''
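# Illustrative sketch, not part of the original gist: a mapPartitions-based parser that
# feeds all of a partition's lines into _load_svmlight_file in one call and yields a
# single (csr_matrix, y) pair per partition. It mirrors the call pattern used in
# parse_svmlight_string above; the helper name and defaults are assumptions, and
# n_features is inferred per partition unless passed explicitly, so shapes may differ
# across partitions for ragged data.
def parse_svmlight_partition(lines, n_features=None, dtype=np.float64):
    lines = [str(l) for l in lines]
    if not lines:
        return iter([])
    data, indices, indptr, y, _ = _load_svmlight_file(lines, dtype, False, True, False)
    # svmlight indices are 1-based; shift to 0-based, as parse_svmlight_string does
    if indices.shape[0] > 0 and np.min(indices) > 0:
        indices -= 1
    if n_features is None:
        n_features = indices.max() + 1 if indices.shape[0] > 0 else 0
    X = sp.csr_matrix((data, indices, indptr), (indptr.shape[0] - 1, n_features))
    X.sort_indices()
    return iter([(X, y)])
# Example usage: rdd_raw.mapPartitions(lambda it: parse_svmlight_partition(it, n_features=10))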
sc = SparkContext("local[2]", "test")
rdd_raw = sc.textFile("breast-cancer.svml")
raw_count = rdd_raw.count()
raw_values = rdd_raw.take(10)
# str(x) is required because Spark's textFile returns unicode strings, which break _load_svmlight_file
rdd_svml = rdd_raw.map(lambda x: parse_svmlight_string(str(x)))
svml_count = rdd_svml.count()
svml_values = rdd_svml.take(10)
print "Count raw: %i; Count svml: %i" % (raw_count, svml_count)
print "Raw sample: %s" % str(raw_values)
print "SVML sample: %s" % str(svml_values)