# Hops
import hsfs
from hops import hdfs
# Spark
from pyspark.sql import DataFrame, SQLContext
from pyspark.sql.functions import col, lit
# Implementations
from hops.pit import PitContext
# SparkMeasure
from sparkmeasure import StageMetrics
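# Setup. The `spark` session is assumed to be provided by the notebook
# environment; the storage connector name is intentionally elided.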
connection = hsfs.connection()
fs = connection.get_feature_store()

# Storage connector setup
sc = fs.get_storage_connector(...)
sc.prepare_spark()

sql_context = SQLContext(spark.sparkContext)
pit_context = PitContext(sql_context)

stage_metrics = StageMetrics(spark)
#
# Experiment function definitions
#
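# Each experiment follows the same measurement pattern: start sparkmeasure's
# stage-level collection, build the point-in-time join, then persist + count
# to force full materialization so the metrics cover the entire job. The
# result is unpersisted afterwards so runs do not interfere with each other.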
def exploding_experiment(left: DataFrame, right: DataFrame):
    stage_metrics.begin()
    pit = pit_context.exploding(
        left=left,
        right=right,
        left_ts_column=left["ts"],
        right_ts_column=right["ts"],
        partition_cols=[(left["id"], right["id"])],
    )
    pit.persist()
    pit.count()
    stage_metrics.end()
    pit.unpersist()
def union_experiment(left: DataFrame, right: DataFrame):
    stage_metrics.begin()
    pit = pit_context.union(
        left=left,
        right=right,
        right_prefix="right_",
        left_ts_column="ts",
        right_ts_column="ts",
        partition_cols=["id"],
    )
    pit.persist()
    pit.count()
    stage_metrics.end()
    pit.unpersist()
def early_stop_sort_merge(left: DataFrame, right: DataFrame):
    stage_metrics.begin()
    pit = left.join(
        right,
        pit_context.pit_udf(left["ts"], right["ts"]) & (left["id"] == right["id"])
    )
    pit.persist()
    pit.count()
    stage_metrics.end()
    pit.unpersist()
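# Warm-up: run each join strategy once before measuring, so one-time costs
# do not skew the first measured run.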
def warmup(left: DataFrame, right: DataFrame):
    left.join(
        right,
        pit_context.pit_udf(left["ts"], right["ts"]) & (left["id"] == right["id"])
    ).count()
    pit_context.union(
        left=left,
        right=right,
        right_prefix="right_",
        left_ts_column="ts",
        right_ts_column="ts",
        partition_cols=["id"],
    ).count()
    pit_context.exploding(
        left=left,
        right=right,
        left_ts_column=left["ts"],
        right_ts_column=right["ts"],
        partition_cols=[(left["id"], right["id"])],
    ).count()
#
# Experiment configuration
#
FUNCTIONS = [exploding_experiment, union_experiment, early_stop_sort_merge]
DATASETS = [
    "/{}-1_year".format(size)
    for size in [
        10_000,
        100_000,
        1_000_000,
        10_000_000,
    ]
]
NO_RUNS = 10
DATASET_STRUCTURE = [
    "/sorted-asc",
    "/sorted-desc",
    "/sorted-rand",
]
S3_BASE_DIRECTORY = ...
OUTPUT_PATH = ...
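# The full grid: 3 sort orders x 4 dataset sizes x 3 implementations,
# with NO_RUNS = 10 measured runs per combination.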
#
# Experiment runner
#
def do_run(left, right, function, output_path):
    combined_df = None
    for run in range(NO_RUNS):
        function(left, right)
        # create_stagemetrics_DF registers the per-stage metrics temp view
        # that aggregate_stagemetrics_DF reads from
        _stage_metrics_df = stage_metrics.create_stagemetrics_DF()
        aggregated_df = stage_metrics.aggregate_stagemetrics_DF().withColumn("runNumber", lit(run + 1))
        if combined_df is None:
            combined_df = aggregated_df
        else:
            combined_df = combined_df.union(aggregated_df)
    combined_df.repartition(1).write.mode("overwrite").option("header", True).csv(output_path)
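# Each output CSV contains one aggregated sparkmeasure row per run, keyed by
# the runNumber column; repartition(1) collapses the result to a single part
# file per experiment.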
def prepare(structure, dataset):
    dataset_path = S3_BASE_DIRECTORY + structure + dataset
    # persist + count materializes the inputs up front, so read time
    # is excluded from the measured joins
    left = spark.read.format("parquet").load(dataset_path + "/left.parquet").persist()
    left.count()
    right = spark.read.format("parquet").load(dataset_path + "/right.parquet").persist()
    right.count()
    return left, right
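# Main loop: for every sort order and dataset size, load and cache the
# inputs, warm up once, then measure each implementation NO_RUNS times.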
for structure in DATASET_STRUCTURE:
    for dataset in DATASETS:
        left, right = prepare(structure, dataset)
        warmup(left, right)
        for function in FUNCTIONS:
            output_path = OUTPUT_PATH + "/{}/{}/{}.csv".format(structure, dataset, function.__name__)
            do_run(left, right, function, output_path)
        left.unpersist()
        right.unpersist()
#
# Run bucketing experiments
#
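# Bucketing both tables on the join key ("id") with the same bucket count
# co-partitions them on disk, which should let Spark's sort-merge join skip
# the shuffle stage. Note that bucketBy requires saveAsTable, hence the
# managed tables below.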
def prepare_bucketed(structure, dataset, buckets):
    dataset_path = S3_BASE_DIRECTORY + structure + dataset
    left_raw = spark.read.format("parquet").load(dataset_path + "/left.parquet")
    left_raw.write.bucketBy(buckets, "id").mode("overwrite").saveAsTable("left")
    left = spark.table("left").persist()
    left.count()
    right_raw = spark.read.format("parquet").load(dataset_path + "/right.parquet")
    right_raw.write.bucketBy(buckets, "id").mode("overwrite").saveAsTable("right")
    right = spark.table("right").persist()
    right.count()
    return left, right
# Four different bucket counts
BUCKETS = [20, 40, 80, 160]
# Only sorted ascending
structure = DATASET_STRUCTURE[0]

for buckets in BUCKETS:
    for dataset in DATASETS:
        left, right = prepare_bucketed(structure, dataset, buckets)
        warmup(left, right)
        for function in FUNCTIONS:
            output_path = OUTPUT_PATH + "/{}_bucketed_{}/{}/{}.csv".format(structure, buckets, dataset, function.__name__)
            do_run(left, right, function, output_path)
        left.unpersist()
        right.unpersist()
        spark.sql("DROP TABLE IF EXISTS left")
        spark.sql("DROP TABLE IF EXISTS right")