/**
 * Self-joins `personDFFromSource` with fuzzy matching rules to find candidate duplicate
 * [[Person]] pairs, and attaches a dense feature vector to each pair.
 *
 * A pair `(l, r)` is kept when all of the following hold:
 *  - forenames are within Levenshtein distance 2 (`< 3`);
 *  - family names share a Soundex code and are within Levenshtein distance 4 (`< 5`);
 *  - dates of birth are equal, or at least one of them is null;
 *  - postcodes are equal, or at least one of them is null;
 *  - the two records have different `old_id` values.
 *
 * NOTE(review): the `old_id` test uses `=!=`, so every matching pair is emitted twice, once as
 * (a, b) and once as (b, a). Use `<` instead if one orientation per pair is sufficient.
 *
 * @param personDFFromSource the person records to compare against themselves
 * @return a Dataset of `(left, right, features)` tuples whose columns are named `left`, `right`
 *         and `features`, where `features` is `left.distance(right)` as a dense vector
 */
def createAllCombinationsDataset(
    personDFFromSource: Dataset[Person]
): Dataset[(Person, Person, org.apache.spark.ml.linalg.Vector)] = {
  import spark.implicits._
  import com.fts.cp.etl.model.Person
  import org.apache.spark.ml.linalg.{Vector, Vectors}

  // A `when` without `otherwise` evaluates to null when its guard is false. A null join
  // predicate drops the row, which made the null guard a no-op (identical to plain `===`).
  // `otherwise(true)` lets a missing value on either side pass, as the guard intends.
  val joinPostcodeCondition = when(
    $"l.postcode".isNotNull && $"r.postcode".isNotNull,
    $"l.postcode" === $"r.postcode"
  ).otherwise(true)

  val joinDOBCondition = when(
    $"l.dob".isNotNull && $"r.dob".isNotNull,
    $"l.dob" === $"r.dob"
  ).otherwise(true)

  // Wrapped in parentheses so the leading `&&` continuations parse as one expression.
  val matchCondition = (
    levenshtein($"l.forename", $"r.forename") < 3
      && soundex($"l.familyname") === soundex($"r.familyname")
      && levenshtein($"l.familyname", $"r.familyname") < 5
      && joinDOBCondition
      && joinPostcodeCondition
      && $"l.old_id" =!= $"r.old_id"
  )

  val comparableDataset = personDFFromSource
    .as("l")
    .joinWith(personDFFromSource.as("r"), matchCondition)
    // joinWith on Dataset[Person] already yields (Person, Person); no casts are needed.
    .map { case (left, right) =>
      (left, right, Vectors.dense(left.distance(right).toArray))
    }
    // Rename the tuple columns (_1, _2, _3). `as` maps tuple fields by ordinal, so the typed
    // view is preserved while the column names become readable.
    .toDF("left", "right", "features")
    .as[(Person, Person, Vector)]

  comparableDataset
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment