Skip to content

Instantly share code, notes, and snippets.

Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save jesusjavierdediego/d8da5d2d7df22a9c49b00623de547d59 to your computer and use it in GitHub Desktop.
Save jesusjavierdediego/d8da5d2d7df22a9c49b00623de547d59 to your computer and use it in GitHub Desktop.
/** Replaces every group of duplicate persons with a single merged record and
  * returns the merged records together with the rows that had no duplicates.
  *
  * Rows of `listOfFoundDuplicates` with `num > 1` are merged one by one through
  * `mergeDuplicatesForAGivenPerson`. Rows with `num == 1` are passed through as-is.
  * A group whose person record cannot be found (`NoRowFoundException`) is logged
  * and skipped rather than failing the whole job.
  *
  * The duplicate groups are collected to the driver on purpose:
  * `mergeDuplicatesForAGivenPerson` takes the `personList` Dataset, and Dataset
  * operations cannot be invoked from inside an executor-side `map` closure.
  * Appending to a driver-local buffer from such a closure is also never visible
  * on the driver. NOTE(review): this assumes the number of groups with `num > 1`
  * fits in driver memory — confirm for production volumes.
  *
  * @param listOfFoundDuplicates frame with at least the columns `id` (string),
  *                              `duplicates` (array of strings) and `num` (count)
  * @param personList            the full person dataset used to resolve duplicates
  * @return merged persons unioned with the `num == 1` rows
  */
def mergePurgeAndMakeListWithoutDuplictes(listOfFoundDuplicates: DataFrame, personList: Dataset[Person]): DataFrame = {
  import spark.implicits._

  val multipleDuplicates = listOfFoundDuplicates.filter($"num" > 1)
  val noDuplicates = listOfFoundDuplicates.filter($"num" === 1)

  // `collect()` is an action: it forces evaluation and brings the rows to the
  // driver, where the merge (which itself runs Spark jobs) can legally execute.
  val unifiedListOfPersons: Seq[Person] = multipleDuplicates.collect().flatMap { row =>
    val id = row.getAs[String]("id")
    // Spark materializes ArrayType columns as a Seq, never as a Set, so read a
    // Seq and convert it; casting straight to Set throws ClassCastException.
    val duplicates = row.getAs[Seq[String]]("duplicates").toSet
    try {
      Some(mergeDuplicatesForAGivenPerson(id, duplicates, personList))
    } catch {
      case _: NoRowFoundException =>
        logger.warn(s"Not found record for a person row in the dataset")
        None
    }
  }.toSeq

  // NOTE(review): `union` matches columns by position, not by name. This assumes
  // `noDuplicates` has the same column layout as `Person` — confirm, or consider
  // `unionByName` / projecting `noDuplicates` onto Person's columns.
  unifiedListOfPersons.toDF().union(noDuplicates)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment