Created
April 4, 2023 20:19
-
-
Save lgray/c5c1e027fb7e682bae287ca75d09dad2 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
# import warnings
# warnings.filterwarnings("error")
import logging
import os
import time
from coffea import processor
from coffea.nanoevents import NanoAODSchema
import hist
import json
import matplotlib.pyplot as plt
import numpy as np
# import uproot
# import utils  # contains code for bookkeeping and cosmetics, as well as some boilerplate

# cabinetry is chatty by default; keep it at INFO
logging.getLogger("cabinetry").setLevel(logging.INFO)

### GLOBAL CONFIGURATION

# input files per process, set to e.g. 10 (smaller number = faster)
N_FILES_MAX_PER_SAMPLE: int = 1
# enable Dask
USE_DASK: bool = True
# enable ServiceX
USE_SERVICEX: bool = False
# ServiceX: ignore cache with repeated queries
SERVICEX_IGNORE_CACHE: bool = False
# analysis facility: set to "coffea_casa" for coffea-casa environments, "EAF" for FNAL, "local" for local setups
AF: str = "local"

### BENCHMARKING-SPECIFIC SETTINGS

# chunk size to use
CHUNKSIZE: int = 500_000
# metadata to propagate through to metrics
AF_NAME: str = "coffea_casa"  # "ssl-dev" allows for the switch to local data on /data
SYSTEMATICS: str = "all"  # currently has no effect
CORES_PER_WORKER: int = 2  # does not do anything, only used for metric gathering (set to 2 for distributed coffea-casa)
# scaling for local setups with FuturesExecutor
NUM_CORES: int = 4
# only I/O, all other processing disabled
DISABLE_PROCESSING: bool = False
# read additional branches (only with DISABLE_PROCESSING = True)
# acceptable values are 2.7, 4, 15, 25, 50 (corresponding to % of file read),
# 2.7% corresponds to the standard branches used in the notebook
IO_FILE_PERCENT: float = 2.7

import dask_awkward as dak
import hist.dask
class TtbarAnalysis(processor.ProcessorABC):
    """Simplified ttbar analysis processor built on dask-awkward / hist.dask.

    Selects single-lepton events with >= 4 jets and at least one b-tag, then
    fills one weighted histogram of an "observable": HT (scalar jet-pT sum)
    in the 4j1b control region, and the mass of the highest-pT trijet system
    in the 4j2b signal region. The histogram is categorized by region,
    process, and systematic variation.
    """

    def __init__(self, disable_processing, io_file_percent):
        """Configure the processor.

        Parameters
        ----------
        disable_processing : bool
            If True, ``process`` returns immediately (pure I/O benchmarking).
        io_file_percent : float
            Fraction-of-file-read setting for I/O benchmarking.
        """
        # binning for the observable axis
        num_bins = 25
        bin_low = 50
        bin_high = 550
        name = "observable"
        label = "observable [GeV]"
        # lazily-filled dask histogram: observable x region x process x variation
        self.hist = (
            hist.dask.Hist.new.Reg(num_bins, bin_low, bin_high, name=name, label=label)
            .StrCat(["4j1b", "4j2b"], name="region", label="Region")
            .StrCat([], name="process", label="Process", growth=True)
            .StrCat([], name="variation", label="Systematic variation", growth=True)
            .Weight()
        )
        self.disable_processing = disable_processing
        # NOTE(review): stored but never read anywhere in this file --
        # presumably consumed by the I/O benchmarking path; confirm before removing.
        self.io_file_percent = io_file_percent

    def process(self, events):
        """Run the event selection on one chunk and fill the histogram.

        Parameters
        ----------
        events : NanoEvents array
            Chunk of NanoAOD events; ``events.metadata`` must provide
            "process", "variation", "xsec", and "nevts".

        Returns
        -------
        dict
            ``{"nevents": {process: n}, "hist": <dask histogram>}``, or an
            empty dict when processing is disabled.
        """
        if self.disable_processing:
            # I/O testing with no subsequent processing
            return {}
        histogram = self.hist
        process = events.metadata["process"]  # "ttbar" etc.
        variation = events.metadata["variation"]  # "nominal" etc.
        # NOTE(review): `variation` is read but the fill below hardcodes
        # "nominal". With only the nominal pt variation enabled these agree;
        # revisit before enabling systematic variations.

        # normalization for MC: cross-section * luminosity / generated events
        x_sec = events.metadata["xsec"]
        nevts_total = events.metadata["nevts"]
        lumi = 3378  # integrated luminosity in /pb
        if process != "data":
            xsec_weight = x_sec * lumi / nevts_total
        else:
            xsec_weight = 1

        #### systematics
        # example of a simple flat weight variation, using the coffea nanoevents systematics feature
        # if process == "wjets":
        #     events.add_systematic("scale_var", "UpDownSystematic", "weight", flat_variation)

        # jet energy scale / resolution systematics
        # need to adjust schema to instead use coffea add_systematic feature, especially for ServiceX
        # cannot attach pT variations to events.Jet, so attach to events directly
        # and subsequently scale pT by these scale factors
        events["pt_nominal"] = dak.ones_like(events.MET.pt)
        # events["pt_scale_up"] = 1.03
        # events["pt_res_up"] = jet_pt_resolution(events.Jet.pt)
        pt_variations = ["pt_nominal"]  # , "pt_scale_up", "pt_res_up"] if variation == "nominal" else ["pt_nominal"]
        for pt_var in pt_variations:
            ### event selection
            # very very loosely based on https://arxiv.org/abs/2006.13076
            # pT > 25 GeV for leptons & jets
            selected_electrons = events.Electron[(events.Electron.pt > 25)]
            selected_muons = events.Muon[(events.Muon.pt > 25)]
            jet_filter = (events.Jet.pt * events[pt_var]) > 25
            selected_jets = events.Jet[jet_filter]
            # single lepton requirement
            event_filters = ((dak.count(selected_electrons.pt, axis=1) + dak.count(selected_muons.pt, axis=1)) == 1)
            # at least four jets (resolution-type variations are per-jet, so
            # they must first be filtered down to the selected jets)
            pt_var_modifier = events[pt_var] if "res" not in pt_var else events[pt_var][jet_filter]
            event_filters = event_filters & (dak.count(selected_jets.pt * pt_var_modifier, axis=1) >= 4)
            # at least one b-tagged jet ("tag" means score above threshold)
            B_TAG_THRESHOLD = 0.5
            event_filters = event_filters & (dak.sum(selected_jets.btagCSVV2 >= B_TAG_THRESHOLD, axis=1) >= 1)

            # apply event filters to all per-event collections
            selected_electrons = selected_electrons[event_filters]
            selected_muons = selected_muons[event_filters]
            selected_jets = selected_jets[event_filters]

            for region in ["4j1b", "4j2b"]:
                # further filtering: 4j1b CR with single b-tag, 4j2b SR with two or more tags
                if region == "4j1b":
                    region_filter = dak.sum(selected_jets.btagCSVV2 >= B_TAG_THRESHOLD, axis=1) == 1
                    selected_jets_region = selected_jets[region_filter]
                    # use HT (scalar sum of jet pT) as observable
                    pt_var_modifier = (
                        events[event_filters][region_filter][pt_var]
                        if "res" not in pt_var
                        else events[pt_var][jet_filter][event_filters][region_filter]
                    )
                    observable = dak.sum(selected_jets_region.pt * pt_var_modifier, axis=-1)
                elif region == "4j2b":
                    region_filter = dak.sum(selected_jets.btagCSVV2 > B_TAG_THRESHOLD, axis=1) >= 2
                    selected_jets_region = selected_jets[region_filter]
                    # reconstruct hadronic top as bjj system with largest pT
                    # the jet energy scale / resolution effect is not propagated to this observable at the moment
                    trijet = dak.combinations(selected_jets_region, 3, fields=["j1", "j2", "j3"])  # trijet candidates
                    trijet["p4"] = trijet.j1 + trijet.j2 + trijet.j3  # four-momentum of the tri-jet system
                    trijet["max_btag"] = np.maximum(trijet.j1.btagCSVV2, np.maximum(trijet.j2.btagCSVV2, trijet.j3.btagCSVV2))
                    trijet = trijet[trijet.max_btag > B_TAG_THRESHOLD]  # at least one b-tag among the trijet candidates
                    # pick trijet candidate with largest pT and calculate mass of system
                    trijet_mass = trijet["p4"][dak.argmax(trijet.p4.pt, axis=1, keepdims=True)].mass
                    observable = dak.flatten(trijet_mass)

                histogram.fill(
                    observable=observable, region=region, process=process,
                    variation="nominal", weight=dak.full_like(observable, xsec_weight)
                )

        output = {"nevents": {events.metadata["process"]: len(events)}, "hist": histogram}
        return output

    def postprocess(self, accumulator):
        """Return the accumulated output unchanged (no merging step needed)."""
        return accumulator
# fileset = utils.construct_fileset(N_FILES_MAX_PER_SAMPLE, use_xcache=False, af_name=AF_NAME) # local files on /data for ssl-dev | |
# print(f"processes in fileset: {list(fileset.keys())}") | |
# print(f"\nexample of information in fileset:\n{{\n 'files': [{fileset['ttbar__nominal']['files'][0]}, ...],") | |
# print(f" 'metadata': {fileset['ttbar__nominal']['metadata']}\n}}") | |
# fileset = {"ttbar__nominal": fileset["ttbar__nominal"]} | |
if __name__ == "__main__":
    # silences warnings about cross-reference branches we will not use here
    NanoAODSchema.warn_missing_crossrefs = False
    from coffea.nanoevents import NanoEventsFactory
    from distributed import Client

    # constructing the Client registers it as the default dask scheduler,
    # so any computation of the graph built below would run on these workers
    client = Client(n_workers=5, threads_per_worker=2, memory_limit="2GB")

    metadata = {'process': 'ttbar', 'variation': 'nominal', 'nevts': 1334428, 'xsec': 729.84}
    # input file is overridable via the environment instead of being
    # hard-coded to one developer's machine (original path kept as default)
    input_file = os.environ.get("TTBAR_INPUT_FILE", "/Users/lgray/coffea-dev/coffea/ttbar.root")
    events = NanoEventsFactory.from_root(
        input_file,
        schemaclass=NanoAODSchema,
        treepath="Events",
        metadata=metadata,
        permit_dask=True,
    ).events()
    output = TtbarAnalysis(DISABLE_PROCESSING, IO_FILE_PERCENT).process(events)
    # print the task graph of the (still lazy, unfilled) histogram
    print(output["hist"].dask)
# if USE_DASK: | |
# executor = processor.DaskExecutor(client=utils.get_client(AF)) | |
# else: | |
# # executor = processor.FuturesExecutor(workers=NUM_CORES) | |
# executor = processor.IterativeExecutor(workers=NUM_CORES) | |
# run = processor.Runner(executor=executor, schema=NanoAODSchema, savemetrics=True, metadata_cache={}, chunksize=CHUNKSIZE) | |
# if USE_SERVICEX: | |
# treename = "servicex" | |
# else: | |
# treename = "Events" | |
# filemeta = run.preprocess(fileset, treename=treename) # pre-processing | |
# t0 = time.monotonic() | |
# all_histograms, metrics = run(fileset, treename, processor_instance=TtbarAnalysis(DISABLE_PROCESSING, IO_FILE_PERCENT)) # processing | |
# exec_time = time.monotonic() - t0 | |
# all_histograms = all_histograms["hist"] | |
# print(f"\nexecution took {exec_time:.2f} seconds") | |
# dataset_source = "/data" if fileset["ttbar__nominal"]["files"][0].startswith("/data") else "https://xrootd-local.unl.edu:1094" # TODO: xcache support | |
# metrics.update({ | |
# "walltime": exec_time, | |
# "num_workers": NUM_CORES, | |
# "af": AF_NAME, | |
# "dataset_source": dataset_source, | |
# "use_dask": USE_DASK, | |
# "use_servicex": USE_SERVICEX, | |
# "systematics": SYSTEMATICS, | |
# "n_files_max_per_sample": N_FILES_MAX_PER_SAMPLE, | |
# "cores_per_worker": CORES_PER_WORKER, | |
# "chunksize": CHUNKSIZE, | |
# "disable_processing": DISABLE_PROCESSING, | |
# "io_file_percent": IO_FILE_PERCENT | |
# }) | |
# # save metrics to disk | |
# if not os.path.exists("metrics"): | |
# os.makedirs("metrics") | |
# timestamp = time.strftime('%Y%m%d-%H%M%S') | |
# metric_file_name = f"metrics/{AF_NAME}-{timestamp}.json" | |
# with open(metric_file_name, "w") as f: | |
# f.write(json.dumps(metrics)) | |
# print(f"metrics saved as {metric_file_name}") | |
# #print(f"event rate per worker (full execution time divided by NUM_CORES={NUM_CORES}): {metrics['entries'] / NUM_CORES / exec_time / 1_000:.2f} kHz") | |
# print(f"event rate per worker (pure processtime): {metrics['entries'] / metrics['processtime'] / 1_000:.2f} kHz") | |
# print(f"amount of data read: {metrics['bytesread']/1000**2:.2f} MB") # likely buggy: https://github.com/CoffeaTeam/coffea/issues/717 | |
# # utils.set_style() | |
# all_histograms[120j::hist.rebin(2), "4j1b", :, "nominal"].stack("process")[::-1].plot(stack=True, histtype="fill", linewidth=1, edgecolor="grey") | |
# plt.legend(frameon=False) | |
# plt.title(">= 4 jets, 1 b-tag") | |
# plt.xlabel("HT [GeV]"); | |
# all_histograms[:, "4j2b", :, "nominal"].stack("process")[::-1].plot(stack=True, histtype="fill", linewidth=1,edgecolor="grey") | |
# plt.legend(frameon=False) | |
# plt.title(">= 4 jets, >= 2 b-tags") | |
# plt.xlabel("$m_{bjj}$ [Gev]"); |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment