/**
 * Composes the target-model DataFrames (pii, person, address, contact, datasharingagreement,
 * document and proof) from a DataFrame of persons and the customer-details source data.
 */
class ModelComposer extends Logging {
  import com.datastax.driver.core.utils.UUIDs
  import scala.util.control.NonFatal

  val operationType = "create"

  /** Placeholder timestamp (ISO "T" separator) used for dates the source does not provide. */
  private val EpochIso = "1970-01-01T00:00:00.000"

  /**
   * Placeholder for missing document issue/expiry dates. NOTE(review): unlike EpochIso this
   * uses a space separator, exactly as before — confirm the Document schema expects it.
   */
  private val EpochSpaced = "1970-01-01 00:00:00.000"

  /** Every record composed for a single person, kept together so a person is all-or-nothing. */
  private final case class PersonRecords(
      pii: PII_eventdata,
      address: Option[Address_eventdata],
      contact: Option[Contact_eventdata],
      datasharing: Option[Datasharingagreement_eventdata],
      document: Option[Document],
      proof: Option[Proof_eventdata])

  /**
   * Builds all target-model DataFrames.
   *
   * Each person row is matched against `sourceData` on `customer_id === old_id` (first match
   * wins); persons with a null `old_id` or without a matching source row are logged and skipped.
   *
   * @param persons    rows read with `old_id` (Long), `person_id` and `postcode` (String)
   * @param sourceData customer-details rows keyed by `customer_id`
   * @return map from table name ("pii", "person", "address", "contact",
   *         "datasharingagreement", "document", "proof") to its DataFrame
   * @throws ModelComposerException declared for callers; not thrown directly by this method
   */
  @throws(classOf[ModelComposerException])
  def getAllDFs(persons: DataFrame, sourceData: DataFrame): Map[String, DataFrame] = {
    import spark.implicits._

    // Document types are only needed when some person has a proof, so load them lazily and at
    // most once. Local lazy vals initialise under a lock, so the parallel tasks can share it.
    lazy val documentTypeIds: Map[String, String] = loadDocumentTypeIds()

    // Each person is composed independently and *returns* its records; the parallel tasks never
    // mutate shared state (appending to unsynchronised ArrayBuffers from `.par.foreach` races
    // and can drop or corrupt records). ParArray.map preserves the input order.
    val records: List[PersonRecords] = persons
      .collect()
      .par
      .map(personRow => composePerson(personRow, sourceData, name => documentTypeIds.get(name)))
      .toList
      .flatten

    Map(
      "pii" -> records.map(_.pii).toDF(),
      "person" -> persons,
      "address" -> records.flatMap(_.address).toDF(),
      "contact" -> records.flatMap(_.contact).toDF(),
      "datasharingagreement" -> records.flatMap(_.datasharing).toDF(),
      "document" -> records.flatMap(_.document).toDF(),
      "proof" -> records.flatMap(_.proof).toDF()
    )
  }

  /**
   * Looks up the source row for one person and composes its records.
   *
   * @return None (after logging) when `old_id` is null, when no source row matches, or when
   *         composition fails with a NoSuchElementException / NullPointerException
   */
  private def composePerson(
      personRow: Row,
      sourceData: DataFrame,
      documentTypeIdFor: String => Option[String]): Option[PersonRecords] = {
    import spark.implicits._

    val personId = personRow.getString(personRow.fieldIndex("person_id"))
    val oldIdIndex = personRow.fieldIndex("old_id")

    if (personRow.isNullAt(oldIdIndex)) {
      // Row.getLong on a null throws; treat it as "no historic id" instead of failing the job.
      logger.warn(s"No historic ID was found for person with person_id: ${personId}")
      None
    } else {
      val oldId = personRow.getLong(oldIdIndex).toString
      val postcode = personRow.getString(personRow.fieldIndex("postcode"))
      try {
        // head(1) returns an empty array when nothing matches, unlike head() which throws.
        sourceData.filter($"customer_id" === oldId).head(1).headOption match {
          case Some(sourceRow) =>
            Some(composeFromSource(oldId, personId, postcode, sourceRow, documentTypeIdFor))
          case None =>
            logger.warn(s"Problem searching for a customerdetails record (source) with a given customer id: '${oldId}'. No record is available.")
            None
        }
      } catch {
        case e @ (_: java.util.NoSuchElementException | _: java.lang.NullPointerException) =>
          logger.warn(s"Problem composing records for a customerdetails record (source) with a given customer id: '${oldId}'. Reason: ${e}")
          None
      }
    }
  }

