-
-
Save pitrou/87f3091c226db3306c45b2c32dd9aea8 to your computer and use it in GitHub Desktop.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
diff --git a/cpp/src/arrow/util/task_group.cc b/cpp/src/arrow/util/task_group.cc | |
index 8046a5291..54dc5ed96 100644 | |
--- a/cpp/src/arrow/util/task_group.cc | |
+++ b/cpp/src/arrow/util/task_group.cc | |
@@ -23,6 +23,7 @@ | |
#include <mutex> | |
#include <utility> | |
+#include "arrow/util/checked_cast.h" | |
#include "arrow/util/logging.h" | |
#include "arrow/util/thread_pool.h" | |
@@ -88,13 +89,15 @@ class ThreadedTaskGroup : public TaskGroup { | |
// Only if an error occurs is the lock taken | |
if (ok_.load(std::memory_order_acquire)) { | |
nremaining_.fetch_add(1, std::memory_order_acquire); | |
- Status st = thread_pool_->Spawn([this, task]() { | |
- if (ok_.load(std::memory_order_acquire)) { | |
+ | |
+ auto self = checked_pointer_cast<ThreadedTaskGroup>(shared_from_this()); | |
+ Status st = thread_pool_->Spawn([self, task]() { | |
+ if (self->ok_.load(std::memory_order_acquire)) { | |
// XXX what about exceptions? | |
Status st = task(); | |
- UpdateStatus(std::move(st)); | |
+ self->UpdateStatus(std::move(st)); | |
} | |
- OneTaskDone(); | |
+ self->OneTaskDone(); | |
}); | |
UpdateStatus(std::move(st)); | |
} | |
diff --git a/cpp/src/arrow/util/task_group.h b/cpp/src/arrow/util/task_group.h | |
index 390d9476e..6ee5163d5 100644 | |
--- a/cpp/src/arrow/util/task_group.h | |
+++ b/cpp/src/arrow/util/task_group.h | |
@@ -40,7 +40,7 @@ class ThreadPool; | |
/// implementation. When Finish() returns, it is guaranteed that all | |
/// tasks have finished, or at least one has errored. | |
/// | |
-class ARROW_EXPORT TaskGroup { | |
+class ARROW_EXPORT TaskGroup : public std::enable_shared_from_this<TaskGroup> { | |
public: | |
/// Add a Status-returning function to execute. Execution order is | |
/// undefined. The function may be executed immediately or later. |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment