import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

val conf = new SparkConf().setAppName("SparkTest").setMaster("local[1]")
val sparkContext = new SparkContext(conf)
val sqlContext = new SQLContext(sparkContext)
import sqlContext.implicits.{getClass => _, _}

val fileNameGoogle = "/google-places-data.json" // one JSON place per line
val locationsGoogle = sqlContext.read.json(getClass.getResource(fileNameGoogle).getPath)
  .toDF()
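
The remaining snippets target the Spark 2.x API and assume a SparkSession named spark is already in scope. A minimal sketch of that setup (app name and master are placeholders, not from the original gist):

import org.apache.spark.sql.SparkSession

// assumed session setup; the gist never shows it
val spark = SparkSession.builder()
  .appName("SparkTest")
  .master("local[*]")
  .getOrCreate()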
import spark.implicits._

val kvKDataset = spark.read
  .option("header", true)
  .option("sep", ";")
  .option("ignoreLeadingWhiteSpace", true)
  .option("ignoreTrailingWhiteSpace", true)
  .option("quote", """"""")
  .option("nullValue", "")
  .option("mode", "FAILFAST")
  .csv(path)

kvKDataset.select(
  'DOSSIERNR.as("dossierNummer"),
  'VG_NR.as("vgNummer"),
  'HANDELSN30.as("naamShort"),
  'HANDN1_30.as("naamShortP1"),
  'HANDN2_30.as("naamShortP2"),
  // … remaining column renames truncated in the original gist …
)
import org.apache.spark.ml.linalg.{Vector, Vectors}

val comparableDataset = kvKDataset.as("l")
  .joinWith(
    kvKDataset.as("r"),
    $"l.adresV" === $"r.adresV" && $"l.postcodePlaatsV" === $"r.postcodePlaatsV" && $"l.dossierNummer" =!= $"r.dossierNummer"
  )
  .map {
    case (left, right) => (left, right, Vectors.dense(left.distance(right).toArray))
  }
  .toDF("left", "right", "features")
  .as[(KvKRecord, KvKRecord, Vector)]
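
Note that the =!= clause on dossierNummer removes self-joins but keeps both orderings of every match, so each candidate pair is scored twice. A sketch of a cheaper variant, assuming lexicographic order on dossierNummer is an acceptable tie-breaker (not part of the original gist):

val candidatePairs = kvKDataset.as("l")
  .joinWith(
    kvKDataset.as("r"),
    $"l.adresV" === $"r.adresV" &&
      $"l.postcodePlaatsV" === $"r.postcodePlaatsV" &&
      $"l.dossierNummer" < $"r.dossierNummer" // each pair appears exactly once
  )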
import java.lang.Math._
import info.debatty.java.stringsimilarity.JaroWinkler

case class KvKRecord(dossierNummer: String, vgNummer: String, naamShort: String, naamShortP1: String, naamShortP2: String, naamLong: String, adresV: String, postcodePlaatsV: String, adresC: String, postcodePlaatsC: String, wptf: Int, sbi: Int) {
  def vectorValues: List[Any] = List(dossierNummer, vgNummer, naamShort, naamShortP1, naamShortP2, naamLong, adresV, postcodePlaatsV, adresC, postcodePlaatsC, wptf, sbi)

  def distance(other: KvKRecord): List[Double] = {
    val jw = new JaroWinkler
    // body truncated in the original gist; a minimal sketch: pairwise
    // Jaro-Winkler similarity over the stringified field values
    vectorValues.zip(other.vectorValues).map { case (a, b) => jw.similarity(a.toString, b.toString) }
  }
}

// Tabulator: check out http://stackoverflow.com/questions/7539831/scala-draw-table-to-console
def propertyList(kvKRecord: KvKRecord): List[Any] = KvKRecord.unapply(kvKRecord).map(_.productIterator.toList).getOrElse(Nil)
import scala.collection.mutable.ArrayBuffer
import scala.util.control.Breaks._

val labeledList: ArrayBuffer[LabeledVector] = ArrayBuffer()
breakable {
  comparableDataset
    .sample(withReplacement = false, Config.sampleFactor)
    .collect()
  // … interactive labelling of the sampled pairs truncated in the original gist …
}
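
LabeledVector is used here and below but never defined in these snippets; a plausible minimal shape, assuming it simply pairs a feature vector with a manually assigned label (field names chosen to match Spark ML's default columns):

import org.apache.spark.ml.linalg.Vector

// assumed definition, not shown in the original gist
case class LabeledVector(label: Double, features: Vector)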
import org.apache.spark.ml.classification.LogisticRegression

// load the labeled data
val labeledSet = spark.read.parquet(path).as[LabeledVector]
// split train/test (80/20)
val Array(trainingData, testData) = labeledSet.randomSplit(Array(Config.trainSplit, 1 - Config.trainSplit))
// Basic model
val lr = new LogisticRegression().setMaxIter(200).setRegParam(0.01).setElasticNetParam(0.8)
// Train (the fit call itself is truncated in the original; this is the obvious completion)
val model = lr.fit(trainingData)
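
The gist stops before evaluation; a sketch of scoring the held-out split with Spark ML's BinaryClassificationEvaluator, assuming the LabeledVector shape above:

import org.apache.spark.ml.evaluation.BinaryClassificationEvaluator

// AUC on the 20% held-out data
val predictions = model.transform(testData)
val auc = new BinaryClassificationEvaluator()
  .setLabelCol("label")
  .setRawPredictionCol("rawPrediction")
  .setMetricName("areaUnderROC")
  .evaluate(predictions)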
import org.apache.spark.graphx.{Edge, Graph, VertexId}
import org.apache.spark.rdd.RDD

// edges are weighted with the predicted probability that the link is correct
val edges: RDD[Edge[Double]] = predictedLinksDataset.rdd.map(
  l => Edge(l.dossierNummer_left.toLong, l.dossierNummer_right.toLong, l.probability))
// vertices are just tuples of dossierNummer.toLong (VertexId is typed as Long),
// taken from the full record set so every dossier has a vertex
val vertices: RDD[(VertexId, KvKRecord)] = kvKDataset.rdd.map(
  k => (k.dossierNummer.toLong, k))
// the graph
val linkedGraph = Graph(vertices, edges)
import spark.sqlContext.implicits._

case class GroupedKvKRecord(groupId: Long, kvkRecords: Seq[KvKRecord])

// need to broadcast: one RDD can't be referenced from inside another RDD's map
val bcVertices = spark.sparkContext.broadcast(vertices.collectAsMap())
// connectedComponents is the magic here: every record reachable through a
// predicted link ends up in the same component (continued in the sketch below)
val groupedKvKRecords = linkedGraph
  .connectedComponents()
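
The chain is cut off at this point in the gist; presumably the component ids are folded back into GroupedKvKRecords. A sketch of that continuation, using the broadcast bcVertices map from above:

// connectedComponents() yields a graph whose vertex attribute is the component
// id; group the original records by that id
val groupedKvKRecords = linkedGraph
  .connectedComponents()
  .vertices
  .map { case (vertexId, groupId) => (groupId, bcVertices.value(vertexId)) }
  .groupByKey()
  .map { case (groupId, records) => GroupedKvKRecord(groupId, records.toSeq) }
  .toDS()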
import org.apache.spark.sql.functions.udf

val stringify = udf((vs: Seq[String]) => vs.mkString(";"))

groupedKvKRecords
  .map(
    r => r.kvkRecords
      .foldRight((0, 0, List.empty[String]))(
        (kvkrecord, comb) => (
          comb._1 + 1, // record count per group
          kvkrecord.wptf + comb._2, // summed wptf
          kvkrecord.dossierNummer :: comb._3 // collected dossier numbers
        )
      )
  )
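
stringify is declared above but never applied before the gist cuts off; presumably it collapses the collected dossier numbers into a single ;-separated column, roughly like this (column names are assumptions):

groupedKvKRecords
  .map(r => (r.groupId, r.kvkRecords.map(_.dossierNummer)))
  .toDF("groupId", "dossierNummers")
  .withColumn("dossierNummers", stringify($"dossierNummers"))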