Created
December 15, 2017 10:17
-
-
Save y2k-shubham/f6227e1528f3eb1c849b3f3f275bdc80 to your computer and use it in GitHub Desktop.
CSV to Parquet using Spark (Scala)
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import com.mypackage.SparkSessionBuilder | |
import org.apache.log4j.{Level, LogManager} | |
import org.apache.spark.sql.{DataFrame, SaveMode} | |
import org.joda.time.DateTime | |
import scala.util.control.Breaks._ | |
object DataMover {

  /**
   * Moves rows from a source Hive table to a destination Parquet-backed table
   * in weekly batches, walking backwards one week per iteration starting from
   * the current week. Weeks are Sunday..Saturday (Indian locale); see the
   * comment on the week-boundary computation below for other locales.
   *
   * The loop stops at the first week that returns no rows, or at the first
   * failed iteration (failures are logged and abort further processing rather
   * than being silently skipped).
   */
  def main(args: Array[String]): Unit = {
    import scala.util.control.NonFatal

    implicit val spark = new SparkSessionBuilder("DataMoverDriver").build()

    val sourceTableName: String = "sourceDB.sourceTable"
    val destinationTableName: String = "destinationDB.destinationTable"
    // Number of output partitions per weekly batch; tune to cluster size.
    val repartitionCount: Int = 75

    val today: DateTime = new DateTime()
    // Row count of the most recent batch; drives loop termination.
    // Long (not Int) so very large batches don't truncate.
    var count: Long = 0L
    var i: Int = 0

    val log = LogManager.getRootLogger
    log.setLevel(Level.DEBUG)

    // Allow dynamic-partition inserts into the destination table.
    spark.sql("set hive.exec.dynamic.partition=true")
    spark.sql("set hive.exec.dynamic.partition.mode=\"nonstrict\"")

    breakable {
      do {
        // Week boundaries: Joda weeks run Monday..Sunday, so minusDays(1)
        // shifts them to Sunday..Saturday (Indian locale). For US-style
        // weeks, drop the minusDays(1) calls.
        val dayOfWeek: DateTime.Property = today.minusWeeks(i).dayOfWeek()
        val firstDayOfWeek: DateTime = dayOfWeek.withMinimumValue().minusDays(1)
        val lastDayOfWeek: DateTime = dayOfWeek.withMaximumValue().minusDays(1)
        // "yyyy" (calendar year), not "YYYY": in Joda "Y" is year-of-era and
        // in java.time it is week-based year — both misbehave around Jan 1.
        val fromDate: String = firstDayOfWeek.toString("yyyyMMdd")
        val toDate: String = lastDayOfWeek.toString("yyyyMMdd")

        try {
          // Read one week of data; cached because we both count and write it.
          log.debug(s"iteration $i: reading data for dates between $fromDate and $toDate")
          val df: DataFrame = spark.sql(s"SELECT * FROM $sourceTableName WHERE dt BETWEEN '$fromDate' AND '$toDate'").cache()
          try {
            count = df.count()
            log.debug(s"iteration $i: read data for dates between $fromDate and $toDate, count = $count")
            /**
             * Write data with following configurations:
             * - Compression: SNAPPY
             * - Format: Parquet
             * - SaveMode: Append
             *
             * NOTE(review): insertInto targets an existing table, whose own
             * storage format governs the output — the format/option calls
             * are kept for parity but are likely no-ops here; confirm.
             **/
            if (count > 0) {
              log.debug(s"iteration $i: writing data for dates between $fromDate and $toDate")
              df.
                repartition(repartitionCount).
                write.
                option("compression", "snappy").
                format("parquet").
                mode(SaveMode.Append).
                insertInto(destinationTableName)
              log.debug(s"iteration $i: written data for dates between $fromDate and $toDate")
            } else {
              // First empty week terminates the backfill.
              log.debug(s"iteration $i: no data for dates between $fromDate and $toDate")
              break
            }
          } finally {
            // Release executor memory before the next weekly batch;
            // previously the cached DataFrame leaked across iterations.
            df.unpersist()
          }
        } catch {
          // NonFatal: let OOM/ControlThrowable propagate. Previously a stale
          // `count` from the prior iteration could keep a persistently
          // failing loop running forever; now a failure stops the job.
          case NonFatal(ex) =>
            log.error(s"iteration $i: data movement failed for dates $fromDate to $toDate", ex)
            break
        }
        i += 1
      } while (count > 0)
    }
  }
}
The do..while loop here keeps running as long as there is data to be moved; it terminates at the first week that returns no rows.
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Data is moved in batches of one week (Sunday to Saturday, Indian locale).
For US (and other similar) locales, remove the `.minusDays(1)` part while computing the `firstDayOfWeek` and `lastDayOfWeek`.