Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
/**
 * Composes the target-model DataFrames ("pii", "person", "address", "contact",
 * "datasharingagreement", "document", "proof") from person rows and the
 * matching rows of the source data set.
 */
class ModelComposer extends Logging{
  import com.datastax.driver.core.utils.UUIDs
  import scala.util.control.NonFatal

  /** Operation label passed to every `*_eventdata` record built by this class. */
  val operationType = "create"

  /**
   * Builds all target DataFrames for the given persons.
   *
   * For every person row the source row whose `customer_id` equals the person's
   * `old_id` is looked up, and a PII, address, contact and data-sharing record
   * (plus a document and proof record when a proof category is present) is
   * derived from it. Persons without a matching source row are logged and skipped.
   *
   * NOTE(review): every person triggers its own `filter(...).head()` lookup on
   * `sourceData`; a join would likely be cheaper — left unchanged here.
   *
   * @param persons    person rows; must expose `old_id` (long), `person_id` and `postcode`
   * @param sourceData source rows keyed by `customer_id`
   * @return map from entity name to its DataFrame; "person" maps to `persons` itself
   */
  @throws(classOf[ModelComposerException])
  def getAllDFs(persons: DataFrame, sourceData: DataFrame): Map[String, DataFrame] = {
    import spark.implicits._
    implicit val anyEncoder = org.apache.spark.sql.Encoders.kryo[Any]

    val piis = ArrayBuffer.empty[PII_eventdata]
    val addresses = ArrayBuffer.empty[Address_eventdata]
    val contacts = ArrayBuffer.empty[Contact_eventdata]
    val datasharingagreements = ArrayBuffer.empty[Datasharingagreement_eventdata]
    val documents = ArrayBuffer.empty[Document]
    val proofs = ArrayBuffer.empty[Proof_eventdata]

    // The parallel collection runs this closure on several threads at once, and
    // ArrayBuffer is not thread-safe, so every append goes through appendSynchronized.
    persons.collect.par.foreach { personRow =>
      val old_id = personRow.getLong(personRow.fieldIndex("old_id")).toString
      val person_id = personRow.getString(personRow.fieldIndex("person_id"))
      val postcode = personRow.getString(personRow.fieldIndex("postcode"))
      try {
        // head() throws NoSuchElementException when no source row matches;
        // that case is handled by the catch clause below.
        val row = sourceData.filter($"customer_id" === old_id).head()

        val piiID = UUIDs.timeBased().toString
        val addressID = UUIDs.timeBased().toString
        val contactID = UUIDs.timeBased().toString
        val datasharingID = UUIDs.timeBased().toString
        val proofOfID = UUIDs.timeBased().toString

        val hasProof = hasValue(row.getString(row.fieldIndex("proofid_category")))
        val hasAddress = hasValue(row.getString(row.fieldIndex("address1")))
        val proofIDs = if (hasProof) Set(proofOfID) else Set.empty[String]
        val addressesIDs = if (hasAddress) Set(addressID) else Set.empty[String]

        appendSynchronized(piis, new PII_eventdata(
          piiID,
          datasharingID,
          proofIDs,
          addressesIDs,
          person_id,
          contactID,
          "ACTIVE",
          "",
          Set.empty[String],
          Set.empty[String],
          "",
          "",
          "",
          operationType,
          LocalDateTime.now().toString,
          LocalDateTime.now().toString
        ))

        // NOTE(review): the address record is built even when `address1` is
        // absent (hasAddress == false); only the PII's address-id set reflects
        // the absence. Behaviour kept as in the original — confirm intent.
        getAddressForAPerson(old_id, postcode, addressID, row) match {
          case Some(a) => appendSynchronized(addresses, a)
          case None => logger.warn(s"Empty address record found. Historic ID: ${old_id}")
        }

        getContactForAPerson(old_id, contactID, row) match {
          case Some(c) => appendSynchronized(contacts, c)
          case None => logger.warn(s"Empty contact record found. Historic ID: ${old_id}")
        }

        getDatasharingagreementsForAPerson(datasharingID, row) match {
          case Some(d) => appendSynchronized(datasharingagreements, d)
          case None => logger.warn(s"Empty datasharing record found. Historic ID: ${old_id}")
        }

        if (hasProof) {
          val (doc, docs_ids) = getProofDocumentForAPerson(row)
          appendSynchronized(documents, doc)
          getProofForAPerson(proofOfID, row, docs_ids) match {
            case Some(p) => appendSynchronized(proofs, p)
            case None => logger.warn(s"Empty proof record found. Historic ID: ${old_id}")
          }
        }
      } catch {
        // Records appended before the failure are kept, as in the original.
        case _: java.util.NoSuchElementException | _: java.lang.NullPointerException =>
          logger.warn(s"Problem searching for a customerdetails record (source) with a given customer id: '${old_id}'. No record is available.")
      }
    }

    Map(
      "pii" -> piis.toDF(),
      "person" -> persons,
      "address" -> addresses.toDF(),
      "contact" -> contacts.toDF(),
      "datasharingagreement" -> datasharingagreements.toDF(),
      "document" -> documents.toDF(),
      "proof" -> proofs.toDF()
    )
  }

  /** True when `value` is neither null nor the literal text "NULL" (case-insensitive). */
  private def hasValue(value: String): Boolean =
    value != null && !value.equalsIgnoreCase("NULL")

  /** Appends `item` to `buffer` while holding the buffer's monitor. */
  private def appendSynchronized[T](buffer: ArrayBuffer[T], item: T): Unit = buffer.synchronized {
    buffer += item
    ()
  }

  /**
   * Builds the address record of a person from a source row.
   *
   * @param historic_id source customer id, used only in log messages
   * @param postcode    postcode to store on the address
   * @param address_id  id to assign to the new address
   * @param cd          source row providing `address1`, `address2`, `city`, `countrycode`, `county`
   * @return the address, or `None` when reading the row fails (the failure is logged)
   */
  def getAddressForAPerson(historic_id: String, postcode: String, address_id: String, cd: Row): Option[Address_eventdata] =
    try {
      Some(new Address_eventdata(
        address_id,
        cd.getString(cd.fieldIndex("address1")),
        cd.getString(cd.fieldIndex("address2")),
        "PRIMARY",
        cd.getString(cd.fieldIndex("city")),
        postcode,
        cd.getString(cd.fieldIndex("countrycode")),
        cd.getString(cd.fieldIndex("county")),
        "",
        "",
        "",
        "",
        "",
        "",
        "",
        operationType,
        LocalDateTime.now().toString,
        LocalDateTime.now().toString
      ))
    } catch {
      case NonFatal(e) =>
        logger.error(s"Error searching for an address in source data (source) with a given customer id: '${historic_id}'. Reason: ${e}")
        logger.error("No address will be retrieved for this customer.")
        None
    }

  /**
   * Builds the contact record of a person from a source row.
   *
   * `mobileNumber` and `email` are read best-effort: if reading fails, a warning
   * is logged and the contact is still built with whatever was read so far.
   *
   * @param historic_id source customer id, used only in log messages
   * @param contact_id  id to assign to the new contact
   * @param cd          source row providing `telephoneNumber`, `mobileNumber`, `email`
   * @return the contact, or `None` when building it fails (the failure is logged)
   */
  def getContactForAPerson(historic_id: String, contact_id: String, cd: Row): Option[Contact_eventdata] = {
    val mobiles = ArrayBuffer.empty[String]
    val emails = ArrayBuffer.empty[String]
    try {
      mobiles += cd.getString(cd.fieldIndex("mobileNumber"))
      emails += cd.getString(cd.fieldIndex("email"))
    } catch {
      case NonFatal(e) =>
        logger.warn(s"Contact section. Error getting value from row. Reason: ${e}")
    }

    try {
      Some(new Contact_eventdata(
        contact_id,
        cd.getString(cd.fieldIndex("telephoneNumber")),
        mobiles.toSet[String],
        emails.toSet[String],
        Set.empty[String],
        "",
        Set.empty[String],
        Set.empty[String],
        Set.empty[String],
        operationType,
        LocalDateTime.now().toString,
        LocalDateTime.now().toString
      ))
    } catch {
      case _: java.util.NoSuchElementException =>
        logger.warn(s"Problem searching for a customerdetails record (source) with a given customer id: '${historic_id}'. No record is available.")
        None
      case NonFatal(e) =>
        logger.error(s"Error searching for a contact in source data (source) with a given customer id: '${historic_id}'. Reason: ${e}. No contact will be retrieved for this customer.")
        None
    }
  }

  /**
   * Builds the data-sharing agreement record of a person from a source row.
   *
   * Consent is considered given when `dataConsentId` is readable and greater
   * than zero; any failure reading it is logged at debug level and treated as
   * "no consent".
   *
   * @param datasharingagreement_id id to assign to the new agreement
   * @param cd source row providing `dataConsentId`, `insertion_date`, `thirdPartySharingDataConsent`
   * @return always `Some`; reading `insertion_date` or
   *         `thirdPartySharingDataConsent` may still throw
   */
  def getDatasharingagreementsForAPerson(datasharingagreement_id: String, cd: Row): Option[Datasharingagreement_eventdata] = {
    val dataconsent: Boolean =
      try {
        cd.getLong(cd.fieldIndex("dataConsentId")) > 0
      } catch {
        case NonFatal(e) =>
          logger.debug(s"DATA SHARING section. Error getting value from row. Reason: ${e}")
          false
      }

    // Only the exact text "1" counts as true; null and anything else is false.
    def getBoolFromString(b: String): Boolean = b == "1"

    Some(new Datasharingagreement_eventdata(
      datasharingagreement_id,
      cd.getString(cd.fieldIndex("insertion_date")),
      "1970-01-01T00:00:00.000",
      if (dataconsent) "ACTIVE" else "INACTIVE",
      dataconsent,
      dataconsent,
      getBoolFromString(cd.getString(cd.fieldIndex("thirdPartySharingDataConsent"))),
      false,
      "1970-01-01T00:00:00.000",
      "1970-01-01T00:00:00.000",
      "",
      false,
      false,
      false,
      false,
      operationType,
      LocalDateTime.now().toString,
      LocalDateTime.now().toString
    ))
  }

  /**
   * Builds the proof document of a person from a source row.
   *
   * The document type is the one whose `name` equals the row's
   * `proofid_description`. If no such type exists, `head` throws
   * `NoSuchElementException`, which `getAllDFs` catches and logs.
   *
   * NOTE(review): the document types are fetched through a new `ODBDAO` on
   * every call — consider caching if this shows up in profiles.
   *
   * @param cd source row providing `proofid_description`, `cuid_issuedate`,
   *           `cuid_expirydate`, `cuid_reference`
   * @return the document and a one-element array holding its id
   */
  def getProofDocumentForAPerson(cd: Row): (Document, Array[String]) = {
    import com.fts.cp.etl.storage.ODBDAO
    import spark.implicits._

    val doc_id: String = UUIDs.timeBased().toString
    val doctype = cd.getString(cd.fieldIndex("proofid_description"))
    val docTypes = new ODBDAO(spark).getAllDocumentTypes()
    val doctyperow = docTypes.toDF().filter($"name" === doctype).head
    val docTypeId = doctyperow.getString(doctyperow.fieldIndex("document_type_id"))

    // The literal text "NULL" is replaced by the epoch default; any other
    // value (including an actual null) is passed through unchanged.
    def dateOrDefault(raw: String): String =
      if (raw == "NULL") "1970-01-01 00:00:00.000" else raw

    val doc = new Document(
      doc_id,
      docTypeId,
      cd.getString(cd.fieldIndex("cuid_reference")),
      dateOrDefault(cd.getString(cd.fieldIndex("cuid_issuedate"))),
      dateOrDefault(cd.getString(cd.fieldIndex("cuid_expirydate"))),
      true,
      "1970-01-01T00:00:00.000"
    )
    (doc, Array(doc_id))
  }

  /**
   * Builds the proof record of a person from a source row.
   *
   * @param proof_id id to assign to the new proof
   * @param cd       source row providing `proofid_category`
   * @param docs_ids ids of the documents backing this proof
   * @return always `Some`
   */
  def getProofForAPerson(proof_id: String, cd: Row, docs_ids: Array[String]): Option[Proof_eventdata] =
    Some(new Proof_eventdata(
      proof_id,
      cd.getString(cd.fieldIndex("proofid_category")),
      "Bureau UK/IE",
      docs_ids.toSet[String],
      "",
      "",
      Set.empty[String],
      operationType,
      LocalDateTime.now().toString,
      LocalDateTime.now().toString
    ))
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.