@y2k-shubham
Created December 15, 2017 10:17
CSV to Parquet using Spark (Scala)
import com.mypackage.SparkSessionBuilder
import org.apache.log4j.{Level, LogManager}
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.joda.time.DateTime
import scala.util.control.Breaks._
object DataMover {
  def main(args: Array[String]): Unit = {
    implicit val spark = new SparkSessionBuilder("DataMoverDriver").build()

    val sourceTableName: String = "sourceDB.sourceTable"
    val destinationTableName: String = "destinationDB.destinationTable"
    val repartitionCount: Int = 75
    val today: DateTime = new DateTime()
    var count: Int = 0
    var i: Int = 0

    val log = LogManager.getRootLogger
    log.setLevel(Level.DEBUG)

    spark.sql("set hive.exec.dynamic.partition=true")
    spark.sql("set hive.exec.dynamic.partition.mode=\"nonstrict\"")

    breakable {
      do {
        // Compute the Sunday..Saturday window lying i weeks before today
        val dayOfWeek: DateTime.Property = today.minusWeeks(i).dayOfWeek()
        val firstDayOfWeek: DateTime = dayOfWeek.withMinimumValue().minusDays(1)
        val lastDayOfWeek: DateTime = dayOfWeek.withMaximumValue().minusDays(1)
        val fromDate: String = firstDayOfWeek.toString("YYYYMMdd")
        val toDate: String = lastDayOfWeek.toString("YYYYMMdd")

        try {
          // Read data
          log.debug(s"iteration $i: reading data for dates between $fromDate and $toDate")
          val df: DataFrame = spark.sql(s"SELECT * FROM $sourceTableName WHERE dt BETWEEN '$fromDate' AND '$toDate'").cache()

          // Find count
          count = df.count().toInt
          log.debug(s"iteration $i: read data for dates between $fromDate and $toDate, count = $count")

          /**
           * Write data with the following configurations:
           *  - Compression: SNAPPY
           *  - Format: Parquet
           *  - SaveMode: Append
           */
          if (count > 0) {
            log.debug(s"iteration $i: writing data for dates between $fromDate and $toDate")
            df.
              repartition(repartitionCount).
              write.
              option("compression", "snappy").
              format("parquet").
              mode(SaveMode.Append).
              insertInto(destinationTableName)
            log.debug(s"iteration $i: written data for dates between $fromDate and $toDate")
          } else {
            log.debug(s"iteration $i: no data for dates between $fromDate and $toDate")
            break
          }
        } catch {
          case ex: Exception => log.debug(s"iteration $i: data movement failed for dates $fromDate to $toDate: ${ex.toString}")
        }
        i += 1
      } while (count > 0)
    }
  }
}
@y2k-shubham commented Dec 15, 2017

The SparkSession object (here, spark) can be obtained in a variety of ways, so modify this part accordingly:

import com.mypackage.SparkSessionBuilder
implicit val spark = new SparkSessionBuilder("DataMoverDriver").build()
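
For reference, here is a minimal sketch of obtaining the session with the stock SparkSession builder API instead (assuming Hive support is wanted, since the job reads from and writes to Hive tables):

import org.apache.spark.sql.SparkSession

// Stock builder; enableHiveSupport() lets spark.sql(...) see Hive tables
implicit val spark: SparkSession = SparkSession.builder()
  .appName("DataMoverDriver")
  .enableHiveSupport()
  .getOrCreate()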

The following part was only added to set configurations per my requirements (Zeppelin, Spark, S3) and hasn't been tested (I included it here just as a reminder that it may be required). Comment out these lines if they cause problems:

spark.sql("set hive.exec.dynamic.partition=true")
spark.sql("set hive.exec.dynamic.partition.mode=\"nonstrict\"")

At least in a Zeppelin notebook, you can run the following two paragraphs (don't put them together in a single paragraph) to apply the above configurations, instead of putting them in your code as done here:

%sql
set hive.exec.dynamic.partition=true
%sql
set hive.exec.dynamic.partition.mode="nonstrict"

@y2k-shubham commented

Here data has been read from a table (that had its data stored as CSV) and written as Parquet to another (Hive) table.
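
If the source were raw CSV files on storage rather than a CSV-backed Hive table, the same conversion could be sketched directly with the DataFrame reader/writer. Paths and reader options below are illustrative assumptions, not taken from the original job:

// Hypothetical paths; header/inferSchema settings depend on the actual files
val csvDf = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("s3://my-bucket/source-csv/")

csvDf.write
  .option("compression", "snappy")
  .mode(SaveMode.Append)
  .parquet("s3://my-bucket/destination-parquet/")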

@y2k-shubham commented

Data has been moved in batches of one week (Sunday to Saturday, Indian locale).
For US (and other similar) locales, remove the .minusDays(1) part when computing firstDayOfWeek and lastDayOfWeek.
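
A small sketch of how these Joda-Time week boundaries work out (variable names match the code above; Joda's default week is the ISO week, Monday to Sunday):

import org.joda.time.DateTime

val today = new DateTime()
val dayOfWeek: DateTime.Property = today.dayOfWeek()

// ISO week boundaries (Monday..Sunday)
val isoFirst: DateTime = dayOfWeek.withMinimumValue()  // Monday
val isoLast: DateTime  = dayOfWeek.withMaximumValue()  // Sunday

// Shifting both back one day gives the Sunday..Saturday window used above
val firstDayOfWeek: DateTime = isoFirst.minusDays(1)   // Sunday
val lastDayOfWeek: DateTime  = isoLast.minusDays(1)    // Saturday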

@y2k-shubham commented

The do..while loop here runs as long as there is data left to be moved.
