import com.mongodb.spark._

val database = "sample_mflix"
val collection = "comments"

// Read the comments collection through the MongoDB Spark connector
val movieCommentsDF = (
  spark.read
    .format("com.mongodb.spark.sql.DefaultSource")
    .option("database", database)
    .option("collection", collection)
    .load()
)
// Define Read Options with partitionColumn, lowerBound, upperBound and numPartitions
val partitionColumn: String = "id"
val numPartitions: String = "4" // Tune based on the data volume and what the source system can handle
// lowerBound and upperBound are derived from the boundary query shown below
val readOptions: Map[String, String] = Map(
  "driver" -> JDBC_DRIVER,
  "url" -> JDBC_URL,
  "user" -> JDBC_USERNAME,
  "password" -> JDBC_PASSWORD,
  "dbtable" -> tableName,
  "partitionColumn" -> partitionColumn,
  "lowerBound" -> lowerBound,
  "upperBound" -> upperBound,
  "numPartitions" -> numPartitions
)
// Method to generate Boundary Query
def buildBoundaryQuery(column: String, tableName: String): String = {
  s"(SELECT MIN(${column}) AS lowerBound, MAX(${column}) AS upperBound FROM ${tableName}) bq"
}

val bq = buildBoundaryQuery("id", tableName)
// Boundary Read Options (same connection settings, but dbtable points at the boundary query)
val boundaryReadOptions: Map[String, String] = Map(
  "driver" -> JDBC_DRIVER,
  "url" -> JDBC_URL, "user" -> JDBC_USERNAME, "password" -> JDBC_PASSWORD,
  "dbtable" -> bq
)
// Read JDBC Configuration from ENV Variables
val JDBC_URL: String = sys.env("JDBC_URL")
val JDBC_USERNAME: String = sys.env("JDBC_USERNAME")
val JDBC_PASSWORD: String = sys.env("JDBC_PASSWORD")
val JDBC_DRIVER: String = "com.mysql.jdbc.Driver"
val tableName: String = "k9JPOiHGpv.world_city_population"

// Define JDBC Read Options
val readOptions: Map[String, String] = Map(
  "driver" -> JDBC_DRIVER,
  "url" -> JDBC_URL,
  "user" -> JDBC_USERNAME,
  "password" -> JDBC_PASSWORD,
  "dbtable" -> tableName
)
// table imdb_titles created and available under default database
val tableName = "default.imdb_titles"
val imdbTitles = spark.table(tableName)
// Using Spark SQL
val df = spark.sql("SELECT titleType, primaryTitle, originalTitle, startYear, endYear FROM default.imdb_titles")
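For reference, the same projection can be expressed with the DataFrame API on the imdbTitles table loaded above (a sketch):

// Equivalent column projection using the DataFrame API
val titlesDF = imdbTitles.select("titleType", "primaryTitle", "originalTitle", "startYear", "endYear")
titlesDF.show(5, truncate = false)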
/*
 * Assume below JSON String is stored in a File
 *
 * {
 *   "type": "struct",
 *   "fields": [
 *     {
 *       "name": "id",
 *       "type": "integer",
 *       "nullable": true,
 *       ...
 *     },
 *     ... (one object per remaining column)
 *   ]
 * }
 */
/* Assume below is our File's Schema Definition
* id integer
* first_name string
* last_name string
* city string
* country string
* phone string
*/
import org.apache.spark.sql.types._
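The same schema can also be expressed programmatically with the types imported above (a sketch using only the columns listed in the comment):

import org.apache.spark.sql.types.{StructType, StructField, IntegerType, StringType}

val fileSchema: StructType = StructType(Seq(
  StructField("id", IntegerType, nullable = true),
  StructField("first_name", StringType, nullable = true),
  StructField("last_name", StringType, nullable = true),
  StructField("city", StringType, nullable = true),
  StructField("country", StringType, nullable = true),
  StructField("phone", StringType, nullable = true)
))

// The schema can then be supplied to the reader instead of relying on inference,
// e.g. spark.read.schema(fileSchema).format("json").load(path)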
import org.apache.spark.sql.DataFrame
val filePath: String = "/user/hive/warehouse/product_json"
val jsonDF = spark.read.format("json").load(filePath)
import org.apache.spark.sql.DataFrame
val filePath: String = "/user/hive/warehouse/product_avro"
val avroDF: DataFrame = spark.read.format("avro").load(filePath)
import org.apache.spark.sql.DataFrame
val filePath: String = "/user/hive/warehouse/product_parquet"
val parquetDF: DataFrame = spark.read.parquet(filePath)