@hotienvu
Created October 2, 2020 08:21
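
A self-contained Spark Structured Streaming job that reproduces a Hive sync failure when writing to a Hudi copy-on-write table partitioned on nested year/month/day fields.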
package com.vho.hudisparkstreaming;

import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.config.HoodieWriteConfig;
import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.keygen.CustomKeyGenerator;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.streaming.*;

import static org.apache.hudi.common.model.HoodieTableType.COPY_ON_WRITE;
import static org.apache.spark.sql.functions.expr;
import static org.apache.spark.sql.functions.lit;
public class HudiNestedParitionSyncFailure {

  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("hoodie spark streaming")
        .master("local[*]")
        .enableHiveSupport()
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
        .getOrCreate();
    // Synthetic input: the built-in "rate" source plus derived partition
    // columns (year/month/day), an event timestamp for precombine, and a
    // small key space so that upserts collide across batches.
    Dataset<Row> inputStream = spark.readStream().format("rate").option("rowsPerSecond", 10).load()
        .withColumn("year", lit(2020))
        .withColumn("month", expr("cast(rand(5)*2 as int) + 1"))
        .withColumn("day", expr("cast(rand(5)*3 as int) + 1"))
        .withColumn("ts", expr("current_timestamp()"))
        .withColumn("dt", expr("cast(concat(year, '-', month, '-', day) as date)"))
        .withColumn("id", expr("cast(rand(5) * 5 as int) + 1"));
    // CustomKeyGenerator partition spec: each field carries its key type
    // (SIMPLE or TIMESTAMP). Note the same type-suffixed string is also
    // passed as the Hive sync partition fields below, where plain field
    // names ("year,month,day") are ordinarily expected -- this appears to
    // be the combination the repro exercises.
    final String partitionPath = "year:SIMPLE,month:SIMPLE,day:SIMPLE";
    final String cowTablePath = "/tmp/hoodie/hoodie_streaming_cow";
    final String cowTableName = "hoodie_streaming_cow";
    DataStreamWriter<Row> writer = inputStream.writeStream()
        .format("hudi")
        // Keep shuffle parallelism low for a local repro.
        .option("hoodie.insert.shuffle.parallelism", "2")
        .option("hoodie.upsert.shuffle.parallelism", "2")
        .option(DataSourceWriteOptions.TABLE_TYPE_OPT_KEY(), COPY_ON_WRITE.name())
        .option(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY(), "id")
        .option(DataSourceWriteOptions.PARTITIONPATH_FIELD_OPT_KEY(), partitionPath)
        .option(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY(), CustomKeyGenerator.class.getCanonicalName())
        .option(DataSourceWriteOptions.PRECOMBINE_FIELD_OPT_KEY(), "ts")
        // Sync the table and its year/month/day partitions to the Hive metastore.
        .option(DataSourceWriteOptions.HIVE_SYNC_ENABLED_OPT_KEY(), "true")
        .option(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY(), cowTableName)
        .option(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), partitionPath)
        .option(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(), MultiPartKeysValueExtractor.class.getCanonicalName())
        .option(DataSourceWriteOptions.HIVE_STYLE_PARTITIONING_OPT_KEY(), "true")
        .option(HoodieWriteConfig.TABLE_NAME, cowTableName)
        .option("checkpointLocation", cowTablePath + "/.checkpoint")
        .outputMode(OutputMode.Append())
        .trigger(Trigger.ProcessingTime("60 seconds"));
    // Start the query against the table base path and block until it
    // terminates (or fails).
    StreamingQuery stream = writer.start(cowTablePath);
    try {
      stream.awaitTermination();
    } catch (StreamingQueryException e) {
      e.printStackTrace();
      stream.stop();
    }
  }
}
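
For reference, once the job above has committed at least one micro-batch, the table can be read back with a Hudi snapshot query. A minimal sketch, assuming the paths used above and the glob convention of one wildcard per hive-style partition level plus one for the data files; the class HudiSnapshotReadCheck is a hypothetical helper, not part of the original gist:

package com.vho.hudisparkstreaming;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/** Hypothetical read-back check for the table written by the job above. */
public class HudiSnapshotReadCheck {

  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("hoodie snapshot read")
        .master("local[*]")
        .getOrCreate();

    // Glob down to the day-level partitions: year=*/month=*/day=*/<data files>.
    Dataset<Row> snapshot = spark.read()
        .format("hudi")
        .load("/tmp/hoodie/hoodie_streaming_cow/*/*/*/*");

    snapshot.select("id", "ts", "year", "month", "day").show(false);
  }
}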