@y2k-shubham
Created December 15, 2017 10:17
CSV to Parquet using Spark (Scala)
import com.mypackage.SparkSessionBuilder
import org.apache.log4j.{Level, LogManager}
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.joda.time.DateTime

import scala.util.control.Breaks._

object DataMover {

  def main(args: Array[String]): Unit = {
    implicit val spark = new SparkSessionBuilder("DataMoverDriver").build()

    val sourceTableName: String = "sourceDB.sourceTable"
    val destinationTableName: String = "destinationDB.destinationTable"
    val repartitionCount: Int = 75
    val today: DateTime = new DateTime()
    var count: Long = 0L
    var i: Int = 0

    val log = LogManager.getRootLogger
    log.setLevel(Level.DEBUG)

    // Enable dynamic partitioning for the Hive insert
    spark.sql("set hive.exec.dynamic.partition=true")
    spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")

    breakable {
      do {
        // Compute the Sunday..Saturday bounds of the week `i` weeks back.
        // Joda's dayOfWeek() uses ISO weeks (Monday..Sunday); shifting both
        // boundaries back one day gives a Sunday..Saturday week.
        val dayOfWeek: DateTime.Property = today.minusWeeks(i).dayOfWeek()
        val firstDayOfWeek: DateTime = dayOfWeek.withMinimumValue().minusDays(1)
        val lastDayOfWeek: DateTime = dayOfWeek.withMaximumValue().minusDays(1)
        // yyyy = calendar year; Joda's YYYY is the weekyear and misbehaves
        // near year boundaries
        val fromDate: String = firstDayOfWeek.toString("yyyyMMdd")
        val toDate: String = lastDayOfWeek.toString("yyyyMMdd")

        try {
          // Read data
          log.debug(s"iteration $i: reading data for dates between $fromDate and $toDate")
          val df: DataFrame = spark.sql(s"SELECT * FROM $sourceTableName WHERE dt BETWEEN '$fromDate' AND '$toDate'").cache()

          // Find count
          count = df.count()
          log.debug(s"iteration $i: read data for dates between $fromDate and $toDate, count = $count")

          /**
            * Write data with the following configuration:
            *  - Compression: SNAPPY
            *  - Format: Parquet
            *  - SaveMode: Append
            *
            * Note: insertInto writes using the destination table's own format,
            * so the format/compression options above take effect only if the
            * table was created as Parquet with SNAPPY compression.
            **/
          if (count > 0) {
            log.debug(s"iteration $i: writing data for dates between $fromDate and $toDate")
            df.repartition(repartitionCount).
              write.
              option("compression", "snappy").
              format("parquet").
              mode(SaveMode.Append).
              insertInto(destinationTableName)
            log.debug(s"iteration $i: written data for dates between $fromDate and $toDate")
          } else {
            log.debug(s"iteration $i: no data for dates between $fromDate and $toDate")
            break
          }
          // Release the cached DataFrame before moving to the next week
          df.unpersist()
        } catch {
          case ex: Exception =>
            log.error(s"iteration $i: data movement failed for dates $fromDate to $toDate: ${ex.toString}")
        }
        i += 1
      } while (count > 0)
    }
  }
}
Here, data is read from a Hive table whose underlying storage is CSV and written as Parquet to another Hive table.

Data is moved in batches of one week (Sunday to Saturday, per the Indian locale).
For ISO-style Monday-to-Sunday weeks, remove the .minusDays(1) calls when computing firstDayOfWeek and lastDayOfWeek.
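For reference, the same Sunday-to-Saturday week computation can be sketched with the JDK's java.time instead of Joda-Time. `weekBounds` is a hypothetical helper, not part of the gist; it mirrors the logic above (ISO weeks run Monday to Sunday, so shifting both boundaries back one day yields a Sunday-to-Saturday week):

```scala
import java.time.{DayOfWeek, LocalDate}
import java.time.format.DateTimeFormatter
import java.time.temporal.TemporalAdjusters

// Returns the (fromDate, toDate) strings, formatted as yyyyMMdd, of the
// Sunday..Saturday week that lies `weeksBack` weeks before `today`.
def weekBounds(today: LocalDate, weeksBack: Int): (String, String) = {
  val fmt    = DateTimeFormatter.ofPattern("yyyyMMdd")
  val anchor = today.minusWeeks(weeksBack)
  // Monday of the anchor date's ISO week
  val monday = anchor.`with`(TemporalAdjusters.previousOrSame(DayOfWeek.MONDAY))
  val sunday   = monday.minusDays(1)             // start: Sunday before that Monday
  val saturday = monday.plusDays(6).minusDays(1) // end: following Saturday
  (sunday.format(fmt), saturday.format(fmt))
}
```

For example, anchored at the gist's creation date (Friday, 2017-12-15), `weekBounds(LocalDate.of(2017, 12, 15), 0)` yields `("20171210", "20171216")`.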

The do..while loop keeps stepping back one week at a time until it encounters a week with no data to move.
