Last active: December 5, 2018 21:42
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("Confidence Model") \
    .enableHiveSupport() \
    .getOrCreate()

# Tell Spark to use a directory called `checkpoint`
# to store checkpoint data.
sc = spark.sparkContext
sc.setCheckpointDir('checkpoint')

# ... some other things

# Feature engineering section.
# Assume many more features are computed and added here,
# or that there are many JOINs.
window = Window.partitionBy('id')
feature_profile_df = feature_profile_df \
    .withColumn("crazy_logic_1", F.lit("crazy logic here")) \
    .withColumn("crazy_logic_2", F.lit("crazy logic here")) \
    .join(other_df, "id", "outer") \
    .withColumn("crazy_logic_1000", F.lit("crazy logic here"))
    # ... lots more transformations like the above here

# The DAG is pretty big by now. Checkpoint here to truncate
# the lineage before the next step. Note that checkpoint()
# returns a new DataFrame, so the result must be reassigned.
feature_profile_df = feature_profile_df.checkpoint()

# Check the size of the DAG.
# Use extended=True to appreciate the full length of the physical plan.
feature_profile_df.explain(extended=True)

# Now you can use the data to train the model.
# Without the checkpoint() call above, this may result in a
# StackOverflowError because the lineage is too deep.
model = pipeline.fit(feature_profile_df)