import org.slf4j.{Logger, LoggerFactory}
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.parquet.hadoop.rewrite.{RewriteOptions, ParquetRewriter}
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}
import org.apache.spark.sql.functions.{col, element_at, expr, monotonically_increasing_id}
import java.net.URI
import scala.collection.JavaConverters._
/**
 * A Spark Parquet stitching utility that enables much faster modification or addition of a field in an extremely large dataset.
 * This class is supposed to be used in combination with this improvement to parquet-mr's ParquetRewriter: [[https://github.com/apache/parquet-mr/pull/1273]]
 *
 * ==Use-case==
 * This utility is suitable for the following use case.
 * You have a very large Parquet-based dataset (dozens or hundreds of fields, terabytes of data daily, petabytes of historical partitions).
 * You face one of the following tasks: {{{
 * 1) Backfill. The task is to modify all historical data of a dataset, but you need to modify/add only one or two fields.
 * 2) Projection enrichment join. You frequently project a massive input and that part stays stable, but it then needs to be joined with some enrichments. }}}
 * Either task is trivial in Spark, but given the sheer scale of the dataset it takes a lot of resources.
 * The point is that Spark by default works quite inefficiently in such cases and wastes a lot of resources; for large datasets, simply writing the dataframe out is often the main resource consumer.
 * Even if all you need to do is modify 1 field out of 100, Spark deserializes all 100 fields and then serializes them back without any modifications.
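 * For example, a naive Spark backfill decodes and re-encodes every column even though only one is derived (column and path names here are purely illustrative): {{{
 *   spark.read.parquet(inDir)
 *     .withColumn("price_with_tax", col("price") * 1.2)
 *     .write.parquet(outDir)
 *   // all 100 columns are read and rewritten just to add this single column
 * }}}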
 *
 * ==Implementation details==
 * We take a very large dataset as the input. We read it so that each file becomes exactly one partition (one task per file) and add two system columns:
 * 1) the input file name
 * 2) a monotonically increasing id for each row within that file
 * We pass that data frame to the user-provided transformer and then write it back, preserving the original file partitioning and ordering by means of our system columns.
 * Now we have the original input and the transformed input, and each file on the left has a corresponding file on the right with preserved ordering.
 * Finally we feed each file pair to the parquet-mr library's ParquetRewriter class, which quickly stitches each pair into a single file.
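 * As a sketch, the tagging step is essentially what the code below does: {{{
 *   df.withColumn("input_file_name", element_at(expr("""split(input_file_name(), "\/")"""), -1))
 *     .withColumn("monotonically_increasing_id", monotonically_increasing_id)
 * }}}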
 *
 * ==ParquetRewriter implementation details==
 * The parquet-mr library provides the implementation of the Parquet data format and is used by many BigData tools, including Spark.
 * Spark workers use parquet-mr locally on each executor to read and write Parquet files.
 * ParquetRewriter is a class from parquet-mr; it allows stitching files with the same schema into a single file,
 * and it can also be used to efficiently stitch/join multiple Parquet files with different schemas.
 * You can provide the main input file group and multiple right-side ones. {{{That is possible when:
 * 1) the number of rows in the main and extra input groups is the same,
 * 2) the ordering of rows in the main and extra input groups is the same. }}}
 * ParquetRewriter will copy the main input as binary data and write the extra input files with row groups adjusted to the main input.
 * If the left side is much larger than the right side, then a lot of resources are saved by working with the main input as binary.
 * Parquet is a column-oriented data format. Each file is split into chunks (row groups) and each chunk stores each column's data separately;
 * each chunk, and each column within that chunk, can be located and accessed individually as a binary stream without touching the rest of the file.
 * {{{
 * |--------------|--------------|----------|--------------|
 * | Chunk1_Col_1 | Chunk1_Col_2 | ...      | Chunk1_Col_N |
 * |--------------|--------------|----------|--------------|
 * | Chunk2_Col_1 | Chunk2_Col_2 | ...      | Chunk2_Col_N |
 * |--------------|--------------|----------|--------------|
 * | ...          | ...          | ...      | ...          |
 * |--------------|--------------|----------|--------------|
 * | ChunkN_Col_1 | ChunkN_Col_2 | ...      | ChunkN_Col_N |
 * |--------------|--------------|----------|--------------|
 * }}}
 * The way ParquetRewriter works while stitching files: it copies the column chunks of the large left input as binary data
 * while rebuilding the column chunks of the file on the right side.
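 * A minimal sketch of a single stitch, using the same calls as the code below (the four-argument
 * RewriteOptions.Builder overload comes from the parquet-mr PR referenced above; hadoopConf and all paths are illustrative): {{{
 *   val options = new RewriteOptions.Builder(
 *     hadoopConf,
 *     Seq(new Path("hdfs:///main/part-0.parquet")).asJava,              // main (left) input, copied as binary
 *     Seq(Seq(new Path("hdfs:///extra/part-0.parquet")).asJava).asJava, // extra (right) input, re-chunked
 *     new Path("hdfs:///out/part-0.parquet")
 *   ).build()
 *   val rewriter = new ParquetRewriter(options)
 *   rewriter.processBlocks()
 *   rewriter.close()
 * }}}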
 *
 * ==Recommendations==
 * This class is meant as a tool for very large datasets; it is likely pointless to use it if you have just a few gigabytes of data.
 *
 * ==Limitations==
 * Currently does not support Spark's speculative execution, so it needs to be disabled: `"spark.speculation": "false"`.
 * Otherwise speculative tasks can interfere with each other.
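 * For example, when submitting the job: {{{ spark-submit --conf spark.speculation=false ... }}}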
 */
object FastParquetTransformer {

  @transient
  val logger: Logger = LoggerFactory.getLogger(getClass.getName)

  val InputFileName = "input_file_name"
  val MonotonicallyIncreasingId = "monotonically_increasing_id"
  /**
   * Quickly stitches the provided main input with a transformed version of it.
   * Can be used as an optimization for adding/changing a few columns in an extremely heavy dataset.
   * See the scaladoc of [[FastParquetTransformer]] for more details.
   *
   * Input requirements: {{{
   * - must contain parquet files}}}
   *
   * Transformer function requirements: {{{
   * - must not delete or add rows, each input row must be transformed 1 to 1
   * - must preserve the system columns unmodified: [[InputFileName]] and [[MonotonicallyIncreasingId]]
   * - must output only the system columns and the columns that were added/modified, all unmodified columns must be dropped}}}
   *
   * Output schema: {{{
   * - will contain the set of all columns from the main input and the transformed input, but without the system columns
   * - if columns of the transformer dataframe are present in the main input, the former overwrite the latter
   * - is not sorted, the main input column order is preserved; columns added by the transformer are appended at the end of the output schema}}}
   *
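   * Example usage (paths and column names are purely illustrative): {{{
   *   FastParquetTransformer.transformParquet(
   *     spark,
   *     inDir = "hdfs:///warehouse/events/date=2024-01-01",
   *     outDir = "hdfs:///warehouse/events_backfilled/date=2024-01-01",
   *     tmpDir = "hdfs:///tmp/events_backfill_stitch",
   *     transformer = df => df.select(
   *       col(FastParquetTransformer.InputFileName),
   *       col(FastParquetTransformer.MonotonicallyIncreasingId),
   *       (col("price") * 1.2).as("price_with_tax") // only new/changed columns plus the system columns
   *     )
   *   )
   * }}}
   *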
   * @param spark       Spark session
   * @param inDir       directory to read the main input from
   * @param outDir      directory to write the output to
   * @param tmpDir      directory to write the intermediate transformed files to
   * @param transformer transformation function applied to the main input DataFrame
   */
  def transformParquet(
    spark: SparkSession,
    inDir: String,
    outDir: String,
    tmpDir: String,
    transformer: DataFrame => DataFrame
  ): Unit = {
    assert(inDir != null && inDir.nonEmpty, s"Provided input directory [$inDir] can't be NULL or empty!")
    assert(outDir != null && outDir.nonEmpty, s"Provided output directory [$outDir] can't be NULL or empty!")
    assert(tmpDir != null && tmpDir.nonEmpty, s"Provided tmp directory [$tmpDir] can't be NULL or empty!")
    assert(Set(inDir, outDir, tmpDir).size == 3, s"Provided input, output & tmp directories must all be different!")

    // Let's create a new session and increase maxPartitionBytes & openCostInBytes to 1 TB so that each file ends up in a single partition.
    // This is needed so that monotonically_increasing_id is consistent for all records of the same input file
    // and we can later sort by monotonically_increasing_id to get records in the output file in exactly the same order as they appeared in the input.
    // So monotonically_increasing_id is preserved from the input files and later reused for joining back with the mapped input.
    val newSession = spark.newSession()
    val TB = (1024L * 1024L * 1024L * 1024L).toString
    newSession.sessionState.conf.setConfString("spark.sql.files.maxPartitionBytes", TB)
    newSession.sessionState.conf.setConfString("spark.sql.files.openCostInBytes", TB)

    newSession.read
      .parquet(inDir)
      .withColumn(InputFileName, element_at(expr("""split(input_file_name(), "\/")"""), -1))
      .withColumn(MonotonicallyIncreasingId, monotonically_increasing_id)
      .transform(transformer)
      .repartition(col(InputFileName))
      .sortWithinPartitions(InputFileName, MonotonicallyIncreasingId)
      .drop(MonotonicallyIncreasingId)
      .write
      .mode(SaveMode.Overwrite)
      .partitionBy(InputFileName)
      .parquet(tmpDir)
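
    // Stitching step: pair every original input file with the single transformed file written under
    // its input_file_name=<file> partition directory, then merge each pair with ParquetRewriter.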
    @transient val hConf = spark.sparkContext.hadoopConfiguration
    @transient val fsL = FileSystem.get(new URI(inDir), hConf)
    @transient val fsR = FileSystem.get(new URI(tmpDir), hConf) // files can be located on a different HDFS cluster, let's create a separate fs for such a case to avoid IllegalArgumentException

    val filesL = fsL
      .listStatus(new Path(inDir))
      .map(_.getPath.getName)
      .filter(!_.startsWith("_"))
      .filter(_.endsWith(".parquet"))
    val filesR = filesL.map(
      fileL =>
        fsR
          .listStatus(new Path(s"$tmpDir/input_file_name=$fileL"))
          .map(_.getPath.getName)
          .filter(!_.startsWith("_"))
          .filter(_.endsWith(".parquet"))
          .head // we expect just one file
    )
    assert(filesL.nonEmpty, s"Provided input directory [$inDir] doesn't contain parquet files!")

    val files = filesL.zip(filesR)
    logger.info(s"List of file pairs for a fast parquet join: [${files.mkString(",")}]!")

    @transient val hConfProps = hConf.iterator.asScala.map(entry => (entry.getKey, entry.getValue)).toMap
    val hConfPropsBroadcast = spark.sparkContext.broadcast(hConfProps)
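
    // Run one ParquetRewriter per file pair, one task per pair; each task rebuilds a Hadoop configuration
    // from the broadcast properties because Configuration itself is not serializable.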
    newSession.sparkContext
      .parallelize(files, files.length)
      .foreachPartition(
        iter => {
          val hConf = new org.apache.hadoop.conf.Configuration()
          hConfPropsBroadcast.value.foreach {
            case (key, value) => hConf.set(key, value)
          }
          val fs = FileSystem.get(new URI(outDir), hConf)
          iter.foreach {
            case (fileNameL, fileNameR) =>
              val inputPathsL = Seq(new Path(s"$inDir/$fileNameL")).asJava
              val inputPathsR = Seq(Seq(new Path(s"$tmpDir/input_file_name=$fileNameL/$fileNameR")).asJava).asJava
              val outPath = new Path(s"$outDir/$fileNameL")
              if (fs.exists(outPath)) {
                logger.warn(s"Removing existing output file [$outPath]!")
                fs.delete(outPath, false) // TODO this might break with Spark speculation turned on?
              }
              logger.info(
                s"Processing inputPathsL: [${inputPathsL.toString}], inputPathsR: [${inputPathsR.toString}], outPath: [${outPath.toString}]"
              )
              val joiner = new ParquetRewriter(
                new RewriteOptions.Builder(hConf, inputPathsL, inputPathsR, outPath).build()
              )
              joiner.processBlocks()
              joiner.close()
              logger.info(
                s"Processed inputPathsL: [${inputPathsL.toString}], inputPathsR: [${inputPathsR.toString}], outPath: [${outPath.toString}]"
              )
          }
        }
      )

    FileSystem.get(new URI(outDir), hConf).createNewFile(new Path(s"$outDir/_SUCCESS"))
  }
}