Created
May 25, 2020 19:59
-
-
Save amaltaro/2a68ef3675c6024cef757f42e4fcf29e to your computer and use it in GitHub Desktop.
multi-threading script to fix the StepChain parentage
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
#!/usr/bin/env python | |
""" | |
This script is supposed to perform the same action as this ReqMgr2 CherryPy thread: | |
https://github.com/dmwm/WMCore/blob/master/src/python/WMCore/ReqMgr/CherryPyThreads/StepChainParentageFixTask.py | |
so, given a child dataset name, it finds out all the run/lumi pairs, map them against | |
its parent dataset, and insert that information into DBS | |
""" | |
from __future__ import print_function, division | |
import logging | |
import sys | |
import threading | |
import time | |
from pprint import pformat | |
from Utils.IteratorTools import grouper | |
from dbs.apis.dbsClient import DbsApi | |
class ResolveBlockThread(threading.Thread):
    """
    Worker thread that resolves the file parentage for a single child block.

    For every file in the child block, its run/lumi pairs are matched against
    the pre-computed parent dataset map; the resulting child/parent file id
    pairs can then be inserted into DBS (the insert call is currently
    commented out, so only the mapping/logging is performed).
    """

    def __init__(self, **kwargs):
        """
        Thread attributes are:
         * childBlock=blockName,
         * parentData=parentFrozenData,
         * dbsClient=dbsApi,
         * insertFlag=insertFlag
         * logger=logger
        """
        super(ResolveBlockThread, self).__init__()
        # promote every keyword argument to an instance attribute
        for k, v in kwargs.items():
            setattr(self, k, v)
        self.success = False   # flipped to True only when run() completes cleanly
        self.executionTime = 0  # wall-clock seconds spent inside run()

    def run(self):
        """
        Fetch the child block run/lumi trios, map each child file id to its
        parent file id(s), and (optionally) insert that parentage into DBS.
        Sets self.success and self.executionTime for the main thread to read.
        """
        startTime = int(time.time())
        try:
            # in the format of: {'fileid': [[run_num1, lumi1], [run_num1, lumi2], etc]
            # e.g. {'554307997': [[1, 557179], [1, 557178], [1, 557181],
            childBlockData = self.dbsClient.listBlockTrio(block_name=self.childBlock)
            # runs the actual mapping logic, like {"child_id": ["parent_id", "parent_id2", ...], etc
            mapChildParent = {}
            for item in childBlockData:
                self.logger.debug("child item: %s", pformat(item))
                for fileId in item:
                    for runLumiPair in item[fileId]:
                        # run/lumi pair frozen so it can be used as a dict key
                        frozenKey = frozenset(runLumiPair)
                        parentId = self.parentData.get(frozenKey)
                        if parentId is None:
                            msg = "Child file id: %s, with run/lumi: %s, has no match in the parent dataset"
                            self.logger.warning(msg, fileId, frozenKey)
                            continue
                        mapChildParent.setdefault(fileId, [])
                        mapChildParent[fileId].append(parentId)
            if self.insertFlag:
                # convert dictionary to list of unique childID, parentID tuples
                listChildParent = []
                for childID in mapChildParent:
                    for parentID in mapChildParent[childID]:
                        listChildParent.append([childID, parentID])
                self.logger.debug("Child - parent map is: %s", pformat(listChildParent))
                # self.dbsClient.insertFileParents({"block_name": self.childBlock, "child_parent_id_list": listChildParent})
                numFiles = len(mapChildParent)
                # lazy %-style logging args, consistent with the rest of the file
                self.logger.debug("%s file parentage added for block %s", numFiles, self.childBlock)
        except Exception:
            # logger.exception records the traceback; the unused "as ex" binding was dropped
            self.logger.exception("Parentage update failed for block %s", self.childBlock)
        else:
            # this is how the main thread sees that it was successful
            self.success = True
        self.executionTime = int(time.time()) - startTime
def getParentDatasetTrio(childDataset, dbsObject):
    """
    Build a lookup table of the parent dataset run/lumi information.

    :param childDataset: child dataset name
    :param dbsObject: DBS API client instance
    :return: dict mapping frozenset({run_number, lumi_section}) -> parent file id
    """
    # DBS returns entries shaped like: {'554307997': [[1, 557179], [1, 557178], ...]}
    # i.e. parent file id -> list of [run_number, lumi_section_number] pairs
    rawParentInfo = dbsObject.listParentDSTrio(dataset=childDataset)
    parentFrozenData = {}
    for entry in rawParentInfo:
        logging.debug("parentFullInfo item: %s", pformat(entry))
        for parentFileId, runLumiPairs in entry.items():
            for pair in runLumiPairs:
                # freeze the run/lumi pair so it can serve as a dictionary key
                parentFrozenData[frozenset(pair)] = parentFileId
    return parentFrozenData
def listBlocksWithNoParents(childDataset, dbsObject):
    """
    Find all blocks of the child dataset lacking parent block information.

    :param childDataset: child dataset name
    :param dbsObject: DBS API client instance
    :return: set of child block names with no parent block
    """
    blockNames = [blk['block_name'] for blk in dbsObject.listBlocks(dataset=childDataset)]
    logging.info("List of children blocks: %s", blockNames)
    # every child block that shows up with a parent entry is already resolved
    blocksWithParents = {parent['this_block_name']
                         for parent in dbsObject.listBlockParents(block_name=blockNames)}
    return set(blockNames) - blocksWithParents
def main():
    """
    Fix StepChain parentage for a hardcoded child dataset: fetch the complete
    parent run/lumi map, find every child block missing parent information,
    and resolve each block in a bounded pool of worker threads.
    """
    url = "https://cmsweb-testbed.cern.ch/dbs/int/global/DBSWriter"
    # url = "https://cmsweb.cern.ch/dbs/prod/global/DBSWriter"
    childDataset = "/DYJetsToLL_Pt-50To100_TuneCUETP8M1_13TeV-amcatnloFXFX-pythia8/Integ_TestStep2-DIGI_StepChain_Tasks_Agent133_Val_Alanv11-v20/GEN-SIM-RAW"
    # childDataset = "/QCD_Pt-15to7000_TuneCP5_Flat2018_13TeV_pythia8/RunIISummer19UL17NanoAOD-EpsilonPU_JMECustomTuples_106X_mc2017_realistic_v6-v2/NANOAODSIM"
    insertFlag = True
    numThreads = 5  # threads to execute in parallel
    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    logger = logging.getLogger()
    dbsClient = DbsApi(url)
    ### does this dataset have any parent dataset?
    parentList = dbsClient.listDatasetParents(dataset=childDataset)
    logger.debug("Child: %s has parents: %s", childDataset, parentList)
    if not parentList:
        logger.warning("No parent dataset found for child dataset %s", childDataset)
        return
    ### fetch the complete parent run/lumi information
    parentFrozenData = getParentDatasetTrio(childDataset, dbsClient)
    logger.debug("parentFullInfo: %s", pformat(parentFrozenData))
    blocks = listBlocksWithNoParents(childDataset, dbsClient)
    logger.debug("Blocks with no parent information: %s", pformat(blocks))
    failedBlocks = []
    threadPool = []
    for blockName in blocks:
        threadPool.append(ResolveBlockThread(childBlock=blockName,
                                             parentData=parentFrozenData,
                                             dbsClient=dbsClient,
                                             insertFlag=insertFlag,
                                             logger=logger))
    logger.info("A total of %d threads have been created. Executing up to %d in parallel.", len(threadPool), numThreads)
    for threadSubSet in grouper(threadPool, numThreads):
        for thread in threadSubSet:
            thread.start()
        # Wait for all threads to complete
        for thread in threadSubSet:
            thread.join(300.)  # 300 seconds timeout
            # is_alive() replaces isAlive(), which was removed in Python 3.9
            if thread.is_alive():
                logger.warning("Thread has timed-out for block: %s", thread.childBlock)
            if thread.success is False:
                failedBlocks.append(thread.childBlock)
            logger.info("Block %s resolved in: %s secs", thread.childBlock, thread.executionTime)
    # summary: failedBlocks was previously collected but never reported
    if failedBlocks:
        logger.warning("Blocks that failed parentage resolution: %s", failedBlocks)
if __name__ == '__main__':
    # exit code mirrors main()'s return value (None -> exit status 0)
    sys.exit(main())
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Note that this script depends on
dbs3-client>=3.13.1
, released around May 25. In addition to that, we can see that the dbs3-client is not thread-safe and we cannot share the same object instance among multiple threads; see my terminal output: