import com.mongodb.spark._

val database = "sample_mflix"
val collection = "comments"

// Read the comments collection into a DataFrame using the MongoDB Spark connector
val movieCommentsDF = (
  spark.read
    .format("com.mongodb.spark.sql.DefaultSource")
    .option("database", database)
    .option("collection", collection)
    .load()
)
// Define read options with partitionColumn, lowerBound, upperBound and numPartitions
val partitionColumn: String = "id"
val numPartitions: String = "4" // tune this for the data volume and the source system
val readOptions: Map[String, String] = Map(
  "driver" -> JDBC_DRIVER,
  "url" -> JDBC_URL,
  "user" -> JDBC_USERNAME,
  "password" -> JDBC_PASSWORD,
  "dbtable" -> tableName,
  "partitionColumn" -> partitionColumn,
  "lowerBound" -> lowerBound, // MIN(id), fetched via the boundary query
  "upperBound" -> upperBound, // MAX(id), fetched via the boundary query
  "numPartitions" -> numPartitions
)
// Method to generate the boundary query
def buildBoundaryQuery(column: String, tableName: String): String = {
  s"(SELECT MIN(${column}) AS lowerBound, MAX(${column}) AS upperBound FROM ${tableName}) bq"
}

val bq = buildBoundaryQuery("id", tableName)

// Boundary read options
val boundaryReadOptions = Map(
  "driver" -> JDBC_DRIVER,
  "url" -> JDBC_URL,
  "user" -> JDBC_USERNAME,
  "password" -> JDBC_PASSWORD,
  "dbtable" -> bq
)
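As a quick sanity check, here is a self-contained sketch of the string `buildBoundaryQuery` emits; the table name below is a placeholder, not the real source table:

```scala
// Standalone sketch of the boundary-query builder; "world_city_population" is a placeholder.
def buildBoundaryQuery(column: String, tableName: String): String =
  s"(SELECT MIN(${column}) AS lowerBound, MAX(${column}) AS upperBound FROM ${tableName}) bq"

val bq = buildBoundaryQuery("id", "world_city_population")
println(bq)
// (SELECT MIN(id) AS lowerBound, MAX(id) AS upperBound FROM world_city_population) bq
```

The trailing alias `bq` matters: when a subquery is passed through the JDBC `dbtable` option, it must be aliased so the database can treat it as a derived table.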
// Read JDBC configuration from environment variables
val JDBC_URL: String = sys.env("JDBC_URL")
val JDBC_USERNAME: String = sys.env("JDBC_USERNAME")
val JDBC_PASSWORD: String = sys.env("JDBC_PASSWORD")
val JDBC_DRIVER: String = "com.mysql.jdbc.Driver"
val tableName: String = "k9JPOiHGpv.world_city_population"

// Define JDBC read options
val readOptions: Map[String, String] = Map(
  "driver" -> JDBC_DRIVER,
  "url" -> JDBC_URL,
  "user" -> JDBC_USERNAME,
  "password" -> JDBC_PASSWORD,
  "dbtable" -> tableName
)
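Note that `sys.env("JDBC_URL")` throws a `NoSuchElementException` when the variable is unset. If the job may run where the variables are not guaranteed (for example, local development), `sys.env.getOrElse` is a safer lookup; a minimal sketch, where the fallback values are placeholders rather than real endpoints or credentials:

```scala
// Resolve JDBC settings with fallbacks; the defaults below are placeholders only.
val jdbcUrl: String      = sys.env.getOrElse("JDBC_URL", "jdbc:mysql://localhost:3306/test")
val jdbcUsername: String = sys.env.getOrElse("JDBC_USERNAME", "dev_user")
val jdbcPassword: String = sys.env.getOrElse("JDBC_PASSWORD", "dev_password")

println(s"Connecting with user '${jdbcUsername}'")
```

In production it is usually better to fail fast on a missing variable than to fall back silently, so treat this pattern as a development convenience.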
// Table imdb_titles is created and available under the default database
val tableName = "default.imdb_titles"
val imdbTitles = spark.table(tableName)

// Equivalent read using Spark SQL
val df = spark.sql("SELECT titleType, primaryTitle, originalTitle, startYear, endYear FROM default.imdb_titles")
/*
 * Assume the below JSON string is stored in a file
{
  "type": "struct",
  "fields": [
    {
      "name": "id",
      "type": "integer",
      "nullable": true,
      "metadata": {}
    }
  ]
}
*/
/* Assume below is our file's schema definition
 * id         integer
 * first_name string
 * last_name  string
 * city       string
 * country    string
 * phone      string
 */
import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("id", IntegerType, nullable = true),
  StructField("first_name", StringType, nullable = true),
  StructField("last_name", StringType, nullable = true),
  StructField("city", StringType, nullable = true),
  StructField("country", StringType, nullable = true),
  StructField("phone", StringType, nullable = true)
))
import org.apache.spark.sql.DataFrame

val filePath: String = "/user/hive/warehouse/product_json"
val jsonDF: DataFrame = spark.read.format("json").load(filePath)
import org.apache.spark.sql.DataFrame

val filePath: String = "/user/hive/warehouse/product_avro"
val avroDF: DataFrame = spark.read.format("avro").load(filePath)
import org.apache.spark.sql.DataFrame

val filePath: String = "/user/hive/warehouse/product_parquet"
val parquetDF: DataFrame = spark.read.parquet(filePath)