from FWCore/Framework/bin/cmsRun.cpp#L140
EventProcessorWithSentry proc;
from FWCore/Framework/bin/cmsRun.cpp#L351
EventProcessorWithSentry procTmp(
std::make_unique<edm::EventProcessor>(processDesc, jobReportToken, edm::serviceregistry::kTokenOverrides));
proc = std::move(procTmp);
from FWCore/Framework/bin/cmsRun.cpp#L357
proc->beginJob();
proc.on();
auto status = proc->runToCompletion();
if (status == edm::EventProcessor::epSignal) {
returnCode = edm::errors::CaughtSignal;
}
proc.off();
//...
proc->endJob();
which is essentially equivalent to
edm::EventProcessor proc(processDesc, jobReportToken, edm::serviceregistry::kTokenOverrides);
proc.beginJob();
proc.runToCompletion();
proc.endJob();
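The plain version leaves out what the sentry adds. Judging from the proc.on()/proc.off() calls around runToCompletion(), the sentry appears to call endJob() from its destructor if an exception escapes while it is switched on. The following is a minimal standard-C++ sketch of that pattern, under this assumption. FakeProcessor, ProcessorWithSentry and runJob() are hypothetical stand-ins, not CMSSW classes.

```cpp
#include <cassert>
#include <memory>
#include <stdexcept>
#include <string>
#include <vector>

// Hypothetical stand-in for edm::EventProcessor that only records the calls made on it.
struct FakeProcessor {
  std::vector<std::string>* log;
  void beginJob() { log->push_back("beginJob"); }
  void runToCompletion(bool fail) {
    log->push_back("run");
    if (fail) throw std::runtime_error("processing failed");
  }
  void endJob() { log->push_back("endJob"); }
};

// Assumed behaviour: while the sentry is "on", its destructor calls endJob(),
// so endJob() still runs when an exception unwinds the stack.
class ProcessorWithSentry {
public:
  ProcessorWithSentry() = default;
  explicit ProcessorWithSentry(std::unique_ptr<FakeProcessor> p) : proc_(std::move(p)) {}
  ProcessorWithSentry(ProcessorWithSentry&&) = default;
  ProcessorWithSentry& operator=(ProcessorWithSentry&&) = default;
  ~ProcessorWithSentry() {
    if (callEndJob_ && proc_) {
      try {
        proc_->endJob();
      } catch (...) {
        // never let an exception escape from a destructor
      }
    }
  }
  void on() { callEndJob_ = true; }
  void off() { callEndJob_ = false; }
  FakeProcessor* operator->() { return proc_.get(); }

private:
  std::unique_ptr<FakeProcessor> proc_;
  bool callEndJob_ = false;
};

// Mirrors the shape of the cmsRun.cpp excerpt and returns the call log.
std::vector<std::string> runJob(bool fail) {
  std::vector<std::string> log;
  try {
    ProcessorWithSentry proc;
    proc = ProcessorWithSentry(std::make_unique<FakeProcessor>(FakeProcessor{&log}));
    proc->beginJob();
    proc.on();
    proc->runToCompletion(fail);
    proc.off();
    proc->endJob();
  } catch (std::exception const&) {
    log.push_back("caught");  // the sentry's destructor has already run endJob()
  }
  return log;
}
```

On success endJob() is called explicitly after off(); on failure it is called by the destructor during stack unwinding, before the handler runs.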
from FWCore/Framework/src/EventProcessor.cc#L744
...
FilesProcessor fp(fileModeNoMerge_);
...
auto trans = fp.processFiles(*this);
fp.normalEnd();
fp.processFiles(*this) eventually calls back into this->readAndProcessEvents().
from FWCore/Framework/src/EventProcessor.cc#L1402
InputSource::ItemType EventProcessor::readAndProcessEvents() {
...
//To wait, the ref count has to be 1+#streams
auto eventLoopWaitTask = make_empty_waiting_task();
auto eventLoopWaitTaskPtr = eventLoopWaitTask.get();
eventLoopWaitTask->increment_ref_count();
const unsigned int kNumStreams = preallocations_.numberOfStreams();
unsigned int iStreamIndex = 0;
for(; iStreamIndex<kNumStreams-1; ++iStreamIndex) {
eventLoopWaitTask->increment_ref_count();
tbb::task::enqueue( *make_waiting_task(tbb::task::allocate_root(),[this,iStreamIndex,finishedProcessingEventsPtr,eventLoopWaitTaskPtr](std::exception_ptr const*){
handleNextEventForStreamAsync(eventLoopWaitTaskPtr,iStreamIndex,finishedProcessingEventsPtr);
}) );
}
eventLoopWaitTask->increment_ref_count();
eventLoopWaitTask->spawn_and_wait_for_all( *make_waiting_task(tbb::task::allocate_root(),[this,iStreamIndex,finishedProcessingEventsPtr,eventLoopWaitTaskPtr](std::exception_ptr const*){
handleNextEventForStreamAsync(eventLoopWaitTaskPtr,iStreamIndex,finishedProcessingEventsPtr);
}));
which, for N streams:
- calls eventLoopWaitTask = make_empty_waiting_task(), i.e. creates a tbb::task that does nothing but wait for all its predecessors (?);
- calls eventLoopWaitTask->increment_ref_count() (N+1) times;
- calls handleNextEventForStreamAsync(eventLoopWaitTaskPtr,iStreamIndex,finishedProcessingEventsPtr) N times, of which (N-1) via tbb::task::enqueue(...) and the last one via eventLoopWaitTask->spawn_and_wait_for_all(...).
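In other words, the reference count of eventLoopWaitTask ends up at N+1: one reference per stream plus the extra one that wait_for_all() requires. The wait therefore ends once every stream has given its reference back, which handleNextEventForStreamAsync(...) does via iTask->decrement_ref_count() when the stream stops. The toy, single-threaded model below sketches only this bookkeeping. EmptyWaiter and runStreams() are hypothetical, and the ready pool is a plain std::deque rather than TBB's scheduler.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <utility>

// Toy model (not TBB): wait_for_all() runs tasks from the ready pool
// until the waiting task's reference count has dropped to 1.
struct EmptyWaiter {
  int refCount = 0;
  void increment_ref_count() { ++refCount; }
  void decrement_ref_count() { --refCount; }
  void wait_for_all(std::deque<std::function<void()>>& readyPool) {
    while (refCount > 1 && !readyPool.empty()) {
      auto task = std::move(readyPool.front());
      readyPool.pop_front();
      task();
    }
  }
};

// Returns {final ref count of the waiter, number of streams that finished}.
std::pair<int, unsigned> runStreams(unsigned nStreams) {
  std::deque<std::function<void()>> readyPool;
  EmptyWaiter waiter;
  unsigned finished = 0;
  waiter.increment_ref_count();  // the "+1" that wait_for_all() requires
  for (unsigned i = 0; i < nStreams; ++i) {
    waiter.increment_ref_count();  // one reference per stream
    readyPool.push_back([&waiter, &finished] {
      ++finished;                    // the stream's event loop would run here
      waiter.decrement_ref_count();  // stream done: give its reference back
    });
  }
  waiter.wait_for_all(readyPool);
  return {waiter.refCount, finished};
}
```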
from https://www.threadingbuildingblocks.org/docs/help/reference/task_scheduler/synchronization_task.html
static void spawn(task& t)
: Puts task t into the ready pool and immediately returns.
void wait_for_all()
: Requirements: refcount=n+1, where n is the number of predecessors that are still running. Effects: Executes tasks in ready pool until refcount is 1. Afterwards, leaves refcount=1 if the task's task_group_context specifies concurrent_wait, otherwise sets refcount to 0.
void spawn_and_wait_for_all(task& t)
: Requirements: Any other predecessors of this must already be spawned. The task t must have a non-null attribute successor. There must be a chain of successor links from t to the calling task. Typically, this chain contains a single link. That is, t is typically an immediate predecessor of this. Effects: Similar to {spawn(t); wait_for_all();}, but often more efficient. Furthermore, it guarantees that task is executed by the current thread.
static void enqueue(task& t)
: The task is scheduled for eventual execution by a worker thread even if no thread ever explicitly waits for the task to complete. If the total number of worker threads is zero, a special additional worker thread is created to execute enqueued tasks. Enqueued tasks are processed in roughly, but not precisely, first-come first-serve order.
Question: why enqueue() and not spawn()?
From the TBB documentation page "Scheduling Algorithm":
When a thread spawns a task, it pushes it onto the end of its own deque. Hence rule (3) above gets the task most recently spawned by the thread, whereas rule (6) gets the least recently spawned task of another thread.
When a thread enqueues a task, it pushes it onto the end of the shared queue. Hence rule (5) gets one of the less recently enqueued tasks, and has no preference for tasks that are enqueued. This is in contrast to spawned tasks, where by rule (3) a thread prefers its own most recently spawned task.
See also How Task Scheduling Works.
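To make the ordering difference concrete, here is a toy, single-threaded model of the two placement rules quoted above: spawned tasks go to the back of the thread's own deque and are popped from the back (rule 3), while enqueued tasks go to a shared queue and are taken from the front (rule 5). ToyScheduler is hypothetical; it ignores multiple threads, stealing (rule 6) and everything else TBB does, so it only illustrates ordering and does not by itself answer the question.

```cpp
#include <cassert>
#include <deque>
#include <vector>

// Toy model of where a task goes and which one a thread picks next (not TBB).
struct ToyScheduler {
  std::deque<int> ownDeque;     // this thread's deque, for spawned tasks
  std::deque<int> sharedQueue;  // shared queue, for enqueued tasks
  void spawn(int id) { ownDeque.push_back(id); }
  void enqueue(int id) { sharedQueue.push_back(id); }
  // Own deque first, most recently spawned task first (LIFO);
  // then the shared queue, oldest task first (FIFO).
  std::vector<int> drain() {
    std::vector<int> order;
    while (!ownDeque.empty() || !sharedQueue.empty()) {
      if (!ownDeque.empty()) {
        order.push_back(ownDeque.back());
        ownDeque.pop_back();
      } else {
        order.push_back(sharedQueue.front());
        sharedQueue.pop_front();
      }
    }
    return order;
  }
};

std::vector<int> spawnOrder() {
  ToyScheduler s;
  for (int i = 1; i <= 3; ++i) s.spawn(i);
  return s.drain();
}

std::vector<int> enqueueOrder() {
  ToyScheduler s;
  for (int i = 1; i <= 3; ++i) s.enqueue(i);
  return s.drain();
}
```

Spawning tasks 1, 2, 3 yields the order 3, 2, 1, while enqueueing them yields 1, 2, 3.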
make_waiting_task(...) creates a tbb::task (actually, it creates an edm::FunctorWaitingTask, which is an edm::WaitingTask, which is a tbb::task):
from FWCore/Concurrency/interface/WaitingTask.h#L89
template< typename ALLOC, typename F>
FunctorWaitingTask<F>* make_waiting_task( ALLOC&& iAlloc, F f) {
return new (iAlloc) FunctorWaitingTask<F>(f);
}
from FWCore/Concurrency/interface/WaitingTask.h#L35
class WaitingTask : public tbb::task {
...
};
from FWCore/Concurrency/interface/WaitingTask.h#L75
template<typename F>
class FunctorWaitingTask : public WaitingTask {
public:
explicit FunctorWaitingTask( F f): func_(f) {}
task* execute() override {
func_(exceptionPtr());
return nullptr;
};
private:
F func_;
};
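The same pattern, stripped of TBB, is sketched below: a base class with a virtual execute(), a class template that stores an arbitrary callable, and a factory function that deduces the callable's type. SimpleWaitingTask, SimpleFunctorTask and makeSimpleTask() are hypothetical names. std::make_unique replaces tbb::task::allocate_root(). The exception pointer is null when no exception was recorded, matching how the lambdas in these notes test if(iPtr).

```cpp
#include <cassert>
#include <exception>
#include <memory>
#include <stdexcept>
#include <string>
#include <utility>

// Simplified stand-in for edm::WaitingTask: a virtual execute() plus an optional stored exception.
class SimpleWaitingTask {
public:
  virtual ~SimpleWaitingTask() = default;
  virtual void execute() = 0;
  void setException(std::exception_ptr e) {
    if (!excep_) excep_ = std::move(e);  // keep the first exception only
  }
  // nullptr when no exception has been recorded
  std::exception_ptr const* exceptionPtr() const { return excep_ ? &excep_ : nullptr; }

private:
  std::exception_ptr excep_;
};

// Analogue of FunctorWaitingTask<F>: execute() forwards to the stored callable.
template <typename F>
class SimpleFunctorTask : public SimpleWaitingTask {
public:
  explicit SimpleFunctorTask(F f) : func_(std::move(f)) {}
  void execute() override { func_(exceptionPtr()); }

private:
  F func_;
};

// Analogue of make_waiting_task(): F is deduced from the argument.
template <typename F>
std::unique_ptr<SimpleWaitingTask> makeSimpleTask(F f) {
  return std::make_unique<SimpleFunctorTask<F>>(std::move(f));
}

std::string runTask(bool withError) {
  std::string result;
  auto task = makeSimpleTask([&result](std::exception_ptr const* iPtr) {
    if (!iPtr) {
      result = "ok";
      return;
    }
    try {
      std::rethrow_exception(*iPtr);
    } catch (std::exception const& e) {
      result = std::string("error: ") + e.what();
    }
  });
  if (withError) task->setException(std::make_exception_ptr(std::runtime_error("boom")));
  task->execute();
  return result;
}
```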
make_empty_waiting_task() creates a tbb::task (actually, it creates an edm::EmptyWaitingTask, which is an edm::WaitingTask, which is a tbb::task) that does nothing but wait for its predecessors:
from FWCore/Concurrency/interface/WaitingTaskList.h#L97
///Create an EmptyWaitingTask which will properly be destroyed
inline std::unique_ptr<edm::EmptyWaitingTask, waitingtask::TaskDestroyer> make_empty_waiting_task() {
return std::unique_ptr<edm::EmptyWaitingTask, waitingtask::TaskDestroyer>( new (tbb::task::allocate_root()) edm::EmptyWaitingTask{});
}
from FWCore/Concurrency/interface/WaitingTaskList.h#L83
class EmptyWaitingTask : public WaitingTask {
public:
EmptyWaitingTask() = default;
tbb::task* execute() override { return nullptr;}
};
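The "properly be destroyed" comment is why make_empty_waiting_task() returns a std::unique_ptr with a custom deleter. An object built with placement new on storage from a custom allocator (here tbb::task::allocate_root()) generally has to be torn down with the matching deallocation, not with plain delete. Below is a minimal sketch of the same idea. toyAllocate() is a hypothetical stand-in for the TBB allocator, and ToyTaskDestroyer stands in for waitingtask::TaskDestroyer.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <new>

// Counts live tasks so that we can check the deleter really ran.
struct TaskCounter {
  static int& live() {
    static int n = 0;
    return n;
  }
};

struct ToyTask {
  ToyTask() { ++TaskCounter::live(); }
  virtual ~ToyTask() { --TaskCounter::live(); }
  virtual void execute() = 0;
};

// Analogue of edm::EmptyWaitingTask: execute() does nothing.
struct ToyEmptyTask : ToyTask {
  void execute() override {}
};

// Hypothetical stand-in for tbb::task::allocate_root(): hands out raw storage.
inline void* toyAllocate(std::size_t n) { return ::operator new(n); }

// Undo placement new by hand: run the destructor, then release the raw storage.
struct ToyTaskDestroyer {
  template <typename T>
  void operator()(T* t) const {
    t->~T();
    ::operator delete(static_cast<void*>(t));
  }
};

// Analogue of make_empty_waiting_task().
inline std::unique_ptr<ToyEmptyTask, ToyTaskDestroyer> makeToyEmptyTask() {
  return std::unique_ptr<ToyEmptyTask, ToyTaskDestroyer>(
      new (toyAllocate(sizeof(ToyEmptyTask))) ToyEmptyTask{});
}
```

The deleter is templated on the pointee type so that the storage is released through a pointer to the complete object, not to a base-class subobject.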
from FWCore/Framework/src/EventProcessor.cc#L1362
void EventProcessor::handleNextEventForStreamAsync(WaitingTask* iTask,
unsigned int iStreamIndex,
std::atomic<bool>* finishedProcessingEvents)
{
auto recursionTask = make_waiting_task(tbb::task::allocate_root(), [this,iTask,iStreamIndex,finishedProcessingEvents](std::exception_ptr const* iPtr) {
...
handleNextEventForStreamAsync(iTask, iStreamIndex,finishedProcessingEvents);
});
sourceResourcesAcquirer_.serialQueueChain().push([this,finishedProcessingEvents,recursionTask,iTask,iStreamIndex]() {
ServiceRegistry::Operate operate(serviceToken_);
...
if(readNextEventForStream(iStreamIndex, finishedProcessingEvents) ) {
processEventAsync( WaitingTaskHolder(recursionTask), iStreamIndex);
} else {
//the stream will stop now
tbb::task::destroy(*recursionTask);
iTask->decrement_ref_count();
}
...
});
}
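If I'm not mistaken, what this does (apart from the exception handling) is:
- read the next event via readNextEventForStream(...), inside a task pushed onto sourceResourcesAcquirer_.serialQueueChain(), which serializes access to the source;
- if an event was read, process it via processEventAsync(...), passing along recursionTask, which calls handleNextEventForStreamAsync(...) again for the following event once the current one is done;
- otherwise terminate the stream: destroy recursionTask and decrement the reference count of iTask.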
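The per-stream loop described above can be modelled without TBB, assuming a single thread. handleNext() builds the continuation first, then either hands it to processEvent() (which runs it after "processing" the event) or drops it and gives the stream's reference on the waiter back. ToySource, StreamLoop and their members are hypothetical. In CMSSW the reads are serialized across threads by the serial queue chain; this single-threaded sketch gets that for free.

```cpp
#include <cassert>
#include <deque>
#include <functional>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical input source handing out event numbers 0..n-1.
struct ToySource {
  int next = 0;
  int n = 0;
  std::optional<int> read() {
    if (next < n) return next++;
    return std::nullopt;
  }
};

struct StreamLoop {
  std::deque<std::function<void()>> ready;  // single-threaded ready pool
  ToySource source;
  std::vector<std::vector<int>> eventsPerStream;
  int waiterRefCount = 0;
  int destroyedContinuations = 0;

  // Analogue of handleNextEventForStreamAsync().
  void handleNext(unsigned stream) {
    std::function<void()> recursion = [this, stream] { handleNext(stream); };
    if (auto ev = source.read()) {
      processEvent(stream, *ev, std::move(recursion));
    } else {
      ++destroyedContinuations;  // like tbb::task::destroy(*recursionTask)
      --waiterRefCount;          // like iTask->decrement_ref_count()
    }
  }

  // Analogue of processEventAsync(): schedule the work, then run the continuation.
  void processEvent(unsigned stream, int ev, std::function<void()> done) {
    ready.push_back([this, stream, ev, done = std::move(done)] {
      eventsPerStream[stream].push_back(ev);  // "process" the event
      done();                                 // like doneWaiting() on the holder
    });
  }

  void run(unsigned nStreams, int nEvents) {
    source.n = nEvents;
    eventsPerStream.assign(nStreams, {});
    waiterRefCount = 1 + static_cast<int>(nStreams);
    for (unsigned s = 0; s < nStreams; ++s) ready.push_back([this, s] { handleNext(s); });
    while (waiterRefCount > 1 && !ready.empty()) {
      auto task = std::move(ready.front());
      ready.pop_front();
      task();
    }
  }
};
```

With two streams and three events this model gives stream 0 events 0 and 2 and stream 1 event 1. In the real framework the assignment of events to streams depends on scheduling and is not fixed.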
From FWCore/Framework/src/EventProcessor.cc#L1311: readNextEventForStream(...) eventually calls readEvent(...).
From FWCore/Framework/src/EventProcessor.cc#L1438, a simplified version of readEvent(...) does:
void EventProcessor::readEvent(unsigned int iStreamIndex) {
auto& event = principalCache_.eventPrincipal(iStreamIndex);
StreamContext streamContext(event.streamID(), &processContext_);
input_->readEvent(event, streamContext);
}
where input_ is a std::unique_ptr<InputSource> built by makeInput(...) (see FWCore/Framework/src/EventProcessor.cc#L113).
The call to readEvent(...) trickles down through (for a PoolSource):
- from FWCore/Framework/src/InputSource.cc#L306
- from IOPool/Input/src/PoolSource.cc#L229
- from IOPool/Input/src/RootInputFileSequence.cc#L95
- from IOPool/Input/src/RootFile.cc#L1420
which eventually calls EventPrincipal::fillEventPrincipal(...):
principal.fillEventPrincipal(eventAux(),
*processHistoryRegistry_,
std::move(eventSelectionIDs_),
std::move(branchListIndexes_),
*(makeProductProvenanceRetriever(principal.streamID().value())),
eventTree_.resetAndGetRootDelayedReader());
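Assuming InputSource uses the usual non-virtual-interface split (a public readEvent() forwarding to a virtual hook that PoolSource overrides, which then delegates to the file sequence and the current file), the chain above can be sketched as follows. All Toy* classes and the method names on the stand-ins are hypothetical; the real classes carry much more state.

```cpp
#include <cassert>
#include <memory>
#include <string>
#include <vector>

// Records which layer touched the event, in call order.
struct ToyEventPrincipal {
  std::vector<std::string> filledBy;
};

// Non-virtual public entry point forwarding to a private virtual hook.
class ToyInputSource {
public:
  virtual ~ToyInputSource() = default;
  void readEvent(ToyEventPrincipal& ep) {
    // bookkeeping common to all sources would go here
    readEvent_(ep);
  }

private:
  virtual void readEvent_(ToyEventPrincipal& ep) = 0;
};

// Stand-in for RootFile: would call fillEventPrincipal(...).
struct ToyRootFile {
  void readCurrentEvent(ToyEventPrincipal& ep) { ep.filledBy.push_back("RootFile"); }
};

// Stand-in for RootInputFileSequence: forwards to the current file.
struct ToyFileSequence {
  ToyRootFile file;
  void readEvent(ToyEventPrincipal& ep) {
    ep.filledBy.push_back("FileSequence");
    file.readCurrentEvent(ep);
  }
};

// Stand-in for PoolSource: overrides the hook and delegates to its file sequence.
class ToyPoolSource : public ToyInputSource {
  ToyFileSequence files_;
  void readEvent_(ToyEventPrincipal& ep) override {
    ep.filledBy.push_back("PoolSource");
    files_.readEvent(ep);
  }
};
```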
from FWCore/Framework/src/EventProcessor.cc#L1450
void EventProcessor::processEventAsync(WaitingTaskHolder iHolder,
unsigned int iStreamIndex) {
tbb::task::spawn( *make_functor_task( tbb::task::allocate_root(), [=]() {
processEventAsyncImpl(iHolder, iStreamIndex);
}) );
}
processEventAsync(...) spawns a new task to process the event via processEventAsyncImpl(...).
from FWCore/Framework/src/EventProcessor.cc#L1457
void EventProcessor::processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
...
auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
WaitingTaskHolder finalizeEventTask( make_waiting_task(
tbb::task::allocate_root(),
[this,pep,iHolder](std::exception_ptr const* iPtr) mutable
{
ServiceRegistry::Operate operate(serviceToken_);
...
pep->clearEventPrincipal();
if(iPtr) {
iHolder.doneWaiting(*iPtr);
} else {
iHolder.doneWaiting(std::exception_ptr());
}
}) );
WaitingTaskHolder afterProcessTask;
if(subProcesses_.empty()) {
afterProcessTask = std::move(finalizeEventTask);
} else {
...
}
schedule_->processOneEventAsync(std::move(afterProcessTask), iStreamIndex,*pep, esp_->eventSetup());
}
So, processEventAsyncImpl(...) does:
- run schedule_->processOneEventAsync(...);
- if there are subprocesses, call subProcess.doEventAsync(...);
- once processing is finished, clear the EventPrincipal and call doneWaiting() on the WaitingTaskHolder passed as the first argument, i.e. the recursionTask, which then reads and processes the next event for this stream.
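WaitingTaskHolder itself is not shown in these notes, so the following sketch rests on an assumption about its semantics. Each holder owns one reference to its task, and copying a holder adds a reference. doneWaiting() records the first exception and drops the reference, and the task runs when the last reference is gone. Under that assumption, the finalizeEventTask / recursionTask hand-off in processEventAsyncImpl(...) looks like this. ToyTask, ToyHolder and runChain() are hypothetical.

```cpp
#include <cassert>
#include <exception>
#include <functional>
#include <stdexcept>
#include <string>
#include <utility>

// Single-threaded stand-in for a reference-counted waiting task.
struct ToyTask {
  int refCount = 0;
  std::exception_ptr excep;
  std::function<void(std::exception_ptr const*)> func;
  void run() { func(excep ? &excep : nullptr); }
};

// Assumed holder semantics: one reference per holder, first exception wins,
// the task runs when the last reference is dropped.
class ToyHolder {
public:
  ToyHolder() = default;
  explicit ToyHolder(ToyTask* t) : task_(t) { ++task_->refCount; }
  ToyHolder(ToyHolder const& o) : task_(o.task_) {
    if (task_) ++task_->refCount;
  }
  ToyHolder(ToyHolder&& o) noexcept : task_(std::exchange(o.task_, nullptr)) {}
  ToyHolder& operator=(ToyHolder o) noexcept {
    std::swap(task_, o.task_);
    return *this;
  }
  ~ToyHolder() {
    if (task_) doneWaiting(std::exception_ptr{});
  }
  void doneWaiting(std::exception_ptr e) {
    ToyTask* t = std::exchange(task_, nullptr);
    if (!t) return;
    if (e && !t->excep) t->excep = e;
    if (--t->refCount == 0) t->run();
  }

private:
  ToyTask* task_ = nullptr;
};

std::string runChain(bool fail) {
  std::string log;
  ToyTask next;  // plays the role of recursionTask
  next.func = [&log](std::exception_ptr const* p) { log += p ? "next:error" : "next:ok"; };
  ToyTask finalize;  // plays the role of finalizeEventTask
  {
    ToyHolder iHolder(&next);
    finalize.func = [&log, iHolder](std::exception_ptr const* p) mutable {
      log += "finalize;";  // pep->clearEventPrincipal() would go here
      iHolder.doneWaiting(p ? *p : std::exception_ptr{});
    };
  }  // only the copy inside finalize.func still holds `next`
  ToyHolder afterProcess(&finalize);
  if (fail)
    afterProcess.doneWaiting(std::make_exception_ptr(std::runtime_error("path failed")));
  else
    afterProcess.doneWaiting(std::exception_ptr{});
  return log;
}
```

An exception reported to afterProcess reaches finalize first and is then forwarded to next, so the continuation sees it too.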
From FWCore/Framework/src/Schedule.cc#L1034: Schedule::processOneEventAsync(...) calls the current stream's processOneEventAsync(...).
from FWCore/Framework/src/StreamSchedule.cc#L536
void StreamSchedule::processOneEventAsync(WaitingTaskHolder iTask,
EventPrincipal& ep,
EventSetup const& es,
std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters) {
this->resetAll();
...
// take care of the signals
...
// take care of the empty Paths and EndPaths
...
// This call takes care of the unscheduled processing.
workerManager_.setupOnDemandSystem(ep,es);
++total_events_;
auto serviceToken = ServiceRegistry::instance().presentToken();
auto allPathsDone = make_waiting_task(tbb::task::allocate_root(),
[iTask,this,serviceToken](std::exception_ptr const* iPtr) mutable
{
ServiceRegistry::Operate operate(serviceToken);
std::exception_ptr ptr;
if(iPtr) {
ptr = *iPtr;
}
iTask.doneWaiting(finishProcessOneEvent(ptr));
});
//The holder guarantees that if the paths finish before the loop ends
// that we do not start too soon. It also guarantees that the task will
// run under that condition.
WaitingTaskHolder allPathsHolder(allPathsDone);
auto pathsDone = make_waiting_task(tbb::task::allocate_root(),
[allPathsHolder,&ep, &es, this,serviceToken](std::exception_ptr const* iPtr) mutable
{
ServiceRegistry::Operate operate(serviceToken);
std::exception_ptr ptr;
if(iPtr) {
ptr = *iPtr;
}
finishedPaths(ptr, std::move(allPathsHolder), ep, es);
});
//The holder guarantees that if the paths finish before the loop ends
// that we do not start too soon. It also guarantees that the task will
// run under that condition.
WaitingTaskHolder taskHolder(pathsDone);
//start end paths first so on single threaded the paths will run first
for(auto it = end_paths_.rbegin(), itEnd = end_paths_.rend();
it != itEnd; ++it) {
it->processOneOccurrenceAsync(allPathsDone,ep, es, streamID_, &streamContext_);
}
for(auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend();
it != itEnd; ++ it) {
it->processOneOccurrenceAsync(pathsDone,ep, es, streamID_, &streamContext_);
}
}
StreamSchedule::processOneEventAsync(...) does:
- call resetAll() to reset the skippingEvent_ flag and all the trigger results;
- emit pre-event (?) signals;
- mark empty Paths and EndPaths as done;
- take care of the unscheduled processing by calling workerManager_.setupOnDemandSystem(ep,es);
- for all Paths and EndPaths, schedule the call to processOneOccurrenceAsync(...);
- schedule the call to finishProcessOneEvent(...) to handle any exceptions and emit the post-event (?) signals.
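The comments about the holders guaranteeing "that we do not start too soon" can be illustrated with a toy join counter. In the sketch every path finishes synchronously, as can happen on a single thread. Without the extra reference held by taskHolder, pathsDone would fire as soon as the first trigger path finished, and then fire again. JoinTask, runPath() and runSchedule() are hypothetical. Real paths run asynchronously and notify their task through a WaitingTaskHolder.

```cpp
#include <cassert>
#include <functional>
#include <string>
#include <vector>

// Toy join task: runs its action when its reference count drops to zero.
struct JoinTask {
  int refCount = 0;
  std::function<void()> action;
  void increment() { ++refCount; }
  void decrement() {
    if (--refCount == 0) action();
  }
};

// A "path" that registers with the task it will notify, then finishes immediately.
void runPath(JoinTask& done, std::vector<std::string>& log, std::string const& name) {
  done.increment();
  log.push_back(name);
  done.decrement();
}

std::vector<std::string> runSchedule(bool useGuard, int nTrigPaths) {
  std::vector<std::string> log;
  JoinTask allPathsDone, pathsDone;
  allPathsDone.action = [&log] { log.push_back("finishProcessOneEvent"); };
  pathsDone.action = [&log, &allPathsDone] {
    log.push_back("finishedPaths");
    allPathsDone.decrement();  // pathsDone releases its hold on allPathsDone
  };
  allPathsDone.increment();              // like allPathsHolder, owned by pathsDone
  if (useGuard) pathsDone.increment();   // like taskHolder during the loops
  runPath(allPathsDone, log, "endpath");
  for (int i = 0; i < nTrigPaths; ++i) runPath(pathsDone, log, "path" + std::to_string(i));
  if (useGuard) pathsDone.decrement();   // loops finished: release the guard
  return log;
}
```

With the guard, finishedPaths and finishProcessOneEvent run exactly once, after all paths. Without it, finishedPaths runs after path0 and again after path1.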
If I understand correctly, setupOnDemandSystem(...) sets up the ProductResolvers with the concrete Workers that are responsible for producing them.
The mapping between the products (actually, product branches) and the resolvers is set up in the constructor of the Principal (see FWCore/Framework/src/Principal.cc#L142_L179).
From FWCore/Framework/src/WorkerManager.cc#L148:
void
WorkerManager::setupOnDemandSystem(Principal& ep, EventSetup const& es) {
this->resetAll();
unscheduled_.setEventSetup(es);
if(&ep != lastSetupEventPrincipal_) {
UnscheduledConfigurator config( allWorkers_.begin(), allWorkers_.end(), &(unscheduled_.auxiliary()));
ep.setupUnscheduled(config);
lastSetupEventPrincipal_ = &ep;
}
}
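Each stream reuses the same EventPrincipal object (principalCache_.eventPrincipal(iStreamIndex)). Because of that, the lastSetupEventPrincipal_ check means the resolver wiring is normally done once per stream rather than once per event, while resetAll() still runs for every event. A minimal sketch of that caching, with hypothetical ToyPrincipal and ToyWorkerManager classes:

```cpp
#include <cassert>

// Hypothetical principal that counts how often it was wired up.
struct ToyPrincipal {
  int setupCalls = 0;
  void setupUnscheduled() { ++setupCalls; }  // stands in for ep.setupUnscheduled(config)
};

class ToyWorkerManager {
public:
  void setupOnDemandSystem(ToyPrincipal& ep) {
    resetAll();  // runs for every event
    if (&ep != lastSetupEventPrincipal_) {  // rewire only for a different principal
      ep.setupUnscheduled();
      lastSetupEventPrincipal_ = &ep;
    }
  }
  int resets() const { return resets_; }

private:
  void resetAll() { ++resets_; }  // stands in for calling reset() on every worker
  ToyPrincipal const* lastSetupEventPrincipal_ = nullptr;
  int resets_ = 0;
};
```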
From FWCore/Framework/src/WorkerManager.cc#L138: resetAll() calls reset() on all the workers.
From FWCore/Framework/src/UnscheduledCallProducer.h: unscheduled_ is an UnscheduledCallProducer, holding a list of all the unscheduled Workers and an UnscheduledAuxiliary.
From FWCore/Framework/src/UnscheduledAuxiliary.h: the UnscheduledAuxiliary holds a pointer to the EventSetup, and (unused) slots for the pre/postModuleDelayedGetSignals.
From FWCore/Framework/src/UnscheduledConfigurator.h: the UnscheduledConfigurator holds a mapping of module labels to the corresponding Worker*, and an UnscheduledAuxiliary.
From FWCore/Framework/src/Principal.cc#L337: Principal::setupUnscheduled(config) calls resolver.setupUnscheduled(config) on all the ProductResolvers.
See FWCore/Framework/src/ProductResolvers.cc for the concrete ProductResolvers. Most inherit from DataManagingProductResolver.
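Putting the last few pieces together: the configurator maps module labels to workers, and each resolver looks up the worker for its own module label during setupUnscheduled(config). For illustration, the sketch below assumes that an unscheduled resolver then runs its worker the first time its product is requested and remembers that until it is reset. The real resolvers do this asynchronously, with prefetching and exception handling, none of which is modelled. ToyWorker, ToyConfigurator, ToyResolver and findWorker() are hypothetical names.

```cpp
#include <cassert>
#include <map>
#include <string>
#include <utility>
#include <vector>

// Hypothetical worker that produces its product on demand.
struct ToyWorker {
  std::string label;
  int runs = 0;
  void doWork() { ++runs; }
};

// Analogue of UnscheduledConfigurator: module label -> Worker*.
class ToyConfigurator {
public:
  explicit ToyConfigurator(std::vector<ToyWorker>& workers) {
    for (auto& w : workers) byLabel_[w.label] = &w;
  }
  ToyWorker* findWorker(std::string const& label) const {
    auto it = byLabel_.find(label);
    return it == byLabel_.end() ? nullptr : it->second;
  }

private:
  std::map<std::string, ToyWorker*> byLabel_;
};

// Analogue of an unscheduled ProductResolver.
class ToyResolver {
public:
  explicit ToyResolver(std::string moduleLabel) : moduleLabel_(std::move(moduleLabel)) {}
  void setupUnscheduled(ToyConfigurator const& config) { worker_ = config.findWorker(moduleLabel_); }
  // Runs the worker on first request; later requests reuse the result.
  bool resolve() {
    if (!produced_ && worker_) {
      worker_->doWork();
      produced_ = true;
    }
    return produced_;
  }
  void reset() { produced_ = false; }

private:
  std::string moduleLabel_;
  ToyWorker* worker_ = nullptr;
  bool produced_ = false;
};
```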
from FWCore/Framework/src/Path.cc#L209: