from pyspark.sql.types import StructField, StringType, BooleanType

num_cols = 156  # Adjust this based on your actual number of columns
# Create an empty list to hold the StructFields
fields = []
# Generate StructFields in a loop
for i in range(1, num_cols + 1):
    field_name = f"col{i}"
    field_type = StringType() if i % 10 != 0 else BooleanType()  # StringType, with BooleanType for every 10th column
    fields.append(StructField(field_name, field_type, nullable=True))
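A minimal sketch of how the generated fields could be assembled into a schema and applied to a DataFrame; the sample row and the pre-existing SparkSession named spark are illustration-only assumptions, not part of the original snippet.

from pyspark.sql.types import StructType

# Hypothetical usage: build the schema and one illustrative row that matches it
schema = StructType(fields)
sample_row = tuple(True if i % 10 == 0 else f"value{i}" for i in range(1, num_cols + 1))
wide_df = spark.createDataFrame([sample_row], schema=schema)
wide_df.printSchema()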
from pyspark.sql.functions import lit, col
tableName = "trips_table"
basePath = "file:///tmp/trips_table"
columns = ["ts","uuid","rider","driver","fare","city"]
data =[(1695159649087,"334e26e9-8355-45cc-97c6-c31daf0df330","rider-A","driver-K",19.10,"san_francisco"),
(1695091554788,"e96c4396-3fad-413a-a942-4cb36106d721","rider-B","driver-L",27.70 ,"san_francisco"),
(1695046462179,"9909a8b1-2d15-4d3d-8ec9-efc48c536a00","rider-C","driver-M",33.90 ,"san_francisco"),
(1695516137016,"e3cf430c-889d-4015-bc98-59bdce1e530c","rider-C","driver-N",34.15,"sao_paulo")]
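A minimal sketch of materializing this data as a DataFrame and writing it to the Hudi table at basePath; the write options below are the usual quickstart choices (record key uuid, precombine field ts, partition by city) and are assumptions rather than settings taken from the original gist.

inserts = spark.createDataFrame(data).toDF(*columns)

hudi_options = {
    "hoodie.table.name": tableName,
    "hoodie.datasource.write.recordkey.field": "uuid",
    "hoodie.datasource.write.precombine.field": "ts",
    "hoodie.datasource.write.partitionpath.field": "city",
}
# Initial write; overwrite creates the table at basePath
inserts.write.format("hudi").options(**hudi_options).mode("overwrite").save(basePath)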
Testing cow_enable_metadata_nonpartitioned - 0.11.1 <> 0.15.0-rc1
Test Success - cow_enable_metadata_nonpartitioned_0_11_1_0_15_0-rc1
Testing mor_disable_metadata_nonpartitioned - 0.11.1 <> 0.15.0-rc1
Test Success - mor_disable_metadata_nonpartitioned_0_11_1_0_15_0-rc1
Testing mor_enable_metadata_partitioned - 0.11.1 <> 0.15.0-rc1
Test Success - mor_enable_metadata_partitioned_0_11_1_0_15_0-rc1
Testing mor_disable_metadata_partitioned - 0.11.1 <> 0.15.0-rc1
Test Success - mor_disable_metadata_partitioned_0_11_1_0_15_0-rc1
Testing cow_disable_metadata_partitioned - 0.11.1 <> 0.15.0-rc1
Test Success - cow_disable_metadata_partitioned_0_11_1_0_15_0-rc1
[ec2-user@ip-10-0-78-189 ~]$ spark-3.3.4-bin-hadoop3/bin/spark-submit --master local \
> --jars "${JAR_PATH}/hadoop-aws-3.2.0.jar,${JAR_PATH}/aws-java-sdk-bundle-1.11.375.jar,${JAR_PATH}/hudi-spark${SPARK_VERSION}-bundle_2.12-${HUDI_VERSION}.jar,${JAR_PATH}/hudi-datahub-sync-bundle-${HUDI_VERSION}.jar" \
> --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
> ${JAR_PATH}/hudi-utilities-slim-bundle_2.12-$HUDI_VERSION.jar \
> --target-base-path ${DATA_PATH}/stocks/data/target/${NOW} \
> --target-table stocks${NOW} \
> --table-type COPY_ON_WRITE \
> --base-file-format PARQUET \
> --props ${DATA_PATH}/stocks/configs/hoodie.properties \
> --source-class org.apache.hudi.utilities.sources.JsonDFSSource \
DROP TABLE IF EXISTS issue_11212;
set hoodie.spark.sql.insert.into.operation=bulk_insert;
CREATE TABLE issue_11212 (
ts BIGINT,
uuid STRING,
rider STRING,
driver STRING,
fare DOUBLE,
city STRING
) USING HUDI
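With the session-level setting above, a plain INSERT INTO is routed through the bulk_insert write path. A minimal illustration follows, run through spark.sql from PySpark to stay consistent with the other snippets; the rows reuse values from the trips data earlier in the gist and are purely illustrative.

# With this session config, the INSERT below runs as bulk_insert
spark.sql("set hoodie.spark.sql.insert.into.operation=bulk_insert")
spark.sql("""
    INSERT INTO issue_11212 VALUES
      (1695159649087, '334e26e9-8355-45cc-97c6-c31daf0df330', 'rider-A', 'driver-K', 19.10, 'san_francisco'),
      (1695091554788, 'e96c4396-3fad-413a-a942-4cb36106d721', 'rider-B', 'driver-L', 27.70, 'san_francisco')
""")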
tableName = "record_index_deepdive"
basePath = "/tmp/" + tableName
columns = ["ts","uuid","rider","driver","fare","city"]
data =[(1695159649087,"334e26e9-8355-45cc-97c6-c31daf0df3301","rider-A","driver-K",19.10,"san_francisco"),
(1695091554788,"e96c4396-3fad-413a-a942-4cb36106d7212","rider-C","driver-M",27.70 ,"san_francisco"),
(1695046462179,"9909a8b1-2d15-4d3d-8ec9-efc48c536a003","rider-D","driver-L",33.90 ,"san_francisco"),
(1695516137016,"e3cf430c-889d-4015-bc98-59bdce1e530c4","rider-F","driver-P",34.15,"sao_paulo"),
(1695115999911,"c8abbe79-8d89-47ea-b4ce-4d224bae5bfa5","rider-J","driver-T",17.85,"chennai")]
inserts = spark.createDataFrame(data).toDF(*columns)
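Given the table name, a write with the record level index enabled is presumably the point of this snippet. A hedged sketch of that write follows; the option keys are the standard Hudi metadata/record index configs, and the record key, precombine, and partition choices are assumptions based on the columns above.

record_index_options = {
    "hoodie.table.name": tableName,
    "hoodie.datasource.write.recordkey.field": "uuid",
    "hoodie.datasource.write.precombine.field": "ts",
    "hoodie.datasource.write.partitionpath.field": "city",
    # The record level index lives in the metadata table, so both must be enabled
    "hoodie.metadata.enable": "true",
    "hoodie.metadata.record.index.enable": "true",
}
inserts.write.format("hudi").options(**record_index_options).mode("overwrite").save(basePath)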
CREATE TABLE merge_source (
ts BIGINT,
uuid STRING,
rider STRING,
driver STRING,
fare DOUBLE,
city STRING
) USING PARQUET;
INSERT INTO merge_source
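The parquet merge_source table is presumably the source side of a MERGE INTO against a Hudi table. A sketch of that shape via spark.sql; the target table issue_11212 is only an assumption here (it happens to share the same schema), not something the gist states.

# Hypothetical target table; adjust to the actual Hudi table being merged into
spark.sql("""
    MERGE INTO issue_11212 AS target
    USING merge_source AS source
    ON target.uuid = source.uuid
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")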
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col
def initialize_spark_session():
    """
    Initialize and return a Spark session configured for Hudi.
    """
    spark = SparkSession.builder \
        .appName("Hudi Incremental Read Example") \
        .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
        .getOrCreate()  # the full snippet may chain additional Hudi configs before this
    return spark
import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord
import org.apache.spark.sql.DataFrame
// Function to set up Hudi options
def setupHudiOptions(tableName: String, basePath: String) = {
  spark.sql(
    s"""
       | create table $tableName (
       |   id int,
       |   name string,
       |   price double,
       |   ts long,
       |   dt date
       | ) using hudi
       | tblproperties (