@alexeykudinkin
Created June 9, 2022 20:34
Ingesting the Amazon Reviews dataset into a Hudi table on S3
import org.apache.hadoop.fs.{FileStatus, Path}
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.{DataSourceReadOptions, DataSourceWriteOptions}
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.config.HoodieMetadataConfig
import org.apache.hudi.common.fs.FSUtils
import org.apache.hudi.common.table.HoodieTableMetaClient
import org.apache.hudi.common.util.ClusteringUtils
import org.apache.hudi.config.{HoodieClusteringConfig, HoodieStorageConfig, HoodieWriteConfig}
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.metadata.{HoodieMetadataPayload, MetadataPartitionType}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col
// Layout strategy controls target Parquet file sizing: "standard", "small_file", "extra_small_file"
val layoutStrategy = "extra_small_file"

val datasetName = "amazon_reviews"
val tableName = s"${datasetName}_${layoutStrategy}"

val inputPath = "s3a://amazon-reviews-pds/parquet"
val outputPath = s"s3a://alexey-testlake/$datasetName"

def safeTableName(s: String) = s.replace('-', '_')

// Write options shared by every layout strategy
val commonOpts =
  Map(
    "hoodie.compact.inline" -> "false",
    "hoodie.bulk_insert.shuffle.parallelism" -> "10"
  )

// File-sizing options for the selected layout strategy
val layoutOpts =
  layoutStrategy match {
    case "extra_small_file" =>
      Map(
        "hoodie.parquet.small.file.limit" -> String.valueOf(2 * 1024 * 1024), // 2MB
        "hoodie.parquet.max.file.size" -> String.valueOf(2 * 1024 * 1024) // 2MB
      )
    // ...
    case _ =>
      throw new UnsupportedOperationException("Not supported")
  }
////////////////////////////////////////////////////////////////
// Writing to Hudi
////////////////////////////////////////////////////////////////
val fs = FSUtils.getFs(outputPath, spark.sparkContext.hadoopConfiguration)

// Bulk-insert the source Parquet dataset into a non-partitioned copy-on-write Hudi table,
// but only if the target table doesn't exist yet
if (!fs.exists(new Path(outputPath))) {
  val df = spark.read.parquet(inputPath)

  df.write.format("hudi")
    .option(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME.key(), "zstd")
    .option(HoodieMetadataConfig.ENABLE_METADATA_INDEX_COLUMN_STATS.key, "true")
    .option(DataSourceWriteOptions.TABLE_TYPE.key(), COW_TABLE_TYPE_OPT_VAL)
    .option("hoodie.table.name", tableName)
    .option(PRECOMBINE_FIELD.key(), "review_id")
    .option(RECORDKEY_FIELD.key(), "review_id")
    //.option(DataSourceWriteOptions.PARTITIONPATH_FIELD.key(), "product_category")
    .option(HoodieWriteConfig.KEYGENERATOR_CLASS_NAME.key(), "org.apache.hudi.keygen.NonpartitionedKeyGenerator")
    .option(DataSourceWriteOptions.OPERATION.key(), DataSourceWriteOptions.BULK_INSERT_OPERATION_OPT_VAL)
    .option(BULK_INSERT_SORT_MODE.key(), "NONE")
    .options(layoutOpts)
    .options(commonOpts)
    .mode(ErrorIfExists)
    .save(outputPath)
}
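
////////////////////////////////////////////////////////////////
// Reading from Hudi
////////////////////////////////////////////////////////////////

// A minimal read-back sketch, assuming the bulk insert above completed:
// snapshot-query the table (the default query type) and count the rows
val readDf = spark.read.format("hudi")
  .option(DataSourceReadOptions.QUERY_TYPE.key(), DataSourceReadOptions.QUERY_TYPE_SNAPSHOT_OPT_VAL)
  .load(outputPath)

readDf.count()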