import org.slf4j.{Logger, LoggerFactory}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.parquet.hadoop.rewrite.{RewriteOptions, ParquetRewriter}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions.{col, element_at, expr, monotonically_increasing_id}
import java.net.URI
import scala.collection.JavaConverters._
/**
* A Spark Parquet stitching utility that enables much faster modification or addition of a field to an extremely large dataset.
* This class is supposed to be used in combination with this improvement to parquet-mr's ParquetRewriter: [[https://github.com/apache/parquet-mr/pull/1273]]
*
* ==Use-case==
* This utility is suitable for the following use case.
* You have a very large Parquet-based dataset (dozens or hundreds of fields, terabytes of data daily, petabytes of historical partitions).
* You face one of the following tasks: {{{
* 1) Backfill. The task is to modify all historical data of a dataset, but only one or two fields need to be modified/added.
* 2) Projection enrichment join. You frequently project a massive input and that part stays stable, but it then needs to be joined with some enrichments. }}}
* Either task is trivial in Spark, but given the sheer scale of the dataset it takes a lot of resources.
* The point is that Spark by default handles such cases inefficiently and wastes a lot of resources; for large datasets, simply writing the dataframe is often the main resource consumer.
* Even if all you need is to modify 1 field out of 100, Spark deserializes all 100 fields and then serializes them back without any modification.
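*
* For illustration, a one-field backfill done the straightforward way (field names and paths below are hypothetical) still rewrites every column of every file: {{{
* spark.read.parquet("/data/events/2024-04-19")
*   .withColumn("price_usd", col("price_cents") / 100) // only this field changes...
*   .write.mode(SaveMode.Overwrite)
*   .parquet("/data/events-backfilled/2024-04-19")     // ...yet every column is decoded and re-encoded
* }}}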
*
* ==Implementation details==
* We take a very large dataset as the input. We read it so that each file is handled by a single task and add two system columns:
* 1) the input file name
* 2) a monotonically increasing id for each row within each file
* We pass that data frame to the user-provided transformer and then write it back, preserving the original file partitioning and ordering using our system columns.
* Now we have the original input and the transformed input; each file on the left has a corresponding file on the right with preserved ordering, as illustrated below.
* Finally we feed each file pair to the parquet-mr library's ParquetRewriter class, which quickly stitches the pair into a single file.
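* For example, with the directory layout used in [[transformParquet]] below, the file pairs look like this (file names are illustrative): {{{
* inDir/part-00000.parquet  <->  tmpDir/input_file_name=part-00000.parquet/<single transformed file>.parquet
* inDir/part-00001.parquet  <->  tmpDir/input_file_name=part-00001.parquet/<single transformed file>.parquet
* }}}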
*
* ==ParquetRewriter implementation details==
* The parquet-mr library provides an implementation of the Parquet data format and is used by many BigData tools, including Spark.
* Spark workers use parquet-mr locally on each executor to read and write Parquet files.
* ParquetRewriter is a class from parquet-mr; it allows stitching files with the same schema into a single file,
* and it can also be used to efficiently stitch/join multiple parquet files with different schemas.
* You can provide the main input file group and multiple right-side ones. {{{That is possible when:
* 1) the number of rows in the main and extra input groups is the same,
* 2) the ordering of rows in the main and extra input groups is the same. }}}
* ParquetRewriter will copy the main input as binary data and write the extra input files with row groups adjusted to the main input.
* If the left side is much larger than the right, a lot of resources are saved by handling the main input as binary.
* Parquet is a column-oriented data format. Each file is split into chunks (row groups) and each chunk stores each column's data separately;
* each chunk, and each column within that chunk, can be located and accessed individually as a binary stream without touching the rest of the file.
* {{{
* |--------------|--------------|----------|--------------|
* | Chunk1_Col_1 | Chunk1_Col_2 | ... | Chunk1_Col_N |
* |--------------|--------------|----------|--------------|
* | Chunk2_Col_1 | Chunk2_Col_2 | ... | Chunk2_Col_N |
* |--------------|--------------|----------|--------------|
* | ... | ... | ... | ... |
* |--------------|--------------|----------|--------------|
* | ChunkN_Col_1 | ChunkN_Col_2 | ... | ChunkN_Col_N |
* |--------------|--------------|----------|--------------|
* }}}
* When stitching files, ParquetRewriter copies the column chunks of the large left input as binary data
* while rebuilding the column chunks of the right-side file.
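*
* A minimal sketch of stitching one file pair with ParquetRewriter (the same calls [[transformParquet]] uses below; `hadoopConf` and the paths are placeholders, and the builder overload taking extra input groups comes from the PR linked above): {{{
* val options = new RewriteOptions.Builder(
*     hadoopConf,                                                       // org.apache.hadoop.conf.Configuration
*     Seq(new Path("/data/main/part-0.parquet")).asJava,                // main input group, copied as binary
*     Seq(Seq(new Path("/data/extra/part-0.parquet")).asJava).asJava,   // extra input group, row groups rebuilt
*     new Path("/data/out/part-0.parquet")
*   ).build()
* val rewriter = new ParquetRewriter(options)
* rewriter.processBlocks()
* rewriter.close()
* }}}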
*
* ==Recommendations==
* This class is intended as a tool for very large datasets; it is likely pointless to use it if you have just a few gigabytes of data.
*
* ==Limitations==
* Spark's speculative execution is currently not supported and needs to be disabled (`"spark.speculation": "false"`);
* otherwise speculative tasks can interfere with each other.
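* For example, it can be disabled when the session is created: {{{
* val spark = SparkSession.builder()
*   .config("spark.speculation", "false")
*   .getOrCreate()
* }}}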
*/
object FastParquetTransformer {
  @transient
  val logger: Logger = LoggerFactory.getLogger(getClass.getName)

  val InputFileName = "input_file_name"
  val MonotonicallyIncreasingId = "monotonically_increasing_id"
  /**
   * Quickly stitches the provided main input data frame with a transformed one.
   * Can be used as an optimization for adding/changing a few columns in an extremely heavy dataset.
   * See the scaladoc of [[FastParquetTransformer]] for more details.
   *
   * Input requirements: {{{
   * - must contain parquet files}}}
   *
   * Transformer function requirements: {{{
   * - must not delete or add rows, each input row must be transformed 1-to-1
   * - must preserve the system columns unmodified: [[InputFileName]] and [[MonotonicallyIncreasingId]]
   * - must output only the system columns and the columns that were added/modified, all unmodified columns must be dropped}}}
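   *
   * For example, a transformer that recomputes a single field (hypothetical `price_cents`/`price_usd` field names) could look like this: {{{
   * val transformer: DataFrame => DataFrame = df =>
   *   df.select(
   *     col(InputFileName),
   *     col(MonotonicallyIncreasingId),
   *     (col("price_cents") / 100).as("price_usd") // the only added/modified column
   *   )
   * }}}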
   *
   * Output schema: {{{
   * - will contain the union of all columns from the main input and the transformed input, without the system columns
   * - if a column produced by the transformer is also present in the main input, the former overwrites the latter
   * - is not sorted; the main input column order is preserved, and columns added by the transformer are appended at the end of the output schema}}}
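   *
   * A minimal usage sketch, reusing the `transformer` from the example above (paths are placeholders): {{{
   * import FastParquetTransformer._
   * transformParquet(
   *   spark,
   *   inDir = "/data/events/2024-04-19",
   *   outDir = "/data/events-stitched/2024-04-19",
   *   tmpDir = "/tmp/events-transformed/2024-04-19",
   *   transformer = transformer
   * )
   * }}}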
   *
   * @param spark       Spark Session
   * @param inDir       directory to read the main input from
   * @param outDir      directory to write the output to
   * @param tmpDir      directory to write the intermediate transformed files to
   * @param transformer the main input DataFrame transformation function
   */
  def transformParquet(
      spark: SparkSession,
      inDir: String,
      outDir: String,
      tmpDir: String,
      transformer: DataFrame => DataFrame
  ): Unit = {
    assert(inDir != null && inDir.nonEmpty, s"Provided input directory [$inDir] can't be NULL or empty!")
    assert(outDir != null && outDir.nonEmpty, s"Provided output directory [$outDir] can't be NULL or empty!")
    assert(tmpDir != null && tmpDir.nonEmpty, s"Provided tmp directory [$tmpDir] can't be NULL or empty!")
    assert(Set(inDir, outDir, tmpDir).size == 3, s"Provided input, output & tmp directories must all be different!")
    // Create a new session and increase maxPartitionBytes & openCostInBytes to 1 TB so that each file ends up in a single partition.
    // This is needed so that monotonically_increasing_id is consistent for all records of the same input file,
    // and we can later sort by monotonically_increasing_id to get records in the output file in exactly the same order as they were present in the input.
    // So monotonically_increasing_id is derived from the input files and later reused to restore that order in the transformed output before stitching.
    val newSession = spark.newSession()
    val TB = (1024L * 1024L * 1024L * 1024L).toString
    newSession.sessionState.conf.setConfString("spark.sql.files.maxPartitionBytes", TB)
    newSession.sessionState.conf.setConfString("spark.sql.files.openCostInBytes", TB)
    newSession.read
      .parquet(inDir)
      .withColumn(InputFileName, element_at(expr("""split(input_file_name(), "\/")"""), -1))
      .withColumn(MonotonicallyIncreasingId, monotonically_increasing_id)
      .transform(transformer)
      .repartition(col(InputFileName))
      .sortWithinPartitions(InputFileName, MonotonicallyIncreasingId)
      .drop(MonotonicallyIncreasingId)
      .write
      .mode(SaveMode.Overwrite)
      .partitionBy(InputFileName)
      .parquet(tmpDir)
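
    // Pair each original input file (left) with its transformed counterpart (right) so they can be stitched one-to-one.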
    @transient val hConf = spark.sparkContext.hadoopConfiguration
    @transient val fsL = FileSystem.get(new URI(inDir), hConf)
    // The transformed files can be located on a different HDFS cluster; create a separate FileSystem for that case to avoid an IllegalArgumentException.
    @transient val fsR = FileSystem.get(new URI(tmpDir), hConf)
    val filesL = fsL
      .listStatus(new Path(inDir))
      .map(_.getPath.getName)
      .filter(!_.startsWith("_"))
      .filter(_.endsWith(".parquet"))
    val filesR = filesL.map(
      fileL =>
        fsR
          .listStatus(new Path(s"$tmpDir/input_file_name=$fileL"))
          .map(_.getPath.getName)
          .filter(!_.startsWith("_"))
          .filter(_.endsWith(".parquet"))
          .head // we expect exactly one file per partition directory
    )
    assert(filesL.nonEmpty, s"Provided input directory [$inDir] doesn't contain parquet files!")
    val files = filesL.zip(filesR)
    logger.info(s"List of file pairs for a fast parquet join: [${files.mkString(",")}]!")
    @transient val hConfProps = hConf.iterator.asScala.map(entry => (entry.getKey, entry.getValue)).toMap
    val hConfPropsBroadcast = spark.sparkContext.broadcast(hConfProps)
    newSession.sparkContext
      .parallelize(files, files.length)
      .foreachPartition(
        iter => {
          // Rebuild the Hadoop configuration on the executor from the broadcast properties.
          val hConf = new org.apache.hadoop.conf.Configuration()
          hConfPropsBroadcast.value.foreach {
            case (key, value) => hConf.set(key, value)
          }
          val fs = FileSystem.get(new URI(outDir), hConf)
          iter.foreach {
            case (fileNameL, fileNameR) =>
              val inputPathsL = Seq(new Path(s"$inDir/$fileNameL")).asJava
              val inputPathsR = Seq(Seq(new Path(s"$tmpDir/input_file_name=$fileNameL/$fileNameR")).asJava).asJava
              val outPath = new Path(s"$outDir/$fileNameL")
              if (fs.exists(outPath)) {
                logger.warn(s"Removing existing output file [$outPath]!")
                fs.delete(outPath, false) // TODO this might break with Spark speculation turned on?
              }
              logger.info(
                s"Processing inputPathsL: [${inputPathsL.toString}], inputPathsR: [${inputPathsR.toString}], outPath: [${outPath.toString}]"
              )
              val joiner = new ParquetRewriter(
                new RewriteOptions.Builder(hConf, inputPathsL, inputPathsR, outPath).build()
              )
              joiner.processBlocks()
              joiner.close()
              logger.info(
                s"Processed inputPathsL: [${inputPathsL.toString}], inputPathsR: [${inputPathsR.toString}], outPath: [${outPath.toString}]"
              )
          }
        }
      )
    FileSystem.get(new URI(outDir), hConf).createNewFile(new Path(s"$outDir/_SUCCESS"))
  }
}