my_processor snippets
import json
import random
import sys

import dask
import uproot
from coffea.dataset_tools import rucio_utils
from dbs.apis.dbsClient import DbsApi
def get_files():
    """Return {sample: {"query": <DAS query>, "files": []}} for the active samples."""
    active_samples = [
        # # Data
        # # DoubleMuon
        # # "DoubleMuon_Run2018A-UL2018-v1",
        # "DoubleMuon_Run2018B-UL2018-v2",
        # # "DoubleMuon_Run2018C-UL2018-v1",
        # # "DoubleMuon_Run2018D-UL2018-v2",
        # # EGamma
        # # "EGamma_Run2018A-UL2018-v1",
        # "EGamma_Run2018B-UL2018-v1",
        # # "EGamma_Run2018C-UL2018-v1",
        # # "EGamma_Run2018D-UL2018-v1",
        # # SingleMuon
        # # "SingleMuon_Run2018A-UL2018-v2",
        # "SingleMuon_Run2018B-UL2018-v2",
        # # "SingleMuon_Run2018C-UL2018-v2",
        # # "SingleMuon_Run2018D-UL2018-v1",
        # # DY NLO
        # "DYJetsToLL_M-50",
        # # DY Jet binned
        # "DYJetsToLL_0J",
        # "DYJetsToLL_1J",
        # "DYJetsToLL_2J",
        # # DY LO
        # "DYJetsToLL_M-50-LO",
        # # "DYJetsToLL_M-50-LO_ext1",
        # # Signal
        "EWK_LLJJ_MLL-50_MJJ-120",
        # # # Interference
        # "EWK_LLJJ_MLL-50_MJJ-120_QCD",
        # # Top
        # "ST_tW_top",
        # "ST_tW_antitop",
        # "ST_t-channel_antitop",
        # "ST_t-channel_top",
        # "ST_s-channel",
        # "TTTo2L2Nu",
    ]
    with open("data/samples/samples.json") as file:
        Samples = json.load(file)
    Samples = {k: v for k, v in Samples["samples"].items() if k in active_samples}
    files = {}
    for sampleName in Samples:
        # if "DoubleMuon" not in sampleName:
        #     continue
        # "files" must be a list: replica entries are appended to it in the main block
        files[sampleName] = {"query": Samples[sampleName]["nanoAOD"], "files": []}
    return files
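# For reference, get_files() assumes data/samples/samples.json looks roughly
# like the sketch below: only the "nanoAOD" DAS query is read here, and any
# other per-sample fields are ignored (layout inferred from this snippet,
# not guaranteed to be complete):
#
# {
#   "samples": {
#     "EWK_LLJJ_MLL-50_MJJ-120": {
#       "nanoAOD": "/EWK_LLJJ_MLL-50_MJJ-120_TuneCP5_13TeV-madgraph-pythia8_dipole/RunIISummer20UL18NanoAODv9-106X_upgrade2018_realistic_v16_L1v1-v1/NANOAODSIM"
#     }
#   }
# }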
# @dask.delayed
# def get_filename_nevents(files, **kwargs):
#     # https://cmsweb.cern.ch:8443/dbs/prod/global/DBSReader/files?dataset=/EWK_LLJJ_MLL-50_MJJ-120_TuneCP5_13TeV-madgraph-pythia8_dipole/RunIISummer20UL18NanoAODv9-106X_upgrade2018_realistic_v16_L1v1-v1/NANOAODSIM&detail=True
#     # `files` is actually a list of all replicas of the same file
#     result = {
#         "path": files + [],
#         "nevents": 200_000_000,
#     }
#     # max_events = 20_000_000
#     for file in files:
#         # skip = False
#         # for bad_site in kwargs["bad_sites"]:
#         #     if bad_site in _file:
#         #         skip = True
#         #         break
#         # if skip:
#         #     continue
#         try:
#             f = uproot.open(file, handler=uproot.source.xrootd.XRootDSource)
#             num_entries = f["Events"].num_entries
#             f.close()
#             nevents = min(num_entries, result["nevents"])
#             if nevents < result["nevents"]:
#                 # the file is good, just move it to the top of the list
#                 result["path"].pop(files.index(file))
#                 result["path"].insert(0, file)
#                 result["nevents"] = nevents
#             return result
#         except Exception:
#             continue
#     # final_file = file[0]
#     # for bad_site in kwargs["bad_sites"]:
#     #     if bad_site in _file:
#     #         print("had to use a bad site!", file=sys.stderr)
#     #         break
#     result["bad_file"] = True
#     return result
# def get_cluster(local=True):
#     if not local:
#         import socket
#
#         from dask_jobqueue import HTCondorCluster
#
#         machines = [
#             # "empire.hcms.it",
#             # "pccms01.hcms.it",
#             "pccms02.hcms.it",
#             "pccms04.hcms.it",
#             # "pccms08.hcms.it",
#             "pccms11.hcms.it",
#             "pccms12.hcms.it",
#             # "pccms13.hcms.it",
#             # "pccms14.hcms.it",
#         ]
#         cluster = HTCondorCluster(
#             cores=1,
#             memory="2 GB",  # hardcoded
#             disk="100 MB",
#             death_timeout="60",
#             nanny=False,
#             scheduler_options={
#                 # "port": n_port,
#                 "dashboard_address": 8887,
#                 "host": socket.gethostname(),
#             },
#             job_extra_directives={
#                 "log": "dask_out/dask_job_output.log",
#                 "output": "dask_out/dask_job_output.out",
#                 "error": "dask_out/dask_job_output.err",
#                 "should_transfer_files": "Yes",
#                 "when_to_transfer_output": "ON_EXIT",
#                 "Requirements": " || ".join(
#                     [f'(machine == "{machine}")' for machine in machines]
#                 ),
#             },
#             job_script_prologue=[
#                 "export PATH=/gwpool/users/gpizzati/mambaforge/bin:$PATH",
#                 "mamba activate test_uproot",
#                 "export X509_USER_PROXY=/gwpool/users/gpizzati/.proxy",
#                 "export XRD_RUNFORKHANDLER=1",
#             ],
#         )
#         cluster.scale(150)
#         return cluster
#     else:
#         from distributed import LocalCluster
#
#         cluster = LocalCluster(
#             n_workers=10,
#             threads_per_worker=1,
#             memory_limit="2GB",  # hardcoded
#             dashboard_address=":8887",
#         )
#         return cluster
if __name__ == "__main__":
    kwargs = {
        "bad_sites": [
            # "ts.infn",
        ],
    }
    # cluster = get_cluster(local=True)
    # client = cluster.get_client()
    # client.wait_for_workers(10)
    files = get_files()
    print(files)
    rucio_client = rucio_utils.get_rucio_client()
    # DE|FR|IT|BE|CH|ES|UK
    good_sites = ["IT", "FR", "BE", "CH", "UK", "ES", "DE", "US"]
    for dname in files:
        dataset = files[dname]["query"]
        print("Checking", dname, "files with query", dataset)
        try:
            (
                outfiles,
                outsites,
                sites_counts,
            ) = rucio_utils.get_dataset_files_replicas(
                dataset,
                allowlist_sites=[],
                blocklist_sites=[
                    # "T2_FR_IPHC",
                    # "T2_ES_IFCA",
                    # "T2_CH_CERN",
                    "T3_IT_Trieste",
                ],
                # regex_sites=[],
                regex_sites=r"T[123]_(" + "|".join(good_sites) + r")_\w+",
                # regex_sites = r"T[123]_(DE|IT|BE|CH|ES|UK|US)_\w+",
                mode="full",  # "full" returns all available replicas, "first" only one
                client=rucio_client,
            )
        except Exception as e:
            print(f"\nException: {e}", file=sys.stderr)
            sys.exit(1)
        # files[dname]["files"] = list(map(lambda k: k[0], outfiles))
        url = "https://cmsweb.cern.ch/dbs/prod/global/DBSReader"
        api = DbsApi(url=url)
        filelist = api.listFiles(dataset=dataset, detail=1)
        # `replicas` and `site` are lists: all replicas of one file and their sites
        for replicas, site in zip(outfiles, outsites):
            # recover the logical file name (/store/...) from the first replica URL
            prefix = "/store/data"
            if prefix not in replicas[0]:
                prefix = "/store/mc"
            logical_name = prefix + replicas[0].split(prefix)[-1]
            right_file = list(
                filter(lambda k: k["logical_file_name"] == logical_name, filelist)
            )
            if len(right_file) == 0:
                raise Exception("File present in Rucio but not in DBS!", logical_name)
            if len(right_file) > 1:
                raise Exception(
                    "More than one file has the same logical_file_name, not supported"
                )
            nevents = right_file[0]["event_count"]
            print(replicas, nevents)
            files[dname]["files"].append({"path": replicas, "nevents": nevents})
        # file = sorted(
        #     [(_file, _site.split("_")[1]) for _file, _site in zip(file, site)],
        #     key=lambda i: good_sites.index(i[1]),
        # )
        # # print(list(map(lambda k: k[1], file)))
        # file = list(map(lambda k: k[0], file))
        # random.shuffle(file)
        # files[dname]["files"] = [
        #     get_filename_nevents(file, **kwargs) for file in outfiles
        # ]
        # # break
    # files = dask.compute(files)
    # files = files[0]
    with open("data/files_all3.json", "w") as file:
        json.dump(files, file, indent=2)
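# For reference, the resulting data/files_all3.json maps each sample to its
# DAS query plus, per file, the list of replica URLs and the event count
# (shape inferred from the code above; values illustrative):
#
# {
#   "EWK_LLJJ_MLL-50_MJJ-120": {
#     "query": "/EWK_LLJJ_MLL-50_MJJ-120_TuneCP5_13TeV-madgraph-pythia8_dipole/RunIISummer20UL18NanoAODv9-106X_upgrade2018_realistic_v16_L1v1-v1/NANOAODSIM",
#     "files": [
#       {"path": ["root://.../store/mc/...", "root://.../store/mc/..."],
#        "nevents": 123456}
#     ]
#   }
# }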
import gc
import sys
import time
import traceback as tb
from copy import deepcopy

import awkward as ak
import uproot


def read_events(filename, start=0, stop=100, read_form={}):
    print("start reading")
    # f = uproot.open(filename, handler=uproot.source.xrootd.XRootDSource, num_workers=2)
    f = uproot.open(filename, num_workers=1)
    tree = f["Events"]
    start = min(start, tree.num_entries)
    stop = min(stop, tree.num_entries)
    if start >= stop:
        return ak.Array([])
    branches = [k.name for k in tree.branches]
    events = {}
    form = deepcopy(read_form)

    # flatten the requested form into the list of branch names to read
    all_branches = []
    for coll in form:
        coll_branches = form[coll]["branches"]
        if len(coll_branches) == 0:
            if coll in branches:
                all_branches.append(coll)
        else:
            for branch in coll_branches:
                branch_name = coll + "_" + branch
                if branch_name in branches:
                    all_branches.append(branch_name)

    events_bad_form = tree.arrays(
        all_branches,
        entry_start=start,
        entry_stop=stop,
        decompression_executor=uproot.source.futures.TrivialExecutor(),
        interpretation_executor=uproot.source.futures.TrivialExecutor(),
    )
    f.close()

    # re-zip the flat branches into per-collection records
    for coll in form:
        d = {}
        coll_branches = form[coll].pop("branches")
        if len(coll_branches) == 0:
            if coll in branches:
                events[coll] = events_bad_form[coll]
            continue
        for branch in coll_branches:
            branch_name = coll + "_" + branch
            if branch_name in branches:
                d[branch] = events_bad_form[branch_name]
        if len(d.keys()) == 0:
            print("did not find anything for", coll, file=sys.stderr)
            continue
        events[coll] = ak.zip(d, **form[coll])
        del d
    print("created events")
    _events = ak.zip(events, depth_limit=1)
    del events
    gc.collect()
    return _events
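# read_form maps a collection name to the branches to read and to any extra
# keyword arguments forwarded to ak.zip (e.g. with_name). A hypothetical
# minimal example (branch names assume NanoAOD input; adapt as needed):
#
# read_form = {
#     "Muon": {"branches": ["pt", "eta", "phi"], "with_name": "Muon"},
#     "run": {"branches": []},  # flat branch, read as-is
# }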
def big_process(process, filenames, start, stop, read_form, **kwargs):
    t_start = time.time()
    events = 0
    error = ""
    # `filenames` lists replicas of the same file: try them in order
    for filename in filenames:
        try:
            events = read_events(filename, start=start, stop=stop, read_form=read_form)
            break
        except Exception as e:
            error += "".join(tb.format_exception(None, e, e.__traceback__))
            # time.sleep(1)
            continue
    if isinstance(events, int):
        # the int sentinel was never overwritten, so every replica failed
        print(error, file=sys.stderr)
        raise Exception(
            "Error, could not read any of the filenames\n" + error, filenames
        )
    t_reading = time.time() - t_start
    if len(events) == 0:
        return {}
    results = {"real_results": 0, "performance": {}}
    results["real_results"] = process(events, **kwargs)
    t_total = time.time() - t_start
    results["performance"][f"{filename}_{start}"] = {
        "total": t_total,
        "read": t_reading,
    }
    del events
    gc.collect()
    return results
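# A minimal driver sketch (assumed, not part of the original snippets): wrap
# big_process in dask.delayed and feed it the replica lists written to
# data/files_all3.json by the first snippet. count_events and the read_form
# below are hypothetical placeholders.
import json

import dask


def count_events(events):
    # hypothetical analysis step: any callable taking the events array works
    return len(events)


if __name__ == "__main__":
    with open("data/files_all3.json") as f:
        files = json.load(f)
    read_form = {"run": {"branches": []}}  # read only the flat "run" branch
    tasks = []
    for sample, info in files.items():
        for entry in info["files"]:
            tasks.append(
                dask.delayed(big_process)(
                    count_events,
                    entry["path"],  # replicas of the same file, tried in order
                    0,
                    entry["nevents"],
                    read_form,
                )
            )
    (results,) = dask.compute(tasks)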