Running Spark benchmarks to compare its string operations with Vaex and Pandas
"""
Benchark ran on my laptop:
spark-submit --master local[*] benchmarks/strings-spark.py
To run it:
* Download and install Spark 2.4.0 (https://spark.apache.org/downloads.html)
* Run the Vaex & Pandas benchmark (https://github.com/vaexio/vaex/blob/master/benchmarks/strings.py),
the test.parquet file will be created
* Set `args_n` constant in this script to the same value you used for `n` variable,
e.g. `python strings.py -n8`.
* Start the Spark job: `spark-submit --master local[*] benchmarks/strings-spark.py`,
assuming you have spark-submit in your PATH already.
The results will be saved into a file timings-spark.json and also merged into file
timings.json if it's present.
"""
from pyspark.sql import SparkSession
import pyspark.sql.functions as sf
import timeit
import json
import os
timings = {}
args_n = 8  # same n as used for the Vaex/Pandas benchmark; timings are divided by 10**args_n rows


def test(name, expr):
    # Aggregate with a count over a single computed column: this seems to force
    # Spark to actually evaluate the expression on all rows. With e.g.
    # `sf.count('*')` Spark skips the expression and just counts the rows in the
    # parquet file, returning the result immediately.
    spark_expr = f"df.select({expr}.alias('r')).agg(sf.count('r')).show()" if '.count()' not in expr else expr
    N = 3
    t = timeit.Timer(spark_expr, globals={'df': df, 'sf': sf})
    print(name, expr)
    t_spark = min(t.repeat(2, N)) / N / (10**args_n)  # best-of-2 average per call, per row
    # t_spark = t_spark * 1000000  # millis
    print("\tspark", t_spark)
    timings[name] = {'spark': t_spark}
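
# For reference, a call like test('lower', 'sf.lower(df.s)') times the statement
#     df.select(sf.lower(df.s).alias('r')).agg(sf.count('r')).show()
# which makes Spark evaluate the expression for every row before counting.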


if __name__ == '__main__':
    spark = SparkSession.builder.appName("benchmark-strings").getOrCreate()
    df = spark.read.parquet('test.parquet')

    # test('capitalize', 'df.s.str.capitalize()')  # not there
    test('cat', 'sf.concat(df.s, df.s)')
    test('contains', 'df.s.contains("9")')
    test('contains(regex)', 'df.s.rlike(".*9.*")')
    test('count', 'df.filter(df.s.rlike(".*9.*")).count()')
    test('endswith', 'df.s.endswith("9")')
    # test('find', 'df.s.str.find("4")')  # not there
    test('get', 'df.s.substr(1, 1)')
    test('split+join', 'sf.array_join(sf.split(df.s, "\\."), "-")')
    test('len', 'sf.length(df.s)')
    test('ljust', 'sf.rpad(df.s, 10, " ")')
    test('lower', 'sf.lower(df.s)')
    test('lstrip', 'sf.regexp_replace(df.s, "^9*", "")')
    test('match', 'df.s.rlike("1.*")')
    test('pad', 'sf.lpad(df.s, 10, " ")')
    test('repeat', 'sf.repeat(df.s, 2)')
    test('replace(default)', 'sf.regexp_replace(df.s, "123", "321")')
    # test('replace(no regex)', 'df.s.str.replace("123", "321", regex=False)')  # not there
    test('replace(regex)', 'sf.regexp_replace(df.s, "1?[45]4", "1004")')
    # test('rfind', 'df.s.str.rfind("4")')  # not there
    test('rjust', 'sf.lpad(df.s, 10, " ")')
    test('rstrip', 'sf.regexp_replace(df.s, "9*$", "")')
    test('slice', 'df.s.substr(1, 2)')
    test('split', 'sf.split(df.s, "\\.")')
    test('startswith', 'df.s.startswith("9")')
    test('strip', 'sf.regexp_replace(df.s, "0*$", "")')
    # test('title', 'df.s.str.title()')  # not there
    test('upper', 'sf.upper(df.s)')
    test('zfill', 'sf.lpad(df.s, 10, "0")')
fn = "timings-spark.json"
with open(fn, "w") as f:
json.dump(timings, f, indent=4)
print('write', fn)
print('spark timings:', timings)
# merge Spark timings into the overall ones
overall_timings = {}
fn = "timings.json"
if os.path.exists(fn):
with open(fn, "r") as f:
overall_timings = json.load(f)
for name in overall_timings:
if name not in timings:
continue
overall_timings[name].update(timings[name])
with open(fn, "w") as f:
json.dump(overall_timings, f, indent=4)
spark.stop()
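
# Note: after both benchmarks have run, each entry in timings.json should hold this
# script's "spark" value next to the keys written by the Vaex/Pandas benchmark
# (presumably "vaex" and "pandas"), e.g.:
#     "lower": {"vaex": <s/row>, "pandas": <s/row>, "spark": <s/row>}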