Simplified overview of the processing flow for the CMSSW EDM framework

from cmsRun to processing individual events

from FWCore/Framework/bin/cmsRun.cpp#L140

EventProcessorWithSentry proc;

from FWCore/Framework/bin/cmsRun.cpp#L351

EventProcessorWithSentry procTmp(
  std::make_unique<edm::EventProcessor>(processDesc, jobReportToken, edm::serviceregistry::kTokenOverrides));
proc = std::move(procTmp);

from FWCore/Framework/bin/cmsRun.cpp#L357

proc->beginJob();
 
proc.on();
auto status = proc->runToCompletion();
if (status == edm::EventProcessor::epSignal) {
  returnCode = edm::errors::CaughtSignal;
}
proc.off();

//...
proc->endJob();

which is essentially equivalent to

edm::EventProcessor proc(processDesc, jobReportToken, edm::serviceregistry::kTokenOverrides);
proc.beginJob();
proc.runToCompletion();
proc.endJob();

from FWCore/Framework/src/EventProcessor.cc#L744

...
FilesProcessor fp(fileModeNoMerge_);
...
auto trans = fp.processFiles(*this);
fp.normalEnd();

fp.processFiles(*this) eventually calls back into this->readAndProcessEvents()

spawning tasks

from FWCore/Framework/src/EventProcessor.cc#L1402

  InputSource::ItemType EventProcessor::readAndProcessEvents() {
    ...
    //To wait, the ref count has to be 1+#streams
    auto eventLoopWaitTask = make_empty_waiting_task();
    auto eventLoopWaitTaskPtr = eventLoopWaitTask.get();
    eventLoopWaitTask->increment_ref_count();

    const unsigned int kNumStreams = preallocations_.numberOfStreams();
    unsigned int iStreamIndex = 0;
    for(; iStreamIndex<kNumStreams-1; ++iStreamIndex) {
      eventLoopWaitTask->increment_ref_count();
      tbb::task::enqueue( *make_waiting_task(tbb::task::allocate_root(),[this,iStreamIndex,finishedProcessingEventsPtr,eventLoopWaitTaskPtr](std::exception_ptr const*){
        handleNextEventForStreamAsync(eventLoopWaitTaskPtr,iStreamIndex,finishedProcessingEventsPtr);
      }) );
    }
    eventLoopWaitTask->increment_ref_count();
    eventLoopWaitTask->spawn_and_wait_for_all( *make_waiting_task(tbb::task::allocate_root(),[this,iStreamIndex,finishedProcessingEventsPtr,eventLoopWaitTaskPtr](std::exception_ptr const*){
      handleNextEventForStreamAsync(eventLoopWaitTaskPtr,iStreamIndex,finishedProcessingEventsPtr);
    }));

which, for N streams, calls

  • eventLoopWaitTask = make_empty_waiting_task() i.e. creates a tbb::task that does nothing but wait for all of its predecessors to finish (?)
  • eventLoopWaitTask->increment_ref_count() (N+1) times
  • handleNextEventForStreamAsync(eventLoopWaitTaskPtr,iStreamIndex,finishedProcessingEventsPtr) N times, of which (N-1) via tbb::task::enqueue(...) and the last one via eventLoopWaitTask->spawn_and_wait_for_all(...) (see the sketch below)
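
As a self-contained illustration of this pattern, here is a minimal sketch using the same 2017-era tbb::task API (documented just below). It is not the CMSSW code: tbb::empty_task stands in for EmptyWaitingTask, set_ref_count() replaces the repeated increment_ref_count() calls, and the children are allocated with allocate_child() so that TBB decrements the parent's ref count automatically, whereas CMSSW allocates root tasks and decrements the count explicitly.

#include <cstdio>
#include <tbb/task.h>

// Stand-in for handleNextEventForStreamAsync(...): one task per stream.
class StreamTask : public tbb::task {
public:
  explicit StreamTask(unsigned int stream) : stream_(stream) {}
  tbb::task* execute() override {
    std::printf("stream %u running\n", stream_);
    return nullptr;  // completion decrements the parent's ref count
  }
private:
  unsigned int stream_;
};

int main() {
  const unsigned int kNumStreams = 4;

  // The waiting task does no work itself; it only keeps the caller inside
  // wait_for_all() until its ref count drops back to 1.
  tbb::empty_task* waiter = new (tbb::task::allocate_root()) tbb::empty_task;
  waiter->set_ref_count(1 + kNumStreams);  // 1 for the wait itself + N children

  // Enqueue all but the last child, so idle worker threads pick them up ...
  for (unsigned int i = 0; i < kNumStreams - 1; ++i)
    tbb::task::enqueue(*new (waiter->allocate_child()) StreamTask(i));

  // ... and run the last one on the current thread while waiting for all.
  waiter->spawn_and_wait_for_all(*new (waiter->allocate_child()) StreamTask(kNumStreams - 1));

  tbb::task::destroy(*waiter);  // root tasks must be destroyed explicitly
  return 0;
}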

from https://www.threadingbuildingblocks.org/docs/help/reference/task_scheduler/synchronization_task.html

static void spawn(task& t): Puts task t into the ready pool and immediately returns.

void wait_for_all(): Requirements: refcount=n+1, where n is the number of predecessors that are still running. Effects: Executes tasks in ready pool until refcount is 1. Afterwards, leaves refcount=1 if the task's task_group_context specifies concurrent_wait, otherwise sets refcount to 0.

void spawn_and_wait_for_all(task& t): Requirements: Any other predecessors of this must already be spawned. The task t must have a non-null attribute successor. There must be a chain of successor links from t to the calling task. Typically, this chain contains a single link. That is, t is typically an immediate predecessor of this. Effects: Similar to {spawn(t); wait_for_all();}, but often more efficient. Furthermore, it guarantees that task is executed by the current thread.

static void enqueue(task& t): The task is scheduled for eventual execution by a worker thread even if no thread ever explicitly waits for the task to complete. If the total number of worker threads is zero, a special additional worker thread is created to execute enqueued tasks.

Enqueued tasks are processed in roughly, but not precisely, first-come first-serve order.

Question: why enqueue() and not spawn()?

From Scheduling Algorithm:

When a thread spawns a task, it pushes it onto the end of its own deque. Hence rule (3) above gets the task most recently spawned by the thread, whereas rule (6) gets the least recently spawned task of another thread.

When a thread enqueues a task, it pushes it onto the end of the shared queue. Hence rule (5) gets one of the less recently enqueued tasks, and has no preference for tasks that are enqueued. This is in contrast to spawned tasks, where by rule (3) a thread prefers its own most recently spawned task.

In short, spawned tasks are biased toward depth-first execution by the spawning thread, while enqueued tasks sit in the shared queue where idle worker threads pick them up in roughly FIFO order; presumably enqueue() is used here so that the per-stream tasks actually start on different worker threads instead of piling up in the caller's local deque. See also How Task Scheduling Works.

tbb::tasks in CMSSW

FunctorWaitingTask

make_waiting_task(...) creates a tbb::task (actually, it creates an edm::FunctorWaitingTask which is an edm::WaitingTask which is a tbb::task):

from FWCore/Concurrency/interface/WaitingTask.h#L89

template< typename ALLOC, typename F>
FunctorWaitingTask<F>* make_waiting_task( ALLOC&& iAlloc, F f) {
  return new (iAlloc) FunctorWaitingTask<F>(f);
}

from FWCore/Concurrency/interface/WaitingTask.h#L35

class WaitingTask : public tbb::task {
  ...
};

from FWCore/Concurrency/interface/WaitingTask.h#L75

template<typename F>
class FunctorWaitingTask : public WaitingTask {
public:
  explicit FunctorWaitingTask( F f): func_(f) {}
  
  task* execute() override {
    func_(exceptionPtr());
    return nullptr;
  };

private:
  F func_;
};
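
For reference, a typical call (mirroring the usage shown earlier in readAndProcessEvents(), sketch only, not taken verbatim from the CMSSW sources) allocates the task as a TBB root task and passes a lambda that receives the exception, if any, raised by a predecessor:

auto* task = make_waiting_task(tbb::task::allocate_root(),
                               [](std::exception_ptr const* iPtr) {
                                 if (iPtr) {
                                   // a predecessor failed: inspect or rethrow *iPtr
                                 } else {
                                   // all predecessors succeeded: do the follow-up work
                                 }
                               });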

EmptyWaitingTask

make_empty_waiting_task() creates a tbb::task (actually, it creates an edm::EmptyWaitingTask which is an edm::WaitingTask which is a tbb::task) that does nothing but wait for its predecessors:

from FWCore/Concurrency/interface/WaitingTaskList.h#L97

   ///Create an EmptyWaitingTask which will properly be destroyed
   inline std::unique_ptr<edm::EmptyWaitingTask, waitingtask::TaskDestroyer> make_empty_waiting_task() {
      return std::unique_ptr<edm::EmptyWaitingTask, waitingtask::TaskDestroyer>( new (tbb::task::allocate_root()) edm::EmptyWaitingTask{});
   }

from FWCore/Concurrency/interface/WaitingTaskList.h#L83

class EmptyWaitingTask : public WaitingTask {
public:
  EmptyWaitingTask() = default;

   tbb::task* execute() override { return nullptr;}
};

handleNextEventForStreamAsync(...)

from FWCore/Framework/src/EventProcessor.cc#L1362

void EventProcessor::handleNextEventForStreamAsync(WaitingTask* iTask,
                                                   unsigned int iStreamIndex,
                                                   std::atomic<bool>* finishedProcessingEvents)
{
  auto recursionTask = make_waiting_task(tbb::task::allocate_root(), [this,iTask,iStreamIndex,finishedProcessingEvents](std::exception_ptr const* iPtr) {
    ...
    handleNextEventForStreamAsync(iTask, iStreamIndex,finishedProcessingEvents);
  });

  sourceResourcesAcquirer_.serialQueueChain().push([this,finishedProcessingEvents,recursionTask,iTask,iStreamIndex]() {
    ServiceRegistry::Operate operate(serviceToken_);
    ...
    if(readNextEventForStream(iStreamIndex, finishedProcessingEvents) ) {
      processEventAsync( WaitingTaskHolder(recursionTask), iStreamIndex);
    } else {
      //the stream will stop now
      tbb::task::destroy(*recursionTask);
      iTask->decrement_ref_count();
    }
    ...
  });
}

If I'm not mistaken, what this does is (apart from the exception handling)

  • read the next event via readNextEventForStream(...)
  • process the event via processEventAsync(...)
  • either terminate the stream, or spawn a successor task to process the following event (a simplified model of this loop follows below)
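
A minimal model of this per-stream loop, with plain threads and a mutex standing in for the TBB tasks and the SerialTaskQueueChain (all names hypothetical):

#include <iostream>
#include <mutex>
#include <thread>
#include <vector>

int main() {
  const int totalEvents = 10;
  int nextEvent = 0;        // state of the "input source"
  std::mutex sourceMutex;   // the source must be read serially
  std::mutex printMutex;    // keep the demo output readable

  // One loop per stream, standing in for the recursion
  // handleNextEventForStreamAsync -> readNextEventForStream
  //   -> processEventAsync -> recursionTask -> handleNextEventForStreamAsync ...
  auto streamLoop = [&](unsigned int streamIndex) {
    while (true) {
      int event;
      {
        std::lock_guard<std::mutex> lock(sourceMutex);  // "serialQueueChain().push(...)"
        if (nextEvent >= totalEvents) break;            // "the stream will stop now"
        event = nextEvent++;                            // "readNextEventForStream(...)"
      }
      // "processEventAsync(...)": done inline here, a new TBB task in CMSSW
      std::lock_guard<std::mutex> lock(printMutex);
      std::cout << "stream " << streamIndex << " processed event " << event << '\n';
    }
  };

  std::vector<std::thread> streams;
  for (unsigned int i = 0; i < 4; ++i) streams.emplace_back(streamLoop, i);
  for (auto& t : streams) t.join();
  return 0;
}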

reading events into the EventPrincipal

From FWCore/Framework/src/EventProcessor.cc#L1311 readNextEventForStream(...) eventually calls readEvent(...)

From FWCore/Framework/src/EventProcessor.cc#L1438 a simplified version of readEvent(...) does

void EventProcessor::readEvent(unsigned int iStreamIndex) {
  auto& event = principalCache_.eventPrincipal(iStreamIndex);
  StreamContext streamContext(event.streamID(), &processContext_);
  input_->readEvent(event, streamContext);
}

where input_ is a std::unique_ptr<InputSource> built by makeInput(...) (see FWCore/Framework/src/EventProcessor.cc#L113).

For a PoolSource, the call to readEvent(...) trickles down through the input source layers and eventually calls EventPrincipal::fillEventPrincipal(...):

principal.fillEventPrincipal(eventAux(),
                             *processHistoryRegistry_,
                             std::move(eventSelectionIDs_),
                             std::move(branchListIndexes_),
                             *(makeProductProvenanceRetriever(principal.streamID().value())),
                             eventTree_.resetAndGetRootDelayedReader());

processing the events

from FWCore/Framework/src/EventProcessor.cc#L1450

  void EventProcessor::processEventAsync(WaitingTaskHolder iHolder,
                                         unsigned int iStreamIndex) {
    tbb::task::spawn( *make_functor_task( tbb::task::allocate_root(), [=]() {
      processEventAsyncImpl(iHolder, iStreamIndex);
    }) );
  }

processEventAsync(...) spawns a new task to process the event via processEventAsyncImpl(...)

from FWCore/Framework/src/EventProcessor.cc#L1457

void EventProcessor::processEventAsyncImpl(WaitingTaskHolder iHolder, unsigned int iStreamIndex) {
  ...
  auto pep = &(principalCache_.eventPrincipal(iStreamIndex));
  pep->setLuminosityBlockPrincipal(principalCache_.lumiPrincipalPtr());
  
  WaitingTaskHolder finalizeEventTask( make_waiting_task(
                    tbb::task::allocate_root(),
                    [this,pep,iHolder](std::exception_ptr const* iPtr) mutable
             {
               ServiceRegistry::Operate operate(serviceToken_);
               ...
               pep->clearEventPrincipal();
               if(iPtr) {
                 iHolder.doneWaiting(*iPtr);
               } else {
                 iHolder.doneWaiting(std::exception_ptr());
               }
             }) );
  WaitingTaskHolder afterProcessTask;
  if(subProcesses_.empty()) {
    afterProcessTask = std::move(finalizeEventTask);
  } else {
    ...
  }
  
  schedule_->processOneEventAsync(std::move(afterProcessTask), iStreamIndex,*pep, esp_->eventSetup());
}

So, processEventAsyncImpl(...) does

  • run schedule_->processOneEventAsync(...)
  • if there are subprocesses, call subProcess.doEventAsync(...)
  • once the event has been fully processed, call doneWaiting() on the WaitingTaskHolder passed as the first argument, i.e. the recursionTask that reads and processes the next event for this stream (a toy model of this chaining follows below)
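
A toy model of this WaitingTaskHolder chaining (hypothetical names, no TBB; in CMSSW doneWaiting() decrements a task's ref count rather than invoking the continuation directly):

#include <exception>
#include <functional>
#include <iostream>

// Each holder owns a continuation; doneWaiting() runs it, forwarding any
// exception raised by the step that just finished.
class Holder {
public:
  explicit Holder(std::function<void(std::exception_ptr)> f) : func_(std::move(f)) {}
  void doneWaiting(std::exception_ptr p) { func_(p); }
private:
  std::function<void(std::exception_ptr)> func_;
};

int main() {
  // "recursionTask": go read and process the next event of this stream.
  Holder recursionTask([](std::exception_ptr) { std::cout << "handle next event\n"; });

  // "finalizeEventTask": clear the EventPrincipal, then release the recursion task.
  Holder finalizeEventTask([&recursionTask](std::exception_ptr p) {
    std::cout << "clearEventPrincipal\n";
    recursionTask.doneWaiting(p);
  });

  // "processOneEventAsync" completes and signals its successor:
  finalizeEventTask.doneWaiting(nullptr);
  return 0;
}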

the Schedule

from FWCore/Framework/src/Schedule.cc#L1034

Schedule::processOneEventAsync(...) calls the current stream's processOneEventAsync(...)

from FWCore/Framework/src/StreamSchedule.cc#L536

void StreamSchedule::processOneEventAsync(WaitingTaskHolder iTask,
                                          EventPrincipal& ep,
                                          EventSetup const& es,
                                          std::vector<edm::propagate_const<std::shared_ptr<PathStatusInserter>>>& pathStatusInserters) {
  this->resetAll();
  ...
  
  // take care of the signals
  ...
  
  // take care of the empty Paths and EndPaths
  ...
  
  // This call takes care of the unscheduled processing.
  workerManager_.setupOnDemandSystem(ep,es);
  
  ++total_events_;
  auto serviceToken = ServiceRegistry::instance().presentToken();
  
  auto allPathsDone = make_waiting_task(tbb::task::allocate_root(),
                                        [iTask,this,serviceToken](std::exception_ptr const* iPtr) mutable
                                        {
                                          ServiceRegistry::Operate operate(serviceToken);
                                          
                                          std::exception_ptr ptr;
                                          if(iPtr) {
                                            ptr = *iPtr;
                                          }
                                          iTask.doneWaiting(finishProcessOneEvent(ptr));
                                        });
  //The holder guarantees that if the paths finish before the loop ends
  // that we do not start too soon. It also guarantees that the task will
  // run under that condition.
  WaitingTaskHolder allPathsHolder(allPathsDone);

  auto pathsDone = make_waiting_task(tbb::task::allocate_root(),
                                        [allPathsHolder,&ep, &es, this,serviceToken](std::exception_ptr const* iPtr) mutable
                                        {
                                          ServiceRegistry::Operate operate(serviceToken);

                                          std::exception_ptr ptr;
                                          if(iPtr) {
                                            ptr = *iPtr;
                                          }
                                          finishedPaths(ptr, std::move(allPathsHolder), ep, es);
                                        });
  
  //The holder guarantees that if the paths finish before the loop ends
  // that we do not start too soon. It also guarantees that the task will
  // run under that condition.
  WaitingTaskHolder taskHolder(pathsDone);

  //start end paths first so on single threaded the paths will run first
  for(auto it = end_paths_.rbegin(), itEnd = end_paths_.rend();
      it != itEnd; ++it) {
    it->processOneOccurrenceAsync(allPathsDone,ep, es, streamID_, &streamContext_);
  }

  for(auto it = trig_paths_.rbegin(), itEnd = trig_paths_.rend();
      it != itEnd; ++ it) {
    it->processOneOccurrenceAsync(pathsDone,ep, es, streamID_, &streamContext_);
  }
}

StreamSchedule::processOneEventAsync(...) does

  • call resetAll() to reset the skippingEvent_ flag and all the trigger results;
  • emit pre-event (?) signals
  • mark empty Paths and EndPaths as done
  • take care of the unscheduled processing calling workerManager_.setupOnDemandSystem(ep,es)
  • for all Paths and EndPaths, schedule the call to processOneOccurrenceAsync(...)
  • schedule the call to finishProcessOneEvent(...) to handle any exceptions and emit the post-event (?) signals (a toy model of this fan-out follows below).
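
The fan-out/join mechanics can be modeled with plain reference counting; in this toy model (not the CMSSW implementation) std::shared_ptr stands in for the WaitingTask ref counts, and each callback fires when its last holder is released:

#include <iostream>
#include <memory>
#include <vector>

using Completion = std::shared_ptr<void>;  // deleter runs when the last copy dies

int main() {
  // Runs last, once the end paths and finishedPaths have all released it
  // ("allPathsDone" -> finishProcessOneEvent).
  Completion allPathsDone(nullptr, [](void*) { std::cout << "finishProcessOneEvent\n"; });

  // Runs when the trigger paths are done; it keeps a reference to allPathsDone
  // (the "allPathsHolder") until the trigger results have been recorded
  // ("pathsDone" -> finishedPaths).
  Completion pathsDone(nullptr, [allPathsHolder = allPathsDone](void*) {
    std::cout << "finishedPaths: record trigger results\n";
  });

  std::vector<Completion> endPaths(2, allPathsDone);  // each end path holds allPathsDone
  std::vector<Completion> trigPaths(3, pathsDone);    // each trigger path holds pathsDone
  allPathsDone.reset();                               // drop the local references:
  pathsDone.reset();                                  // only the paths keep them alive

  for (auto& p : endPaths)  { std::cout << "end path done\n";     p.reset(); }
  for (auto& p : trigPaths) { std::cout << "trigger path done\n"; p.reset(); }
  return 0;
}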

edm::WorkerManager::setupOnDemandSystem(...)

If I understand correctly, this sets up the ProductResolvers with the concrete Workers that are responsible for producing them.
The mapping between the products (actually, product branches) and the resolvers is set up in the constructor of the Principal (see FWCore/Framework/src/Principal.cc#L142_L179).

From FWCore/Framework/src/WorkerManager.cc#L148:

  void
  WorkerManager::setupOnDemandSystem(Principal& ep, EventSetup const& es) {
    this->resetAll();
    unscheduled_.setEventSetup(es);
    if(&ep != lastSetupEventPrincipal_) {
      UnscheduledConfigurator config( allWorkers_.begin(), allWorkers_.end(), &(unscheduled_.auxiliary()));
      ep.setupUnscheduled(config);
      lastSetupEventPrincipal_ = &ep;
    }
}

From FWCore/Framework/src/WorkerManager.cc#L138:
resetAll() calls reset() on all the workers.

From FWCore/Framework/src/UnscheduledCallProducer.h:
unscheduled_ is an UnscheduledCallProducer, holding a list of all the unscheduled Workers, and an UnscheduledAuxiliary.

From FWCore/Framework/src/UnscheduledAuxiliary.h:
the UnscheduledAuxiliary holds a pointer to the EventSetup, and (unused) slots for the pre/postModuleDelayedGetSignals.

From FWCore/Framework/src/UnscheduledConfigurator.h:
the UnscheduledConfigurator holds a mapping of module labels to the corresponding Worker *, and an UnscheduledAuxiliary.

From FWCore/Framework/src/Principal.cc#L337:
Principal::setupUnscheduled(config) calls resolver.setupUnscheduled(config) on all the ProductResolvers.
See FWCore/Framework/src/ProductResolvers.cc for the concrete ProductResolvers. Most inherit from DataManagingProductResolver.
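
A toy model of this on-demand wiring (hypothetical names and simplified types): the resolver is configured with the Worker that can produce its product, runs it on the first request, and serves the cached product afterwards.

#include <iostream>
#include <memory>
#include <string>

struct Worker {
  std::string label;
  int produce() const {
    std::cout << "running module '" << label << "'\n";
    return 42;  // stand-in for the real data product
  }
};

class ProductResolver {
public:
  void setupUnscheduled(Worker* w) { worker_ = w; }  // wiring done during setup
  int resolve() {
    if (!product_)
      product_ = std::make_unique<int>(worker_->produce());  // produce on demand
    return *product_;  // cached on subsequent requests
  }
private:
  Worker* worker_ = nullptr;
  std::unique_ptr<int> product_;
};

int main() {
  Worker w{"myProducer"};
  ProductResolver r;
  r.setupUnscheduled(&w);
  std::cout << r.resolve() << '\n';  // first request triggers the worker
  std::cout << r.resolve() << '\n';  // second request hits the cache
  return 0;
}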

edm::Path::processOneOccurrenceAsync(...)

from FWCore/Framework/src/Path.cc#L209:
