from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder \
    .appName("Confidence Model") \
    .enableHiveSupport() \
    .getOrCreate()
# Tell Spark to store checkpoint files in a directory called `checkpoint`.
sc = spark.sparkContext
sc.setCheckpointDir('checkpoint')
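# Note (my addition, not part of the original gist): on a cluster, the
# checkpoint directory should live on fault-tolerant storage that all
# executors can reach, e.g. HDFS or S3, so the checkpointed data can be
# read back from any node. For example:
# sc.setCheckpointDir('hdfs:///tmp/confidence_model/checkpoints')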
# ... other setup here, e.g. loading `feature_profile_df` and `other_df`.

# Feature engineering section.
# Assume many more features are computed and added here,
# or that there are many JOINs. One example window-based feature
# is sketched right below.
window = Window.partitionBy('id')
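# A minimal sketch (my addition, not from the gist) of how the window
# above might be used: count rows per `id` as a feature. The column
# name `events_per_id` is hypothetical.
feature_profile_df = feature_profile_df \
    .withColumn("events_per_id", F.count(F.lit(1)).over(window))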
feature_profile_df = (
    feature_profile_df
    # Placeholder columns standing in for the real feature logic.
    .withColumn("crazy_logic_1", F.lit("crazy logic here"))
    .withColumn("crazy_logic_2", F.lit("crazy logic here"))
    .join(other_df, "id", "outer")
    # ... lots more transformations here, ending with e.g.:
    .withColumn("crazy_logic_1000", F.lit("crazy logic here"))
)
# The DAG (query lineage) is pretty big by now.
# Checkpoint here to truncate the lineage before the next step.
# Note: checkpoint() returns a new DataFrame rather than working in
# place, so the result must be assigned back. It is eager by default,
# i.e. the data is materialized immediately.
feature_profile_df = feature_profile_df.checkpoint()
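# Alternative (my addition, not in the original gist): if the checkpoint
# does not need to survive executor loss, localCheckpoint() truncates
# the lineage the same way but stores the data on the executors instead
# of the checkpoint directory, which is cheaper:
# feature_profile_df = feature_profile_df.localCheckpoint()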
# Inspect the size of the DAG.
# Use extended=True to see the full logical and physical plans; after
# the checkpoint, the plan should start from a scan of the materialized
# data instead of replaying every transformation above.
feature_profile_df.explain(extended=True)
# Now use the data to train the model.
# Without the feature_profile_df.checkpoint() call above,
# the deeply nested plan may result in a StackOverflowError.
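# `pipeline` is not defined in the original gist. A minimal sketch,
# assuming a binary label column named `label` and two hypothetical
# numeric feature columns (all names here are my assumptions):
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression

assembler = VectorAssembler(inputCols=["feature_1", "feature_2"],
                            outputCol="features")
lr = LogisticRegression(featuresCol="features", labelCol="label")
pipeline = Pipeline(stages=[assembler, lr])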
model = pipeline.fit(feature_profile_df)