Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
/**
  * Builds the dataset of candidate person pairs together with a feature vector per pair.
  *
  * The input is self-joined (aliases `l` and `r`) and a pair is kept when all of the following hold:
  *  - Levenshtein distance between the forenames is below 3;
  *  - the family names share the same Soundex code and their Levenshtein distance is below 5;
  *  - dates of birth are equal, or at least one of them is missing;
  *  - postcodes are equal, or at least one of them is missing;
  *  - the two records have different `old_id` values.
  *
  * All predicates are symmetric, so a matching pair is emitted in both orders (a, b) and (b, a).
  *
  * @param personDFFromSource the people to compare against each other
  * @return a dataset with columns `left`, `right` and `features`, where `features` is a dense
  *         vector built from `left.distance(right)`
  */
def createAllCombinationsDataset(personDFFromSource: Dataset[Person]): Dataset[(Person, Person, Vector)] = {
  import spark.implicits._
  import com.fts.cp.etl.model.Person
  import org.apache.spark.ml.linalg.{Vector, Vectors}

  // `when` without `otherwise` evaluates to NULL for unmatched rows, and a NULL join predicate
  // rejects the pair. The explicit `otherwise(true)` makes the comparison apply only when both
  // sides are present, so a missing postcode no longer excludes a record from every pair.
  val joinPostcodeCondition = when(
    $"l.postcode".isNotNull && $"r.postcode".isNotNull,
    $"l.postcode" === $"r.postcode"
  ).otherwise(true)

  // Same null-tolerant rule for the date of birth.
  val joinDOBCondition = when(
    $"l.dob".isNotNull && $"r.dob".isNotNull,
    $"l.dob" === $"r.dob"
  ).otherwise(true)

  val candidatePairs = personDFFromSource.as("l").joinWith(
    personDFFromSource.as("r"),
    levenshtein($"l.forename", $"r.forename") < 3
      && soundex($"l.familyname") === soundex($"r.familyname")
      && levenshtein($"l.familyname", $"r.familyname") < 5
      && joinDOBCondition
      && joinPostcodeCondition
      && $"l.old_id" =!= $"r.old_id" // never pair a record with itself
  )

  // joinWith already yields (Person, Person); no casts are needed.
  candidatePairs
    .map { case (left, right) =>
      (left, right, Vectors.dense(left.distance(right).toArray))
    }
    // Rename the tuple columns; `.as` on a tuple type maps columns by position.
    .toDF("left", "right", "features")
    .as[(Person, Person, Vector)]
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.