Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
/**
 * Merges each group of duplicate person records into a single [[Person]] and appends the rows
 * that had no duplicates.
 *
 * Rows of `listOfFoundDuplicates` with `num > 1` are merged one by one via
 * `mergeDuplicatesForAGivenPerson`; rows with `num == 1` are passed through unchanged.
 * Groups for which the merge throws [[NoRowFoundException]] are logged and skipped.
 *
 * @param listOfFoundDuplicates duplicate groups; read columns are `id` (string), `duplicates`
 *                              (array of strings) and `num` (numeric group size)
 * @param personList            the person records the merge helper looks duplicates up in
 * @return merged persons unioned (by column position) with the `num == 1` rows
 */
def mergePurgeAndMakeListWithoutDuplictes(listOfFoundDuplicates: DataFrame, personList: Dataset[Person]): DataFrame = {
  import spark.implicits._

  val multipleDuplicates = listOfFoundDuplicates.filter($"num" > 1)
  val noDuplicates = listOfFoundDuplicates.filter($"num" === 1)

  // `mergeDuplicatesForAGivenPerson` takes a Dataset, which can only be used from the driver,
  // so the duplicate groups are collected and merged here instead of inside a Spark
  // transformation. (A lazy `Dataset.map` with no action never runs, and executor-side
  // appends to a driver-local buffer are lost.)
  // NOTE(review): assumes the number of duplicate groups fits in driver memory — confirm.
  val unifiedListOfPersons = multipleDuplicates.collect().toList.flatMap { row =>
    val id = row.getAs[String]("id")
    // Spark hands array columns back as a Seq (never a Set); convert before merging.
    val duplicates = Option(row.getAs[scala.collection.Seq[String]]("duplicates"))
      .fold(Set.empty[String])(_.toSet)
    try {
      Some(mergeDuplicatesForAGivenPerson(id, duplicates, personList))
    } catch {
      case e: NoRowFoundException =>
        logger.warn(s"Not found record for a person row in the dataset (id=$id): ${e.getMessage}")
        None
    }
  }

  // NOTE(review): `union` matches columns by position; `noDuplicates` has the schema of
  // `listOfFoundDuplicates`, which may not line up with `Person`'s columns — confirm, or join
  // `noDuplicates` back to `personList` on `id` before the union.
  unifiedListOfPersons.toDF().union(noDuplicates)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.