Skip to content

Instantly share code, notes, and snippets.

@amaltaro
Created May 25, 2020 19:59
Show Gist options
  • Save amaltaro/2a68ef3675c6024cef757f42e4fcf29e to your computer and use it in GitHub Desktop.
Save amaltaro/2a68ef3675c6024cef757f42e4fcf29e to your computer and use it in GitHub Desktop.
multi-threading script to fix the StepChain parentage
#!/usr/bin/env python
"""
This script is supposed to perform the same action as this ReqMgr2 CherryPy thread:
https://github.com/dmwm/WMCore/blob/master/src/python/WMCore/ReqMgr/CherryPyThreads/StepChainParentageFixTask.py
so, given a child dataset name, it finds all the run/lumi pairs, maps them against
its parent dataset, and inserts that information into DBS
"""
from __future__ import print_function, division
import logging
import sys
import threading
import time
from pprint import pformat
from Utils.IteratorTools import grouper
from dbs.apis.dbsClient import DbsApi
class ResolveBlockThread(threading.Thread):
    """
    Worker thread resolving the file parentage of a single child block.

    For every run/lumi pair of every file in the child block, the parent file
    id is looked up in the provided parent map. The outcome is reported to the
    main thread through the `success` and `executionTime` attributes.
    """

    def __init__(self, **kwargs):
        """
        Thread attributes are:
        * childBlock=blockName,
        * parentData=parentFrozenData,
        * dbsClient=dbsApi,
        * insertFlag=insertFlag,
        * logger=logger (optional, defaults to the root logger)
        """
        super(ResolveBlockThread, self).__init__()
        for k, v in kwargs.items():
            setattr(self, k, v)
        # run() logs from within its own error handler, so a missing logger
        # would otherwise make the failure path itself blow up
        if getattr(self, "logger", None) is None:
            self.logger = logging.getLogger()
        self.success = False
        self.executionTime = 0

    def run(self):
        """
        Map every file of the child block to its parent file ids.

        Any exception is logged and leaves `success` set to False; nothing is
        raised to the caller. `executionTime` is always set, in whole seconds.
        """
        startTime = int(time.time())
        try:
            # in the format of: {'fileid': [[run_num1, lumi1], [run_num1, lumi2], etc]
            # e.g. {'554307997': [[1, 557179], [1, 557178], [1, 557181],
            childBlockData = self.dbsClient.listBlockTrio(block_name=self.childBlock)
            mapChildParent = self._mapChildToParents(childBlockData)

            if self.insertFlag:
                # flat list of unique [childID, parentID] pairs
                listChildParent = [[childID, parentID]
                                   for childID in mapChildParent
                                   for parentID in mapChildParent[childID]]
                self.logger.debug("Child - parent map is: %s", pformat(listChildParent))
                # NOTE: the DBS write below was left disabled by the author
                # self.dbsClient.insertFileParents({"block_name": self.childBlock, "child_parent_id_list": listChildParent})
            self.logger.debug("%s file parentage added for block %s", len(mapChildParent), self.childBlock)
        except Exception:
            self.logger.exception("Parentage updated failed for block %s", self.childBlock)
        else:
            # this is how the main thread see that it was successful
            self.success = True
        self.executionTime = int(time.time()) - startTime

    def _mapChildToParents(self, childBlockData):
        """
        Build the {"child_id": ["parent_id", "parent_id2", ...]} mapping.

        Each parent id is listed once per child file, in first-seen order, even
        though it usually matches many run/lumi pairs of the same child file.
        """
        mapChildParent = {}
        seenPairs = set()
        for item in childBlockData:
            self.logger.debug("child item: %s", pformat(item))
            for fileId in item:
                for runLumiPair in item[fileId]:
                    # must match the key format used to build self.parentData
                    frozenKey = frozenset(runLumiPair)
                    parentId = self.parentData.get(frozenKey)
                    if parentId is None:
                        msg = "Child file id: %s, with run/lumi: %s, has no match in the parent dataset"
                        self.logger.warning(msg, fileId, frozenKey)
                        continue
                    if (fileId, parentId) in seenPairs:
                        continue
                    seenPairs.add((fileId, parentId))
                    mapChildParent.setdefault(fileId, []).append(parentId)
        return mapChildParent
def getParentDatasetTrio(childDataset, dbsObject):
    """
    Provided a dataset name, return all the parent dataset information, such as:
    - file ids, run number and lumi section

    :param childDataset: name of the child dataset
    :param dbsObject: DBS client object providing `listParentDSTrio`
    :return: dictionary keyed by frozenset([run_number, lumi_section]) with the
        parent file id as value
    """
    # this will return data in the format of:
    # {'554307997': [[1, 557179], [1, 557178],...
    # such that: key is file id, in each list is [run_number, lumi_section_numer].
    parentFullInfo = dbsObject.listParentDSTrio(dataset=childDataset)

    # pretty-printing every item is costly, so only do it when it gets logged
    debugEnabled = logging.getLogger().isEnabledFor(logging.DEBUG)

    # inverts the data into {frozenset([run, lumi]): "parent_file_id", ...}
    # NOTE(review): frozenset keys ignore ordering, so run=1/lumi=5 and
    # run=5/lumi=1 share the same key; kept because the lookup side builds
    # its keys the same way
    parentFrozenData = {}
    for item in parentFullInfo:
        if debugEnabled:
            logging.debug("parentFullInfo item: %s", pformat(item))
        for fileId in item:
            for runLumiPair in item[fileId]:
                # a run/lumi present in several parent files keeps the last one
                parentFrozenData[frozenset(runLumiPair)] = fileId
    return parentFrozenData
def listBlocksWithNoParents(childDataset, dbsObject):
    """
    List the blocks of a dataset which have no parent block in DBS.

    :param childDataset: name of the child dataset whose blocks are checked
    :param dbsObject: DBS client object providing `listBlocks` and `listBlockParents`
    :return: set of child blocks with no parentBlock
    """
    blockNames = [block['block_name'] for block in dbsObject.listBlocks(dataset=childDataset)]
    logging.info("List of children blocks: %s", blockNames)
    if not blockNames:
        # nothing to resolve; do not query DBS with an empty list of blocks
        return set()

    # blocks returned by this call are those having at least one parent block
    parentBlocks = dbsObject.listBlockParents(block_name=blockNames)
    blocksWithParents = {pblock['this_block_name'] for pblock in parentBlocks}
    return set(blockNames) - blocksWithParents
def main():
    """
    Resolve the parentage of every block of the child dataset that has no
    parent block yet, running up to `numThreads` blocks in parallel.

    :return: 0 when every block was resolved (or there was nothing to do),
        1 when at least one block failed or timed out
    """
    url = "https://cmsweb-testbed.cern.ch/dbs/int/global/DBSWriter"
    # url = "https://cmsweb.cern.ch/dbs/prod/global/DBSWriter"
    childDataset = "/DYJetsToLL_Pt-50To100_TuneCUETP8M1_13TeV-amcatnloFXFX-pythia8/Integ_TestStep2-DIGI_StepChain_Tasks_Agent133_Val_Alanv11-v20/GEN-SIM-RAW"
    # childDataset = "/QCD_Pt-15to7000_TuneCP5_Flat2018_13TeV_pythia8/RunIISummer19UL17NanoAOD-EpsilonPU_JMECustomTuples_106X_mc2017_realistic_v6-v2/NANOAODSIM"
    insertFlag = True
    numThreads = 5  # threads to execute in parallel
    joinTimeout = 300.  # seconds to wait for each thread to complete

    logging.basicConfig(stream=sys.stdout, level=logging.INFO)
    logger = logging.getLogger()

    dbsClient = DbsApi(url)

    ### does this dataset have any parent dataset?
    parentList = dbsClient.listDatasetParents(dataset=childDataset)
    logger.debug("Child: %s has parents: %s", childDataset, parentList)
    if not parentList:
        logger.warning("No parent dataset found for child dataset %s", childDataset)
        return 0

    ### fetch the complete parent run/lumi information
    parentFrozenData = getParentDatasetTrio(childDataset, dbsClient)
    if logger.isEnabledFor(logging.DEBUG):
        # the parent map can be very large, only format it when it gets logged
        logger.debug("parentFullInfo: %s", pformat(parentFrozenData))

    blocks = listBlocksWithNoParents(childDataset, dbsClient)
    logger.debug("Blocks with no parent information: %s", pformat(blocks))

    failedBlocks = []
    threadPool = []
    for blockName in blocks:
        # the dbs3-client is not thread safe: sharing one instance among threads
        # fails with pycurl's "cannot invoke setopt() - perform() is currently
        # running", hence every thread gets its own client instance
        threadPool.append(ResolveBlockThread(childBlock=blockName,
                                             parentData=parentFrozenData,
                                             dbsClient=DbsApi(url),
                                             insertFlag=insertFlag,
                                             logger=logger))

    logger.info("A total of %d threads have been created. Executing up to %d in parallel.", len(threadPool), numThreads)
    for threadSubSet in grouper(threadPool, numThreads):
        for thread in threadSubSet:
            thread.start()

        # Wait for all threads to complete
        for thread in threadSubSet:
            thread.join(joinTimeout)
            # is_alive() works on python 2.6+ and 3.x (isAlive is gone in 3.9)
            if thread.is_alive():
                logger.warning("Thread has timed-out for block: %s", thread.childBlock)
                failedBlocks.append(thread.childBlock)
            elif thread.success is False:
                failedBlocks.append(thread.childBlock)
                logger.error("Block %s failed after: %s secs", thread.childBlock, thread.executionTime)
            else:
                logger.info("Block %s resolved in: %s secs", thread.childBlock, thread.executionTime)

    if failedBlocks:
        logger.error("Failed to resolve %d out of %d blocks: %s",
                     len(failedBlocks), len(threadPool), pformat(failedBlocks))
        return 1
    return 0
if __name__ == '__main__':
    # script entry point: the value returned by main() becomes the exit status
    exitStatus = main()
    sys.exit(exitStatus)
@amaltaro
Copy link
Author

amaltaro commented May 25, 2020

Note that this script depends on dbs3-client>=3.13.1, released around May 25.
In addition, the dbs3-client is not thread-safe, so the same object instance cannot be shared among multiple threads; see my terminal output:

cmst1@vocms0192:/data/srv/wmagent/current $ python fixDatasetParentage.py 
INFO:root:List of children blocks: ['/DYJetsToLL_Pt-50To100_TuneCUETP8M1_13TeV-amcatnloFXFX-pythia8/Integ_TestStep2-DIGI_StepChain_Tasks_Agent133_Val_Alanv11-v20/GEN-SIM-RAW#613b4933-5b3f-45b0-a0b7-879f2b701b21', '/DYJetsToLL_Pt-50To100_TuneCUETP8M1_13TeV-amcatnloFXFX-pythia8/Integ_TestStep2-DIGI_StepChain_Tasks_Agent133_Val_Alanv11-v20/GEN-SIM-RAW#57bb9d18-4d69-4946-bf8f-3a4fd2d1520b', '/DYJetsToLL_Pt-50To100_TuneCUETP8M1_13TeV-amcatnloFXFX-pythia8/Integ_TestStep2-DIGI_StepChain_Tasks_Agent133_Val_Alanv11-v20/GEN-SIM-RAW#e419e460-3d92-45df-9e34-032fd9c39398']
INFO:root:A total of 3 threads have been created. Executing up to 5 in parallel.
ERROR:root:Parentage updated failed for block /DYJetsToLL_Pt-50To100_TuneCUETP8M1_13TeV-amcatnloFXFX-pythia8/Integ_TestStep2-DIGI_StepChain_Tasks_Agent133_Val_Alanv11-v20/GEN-SIM-RAW#e419e460-3d92-45df-9e34-032fd9c39398
Traceback (most recent call last):
  File "fixDatasetParentage.py", line 36, in run
    childBlockData = self.dbsClient.listBlockTrio(block_name=self.childBlock)
  File "/data/srv/wmagent/v1.3.3.patch3-comp3/sw.amaltaro/slc7_amd64_gcc630/cms/dbs3-client/3.13.1/lib/python2.7/site-packages/dbs/apis/dbsClient.py", line 503, in listBlockTrio
    return self.__callServer("blockTrio", params=kwargs, callmethod='GET')
  File "/data/srv/wmagent/v1.3.3.patch3-comp3/sw.amaltaro/slc7_amd64_gcc630/cms/dbs3-client/3.13.1/lib/python2.7/site-packages/dbs/apis/dbsClient.py", line 201, in __callServer
    self.http_response = method_func(self.url, method, params, data, request_headers)
  File "/data/srv/wmagent/v1.3.3.patch3-comp3/sw.amaltaro/slc7_amd64_gcc630/cms/dbs3-pycurl-client/3.13.1/lib/python2.7/site-packages/RestClient/RestApi.py", line 34, in get
    return http_request(self._curl)
  File "/data/srv/wmagent/v1.3.3.patch3-comp3/sw.amaltaro/slc7_amd64_gcc630/cms/dbs3-pycurl-client/3.13.1/lib/python2.7/site-packages/RestClient/RequestHandling/HTTPRequest.py", line 51, in __call__
    curl_object.setopt(key, value)
error: cannot invoke setopt() - perform() is currently running
ERROR:root:Parentage updated failed for block /DYJetsToLL_Pt-50To100_TuneCUETP8M1_13TeV-amcatnloFXFX-pythia8/Integ_TestStep2-DIGI_StepChain_Tasks_Agent133_Val_Alanv11-v20/GEN-SIM-RAW#57bb9d18-4d69-4946-bf8f-3a4fd2d1520b
Traceback (most recent call last):
  File "fixDatasetParentage.py", line 36, in run
    childBlockData = self.dbsClient.listBlockTrio(block_name=self.childBlock)
  File "/data/srv/wmagent/v1.3.3.patch3-comp3/sw.amaltaro/slc7_amd64_gcc630/cms/dbs3-client/3.13.1/lib/python2.7/site-packages/dbs/apis/dbsClient.py", line 503, in listBlockTrio
    return self.__callServer("blockTrio", params=kwargs, callmethod='GET')
  File "/data/srv/wmagent/v1.3.3.patch3-comp3/sw.amaltaro/slc7_amd64_gcc630/cms/dbs3-client/3.13.1/lib/python2.7/site-packages/dbs/apis/dbsClient.py", line 201, in __callServer
    self.http_response = method_func(self.url, method, params, data, request_headers)
  File "/data/srv/wmagent/v1.3.3.patch3-comp3/sw.amaltaro/slc7_amd64_gcc630/cms/dbs3-pycurl-client/3.13.1/lib/python2.7/site-packages/RestClient/RestApi.py", line 34, in get
    return http_request(self._curl)
  File "/data/srv/wmagent/v1.3.3.patch3-comp3/sw.amaltaro/slc7_amd64_gcc630/cms/dbs3-pycurl-client/3.13.1/lib/python2.7/site-packages/RestClient/RequestHandling/HTTPRequest.py", line 51, in __call__
    curl_object.setopt(key, value)
error: cannot invoke setopt() - perform() is currently running
INFO:root:Block /DYJetsToLL_Pt-50To100_TuneCUETP8M1_13TeV-amcatnloFXFX-pythia8/Integ_TestStep2-DIGI_StepChain_Tasks_Agent133_Val_Alanv11-v20/GEN-SIM-RAW#613b4933-5b3f-45b0-a0b7-879f2b701b21 resolved in: 0 secs
INFO:root:Block /DYJetsToLL_Pt-50To100_TuneCUETP8M1_13TeV-amcatnloFXFX-pythia8/Integ_TestStep2-DIGI_StepChain_Tasks_Agent133_Val_Alanv11-v20/GEN-SIM-RAW#e419e460-3d92-45df-9e34-032fd9c39398 resolved in: 0 secs
INFO:root:Block /DYJetsToLL_Pt-50To100_TuneCUETP8M1_13TeV-amcatnloFXFX-pythia8/Integ_TestStep2-DIGI_StepChain_Tasks_Agent133_Val_Alanv11-v20/GEN-SIM-RAW#57bb9d18-4d69-4946-bf8f-3a4fd2d1520b resolved in: 0 secs

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment