Skip to content

Instantly share code, notes, and snippets.

@wangzk
Last active February 5, 2018 09:52
Show Gist options
  • Save wangzk/818e084cf0018592a1f1efe76dc8c47f to your computer and use it in GitHub Desktop.
Save wangzk/818e084cf0018592a1f1efe76dc8c47f to your computer and use it in GitHub Desktop.
Dima Demo with SQL
package demo
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SQLContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.{SparkConf, SparkContext}
/**
  * One parsed input line.
  *
  * @param id    integer identifier taken from the start of the line
  * @param value remainder of the line after the first space, kept as raw text
  */
case class Record(
  id: Int,
  value: String
)
/**
  * Demo driver: loads a text file into a DataFrame, runs a Dima `SIMILARITY JOIN` on Jaccard
  * similarity of the table with itself through Spark SQL, and prints the resulting counts and
  * the elapsed wall-clock time.
  *
  * Command-line arguments, in order: PathToDataFile SimilarityThreshold PartitionNum.
  */
object EasyDemo {

  /**
    * Parses one input line of the form `<id> <value>` into a record.
    *
    * The text before the first space is converted to the integer id; everything after that
    * first space is kept verbatim as the value.
    *
    * @param line one line of the input file
    * @return the record holding the id and the remainder of the line
    * @throws IllegalArgumentException if the line contains no space separator
    * @throws NumberFormatException   if the text before the first space is not a valid Int
    */
  def parseAdjFile(line: String): demo.Record = {
    val spacePos = line.indexOf(' ')
    // indexOf returns -1 when there is no space; substring(0, -1) would then fail with an
    // opaque StringIndexOutOfBoundsException, so report the offending line explicitly instead.
    if (spacePos < 0) {
      throw new IllegalArgumentException(
        s"Malformed input line, expected '<id> <value>' separated by a space: '$line'")
    }
    val id = line.substring(0, spacePos).toInt
    val adjPart = line.substring(spacePos + 1)
    demo.Record(id, adjPart)
  }

  /**
    * Entry point.
    *
    * @param args `args(0)` input file path, `args(1)` similarity threshold (Double),
    *             `args(2)` partition number (Int)
    */
  def main(args: Array[String]): Unit = {
    // Fail fast with a usage hint instead of an ArrayIndexOutOfBoundsException.
    if (args.length < 3) {
      System.err.println("Usage: EasyDemo <PathToDataFile> <SimilarityThreshold> <PartitionNum>")
      sys.exit(1)
    }

    Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
    Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

    val graphFile = args(0)
    val threshold: Double = args(1).toDouble
    val partitionNum: Int = args(2).toInt

    val sparkConf = new SparkConf().setAppName(s"Dima-${graphFile}")
    // NOTE(review): the key is spelled "numSimialrity" (sic). It is kept byte-for-byte because
    // it is presumably the exact key Dima reads -- confirm against Dima before correcting it.
    sparkConf.set("spark.sql.joins.numSimialrityPartitions", partitionNum.toString)

    val sc = new SparkContext(sparkConf)
    try {
      val sqlContext = new SQLContext(sc)
      import sqlContext.implicits._

      val t0 = System.currentTimeMillis()

      // Blank lines carry no record; skip them so that e.g. a trailing empty line in the
      // input does not abort the whole job during parsing.
      val s1 = sc.textFile(graphFile, partitionNum)
        .filter(_.trim.nonEmpty)
        .map(parseAdjFile)
        .toDF()
      s1.registerTempTable("s1")
      s1.persist(StorageLevel.MEMORY_AND_DISK_SER)
      val numRecords = s1.count()

      println("Threshold = " + threshold)
      println("Dataframe Partition Number = " + partitionNum)

      val resultDF = sqlContext.sql(
        s"SELECT * FROM s1 AS l SIMILARITY JOIN s1 AS r ON JACCARDSIMILARITY(l.value, r.value) >= ${threshold}")
      //resultDF.collect().foreach(println)
      val resultCount = resultDF.count()

      // Dima only supports an R-S join, so the table is joined with itself and the raw count is
      // corrected for a self-join: subtract one row per record and halve the rest. Presumably
      // this removes each record's match with itself and counts each unordered pair once --
      // this relies on every record matching itself at the given threshold.
      println(s"Raw join result count = ${resultCount}")
      println(s"Num records = ${numRecords}")
      println(s"Similar pair count under self-join = ${(resultCount - numRecords)/2}")

      val t1 = System.currentTimeMillis()
      println(s"Elaspsed time (ms) = ${t1 - t0}")
    } finally {
      // Always release the Spark context, even when the job fails part-way through.
      sc.stop()
    }
  }
}
@wangzk
Copy link
Author

wangzk commented Feb 5, 2018

Run this script with three arguments: PathToDataFile SimilarityThreshold PartitionNum.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment