This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#Get the set mode before reading secret variables | |
_TRACE_MODE=`echo $-|grep -q "x" && echo "ON"` | |
#Setting debug mode OFF | |
set +x | |
#Getting & setting secret credentials | |
export my_passwd="$(cat ~/system_db.passwd)" | |
export secret_email_id="xxx@yyy.com" | |
#Once done, reset back the original debug mode |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
__MAX_TRIAL=60 | |
for _time in $(seq $__MAX_TRIAL) | |
do | |
__PS_ID=$(ps -eaf|grep -i $APP_NM | grep $PORT_NBR| awk '{print $2}') | |
__LAST_LOG=#Get Last Log Path | |
#check if service is down? | |
if [ x"$PS_ID" == "x" ];then | |
logger "INFO" "Service seems down! Kicking-off ..." $LINENO | |
( |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
function sendEmailAlert { | |
APP_PROPERTY_PATH=$CONF_DIR/${env}_connection.properties | |
FROM="$(grep '^ *FROM=' $APP_PROPERTY_PATH)" | |
TO="$(grep '^ *TO=' $APP_PROPERTY_PATH)" | |
FROM=`echo $FROM | sed "s:^[^=]*=::g" | tr "\r" " "` | |
TO=`echo $TO | sed "s:^[^=]*=::g" | tr "\r" " "` | |
messg="<html><body> |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
[[ $# -ne 2 ]] && logger "ERROR" "Usage: $0 <SRC_TYPE=[JSON|TXT]> <TARGET_TBL=[TBL1|TBL2]>" && exit 1 | |
#####ENV SETUP############################# | |
export SRC_TYPE=$1 | |
export PROJECT_NAME="etl_talend_migr" | |
export SCRIPT_NAME="$(basename $0 .sh)" | |
export SCRIPT_DIR="$(dirname $0)" | |
CURRENT_DATE="`date '+%Y%m%d'`" | |
export CURRENT_TIMESTAMP=${CURRENT_TIMESTAMP:-"$(date '+%Y%m%d%H%M%S')"} | |
APPL_USR=${APPL_USR:-`whoami`} | |
[[ ${APPL_USR} = 'xyz123' && `hostname` = 'machine01' ]] && ENV=dev |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
exec > >(tee "$LOG_FILE") 2>&1 | |
function logger { | |
if [ $# -ne 3 ];then | |
logger "ERROR" "Usage logger <message_type> <command>" $LINENO | |
fi | |
severity=`echo "$1"|tr '[a-z]' '[A-Z]'` | |
msg="`printf \"$2\"`" | |
lineno=$3 |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
object ExecuteDbQuery extends EnvProperties { | |
def as[T: scala.reflect.ClassTag] (configFilePath: String, query: String): ArrayBuffer[Array[T]] = | |
{ | |
val res = execute(new ExecuteDb2Query(None, configFilePath, query)).asInstanceOf[Option[ResultSet]].get | |
res.resultSetToArray[T] () | |
} | |
implicit class Result(resultSet: ResultSet) { | |
def resultSetToArray[T: scala.reflect.ClassTag](): scala.collection.mutable.ArrayBuffer[ARR[T]] = { | |
val buf = scala.collectio.mutable.ArrayBuffer.empty[Array[T]] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
if (dbOperation = DbOperations.UPSERT || dbOperation = DbOperations.UPDATE) { | |
if (joinKeys.isEmpty) { | |
logger.error("When dataframe write is in UPSERT/UPDATE mode, user must have to pass the composite keys!!")) | |
throw new NullPointerException("Composite keys for dataframe UPSERT/UPDATE operation is passed as Null") ) | |
} else { | |
import scala.io.Source._ | |
import java.sql.DriverManager | |
import java.sql.Connection | |
import java.sql._ | |
val totalCnt = loadReadyDb2DF.count |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
loadReadyDbDF.write. | |
partitionBy(joinKeys: _*).mode(mode).format(connection_type). | |
options(Map("url" -> url, | |
"driver" -> queries, | |
"user" -> user, | |
"password" -> password, | |
"batchsize" -> batchsize)). | |
save() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
if (dbOperation = DbOperations.INSERT && surrogateKey != null && generateSurrogateKey && sequenceName = null) { | |
logger.info(s"Generating automatic sequence for $surrogateKey ...") | |
import org.apache.spark.sql.expressions.Window | |
import org.apache.spark.sql.functions.{ lit, row_number } | |
val rs = excuteDb2Query(s"SELECT max(coalesce(SsurrogateKey,0)) from $tableName with ur").get | |
rs.next | |
val surrKeyMaxValue = rs.getLong(1) | |
val incrementbyOne = Window.partitionBy(lit("i")).orderBy(lit("i").asc) | |
loadReadyDb2DF = loadReadyDb2DF.withColumn(surrogateKey, lit(surrKeyMaxValue) + row_number().over(incrementbyGne)) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
private[this] def excuteDbQuery(spark:SparkSession, internalQuery: String, internalForceRead: Boolean = false) : DataFrame = { | |
import spark.implicits. | |
var dataFrame = spark.emptvDataFrame | |
queries = if (queries.trim.toUpperCase().startsWith("SELECT")) s"($queries)" else queries | |
queries = queries.replace(";","") | |
if (loadReadyDb2DF = null || interalForceRead) { | |
val readMapOptions = if (readByPartitionColumn) Map("url" -> url, |
NewerOlder