import com.example.core.common.ConfigLoader
import com.example.core.common.DeltaWriter._

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.util.{Failure, Success}
object Demo extends App {

  // `logger` and `getSparkSession` are assumed to be brought into scope by
  // DeltaWriter._ — they are not defined anywhere in this gist.

  // The job needs three arguments: environment type, network ID and client
  // schema (the original guard checked `< 2` but args(2) is read below).
  if (args.length < 3) {
    logger.error("Environment type, network ID and client schema must be provided as arguments to the Spark job")
    System.exit(1)
  }

  val envType       = args(0)
  val NETWORK_ID    = args(1)
  val CLIENT_SCHEMA = args(2)
  val APP_NAME: String = s"Demo-$NETWORK_ID"

  val spark = getSparkSession(APP_NAME)
  val configUtility = new ConfigLoader(envType, NETWORK_ID.toInt, CLIENT_SCHEMA)

  val tableList = List(
    "EXAMPLE_EXCHANGE_RATE_DAILY",
    "EXAMPLE_SETTING",
    "EXAMPLE_HISTORY",
    "EXAMPLE"
  )

  // Launch one asynchronous SQL Server load per table; the Future yields the
  // table name so the completion log can identify which load finished.
  def doFilterLoad(tableName: String): Future[String] = Future {
    deltaTableWriteToSQLServer(
      spark, envType, NETWORK_ID.toInt, configUtility.CONSUMABLE_DATA,
      CLIENT_SCHEMA, tableName
    )
    tableName
  }

  val filterLoadFutures = tableList.map(doFilterLoad)

  // Block on each future and inspect its result synchronously. Throwing
  // inside an onComplete callback (as the original did) runs on the execution
  // context's thread and never reaches the driver, and the final failure
  // check could race ahead of the callbacks. Everything here runs on the
  // driver, so a plain counter suffices instead of a Spark accumulator.
  var failureCount = 0
  filterLoadFutures.foreach { filterLoadFuture =>
    Await.ready(filterLoadFuture, Duration.Inf).value.get match {
      case Success(table) => logger.info(s"$table load succeeded")
      case Failure(e) =>
        failureCount += 1
        logger.error("Exception in Future async block", e)
    }
  }

  if (failureCount > 0)
    throw new Exception(s"Stopping Spark job - $failureCount table load(s) failed in Future async block")
}
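
// Hypothetical invocation — the class, jar and argument values below are
// placeholders, not taken from the gist:
//
//   spark-submit --class Demo demo-assembly.jar prod 42 dbo
//
// args(0) = environment type, args(1) = network ID, args(2) = client schema.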
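
// ---------------------------------------------------------------------------
// Minimal sketch of the helpers the job assumes. The real
// com.example.core.common.DeltaWriter is not part of this gist, so the
// signatures, the path layout and the JDBC options below are illustrative
// assumptions only, not the actual implementation.
// ---------------------------------------------------------------------------
object DeltaWriterSketch {

  import org.apache.spark.sql.{SaveMode, SparkSession}

  def getSparkSession(appName: String): SparkSession =
    SparkSession.builder().appName(appName).getOrCreate()

  // One plausible shape for deltaTableWriteToSQLServer: read the Delta table
  // from the consumable-data path and overwrite the matching SQL Server table
  // over JDBC (requires the delta-core and mssql-jdbc artifacts on the
  // classpath).
  def deltaTableWriteToSQLServer(spark: SparkSession,
                                 envType: String,
                                 networkId: Int,
                                 consumableDataPath: String,
                                 clientSchema: String,
                                 tableName: String): Unit = {
    val df = spark.read.format("delta").load(s"$consumableDataPath/$tableName")
    df.write
      .format("jdbc")
      .mode(SaveMode.Overwrite)
      .option("url", s"jdbc:sqlserver://example-$envType-sql:1433;databaseName=network_$networkId")
      .option("dbtable", s"$clientSchema.$tableName")
      .save()
  }
}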