Skip to content

Instantly share code, notes, and snippets.

@niroj-office
Last active November 7, 2020 22:10
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save niroj-office/b9dd49966ee27a9772eedf58491f0063 to your computer and use it in GitHub Desktop.
Save niroj-office/b9dd49966ee27a9772eedf58491f0063 to your computer and use it in GitHub Desktop.
if (dbOperation = DbOperations.UPSERT || dbOperation = DbOperations.UPDATE) {
if (joinKeys.isEmpty) {
logger.error("When dataframe write is in UPSERT/UPDATE mode, user must have to pass the composite keys!!"))
throw new NullPointerException("Composite keys for dataframe UPSERT/UPDATE operation is passed as Null") )
} else {
import scala.io.Source._
import java.sql.DriverManager
import java.sql.Connection
import java.sql._
val totalCnt = loadReadyDb2DF.count
var batchPartition = (totalCnt / commitNbr).toInt
import org.apache.spark.sql.types._
/**
 * Marker Spark SQL type for a column whose value is produced by a database
 * sequence rather than read from the DataFrame.
 *
 * It is placed in the schema map for the surrogate-key column when a surrogate
 * key is generated. The column-binding logic matches on its `typeName`
 * ("sequence") and binds no parameter for that column.
 */
object SequenceType extends DataType with java.io.Serializable {

  /** Default size hint reported to Spark for this type. */
  override def defaultSize: Int = 11

  /** Type name that the column-binding `match` dispatches on. */
  override def typeName: String = "sequence"

  // NOTE(review): the nullable form of this type is StringType rather than
  // SequenceType itself; confirm this is intended.
  override def asNullable: DataType = StringType
}
val (allCols, tblSchemaMap) = if (surrogateKey != null && generateSurrogateKey && sequenceName != null) {
(surrogateKey +: loadReadyDb2DF.columns, ((surrogateKey, SequenceType) +: (loadReadyDb2DF.columns,
loadReadyDb2DF.schema.map(x = (x.name, x.dataType)).toMap)
else
(loadReadyDb2DF.columns, loadReadyDb2DF.schema.map(x => (x.name, x.dataType)).toMap)
/**
 * Extracts the "minus" keys from a user-supplied key list.
 *
 * An entry counts as a minus key when it starts with `-` after trimming.
 * The returned key is that entry with surrounding whitespace and its leading
 * `-` characters removed. Hyphens inside the name are kept, so an entry such
 * as `-my-col` yields `my-col` and can still match the real column name.
 *
 * @param seq key list; may be null
 * @return the keys to exclude, in input order; `Seq("")` when `seq` is null
 */
def findMinusKeys(seq: Seq[String]): Seq[String] =
  Option(seq).fold(Seq("")) { keys =>
    keys
      .map(_.trim)
      .filter(_.startsWith("-"))
      .map(key => key.dropWhile(_ == '-').trim)
  }
/**
 * Local syntax that enables `xs - ys` on sequences.
 *
 * `-` delegates to `Seq.diff`: each element of the right-hand sequence removes
 * at most one matching occurrence from the left-hand sequence, and the
 * left-hand order is preserved.
 */
implicit class Minus[A](firstSeq: Seq[A]) {
  def -(secondSeq: Seq[A]): Seq[A] =
    firstSeq.diff(secondSeq)
}
joinKeys = if (!joinKeys.isEmpty && (joinKeys(0).trim equalsIgnoreCase "*")) allCols else joinKeys
updateCols = if (!updateCols.isEmpty && (updateCols(0).trim equalsIgnoreCase "*")) allCols else updateCols
joinKeys = (joinKeys - minusJoinKeys).distinct
updateCols = (updateCols - minusUpdateKeys).distinct
val offset = 0
var trQuery = scala.collection.mutable.ArrayBuffer.empty[String]
val tblTotCols = if (sequenceName != null && generateSurrogateKey) allCols.size - else allCols.size
val totalKeys = joinKeys.length
val bindingVars = ("?" * tb1TotCols).map(_.toString).reduce(_ + "," +_)
val joinConditions = joinKeys.map(key => s"trgtTable.Skey IS NOT DISTINCT FROM SRC.$key").reduce( + " AND " + _)
val updateQueryl = if (updateCols.isEmpty) "" else updateCols.filter(!_.equalsIgnoreCase(surrogateKey)).map(col reduce( + "," + )
val updateQuery = if (updateCols.isEmpty) ""
else s"""(|WHEN MATCHED $extraUpdateSQLConditions THEN
|UPDATE SET
| S{updateQuery1)""".stripMargin
val insertColStrl = if (generateSurrogateKey && sequenceName != null)
allCols.filterNot(_ equalsIgnoreCase surrogateKey).reduce(_ + "." + _)
else allCols.reduce(_ + "," + _)
val humanReadableColStr = if (generateSurrogateKey && sequenceName != null) allCols.filterNot(_ equalsIgnoreCase surrogateKey).: + )
.reduce( _+ "," + _)
else allCols.map("@" + ).reduce( _+ "," +_ )
val insertColStr = allCols.reduce( + "," + )
val allColsInsrtValuesStrl = allCols.map { x =>
if (generateSurrogateKey && sequenceName != null && (x equalsIgnoreCase surrogateKey))
s"NEXT VALUE FOR $sequenceName"
else "SRC." + x
}.reduce( + "," + )
val allColsInsrtValuesStr = if (dbOperation = DbOperations.UPSERT)
s"""|WHEN NOT MATCHED $extraInsertSQLConditions THEN
| INSERT ( $insertColStr )
| VALUES ( SallColsInsrtValuesStrl )
""".stripMargin
else
""
val sqlString = if(!overrideFullUpsertQuery.isEmpty)
overrideFullUpsertQuery
else s"""|MERGE INTO $queries AS trgtTable
|USING TABLE( VALUES($bindingVars) )
| SRC( $insertColStrl )
|ON $joinConditions
|$deleteSQLConditions
|$updateQuery
|$allColsInsrtValuesStr
""".stripMargin
if (!overrideFullUpseLQuery.isEmpty)
logger.warn(s"Overriding query manually can't currently print the human readable form properly !!")
val humanReadableQueryStr = s"""|MERGE INTO $queries AS trgtTable
|USING TABLE( VALUES($humanReadableColStr) )
| SRC( $insertColStrl )
|ON $joinConditions
|$deleteSQLConditiona
|$(updateQuery}
|$(allColslnsrtValuesStr}
""".stripMargin
logger.info(s"sqlString=$sqlString")
batchPartition = if(batchPartition > 10) 10 else if (batchPartition == 0) 1 else batchPartition
loadReadyDb2DF.repartition(batchPartition).foreachPartition { partition =>
logger.debug(s"partition=$partition")
var db2Properties = brConnect.value
var connection: Option[Connection] = None
var resultSet: Option[ResultSet] = None
var statement: Statement = null
var savepoint: Savepoint = null
var isDb2Error: Boolean = false
var numUpdates: ARR[Int] = null
try { // make the connection
Class.forName(driver) connection = Some(DriverManager.getConnection(url, user, password))
catch { case e: Exception = logger.info(s"connection=$connection")
logger.error(e.printStackTrace().toString())
}
connection.get.setAutoCommit(false)
var preparedStatement = connection.get.prepareStatement(sqlString)
try {
partition.grouped(commitNbr.toInt).foreach{ batch =>
logger.info(s"Db batchwise commit is enabled for each $commitNbr commit")
savepoint = connection.get.setSavepoint(randomSvPoint)
batch.foreach { record =>
trQuery = scala.collection.mutable.ArrayBuffer.empty[String]
if (dbPrintStatememntFlag) logger.info(s"record=$record")
var humanReadableQuery = humanReadableQueryStr
allCols.foreach{ col =>
var fieldIndex = -1
var (srcFieldType, endFieldType) = ("", "")
try {
if (!(col.equalsIgnoreCese(surrogateKey) && dbOperation = DbOperations.UPSERT && generateSurrogateKey && sequenceName != null)) {
fieldIndex = record.fieldIndex(col)
}
catch { case e: Exception = throw new Exception(e) }
try {
tblSchemaMap(col).typeName match {
case "sequence" = ""
case "long" =>
srcFieldType = "long"; endFieldType = "BIGINT"
val fieldValue = if (record.get(fieldIndex) = null) null else record.getAs[Long)(fieldIndex)
if (!dbPrintOnlyFlag) preparedStatement.setObject(fieldIndex + offset + 1, fieldValue, java.sql.Types.BIGINT)
if (dbPrintStatememntFlag) {
val fieldValueStr = if (fieldValue = null) "null" else s"$fieldValue"
humanReadableQuery = humanReadableOuery.replaceAllLiterally(s"@$col", fieldValueStr)
}
case "string" =>
srcFieldType = "string"; endFieldType = "VARCHAR"
val fieldValue = if (record.get(fieldIndex) = null) null else record.getAs[Long)(fieldIndex)
if (!dbPrintOnlyFlag) preparedStatement.setObject(fieldIndex + offset + 1, fieldValue, java.sql.Types.VARCHAR)
if (dbPrintStatememntFlag) {
val fieldValueStr = if (fieldValue = null) "null" else s"'$fieldValue'"
humanReadableQuery = humanReadableOuery.replaceAllLiterally(s"@$col", fieldValueStr)
}
...//So for other datatype
case "null" =>
srcFieldType = "string"; endFieldType = "NULL"
val fieldValue = if (record.get(fieldIndex) = null) null else record.getAs[Long)(fieldIndex)
if (!dbPrintOnlyFlag) preparedStatement.setObject(fieldIndex + offset + 1, fieldValue, java.sql.Types.NULL)
if (dbPrintStatememntFlag) {
val fieldValueStr = if (fieldValue = null) "null" else s"'$fieldValue'"
humanReadableQuery = humanReadableOuery.replaceAllLiterally(s"@$col", fieldValueStr)
}
case _ => throw new IllegalArgumentException(s"No proper datatype found for column '$col' passed!!")
}
catch {
case se: SQLException =>
val sqlState = getSqlState(Se)
throw new SQLExeception(s"SQLSTATE: $sqlState, ERRORCODE = $errCd: Error while DF to table columns binding for field $col",se)
}
}
trQuery += humanReadableQuery
if(adPrintStatementFlag) logger.debug("Translated Query: "+ humanReadableQuery)
if(!dbPrintOnlyFlag) preparedStatement.addBatch()
}
try {
if (!dbPrintOnlyFlag) {
val numUpdates = preparedStatement.executeBatch()
var i = 0
numUpdates.foreach { r =>
i += 1
if( r == Statement.SUCESS_NO_INFO) logger.info(s"Execution $i : unknown number of rows updated !")
}
connection.get.commit
logger.info(s"Totak $i rows updated")
System.setProperty("TOTAL_SUCC_DB2_UPDT", i.toString)
connection.get.commit
preparedStatement.clearBatch()
}
catch {
case be: BatchUpdateException => //Write to reject file
be.printStatckStrace()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment