@lgray · Created January 8, 2024 20:53
client.compute() example for coffea
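The snippet assumes the usual coffea dataset_tools setup. A minimal sketch of the imports and inputs it relies on is shown below; the processor, fileset contents, output paths, and tstart are hypothetical placeholders added here for completeness, not part of the original gist.

import gzip
import os
import time

import cloudpickle
import dask_awkward as dak
import hist.dask
from coffea import processor
from coffea.dataset_tools import apply_to_fileset, preprocess
from distributed import Client


class MyProcessor(processor.ProcessorABC):
    """Placeholder processor: fills one dask-backed histogram per dataset."""

    def process(self, events):
        h = hist.dask.Hist.new.Reg(50, 0, 500, name="met", label="MET [GeV]").Double()
        h.fill(met=events.MET.pt)  # assumes a NanoAOD-like schema
        return {"met": h}

    def postprocess(self, accumulator):
        return accumulator


# Hypothetical inputs: replace with your own datasets, processor, and output location.
fileset = {
    "my_dataset": {
        "files": {"path/to/file.root": "Events"},  # file path -> tree name
    },
}
processor_instance = MyProcessor()
outpath, outname = "output", "histograms"
tstart = time.time()  # wall-clock start used to time the workflow below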
with Client() as client:  # distributed Client as the scheduler
    # Run preprocess: inspect the fileset and build a runnable (chunked) dataset spec
    print("\nRunning preprocess...")
    dataset_runnable, dataset_updated = preprocess(
        fileset,
        maybe_step_size=50_000,
        align_clusters=False,
        files_per_batch=1,
        # skip_bad_files=True,
        # calculate_form=True,
    )

    # Run apply_to_fileset: build the task graphs for every dataset
    print("\nRunning apply_to_fileset...")
    histos_to_compute, reports = apply_to_fileset(
        processor_instance,
        dataset_runnable,
        uproot_options={"allow_read_errors_with_report": True},
    )

    # Check which columns will actually be read from the input files
    print("\nRunning necessary_columns...")
    columns_read = dak.necessary_columns(histos_to_compute[list(histos_to_compute.keys())[0]])
    print(columns_read)

    # Compute: submit each dataset's histograms and report, then gather the results
    print("\nRunning compute...")
    output_futures, report_futures = {}, {}
    for key in histos_to_compute:
        output_futures[key], report_futures[key] = client.compute(
            (histos_to_compute[key], reports[key])
        )  # , scheduler=taskvine
    coutputs, creports = client.gather((output_futures, report_futures))
    dt = time.time() - tstart  # elapsed wall-clock time for the workflow

    # Save the output
    os.makedirs(outpath, exist_ok=True)
    out_pkl_file = os.path.join(outpath, outname + ".pkl.gz")
    print(f"\nSaving output in {out_pkl_file}...")
    with gzip.open(out_pkl_file, "wb") as fout:
        cloudpickle.dump(coutputs, fout)
    print("Done!")