@Ackuq
Last active June 3, 2022 16:28
Experiment used to benchmark the execution of point-in-time joins in Apache Spark on the Yelp dataset.
# Hops
import hsfs
from hops import hdfs
# Spark
from pyspark.sql import DataFrame, SQLContext
from pyspark.sql.functions import col, lit
# Implementations
from hops.pit import PitContext
# SparkMeasure
from sparkmeasure import StageMetrics
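# Connect to the Hopsworks feature store, prepare the S3 storage connector,
# and set up the PIT join context together with sparkmeasure's stage metrics collector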
connection = hsfs.connection()
fs = connection.get_feature_store()
# Storage connector to s3
sc = fs.get_storage_connector(...)
sc.prepare_spark()
spark.sql("use {}".format(hdfs.project_name()))
sql_context = SQLContext(spark.sparkContext)
pit_context = PitContext(sql_context)
stage_metrics = StageMetrics(spark)
#
# Experiment definitions
#
def exploding_experiment(left: DataFrame, right: DataFrame):
    stage_metrics.begin()
    pit = pit_context.exploding(
        left=left,
        right=right,
        left_ts_column=left["ts"],
        right_ts_column=right["ts"],
        partition_cols=[(left["business_id"], right["business_id"])],
    )
    # Materialize the join so the full job is captured by the stage metrics
    pit.persist()
    pit.count()
    stage_metrics.end()
    pit.unpersist()
def union_experiment(left: DataFrame, right: DataFrame):
    stage_metrics.begin()
    pit = pit_context.union(
        left=left,
        right=right,
        right_prefix="right_",
        left_ts_column="ts",
        right_ts_column="ts",
        partition_cols=["business_id"],
    )
    pit.persist()
    pit.count()
    stage_metrics.end()
    pit.unpersist()
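# NOTE: early_stop_sort_merge is referenced in FUNCTIONS below but its definition is
# missing from this snippet. The sketch here is an assumption: it mirrors the structure
# of the other experiments and the pit_udf join used in warmup().
def early_stop_sort_merge(left: DataFrame, right: DataFrame):
    stage_metrics.begin()
    pit = left.join(
        right,
        pit_context.pit_udf(left["ts"], right["ts"]) & (left["business_id"] == right["business_id"])
    )
    pit.persist()
    pit.count()
    stage_metrics.end()
    pit.unpersist()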
def warmup(left: DataFrame, right: DataFrame):
    # Run each join implementation once, untimed, before the measured runs
    left.join(
        right,
        pit_context.pit_udf(left["ts"], right["ts"]) & (left["business_id"] == right["business_id"])
    ).count()
    pit_context.union(
        left=left,
        right=right,
        right_prefix="right_",
        left_ts_column="ts",
        right_ts_column="ts",
        partition_cols=["business_id"],
    ).count()
    pit_context.exploding(
        left=left,
        right=right,
        left_ts_column=left["ts"],
        right_ts_column=right["ts"],
        partition_cols=[(left["business_id"], right["business_id"])],
    ).count()
#
# Experiment configuration
#
FUNCTIONS = [exploding_experiment, union_experiment, early_stop_sort_merge]
NO_RUNS = 10
S3_BASE_DIRECTORY = ...
OUTPUT_PATH = ...
def do_run(left, right, function, output_path):
    combined_df = None
    for run in range(NO_RUNS):
        function(left, right)
        # create_stagemetrics_DF registers the collected stage metrics as a temporary view,
        # which aggregate_stagemetrics_DF then summarizes for this run
        _stage_metrics_df = stage_metrics.create_stagemetrics_DF()
        aggregated_df = stage_metrics.aggregate_stagemetrics_DF().withColumn("runNumber", lit(run + 1))
        if combined_df is None:
            combined_df = aggregated_df
        else:
            combined_df = combined_df.union(aggregated_df)
    combined_df.repartition(1).write.mode("overwrite").option("header", True).csv(output_path)
def prepare(dataset):
    dataset_path = S3_BASE_DIRECTORY + dataset
    # Cache and materialize both sides so the reads are not included in the measured runs
    left = spark.read.format("parquet").load(dataset_path + "/tips.parquet").persist()
    left.count()
    right = spark.read.format("parquet").load(dataset_path + "/checkins.parquet").persist()
    right.count()
    return left, right
#
# Run experiments
#
dataset = "/yelp"
left, right = prepare(dataset)
warmup(left, right)
for function in FUNCTIONS:
    output_path = OUTPUT_PATH + "/{}/{}.csv".format(dataset, function.__name__)
    do_run(left, right, function, output_path)
left.unpersist()
right.unpersist()