Gist by @ad1happy2go, created July 9, 2024 04:22
# Assumes an existing SparkSession named `spark` with the Apache Hudi Spark bundle on the classpath.
from datetime import datetime
from pyspark.sql import Row
from pyspark.sql.functions import col
from pyspark.sql.types import StructType, StructField, StringType, LongType, TimestampType

num_cols = 156  # Adjust this based on your actual number of columns

# Generate StructFields in a loop
fields = []
for i in range(1, num_cols + 1):
    field_name = f"col{i}"
    field_type = StringType()  # All struct fields are strings, matching the generated sample data
    fields.append(StructField(field_name, field_type, nullable=True))

# Create the StructType for the nested column
struct_type = StructType(fields)

# Define the top-level schema
schema = StructType([
    StructField("keycol", LongType(), nullable=True),
    StructField("dateCol", TimestampType(), nullable=True),
    StructField("colA", StringType(), nullable=True),
    StructField("colB", StringType(), nullable=True),
    StructField("colC", StringType(), nullable=True),
    StructField("structCol", struct_type, nullable=False),
    StructField("partitionKey", LongType(), nullable=True)
])
# Generate sample data
data = []
for i in range(1, 3):  # Generating 2 rows for example
    row_data = (
        i,
        datetime.strptime("2023-01-01 00:00:00", "%Y-%m-%d %H:%M:%S"),
        f"ValueA_{i}",
        f"ValueB_{i}",
        f"ValueC_{i}",
        tuple(f"Struct{j}_{i}" for j in range(1, num_cols + 1)),  # Generate structCol values dynamically
        1000 + i
    )
    data.append(row_data)

# Wrap each tuple in a Row whose field names and order match the schema
rows = []
for item in data:
    row = Row(keycol=item[0], dateCol=item[1], colA=item[2], colB=item[3], colC=item[4],
              structCol=Row(*item[5]), partitionKey=item[6])
    rows.append(row)
# Create DataFrame
df = spark.createDataFrame(rows, schema)

# Flatten the struct: promote every nested field to a top-level column
for i in range(1, num_cols + 1):
    field_name = f"col{i}"
    df = df.withColumn(field_name, col("structCol." + field_name))
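# Optional check (not in the original gist): confirm the flattening worked by
# selecting the first and last promoted columns next to the key column.
df.select("keycol", "col1", f"col{num_cols}").show(2, truncate=False)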
# Hudi write configs: keycol serves as both the record key and the precombine field
hudi_configs = {
    "hoodie.datasource.write.recordkey.field": "keycol",
    "hoodie.datasource.write.precombine.field": "keycol",
    "hoodie.datasource.write.partitionpath.field": "partitionKey",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.table.name": "huditransaction",
}

spark.sparkContext.setLogLevel("WARN")

PATH = "/tmp/huditransaction"  # Placeholder base path for the Hudi table; point this at your own location

# Initial write, then inspect the generated record keys
df.write.format("hudi").options(**hudi_configs).mode("overwrite").save(PATH)
spark.read.options(**hudi_configs).format("hudi").load(PATH).select("_hoodie_record_key").show(10, False)

# Append (upsert) the same rows and inspect the record keys again
df.write.format("hudi").options(**hudi_configs).mode("append").save(PATH)
spark.read.options(**hudi_configs).format("hudi").load(PATH).select("_hoodie_record_key").show(10, False)
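# Optional sanity check (a sketch, not part of the original gist): because the
# append above is a Hudi upsert on "keycol", each record key should still appear
# only once per partition, so this query should return no rows.
(spark.read.format("hudi").load(PATH)
    .groupBy("_hoodie_record_key")
    .count()
    .filter(col("count") > 1)
    .show(truncate=False))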
# Show the DataFrame
df.show(truncate=False)