/** Merges each group of duplicate persons into one record and appends the rows that had no duplicates.
  *
  * Rows of `listOfFoundDuplicates` are split on the `num` column:
  *  - `num > 1`: the row's `id` and `duplicates` are passed to `mergeDuplicatesForAGivenPerson`
  *    together with `personList`, yielding one merged [[Person]] per group. Groups for which that
  *    helper throws `NoRowFoundException` are logged and skipped.
  *  - `num === 1`: the row is appended unchanged via `union`.
  * Rows whose `num` matches neither predicate (e.g. 0 or null) are dropped.
  *
  * The duplicate groups are collected to the driver, so their count must fit in driver memory.
  *
  * @param listOfFoundDuplicates rows with at least `id` (string), `duplicates` (array of strings)
  *                              and `num` columns — column types assumed, verify against the producer
  * @param personList            the full person dataset the merge helper looks records up in
  * @return the merged persons followed by the rows that had exactly one occurrence
  */
def mergePurgeAndMakeListWithoutDuplictes(listOfFoundDuplicates: DataFrame, personList: Dataset[Person]): DataFrame = {
  import spark.implicits._

  val multipleDuplicates = listOfFoundDuplicates.filter($"num" > 1)
  val noDuplicates = listOfFoundDuplicates.filter($"num" === 1)

  // mergeDuplicatesForAGivenPerson receives the `personList` Dataset. Datasets cannot be used
  // inside executor-side closures, so the merge runs on the driver over collected rows.
  // (A lazy `Dataset.map` with no action never executes, and appending to a driver-local buffer
  // from an executor closure only mutates a serialized copy — the result would stay empty.)
  val unifiedListOfPersons: Seq[Person] = multipleDuplicates.collect().toSeq.flatMap { row =>
    val id = row.getAs[String]("id")
    // Spark materialises ArrayType columns as a Seq, never a Set, so read a Seq and convert.
    val duplicates = row.getAs[Seq[String]]("duplicates").toSet
    try Some(mergeDuplicatesForAGivenPerson(id, duplicates, personList))
    catch {
      case _: NoRowFoundException =>
        logger.warn(s"Not found record for a person row in the dataset")
        None
    }
  }

  // NOTE(review): `union` matches columns by position. This assumes noDuplicates' columns line up
  // with Person's fields; otherwise the union fails during analysis. Confirm both schemas.
  unifiedListOfPersons.toDF().union(noDuplicates)
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment