
niroj-office / FindLongestSequence.scala
Created May 26, 2020 02:08
Program to find the longest consecutive sequence in a list in Scala. Example: input - List(0,1,5,6,2), output - List(0,1,2)
val longestSequence = (numList: List[Int]) => {
  val sorted = numList.sorted.distinct
  numList
    .map { x =>
      // Grow the consecutive run starting at x while every element is present.
      var n = 0
      var run = List[Int]()
      while (sorted.containsSlice(x to x + n)) {
        run = (x to x + n).toList
        n += 1
      }
      run
    }
    .maxBy(_.length) // keep the longest consecutive run
}
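A quick check with the example from the description:

longestSequence(List(0, 1, 5, 6, 2)) // returns List(0, 1, 2)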
niroj-office / CustomStringUtils.scala
Last active November 20, 2020 03:58
Implicit Scala & Spark UDF to convert a string to CamelCase
implicit class CustomStringUtils(s: String) {
  // Upper-cases the first letter of each whitespace-separated word and
  // lower-cases the rest, e.g. "hello WORLD" -> "Hello World".
  def toCamelCase: String = if (s == null) "" else {
    if (s.trim.isEmpty) s
    else s.trim.split("\\s+").map { word =>
      word.head.toUpper + word.tail.toLowerCase
    }.mkString(" ")
  }
}
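The description also mentions a Spark UDF; a minimal sketch of registering one on top of the implicit, assuming a live SparkSession named spark (the UDF names here are illustrative, not from the gist):

import org.apache.spark.sql.functions.udf

// Expose the same logic to DataFrame expressions and to Spark SQL.
val camelCaseUdf = udf((s: String) => s.toCamelCase)
spark.udf.register("to_camel_case", (s: String) => s.toCamelCase)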
niroj-office / DbOperations.scala
Created November 6, 2020 19:31
DB operations enum for a custom Spark JDBC write
object DbOperations extends Enumeration {
type DbOperation = Value
val INSERT = Value("INSERT")
val UPDATE = Value("UPDATE")
val UPSERT = Value("UPSERT")
}
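Values can also be looked up by name at runtime, e.g. when the operation comes from a config file:

val op: DbOperations.Value = DbOperations.withName("UPSERT") // DbOperations.UPSERT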
class ExecuteDbQuery( spark: Option[SparkSession],
    dbConfigFilePath: String, query: String, var commitNbr: Long = 1000,
    var loadReadyDbDF: DataFrame = null, mode: SaveMode = SaveMode.Append,
    var dbOperation: DbOperations.Value = DbOperations.INSERT,
    var joinKeys: Seq[String] = Seq(),
    var updateCols: Seq[String] = Seq(), surrogateKey: String = null,
    generateSurrogateKey: Boolean = true, chkPrintFlg: Boolean = false,
    sequenceName: String = null, readByPartitionColumn: Boolean = false,
    rejectionHDFSPath: String = null)
  extends EnvProperties with java.io.Serializable {
  try {
    properties.load(this.getClass().getClassLoader().getResourceAsStream(dbConfigFilePath.resolveEnv))
  } catch {
    case e: Exception => throw new FileNotFoundException(s"$dbConfigFilePath configFilePath is absent !!")
  }
  var queries = query
  val connection_type = properties.getProperty("connection_type", "<invalid_connection_type>").resolveEnv
  val url = properties.getProperty("url", "<invalid_url>").resolveEnv
  val driver = properties.getProperty("driver", "<invalid_driver>").resolveEnv
  val user = properties.getProperty("user", "<invalid_user>").resolveEnv
  ...
  ...
  var connection: Option[Connection] = None
  var resultSet: Option[ResultSet] = None
  var statement: Statement = null
  var savepoint: Savepoint = null
  try {
    Class.forName(driver)
    try {
      connection = Some(DriverManager.getConnection(url, user, password))
      ...
  private[this] def executeDbQuery(spark: SparkSession, internalQuery: String, internalForceRead: Boolean = false): DataFrame = {
    import spark.implicits._
    var dataFrame = spark.emptyDataFrame
    // Wrap bare SELECTs in parentheses so they can be used as a JDBC subquery.
    queries = if (queries.trim.toUpperCase().startsWith("SELECT")) s"($queries)" else queries
    queries = queries.replace(";", "")
    if (loadReadyDbDF == null || internalForceRead) {
val readMapOptions = if (readByPartitionColumn) Map("url" -> url,
    if (dbOperation == DbOperations.INSERT && surrogateKey != null && generateSurrogateKey && sequenceName == null) {
      logger.info(s"Generating automatic sequence for $surrogateKey ...")
      import org.apache.spark.sql.expressions.Window
      import org.apache.spark.sql.functions.{ lit, row_number }
      // Seed the surrogate key from the current maximum value in the target table.
      val rs = executeDb2Query(s"SELECT max(coalesce($surrogateKey, 0)) from $tableName with ur").get
      rs.next
      val surrKeyMaxValue = rs.getLong(1)
      // Constant-partition window: row_number() yields a monotonically increasing sequence.
      val incrementByOne = Window.partitionBy(lit("i")).orderBy(lit("i").asc)
      loadReadyDbDF = loadReadyDbDF.withColumn(surrogateKey, lit(surrKeyMaxValue) + row_number().over(incrementByOne))
      loadReadyDbDF.write.
        partitionBy(joinKeys: _*).mode(mode).format(connection_type).
        options(Map("url" -> url,
          "driver" -> driver,
          "dbtable" -> queries,
          "user" -> user,
          "password" -> password,
          "batchsize" -> batchsize)).
        save()
      if (dbOperation == DbOperations.UPSERT || dbOperation == DbOperations.UPDATE) {
        if (joinKeys.isEmpty) {
          logger.error("When the dataframe write is in UPSERT/UPDATE mode, the composite keys must be passed!!")
          throw new NullPointerException("Composite keys for the dataframe UPSERT/UPDATE operation were passed as null")
        } else {
          import scala.io.Source._
          import java.sql._
          val totalCnt = loadReadyDbDF.count
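The gist is truncated at this point. For orientation, a minimal sketch of the kind of batched JDBC upsert this branch builds toward, assuming a DB2-style MERGE statement and a commit every commitNbr rows; the table and column names below are illustrative, not the gist's:

// Illustrative only: batched JDBC writes from each partition, committing
// every `commitNbr` rows. Assumes the class fields url, user, password,
// commitNbr and loadReadyDbDF defined above; the real SQL is not shown here.
import org.apache.spark.sql.Row
import java.sql.DriverManager

loadReadyDbDF.foreachPartition { rows: Iterator[Row] =>
  val conn = DriverManager.getConnection(url, user, password)
  conn.setAutoCommit(false)
  val stmt = conn.prepareStatement(
    "MERGE INTO target_table t USING (VALUES (?, ?)) s(id, col) ON t.id = s.id " +
    "WHEN MATCHED THEN UPDATE SET t.col = s.col " +
    "WHEN NOT MATCHED THEN INSERT (id, col) VALUES (s.id, s.col)")
  var n = 0L
  rows.foreach { row =>
    stmt.setLong(1, row.getLong(0))
    stmt.setString(2, row.getString(1))
    stmt.addBatch()
    n += 1
    if (n % commitNbr == 0) { stmt.executeBatch(); conn.commit() }
  }
  stmt.executeBatch() // flush the final partial batch
  conn.commit()
  stmt.close()
  conn.close()
}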