Created March 21, 2022 18:15
Simple benchmark to measure the performance of fsspec.parquet.open_parquet_file for a single-column read.
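The optimized case relies on fsspec.parquet.open_parquet_file, which is designed to transfer only the byte ranges needed for the requested columns before handing a file-like object to the reader. A minimal standalone sketch of that call (using a hypothetical bucket path) looks like:

import pandas as pd
from fsspec.parquet import open_parquet_file

# Hypothetical remote path; only the byte ranges for "int10" should be fetched
with open_parquet_file(
    "s3://my-bucket/large_file.parquet",
    columns=["int10"],
    engine="pyarrow",
) as f:
    df = pd.read_parquet(f, columns=["int10"], engine="pyarrow")

The full script below times this path against plain fsspec file caching and pyarrow's native S3 filesystem.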
import time
import argparse
import importlib

try:
    import cudf
except ImportError:
    cudf = None

import pandas as pd
import numpy as np


def run_all_benchmarks(
    path,
    columns,
    engines=("cudf", "fastparquet", "pyarrow"),
    cases=("open_parquet_file", "default-cache", "none-cache"),
    trials=5,
    storage_options=None,
):
    if cudf is None and "cudf" in engines:
        raise ValueError("Cannot benchmark cudf - It is not installed!")
    storage_options = storage_options or {}
    results = {}
    for case in cases:
        results[case] = {}
        for engine in engines:
            if case.startswith("pyarrow") and not (
                engine.startswith("pyarrow") or engine.startswith("cudf")
            ):
                # Skip pyarrow-specific cases for other engines
                continue
            results[case][engine] = run_benchmark(
                path,
                columns,
                case,
                engine,
                trials,
                storage_options,
            )
    return results
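

# NOTE: `refresh_fs` reloads fsspec and clears the filesystem instance cache
# so each timed trial starts from a cold state, rather than reusing a cached
# filesystem (and any data it may have buffered) from a previous run.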
def refresh_fs(path=None, storage_options=None, return_fs=True):
    import fsspec

    importlib.reload(fsspec)
    fsspec.spec.AbstractFileSystem.clear_instance_cache()
    if return_fs:
        fs = fsspec.get_fs_token_paths(
            path,
            storage_options=(storage_options or {}),
        )[0]
        return fsspec, fs
    return


def open_file(path, columns, engine, case, storage_options):
    fsspec, fs = refresh_fs(path=path, storage_options=storage_options)
    if case == "open_parquet_file":
        import fsspec.parquet as fsspec_parquet

        importlib.reload(fsspec_parquet)
        use_engine = engine.split("-")[0] if not engine.startswith("cudf") else "pyarrow"
        return fsspec_parquet.open_parquet_file(
            path, fs=fs, columns=columns, engine=use_engine
        )
    elif case == "none-cache":
        return fs.open(path, mode="rb", cache_type="none")
    elif case == "default-cache":
        return fs.open(path, mode="rb")
    elif case == "pyarrow-s3":
        from pyarrow import fs as pafs

        # Drop the "s3://" scheme; pyarrow expects "bucket/key" paths
        # (str.strip would remove a character set, not the prefix)
        s3_path = path[len("s3://"):] if path.startswith("s3://") else path
        return pafs.S3FileSystem().open_input_file(s3_path)
    else:
        raise ValueError(f"Case {case} not recognized!")


def read_parquet(f, columns, engine, case):
    if engine == "cudf":
        return cudf.read_parquet(f, columns=columns, use_python_file_object=True)
    else:
        return pd.read_parquet(f, columns=columns, engine=engine)


def run_benchmark(path, columns, case, engine, trials, storage_options):
    split_engine = engine.split("-")
    io_engine = split_engine[0]
    results = []
    for trial in range(trials):
        t0 = time.time()
        with open_file(path, columns, engine, case, storage_options) as f:
            df = read_parquet(f, columns, io_engine, case)
        results.append(time.time() - t0)
    print(f"Case {case} results for {engine}: {results}")
    return results


def write_file():
    """Original commands used to write 'large_file.parquet'

    This locally-written file was moved to S3 and GCS using `fs.put`.
    """
    from dask.datasets import timeseries

    dtypes = {f"str{i}": str for i in range(15)}
    dtypes.update({f"int{i}": int for i in range(15)})
    df = timeseries(
        start='2000-01-01',
        end='2000-12-01',
        freq='1s',
        partition_freq='1d',
        dtypes=dtypes,
        seed=42,
    ).reset_index(drop=True).compute()
    row_group_size = len(df) // 10  # Want 10 row groups
    df.to_parquet("large_file.parquet", engine="pyarrow", row_group_size=row_group_size)


# Command-line args
parser = argparse.ArgumentParser(description="Run read_parquet benchmark for remote storage")
parser.add_argument("-t", "--trials", default=5, type=int, help="Number of trials for each engine/case")
parser.add_argument("-p", "--protocol", default="s3", help="Remote-storage protocol ('s3' or 'gs')")
parser.add_argument(
    "-e",
    "--engines",
    default="cudf,fastparquet,pyarrow",
    help="Comma-separated list of IO engines ('cudf', 'fastparquet', and 'pyarrow')",
)
parser.add_argument(
    "-c",
    "--cases",
    default="open_parquet_file,none-cache,default-cache,pyarrow-s3",
    help="Comma-separated list of FS cases ('open_parquet_file', 'none-cache', 'default-cache', 'pyarrow-s3')",
)
args = parser.parse_args()


if __name__ == '__main__':

    # Get IO engines
    engines = args.engines.split(",")
    if not engines:
        raise ValueError("No IO engines specified.")

    # Get trials and FS protocol
    trials = args.trials
    protocol = args.protocol
    if protocol == "gs":
        storage_options = {}  ## DEFINE GCS STORAGE OPTIONS HERE
        my_bucket = None  ## DEFINE GCS BUCKET PATH HERE
    elif protocol == "s3":
        storage_options = {}  ## DEFINE S3 STORAGE OPTIONS HERE
        my_bucket = None  ## DEFINE S3 BUCKET PATH HERE
    else:
        raise ValueError(f"{protocol} not a supported protocol")

    # Get FS/caching cases
    cases = args.cases.split(",")
    if protocol != "s3" and "pyarrow-s3" in cases:
        # Cannot use pyarrow-s3 for GCS
        cases = tuple(case for case in cases if case != "pyarrow-s3")
    if not cases:
        raise ValueError("No IO cases specified.")

    # Define the path and column selection
    path = f"{my_bucket}/large_file.parquet"
    columns = ["int10"]

    # Run the desired benchmarks
    results = run_all_benchmarks(
        protocol + "://" + path,
        columns,
        engines=engines,
        cases=cases,
        trials=trials,
        storage_options=storage_options,
    )
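
To reproduce, fill in the bucket path and storage options in the `__main__` block above, then run the script from the command line; for example (assuming it is saved as fsspec_parquet_benchmark.py, a hypothetical name):

python fsspec_parquet_benchmark.py --protocol s3 --engines pyarrow,fastparquet --cases open_parquet_file,default-cache,none-cache --trials 5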