from pyspark import SparkConf
from pyspark import SparkContext
import numpy.random as rnd
import numpy as np
import os
# os.environ["SPARK_HOME"] = "/usr/local/Cellar/apache-spark/1.5.1/"
os.environ["PYSPARK_PYTHON"] = "/home/raphael/Development/datamining/py3env/bin/python3"

def iterate(iterable):
    """
    Auxiliary function used to print stuff.
    :param iterable:
    :return:
    """
    r = []
    # Plain ints are returned directly as a one-element tuple.
    if type(iterable).__name__ == "int":
        r.append(iterable)
        return tuple(r)
    for v1_iterable in iterable:
        # Note: type(np.int64(0)).__name__ is "int64", not "numpy.int64".
        if type(v1_iterable).__name__ in ("int", "int64"):
            r.append(v1_iterable)
        else:
            v1_tuples = []
            for v2 in v1_iterable:
                v1_tuples.append(v2)
            r.append(v1_tuples)
    return tuple(r)

# Tuple unpacking is not supported in Python 3 lambdas:
# https://stackoverflow.com/questions/21892989/what-is-the-good-python3-equivalent-for-auto-tuple-unpacking-in-lambda
def to_triple(x):
    """
    Construction of rdd_A and rdd_B splits the matrices up row-wise. Hence: fetch the row index and the row and
    return a list of triples (row index, column index, value) for this row. The matrix representation is
    reassembled from these triples when the RDD is collected.
    :param x: Matrix row zipped with its row index, i.e.: (ndarray with values in row, index of row).
    :return: List of (row index, column index, value) triples for this row.
    """
    # x[1] is the row's index, x[0] holds the values in the row. Return one triple per matrix entry.
    return [(x[1], column_index, x[0][column_index]) for column_index in range(0, len(x[0]))]
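
# Illustrative check (my addition, not part of the original gist): for a zipped row such as
# (np.array([1, 0, 1]), 2) - values first, row index second, as produced by zipWithIndex() -
# to_triple yields one (row index, column index, value) triple per matrix entry.
assert to_triple((np.array([1, 0, 1]), 2)) == [(2, 0, 1), (2, 1, 0), (2, 2, 1)]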

def mult(rdd_A, rdd_B):
    """
    Multiplies matrices A and B, stored in rdd_A and rdd_B respectively, with each other.
    All work happens in the mapping step; no separate reduction is needed here (see below).
    :param rdd_A:
    :param rdd_B:
    :return:
    """
    # Basic idea for matrix multiplication: combine rows of A with columns of B.
    # The approach presented in
    # https://www.thinkbiganalytics.com/2015/11/23/scalable-matrix-multiplication-using-spark-2/ computes the outer
    # products A[:, n] x B[n, :] = C_n in the mapping step and sums up all C_n (n from 1 to the number of columns
    # of A - and rows of B) in the reduction step. The implementation below instead computes every output cell
    # C[i, j] = A[i, :] . B[:, j] directly, so no reduction step is required.
    # 1. Group rdd_A by row index and rdd_B by column index so the map can target A[i, :] and B[:, j].
    rdd_A = rdd_A.groupBy(lambda x: x[0])
    rdd_B = rdd_B.groupBy(lambda x: x[1])
    # 2. Pair every row of A with every column of B (keys: row index for rdd_A, column index for rdd_B) and
    # compute the dot product of their values.
    # merged_rdd = rdd_A.union(rdd_B).reduceByKey(lambda x, y: [x, y])
    # For easier handling: only keep the values in rows/columns.
    # Question: Is it inefficient to call list()?
    row_times_column = rdd_A.cartesian(rdd_B).map(
        lambda triples: (
            (triples[0][0], triples[1][0]),
            np.dot(
                np.array([triple[2] for triple in triples[0][1]]),
                np.array([triple[2] for triple in triples[1][1]])
            )
        )
    )
    # 3. Every ((row index, column index), value) pair already holds the final cell value, so it is returned as is.
    return row_times_column
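
# The following function is my addition (not part of the original gist): a hedged sketch of the outer-product /
# reduceByKey formulation that the comment in mult() refers to. The name mult_outer and its structure are
# assumptions, not the approach the gist actually runs.
def mult_outer(rdd_A, rdd_B):
    """
    Key A's triples by their column index k and B's triples by their row index k, join on k, emit the partial
    products ((i, j), A[i, k] * B[k, j]) and sum them per output cell via reduceByKey.
    :param rdd_A: RDD of (row index, column index, value) triples of A.
    :param rdd_B: RDD of (row index, column index, value) triples of B.
    :return: RDD of ((row index, column index), value) pairs of C = A * B.
    """
    a_by_k = rdd_A.map(lambda t: (t[1], (t[0], t[2])))  # (k, (i, A[i, k]))
    b_by_k = rdd_B.map(lambda t: (t[0], (t[1], t[2])))  # (k, (j, B[k, j]))
    return a_by_k.join(b_by_k) \
        .map(lambda kv: ((kv[1][0][0], kv[1][1][0]), kv[1][0][1] * kv[1][1][1])) \
        .reduceByKey(lambda x, y: x + y)

# Usage would mirror mult(), e.g. mult_outer(rdd_A, rdd_B).collect() should yield the same cells.
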
conf = SparkConf().setAppName('master').setMaster('local')
sc = SparkContext(conf=conf)
n = 5
m = 5
A = rnd.randint(0, 2, size=(n, m))
B = rnd.randint(0, 2, size=(m, n))
print("A = \n", A)
print("B = \n", B)
# Convert A and B to (row index, column index, value) triples.
rdd_A = sc.parallelize(A).zipWithIndex().flatMap(to_triple)
rdd_B = sc.parallelize(B).zipWithIndex().flatMap(to_triple)
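# Optional sanity check (my addition, not part of the original gist): peek at the first few triples to confirm
# the (row index, column index, value) layout produced by to_triple.
print("first triples of rdd_A = \n", rdd_A.take(3))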
TMP = mult(rdd_A, rdd_B)
result = np.zeros((n, n), dtype=int)
for ((r, c), v) in TMP.collect():
    result[r, c] = v
print("result = \n", result)
print("A * B = \n", np.dot(A, B))
if np.sum(result == np.dot(A, B)) == n * n:
    print("Results match.")
sc.stop()