Created
September 4, 2020 05:44
-
-
Save casparjespersen/99506c648f872ed7b1b97bcfcdcb41f9 to your computer and use it in GitHub Desktop.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
import json
from functools import reduce
from pprint import pprint
from random import seed, randint
from time import perf_counter
from time import time

import numpy as np
from pyspark.sql import DataFrame, SparkSession, Row
from pyspark.sql.functions import get_json_object, from_json, sum
from pyspark.sql.types import StructType, IntegerType
# Config
iter_params = "abcdefgh"  # each character is one integer key under $.my.param
num_experiments = 500  # timed repetitions per extraction method
num_rows = 100  # rows in every generated DataFrame
row_size = 100_000  # length of the "0" filler string stored in $.my.other
# Generate dataset
def _generate_data():
    """Yield ``num_rows`` Rows, each holding one JSON string in column ``data``.

    Every document has the shape
    ``{"my": {"param": {...}, "other": "000..."}}``: ``param`` maps each
    character of ``iter_params`` to a random integer in [0, 10], and ``other``
    is a string of ``row_size`` zeros.
    """
    for _ in range(num_rows):
        document = {
            "my": {
                "param": {k: randint(0, 10) for k in iter_params},
                "other": "0" * row_size,
            }
        }
        yield Row(data=json.dumps(document))
def _method_a():
    """Retrieve all parameters through get_json_object and casting.

    Each key in ``iter_params`` is read from the ``data`` column with its own
    JSON path (``$.my.param.<key>``) and cast to ``int``; the resulting
    columns are then added together.

    Returns:
        A Column expression for the sum of all parameters.
    """
    params = [
        get_json_object("data", f"$.my.param.{k}").cast("int")
        for k in iter_params
    ]
    return reduce(lambda x, y: x + y, params)
def _method_b():
    """Retrieve all parameters through schema enforcing.

    The ``$.my.param`` sub-document is extracted once as a JSON string and
    parsed with ``from_json`` against a struct schema that declares every key
    in ``iter_params`` as an integer; the struct fields are then added
    together.

    Returns:
        A Column expression for the sum of all parameters.
    """
    schema = StructType()
    for k in iter_params:
        schema = schema.add(k, IntegerType())
    params = from_json(get_json_object("data", "$.my.param"), schema)
    fields = [params[k] for k in iter_params]
    return reduce(lambda x, y: x + y, fields)
spark = SparkSession.builder.getOrCreate()


def _experiment(column):
    """Time one evaluation of ``column`` over a freshly generated DataFrame.

    The DataFrame is built before the clock starts, so only the
    select / aggregate / collect step is measured.

    Args:
        column: Column expression to evaluate (aliased to ``result``).

    Returns:
        Elapsed time in seconds as a float.
    """
    sdf = spark.createDataFrame(_generate_data())
    # perf_counter() is monotonic and high-resolution, unlike wall-clock
    # time(), so system clock adjustments cannot distort the measurement.
    tic = perf_counter()
    # ``sum`` here is pyspark.sql.functions.sum (it shadows the builtin).
    # The collected rows are discarded; collect() only forces execution.
    sdf.select(column.alias("result")).agg(sum("result")).collect()
    return perf_counter() - tic
def _experiment_outer(column):
    """Run ``_experiment`` ``num_experiments`` times and average the later runs.

    Args:
        column: Column expression handed to every ``_experiment`` call.

    Returns:
        Mean duration in seconds over the second half of the runs.
    """
    times = np.array([_experiment(column) for _ in range(num_experiments)])
    # Only the second half of the measurements is averaged -- presumably to
    # exclude warm-up effects in the early runs.
    return np.mean(times[times.size // 2:])
# Benchmark both strategies.  Results are keyed by function name so the
# printed report is readable and stable; keying by the function objects
# themselves prints reprs such as "<function _method_a at 0x...>".
results = {
    method.__name__: _experiment_outer(method())
    for method in (_method_a, _method_b)
}
pprint(results)
# Example stdout (mean seconds per run; figures from the original recorded run):
# {'_method_a': 0.22132105827331544, '_method_b': 0.16155768394470216}
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Should be: