#Capture the current trace (-x) setting before reading secret variables
_TRACE_MODE=$(echo "$-" | grep -q x && echo "ON")
#Turn debug/trace mode off so the secrets are not echoed
set +x
#Read and export the secret credentials
export my_passwd="$(cat ~/system_db.passwd)"
export secret_email_id="xxx@yyy.com"
#Once done, restore the original debug mode
[ "$_TRACE_MODE" = "ON" ] && set -x
__MAX_TRIAL=60
for _time in $(seq $__MAX_TRIAL)
do
__PS_ID=$(ps -eaf | grep -i "$APP_NM" | grep "$PORT_NBR" | grep -v grep | awk '{print $2}')
__LAST_LOG=""    #TODO: get the last log path
#Check whether the service is down
if [ -z "${__PS_ID}" ]; then
logger "INFO" "Service seems down! Kicking-off ..." $LINENO
(
function sendEmailAlert {
APP_PROPERTY_PATH="$CONF_DIR/${env}_connection.properties"
FROM="$(grep '^ *FROM=' "$APP_PROPERTY_PATH")"
TO="$(grep '^ *TO=' "$APP_PROPERTY_PATH")"
#Strip everything up to '=' and any stray carriage returns
FROM=$(echo "$FROM" | sed 's:^[^=]*=::' | tr '\r' ' ')
TO=$(echo "$TO" | sed 's:^[^=]*=::' | tr '\r' ' ')
messg="<html><body>
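#A minimal, self-contained sketch of how the parsed FROM/TO values could be handed to a local
#MTA. The sendmail path, subject, and HTML body below are assumptions for illustration only,
#not the original script's mail mechanism.
sendHtmlMail() {
    local from="$1" to="$2" subject="$3" body="$4"
    /usr/sbin/sendmail -t <<EOF
From: ${from}
To: ${to}
Subject: ${subject}
MIME-Version: 1.0
Content-Type: text/html; charset=UTF-8

${body}
EOF
}
#Example: sendHtmlMail "$FROM" "$TO" "Service restart alert" "<html><body>Service was restarted.</body></html>"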
[[ $# -ne 2 ]] && logger "ERROR" "Usage: $0 <SRC_TYPE=[JSON|TXT]> <TARGET_TBL=[TBL1|TBL2]>" $LINENO && exit 1
#####ENV SETUP#############################
export SRC_TYPE="$1"
export TARGET_TBL="$2"
export PROJECT_NAME="etl_talend_migr"
export SCRIPT_NAME="$(basename "$0" .sh)"
export SCRIPT_DIR="$(dirname "$0")"
CURRENT_DATE="$(date '+%Y%m%d')"
export CURRENT_TIMESTAMP=${CURRENT_TIMESTAMP:-"$(date '+%Y%m%d%H%M%S')"}
APPL_USR=${APPL_USR:-$(whoami)}
[[ ${APPL_USR} = 'xyz123' && $(hostname) = 'machine01' ]] && ENV=dev
#Mirror all stdout/stderr to the console and to $LOG_FILE
exec > >(tee "$LOG_FILE") 2>&1
function logger {
    if [ $# -ne 3 ]; then
        logger "ERROR" "Usage: logger <severity> <message> <line_number>" $LINENO
        return 1
    fi
    severity="$(echo "$1" | tr '[:lower:]' '[:upper:]')"
    msg="$(printf '%b' "$2")"
    lineno="$3"
    #Emit a single timestamped log line (format is illustrative)
    echo "$(date '+%Y-%m-%d %H:%M:%S') | ${severity} | ${SCRIPT_NAME}:${lineno} | ${msg}"
}
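#Usage sketch (the message text is illustrative): logger "INFO" "Load started for ${SRC_TYPE}" $LINENO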
import java.sql.ResultSet
import scala.collection.mutable.ArrayBuffer

object ExecuteDbQuery extends EnvProperties {

  def as[T: scala.reflect.ClassTag](configFilePath: String, query: String): ArrayBuffer[Array[T]] = {
    val res = execute(new ExecuteDb2Query(None, configFilePath, query)).asInstanceOf[Option[ResultSet]].get
    res.resultSetToArray[T]()
  }
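  // Usage sketch (the properties path and SQL below are illustrative assumptions, not the original job's values):
  //   val rows: ArrayBuffer[Array[String]] =
  //     ExecuteDbQuery.as[String]("/path/to/db2_connection.properties", "SELECT NAME FROM SCHEMA1.TBL1 WITH UR")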
  implicit class Result(resultSet: ResultSet) {

    def resultSetToArray[T: scala.reflect.ClassTag](): ArrayBuffer[Array[T]] = {
      val buf = ArrayBuffer.empty[Array[T]]
if (dbOperation == DbOperations.UPSERT || dbOperation == DbOperations.UPDATE) {
  if (joinKeys.isEmpty) {
    logger.error("When the dataframe write runs in UPSERT/UPDATE mode, the composite (join) keys must be provided!")
    throw new NullPointerException("Composite keys for the dataframe UPSERT/UPDATE operation were passed as null")
  } else {
    import scala.io.Source._
    import java.sql._  // covers DriverManager, Connection, ResultSet, etc.
    val totalCnt = loadReadyDb2DF.count
    loadReadyDb2DF.write.
      partitionBy(joinKeys: _*).mode(mode).format(connection_type).
      options(Map("url" -> url,
        "driver" -> driver,
        "user" -> user,
        "password" -> password,
        "batchsize" -> batchsize)).
      save()
    if (dbOperation == DbOperations.INSERT && surrogateKey != null && generateSurrogateKey && sequenceName == null) {
      logger.info(s"Generating automatic sequence for $surrogateKey ...")
      import org.apache.spark.sql.expressions.Window
      import org.apache.spark.sql.functions.{lit, row_number}
      val rs = excuteDb2Query(s"SELECT max(coalesce($surrogateKey, 0)) FROM $tableName WITH UR").get
      rs.next()
      val surrKeyMaxValue = rs.getLong(1)
      val incrementByOne = Window.partitionBy(lit("i")).orderBy(lit("i").asc)
      loadReadyDb2DF = loadReadyDb2DF.withColumn(surrogateKey, lit(surrKeyMaxValue) + row_number().over(incrementByOne))
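// Self-contained sketch of the surrogate-key technique used above: add the current maximum key
// to a row_number() computed over a constant window. The column names, data, and starting
// offset below are illustrative assumptions, not the original job's schema.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{lit, row_number}

object SurrogateKeySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("surrogate-key-sketch").getOrCreate()
    import spark.implicits._

    val maxExistingKey = 100L                                 // e.g. the result of SELECT max(KEY_COL) on the target table
    val df = Seq("alice", "bob", "carol").toDF("name")
    val w  = Window.partitionBy(lit("i")).orderBy(lit("i"))   // single logical partition, arbitrary order
    val withKeys = df.withColumn("surrogate_key", lit(maxExistingKey) + row_number().over(w))

    withKeys.show()                                           // surrogate_key = 101, 102, 103
    spark.stop()
  }
}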
  private[this] def excuteDbQuery(spark: SparkSession, internalQuery: String, internalForceRead: Boolean = false): DataFrame = {
    import spark.implicits._
    var dataFrame = spark.emptyDataFrame
    queries = if (queries.trim.toUpperCase().startsWith("SELECT")) s"($queries)" else queries
    queries = queries.replace(";", "")
    if (loadReadyDb2DF == null || internalForceRead) {
      val readMapOptions = if (readByPartitionColumn) Map("url" -> url,
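// The fragment above breaks off mid-expression. For reference, a self-contained sketch of a
// partition-column JDBC read in Spark is shown below; the table, partition column, bounds, and
// partition count are illustrative assumptions, not values from the original job.
import org.apache.spark.sql.{DataFrame, SparkSession}

object PartitionedJdbcReadSketch {
  def partitionedRead(spark: SparkSession, url: String, user: String, password: String): DataFrame = {
    spark.read
      .format("jdbc")
      .options(Map(
        "url"             -> url,
        "user"            -> user,
        "password"        -> password,
        "dbtable"         -> "(SELECT * FROM SCHEMA1.TBL1) AS src", // subquery wrapped in parentheses, as in the code above
        "partitionColumn" -> "ID",                                  // numeric column used to split the read
        "lowerBound"      -> "1",
        "upperBound"      -> "1000000",
        "numPartitions"   -> "8"                                    // number of parallel JDBC connections
      ))
      .load()
  }
}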