Skip to content

Instantly share code, notes, and snippets.

-- Hudi ride-records table; partitioned by city so city-filtered queries
-- can take advantage of partition pruning.
CREATE TABLE hudi_table (
    ts     BIGINT,  -- event time (epoch value assumed -- TODO confirm units)
    uuid   STRING,  -- record identifier
    rider  STRING,
    driver STRING,
    fare   DOUBLE,  -- NOTE(review): DOUBLE for money is inexact; DECIMAL would be safer
    city   STRING   -- partition column (see PARTITIONED BY below)
) USING HUDI
PARTITIONED BY (city);
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, BooleanType, TimestampType
from pyspark.sql import Row
import os
from datetime import datetime
# Base path for the Hudi table. Resolved to an absolute path so Spark/Hudi
# locate the same table directory regardless of the driver's working
# directory at launch time.
base_path = os.path.abspath("simple_hudi_table")
# Common Hudi options for all commits
hudi_options = {
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import Row
import uuid
import datetime
def create_spark_session():
"""
Creates and configures a Spark session with Hudi support.
"""
from pyspark.sql import SparkSession
import datetime
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, DoubleType
from pyspark.sql.functions import col, current_timestamp
def generate_sample_data(batch_id=1):
data = [
("ORD001", "PROD-1", "CUST-1", "LOC-1", "STORE-1", "DEPT-1", "EMP-1", 5, 500.0, "Electronics", "North", str(datetime.datetime.now()), "HIGH", "ONLINE", "CREDIT", "COMPLETED"),
("ORD002", "PROD-2", "CUST-2", "LOC-2", "STORE-2", "DEPT-2", "EMP-2", 2, 150.0, "Clothing", "South", str(datetime.datetime.now()), "MEDIUM", "IN-STORE", "CASH", "PENDING"),
("ORD003", "PROD-3", "CUST-1", "LOC-3", "STORE-3", "DEPT-3", "EMP-3", 3, 300.0, "Electronics", "North", str(datetime.datetime.now()), "LOW", "MOBILE", "DEBIT", "COMPLETED"),
spark-shell --deploy-mode client --executor-cores 3 --conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog" --conf "spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension" --conf "spark.sql.hive.convertMetastoreParquet=false" --conf "spark.speculation=true" --conf "spark.speculation.interval=100ms" --conf "spark.speculation.multiplier=1.5" --conf "spark.speculation.quantile=0.9" --packages org.apache.hudi:hudi-spark3.3-bundle_2.12:0.15.0 -i Spark_Hudi_Speculation.scala
import org.apache.spark.sql.{SparkSession, DataFrame}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import scala.util.Random
import org.slf4j.LoggerFactory
import java.util.UUID
import scala.util.Random
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.common.table.HoodieTableConfig._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.keygen.constant.KeyGeneratorOptions._
import org.apache.hudi.common.model.HoodieRecord
CREATE TABLE hudi_table (
ts BIGINT,
uuid STRING,
rider STRING,
driver STRING,
fare DOUBLE,
city STRING
) USING HUDI
PARTITIONED BY (city)
OPTIONS (
-- Demo database for the Hudi partition-pruning scenario.
-- Fix: keywords were mixed-case (`create database` vs `CREATE TABLE`) -- use
-- uppercase consistently, and make the DDL idempotent so the script can be re-run.
CREATE DATABASE IF NOT EXISTS MIT_partition_pruning5;
USE MIT_partition_pruning5;

-- Plain Parquet staging table used as the source side of a MERGE INTO the
-- Hudi target table (column set mirrors the target minus rider/driver).
CREATE TABLE IF NOT EXISTS merge_source (
    ts   BIGINT,  -- event time
    uuid STRING,  -- record key used to match rows during the merge
    fare DOUBLE,
    city STRING   -- matches the Hudi table's partition column
) USING PARQUET;
INSERT INTO merge_source
~/spark/spark-3.4.1-bin-hadoop3/bin/spark-shell --packages org.apache.hudi:hudi-spark$SPARK_VERSION-bundle_2.12:0.14.1 --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
import org.apache.hudi.QuickstartUtils._
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import scala.collection.JavaConversions._
import scala.io.Source
import org.apache.log4j.Logger
import org.apache.log4j.Level
import org.apache.hudi.client.common.HoodieSparkEngineContext
schema = StructType([
StructField('_sdc_batched_at', StringType(), True),
StructField('_sdc_received_at', StringType(), True),
StructField('_sdc_record_hash', StringType(), True),
StructField('_sdc_sequence', StringType(), True),
StructField('_sdc_table_version', StringType(), True),
StructField('amount_foreign_linked', StringType(), True),
StructField('amount_linked', StringType(), True),
StructField('applied_date_posted', StringType(), True),
StructField('applied_transaction_id', StringType(), True),