Created: December 15, 2017
CSV to Parquet using Spark (Scala)
import com.mypackage.SparkSessionBuilder
import org.apache.log4j.{Level, LogManager}
import org.apache.spark.sql.{DataFrame, SaveMode}
import org.joda.time.DateTime

import scala.util.control.Breaks._

object DataMover {

  def main(args: Array[String]): Unit = {
    implicit val spark = new SparkSessionBuilder("DataMoverDriver").build()

    val sourceTableName: String = "sourceDB.sourceTable"
    val destinationTableName: String = "destinationDB.destinationTable"
    val repartitionCount: Int = 75
    val today: DateTime = new DateTime()

    var count: Int = 0
    var i: Int = 0

    val log = LogManager.getRootLogger
    log.setLevel(Level.DEBUG)

    // Enable dynamic partitioning so insertInto can write into Hive partitions
    spark.sql("set hive.exec.dynamic.partition=true")
    spark.sql("set hive.exec.dynamic.partition.mode=\"nonstrict\"")

    breakable {
      do {
        // Compute the week boundaries (Sunday to Saturday) for the i-th week before today
        val dayOfWeek: DateTime.Property = today.minusWeeks(i).dayOfWeek()
        val firstDayOfWeek: DateTime = dayOfWeek.withMinimumValue().minusDays(1)
        val lastDayOfWeek: DateTime = dayOfWeek.withMaximumValue().minusDays(1)
        val fromDate: String = firstDayOfWeek.toString("YYYYMMdd")
        val toDate: String = lastDayOfWeek.toString("YYYYMMdd")

        try {
          // Read data
          log.debug(s"iteration $i: reading data for dates between $fromDate and $toDate")
          val df: DataFrame = spark.sql(s"SELECT * FROM $sourceTableName WHERE dt BETWEEN '$fromDate' AND '$toDate'").cache()

          // Find count
          count = df.count().toInt
          log.debug(s"iteration $i: read data for dates between $fromDate and $toDate, count = $count")

          /**
            * Write data with following configurations:
            * - Compression: SNAPPY
            * - Format: Parquet
            * - SaveMode: Append
            */
          if (count > 0) {
            log.debug(s"iteration $i: writing data for dates between $fromDate and $toDate")
            df.
              repartition(repartitionCount).
              write.
              option("compression", "snappy").
              format("parquet").
              mode(SaveMode.Append).
              insertInto(destinationTableName)
            log.debug(s"iteration $i: written data for dates between $fromDate and $toDate")
          } else {
            log.debug(s"iteration $i: no data for dates between $fromDate and $toDate")
            break
          }
        } catch {
          case ex: Exception => log.debug(s"iteration $i: data movement failed for dates $fromDate to $toDate: ${ex.toString}")
        }
        i += 1
      } while (count > 0)
    }
  }
}
Here, data is read from a table (whose data was stored as CSV) and written as Parquet to another (Hive) table; a hypothetical pair of matching table definitions is sketched below.
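For context, this is roughly what the two tables could look like. The column names col1/col2 are placeholders (only the dt partition column actually appears in the gist), so treat this as a sketch rather than the real schemas:

// Hypothetical DDL for the tables used above; the real schemas will differ,
// but the destination should be partitioned the same way (here: by dt) for
// insertInto with dynamic partitioning to work as intended
spark.sql("""
  CREATE TABLE IF NOT EXISTS sourceDB.sourceTable (col1 STRING, col2 INT)
  PARTITIONED BY (dt STRING)
  ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
  STORED AS TEXTFILE
""")

spark.sql("""
  CREATE TABLE IF NOT EXISTS destinationDB.destinationTable (col1 STRING, col2 INT)
  PARTITIONED BY (dt STRING)
  STORED AS PARQUET
""")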
Data is moved in batches of one week (Sunday to Saturday, as in the Indian locale).
Note that Joda-Time weeks follow ISO-8601 and run Monday to Sunday; the .minusDays(1) calls shift that window back to Sunday-through-Saturday. For a Monday-to-Sunday week, remove the .minusDays(1) part while computing firstDayOfWeek and lastDayOfWeek (see the sketch below).
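As an illustration of the week-boundary arithmetic, here is a self-contained sketch (the sample date is arbitrary; only joda-time is needed on the classpath):

import org.joda.time.DateTime

object WeekBoundsDemo extends App {
  val today = new DateTime(2017, 12, 15, 0, 0) // a Friday

  // Joda-Time weeks run Monday (1) to Sunday (7)
  val monday = today.dayOfWeek().withMinimumValue() // 2017-12-11
  val sunday = today.dayOfWeek().withMaximumValue() // 2017-12-17

  // Shifting back one day gives a Sunday-to-Saturday window
  println(monday.minusDays(1).toString("YYYYMMdd")) // 20171210 (Sunday)
  println(sunday.minusDays(1).toString("YYYYMMdd")) // 20171216 (Saturday)
}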
The do..while loop keeps running as long as there is data to be moved; the first week that comes back empty triggers the break.
The SparkSession object (here, spark) can be obtained in a variety of ways, so modify that line accordingly; one common way is sketched below.
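For instance, in place of the custom com.mypackage.SparkSessionBuilder you could build a plain SparkSession like this (a minimal sketch; enableHiveSupport is needed because the job reads from and writes to Hive tables):

import org.apache.spark.sql.SparkSession

implicit val spark: SparkSession = SparkSession.builder().
  appName("DataMoverDriver").
  enableHiveSupport(). // required for spark.sql(...) against Hive tables and insertInto
  getOrCreate()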
The following part was added only to set configurations for my environment (Zeppelin, Spark, S3) and hasn't been tested (I included it just as a reminder that this may be required). Comment out these lines if they cause problems.
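An alternative way to apply the same settings (equally untested in this context) is through the session's runtime config instead of SQL statements:

// Equivalent runtime-config calls; assumes a Hive-enabled SparkSession
spark.conf.set("hive.exec.dynamic.partition", "true")
spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")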
At least in a Zeppelin notebook, you can run the following two paragraphs (don't put them together in a single paragraph) to apply the above configurations, instead of putting them in your code as done here.
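The two paragraphs themselves are not reproduced above; assuming Zeppelin's %sql interpreter, they would plausibly be one setting per paragraph:

    %sql
    set hive.exec.dynamic.partition=true

    %sql
    set hive.exec.dynamic.partition.mode=nonstrict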