from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('ctr_model_v3_train').enableHiveSupport().getOrCreate()
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
""" | |
Python UDF functions are broadcast to executers, files that are being import inside the UDF must be added also to executers: | |
""" | |
spark.sparkContext.addPyFile('./config.py')  # Files that are part of the project and are used inside the UDF.
spark.sparkContext.addPyFile('./data_preparation.py') | |
""" | |
Variables that declered inside main (master node) and are being used inside eexcuters, | |
need to being broadcast to all executers as bellow | |
""" | |
command_args = spark.sparkContext.broadcast(parser.parse_args()) # env and running folder | |
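Executor-side code must read a broadcast variable through its `.value` attribute, not through the driver-side variable. A minimal sketch of that access pattern, using a hypothetical `FakeBroadcast` stand-in so it runs without a cluster (on Spark, `sparkContext.broadcast()` returns the real `Broadcast` object with the same `.value` interface):

```python
import argparse

# Hypothetical stand-in for pyspark.Broadcast, only to illustrate .value access;
# sparkContext.broadcast() returns the real object on a cluster.
class FakeBroadcast:
    def __init__(self, value):
        self.value = value

parser = argparse.ArgumentParser()
parser.add_argument("--env", default="dev")  # example flag, not from the original gist
command_args = FakeBroadcast(parser.parse_args([]))

def udf_body(row_value):
    # Inside an executor-side function, read the wrapped object via .value.
    return f"{command_args.value.env}:{row_value}"

print(udf_body("x"))  # dev:x
```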
""" | |
Make spark transformation: group by/partition by selected feature and apply each partition to pandas_udf function | |
""" | |
results = df_all_data.groupBy(args.model_selected_feature_name).apply(pandas_udf_ctr_create_model) | |
results.write.mode("overwrite").parquet("s3://ramp-optimization-dev/ctr/temp_results.parquet")
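`pandas_udf_ctr_create_model` is not shown in the gist; the sketch below illustrates the grouped-map contract such a function must satisfy: Spark hands each group (one value of the selected feature) to it as a pandas DataFrame, and it must return a pandas DataFrame matching a declared result schema. The function and column names (`ctr_per_group`, `feature`, `clicks`, `impressions`) are assumptions for illustration, not the gist's actual model code; in the real job the function would be decorated with `@pandas_udf(result_schema, PandasUDFType.GROUPED_MAP)`.

```python
import pandas as pd

# Hypothetical per-group function; column names are assumed for illustration.
def ctr_per_group(pdf: pd.DataFrame) -> pd.DataFrame:
    feature = pdf["feature"].iloc[0]
    ctr = pdf["clicks"].sum() / pdf["impressions"].sum()
    # Must return a pandas DataFrame matching the schema declared on the pandas_udf.
    return pd.DataFrame({"feature": [feature], "ctr": [ctr]})

# Local check of the contract, no cluster needed:
sample = pd.DataFrame({"feature": ["a"] * 3,
                       "clicks": [1, 0, 2],
                       "impressions": [10, 10, 10]})
print(ctr_per_group(sample))  # one row: feature 'a', ctr 0.1
```

Testing the per-group function locally on a plain DataFrame like this is a cheap way to debug the model logic before submitting the full Spark job.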