Skip to content

Instantly share code, notes, and snippets.

@jesusjavierdediego
Created February 27, 2019 18:24
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jesusjavierdediego/b2bf498b64e3e84a7032084edeb282a3 to your computer and use it in GitHub Desktop.
Save jesusjavierdediego/b2bf498b64e3e84a7032084edeb282a3 to your computer and use it in GitHub Desktop.
def createAllCombinationsDataset(personDFFromSource: Dataset[Person]): Dataset[(Person, Person, Vector)] ={
import spark.implicits._
import com.fts.cp.etl.model.Person
import org.apache.spark.ml.linalg.{Vector, Vectors}
val joinPostcodeCondition = when(
$"l.postcode".isNotNull && $"r.postcode".isNotNull,
$"l.postcode" === $"r.postcode"
)
val joinDOBCondition = when(
$"l.dob".isNotNull && $"r.dob".isNotNull,
$"l.dob" === $"r.dob"
)
val comparableDataset = personDFFromSource.as("l").joinWith(personDFFromSource.as("r"),
levenshtein($"l.forename", $"r.forename") < 3
&& soundex($"l.familyname") === soundex($"r.familyname")
&& levenshtein($"l.familyname", $"r.familyname") < 5
&& joinDOBCondition
&& joinPostcodeCondition
&& $"l.old_id" =!= $"r.old_id"
).map {
case (left, right) => (left, right, Vectors.dense(left.asInstanceOf[Person].distance(right.asInstanceOf[Person]).toArray))
}
.toDF("left", "right", "features")
.as[(Person, Person, Vector)]
//comparableDataset.show(200,false)
comparableDataset
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment