Skip to content

Instantly share code, notes, and snippets.

@grisevg
Last active October 20, 2023 03:39
Show Gist options
  • Star 40 You must be signed in to star a gist
  • Fork 12 You must be signed in to fork a gist
  • Save grisevg/d5abfb9ff3f5f1a50875 to your computer and use it in GitHub Desktop.
Save grisevg/d5abfb9ff3f5f1a50875 to your computer and use it in GitHub Desktop.
Utility class for asynchronous/coroutine style programming in UE4 C++
#pragma once
/**
* FAsyncQueue can be used to run asynchronous delegates in sequence, parallel and combinations of the above
*
* Use Add() to enqueue delegates matching FAsyncDelegate signature:
* a void function that accepts a single argument of another void function with no arguments.
*
* Static factories MakeSync, MakeSequence and MakeParallel can be used to wrap different type of delegates and
* delegate collections into a single FAsyncDelegate which can be enqueued with Add().
*
* Execute() accepts a callback and can be called multiple times. If queue is already running, Execute does nothing
* except storing a callback.
*
* The example below will output:
*
* START
* Starting Long Task ASYNC
* //10 seconds later
* Starting Short Task ASYNC
* //1 second later
* Doing Instant Task SYNC
* Starting Longest Parallel ASYNC
* Starting Shortest Parallel ASYNC
* Starting Medium Parallel ASYNC
* //1 second later
* Finished Shortest Parallel
* //1 second later (2 seconds from parallel tasks started)
* Finished Medium Parallel
* //8 seconds later (10 seconds from parallel tasks started)
* Finished Longest Parallel
* DONESKIES
*
* The example itself:
*
* // Don't store the Queue on the stack or it will get destroyed before it finishes
* // You can't use "new", only a factory method "FAsyncQueue::Create()" which always returns `TSharedRef<FAsyncQueue, ESPMode::ThreadSafe>`
* Queue = FAsyncQueue::Create();
* Queue->Add(FAsyncDelegate::CreateLambda([this](const FCallbackDelegate& Callback)
* {
* UE_LOG(LogTemp, Warning, TEXT("Starting Long Task ASYNC"));
* FTimerHandle FooBar;
* this->GetWorldTimerManager().SetTimer(FooBar, Callback, 10, false);
* }));
* Queue->Add(FAsyncDelegate::CreateLambda([this](const FCallbackDelegate& Callback)
* {
* UE_LOG(LogTemp, Warning, TEXT("Starting Short Task ASYNC"));
* FTimerHandle FooBar;
* this->GetWorldTimerManager().SetTimer(FooBar, Callback, 1, false);
* }));
* Queue->Add(FAsyncQueue::MakeSync(FCallbackDelegate::CreateLambda([]()
* {
* UE_LOG(LogTemp, Warning, TEXT("Doing Instant Task SYNC"));
* })));
*
* TArray<FAsyncDelegate> ParallelTasks;
* TArray<FAsyncDelegate> LongestParallel;
* LongestParallel.Add(FAsyncDelegate::CreateLambda([this](const FCallbackDelegate& Callback)
* {
* UE_LOG(LogTemp, Warning, TEXT("Starting Longest Parallel ASYNC"));
* FTimerHandle FooBar;
* this->GetWorldTimerManager().SetTimer(FooBar, Callback, 10, false);
* }));
* LongestParallel.Add(FAsyncQueue::MakeSync(FCallbackDelegate::CreateLambda([]()
* {
* UE_LOG(LogTemp, Warning, TEXT("Finished Longest Parallel"));
* })));
* ParallelTasks.Add(FAsyncQueue::MakeSequence(LongestParallel));
*
* TArray<FAsyncDelegate> ShortestParallel;
* ShortestParallel.Add(FAsyncDelegate::CreateLambda([this](const FCallbackDelegate& Callback)
* {
* UE_LOG(LogTemp, Warning, TEXT("Starting Shortest Parallel ASYNC"));
* FTimerHandle FooBar;
* this->GetWorldTimerManager().SetTimer(FooBar, Callback, 1, false);
* }));
* ShortestParallel.Add(FAsyncQueue::MakeSync(FCallbackDelegate::CreateLambda([]()
* {
* UE_LOG(LogTemp, Warning, TEXT("Finished Shortest Parallel"));
* })));
* ParallelTasks.Add(FAsyncQueue::MakeSequence(ShortestParallel));
*
*
* TArray<FAsyncDelegate> MediumParallel;
* MediumParallel.Add(FAsyncDelegate::CreateLambda([this](const FCallbackDelegate& Callback)
* {
* UE_LOG(LogTemp, Warning, TEXT("Starting Medium Parallel ASYNC"));
* FTimerHandle FooBar;
* this->GetWorldTimerManager().SetTimer(FooBar, Callback, 2, false);
* }));
* MediumParallel.Add(FAsyncQueue::MakeSync(FCallbackDelegate::CreateLambda([]()
* {
* UE_LOG(LogTemp, Warning, TEXT("Finished Medium Parallel"));
* })));
* ParallelTasks.Add(FAsyncQueue::MakeSequence(MediumParallel));
*
* Queue->Add(FAsyncQueue::MakeParallel(ParallelTasks));
*
* UE_LOG(LogTemp, Warning, TEXT("START"));
* Queue->Execute(FCallbackDelegate::CreateLambda([]()
* {
* UE_LOG(LogTemp, Warning, TEXT("DONESKIES"));
* }));
*/
// Parameterless void delegate. Used both as the "I am done" signal handed to async tasks
// and as the type of the completion callbacks stored by FAsyncQueue::Execute().
DECLARE_DELEGATE(FCallbackDelegate);
// An asynchronous task: a void delegate that receives the FCallbackDelegate it is expected
// to execute once its work has finished.
DECLARE_DELEGATE_OneParam(FAsyncDelegate, FCallbackDelegate);
/**
 * Runs FAsyncDelegate tasks one at a time, in the order they were added.
 *
 * Each task receives a callback; the queue moves on to the next task only when that
 * callback is executed. When no tasks are left, every stored completion callback is fired.
 * Instances can only be obtained through Create() because the constructor is private.
 */
class _API FAsyncQueue : public TSharedFromThis<FAsyncQueue, ESPMode::ThreadSafe>
{
    //Pointers need to create instances of a used class when compiled with WITH_HOT_RELOAD_CTORS
#if WITH_HOT_RELOAD_CTORS
    friend class TSharedRef<FAsyncQueue, ESPMode::ThreadSafe>;
    friend class TSharedPtr<FAsyncQueue, ESPMode::ThreadSafe>;
    friend class TWeakPtr<FAsyncQueue, ESPMode::ThreadSafe>;
#endif

public:
    /** Wraps several tasks into one task that runs them one after another. */
    static FAsyncDelegate MakeSequence(const TArray<FAsyncDelegate>& SequencedDelegates);

    /** Wraps several tasks into one task that starts them all and finishes when all of them have finished. */
    static FAsyncDelegate MakeParallel(const TArray<FAsyncDelegate>& ParallelDelegates);

    /** Wraps a plain callback into a task that runs it and reports completion immediately afterwards. */
    static FAsyncDelegate MakeSync(const FCallbackDelegate& SyncDelegate);

    /** Factory: the only way to construct a queue. */
    static TSharedRef<FAsyncQueue, ESPMode::ThreadSafe> Create();

    /** Appends a task to the end of the queue. Does not start execution. */
    void Add(const FAsyncDelegate& AsyncDelegate);

    /** Convenience for Add(MakeSync(SyncDelegate)). */
    void AddSync(const FCallbackDelegate& SyncDelegate) { Add(FAsyncQueue::MakeSync(SyncDelegate)); }

    /** Convenience for Add(MakeParallel(ParallelDelegates)). */
    void AddParallel(const TArray<FAsyncDelegate>& ParallelDelegates) { Add(FAsyncQueue::MakeParallel(ParallelDelegates)); }

    /** Stores the given shared reference inside the queue until ReleaseHardReferenceToSelf() is called. */
    void StoreHardReferenceToSelf(TSharedRef<FAsyncQueue, ESPMode::ThreadSafe> HardReferenceToSelf);

    /** Drops the reference stored by StoreHardReferenceToSelf(); asserts if none is stored. */
    void ReleaseHardReferenceToSelf();

    /** Stores Callback (if bound) as a completion callback, then behaves like Execute(). */
    void Execute(const FCallbackDelegate& Callback);

    /** Starts processing the queue unless a task is already in flight. */
    void Execute();

    /** Discards every task that has not been started yet. The task currently in flight is not affected. */
    void Empty();

    /** Forgets every stored completion callback. */
    void RemoveAllCallbacks();

    /** True while a task has been started and has not reported completion yet. */
    bool IsExecuting() const { return CurrentDelegate.IsBound(); }

    /** True when no tasks are waiting to be started. */
    bool IsEmpty() const { return Queue.IsEmpty(); }

private:
    FAsyncQueue();

    /** Tasks waiting to be started. */
    TQueue<FAsyncDelegate> Queue;

    /** Callbacks fired once the queue has run dry. */
    TArray<FCallbackDelegate> CompleteCallbacks;

    /** Delegate handed to every task; bound to OnAsyncDelegateFinished() on the first Execute(). */
    FCallbackDelegate OnAsyncDelegateFinishedDelegate;

    /** The task currently in flight; unbound when idle. */
    FAsyncDelegate CurrentDelegate;

    /** Optional reference set through StoreHardReferenceToSelf(). */
    TSharedPtr<FAsyncQueue, ESPMode::ThreadSafe> HardReferenceToSelf;

    /** Starts the next queued task, or fires the completion callbacks if none is left. */
    void ExecuteNextInQueue();

    /** Invoked by the in-flight task when it is done. */
    void OnAsyncDelegateFinished();
};
#include "AsyncQueue.h"
/**
 * Wraps a set of async delegates into a single FAsyncDelegate that starts all of them
 * and executes its completion callback once every one of them has reported completion.
 *
 * @param ParallelDelegates Delegates to start together; each must be bound. The array is copied.
 * @return Delegate that can be enqueued with Add() or executed directly. With an empty
 *         array the completion callback is executed straight away.
 */
FAsyncDelegate FAsyncQueue::MakeParallel(const TArray<FAsyncDelegate>& ParallelDelegates)
{
    return FAsyncDelegate::CreateLambda([ParallelDelegates](const FCallbackDelegate& Callback)
    {
        // Shared counter of outstanding completions. It starts one higher than the number
        // of delegates: the extra token is released by this function after every delegate
        // has been started. That guarantees that
        //  - an empty array still completes (nothing else would ever reach zero), and
        //  - Callback never fires in the middle of the loop below when delegates finish
        //    synchronously. Firing it there may destroy this closure (the queue unbinds
        //    its current delegate on completion) while its captured array is being iterated.
        TSharedPtr<int32> ParallelCallbacksCounter = MakeShareable(new int32(ParallelDelegates.Num() + 1));
        FCallbackDelegate ParallelCallback = FCallbackDelegate::CreateLambda([ParallelCallbacksCounter, Callback]()
        {
            //Every completion decrements the counter, the last one executes the callback
            --(*ParallelCallbacksCounter);
            if (*ParallelCallbacksCounter <= 0) Callback.Execute();
        });

        for (auto& Delegate : ParallelDelegates)
        {
            check(Delegate.IsBound());
            Delegate.Execute(ParallelCallback);
        }

        // Release the dispatch token. Only locals are touched from here on.
        ParallelCallback.Execute();
    });
}
/**
 * Wraps a list of async delegates into a single FAsyncDelegate that runs them one after
 * another: each delegate is started when the previous one executes its callback, and the
 * completion callback is executed after the last one.
 *
 * @param SequencedDelegates Delegates to run in order; each must be bound. The array is copied once.
 * @return Delegate that can be enqueued with Add() or executed directly. The completion
 *         callback passed to it must be bound.
 */
FAsyncDelegate FAsyncQueue::MakeSequence(const TArray<FAsyncDelegate>& SequencedDelegates)
{
    // One heap copy of the delegates, shared by every run of the returned delegate.
    TSharedRef<TArray<FAsyncDelegate>> SharedDelegates = MakeShareable(new TArray<FAsyncDelegate>(SequencedDelegates));

    return FAsyncDelegate::CreateLambda([SharedDelegates](const FCallbackDelegate& Callback)
    {
        // State of a single run. It owns everything the run needs, so the sequence keeps
        // working even if the delegate that started it is destroyed before the last step
        // finishes. It is referenced only by the continuation handed to the step currently
        // in flight, so it is released when that continuation goes away (no self-referencing
        // delegate, hence no reference cycle).
        struct FSequenceRun
        {
            FSequenceRun(const TSharedRef<TArray<FAsyncDelegate>>& InDelegates, const FCallbackDelegate& InOnFinished)
                : Delegates(InDelegates)
                , OnFinished(InOnFinished)
                , NextIndex(0)
            {
            }

            TSharedRef<TArray<FAsyncDelegate>> Delegates;
            FCallbackDelegate OnFinished;
            int32 NextIndex;

            //Each step executes the next one. The step after the last one executes the completion callback.
            static void Step(TSharedRef<FSequenceRun> Run)
            {
                const int32 Index = Run->NextIndex++;
                if (Index < Run->Delegates->Num())
                {
                    const FAsyncDelegate& Current = (*Run->Delegates)[Index];
                    check(Current.IsBound());
                    Current.Execute(FCallbackDelegate::CreateLambda([Run]()
                    {
                        FSequenceRun::Step(Run);
                    }));
                }
                else
                {
                    check(Run->OnFinished.IsBound());
                    Run->OnFinished.Execute();
                }
            }
        };

        TSharedRef<FSequenceRun> Run = MakeShareable(new FSequenceRun(SharedDelegates, Callback));
        FSequenceRun::Step(Run);
    });
}
/**
 * Adapts a plain callback to the FAsyncDelegate signature: the resulting delegate runs
 * SyncDelegate and then immediately executes the completion callback it was given.
 *
 * @param SyncDelegate Callback to run; must be bound when the returned delegate executes.
 */
FAsyncDelegate FAsyncQueue::MakeSync(const FCallbackDelegate& SyncDelegate)
{
    auto RunThenReport = [SyncDelegate](const FCallbackDelegate& OnDone)
    {
        check(SyncDelegate.IsBound());
        SyncDelegate.Execute();
        // The work is synchronous, so completion is reported right away.
        OnDone.Execute();
    };
    return FAsyncDelegate::CreateLambda(RunThenReport);
}
/** Factory for FAsyncQueue: allocates a new queue and returns it as a thread-safe shared reference. */
TSharedRef<FAsyncQueue, ESPMode::ThreadSafe> FAsyncQueue::Create()
{
    return TSharedRef<FAsyncQueue, ESPMode::ThreadSafe>(new FAsyncQueue());
}
/** Private constructor; the TSharedFromThis base and all members are default-constructed. */
FAsyncQueue::FAsyncQueue()
{
}
/**
 * Appends a task to the end of the queue. Nothing is started here; call Execute() for that.
 *
 * @param AsyncDelegate Task to enqueue.
 */
void FAsyncQueue::Add(const FAsyncDelegate& AsyncDelegate)
{
    Queue.Enqueue(AsyncDelegate);
}
/**
 * Stores the given shared reference in the queue's HardReferenceToSelf member, replacing
 * any reference stored earlier. It is held until ReleaseHardReferenceToSelf() is called.
 *
 * @param HardReferenceToSelf Reference to keep.
 */
