Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Agora + Unreal 5.02 support example
/*
Example Agora bindings to Unreal 5.02 using materials rather than UImage.
Usage:
1) AgoraActorHelper should be compiled into your project.
2) Make a material and add it to your project anywhere. Click on that material to bring up the material wire editor. Add a texture parameter called "mytexture".
3) Make a blueprint that is a subclass of AgoraActorHelper. Instance this blueprint into a test level.
4) In that blueprint view make a cube.
4) In that blueprint editor (the graph view) create an event start.
5) In blueprint editor wire the event start to create a duplicate of the material you made earlier - now we have a "dynamic material".
6) Apply the dynamic material to your cube (not the original material).
7) Tell AgoraActorHelper about your dynamic material using SetMaterialAndUID() in blueprint using the magical SetMaterialAndUID method.
8) Wire to the join() magical method.
9) Run this.
Agora will now paint textures into the method. You will want to have a remote Agora endpoint.
*/
#pragma once
#include "CoreMinimal.h"
#include "GameFramework/Actor.h"
#if PLATFORM_WINDOWS
#include "Windows/AllowWindowsPlatformTypes.h"
#include "Windows/HideWindowsPlatformTypes.h"
#endif
#if (PLATFORM_LINUX)
// workaround C++ antiquities/baroque architecture of UNREAL - UCLASSES cannot be in ifdefs
namespace agora { namespace rtc { class IRtcEngineEventHandler {}; }; };
namespace agora { namespace media { class IVideoFrameObserver {}; }; };
#else
#include "../Plugins/AgoraPlugin/Source/AgoraPlugin/Public/AgoraHeaderBase.h"
#endif
#include "AgoraActorHelper.generated.h"
UCLASS(Blueprintable, BlueprintType)
class REFLEKTOR_API AAgoraActorHelper : public AActor, public agora::rtc::IRtcEngineEventHandler, public agora::media::IVideoFrameObserver {
GENERATED_BODY()
private:
const char* AGORA_APP_ID = PUT SOMETHING USEFUL HERE;
bool bIsJoined = false;
bool bIsBroadcaster = false;
public:
AAgoraActorHelper();
~AAgoraActorHelper();
#if (!PLATFORM_LINUX)
// incoming video frames help
virtual bool onCaptureVideoFrame(agora::media::IVideoFrameObserver::VideoFrame& videoFrame) override;
virtual bool onRenderVideoFrame(unsigned int uid, agora::media::IVideoFrameObserver::VideoFrame& videoFrame) override;
virtual VIDEO_FRAME_TYPE getVideoFormatPreference() { return agora::media::IVideoFrameObserver::VIDEO_FRAME_TYPE::FRAME_TYPE_RGBA; }
// general utilities
void onWarning(int warn, const char* msg) override;
void onError(int err, const char* msg) override;
void onJoinChannelSuccess(const char* channel, agora::rtc::uid_t uid, int elapsed) override;
void onRejoinChannelSuccess(const char* channel, agora::rtc::uid_t uid, int elapsed) override;
void onLeaveChannel(const agora::rtc::RtcStats& stats) override;
void onUserJoined(agora::rtc::uid_t uid, int elapsed) override;
void onUserOffline(agora::rtc::uid_t uid, agora::rtc::USER_OFFLINE_REASON_TYPE reason) override;
#endif
// blueprint exposed powers
UFUNCTION(BlueprintCallable, Category = "ReflektorAgora")
void Join();
UFUNCTION(BlueprintCallable, Category = "ReflektorAgora")
void JoinBroadcast();
UFUNCTION(BlueprintCallable, Category = "ReflektorAgora")
void Leave();
UFUNCTION(BlueprintCallable, Category = "ReflektorAgora")
void SetMaterialAndUID(class UMaterialInstanceDynamic* Target = nullptr, int uid = -1, bool freepile = false);
UPROPERTY(EditAnywhere, BlueprintReadWrite, Category = "ReflektorAgora")
FString StreamName = FString(TEXT("mystream"));
UPROPERTY(EditAnywhere, BlueprintReadWrite, Category = "ReflektorAgora")
int local_uid = -1;
};
//////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#include "AgoraActorHelper.h"
#if (PLATFORM_LINUX)
AAgoraActorHelper::AAgoraActorHelper() {}
AAgoraActorHelper::~AAgoraActorHelper() {}
void AAgoraActorHelper::Join() {}
void AAgoraActorHelper::JoinBroadcast() {}
void AAgoraActorHelper::Leave() {}
void AAgoraActorHelper::SetMaterialAndUID(UMaterialInstanceDynamic* Target, int uid, bool freepile) {}
#else
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// channel helper - a general utility to associate agora session uids with materials as a convenience concept
//
// a freepile can also be created at this time - spare materials that are doled out on a first come first serve basis
//
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#include "MediaTexture.h"
#include "Runtime/Engine/Classes/Engine/Texture2D.h"
#include "Runtime/Engine/Classes/Engine/TextureRenderTarget2D.h"
#include <map>
#include <mutex>
static std::mutex _mutex;
static std::map<unsigned int,struct Channel*> channels;
struct Channel {
Channel() :material(nullptr), dirty(true), freepile(false) {}
UMaterialInstanceDynamic* material;
bool dirty;
bool freepile;
};
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// video frame catcher - catch raw pixels from agora and throw them at materials
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
bool AAgoraActorHelper::onCaptureVideoFrame(VideoFrame& videoFrame) { return onRenderVideoFrame(0, videoFrame); }
bool AAgoraActorHelper::onRenderVideoFrame(unsigned int uid, VideoFrame& videoFrame) {
// fetch contents prior to any lambdas
int32 width = videoFrame.width;
int32 height = videoFrame.height;
int32 yStride = videoFrame.yStride;
void* yBuffer = videoFrame.yBuffer;
// don't keep buffer? unclear what this means.
bool keep = true;
// paranoia
if (width < 64 || height < 64 || width > 32000 || height > 32000 || yBuffer == nullptr) {
UE_LOG(LogTemp, Error, TEXT("Agora Helper - frame is too small %d %d"), width, height);
return keep;
}
// is there a channel and material?
std::lock_guard<std::mutex> lock(_mutex);
Channel* c = channels[uid];
// if there's no channel then try steal a channel from the freepile
if (c == nullptr) {
for (std::map<unsigned int, Channel*>::iterator i = channels.begin(); i != channels.end(); i++) {
Channel* c2 = i->second;
if (c2 == nullptr) {
UE_LOG(LogTemp, Error, TEXT("Agora Helper - bad ptr %d"), uid);
return keep;
}
if (c2->freepile == false) continue;
c2->freepile = false; // seize
c = channels[uid] = c2; // hack - add the entry twice - leaving old entry visible to bp
UE_LOG(LogTemp, Warning, TEXT("Agora Helper - stole from free pile at old=%d new=%d - thread is %d"), i->first, uid, IsInGameThread() ? 1 : 0);
break;
}
}
// if there is still no channel try to use channel zero which is a fallback for everybody to write to barring their own channel
if (c == nullptr && channels[0] != nullptr) {
UE_LOG(LogTemp, Warning, TEXT("Agora Helper - abusing channel zero for %d"), uid);
c = channels[uid] = channels[0]; // hack - add the entry twice
}
UMaterialInstanceDynamic* mat = c ? c->material : nullptr;
if (c == nullptr || mat == nullptr) {
UE_LOG(LogTemp, Warning, TEXT("Agora Helper - no material can be bound to uid %d"), uid);
return keep;
}
// i actually want to specifically force my own textures into the system - i'm noticing materials have a pre-existing texture sometimes
auto dirty = c->dirty;
c->dirty = false;
// copy prior to lambda
uint8* pData = nullptr;
pData = new uint8[height * yStride];
memcpy(pData, yBuffer, height * yStride);
// some texture ops have to be in game thread
AsyncTask(ENamedThreads::GameThread, [=]() {
// remake texture if needed
UTexture* scratch = nullptr;
mat->GetTextureParameterValue(FName("mytexture"), scratch);
UTexture2D* tex = (UTexture2D*)scratch;
if (dirty || tex == nullptr || tex->GetSizeX() != width || tex->GetSizeY() != height) {
UE_LOG(LogTemp, Warning, TEXT("Agora - texture resize -> existing texture size was %d %d for uid %d"), tex ? tex->GetSizeX() : -1, tex ? tex->GetSizeY() : -1, uid);
UE_LOG(LogTemp, Warning, TEXT("Agora - texture resize -> new size is %d %d for uid %d"), width, height, uid);
tex = UTexture2D::CreateTransient(width, height, PF_B8G8R8A8);
//tex->AddToRoot();
tex->NeverStream = true;
tex->SRGB = 0;
//tex->MipGenSettings = TMGS_NoMipmaps; not supported if not in editor
mat->SetTextureParameterValue(FName("mytexture"), tex);
}
// copy to texture
uint8* raw = (uint8*)tex->PlatformData->Mips[0].BulkData.Lock(LOCK_READ_WRITE);
memcpy(raw, pData, height * yStride);
tex->PlatformData->Mips[0].BulkData.Unlock();
tex->UpdateResource();
delete[] pData;
});
return keep;
}
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
// actor based unreal blueprints interface to agora - let blueprint users perform a few operations such as suggest materials and start/stop agora
/////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
#include "ThirdParty/Agora/include/IAgoraRtcEngine.h"
static agora::rtc::IRtcEngine* agoraEngine = nullptr;
std::unique_ptr<agora::media::IMediaEngine> m_mediaEngine_ptr = nullptr;
agora::util::AutoPtr<agora::media::IMediaEngine> mediaEngine;
AAgoraActorHelper::AAgoraActorHelper() {
PrimaryActorTick.bCanEverTick = true;
}
AAgoraActorHelper::~AAgoraActorHelper() {
Leave();
}
void AAgoraActorHelper::onWarning(int warn, const char* msg) {
UE_LOG(LogTemp, Warning, TEXT("Agora Helper - onWarning warn: %d"), warn);
}
void AAgoraActorHelper::onError(int err, const char* msg) {
UE_LOG(LogTemp, Warning, TEXT("Agora Helper - onError err: %d, msg: %s"), err, msg);
}
void AAgoraActorHelper::onJoinChannelSuccess(const char* channel, agora::rtc::uid_t uid, int elapsed) {
UE_LOG(LogTemp, Warning, TEXT("Agora Helper - onJoinChannelSuccess channel=%s, uid=%u"), *FString(channel), uid);
}
void AAgoraActorHelper::onRejoinChannelSuccess(const char* channel, agora::rtc::uid_t uid, int elapsed) {
UE_LOG(LogTemp, Warning, TEXT("Agora Helper - onRejoinChannelSuccess channel uid=%d"),uid);
}
void AAgoraActorHelper::onLeaveChannel(const agora::rtc::RtcStats& stats) {
UE_LOG(LogTemp, Warning, TEXT("Agora Helper - onLeaveChannel"));
}
void AAgoraActorHelper::onUserJoined(agora::rtc::uid_t uid, int elapsed) {
UE_LOG(LogTemp, Warning, TEXT("Agora Helper - a user has joined %d"),uid);
}
void AAgoraActorHelper::onUserOffline(agora::rtc::uid_t uid, agora::rtc::USER_OFFLINE_REASON_TYPE reason) {
UE_LOG(LogTemp, Warning, TEXT("Agora Helper - onUserOffline uid: %u"), uid);
if (agoraEngine == nullptr) {
UE_LOG(LogTemp, Warning, TEXT("Agora Helper = cannot get ptr on remote user offline %d"),uid);
return;
}
// todo - liberate their channel?
}
void AAgoraActorHelper::SetMaterialAndUID(UMaterialInstanceDynamic* material, int uid, bool freepile) {
// prevent channel zero from being generally stealable
if (uid == 0) freepile = false;
UE_LOG(LogTemp, Warning, TEXT("Agora Helper - blueprint is setting a material for channel uid=%d freepile=%d material valid=%d"),uid,freepile, material ? 1 : 0);
std::lock_guard<std::mutex> lock(_mutex);
Channel* c = channels[uid];
if (c == nullptr) {
UE_LOG(LogTemp, Warning, TEXT("Agora Helper - brand new channel created %d"), uid);
c = channels[uid] = new Channel();
} else {
UE_LOG(LogTemp, Warning, TEXT("Agora Helper - channel material change %d"), uid);
}
c->material = material;
c->freepile = freepile;
}
void AAgoraActorHelper::Join() {
// may not restart system
if (bIsJoined == true) {
UE_LOG(LogTemp, Warning, TEXT("Agora helper - join() is already running - you must manually Leave() first"));
return;
} else {
UE_LOG(LogTemp, Warning, TEXT("Agora helper - join() called - starting up!"));
}
bIsJoined = true;
// start agora?
if (agoraEngine == nullptr) {
agoraEngine = ::createAgoraRtcEngine();
agora::rtc::RtcEngineContext _rtcEngineContext;
_rtcEngineContext.appId = AGORA_APP_ID;
_rtcEngineContext.eventHandler = this;
agoraEngine->initialize(_rtcEngineContext);
mediaEngine.queryInterface(agoraEngine, agora::AGORA_IID_MEDIA_ENGINE);
m_mediaEngine_ptr.reset(mediaEngine.get());
UE_LOG(LogTemp, Warning, TEXT("Agora - started agora engine from scratch"));
}
// watch traffic
mediaEngine->registerVideoFrameObserver(this);
// configure agora
agoraEngine->setChannelProfile(agora::rtc::CHANNEL_PROFILE_TYPE::CHANNEL_PROFILE_COMMUNICATION);
agoraEngine->setClientRole(bIsBroadcaster ? agora::rtc::CLIENT_ROLE_BROADCASTER : agora::rtc::CLIENT_ROLE_AUDIENCE);
agoraEngine->enableAudio();
agoraEngine->enableVideo();
// generate our own uid if desired - todo may want to set this to a more serious uuid based on EOS networking or other networking
if (local_uid == -1) {
local_uid = FMath::RandRange(1000000, 2000000) + 1000000;
}
// tell agora we are joining and our network visible uid is as follows - we will also always have our own frames echoed back to us with uid=0
auto ret = agoraEngine->joinChannel(0,TCHAR_TO_UTF8(*StreamName), "", local_uid);
UE_LOG(LogTemp, Warning, TEXT("AgoraHelper: About to Join Channel local uid=%d, join results=%d, channel=%s"), local_uid, ret, *StreamName);
}
void AAgoraActorHelper::JoinBroadcast() {
UE_LOG(LogTemp, Warning, TEXT("Agora helper - join as broadcaster () called"));
bIsBroadcaster = true;
Join();
}
void AAgoraActorHelper::Leave() {
// this should be renamed as stopall()
UE_LOG(LogTemp, Warning, TEXT("Agora helper - leave() called - stopping and flushing all resources associated with agora down to the metal - thread is %d"),IsInGameThread());
if (agoraEngine == nullptr || bIsJoined == false) {
UE_LOG(LogTemp, Warning, TEXT("AgoraHelper: Leave() was called by a destructor? Should be harmless"));
return;
}
// stop watching for events
mediaEngine->registerVideoFrameObserver(nullptr);
UE_LOG(LogTemp, Warning, TEXT("Agora helper - stopped watching"));
// attempt to actually leave / stop receiving and sending
agoraEngine->leaveChannel();
UE_LOG(LogTemp, Warning, TEXT("Agora helper - left channel"));
// forget about everything for now (later maybe keep material bindings?)
std::lock_guard<std::mutex> lock(_mutex);
channels.clear();
UE_LOG(LogTemp, Warning, TEXT("Agora helper - forgot materials"));
// for now leave this on i guess - debate
m_mediaEngine_ptr->release();
m_mediaEngine_ptr.reset();
UE_LOG(LogTemp, Warning, TEXT("Agora helper - release media"));
/*
// try liberate resources - this line crashes the system hard always.
// i think a way to debug this is to bring up various levels of services - see if it crashes if stopped before anybody joins and so on
try {
UE_LOG(LogTemp, Warning, TEXT("Agora helper - about to free agora itself %d"),agoraEngine ? true : false);
agoraEngine->release(true);
agoraEngine = nullptr;
} catch (...) {
UE_LOG(LogTemp, Warning, TEXT("Agora helper - blew up badly"));
}
*/
UE_LOG(LogTemp, Warning, TEXT("Agora helper - shutdown agora"));
// clear state
bIsJoined = false;
UE_LOG(LogTemp, Warning, TEXT("Agora helper - done shutdown"));
}
#endif
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment