Created February 27, 2019 22:51
-
-
Save jesusjavierdediego/190e2e47b1fe0fc44d11bf68395741fa to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
/**
 * Composes the target entity data frames (PII, address, contact, data-sharing agreement,
 * document, proof) from migrated person rows and the customer-details source data.
 */
class ModelComposer extends Logging {
  import com.datastax.driver.core.utils.UUIDs
  import com.fts.cp.etl.storage.ODBDAO
  import scala.util.control.NonFatal

  /** Operation tag stamped on every record produced by this composer. */
  val operationType = "create"

  /**
   * Every record derived from a single person row.
   *
   * Building one value per person (instead of appending to shared buffers) keeps the
   * parallel traversal in getAllDFs free of shared mutable state. It also means a person
   * whose processing fails part-way contributes no records, so no id is left dangling.
   */
  private final case class PersonRecords(
      pii: PII_eventdata,
      address: Option[Address_eventdata],
      contact: Option[Contact_eventdata],
      datasharingagreement: Option[Datasharingagreement_eventdata],
      document: Option[Document],
      proof: Option[Proof_eventdata])

  /**
   * Document type ids keyed by document type name.
   *
   * Loaded once per composer instead of once per person. When several rows share a name,
   * the first one wins, as with the former `filter(...).head` lookup.
   */
  private lazy val documentTypeIdsByName: Map[String, String] = {
    import spark.implicits._
    val rows = new ODBDAO(spark)
      .getAllDocumentTypes()
      .toDF()
      .select("name", "document_type_id")
      .collect()
    rows.foldLeft(Map.empty[String, String]) { (acc, r) =>
      val name = r.getString(0)
      if (name == null || acc.contains(name)) acc else acc + (name -> r.getString(1))
    }
  }

  /**
   * Builds the target data frames from the migrated persons and the customer source data.
   *
   * For each person row, the source row with `customer_id == old_id` is looked up.
   * Persons without a match are logged and skipped.
   *
   * @param persons    person rows; reads `old_id` (long), `person_id` and `postcode` (strings)
   * @param sourceData customer-details source rows, matched on column `customer_id`
   * @return data frames keyed by "pii", "person", "address", "contact",
   *         "datasharingagreement", "document" and "proof"; "person" is `persons` unchanged
   */
  @throws(classOf[ModelComposerException])
  def getAllDFs(persons: DataFrame, sourceData: DataFrame): Map[String, DataFrame] = {
    import spark.implicits._

    // Workers return their records rather than appending to shared ArrayBuffers:
    // ArrayBuffer is not thread-safe, and concurrent appends under `.par` lose or corrupt entries.
    val composed: Seq[PersonRecords] =
      persons.collect().par.map(personRow => composeForPerson(personRow, sourceData)).seq.flatten.toList

    Map(
      "pii" -> composed.map(_.pii).toDF(),
      "person" -> persons,
      "address" -> composed.flatMap(_.address).toDF(),
      "contact" -> composed.flatMap(_.contact).toDF(),
      "datasharingagreement" -> composed.flatMap(_.datasharingagreement).toDF(),
      "document" -> composed.flatMap(_.document).toDF(),
      "proof" -> composed.flatMap(_.proof).toDF()
    )
  }

  /** Looks up the source row for one person and builds its records; None when none can be built. */
  private def composeForPerson(personRow: Row, sourceData: DataFrame): Option[PersonRecords] = {
    import spark.implicits._
    val old_id = personRow.getLong(personRow.fieldIndex("old_id")).toString
    val person_id = personRow.getString(personRow.fieldIndex("person_id"))
    val postcode = personRow.getString(personRow.fieldIndex("postcode"))
    try {
      // head(1).headOption rather than head(): head() throws when nothing matches, which made a
      // "not found" branch unreachable and pushed the common missing-record case into the catch.
      sourceData.filter($"customer_id" === old_id).head(1).headOption match {
        case Some(row) => Some(composeFromSource(old_id, person_id, postcode, row))
        case None =>
          logger.warn(s"Problem searching for a customerdetails record (source) with a given customer id: '${old_id}'. No record is available.")
          None
      }
    } catch {
      case e @ (_: java.util.NoSuchElementException | _: java.lang.NullPointerException) =>
        logger.warn(s"Problem building records for customer id: '${old_id}'. The person is skipped. Reason: ${e}")
        None
    }
  }

  /** Builds the PII record and its related address, contact, data-sharing and proof records. */
  private def composeFromSource(old_id: String, person_id: String, postcode: String, row: Row): PersonRecords = {
    val piiID = UUIDs.timeBased().toString
    val addressID = UUIDs.timeBased().toString
    val contactID = UUIDs.timeBased().toString
    val datasharingID = UUIDs.timeBased().toString
    val proofOfID = UUIDs.timeBased().toString

    // The PII only references a proof / address when the source actually has one.
    val hasProof = isPresent(row.getString(row.fieldIndex("proofid_category")))
    val hasAddress = isPresent(row.getString(row.fieldIndex("address1")))

    val now = LocalDateTime.now().toString
    val pii = new PII_eventdata(
      piiID,
      datasharingID,
      if (hasProof) Set(proofOfID) else Set.empty[String],
      if (hasAddress) Set(addressID) else Set.empty[String],
      person_id,
      contactID,
      "ACTIVE",
      "",
      Set.empty[String],
      Set.empty[String],
      "",
      "",
      "",
      operationType,
      now,
      now
    )

    val address = getAddressForAPerson(old_id, postcode, addressID, row)
    if (address.isEmpty) logger.warn(s"Empty address record found. Historic ID: ${old_id}")

    val contact = getContactForAPerson(old_id, contactID, row)
    if (contact.isEmpty) logger.warn(s"Empty contact record found. Historic ID: ${old_id}")

    val datasharing = getDatasharingagreementsForAPerson(datasharingID, row)
    if (datasharing.isEmpty) logger.warn(s"Empty datasharing record found. Historic ID: ${old_id}")

    val (document, proof): (Option[Document], Option[Proof_eventdata]) =
      if (hasProof) {
        val (doc, docs_ids) = getProofDocumentForAPerson(row)
        val p = getProofForAPerson(proofOfID, row, docs_ids)
        if (p.isEmpty) logger.warn(s"Empty proof record found. Historic ID: ${old_id}")
        (Some(doc), p)
      } else (None, None)

    PersonRecords(pii, address, contact, datasharing, document, proof)
  }

  /** True when `value` is neither null nor the source's "NULL" placeholder (case-insensitive). */
  private def isPresent(value: String): Boolean =
    value != null && !value.equalsIgnoreCase("NULL")

  /**
   * Builds the primary address record for a person from its source row.
   *
   * @param historic_id the person's historic (source) customer id, used for logging
   * @param postcode    postcode taken from the person row rather than the source row
   * @param address_id  id to assign to the new address record
   * @param cd          the matching customer-details source row
   * @return the address, or None (after logging) when it cannot be read from `cd`
   */
  def getAddressForAPerson(historic_id: String, postcode: String, address_id: String, cd: Row): Option[Address_eventdata] =
    try {
      val now = LocalDateTime.now().toString
      Some(new Address_eventdata(
        address_id,
        cd.getString(cd.fieldIndex("address1")),
        cd.getString(cd.fieldIndex("address2")),
        "PRIMARY",
        cd.getString(cd.fieldIndex("city")),
        postcode,
        cd.getString(cd.fieldIndex("countrycode")),
        cd.getString(cd.fieldIndex("county")),
        "",
        "",
        "",
        "",
        "",
        "",
        "",
        operationType,
        now,
        now
      ))
    } catch {
      case NonFatal(e) =>
        logger.error(s"Error searching for an address in source data (source) with a given customer id: '${historic_id}'. Reason: ${e}")
        logger.error("No address will be retrieved for this customer.")
        None
    }

  /**
   * Builds the contact record for a person from its source row.
   *
   * @param historic_id the person's historic (source) customer id, used for logging
   * @param contact_id  id to assign to the new contact record
   * @param cd          the matching customer-details source row
   * @return the contact, or None (after logging) when `telephoneNumber` cannot be read
   */
  def getContactForAPerson(historic_id: String, contact_id: String, cd: Row): Option[Contact_eventdata] = {
    // Each optional field is read on its own, so a failure on one does not drop the other.
    // Nulls are skipped so the sets never contain a null element.
    def optionalField(field: String): Set[String] =
      try Option(cd.getString(cd.fieldIndex(field))).toSet
      catch {
        case NonFatal(e) =>
          logger.warn(s"Contact section. Error getting value from row. Reason: ${e}")
          Set.empty[String]
      }

    val mobiles = optionalField("mobileNumber")
    val emails = optionalField("email")
    try {
      val now = LocalDateTime.now().toString
      Some(new Contact_eventdata(
        contact_id,
        cd.getString(cd.fieldIndex("telephoneNumber")),
        mobiles,
        emails,
        Set.empty[String],
        "",
        Set.empty[String],
        Set.empty[String],
        Set.empty[String],
        operationType,
        now,
        now
      ))
    } catch {
      case NonFatal(e) =>
        logger.error(s"Error building a contact from source data (source) with a given customer id: '${historic_id}'. No contact will be retrieved for this customer. Reason: ${e}")
        None
    }
  }

  /**
   * Builds the data-sharing agreement for a person from its source row.
   *
   * Consent is granted when `dataConsentId` is a positive long. Third-party consent is
   * granted only when `thirdPartySharingDataConsent` is exactly "1".
   *
   * @param datasharingagreement_id id to assign to the new agreement record
   * @param cd                      the matching customer-details source row
   * @return always Some(agreement); unreadable `insertion_date` / third-party columns throw
   */
  def getDatasharingagreementsForAPerson(datasharingagreement_id: String, cd: Row): Option[Datasharingagreement_eventdata] = {
    // A missing or null dataConsentId (getLong throws on null) is treated as "no consent".
    val dataconsent: Boolean =
      try cd.getLong(cd.fieldIndex("dataConsentId")) > 0
      catch {
        case NonFatal(e) =>
          logger.debug(s"DATA SHARING section. Error getting value from row. Reason: ${e}")
          false
      }
    val insertionDate = cd.getString(cd.fieldIndex("insertion_date"))
    val thirdPartyConsent = cd.getString(cd.fieldIndex("thirdPartySharingDataConsent")) == "1"
    val now = LocalDateTime.now().toString
    Some(new Datasharingagreement_eventdata(
      datasharingagreement_id,
      insertionDate,
      "1970-01-01T00:00:00.000",
      if (dataconsent) "ACTIVE" else "INACTIVE",
      dataconsent,
      dataconsent,
      thirdPartyConsent,
      false,
      "1970-01-01T00:00:00.000",
      "1970-01-01T00:00:00.000",
      "",
      false,
      false,
      false,
      false,
      operationType,
      now,
      now
    ))
  }

  /**
   * Builds the proof document for a person from its source row.
   *
   * The document type is resolved by name (`proofid_description`). An unknown name is
   * logged and yields an empty document type id instead of failing the whole person.
   *
   * @param cd the matching customer-details source row
   * @return the document and the ids of all documents created (currently just this one)
   */
  def getProofDocumentForAPerson(cd: Row): (Document, Array[String]) = {
    val doc_id = UUIDs.timeBased().toString
    val doctype = cd.getString(cd.fieldIndex("proofid_description"))
    val docTypeId = documentTypeIdsByName.getOrElse(doctype, {
      logger.warn(s"Unknown document type '${doctype}'. Document '${doc_id}' is created without a document type.")
      ""
    })
    val issueDate = dateOrEpoch(cd.getString(cd.fieldIndex("cuid_issuedate")))
    val expiryDate = dateOrEpoch(cd.getString(cd.fieldIndex("cuid_expirydate")))
    val doc = new Document(
      doc_id,
      docTypeId,
      cd.getString(cd.fieldIndex("cuid_reference")),
      issueDate,
      expiryDate,
      true,
      "1970-01-01T00:00:00.000"
    )
    (doc, Array(doc_id))
  }

  /** Replaces the source's literal "NULL" date marker (exact match) with the epoch; other values pass through. */
  private def dateOrEpoch(value: String): String =
    if (value == "NULL") "1970-01-01 00:00:00.000" else value

  /**
   * Builds the proof record for a person, linking it to the given documents.
   *
   * @param proof_id id to assign to the new proof record
   * @param cd       the matching customer-details source row (reads `proofid_category`)
   * @param docs_ids ids of the documents backing this proof
   * @return always Some(proof)
   */
  def getProofForAPerson(proof_id: String, cd: Row, docs_ids: Array[String]): Option[Proof_eventdata] = {
    val now = LocalDateTime.now().toString
    Some(new Proof_eventdata(
      proof_id,
      cd.getString(cd.fieldIndex("proofid_category")),
      "Bureau UK/IE",
      docs_ids.toSet[String],
      "",
      "",
      Set.empty[String],
      operationType,
      now,
      now
    ))
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment.