======GLUE SCRIPT=========
import uuid

from awsglue.context import GlueContext
from awsglue.job import Job
from faker import Faker
from pyspark.sql.functions import expr
from pyspark.sql.session import SparkSession

# Glue / Spark session setup
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
glueContext = GlueContext(sc)
job = Job(glueContext)
logger = glueContext.get_logger()
# ================================= INSERTING DATA =====================================
faker = Faker()
# Generates 100 synthetic employee records; numeric fields are kept as
# strings, matching the original repro schema.
class DataGenerator(object):
    @staticmethod
    def get_data():
        return [
            (
                str(uuid.uuid4()),
                faker.name(),
                faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                faker.random_element(elements=('CA', 'NY', 'TX', 'FL', 'IL', 'RJ')),
                str(faker.random_int(min=10000, max=150000)),
                str(faker.random_int(min=18, max=60)),
                str(faker.random_int(min=0, max=100000)),
                str(faker.unix_time()),
                faker.email(),
                faker.credit_card_number(card_type='amex'),
            ) for _ in range(100)
        ]

data = DataGenerator.get_data()
columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts", "email", "credit_card"]
spark_df = spark.createDataFrame(data=data, schema=columns)
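# Optional sanity check (added here, not in the original gist): confirm the
# DataFrame carries the expected ten string columns before the first write.
spark_df.printSchema()
spark_df.show(5, truncate=False)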
# ============================== Settings =======================================
db_name = "hudidb"
table_name = "issue_7191"
recordkey = 'emp_id'
precombine = "ts"
PARTITION_FIELD = 'state'
path = "s3://hudi-github-issues/output/issue_7191/"
method = 'upsert'
table_type = "MERGE_ON_READ"
# ====================================================================================
hudi_part_write_config = {
    'className': 'org.apache.hudi',
    # Schema-evolution settings under test
    "hoodie.schema.on.read.enable": "true",
    "hoodie.datasource.write.reconcile.schema": "true",
    "hoodie.avro.schema.external.transformation": "true",
    'hoodie.avro.schema.validate': "true",
    "hoodie.datasource.write.schema.allow.auto.evolution.column.drop": "true",
    # Table and write settings
    "hoodie.datasource.write.partitionpath.field": PARTITION_FIELD,
    'hoodie.table.name': table_name,
    'hoodie.datasource.write.table.type': table_type,
    'hoodie.datasource.write.operation': method,
    'hoodie.datasource.write.recordkey.field': recordkey,
    'hoodie.datasource.write.precombine.field': precombine,
    # Hive/Glue catalog sync
    'hoodie.datasource.hive_sync.mode': 'hms',
    'hoodie.datasource.hive_sync.enable': 'true',
    'hoodie.datasource.hive_sync.use_jdbc': 'false',
    'hoodie.datasource.hive_sync.support_timestamp': 'false',
    'hoodie.datasource.hive_sync.database': db_name,
    'hoodie.datasource.hive_sync.table': table_name,
}
# First write: bootstraps the MERGE_ON_READ table and syncs it to the catalog.
spark_df.write.format("hudi").options(**hudi_part_write_config).mode("append").save(path)
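# Hedged sketch (not in the original gist): a snapshot read via the Hudi
# datasource to verify the first commit landed; expect 100 rows.
df_check = spark.read.format("hudi").load(path)
print(df_check.count())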
# ================================================================
# Adding NEW COLUMN
# ================================================================
# DataGenerator redefined with one extra field, new_date_col, to exercise
# schema evolution on the next write.
class DataGenerator(object):
    @staticmethod
    def get_data():
        return [
            (
                str(uuid.uuid4()),
                faker.name(),
                faker.random_element(elements=('IT', 'HR', 'Sales', 'Marketing')),
                faker.random_element(elements=('CA', 'NY', 'TX', 'FL', 'IL', 'RJ')),
                str(faker.random_int(min=10000, max=150000)),
                str(faker.random_int(min=18, max=60)),
                str(faker.random_int(min=0, max=100000)),
                str(faker.unix_time()),
                faker.email(),
                faker.credit_card_number(card_type='amex'),
                faker.date(),
            ) for _ in range(100)
        ]

data = DataGenerator.get_data()
columns = ["emp_id", "employee_name", "department", "state", "salary", "age", "bonus", "ts", "email", "credit_card",
           "new_date_col"]
spark_df = spark.createDataFrame(data=data, schema=columns)
# Second write: the new column is present and the partition column `state`
# is replaced with NULLs before the upsert.
(spark_df.withColumn("state", expr("CAST(NULL AS STRING)"))
    .write.format("hudi")
    .options(**hudi_part_write_config)
    .mode("append")
    .save(path))
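# Hedged sketch (not in the original gist): read back to confirm the evolved
# schema now carries new_date_col alongside the original columns.
df_evolved = spark.read.format("hudi").load(path)
df_evolved.printSchema()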