Skip to content

Instantly share code, notes, and snippets.

@najikadri
Created January 27, 2024 22:52
Show Gist options
  • Save najikadri/1c821de392dbeb77fe97926b91ccc7bb to your computer and use it in GitHub Desktop.
Basic example of an Apache Spark application using DataFrames
package le.pioneer
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.log4j.Logger
import org.apache.spark.SparkConf
import java.util.Properties
import scala.io.Source
// This uses DataFrame API
// This uses DataFrame API
object HelloDataFrame extends Serializable {

  // @transient + lazy: the logger is neither serialized with closures nor
  // created until first use on the driver/executor that needs it.
  @transient private lazy val logger: Logger = Logger.getLogger(getClass.getName)

  /**
   * Entry point: loads an employee CSV (path given as the first CLI argument),
   * shows it, and shows the subset belonging to the "investigations" department.
   *
   * Usage: Test filename.csv
   */
  def main(args: Array[String]): Unit = {
    // Validate arguments BEFORE building the SparkSession — otherwise a bad
    // invocation would create a session and then System.exit without stopping it.
    if (args.length == 0) {
      logger.error("Usage: Test filename.csv")
      System.exit(1)
    }

    val sparkAppConfig = getSparkAppConf
    logger.info(s"Starting ${sparkAppConfig.get("spark.app.name")} Session")

    val spark: SparkSession = SparkSession.builder()
      .config(sparkAppConfig)
      .getOrCreate()

    val employeeDF = loadEmployeeCSV(spark, args(0))
    employeeDF.show()

    // Spark SQL expression string; "==" is valid equality in Spark SQL.
    val dataAnalyticsEmployees = employeeDF.where("department == \"investigations\"")
    dataAnalyticsEmployees.show()

    logger.info("spark.conf= " + spark.conf.getAll.toString())
    //process data here
    logger.info("Naji Spark application finished...")
    spark.stop()
  }

  /**
   * Reads a CSV file into a DataFrame, treating the first row as a header
   * and letting Spark infer column types (costs an extra pass over the data).
   *
   * @param spark     active SparkSession used for reading
   * @param file_path path (local or distributed FS) to the CSV file
   * @return DataFrame with inferred schema and header-derived column names
   */
  def loadEmployeeCSV(spark: SparkSession, file_path: String): DataFrame = {
    spark.read
      .option("header", true)
      .option("inferSchema", true)
      .csv(file_path)
  }

  /**
   * Builds a SparkConf from the java.util.Properties-format file "spark.conf"
   * in the working directory. Every key/value pair becomes a Spark setting.
   *
   * @return SparkConf populated from spark.conf
   */
  private def getSparkAppConf: SparkConf = {
    val sparkAppConfig = new SparkConf
    val props = new Properties()
    // Close the file handle even if props.load throws (the original leaked it).
    val source = Source.fromFile("spark.conf")
    try props.load(source.bufferedReader())
    finally source.close()
    props.forEach((k, v) => sparkAppConfig.set(k.toString, v.toString))
    sparkAppConfig
  }
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment