Skip to content

Instantly share code, notes, and snippets.

@gnperdue
Created November 5, 2018 21:02
Show Gist options
  • Save gnperdue/d7af021a2e9e0467aec7fbbc8a0c3137 to your computer and use it in GitHub Desktop.
Save gnperdue/d7af021a2e9e0467aec7fbbc8a0c3137 to your computer and use it in GitHub Desktop.
// Schedules closure `c` as a task of group `g`, submitting it from inside
// `arena`.
//
//   arena - task arena in which the call to g.run() is executed.
//   g     - task group that the closure is added to.
//   c     - closure to run; taken by value so that callers can move into it.
void TBBSession::SchedClosure(tbb::task_arena& arena, tbb::task_group& g, std::function<void()> c) {
  // task_arena::execute() only returns once the functor has completed, so
  // capturing `g` and `c` by reference cannot dangle. `c` is our own by-value
  // copy and is not used afterwards, so hand it to the task group as an
  // rvalue instead of forcing another std::function copy.
  arena.execute([&g, &c]() { g.run(std::move(c)); });
}
===========
Then in TBBSession::Run
… Before creating the ExecutorBarrier
// Use a dedicated task_arena so that unrelated tasks cannot start
// running on this thread (which could cause deadlocks).
tbb::task_arena taskArena;
tbb::task_group taskGroup;
// tbb::task_group requires wait() to be called before its destructor runs
auto doneWithTaskGroup = [&taskArena, &taskGroup](void *) { taskArena.execute([&taskGroup]() { taskGroup.wait();}); };
std::unique_ptr<tbb::task_group, decltype(doneWithTaskGroup) > guard(&taskGroup, doneWithTaskGroup);
… Later in TBBSession::Run
// pass taskArena and taskGroup to SchedClosure
// consequently, disable TF's own thread logic inside the loop
Executor::Args::Runner default_runner = [this, &taskArena, &taskGroup](Executor::Args::Closure c) {
SchedClosure(taskArena, taskGroup, std::move(c));
};
for (const auto& item : executors_and_keys->items) {
args.runner = default_runner;
item.executor->RunAsync(args, barrier->Get());
}
// WaitForNotification will handle calling wait on taskGroup
guard.release();
WaitForNotification(taskArena, taskGroup, &run_state, &step_cancellation_manager,
run_options.timeout_in_ms() > 0
? run_options.timeout_in_ms()
: operation_timeout_in_ms_);
===========
// Waits for every task in `taskGroup` to finish, then waits for the
// `executors_done` notification of `run_state` using `timeout_in_ms`.
//
// If that second wait reports a non-OK status, the status is folded into
// `run_state->status`, cancellation is started through `cm`, and the call
// then blocks until the executors signal completion.
void TBBSession::WaitForNotification(tbb::task_arena& arena, tbb::task_group& taskGroup,
                                     RunState* run_state, CancellationManager* cm,
                                     int64 timeout_in_ms) {
  // Waiting from inside the arena makes this thread join the arena, which in
  // turn lets tasks belonging to the group execute on this thread.
  auto drain_group = [&taskGroup]() { taskGroup.wait(); };
  arena.execute(drain_group);

  const Status wait_status =
      WaitForNotification(&run_state->executors_done, timeout_in_ms);
  if (wait_status.ok()) {
    return;
  }

  // Record the failure while holding the run-state mutex; the lock is
  // released before cancellation is started.
  {
    mutex_lock lock(run_state->mu_);
    run_state->status.Update(wait_status);
  }
  cm->StartCancel();

  // The executors hold borrowed references to `cm` and other per-step state,
  // so the step may only be cleaned up once they have signalled completion.
  run_state->executors_done.WaitForNotification();
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment