@ijan10
Last active November 23, 2019 18:05
spark = SparkSession.builder.appName('ctr_model_v3_train').enableHiveSupport().getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
"""
Python UDF functions are broadcast to executers, files that are being import inside the UDF must be added also to executers:
"""
spark.sparkContext.addPyFile('./config.py') # Example of files that are part of the project and are used inside the UDF.
spark.sparkContext.addPyFile('./data_preparation.py')
"""
Variables that declered inside main (master node) and are being used inside eexcuters,
need to being broadcast to all executers as bellow
"""
command_args = spark.sparkContext.broadcast(parser.parse_args()) # env and running folder
"""
Make spark transformation: group by/partition by selected feature and apply each partition to pandas_udf function
"""
results = df_all_data.groupBy(args.model_selected_feature_name).apply(pandas_udf_ctr_create_model)
results.write.mode("overwrite").parquet("s3://ramp-optimization-dev/ctr/temp_results.parquet")
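The body of `pandas_udf_ctr_create_model` is not shown in the gist. A grouped-map UDF of this kind receives each group as a plain pandas DataFrame and must return a pandas DataFrame matching a declared output schema. A minimal sketch, with a hypothetical column name and a trivial per-group computation standing in for the real model training:

```python
import pandas as pd

# Hypothetical sketch of a grouped-map function like the one applied above.
# In the real job it would be decorated with
# @pandas_udf(output_schema, PandasUDFType.GROUPED_MAP) so that
# df.groupBy(feature).apply(...) invokes it once per group. The column name
# model_feature and the row-count output are illustrative assumptions.
def pandas_udf_ctr_create_model(pdf: pd.DataFrame) -> pd.DataFrame:
    feature_value = pdf["model_feature"].iloc[0]  # constant within a group
    return pd.DataFrame({
        "model_feature": [feature_value],
        "row_count": [len(pdf)],  # stand-in for real model results
    })
```

Because the function operates on an ordinary pandas DataFrame, it can be unit-tested locally on a single group without starting a Spark session.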