Experiment used to benchmark point-in-time join implementations in Apache Spark on the Yelp dataset.
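A point-in-time (AS OF) join matches each left record with the most recent right record whose timestamp does not exceed the left record's timestamp, per join key. As a minimal sketch of the semantics the benchmarked implementations share — plain PySpark, independent of the hops.pit helpers used below; naive_pit_join is a hypothetical helper, and the business_id/ts column names are taken from the gist:

from pyspark.sql import functions as F, Window

def naive_pit_join(left, right, key="business_id", ts="ts"):
    # Join on the key, keep only right rows at or before the left timestamp,
    # then pick the most recent right row per left row.
    # Assumes (key, ts) uniquely identifies a left row.
    candidates = left.join(
        right.withColumnRenamed(ts, "right_ts"), on=key
    ).where(F.col("right_ts") <= F.col(ts))
    w = Window.partitionBy(key, ts).orderBy(F.col("right_ts").desc())
    return (
        candidates.withColumn("rn", F.row_number().over(w))
        .where(F.col("rn") == 1)
        .drop("rn")
    )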
# Hops
import hsfs
from hops import hdfs
# Spark
from pyspark.sql import DataFrame, SQLContext
from pyspark.sql.functions import lit
# Point-in-time join implementations
from hops.pit import PitContext
# SparkMeasure, used to collect stage-level metrics for each run
from sparkmeasure import StageMetrics

# Connect to the Hopsworks feature store
connection = hsfs.connection()
fs = connection.get_feature_store()

# Storage connector to S3 holding the dataset
sc = fs.get_storage_connector(...)
sc.prepare_spark()

spark.sql("use {}".format(hdfs.project_name()))

sql_context = SQLContext(spark.sparkContext)
pit_context = PitContext(sql_context)
stage_metrics = StageMetrics(spark)
#
# Experiment definitions
#

def exploding_experiment(left: DataFrame, right: DataFrame):
    stage_metrics.begin()
    pit = pit_context.exploding(
        left=left,
        right=right,
        left_ts_column=left["ts"],
        right_ts_column=right["ts"],
        partition_cols=[(left["business_id"], right["business_id"])],
    )
    # persist() + count() force full materialization, so the join cost is
    # captured between begin() and end()
    pit.persist()
    pit.count()
    stage_metrics.end()
    pit.unpersist()
def union_experiment(left: DataFrame, right: DataFrame):
    stage_metrics.begin()
    pit = pit_context.union(
        left=left,
        right=right,
        right_prefix="right_",
        left_ts_column="ts",
        right_ts_column="ts",
        partition_cols=["business_id"],
    )
    pit.persist()
    pit.count()
    stage_metrics.end()
    pit.unpersist()
def warmup(left: DataFrame, right: DataFrame):
    # Run each join strategy once before measuring, so that one-time costs
    # (JIT compilation, connection and shuffle setup) do not skew the first run
    left.join(
        right,
        pit_context.pit_udf(left["ts"], right["ts"]) & (left["business_id"] == right["business_id"])
    ).count()
    pit_context.union(
        left=left,
        right=right,
        right_prefix="right_",
        left_ts_column="ts",
        right_ts_column="ts",
        partition_cols=["business_id"],
    ).count()
    pit_context.exploding(
        left=left,
        right=right,
        left_ts_column=left["ts"],
        right_ts_column=right["ts"],
        partition_cols=[(left["business_id"], right["business_id"])],
    ).count()
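
# NOTE: early_stop_sort_merge is listed in FUNCTIONS below but its definition is
# missing from this snippet. The sketch below is an assumption: it mirrors the
# structure of the other two experiments and reuses the pit_udf join condition
# already exercised in warmup() above.
def early_stop_sort_merge(left: DataFrame, right: DataFrame):
    stage_metrics.begin()
    pit = left.join(
        right,
        pit_context.pit_udf(left["ts"], right["ts"]) & (left["business_id"] == right["business_id"])
    )
    pit.persist()
    pit.count()
    stage_metrics.end()
    pit.unpersist()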
#
# Experiment configuration
#

FUNCTIONS = [exploding_experiment, union_experiment, early_stop_sort_merge]
NO_RUNS = 10
S3_BASE_DIRECTORY = ...
OUTPUT_PATH = ...
def do_run(left, right, function, output_path):
    combined_df = None
    for run in range(NO_RUNS):
        function(left, right)
        # create_stagemetrics_DF registers the temporary view that
        # aggregate_stagemetrics_DF aggregates over, so it must be called first
        stage_metrics.create_stagemetrics_DF()
        aggregated_df = stage_metrics.aggregate_stagemetrics_DF().withColumn("runNumber", lit(run + 1))
        if combined_df is None:
            combined_df = aggregated_df
        else:
            combined_df = combined_df.union(aggregated_df)
    # Write the metrics of all runs as a single CSV file
    combined_df.repartition(1).write.mode("overwrite").option("header", True).csv(output_path)
def prepare(dataset):
    dataset_path = S3_BASE_DIRECTORY + dataset
    # Cache both sides and materialize them with count(), so that reading
    # from S3 is not part of the measured runs
    left = spark.read.format("parquet").load(dataset_path + "/tips.parquet").persist()
    left.count()
    right = spark.read.format("parquet").load(dataset_path + "/checkins.parquet").persist()
    right.count()
    return left, right
#
# Run experiments
#

dataset = "/yelp"
left, right = prepare(dataset)
warmup(left, right)
for function in FUNCTIONS:
    output_path = OUTPUT_PATH + "/{}/{}.csv".format(dataset, function.__name__)
    do_run(left, right, function, output_path)
left.unpersist()
right.unpersist()