  /**
   * Builds the pii record and its linked address / contact / datasharing / document / proof
   * records from one matched source row. The pii only references address and proof IDs whose
   * records were actually built, so no dangling references are produced.
   */
  private def composeFromSource(
      oldId: String,
      personId: String,
      postcode: String,
      row: Row,
      documentTypeIdFor: String => Option[String]): PersonRecords = {
    val piiID = UUIDs.timeBased().toString
    val addressID = UUIDs.timeBased().toString
    val contactID = UUIDs.timeBased().toString
    val datasharingID = UUIDs.timeBased().toString
    val proofID = UUIDs.timeBased().toString

    // Only emit an address when the source has one (mirrors the proof handling); previously an
    // address row nobody referenced was produced for every person.
    val address: Option[Address_eventdata] =
      if (isPresent(row.getString(row.fieldIndex("address1")))) {
        val built = getAddressForAPerson(oldId, postcode, addressID, row)
        if (built.isEmpty) logger.warn(s"Empty address record found. Historic ID: ${oldId}")
        built
      } else None

    val contact = getContactForAPerson(oldId, contactID, row)
    if (contact.isEmpty) logger.warn(s"Empty contact record found. Historic ID: ${oldId}")

    val datasharing = getDatasharingagreementsForAPerson(datasharingID, row)
    if (datasharing.isEmpty) logger.warn(s"Empty datasharing record found. Historic ID: ${oldId}")

    val (document, proof): (Option[Document], Option[Proof_eventdata]) =
      if (isPresent(row.getString(row.fieldIndex("proofid_category")))) {
        val (doc, docIds) = buildProofDocument(row, documentTypeIdFor)
        val built = getProofForAPerson(proofID, row, docIds)
        if (built.isEmpty) logger.warn(s"Empty proof record found. Historic ID: ${oldId}")
        (Some(doc), built)
      } else (None, None)

    val now = LocalDateTime.now().toString
    val pii = new PII_eventdata(
      piiID,
      datasharingID,
      proof.map(_ => proofID).toSet[String],
      address.map(_ => addressID).toSet[String],
      personId,
      contactID,
      "ACTIVE",
      "",
      Set.empty[String],
      Set.empty[String],
      "",
      "",
      "",
      operationType,
      now,
      now
    )

    PersonRecords(pii, address, contact, datasharing, document, proof)
  }

  /**
   * Builds the primary address for a person from a source row.
   *
   * @return None (after logging an error) if any field cannot be read
   */
  def getAddressForAPerson(historic_id: String, postcode: String, address_id: String, cd: Row): Option[Address_eventdata] = {
    val now = LocalDateTime.now().toString
    try {
      Some(new Address_eventdata(
        address_id,
        cd.getString(cd.fieldIndex("address1")),
        cd.getString(cd.fieldIndex("address2")),
        "PRIMARY",
        cd.getString(cd.fieldIndex("city")),
        postcode,
        cd.getString(cd.fieldIndex("countrycode")),
        cd.getString(cd.fieldIndex("county")),
        "",
        "",
        "",
        "",
        "",
        "",
        "",
        operationType,
        now,
        now
      ))
    } catch {
      case NonFatal(e) =>
        logger.error(s"Error searching for an address in source data (source) with a given customer id: '${historic_id}'. Reason: ${e}")
        logger.error("No address will be retrieved for this customer.")
        None
    }
  }

  /**
   * Builds the contact record for a person from a source row.
   *
   * Mobile number and email are each read independently and dropped when null or "NULL", so a
   * problem with one does not discard the other.
   *
   * @return None (after logging an error) if the record cannot be built
   */
  def getContactForAPerson(historic_id: String, contact_id: String, cd: Row): Option[Contact_eventdata] = {
    def presentValues(field: String): Set[String] =
      try Option(cd.getString(cd.fieldIndex(field))).filter(isPresent).toSet[String]
      catch {
        case NonFatal(e) =>
          logger.warn(s"Contact section. Error getting value from row. Reason: ${e}")
          Set.empty[String]
      }

    val mobiles = presentValues("mobileNumber")
    val emails = presentValues("email")
    val now = LocalDateTime.now().toString
    try {
      Some(new Contact_eventdata(
        contact_id,
        cd.getString(cd.fieldIndex("telephoneNumber")),
        mobiles,
        emails,
        Set.empty[String],
        "",
        Set.empty[String],
        Set.empty[String],
        Set.empty[String],
        operationType,
        now,
        now
      ))
    } catch {
      case NonFatal(e) =>
        logger.error(s"Error building a contact from source data (source) with a given customer id: '${historic_id}'. Reason: ${e}. No contact will be retrieved for this customer.")
        None
    }
  }

  /**
   * Builds the data-sharing agreement for a person.
   *
   * Consent is true only when `dataConsentId` reads as a Long greater than 0 (an unreadable or
   * null value counts as no consent); third-party consent is true only for the string "1".
   */
  def getDatasharingagreementsForAPerson(datasharingagreement_id: String, cd: Row): Option[Datasharingagreement_eventdata] = {
    val dataconsent: Boolean =
      try cd.getLong(cd.fieldIndex("dataConsentId")) > 0
      catch {
        case NonFatal(e) =>
          logger.debug(s"DATA SHARING section. Error getting value from row. Reason: ${e}")
          false
      }

    val now = LocalDateTime.now().toString
    Some(new Datasharingagreement_eventdata(
      datasharingagreement_id,
      cd.getString(cd.fieldIndex("insertion_date")),
      EpochIso,
      if (dataconsent) "ACTIVE" else "INACTIVE",
      dataconsent,
      dataconsent,
      cd.getString(cd.fieldIndex("thirdPartySharingDataConsent")) == "1",
      false,
      EpochIso,
      EpochIso,
      "",
      false,
      false,
      false,
      false,
      operationType,
      now,
      now
    ))
  }

  /**
   * Builds the proof document for a person, loading the document types from ODBDAO.
   *
   * @return the document and the array of document IDs (a single element) the proof references
   */
  def getProofDocumentForAPerson(cd: Row): (Document, Array[String]) = {
    val documentTypeIds = loadDocumentTypeIds()
    buildProofDocument(cd, name => documentTypeIds.get(name))
  }

  /**
   * Builds a proof document using an already-loaded document-type lookup.
   *
   * An unknown (or null) `proofid_description` yields an empty document type id instead of
   * failing; null or "NULL" (any case) issue/expiry dates become EpochSpaced.
   */
  private def buildProofDocument(cd: Row, documentTypeIdFor: String => Option[String]): (Document, Array[String]) = {
    val docId = UUIDs.timeBased().toString
    val doctype = cd.getString(cd.fieldIndex("proofid_description"))
    val docTypeId = Option(doctype).flatMap(documentTypeIdFor).getOrElse {
      logger.warn(s"No document type found with name '${doctype}'.")
      ""
    }

    def dateOrEpoch(field: String): String = {
      val value = cd.getString(cd.fieldIndex(field))
      if (isPresent(value)) value else EpochSpaced
    }

    val doc = new Document(
      docId,
      docTypeId,
      cd.getString(cd.fieldIndex("cuid_reference")),
      dateOrEpoch("cuid_issuedate"),
      dateOrEpoch("cuid_expirydate"),
      true,
      EpochIso
    )
    (doc, Array(docId))
  }

  /**
   * Loads the document-type `name` -> `document_type_id` mapping from ODBDAO. When several rows
   * share a name the first collected one wins; rows with a null name or id are skipped.
   */
  private def loadDocumentTypeIds(): Map[String, String] = {
    import com.fts.cp.etl.storage.ODBDAO
    import spark.implicits._
    val dao = new ODBDAO(spark)
    val docTypes = dao.getAllDocumentTypes()
    docTypes
      .toDF()
      .select("name", "document_type_id")
      .collect()
      .foldLeft(Map.empty[String, String]) { (acc, typeRow) =>
        val name = typeRow.getString(0)
        val id = typeRow.getString(1)
        if (name == null || id == null || acc.contains(name)) acc else acc + (name -> id)
      }
  }

  /** Builds the proof record linking a proof category to its supporting documents. */
  def getProofForAPerson(proof_id: String, cd: Row, docs_ids: Array[String]): Option[Proof_eventdata] = {
    val now = LocalDateTime.now().toString
    Some(new Proof_eventdata(
      proof_id,
      cd.getString(cd.fieldIndex("proofid_category")),
      "Bureau UK/IE",
      docs_ids.toSet[String],
      "",
      "",
      Set.empty[String],
      operationType,
      now,
      now
    ))
  }

  /** True when a source string holds a real value: neither null nor the "NULL" marker (any case). */
  private def isPresent(value: String): Boolean =
    value != null && !value.equalsIgnoreCase("NULL")
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment