Running Spark benchmarks to compare its string operations with Vaex and Pandas
""" | |
Benchark ran on my laptop: | |
spark-submit --master local[*] benchmarks/strings-spark.py | |
To run it: | |
* Download and install Spark 2.4.0 (https://spark.apache.org/downloads.html) | |
* Run the Vaex & Pandas benchmark (https://github.com/vaexio/vaex/blob/master/benchmarks/strings.py), | |
the test.parquet file will be created | |
* Set `args_n` constant in this script to the same value you used for `n` variable, | |
e.g. `python strings.py -n8`. | |
* Start the Spark job: `spark-submit --master local[*] benchmarks/strings-spark.py`, | |
assuming you have spark-submit in your PATH already. | |
The results will be saved into a file timings-spark.json and also merged into file | |
timings.json if it's present. | |
""" | |
from pyspark.sql import SparkSession
import pyspark.sql.functions as sf
import timeit
import json
import os

timings = {}
args_n = 8  # must match the `n` used for the Vaex & Pandas benchmark (see docstring)


def test(name, expr):
    # Wrap the expression in an aggregation that counts a single column: this seems
    # to force Spark to actually run the expression on all the rows. If you just do
    # e.g. `sf.count('*')`, Spark skips the expression and counts the rows in the
    # parquet file instead, giving the result immediately.
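    # For example, the 'lower' test below ends up timing the statement
    #   df.select(sf.lower(df.s).alias('r')).agg(sf.count('r')).show()
    # while the 'count' test, which already ends in .count(), is timed unchanged.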
    spark_expr = f"df.select({expr}.alias('r')).agg(sf.count('r')).show()" if '.count()' not in expr else expr
    N = 3
    t = timeit.Timer(spark_expr, globals={'df': df, 'sf': sf})
    print(name, expr)
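    # Best of 2 repeats of N runs each, divided by N and by the row count
    # (assuming test.parquet holds 10**args_n rows, i.e. the `n` passed to the
    # Vaex benchmark), which gives roughly seconds per row.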
    t_spark = min(t.repeat(2, N))/N/(10**args_n)
    # t_spark = t_spark * 1000000 # millis
    print("\tspark", t_spark)
    timings[name] = {'spark': t_spark}


if __name__ == '__main__':
    spark = SparkSession.builder.appName("benchmark-strings").getOrCreate()
    df = spark.read.parquet('test.parquet')

    # test('capitalize', 'df.s.str.capitalize()')  # not there
    test('cat', 'sf.concat(df.s, df.s)')
    test('contains', 'df.s.contains("9")')
    test('contains(regex)', 'df.s.rlike(".*9.*")')
    test('count', 'df.filter(df.s.rlike(".*9.*")).count()')
    test('endswith', 'df.s.endswith("9")')
    # test('find', 'df.s.str.find("4")')  # not there
    test('get', 'df.s.substr(1, 1)')
    test('split+join', 'sf.array_join(sf.split(df.s, "\\."), "-")')
    test('len', 'sf.length(df.s)')
    test('ljust', 'sf.rpad(df.s, 10, " ")')
    test('lower', 'sf.lower(df.s)')
    test('lstrip', 'sf.regexp_replace(df.s, "^9*", "")')
    test('match', 'df.s.rlike("1.*")')
    test('pad', 'sf.lpad(df.s, 10, " ")')
    test('repeat', 'sf.repeat(df.s, 2)')
    test('replace(default)', 'sf.regexp_replace(df.s, "123", "321")')
    # test('replace(no regex)', 'df.s.str.replace("123", "321", regex=False)')  # not there
    test('replace(regex)', 'sf.regexp_replace(df.s, "1?[45]4", "1004")')
    # test('rfind', 'df.s.str.rfind("4")')  # not there
    test('rjust', 'sf.lpad(df.s, 10, " ")')
    test('rstrip', 'sf.regexp_replace(df.s, "9*$", "")')
    test('slice', 'df.s.substr(1, 2)')
    test('split', 'sf.split(df.s, "\\.")')
    test('startswith', 'df.s.startswith("9")')
    test('strip', 'sf.regexp_replace(df.s, "0*$", "")')
    # test('title', 'df.s.str.title()')  # not there
    test('upper', 'sf.upper(df.s)')
    test('zfill', 'sf.lpad(df.s, 10, "0")')

    fn = "timings-spark.json"
    with open(fn, "w") as f:
        json.dump(timings, f, indent=4)
    print('write', fn)
    print('spark timings:', timings)

    # merge the Spark timings into the overall ones
    overall_timings = {}
    fn = "timings.json"
    if os.path.exists(fn):
        with open(fn, "r") as f:
            overall_timings = json.load(f)
        for name in overall_timings:
            if name not in timings:
                continue
            overall_timings[name].update(timings[name])
        with open(fn, "w") as f:
            json.dump(overall_timings, f, indent=4)

    spark.stop()