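# read_parquet benchmark (Gist by @rjzamora, last active December 1, 2020).
#
# Times how long dask.dataframe.read_parquet spends processing metadata versus
# reading data, comparing the "pyarrow-legacy" and "pyarrow-dataset" engines
# across datasets with different file counts and row-group sizes.
#
# Illustrative invocations (the script name here is a placeholder, not from
# the original gist):
#
#   python read_parquet_bench.py --write                  # generate local test datasets
#   python read_parquet_bench.py --type many --trials 10  # many files, tiny row-groups
#   python read_parquet_bench.py --type few --legacy --gather_statistics
#   python read_parquet_bench.py --type few --workers 8 --agg_row_groups
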
import argparse
import glob
import time

import numpy as np

import dask.dataframe as dd
from dask.datasets import timeseries
from dask.distributed import Client, LocalCluster, wait


def run(args):
    """Run one write or read trial; reads return (runtime, metatime, npartitions)."""
    paths = {
        "many": "test.bench.many.parquet",
        "few": "test.bench.few.parquet",
        "partitioned": "test.bench.partitioned.parquet",
        "criteo": "/raid/dask_space/criteo/crit_pq_int",  # machine-specific path
    }
    if args.write:
        # Write three synthetic datasets: many small files with tiny row-groups,
        # a few large files with large row-groups, and a hive-partitioned version.
        timeseries(
            start="2000-01-01", end="2000-03-01", partition_freq="1h"
        ).to_parquet(paths["many"], engine="pyarrow", row_group_size=1_000)
        timeseries(
            start="2000-01-01", end="2004-12-31", partition_freq="7d"
        ).to_parquet(paths["few"], engine="pyarrow", row_group_size=1_000_000)
        timeseries(
            start="2000-01-01", end="2004-12-31", partition_freq="7d"
        ).to_parquet(
            paths["partitioned"],
            partition_on=["name"],
            engine="pyarrow",
            row_group_size=1_000_000,
        )
        print("Datasets Written.")
    else:
        path = paths[args.type]
        if args.no_meta:
            # Pass an explicit list of files so any global _metadata file is ignored.
            path = glob.glob(path + "/*.parquet")
            if args.type == "criteo":
                path = path[:1]
        start_t = time.time()
        ddf = dd.read_parquet(
            path,
            engine="pyarrow-legacy" if args.legacy else "pyarrow-dataset",
            # An integer maps each output partition to that many row-groups
            # (or fewer); otherwise pass the boolean flag through unchanged.
            split_row_groups=2 if args.agg_row_groups else args.split_row_groups,
            gather_statistics=args.gather_statistics,
            filters=[("id", ">", 1000)] if args.filter else None,
            read_from_path=False,
        )
        metatime = time.time() - start_t  # metadata/graph-construction time only
        if not args.skip_read:
            if args.client:
                # Distributed read: persist on the cluster and block until done.
                wait(ddf.persist())
            else:
                ddf.compute(scheduler="synchronous")
        runtime = time.time() - start_t
        npartitions = ddf.npartitions
        return runtime, metatime, npartitions


if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("--write", action="store_true")
    parser.add_argument("--split_row_groups", action="store_true")
    parser.add_argument("--gather_statistics", action="store_true")
    parser.add_argument("--agg_row_groups", action="store_true")
    parser.add_argument("--no_meta", action="store_true")
    parser.add_argument("--skip_read", action="store_true")
    parser.add_argument("--legacy", action="store_true")
    parser.add_argument("--filter", action="store_true")
    parser.add_argument("--type", default="many")
    parser.add_argument("--trials", type=int, default=10)
    parser.add_argument("--workers", type=int, default=0)
    args = parser.parse_args()

    if args.workers:
        # Spin up a local distributed cluster; otherwise run() falls back to
        # the synchronous scheduler.
        cluster = LocalCluster(
            n_workers=args.workers,
            local_directory="/raid/dask-space/rzamora",  # machine-specific path
        )
        args.client = Client(cluster)
    else:
        args.client = None

    if args.write:
        run(args)
        print("Done Writing.")
    else:
        runtimes = []
        metatimes = []
        readtimes = []
        for trial in range(args.trials):
            rt, mt, npartitions = run(args)
            runtimes.append(rt)
            metatimes.append(mt)
            readtimes.append(rt - mt)
        print("Runtime [s]:", np.mean(runtimes), "+-", np.std(runtimes))
        print("Reading Metadata [s]:", np.mean(metatimes))
        print("Reading Data [s]:", np.mean(readtimes))
        print("npartitions:", npartitions)