Last active
November 7, 2020 22:10
-
-
Save niroj-office/b9dd49966ee27a9772eedf58491f0063 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
// UPSERT and UPDATE are both written through the MERGE statement built below, which needs
// join (composite) keys to match incoming rows against the target table.
if (dbOperation == DbOperations.UPSERT || dbOperation == DbOperations.UPDATE) {
  // A null key list is treated like an empty one so the caller gets the explanatory message
  // rather than a bare NullPointerException from `.isEmpty`.
  if (joinKeys == null || joinKeys.isEmpty) {
    logger.error("When dataframe write is in UPSERT/UPDATE mode, user must have to pass the composite keys!!")
    // The exception type is kept as-is because existing callers may already catch it.
    throw new NullPointerException("Composite keys for dataframe UPSERT/UPDATE operation is passed as Null")
  } else {
import scala.io.Source._ | |
import java.sql.DriverManager | |
import java.sql.Connection | |
import java.sql._ | |
// Number of rows about to be written; only used to size the write partitions.
val totalCnt = loadReadyDb2DF.count
// Fail with a clear message instead of an ArithmeticException when an integral commit size is 0.
require(commitNbr > 0, s"commitNbr must be positive but was $commitNbr")
// One partition per commit-sized chunk of rows. This is a `var` because the value is clamped
// again right before the DataFrame is repartitioned.
var batchPartition = (totalCnt / commitNbr).toInt
import org.apache.spark.sql.types._ | |
/**
 * Marker data type for a surrogate-key column that is not read from the DataFrame.
 *
 * It is paired with the surrogate key in the column/type lookup and recognised by its
 * `typeName` ("sequence") when columns are bound, so that no JDBC parameter is set for it.
 */
object SequenceType extends DataType with java.io.Serializable {
  /** Name the column-binding `match` dispatches on. */
  override def typeName: String = "sequence"

  /** Nullable counterpart; reported as a plain string type. */
  override def asNullable: DataType = StringType

  /** Default size reported for a value of this type. */
  override def defaultSize: Int = 11
}
// allCols: every column that takes part in the statement.
// tblSchemaMap: column name -> data type lookup used when values are bound.
// When the surrogate key is generated from a sequence it is prepended to the DataFrame's own
// columns and registered with the SequenceType marker type.
val (allCols, tblSchemaMap) =
  if (surrogateKey != null && generateSurrogateKey && sequenceName != null) {
    val frameFields = loadReadyDb2DF.schema.map(x => (x.name, x.dataType))
    (surrogateKey +: loadReadyDb2DF.columns, ((surrogateKey, SequenceType) +: frameFields).toMap)
  } else
    (loadReadyDb2DF.columns, loadReadyDb2DF.schema.map(x => (x.name, x.dataType)).toMap)
/**
 * Extracts the column names that were marked for exclusion with a leading "-".
 *
 * Entries are trimmed before and after the marker is removed, and only the leading "-" is
 * stripped, so " -order-id " yields "order-id".
 *
 * @param seq key list as supplied by the caller; may be null
 * @return the excluded names without their marker; `Seq("")` when `seq` is null
 */
def findMinusKeys(seq: Seq[String]): Seq[String] =
  if (seq == null) Seq("")
  else seq.map(_.trim).filter(_.startsWith("-")).map(_.stripPrefix("-").trim)
/**
 * Adds a `-` operator to sequences.
 *
 * `a - b` is `a.diff(b)`: each element of `b` removes at most one matching element of `a`,
 * and the order of the remaining elements is kept.
 */
implicit class Minus[A](firstSeq: Seq[A]) {
  def -(secondSeq: Seq[A]): Seq[A] = firstSeq.diff(secondSeq)
}
// A leading "*" entry is shorthand for "every column of the statement".
joinKeys = if (joinKeys.headOption.exists(_.trim.equalsIgnoreCase("*"))) allCols else joinKeys
updateCols = if (updateCols.headOption.exists(_.trim.equalsIgnoreCase("*"))) allCols else updateCols
// Remove the excluded columns, then drop any "-col" marker entries that are still in the list
// (they are not column names and must not reach the generated SQL), and de-duplicate.
joinKeys = (joinKeys - minusJoinKeys).filterNot(_.trim.startsWith("-")).distinct
updateCols = (updateCols - minusUpdateKeys).filterNot(_.trim.startsWith("-")).distinct
// Constant added to every JDBC parameter index when values are bound (currently no shift).
val offset = 0
// Human-readable renderings of bound statements; reassigned inside the partition writer.
var trQuery = scala.collection.mutable.ArrayBuffer.empty[String]

// True when the surrogate key is produced by a database sequence. This is the same condition
// that decides whether the surrogate key was prepended to allCols, so that the number of "?"
// placeholders, the SRC column list and the column list always agree.
val usesSequence = surrogateKey != null && generateSurrogateKey && sequenceName != null
// Columns whose values are bound from the DataFrame row (everything except a generated key).
val sourceCols = if (usesSequence) allCols.filterNot(_ equalsIgnoreCase surrogateKey) else allCols

val tblTotCols = sourceCols.length
val totalKeys = joinKeys.length
// One "?" placeholder per source column.
val bindingVars = Seq.fill(tblTotCols)("?").mkString(",")

// Without join keys the ON clause would be empty; fail with a readable message.
require(joinKeys.nonEmpty, "No join keys are left to build the MERGE ON clause")
// IS NOT DISTINCT FROM is used so that NULL key values compare as equal.
val joinConditions = joinKeys.map(key => s"trgtTable.$key IS NOT DISTINCT FROM SRC.$key").mkString(" AND ")

// The surrogate key is never updated.
val updatableCols = updateCols.filterNot(_.equalsIgnoreCase(surrogateKey))
// NOTE(review): the per-column assignment was truncated in the source this was recovered from;
// `col = SRC.col` is the conventional MERGE form — confirm against the original.
val updateQueryl = updatableCols.map(col => s"$col = SRC.$col").mkString(",")
// No WHEN MATCHED clause is emitted when there is nothing to update.
val updateQuery =
  if (updatableCols.isEmpty) ""
  else
    s"""|WHEN MATCHED $extraUpdateSQLConditions THEN
        |UPDATE SET
        | $updateQueryl""".stripMargin

// Column list of the SRC table expression (bound columns only).
val insertColStrl = sourceCols.mkString(",")
// Same columns as "@name" tokens; they are replaced by literal values for log output.
val humanReadableColStr = sourceCols.map("@" + _).mkString(",")
// Column list of the INSERT branch (includes a generated surrogate key).
val insertColStr = allCols.mkString(",")
// Value list of the INSERT branch: the sequence for a generated key, SRC.<col> otherwise.
val allColsInsrtValuesStrl = allCols.map { x =>
  if (usesSequence && (x equalsIgnoreCase surrogateKey)) s"NEXT VALUE FOR $sequenceName"
  else "SRC." + x
}.mkString(",")
// Only UPSERT inserts unmatched rows; UPDATE leaves them out.
val allColsInsrtValuesStr =
  if (dbOperation == DbOperations.UPSERT)
    s"""|WHEN NOT MATCHED $extraInsertSQLConditions THEN
        | INSERT ( $insertColStr )
        | VALUES ( $allColsInsrtValuesStrl )
        |""".stripMargin
  else
    ""

// Renders the MERGE statement around the given VALUES(...) content.
// NOTE(review): `queries` presumably holds the target table name — confirm.
def mergeStatement(values: String): String =
  s"""|MERGE INTO $queries AS trgtTable
      |USING TABLE( VALUES($values) )
      | SRC( $insertColStrl )
      |ON $joinConditions
      |$deleteSQLConditions
      |$updateQuery
      |$allColsInsrtValuesStr
      |""".stripMargin

// Statement that is actually prepared: the caller's override wins over the generated MERGE.
val sqlString =
  if (!overrideFullUpsertQuery.isEmpty) overrideFullUpsertQuery
  else mergeStatement(bindingVars)
if (!overrideFullUpsertQuery.isEmpty)
  logger.warn("Overriding query manually can't currently print the human readable form properly !!")
// Log-only twin of the generated MERGE with "@column" tokens in place of "?".
val humanReadableQueryStr = mergeStatement(humanReadableColStr)
logger.info(s"sqlString=$sqlString")
// Keep the number of write partitions between 1 and 10.
batchPartition = math.max(1, math.min(10, batchPartition))
// Write every partition through its own JDBC connection, opened inside the closure.
loadReadyDb2DF.repartition(batchPartition).foreachPartition { partition =>
  logger.debug(s"partition=$partition")
  // Connection settings taken from the broadcast variable.
  var db2Properties = brConnect.value
  // Per-partition JDBC state; declared up front so later handlers can reach it.
  var connection: Option[Connection] = None
  var resultSet: Option[ResultSet] = None
  var statement: Statement = null
  var savepoint: Savepoint = null
  var isDb2Error: Boolean = false
  var numUpdates: Array[Int] = null
// Open the JDBC connection for this partition.
try {
  Class.forName(driver)
  connection = Some(DriverManager.getConnection(url, user, password))
} catch {
  case e: Exception =>
    logger.info(s"connection=$connection")
    // Log the actual failure and also print the stack trace.
    logger.error(s"Unable to open the database connection: $e")
    e.printStackTrace()
    // Nothing below can work without a connection, so surface the real cause here instead
    // of failing later on `connection.get`.
    throw e
}
// Commits are issued explicitly after each batch.
connection.get.setAutoCommit(false)
var preparedStatement = connection.get.prepareStatement(sqlString)
try {
  // Rows are bound and executed in groups of commitNbr.
  partition.grouped(commitNbr.toInt).foreach { batch =>
    logger.info(s"Db batchwise commit is enabled for each $commitNbr commit")
    // Savepoint taken at the start of every batch.
    savepoint = connection.get.setSavepoint(randomSvPoint)
    batch.foreach { record =>
      // NOTE(review): the buffer is reset for every record, so it never holds more than the
      // current record's query — confirm whether a per-batch reset was intended.
      trQuery = scala.collection.mutable.ArrayBuffer.empty[String]
      if (dbPrintStatememntFlag) logger.info(s"record=$record")
      // Start from the "@column" template; tokens are replaced as columns are bound.
      var humanReadableQuery = humanReadableQueryStr
// Bind every column of the current record to the prepared statement and, when statement
// printing is enabled, substitute its value into the human-readable query.
allCols.foreach { col =>
  // A sequence-generated surrogate key gets its value from NEXT VALUE FOR inside the
  // statement, so there is no field to look up in the record.
  val generatedKey = col.equalsIgnoreCase(surrogateKey) &&
    dbOperation == DbOperations.UPSERT && generateSurrogateKey && sequenceName != null
  // Position of the column in the record; throws IllegalArgumentException if it is missing.
  val fieldIndex = if (generatedKey) -1 else record.fieldIndex(col)

  // Binds the record's value for `col` using the given java.sql.Types code and renders it
  // into the human-readable query (wrapped in single quotes when `quoted` is set).
  def bindField(sqlType: Int, quoted: Boolean): Unit = {
    val fieldValue = record.get(fieldIndex)
    if (!dbPrintOnlyFlag) preparedStatement.setObject(fieldIndex + offset + 1, fieldValue, sqlType)
    if (dbPrintStatememntFlag) {
      val fieldValueStr =
        if (fieldValue == null) "null"
        else if (quoted) s"'$fieldValue'"
        else s"$fieldValue"
      // NOTE(review): literal replacement of "@col" also rewrites the prefix of a longer
      // token such as "@col2"; this only affects the logged text.
      humanReadableQuery = humanReadableQuery.replace(s"@$col", fieldValueStr)
    }
  }

  try {
    tblSchemaMap(col).typeName match {
      case "sequence" => () // nothing to bind
      case "long" => bindField(java.sql.Types.BIGINT, quoted = false)
      case "string" => bindField(java.sql.Types.VARCHAR, quoted = true)
      // ... the remaining data types were elided in the source ("So for other datatype")
      case "null" => bindField(java.sql.Types.NULL, quoted = true)
      case _ => throw new IllegalArgumentException(s"No proper datatype found for column '$col' passed!!")
    }
  } catch {
    case se: SQLException =>
      val sqlState = getSqlState(se)
      // Re-thrown with the column name added; the original exception is kept as the cause.
      throw new SQLException(s"SQLSTATE: $sqlState, ERRORCODE = ${se.getErrorCode}: Error while DF to table columns binding for field $col", se)
  }
}
// Keep the rendered statement, optionally log it, and queue the bound row for the batch.
trQuery += humanReadableQuery
if (dbPrintStatememntFlag) logger.debug("Translated Query: " + humanReadableQuery)
if (!dbPrintOnlyFlag) preparedStatement.addBatch()
}
// Execute the queued rows of this batch and commit them (skipped in print-only mode).
try {
  if (!dbPrintOnlyFlag) {
    // Named differently from the outer `numUpdates` var to avoid shadowing it.
    val updateCounts = preparedStatement.executeBatch()
    updateCounts.zipWithIndex.foreach { case (result, idx) =>
      // SUCCESS_NO_INFO: the statement succeeded but the driver reports no row count.
      if (result == Statement.SUCCESS_NO_INFO) logger.info(s"Execution ${idx + 1} : unknown number of rows updated !")
    }
    // Number of statements executed in this batch.
    val i = updateCounts.length
    connection.get.commit()
    logger.info(s"Totak $i rows updated")
    // NOTE(review): this sets a JVM system property in whichever JVM runs this closure and
    // is overwritten by every batch — confirm that a reader exists in the same JVM.
    System.setProperty("TOTAL_SUCC_DB2_UPDT", i.toString)
    preparedStatement.clearBatch()
  }
} catch {
  case be: BatchUpdateException =>
    // TODO: write the failed batch to a reject file. Until then, record the failure in the
    // log as well as on stderr so it is not lost.
    logger.error(s"Batch execution failed: $be")
    be.printStackTrace()
}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment