import org.apache.spark.sql.SparkSession

def getSparkSessionMongoDbConfig(parms: Map[String, String]): SparkSession = {
  val spark = SparkSession
    .builder
    .appName(parms("JOB_NAME"))
    .master("local[*]")
    .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/retaildb.orders?authSource=admin")
    .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/retaildb.orders?authSource=admin")
    // spark.speculation is a Spark property, not a Hadoop one, so it belongs on the builder;
    // disabling speculative execution avoids duplicate task output when writing to S3.
    .config("spark.speculation", "false")
    .getOrCreate()

  val isS3Enable = parms("S3_OPERATION_ENABLE").toBoolean
  if (isS3Enable) {
    // S3A credentials and filesystem implementation for s3a:// paths
    spark.sparkContext.hadoopConfiguration.set("fs.s3a.access.key", parms("S3_ACCESS_KEY"))
    spark.sparkContext.hadoopConfiguration.set("fs.s3a.secret.key", parms("S3_SECRET_KEY"))
    spark.sparkContext.hadoopConfiguration.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
  }
  spark
}
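
// The com.mts.analytics.utils.Utils.getConfig helper used below is not included in this gist.
// A minimal sketch, assuming it simply loads a Java .properties file into an immutable
// Map[String, String] (keys such as JOB_NAME, S3_OPERATION_ENABLE, S3_ACCESS_KEY, S3_SECRET_KEY,
// JDBC_URL, JDBC_USERNAME and JDBC_PASSWORD are the ones this snippet reads):
def getConfig(fileName: String): Map[String, String] = {
  import java.io.FileInputStream
  import java.util.Properties

  val props = new Properties()
  val in = new FileInputStream(fileName)
  try props.load(in) finally in.close()

  // Copy the java.util.Properties entries into an immutable Scala Map
  val builder = Map.newBuilder[String, String]
  val names = props.stringPropertyNames().iterator()
  while (names.hasNext) {
    val key = names.next()
    builder += key -> props.getProperty(key)
  }
  builder.result()
}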
// Load job parameters from a properties file and build the SparkSession
val fileName = "config/app_mongodb.properties"
val params: Map[String, String] = com.mts.analytics.utils.Utils.getConfig(fileName)

// A logger is assumed to be in scope; for a self-contained run, log4j (bundled with Spark 2.x) works:
val logger = org.apache.log4j.Logger.getRootLogger
logger.info("Config file: " + fileName + ", params: " + params)

val spark = com.mts.analytics.utils.SparkUtils.getSparkSessionMongoDbConfig(params)
println("spark :" + spark)

// Read Parquet files from the local filesystem (loaded here but not used further in this snippet)
val readFromParquetDF = spark.read.format("parquet").load("C:/data/mts_data/")
// Read the products table over JDBC; this is the DataFrame that is written to MongoDB below
val writeMongoDF = spark.read.format("jdbc")
  .option("url", params("JDBC_URL"))
  .option("dbtable", "products")
  .option("user", params("JDBC_USERNAME"))
  .option("password", params("JDBC_PASSWORD"))
  .load()
writeMongoDF.printSchema()
writeMongoDF.show(10, false)
// Write into MongoDB retaildb.products
writeMongoDF.write
  .format("com.mongodb.spark.sql.DefaultSource")
  .mode("append")
  .option("database", "retaildb")
  .option("collection", "products")
  .save()
print("Write Into MongoDB DataFrame completed ")
// Read back from MongoDB to verify the write
val readMongoDF = spark.read
  .format("com.mongodb.spark.sql.DefaultSource")
  .option("uri", "mongodb://127.0.0.1/retaildb.products?authSource=admin")
  .load()
readMongoDF.printSchema()
readMongoDF.show(10, false)
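
// Note: the fs.s3a settings configured in getSparkSessionMongoDbConfig are not exercised
// anywhere else in this snippet. A minimal sketch of how they could be used, assuming
// S3_OPERATION_ENABLE=true and a hypothetical bucket name (not part of the original gist):
if (params("S3_OPERATION_ENABLE").toBoolean) {
  // Write the Parquet DataFrame loaded above to S3 via the s3a connector
  readFromParquetDF.write.mode("overwrite").parquet("s3a://your-bucket/mts_data/")
}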