# import warnings
# warnings.filterwarnings("error")
import logging
import os
import time
from coffea import processor
from coffea.nanoevents import NanoAODSchema
import hist
import json
import matplotlib.pyplot as plt
import numpy as np
# import uproot
# import utils # contains code for bookkeeping and cosmetics, as well as some boilerplate
logging.getLogger("cabinetry").setLevel(logging.INFO)
### GLOBAL CONFIGURATION
# input files per process, set to e.g. 10 (smaller number = faster)
N_FILES_MAX_PER_SAMPLE = 1
# enable Dask
USE_DASK = True
# enable ServiceX
USE_SERVICEX = False
# ServiceX: ignore cache with repeated queries
SERVICEX_IGNORE_CACHE = False
# analysis facility: set to "coffea_casa" for coffea-casa environments, "EAF" for FNAL, "local" for local setups
AF = "local"
### BENCHMARKING-SPECIFIC SETTINGS
# chunk size to use
CHUNKSIZE = 500_000
# metadata to propagate through to metrics
AF_NAME = "coffea_casa" # "ssl-dev" allows for the switch to local data on /data
SYSTEMATICS = "all" # currently has no effect
CORES_PER_WORKER = 2 # no effect on execution, only recorded in the metrics (set to 2 for distributed coffea-casa)
# scaling for local setups with FuturesExecutor
NUM_CORES = 4
# only I/O, all other processing disabled
DISABLE_PROCESSING = False
# read additional branches (only with DISABLE_PROCESSING = True)
# acceptable values are 2.7, 4, 15, 25, 50 (corresponding to % of file read);
# 2.7% corresponds to the standard branches used in the notebook
IO_FILE_PERCENT = 2.7
import dask_awkward as dak
import hist.dask
class TtbarAnalysis(processor.ProcessorABC):
    def __init__(self, disable_processing, io_file_percent):
        num_bins = 25
        bin_low = 50
        bin_high = 550
        name = "observable"
        label = "observable [GeV]"
        self.hist = (
            hist.dask.Hist.new.Reg(num_bins, bin_low, bin_high, name=name, label=label)
            .StrCat(["4j1b", "4j2b"], name="region", label="Region")
            .StrCat([], name="process", label="Process", growth=True)
            .StrCat([], name="variation", label="Systematic variation", growth=True)
            .Weight()
        )
        self.disable_processing = disable_processing
        self.io_file_percent = io_file_percent

    def process(self, events):
        if self.disable_processing:
            # IO testing with no subsequent processing
            return {}

        histogram = self.hist

        process = events.metadata["process"]  # "ttbar" etc.
        variation = events.metadata["variation"]  # "nominal" etc.

        # normalization for MC
        x_sec = events.metadata["xsec"]
        nevts_total = events.metadata["nevts"]
        lumi = 3378  # /pb
        if process != "data":
            xsec_weight = x_sec * lumi / nevts_total
        else:
            xsec_weight = 1

        #### systematics
        # example of a simple flat weight variation, using the coffea nanoevents systematics feature
        # if process == "wjets":
        #     events.add_systematic("scale_var", "UpDownSystematic", "weight", flat_variation)

        # jet energy scale / resolution systematics
        # need to adjust the schema to instead use the coffea add_systematic feature, especially for ServiceX
        # cannot attach pT variations to events.Jet, so attach them to events directly
        # and subsequently scale pT by these scale factors
        events["pt_nominal"] = dak.ones_like(events.MET.pt)
        # events["pt_scale_up"] = 1.03
        # events["pt_res_up"] = jet_pt_resolution(events.Jet.pt)

        pt_variations = ["pt_nominal"]  # ["pt_nominal", "pt_scale_up", "pt_res_up"] if variation == "nominal" else ["pt_nominal"]
        for pt_var in pt_variations:
            ### event selection
            # very loosely based on https://arxiv.org/abs/2006.13076
            # pT > 25 GeV for leptons & jets
            selected_electrons = events.Electron[events.Electron.pt > 25]
            selected_muons = events.Muon[events.Muon.pt > 25]
            jet_filter = (events.Jet.pt * events[pt_var]) > 25
            selected_jets = events.Jet[jet_filter]

            # single lepton requirement
            event_filters = (dak.count(selected_electrons.pt, axis=1) + dak.count(selected_muons.pt, axis=1)) == 1
            # at least four jets
            pt_var_modifier = events[pt_var] if "res" not in pt_var else events[pt_var][jet_filter]
            event_filters = event_filters & (dak.count(selected_jets.pt * pt_var_modifier, axis=1) >= 4)
            # at least one b-tagged jet ("tag" means score above threshold)
            B_TAG_THRESHOLD = 0.5
            event_filters = event_filters & (dak.sum(selected_jets.btagCSVV2 >= B_TAG_THRESHOLD, axis=1) >= 1)

            # apply event filters
            selected_events = events[event_filters]
            selected_electrons = selected_electrons[event_filters]
            selected_muons = selected_muons[event_filters]
            selected_jets = selected_jets[event_filters]
            for region in ["4j1b", "4j2b"]:
                # further filtering: 4j1b control region with a single b-tag, 4j2b signal region with two or more tags
                if region == "4j1b":
                    region_filter = dak.sum(selected_jets.btagCSVV2 >= B_TAG_THRESHOLD, axis=1) == 1
                    selected_jets_region = selected_jets[region_filter]
                    # use HT (the scalar sum of jet pT) as the observable
                    pt_var_modifier = (
                        events[event_filters][region_filter][pt_var]
                        if "res" not in pt_var
                        else events[pt_var][jet_filter][event_filters][region_filter]
                    )
                    print(dak.necessary_columns(pt_var_modifier))
                    observable = dak.sum(selected_jets_region.pt * pt_var_modifier, axis=-1)

                elif region == "4j2b":
                    region_filter = dak.sum(selected_jets.btagCSVV2 > B_TAG_THRESHOLD, axis=1) >= 2
                    selected_jets_region = selected_jets[region_filter]

                    # reconstruct the hadronic top as the bjj system with the largest pT
                    # the jet energy scale / resolution effect is not propagated to this observable at the moment
                    trijet = dak.combinations(selected_jets_region, 3, fields=["j1", "j2", "j3"])  # trijet candidates
                    trijet["p4"] = trijet.j1 + trijet.j2 + trijet.j3  # four-momentum of the trijet system
                    trijet["max_btag"] = np.maximum(trijet.j1.btagCSVV2, np.maximum(trijet.j2.btagCSVV2, trijet.j3.btagCSVV2))
                    trijet = trijet[trijet.max_btag > B_TAG_THRESHOLD]  # require at least one b-tag among the trijet candidates
                    # pick the trijet candidate with the largest pT and calculate the mass of the system
                    trijet_mass = trijet["p4"][dak.argmax(trijet.p4.pt, axis=1, keepdims=True)].mass
                    observable = dak.flatten(trijet_mass)

                histogram.fill(
                    observable=observable, region=region, process=process,
                    variation=variation, weight=dak.full_like(observable, xsec_weight)
                )

        output = {"nevents": {events.metadata["process"]: len(events)}, "hist": histogram}
        return output

    def postprocess(self, accumulator):
        return accumulator
# fileset = utils.construct_fileset(N_FILES_MAX_PER_SAMPLE, use_xcache=False, af_name=AF_NAME) # local files on /data for ssl-dev
# print(f"processes in fileset: {list(fileset.keys())}")
# print(f"\nexample of information in fileset:\n{{\n 'files': [{fileset['ttbar__nominal']['files'][0]}, ...],")
# print(f" 'metadata': {fileset['ttbar__nominal']['metadata']}\n}}")
# fileset = {"ttbar__nominal": fileset["ttbar__nominal"]}
if __name__ == "__main__":
    NanoAODSchema.warn_missing_crossrefs = False  # silences warnings about branches we will not use here

    from coffea.nanoevents import NanoEventsFactory
    from distributed import Client

    client = Client(n_workers=5, threads_per_worker=2, memory_limit="2GB")

    metadata = {"process": "ttbar", "variation": "nominal", "nevts": 1334428, "xsec": 729.84}
    events = NanoEventsFactory.from_root(
        "/Users/lgray/coffea-dev/coffea/ttbar.root",
        schemaclass=NanoAODSchema,
        treepath="Events",
        metadata=metadata,
        permit_dask=True,
    ).events()

    output = TtbarAnalysis(DISABLE_PROCESSING, IO_FILE_PERCENT).process(events)
    print(output["hist"].dask)
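
    # A minimal sketch (not part of the original gist): the script above only
    # prints the task graph; to actually materialize the lazy dask-awkward
    # histogram, execute the graph on the Dask client created above.
    # dask.compute returns a tuple, hence the unpacking. Uncomment to run.
    # import dask
    # (computed,) = dask.compute(output)
    # print(computed["hist"])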
# if USE_DASK:
# executor = processor.DaskExecutor(client=utils.get_client(AF))
# else:
# # executor = processor.FuturesExecutor(workers=NUM_CORES)
# executor = processor.IterativeExecutor(workers=NUM_CORES)
# run = processor.Runner(executor=executor, schema=NanoAODSchema, savemetrics=True, metadata_cache={}, chunksize=CHUNKSIZE)
# if USE_SERVICEX:
# treename = "servicex"
# else:
# treename = "Events"
# filemeta = run.preprocess(fileset, treename=treename) # pre-processing
# t0 = time.monotonic()
# all_histograms, metrics = run(fileset, treename, processor_instance=TtbarAnalysis(DISABLE_PROCESSING, IO_FILE_PERCENT)) # processing
# exec_time = time.monotonic() - t0
# all_histograms = all_histograms["hist"]
# print(f"\nexecution took {exec_time:.2f} seconds")
# dataset_source = "/data" if fileset["ttbar__nominal"]["files"][0].startswith("/data") else "https://xrootd-local.unl.edu:1094" # TODO: xcache support
# metrics.update({
# "walltime": exec_time,
# "num_workers": NUM_CORES,
# "af": AF_NAME,
# "dataset_source": dataset_source,
# "use_dask": USE_DASK,
# "use_servicex": USE_SERVICEX,
# "systematics": SYSTEMATICS,
# "n_files_max_per_sample": N_FILES_MAX_PER_SAMPLE,
# "cores_per_worker": CORES_PER_WORKER,
# "chunksize": CHUNKSIZE,
# "disable_processing": DISABLE_PROCESSING,
# "io_file_percent": IO_FILE_PERCENT
# })
# # save metrics to disk
# if not os.path.exists("metrics"):
# os.makedirs("metrics")
# timestamp = time.strftime('%Y%m%d-%H%M%S')
# metric_file_name = f"metrics/{AF_NAME}-{timestamp}.json"
# with open(metric_file_name, "w") as f:
# f.write(json.dumps(metrics))
# print(f"metrics saved as {metric_file_name}")
# #print(f"event rate per worker (full execution time divided by NUM_CORES={NUM_CORES}): {metrics['entries'] / NUM_CORES / exec_time / 1_000:.2f} kHz")
# print(f"event rate per worker (pure processtime): {metrics['entries'] / metrics['processtime'] / 1_000:.2f} kHz")
# print(f"amount of data read: {metrics['bytesread']/1000**2:.2f} MB") # likely buggy: https://github.com/CoffeaTeam/coffea/issues/717
# # utils.set_style()
# all_histograms[120j::hist.rebin(2), "4j1b", :, "nominal"].stack("process")[::-1].plot(stack=True, histtype="fill", linewidth=1, edgecolor="grey")
# plt.legend(frameon=False)
# plt.title(">= 4 jets, 1 b-tag")
# plt.xlabel("HT [GeV]");
# all_histograms[:, "4j2b", :, "nominal"].stack("process")[::-1].plot(stack=True, histtype="fill", linewidth=1,edgecolor="grey")
# plt.legend(frameon=False)
# plt.title(">= 4 jets, >= 2 b-tags")
# plt.xlabel("$m_{bjj}$ [GeV]");