Created
November 5, 2018 21:02
-
-
Save gnperdue/d7af021a2e9e0467aec7fbbc8a0c3137 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
void TBBSession::SchedClosure(tbb::task_arena& arena, tbb::task_group& g, std::function<void()> c) { | |
arena.execute( [&g, &c] () {g.run( c ); } ); | |
} | |
=========== | |
Then in TBBSession::Run | |
… Before creating the ExecutorBarrier | |
// Use a task_arena to avoid having unrelated tasks start | |
// running on this thread (which could start deadlocks) | |
tbb::task_arena taskArena; | |
tbb::task_group taskGroup; | |
// we are required to always call wait before destructor | |
auto doneWithTaskGroup = [&taskArena, &taskGroup](void *) { taskArena.execute([&taskGroup]() { taskGroup.wait();}); }; | |
std::unique_ptr<tbb::task_group, decltype(doneWithTaskGroup) > guard(&taskGroup, doneWithTaskGroup); | |
… Later in TBBSession::Run | |
// pass taskArena and taskGroup to SchedClosure | |
// consequently, disable TF's own thread logic inside the loop | |
Executor::Args::Runner default_runner = [this, &taskArena, &taskGroup](Executor::Args::Closure c) { | |
SchedClosure(taskArena, taskGroup, std::move(c)); | |
}; | |
for (const auto& item : executors_and_keys->items) { | |
args.runner = default_runner; | |
item.executor->RunAsync(args, barrier->Get()); | |
} | |
// WaitForNotification will handle calling wait on taskGroup | |
guard.release(); | |
WaitForNotification(taskArena, taskGroup, &run_state, &step_cancellation_manager, | |
run_options.timeout_in_ms() > 0 | |
? run_options.timeout_in_ms() | |
: operation_timeout_in_ms_); | |
=========== | |
void TBBSession::WaitForNotification(tbb::task_arena& arena, tbb::task_group& taskGroup, | |
RunState* run_state, CancellationManager* cm, int64 timeout_in_ms) { | |
// Doing the wait in the arena adds this thread to the arena | |
// and therefore tasks associated to the group can run on this thread | |
arena.execute([&taskGroup]() { taskGroup.wait();} ); | |
const Status status = | |
WaitForNotification(&run_state->executors_done, timeout_in_ms); | |
if (!status.ok()) { | |
{ | |
mutex_lock l(run_state->mu_); | |
run_state->status.Update(status); | |
} | |
cm->StartCancel(); | |
// We must wait for the executors to complete, because they have borrowed | |
// references to `cm` and other per-step state. After this notification, it | |
// is safe to clean up the step. | |
run_state->executors_done.WaitForNotification(); | |
} | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment