Running Spark benchmarks to compare its string operations with Vaex and Pandas
"""
Benchark ran on my laptop:
spark-submit --master local[*] benchmarks/strings-spark.py
To run it:
* Download and install Spark 2.4.0 (https://spark.apache.org/downloads.html)
* Run the Vaex & Pandas benchmark (https://github.com/vaexio/vaex/blob/master/benchmarks/strings.py),
the test.parquet file will be created
* Set `args_n` constant in this script to the same value you used for `n` variable,
e.g. `python strings.py -n8`.
* Start the Spark job: `spark-submit --master local[*] benchmarks/strings-spark.py`,
assuming you have spark-submit in your PATH already.
The results will be saved into a file timings-spark.json and also merged into file
timings.json if it's present.
"""
from pyspark.sql import SparkSession
import pyspark.sql.functions as sf
import timeit
import json
import os
timings = {}
args_n = 8
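# args_n must match the `n` used for the Vaex/Pandas benchmark; timings below are
# normalized by 10**args_n, i.e. the assumed number of rows in test.parquet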
def test(name, expr):
    # Wrap the expression in an aggregation counting a single column: this seems
    # to force Spark to actually run the expression on all the rows. If you e.g.
    # just do `sf.count('*')`, Spark skips the expression and merely counts the
    # rows in the parquet file, giving the result immediately.
    spark_expr = f"df.select({expr}.alias('r')).agg(sf.count('r')).show()" if '.count()' not in expr else expr
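    # e.g. for expr 'sf.lower(df.s)' the statement handed to timeit becomes:
    #   df.select(sf.lower(df.s).alias('r')).agg(sf.count('r')).show()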
    N = 3
    t = timeit.Timer(spark_expr, globals={'df': df, 'sf': sf})
    print(name, expr)
    t_spark = min(t.repeat(2, N)) / N / (10**args_n)
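    # best of 2 repeats of N executions each, divided by N and by the assumed
    # 10**args_n rows, i.e. seconds per row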
    # t_spark = t_spark * 1000000  # millis
    print("\tspark", t_spark)
    timings[name] = {'spark': t_spark}

if __name__ == '__main__':
    spark = SparkSession.builder.appName("benchmark-strings").getOrCreate()
    df = spark.read.parquet('test.parquet')
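    # test.parquet is created by the Vaex/Pandas benchmark and is expected to
    # contain a string column `s` with 10**args_n rows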
    # test('capitalize', 'df.s.str.capitalize()')  # not there
    test('cat', 'sf.concat(df.s, df.s)')
    test('contains', 'df.s.contains("9")')
    test('contains(regex)', 'df.s.rlike(".*9.*")')
    test('count', 'df.filter(df.s.rlike(".*9.*")).count()')
    test('endswith', 'df.s.endswith("9")')
    # test('find', 'df.s.str.find("4")')  # not there
    test('get', 'df.s.substr(1, 1)')
    test('split+join', 'sf.array_join(sf.split(df.s, "\\."), "-")')
    test('len', 'sf.length(df.s)')
    test('ljust', 'sf.rpad(df.s, 10, " ")')
    test('lower', 'sf.lower(df.s)')
    test('lstrip', 'sf.regexp_replace(df.s, "^9*", "")')
    test('match', 'df.s.rlike("1.*")')
    test('pad', 'sf.lpad(df.s, 10, " ")')
    test('repeat', 'sf.repeat(df.s, 2)')
    test('replace(default)', 'sf.regexp_replace(df.s, "123", "321")')
    # test('replace(no regex)', 'df.s.str.replace("123", "321", regex=False)')  # not there
    test('replace(regex)', 'sf.regexp_replace(df.s, "1?[45]4", "1004")')
    # test('rfind', 'df.s.str.rfind("4")')  # not there
    test('rjust', 'sf.lpad(df.s, 10, " ")')
    test('rstrip', 'sf.regexp_replace(df.s, "9*$", "")')
    test('slice', 'df.s.substr(1, 2)')
    test('split', 'sf.split(df.s, "\\.")')
    test('startswith', 'df.s.startswith("9")')
    test('strip', 'sf.regexp_replace(df.s, "0*$", "")')
    # test('title', 'df.s.str.title()')  # not there
    test('upper', 'sf.upper(df.s)')
    test('zfill', 'sf.lpad(df.s, 10, "0")')
fn = "timings-spark.json"
with open(fn, "w") as f:
json.dump(timings, f, indent=4)
print('write', fn)
print('spark timings:', timings)
    # merge Spark timings into the overall ones
    overall_timings = {}
    fn = "timings.json"
    if os.path.exists(fn):
        with open(fn, "r") as f:
            overall_timings = json.load(f)
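        # attach the Spark timing to the per-library entries (e.g. vaex/pandas)
        # of every test name already present in timings.json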
        for name in overall_timings:
            if name not in timings:
                continue
            overall_timings[name].update(timings[name])
        with open(fn, "w") as f:
            json.dump(overall_timings, f, indent=4)

    spark.stop()
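
# Rough sketch (not executed by this script) of how the merged timings could be
# inspected afterwards, assuming timings.json maps test names to
# {library: seconds_per_row} dicts:
#
#   import json
#   with open("timings.json") as f:
#       merged = json.load(f)
#   for name, by_library in sorted(merged.items()):
#       print(name, by_library)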