void FAsyncQueue::StoreHardReferenceToSelf(TSharedRef<FAsyncQueue, ESPMode::ThreadSafe> HardReferenceToSelf)
{
    // The parameter shadows the member of the same name, hence the explicit this->.
    TSharedPtr<FAsyncQueue, ESPMode::ThreadSafe> StrongSelf(HardReferenceToSelf);
    this->HardReferenceToSelf = StrongSelf;
}
/**
 * Drops the reference stored by StoreHardReferenceToSelf().
 * Asserts if no reference is currently stored.
 */
void FAsyncQueue::ReleaseHardReferenceToSelf()
{
    const bool bHasReference = HardReferenceToSelf.IsValid();
    check(bHasReference);
    HardReferenceToSelf.Reset();
}
/**
 * Registers a completion callback and starts the queue if it is not already running.
 *
 * @param Callback Fired once the queue has run out of tasks. An unbound delegate is
 *                 accepted and simply not stored.
 */
void FAsyncQueue::Execute(const FCallbackDelegate& Callback)
{
    //Sometimes, for convenience, functions pass empty delegate to indicate the lack of callback
    const bool bHasCallback = Callback.IsBound();
    if (bHasCallback)
    {
        CompleteCallbacks.Add(Callback);
    }

    Execute();
}
/**
 * Starts processing the queue. Does nothing beyond binding the internal completion
 * delegate when a task is already in flight.
 */
void FAsyncQueue::Execute()
{
    // The delegate handed to every task is bound lazily, the first time the queue is executed.
    const bool bNeedsBinding = !OnAsyncDelegateFinishedDelegate.IsBound();
    if (bNeedsBinding)
    {
        OnAsyncDelegateFinishedDelegate.BindThreadSafeSP(this, &FAsyncQueue::OnAsyncDelegateFinished);
    }

    // A task in flight starts the next one itself when it reports completion.
    if (IsExecuting())
    {
        return;
    }

    ExecuteNextInQueue();
}
void FAsyncQueue::Empty()
{
FAsyncDelegate Tmp;
while (!Queue.IsEmpty()) Queue.Dequeue(Tmp);
}
/** Forgets every completion callback registered through Execute(Callback). Queued tasks are not touched. */
void FAsyncQueue::RemoveAllCallbacks()
{
    CompleteCallbacks.Empty();
}
/**
 * Starts the next queued task, or fires and clears the completion callbacks when the
 * queue has run dry. Must only be called while no task is in flight.
 */
void FAsyncQueue::ExecuteNextInQueue()
{
    check(!CurrentDelegate.IsBound());//Make sure previous delegate has been finished

    if (Queue.IsEmpty())
    {
        // Work on a copy and clear the member first: a callback may call Execute(Callback)
        // again, and that new callback must not be fired (or lost) by this pass.
        auto PendingCallbacks = CompleteCallbacks;
        CompleteCallbacks.Empty();
        for (auto& Callback : PendingCallbacks)
        {
            // Callbacks were only verified to be bound when they were registered. Whatever
            // they are bound to may have gone away while the queue was running, so skip
            // stale ones instead of tripping the assertion inside Execute().
            Callback.ExecuteIfBound();
        }
        return;
    }

    Queue.Dequeue(CurrentDelegate);
    check(CurrentDelegate.IsBound());
    // The task reports completion by executing OnAsyncDelegateFinishedDelegate.
    CurrentDelegate.Execute(OnAsyncDelegateFinishedDelegate);
}
/** Completion handler handed to every task: marks the queue idle and moves on to the next task. */
void FAsyncQueue::OnAsyncDelegateFinished()
{
    // Unbinding first is what makes IsExecuting() report false and satisfies the
    // precondition checked at the top of ExecuteNextInQueue().
    CurrentDelegate.Unbind();

    ExecuteNextInQueue();
}
@tqn
Copy link

tqn commented Jul 26, 2016

On line 108 of the c++ file, there's a typo.
FEAsyncDelegate should be FAsyncDelegate.

@grisevg
Copy link
Author

grisevg commented Aug 18, 2017

@tqn Thank you! 👍

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment