@rjzamora
Created March 21, 2022 18:15
Simple benchmark to measure the performance of fsspec.parquet.open_parquet_file for a single-column read from remote storage.
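
For reference, the core pattern being timed in the "open_parquet_file" case looks roughly like the sketch below (not part of the original script; the bucket name is a placeholder, while the file name and column come from the benchmark itself):

    import fsspec.parquet
    import pandas as pd

    with fsspec.parquet.open_parquet_file(
        "s3://my-bucket/large_file.parquet", columns=["int10"], engine="pyarrow"
    ) as f:
        df = pd.read_parquet(f, columns=["int10"], engine="pyarrow")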
import time
import argparse
import importlib

try:
    import cudf
except ImportError:
    cudf = None

import pandas as pd
import numpy as np

def run_all_benchmarks(
    path,
    columns,
    engines=("cudf", "fastparquet", "pyarrow"),
    cases=("open_parquet_file", "default-cache", "none-cache"),
    trials=5,
    storage_options=None,
):
    if cudf is None and "cudf" in engines:
        raise ValueError("Cannot benchmark cudf - It is not installed!")
    storage_options = storage_options or {}
    results = {}
    for case in cases:
        results[case] = {}
        for engine in engines:
            if case.startswith("pyarrow") and not (
                engine.startswith("pyarrow") or engine.startswith("cudf")
            ):
                # Skip pyarrow-specific cases for other engines
                continue
            results[case][engine] = run_benchmark(
                path,
                columns,
                case,
                engine,
                trials,
                storage_options,
            )
    return results

def refresh_fs(path=None, storage_options=None, return_fs=True):
    # Reload fsspec and clear its filesystem instance cache so that every
    # trial starts with a fresh (un-cached) filesystem object
    import fsspec

    importlib.reload(fsspec)
    fsspec.spec.AbstractFileSystem.clear_instance_cache()
    if return_fs:
        fs = fsspec.get_fs_token_paths(
            path,
            storage_options=(storage_options or {}),
        )[0]
        return fsspec, fs
    return

def open_file(path, columns, engine, case, storage_options):
    fsspec, fs = refresh_fs(path=path, storage_options=storage_options)
    if case == "open_parquet_file":
        import fsspec.parquet as fsspec_parquet

        importlib.reload(fsspec_parquet)
        # fsspec.parquet has no "cudf" engine - fall back to "pyarrow" for
        # footer/metadata parsing in that case
        use_engine = (
            engine.split("-")[0] if not engine.startswith("cudf") else "pyarrow"
        )
        return fsspec_parquet.open_parquet_file(
            path, fs=fs, columns=columns, engine=use_engine
        )
    elif case == "none-cache":
        return fs.open(path, mode="rb", cache_type="none")
    elif case == "default-cache":
        return fs.open(path, mode="rb")
    elif case == "pyarrow-s3":
        from pyarrow import fs as pafs

        # pyarrow's S3FileSystem expects "bucket/key" (no "s3://" prefix)
        return pafs.S3FileSystem().open_input_file(path[len("s3://"):])
    else:
        raise ValueError(f"Case {case} not recognized!")

def read_parquet(f, columns, engine, case):
    if engine == "cudf":
        return cudf.read_parquet(f, columns=columns, use_python_file_object=True)
    else:
        return pd.read_parquet(f, columns=columns, engine=engine)

def run_benchmark(path, columns, case, engine, trials, storage_options):
    split_engine = engine.split("-")
    io_engine = split_engine[0]
    results = []
    for trial in range(trials):
        t0 = time.time()
        with open_file(path, columns, engine, case, storage_options) as f:
            df = read_parquet(f, columns, io_engine, case)
        results.append(time.time() - t0)
    print(f"Case {case} results for {engine}: {results}")
    return results

def write_file():
    """Original commands used to write 'large_file.parquet'

    This locally-written file was moved to S3 and GCS using `fs.put`.
    """
    from dask.datasets import timeseries

    dtypes = {f"str{i}": str for i in range(15)}
    dtypes.update({f"int{i}": int for i in range(15)})
    df = timeseries(
        start='2000-01-01',
        end='2000-12-01',
        freq='1s',
        partition_freq='1d',
        dtypes=dtypes,
        seed=42,
    ).reset_index(drop=True).compute()
    row_group_size = len(df) // 10  # Want 10 row groups
    df.to_parquet("large_file.parquet", engine="pyarrow", row_group_size=row_group_size)
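

# Hypothetical helper (not in the original gist): a sketch of the `fs.put`
# upload step mentioned in the write_file docstring above. The default remote
# path is a placeholder and should point at your own bucket.
def upload_file(remote_path="s3://my-bucket/large_file.parquet"):
    import fsspec

    fs = fsspec.get_fs_token_paths(remote_path)[0]
    fs.put("large_file.parquet", remote_path)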


# Command-line args
parser = argparse.ArgumentParser(description="Run read_parquet benchmark for remote storage")
parser.add_argument("-t", "--trials", default=5, type=int, help="Number of trials for each engine/case")
parser.add_argument("-p", "--protocol", default="s3", help="Remote-storage protocol ('s3' or 'gs')")
parser.add_argument(
    "-e",
    "--engines",
    default="cudf,fastparquet,pyarrow",
    help="Comma-separated list of IO engines ('cudf', 'fastparquet', and 'pyarrow')",
)
parser.add_argument(
    "-c",
    "--cases",
    default="open_parquet_file,none-cache,default-cache,pyarrow-s3",
    help="Comma-separated list of FS cases ('open_parquet_file', 'none-cache', 'default-cache', 'pyarrow-s3')",
)
args = parser.parse_args()

if __name__ == '__main__':

    # Get IO engines
    engines = args.engines.split(",")
    if not engines:
        raise ValueError("No IO engines specified.")

    # Get trials and FS protocol
    trials = args.trials
    protocol = args.protocol
    if protocol == "gs":
        storage_options = {}  ## DEFINE GCS STORAGE OPTIONS HERE
        my_bucket = None  ## DEFINE GCS BUCKET PATH HERE
    elif protocol == "s3":
        storage_options = {}  ## DEFINE S3 STORAGE OPTIONS HERE
        my_bucket = None  ## DEFINE S3 BUCKET PATH HERE
    else:
        raise ValueError(f"{protocol} not a supported protocol")

    # Get FS/caching cases
    cases = args.cases.split(",")
    if protocol != "s3" and "pyarrow-s3" in cases:
        # Cannot use pyarrow-s3 for GCS
        cases = tuple(case for case in cases if case != "pyarrow-s3")
    if not cases:
        raise ValueError("No IO cases specified.")

    # Define the path and column selection
    path = f"{my_bucket}/large_file.parquet"
    columns = ["int10"]

    # Run the desired benchmarks
    results = run_all_benchmarks(
        protocol + "://" + path,
        columns,
        engines=engines,
        cases=cases,
        trials=trials,
        storage_options=storage_options,
    )
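
    # Not part of the original gist: one possible way to summarize the
    # returned timings. `results` maps case -> engine -> list of per-trial
    # seconds, so a DataFrame of mean times (engines as rows, cases as
    # columns) can be built directly from it.
    summary = pd.DataFrame(
        {
            case: {eng: sum(times) / len(times) for eng, times in case_results.items()}
            for case, case_results in results.items()
        }
    )
    print(summary)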