# Hops
import hsfs
from hops import hdfs
# Spark
from pyspark.sql import DataFrame, SQLContext
from pyspark.sql.functions import col, lit
# Implementations
from hops.pit import PitContext
# SparkMeasure
from sparkmeasure import StageMetrics
connection = hsfs.connection()
fs = connection.get_feature_store()
# Storage connector setup
sc = fs.get_storage_connector(...)
sc.prepare_spark()
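# prepare_spark() configures the Spark session so it can read from the storage
# connector (here S3) defined in the feature store. The `spark` variable used
# below is assumed to be the SparkSession provided by the notebook environment.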
sql_context = SQLContext(spark.sparkContext)
pit_context = PitContext(sql_context)
stage_metrics = StageMetrics(spark)
#
# Experiment function definitions
#
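# Each experiment measures one point-in-time (PIT) join strategy from hops.pit:
# the exploding join, the union-based join, and an early-stop sort-merge join
# expressed through pit_context.pit_udf. StageMetrics brackets every run, and
# persist() + count() force the join to execute while metrics are collected.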
def exploding_experiment(left: DataFrame, right: DataFrame):
    stage_metrics.begin()
    pit = pit_context.exploding(
        left=left,
        right=right,
        left_ts_column=left["ts"],
        right_ts_column=right["ts"],
        partition_cols=[(left["id"], right["id"])],
    )
    pit.persist()
    pit.count()
    stage_metrics.end()
    pit.unpersist()
def union_experiment(left: DataFrame, right: DataFrame):
    stage_metrics.begin()
    pit = pit_context.union(
        left=left,
        right=right,
        right_prefix="right_",
        left_ts_column="ts",
        right_ts_column="ts",
        partition_cols=["id"],
    )
    pit.persist()
    pit.count()
    stage_metrics.end()
    pit.unpersist()
def early_stop_sort_merge(left: DataFrame, right: DataFrame):
    stage_metrics.begin()
    pit = left.join(
        right,
        pit_context.pit_udf(left["ts"], right["ts"]) & (left["id"] == right["id"])
    )
    pit.persist()
    pit.count()
    stage_metrics.end()
    pit.unpersist()
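# Warm-up pass: run every join strategy once on freshly loaded data so that the
# first measured run is not skewed by one-time startup costs.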
def warmup(left: DataFrame, right: DataFrame):
    left.join(
        right,
        pit_context.pit_udf(left["ts"], right["ts"]) & (left["id"] == right["id"])
    ).count()
    pit_context.union(
        left=left,
        right=right,
        right_prefix="right_",
        left_ts_column="ts",
        right_ts_column="ts",
        partition_cols=["id"],
    ).count()
    pit_context.exploding(
        left=left,
        right=right,
        left_ts_column=left["ts"],
        right_ts_column=right["ts"],
        partition_cols=[(left["id"], right["id"])],
    ).count()
#
# Experiment configuration
#
FUNCTIONS = [exploding_experiment, union_experiment, early_stop_sort_merge]
DATASETS = [
    "/{}-1_year".format(size)
    for size in [
        10_000,
        100_000,
        1_000_000,
        10_000_000,
    ]
]
NO_RUNS = 10
DATASET_STRUCTURE = [
    "/sorted-asc",
    "/sorted-desc",
    "/sorted-rand",
]
S3_BASE_DIRECTORY = ...
OUTPUT_PATH = ...
#
# Experiment runner
#
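# do_run executes one strategy NO_RUNS times on the same cached inputs and
# writes one aggregated stage-metrics row per run to a single CSV file.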
def do_run(left, right, function, output_path):
    combined_df = None
    for run in range(NO_RUNS):
        function(left, right)
        _stage_metrics_df = stage_metrics.create_stagemetrics_DF()
        aggregated_df = stage_metrics.aggregate_stagemetrics_DF().withColumn("runNumber", lit(run + 1))
        if combined_df is None:
            combined_df = aggregated_df
        else:
            combined_df = combined_df.union(aggregated_df)
    combined_df.repartition(1).write.mode("overwrite").option("header", True).csv(output_path)
def prepare(structure, dataset):
    dataset_path = S3_BASE_DIRECTORY + structure + dataset
    left = spark.read.format("parquet").load(dataset_path + "/left.parquet").persist()
    left.count()
    right = spark.read.format("parquet").load(dataset_path + "/right.parquet").persist()
    right.count()
    return left, right
for structure in DATASET_STRUCTURE:
    for dataset in DATASETS:
        left, right = prepare(structure, dataset)
        warmup(left, right)
        for function in FUNCTIONS:
            output_path = OUTPUT_PATH + "/{}/{}/{}.csv".format(structure, dataset, function.__name__)
            do_run(left, right, function, output_path)
        left.unpersist()
        right.unpersist()
#
# Run bucketing experiments
#
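# Same runner as above, but the inputs are first bucketed on the join key ("id")
# with bucketBy and registered as the managed tables "left" and "right".
# Only the ascending-sorted datasets are used for the bucketing experiments.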
def prepare_bucketed(structure, dataset, buckets):
    dataset_path = S3_BASE_DIRECTORY + structure + dataset
    left_raw = spark.read.format("parquet").load(dataset_path + "/left.parquet")
    left_raw.write.bucketBy(buckets, "id").mode("overwrite").saveAsTable("left")
    left = spark.table("left").persist()
    left.count()
    right_raw = spark.read.format("parquet").load(dataset_path + "/right.parquet")
    right_raw.write.bucketBy(buckets, "id").mode("overwrite").saveAsTable("right")
    right = spark.table("right").persist()
    right.count()
    return left, right
# Four different bucket counts
BUCKETS = [20, 40, 80, 160]
# Only sorted ascending
structure = DATASET_STRUCTURE[0]
for buckets in BUCKETS:
    for dataset in DATASETS:
        left, right = prepare_bucketed(structure, dataset, buckets)
        warmup(left, right)
        for function in FUNCTIONS:
            output_path = OUTPUT_PATH + "/{}_bucketed_{}/{}/{}.csv".format(structure, buckets, dataset, function.__name__)
            do_run(left, right, function, output_path)
        left.unpersist()
        right.unpersist()
        spark.sql("DROP TABLE IF EXISTS left")
        spark.sql("DROP TABLE IF EXISTS right")