// Finds the longest run of consecutive integers present in the input list.
val longestSequence = (numList: List[Int]) => {
  val sortedDistinct = numList.sorted.distinct   // sort/dedupe once instead of on every loop iteration
  numList
    .map { x =>
      var n = 0
      var run = List[Int]()
      // Grow the run x, x+1, ... while every value in it appears in the input
      while (sortedDistinct.containsSlice(x to x + n)) {
        run = (x to x + n).toList
        n += 1
      }
      run
    }
    .map(run => (run, run.length))
    .maxBy(_._2)._1   // keep the longest run; the original snippet is truncated here, so this final selection is assumed
}
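For reference, a minimal usage sketch of longestSequence as completed above; the input list and the expected output are illustrations, not part of the original snippet:

val nums = List(10, 4, 5, 6, 2, 3, 11)
println(longestSequence(nums))   // List(2, 3, 4, 5, 6) -- the longest consecutive run present in the input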
implicit class CustomStringUtils(s: String) {
  // Capitalises the first letter of every whitespace-separated word and lower-cases the rest.
  def toCamelCase: String = if (s == null) "" else {
    if (s.trim.isEmpty) s
    else s.split("\\s+")                 // "\\s+" (not "[\\s+]") so we split on runs of whitespace only
      .filter(_.nonEmpty)                // drop the empty token produced by leading whitespace
      .map { word =>
        val first = Character.toUpperCase(word(0))
        val rest  = word.drop(1).toLowerCase
        s"$first$rest"
      }.mkString(" ")
  }
}
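A quick usage sketch of the implicit class above; the sample string is made up for illustration:

// With CustomStringUtils in scope, any String picks up toCamelCase:
println("hello   SCALA world".toCamelCase)   // prints: Hello Scala World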
object DbOperations extends Enumeration {
  type DbOperation = Value
  val INSERT = Value("INSERT")
  val UPDATE = Value("UPDATE")
  val UPSERT = Value("UPSERT")
}
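A minimal sketch of how the enumeration above might be consumed; the describe helper is hypothetical and only illustrates matching on the values:

import DbOperations._

def describe(op: DbOperation): String = op match {
  case INSERT => "append new rows"
  case UPDATE => "update existing rows on the join keys"
  case UPSERT => "update when matched, insert otherwise"
}

println(describe(DbOperations.UPSERT))   // update when matched, insert otherwise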
class ExecuteDbQuery(spark: Option[SparkSession],
                     dbConfigFilePath: String,
                     query: String,
                     var commitNbr: Long = 1000,
                     var loadReadyDbDF: DataFrame = null,
                     mode: SaveMode = SaveMode.Append,
                     var dbOperation: DbOperations.Value = DbOperations.INSERT,
                     var joinKeys: Seq[String] = Seq(),
                     var updateCols: Seq[String] = Seq(),
                     surrogateKey: String = null,
                     generateSurrogateKey: Boolean = true,
                     chkPrintFlg: Boolean = false,
                     sequenceName: String = null,
                     readByPartitionColumn: Boolean = false,
                     rejectionHDFSPath: String = null)
  extends EnvProperties with java.io.Serializable
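A hedged construction example for the class above; the SparkSession, config path, query and key columns are placeholders and are not taken from the original code:

val dbQuery = new ExecuteDbQuery(
  spark            = Some(sparkSession),            // assumed: a SparkSession built elsewhere
  dbConfigFilePath = "conf/db.properties",          // hypothetical classpath resource
  query            = "SELECT * FROM SCHEMA.TABLE",  // hypothetical query
  dbOperation      = DbOperations.UPSERT,
  joinKeys         = Seq("ID"),
  updateCols       = Seq("STATUS", "UPDATED_TS")
)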
// Load the DB connection properties from the classpath; fail fast if the file is missing.
try {
  properties.load(this.getClass.getClassLoader.getResourceAsStream(dbConfigFilePath.resolveEnv))
} catch {
  case e: Exception =>
    throw new FileNotFoundException(s"Config file $dbConfigFilePath is absent !!")
}
var queries = query
val connection_type = properties.getProperty("connection_type", "<invalid_connection_type>").resolveEnv
val url             = properties.getProperty("url", "<invalid_url>").resolveEnv
val driver          = properties.getProperty("driver", "<invalid_driver>").resolveEnv
val user            = properties.getProperty("user", "<invalid_user>").resolveEnv
{
  ...
  ...
  // JDBC handles for the direct (non-DataFrame) execution path
  var connection: Option[Connection] = None
  var resultSet: Option[ResultSet]   = None
  var statement: Statement           = null
  var savepoint: Savepoint           = null
  try {
    Class.forName(driver)            // register the JDBC driver class
    try {
      connection = Some(DriverManager.getConnection(url, user, password))
// Reads the query result into a DataFrame (private helper); re-reads from the DB when internalForceRead is set.
private[this] def executeDbQuery(spark: SparkSession, internalQuery: String, internalForceRead: Boolean = false): DataFrame = {
  import spark.implicits._
  var dataFrame = spark.emptyDataFrame
  // Wrap SELECT statements in parentheses so they can be passed as the JDBC "dbtable" option
  queries = if (queries.trim.toUpperCase().startsWith("SELECT")) s"($queries)" else queries
  queries = queries.replace(";", "")
  if (loadReadyDbDF == null || internalForceRead) {
    val readMapOptions = if (readByPartitionColumn) Map("url" -> url,
// When inserting with a surrogate key but no DB sequence, derive new key values on top of the current max.
if (dbOperation == DbOperations.INSERT && surrogateKey != null && generateSurrogateKey && sequenceName == null) {
  logger.info(s"Generating automatic sequence for $surrogateKey ...")
  import org.apache.spark.sql.expressions.Window
  import org.apache.spark.sql.functions.{lit, row_number}
  // executeDb2Query is a JDBC helper defined elsewhere in this class; it is assumed to return Option[ResultSet]
  val rs = executeDb2Query(s"SELECT max(coalesce($surrogateKey, 0)) from $tableName with ur").get
  rs.next
  val surrKeyMaxValue = rs.getLong(1)
  // Single-partition window used only to number rows sequentially above the current max key
  val incrementByOne = Window.partitionBy(lit("i")).orderBy(lit("i").asc)
  loadReadyDbDF = loadReadyDbDF.withColumn(surrogateKey, lit(surrKeyMaxValue) + row_number().over(incrementByOne))
loadReadyDbDF.write.
  partitionBy(joinKeys: _*).mode(mode).format(connection_type).
  options(Map("url" -> url,
    "driver"    -> driver,
    "dbtable"   -> tableName,   // target table; tableName is assumed to be resolved elsewhere in the class
    "user"      -> user,
    "password"  -> password,
    "batchsize" -> batchsize)).
  save()
if (dbOperation == DbOperations.UPSERT || dbOperation == DbOperations.UPDATE) {
  if (joinKeys.isEmpty) {
    logger.error("When the dataframe write is in UPSERT/UPDATE mode, the composite (join) keys must be passed!!")
    throw new NullPointerException("Composite keys for the dataframe UPSERT/UPDATE operation were passed as null")
  } else {
    import scala.io.Source._
    import java.sql._          // covers DriverManager, Connection, etc.
    val totalCnt = loadReadyDbDF.count