Git clone SystemML and build it:
mvn clean package -P distribution
From your SystemML source directory, install the SystemML 0.10.0 jar into your local m2 repository:
mvn install:install-file -Dfile=target/systemml-0.10.0-incubating-SNAPSHOT-standalone.jar -DgroupId=org.apache.systemml -DartifactId=systemml -Dversion=0.10.0-incubating -Dpackaging=jar
Git clone the spark_dml branch of the Zeppelin fork at nakul02/incubator-zeppelin:
git clone -b spark_dml https://github.com/nakul02/incubator-zeppelin.git
Build Zeppelin. In the incubator-zeppelin root directory, run:
mvn package -DskipTests
Run Zeppelin. In the incubator-zeppelin root directory, run:
bin/zeppelin-daemon.sh restart
In a browser, navigate to localhost:8080 and create a new note. In a new cell, type:
%dml
X = rand(rows=200, cols=1000, min=0, max=4, pdf="uniform", sparsity=1.0)
Y = rand(rows=1000, cols=100, min=0, max=4, pdf="uniform", sparsity=1.0)
A = rand(rows=100, cols=1000, min=0, max=4, pdf="uniform", sparsity=1.0)
B = rand(rows=1000, cols=200, min=0, max=4, pdf="uniform", sparsity=1.0)
Q = (X %*% Y) %*% (A %*% B)
q = as.scalar(Q[1,1])
print("Q[1,1] is " + q)
and press the "play" button in the top right corner of the cell.
Your output should look something like this (the exact value will vary, since the matrices are randomly generated):
Q[1,1] is 1.5272079204925272E9
Download the amazon0601 data set from SNAP@Stanford (https://snap.stanford.edu/data/amazon0601.html) and unzip it. You should get a file called amazon0601.txt. To try out the Spark-DML integration, paste this Spark code into the first cell of a note and run the cell:
%spark
// These imports may already be in scope in Zeppelin's Spark interpreter; they are included here for completeness.
import org.apache.spark.sql.functions.max
import sqlContext.implicits._

val baseDir = "/path/to/file"              // change this to the directory containing amazon0601.txt
val dataPath = baseDir + "/amazon0601.txt"

// Load the edge list as a DataFrame of (prod_i, prod_j, x_ij) triples
val X_train = sc.textFile(dataPath)
  .filter(!_.startsWith("#"))
  .map(_.split("\t") match { case Array(prod1, prod2) => (prod1.toInt, prod2.toInt, 1.0) })
  .toDF("prod_i", "prod_j", "x_ij")
  .filter("prod_i < 500 AND prod_j < 500") // keep a small subset; change to a smaller number if needed
  .cache()

val max_prod_i = X_train.select(max("prod_i")).first.getInt(0)
val max_prod_j = X_train.select(max("prod_j")).first.getInt(0)
val numProducts = math.max(max_prod_i, max_prod_j) + 1 // 0-based indexing

// Make the DataFrame available to the DML cell under the name "X"
z.put("X", X_train)
Change the value of "baseDir" appropriately.
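Optionally, you can sanity-check the loaded DataFrame in another %spark cell before handing it to DML. This is just a quick sketch; it relies only on the column names from the toDF call above and on Zeppelin sharing vals across Spark cells:
%spark
X_train.printSchema()
X_train.show(5)                              // a few (prod_i, prod_j, x_ij) triples
println("number of edges: " + X_train.count())
println("numProducts: " + numProducts)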
In the next cell of the same note, type the following DML code and hit run:
%spark.dml
# data & args
X = read($zX)
X = X + 1  # shift product IDs to be 1-based, rather than 0-based
V = table(X[,1], X[,2])
size = 100
if (size > -1) {
  V = V[1:size, 1:size]
}
max_iteration = 100
rank = 10

# run PNMF (Poisson non-negative matrix factorization)
n = nrow(V)
m = ncol(V)
range = 0.01
W = rand(rows=n, cols=rank, min=0, max=range, pdf="uniform")
H = rand(rows=rank, cols=m, min=0, max=range, pdf="uniform")
i = 0
while (i < max_iteration) {
  H = (H * (t(W) %*% (V / (W %*% H)))) / t(colSums(W))
  W = (W * ((V / (W %*% H)) %*% t(H))) / t(rowSums(H))
  i = i + 1
}

# compute negative log-likelihood
negloglik = -1 * (sum(V * log(W %*% H)) - as.scalar(colSums(W) %*% rowSums(H)))

# write outputs (negloglik is wrapped in a 1x1 matrix so it can be written out)
negloglik = matrix(negloglik, rows=1, cols=1)
write(negloglik, $znegloglikout)
write(W, $zWout)
write(H, $zHout)
In this code, read and write statements are interpreted differently when their arguments carry the special "$z" prefix: instead of referring to files, they refer to objects in the Zeppelin context. X = read($zX) reads the DataFrame named "X" that was saved from the Spark cell with z.put("X", X_train). Likewise, write(negloglik, $znegloglikout) saves a DataFrame named "negloglikout" into the Zeppelin context. This and the two other written DataFrames ("Wout" and "Hout") are available in subsequent Spark cells.
To print the value of "negloglik", paste this code and hit run:
%spark
val n = z.get("negloglikout")
n.asInstanceOf[org.apache.spark.sql.DataFrame].show
Of course, the negative log-likelihood could also have been printed from within DML, but this cell demonstrates the flow of DataFrames into and out of "%spark.dml" cells.
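The factor matrices can be pulled back the same way. The sketch below assumes the names "Wout" and "Hout", which follow from stripping the "$z" prefix from the write statements above:
%spark
// Retrieve the written factor matrices from the Zeppelin context
val W = z.get("Wout").asInstanceOf[org.apache.spark.sql.DataFrame]
val H = z.get("Hout").asInstanceOf[org.apache.spark.sql.DataFrame]
println("W: " + W.count() + " rows, H: " + H.count() + " rows")
W.show(3)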