@giorgiopizz
Last active June 27, 2024 11:37
my_processor snippets
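# These snippets build a NanoAOD input list for the samples enabled in
# active_samples by querying Rucio for replicas, cross-checking each logical
# file name and its event count against DBS, and dumping the result to
# data/files_all3.json. read_events/big_process then read the requested
# branches with uproot into awkward arrays and hand them to a user-supplied
# `process` function, keeping simple timing information per chunk.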
import gc
import json
import random
import sys
import time
import traceback as tb
from copy import deepcopy

import awkward as ak
import dask
import uproot
from coffea.dataset_tools import rucio_utils
from dbs.apis.dbsClient import DbsApi
def get_files():
    Samples = {}
    active_samples = [
        # # Data
        # # DoubleMuon
        # # "DoubleMuon_Run2018A-UL2018-v1",
        # "DoubleMuon_Run2018B-UL2018-v2",
        # # "DoubleMuon_Run2018C-UL2018-v1",
        # # "DoubleMuon_Run2018D-UL2018-v2",
        # # EGamma
        # # "EGamma_Run2018A-UL2018-v1",
        # "EGamma_Run2018B-UL2018-v1",
        # # "EGamma_Run2018C-UL2018-v1",
        # # "EGamma_Run2018D-UL2018-v1",
        # # SingleMuon
        # # "SingleMuon_Run2018A-UL2018-v2",
        # "SingleMuon_Run2018B-UL2018-v2",
        # # "SingleMuon_Run2018C-UL2018-v2",
        # # "SingleMuon_Run2018D-UL2018-v1",
        # # DY NLO
        # "DYJetsToLL_M-50",
        # # DY Jet binned
        # "DYJetsToLL_0J",
        # "DYJetsToLL_1J",
        # "DYJetsToLL_2J",
        # # DY LO
        # "DYJetsToLL_M-50-LO",
        # # "DYJetsToLL_M-50-LO_ext1",
        # # Signal
        "EWK_LLJJ_MLL-50_MJJ-120",
        # # # Interference
        # "EWK_LLJJ_MLL-50_MJJ-120_QCD",
        # # Top
        # "ST_tW_top",
        # "ST_tW_antitop",
        # "ST_t-channel_antitop",
        # "ST_t-channel_top",
        # "ST_s-channel",
        # "TTTo2L2Nu",
    ]
    with open("data/samples/samples.json") as file:
        Samples = json.load(file)
    Samples = {k: v for k, v in Samples["samples"].items() if k in active_samples}
    files = {}
    for sampleName in Samples:
        # if "DoubleMuon" not in sampleName:
        #     continue
        # "files" must be a list: the main block below appends one dict per file.
        files[sampleName] = {"query": Samples[sampleName]["nanoAOD"], "files": []}
    return files
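# get_files() expects data/samples/samples.json to look roughly like the sketch
# below; only the "samples" -> <name> -> "nanoAOD" nesting is actually used here,
# and the dataset string shown is just an illustration:
# {
#   "samples": {
#     "EWK_LLJJ_MLL-50_MJJ-120": {
#       "nanoAOD": "/EWK_LLJJ_MLL-50_MJJ-120_.../RunIISummer20UL18NanoAODv9-.../NANOAODSIM"
#     }
#   }
# }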
# @dask.delayed
# def get_filename_nevents(files, **kwargs):
#     # https://cmsweb.cern.ch:8443/dbs/prod/global/DBSReader/files?dataset=/EWK_LLJJ_MLL-50_MJJ-120_TuneCP5_13TeV-madgraph-pythia8_dipole/RunIISummer20UL18NanoAODv9-106X_upgrade2018_realistic_v16_L1v1-v1/NANOAODSIM&detail=True
#     # file is actually a list of all replicas
#     result = {
#         "path": files + [],
#         "nevents": 200_000_000,
#     }
#     # max_events = 20_000_000
#     for file in files:
#         # skip = False
#         # for bad_site in kwargs["bad_sites"]:
#         #     if bad_site in _file:
#         #         skip = True
#         #         break
#         # if skip:
#         #     continue
#         try:
#             f = uproot.open(file, handler=uproot.source.xrootd.XRootDSource)
#             num_entries = f["Events"].num_entries
#             f.close()
#             nevents = min(num_entries, result["nevents"])
#             if nevents < result["nevents"]:
#                 # the file is good, just move it on the top of the list
#                 result["path"].pop(files.index(file))
#                 result["path"].insert(0, file)
#                 result["nevents"] = nevents
#             return result
#         except Exception:
#             continue
#     # final_file = file[0]
#     # for bad_site in kwargs["bad_sites"]:
#     #     if bad_site in _file:
#     #         print("had to use a bad site!", file=sys.stderr)
#     #         break
#     result["bad_file"] = True
#     return result
# def get_cluster(local=True):
#     if not local:
#         import socket
#         from dask_jobqueue import HTCondorCluster
#         machines = [
#             # "empire.hcms.it",
#             # "pccms01.hcms.it",
#             "pccms02.hcms.it",
#             "pccms04.hcms.it",
#             # "pccms08.hcms.it",
#             "pccms11.hcms.it",
#             "pccms12.hcms.it",
#             # "pccms13.hcms.it",
#             # "pccms14.hcms.it",
#         ]
#         cluster = HTCondorCluster(
#             cores=1,
#             memory="2 GB",  # hardcoded
#             disk="100 MB",
#             death_timeout="60",
#             nanny=False,
#             scheduler_options={
#                 # 'port': n_port,
#                 "dashboard_address": 8887,
#                 "host": socket.gethostname(),
#             },
#             job_extra_directives={
#                 "log": "dask_out/dask_job_output.log",
#                 "output": "dask_out/dask_job_output.out",
#                 "error": "dask_out/dask_job_output.err",
#                 "should_transfer_files": "Yes",
#                 "when_to_transfer_output": "ON_EXIT",
#                 "Requirements": " || ".join(
#                     [f'(machine == "{machine}")' for machine in machines]
#                 ),
#             },
#             job_script_prologue=[
#                 "export PATH=/gwpool/users/gpizzati/mambaforge/bin:$PATH",
#                 "mamba activate test_uproot",
#                 "export X509_USER_PROXY=/gwpool/users/gpizzati/.proxy",
#                 "export XRD_RUNFORKHANDLER=1",
#             ],
#         )
#         cluster.scale(150)
#         return cluster
#     else:
#         from distributed import LocalCluster
#         cluster = LocalCluster(
#             n_workers=10,
#             threads_per_worker=1,
#             memory_limit="2GB",  # hardcoded
#             dashboard_address=":8887",
#         )
#         return cluster
if __name__ == "__main__":
    kwargs = {
        "bad_sites": [
            # "ts.infn",
        ],
    }
    # cluster = get_cluster(local=True)
    # client = cluster.get_client()
    # client.wait_for_workers(10)
    files = get_files()
    print(files)
    rucio_client = rucio_utils.get_rucio_client()
    # DE|FR|IT|BE|CH|ES|UK
    good_sites = ["IT", "FR", "BE", "CH", "UK", "ES", "DE", "US"]
    for dname in files:
        dataset = files[dname]["query"]
        print("Checking", dname, "files with query", dataset)
        try:
            (
                outfiles,
                outsites,
                sites_counts,
            ) = rucio_utils.get_dataset_files_replicas(
                dataset,
                allowlist_sites=[],
                blocklist_sites=[
                    # "T2_FR_IPHC",
                    # "T2_ES_IFCA",
                    # "T2_CH_CERN",
                    "T3_IT_Trieste",
                ],
                # regex_sites=[],
                regex_sites=r"T[123]_(" + "|".join(good_sites) + r")_\w+",
                # regex_sites = r"T[123]_(DE|IT|BE|CH|ES|UK|US)_\w+",
                mode="full",  # full or first. "full" == all the available replicas
                client=rucio_client,
            )
        except Exception as e:
            print(f"\n[red bold] Exception: {e}[/]")
            sys.exit(1)
        # files[dname]["files"] = list(map(lambda k: k[0], outfiles))
        # Cross-check every Rucio replica against DBS to recover its event count.
        url = "https://cmsweb.cern.ch/dbs/prod/global/DBSReader"
        api = DbsApi(url=url)
        filelist = api.listFiles(dataset=dataset, detail=1)
        for replicas, site in zip(outfiles, outsites):
            prefix = "/store/data"
            if prefix not in replicas[0]:
                prefix = "/store/mc"
            logical_name = prefix + replicas[0].split(prefix)[-1]
            right_file = list(
                filter(lambda k: k["logical_file_name"] == logical_name, filelist)
            )
            if len(right_file) == 0:
                raise Exception("File present in rucio but not dbs!", logical_name)
            if len(right_file) > 1:
                raise Exception(
                    "More files have the same logical_file_name, not supported"
                )
            nevents = right_file[0]["event_count"]
            print(replicas, nevents)
            files[dname]["files"].append({"path": replicas, "nevents": nevents})
            # actually file and site are lists here
            # file = sorted(
            #     [(_file, _site.split("_")[1]) for _file, _site in zip(file, site)],
            #     key=lambda i: good_sites.index(i[1]),
            # )
            # # print(list(map(lambda k: k[1], file)))
            # file = list(map(lambda k: k[0], file))
            # # file = random.shuffle()
            # random.shuffle(file)
        # files[dname]["files"] = [
        #     get_filename_nevents(file, **kwargs) for file in outfiles
        # ]
        # # break
    # files = dask.compute(files)
    # files = files[0]
    with open("data/files_all3.json", "w") as file:
        json.dump(files, file, indent=2)
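# Schematic layout of the resulting data/files_all3.json (values illustrative,
# the structure follows the append above: one replica list plus the DBS event
# count per file):
# {
#   "EWK_LLJJ_MLL-50_MJJ-120": {
#     "query": "/EWK_LLJJ_MLL-50_MJJ-120_.../NANOAODSIM",
#     "files": [
#       {"path": ["root://site1//store/mc/...", "root://site2//store/mc/..."],
#        "nevents": 123456}
#     ]
#   }
# }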
def read_events(filename, start=0, stop=100, read_form={}):
    print("start reading")
    # f = uproot.open(filename, handler=uproot.source.xrootd.XRootDSource, num_workers=2)
    f = uproot.open(filename, num_workers=1)
    tree = f["Events"]
    start = min(start, tree.num_entries)
    stop = min(stop, tree.num_entries)
    if start >= stop:
        return ak.Array([])
    branches = [k.name for k in tree.branches]
    events = {}
    form = deepcopy(read_form)
    all_branches = []
    for coll in form:
        coll_branches = form[coll]["branches"]
        if len(coll_branches) == 0:
            if coll in branches:
                all_branches.append(coll)
        else:
            for branch in coll_branches:
                branch_name = coll + "_" + branch
                if branch_name in branches:
                    all_branches.append(branch_name)
    events_bad_form = tree.arrays(
        all_branches,
        entry_start=start,
        entry_stop=stop,
        decompression_executor=uproot.source.futures.TrivialExecutor(),
        interpretation_executor=uproot.source.futures.TrivialExecutor(),
    )
    f.close()
    for coll in form:
        d = {}
        coll_branches = form[coll].pop("branches")
        if len(coll_branches) == 0:
            if coll in branches:
                events[coll] = events_bad_form[coll]
            continue
        for branch in coll_branches:
            branch_name = coll + "_" + branch
            if branch_name in branches:
                d[branch] = events_bad_form[branch_name]
        if len(d.keys()) == 0:
            print("did not find anything for", coll, file=sys.stderr)
            continue
        events[coll] = ak.zip(d, **form[coll])
        del d
    print("created events")
    _events = ak.zip(events, depth_limit=1)
    del events
    gc.collect()
    return _events
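# A minimal, illustrative read_form for read_events (the name example_read_form
# and the branch choices below are assumptions, not part of the original
# processor): each key is a collection, "branches" lists the NanoAOD branch
# suffixes to read ("Jet" + "_" + "pt" -> "Jet_pt"), an empty list means the
# key is itself a flat branch, and any remaining keys are passed on to ak.zip.
example_read_form = {
    "Jet": {"branches": ["pt", "eta", "phi", "mass"], "with_name": "Jet"},
    "run": {"branches": []},  # flat branch, copied through unchanged
}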
def big_process(process, filenames, start, stop, read_form, **kwargs):
    t_start = time.time()
    events = 0
    error = ""
    for filename in filenames:
        try:
            events = read_events(filename, start=start, stop=stop, read_form=read_form)
            break
        except Exception as e:
            error += "".join(tb.format_exception(None, e, e.__traceback__))
            # time.sleep(1)
            continue
    if isinstance(events, int):
        print(error, file=sys.stderr)
        raise Exception(
            "Error, could not read any of the filenames\n" + error, filenames
        )
    t_reading = time.time() - t_start
    if len(events) == 0:
        return {}
    results = {"real_results": 0, "performance": {}}
    results["real_results"] = process(events, **kwargs)
    t_total = time.time() - t_start
    results["performance"][f"{filename}_{start}"] = {
        "total": t_total,
        "read": t_reading,
    }
    del events
    gc.collect()
    return results
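# Sketch of how big_process could be driven from the file list written above.
# This is illustrative only: _example_run, count_events and the chunk size are
# assumptions and not part of the original gist.
def _example_run(files_json="data/files_all3.json", chunk=100_000):
    def count_events(events, **kwargs):
        # trivial "process" function: just count the events in the chunk
        return {"nevents": len(events)}

    with open(files_json) as f:
        file_list = json.load(f)
    results = []
    for sample in file_list.values():
        for file_info in sample["files"]:
            # split each file into fixed-size chunks; every chunk gets the full
            # replica list so big_process can fail over between replicas
            for start in range(0, file_info["nevents"], chunk):
                results.append(
                    big_process(
                        count_events,
                        file_info["path"],
                        start,
                        start + chunk,
                        read_form=example_read_form,
                    )
                )
    return results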