Implements a UActorComponent that can be attached to actors in Unreal Engine to send buffers larger than the max RPC size by sending chunks at a time.
/*
* Copyright © 2023 kirby561
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software
* and associated documentation files (the “Software”), to deal in the Software without
* restriction, including without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or
* substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
* BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
* DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#include "TransferComponent.h"
#include "Engine/ActorChannel.h"
DEFINE_LOG_CATEGORY_STATIC(TransferComponentSub, Log, All);
UTransferComponent::UTransferComponent() {
if (GetNetMode() == ENetMode::NM_Client) {
_nextTransferId = 1;
} else {
_nextTransferId = 2;
}
PrimaryComponentTick.bCanEverTick = true;
}
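UTransferComponent::~UTransferComponent() {
// Free the in-progress outgoing transfer (if any)
if (_outgoingTransfer != nullptr) {
delete [] _outgoingTransfer->Buffer;
delete _outgoingTransfer;
_outgoingTransfer = nullptr;
}
// Free any pending outgoing transfers
while (!_pendingOutgoingTransfers.IsEmpty()) {
Transfer* transfer = nullptr;
_pendingOutgoingTransfers.Dequeue(transfer);
delete [] transfer->Buffer;
delete transfer;
}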
// Free any completed transfers still being held
TArray<uint64> completedTransferIds;
_completedIncomingTransfers.GenerateKeyArray(completedTransferIds);
for (uint64 transferId : completedTransferIds) {
Transfer* transfer = _completedIncomingTransfers[transferId];
delete [] transfer->Buffer;
delete transfer;
_completedIncomingTransfers.Remove(transferId);
}
}
uint64 UTransferComponent::SendBufferToServer(uint8* buffer, int length, std::function<void(uint64)> onTransferCompleted /* = nullptr */) {
return SendBuffer(ETransferDirection::ClientToServer, buffer, length, onTransferCompleted);
}
uint64 UTransferComponent::SendBufferToClient(uint8* buffer, int length, std::function<void(uint64)> onTransferCompleted /* = nullptr */) {
return SendBuffer(ETransferDirection::ServerToClient, buffer, length, onTransferCompleted);
}
Transfer* UTransferComponent::ServerGetsCompletedTransfer(uint64 transferId) {
if (GetNetMode() == ENetMode::NM_Client) {
UE_LOG(TransferComponentSub, Error, TEXT("ServerGetsCompletedTransfer not called by the server! This is an error."));
return nullptr;
}
return GetCompletedTransfer(transferId);
}
Transfer* UTransferComponent::ClientGetsCompletedTransfer(uint64 transferId) {
if (GetNetMode() != ENetMode::NM_Client) {
UE_LOG(TransferComponentSub, Error, TEXT("ClientGetsCompletedTransfer not called by the client! This is an error."));
return nullptr;
}
return GetCompletedTransfer(transferId);
}
void UTransferComponent::ServerFreesCompletedTransfer(uint64 transferId) {
if (GetNetMode() == ENetMode::NM_Client) {
UE_LOG(TransferComponentSub, Error, TEXT("ServerFreesCompletedTransfer not called by the server! This is an error."));
return;
}
FreeCompletedTransfer(transferId);
}
void UTransferComponent::ClientFreesCompletedTransfer(uint64 transferId) {
if (GetNetMode() != ENetMode::NM_Client) {
UE_LOG(TransferComponentSub, Error, TEXT("ClientFreesCompletedTransfer not called by the client! This is an error."));
return;
}
FreeCompletedTransfer(transferId);
}
void UTransferComponent::TickComponent(float deltaSeconds, ELevelTick tickType, FActorComponentTickFunction* thisTickFunction) {
Super::TickComponent(deltaSeconds, tickType, thisTickFunction);
TickTransfers();
}
void UTransferComponent::ClientSendsChunkToServer_Implementation(uint64 transferId, const TArray<uint8>& chunk, int totalBytes) {
ReceiveChunkOnReceiver(ETransferDirection::ClientToServer, transferId, chunk, totalBytes);
}
void UTransferComponent::ServerSendsChunkToClient_Implementation(uint64 transferId, const TArray<uint8>& chunk, int totalBytes) {
ReceiveChunkOnReceiver(ETransferDirection::ServerToClient, transferId, chunk, totalBytes);
}
void UTransferComponent::TickTransfers() {
AActor* owner = GetOwner();
if (owner == nullptr) return; // No owner yet
UNetConnection* netConnection = owner->GetNetConnection();
if (netConnection == nullptr) return; // No net connection yet
// Do we have pending transfers and no current transfer?
if (_outgoingTransfer == nullptr && !_pendingOutgoingTransfers.IsEmpty()) {
_pendingOutgoingTransfers.Dequeue(_outgoingTransfer);
}
if (_outgoingTransfer != nullptr) {
// Calculate the limits we need based on the UActorChannel being used for network communication.
// The way this works is the actor channel has a queue of "bunches"
// that will be sent that are stored in its "reliable buffer". This
// reliable buffer can hold RELIABLE_BUFFER (256) bunches. Each bunch
// can be a maximum of NetMaxConstructedPartialBunchSizeBytes (64k)
// So we limit our packets to 1/2 of the max bunch size (to leave some
// room for any overhead of making the bunch) and make sure we're not
// using more than half the reliable buffer bunch limit.
UActorChannel* networkChannel = netConnection->FindActorChannelRef(owner);
if (networkChannel == nullptr) return; // No actor channel open for this actor yet
int outputChunkSizeLimit = NetMaxConstructedPartialBunchSizeBytes / 2;
int numberOfReliableOutputBunchesLimit = RELIABLE_BUFFER / 2;
// If we are on the client we need to use a different
// send function than if we are on the server.
std::function<void(uint64, const TArray<uint8>&, int)> sendChunkMethod = nullptr;
if (GetNetMode() == ENetMode::NM_Client) {
sendChunkMethod = [this] (uint64 transferId, const TArray<uint8>& chunk, int totalBytes) {
ClientSendsChunkToServer(transferId, chunk, totalBytes);
};
} else {
sendChunkMethod = [this] (uint64 transferId, const TArray<uint8>& chunk, int totalBytes) {
ServerSendsChunkToClient(transferId, chunk, totalBytes);
};
}
// Continue this transfer
while (_outgoingTransfer->BytesTransferred < _outgoingTransfer->Length
&& networkChannel->NumOutRec < numberOfReliableOutputBunchesLimit) {
// Get the chunk size and copy it to the outgoing transfer buffer
int chunkSize = Min(_outgoingTransfer->GetBytesRemaining(), outputChunkSizeLimit);
_outgoingTransferBuffer.Reset();
_outgoingTransferBuffer.Append(_outgoingTransfer->Buffer + _outgoingTransfer->BytesTransferred, chunkSize);
// Send the chunk
sendChunkMethod(
_outgoingTransfer->TransferId,
_outgoingTransferBuffer,
_outgoingTransfer->Length);
// Book keeping
_outgoingTransfer->BytesTransferred += chunkSize;
}
// Is the transfer complete?
if (_outgoingTransfer->BytesTransferred >= _outgoingTransfer->Length) {
if (_outgoingTransfer->BytesTransferred > _outgoingTransfer->Length) {
// We have a math error somewhere.
UE_LOG(
TransferComponentSub,
Warning,
TEXT("Too many bytes accounted for when transferring a buffer. Transferred = %d, length = %d, TransferID = %llu"),
_outgoingTransfer->BytesTransferred, _outgoingTransfer->Length, _outgoingTransfer->TransferId);
// Continue with the warning
}
// We're done, call the callback
if (_outgoingTransfer->OnTransferCompleted) {
_outgoingTransfer->OnTransferCompleted(_outgoingTransfer->TransferId);
}
// Cleanup
delete [] _outgoingTransfer->Buffer;
delete _outgoingTransfer;
_outgoingTransfer = nullptr;
}
}
}
uint64 UTransferComponent::GetNextTransferId() {
uint64 nextId = _nextTransferId;
_nextTransferId += 2;
return nextId;
}
Transfer* UTransferComponent::GetCompletedTransfer(uint64 transferId) {
if (_completedIncomingTransfers.Contains(transferId)) {
Transfer* transfer = _completedIncomingTransfers[transferId];
return transfer;
}
return nullptr; // Not found
}
void UTransferComponent::FreeCompletedTransfer(uint64 transferId) {
if (_completedIncomingTransfers.Contains(transferId)) {
Transfer* transfer = _completedIncomingTransfers[transferId];
_completedIncomingTransfers.Remove(transferId);
delete [] transfer->Buffer;
delete transfer;
} else {
UE_LOG(TransferComponentSub, Warning, TEXT("Tried to free transferId %llu but this transfer does not exist."), transferId);
}
}
uint64 UTransferComponent::SendBuffer(ETransferDirection transferDirection, uint8* buffer, int length, std::function<void(uint64)> onTransferCompleted /* = nullptr */) {
// Make sure we are on the client if it's Client->Server and the Server if it's Server->Client
if (GetNetMode() != ENetMode::NM_Client && transferDirection == ETransferDirection::ClientToServer) {
UE_LOG(TransferComponentSub, Error, TEXT("SendBufferToServer not called by the client! This is an error."));
return 0;
}
if (GetNetMode() == ENetMode::NM_Client && transferDirection == ETransferDirection::ServerToClient) {
UE_LOG(TransferComponentSub, Error, TEXT("SendBufferToClient not called by the server! This is an error."));
return 0;
}
// Make a transfer for this
Transfer* transfer = new Transfer();
transfer->Buffer = buffer;
transfer->Length = length;
transfer->Direction = transferDirection;
transfer->OnTransferCompleted = onTransferCompleted;
transfer->TransferId = GetNextTransferId();
// Add it to the queue
_pendingOutgoingTransfers.Enqueue(transfer);
return transfer->TransferId;
}
void UTransferComponent::ReceiveChunkOnReceiver(ETransferDirection direction, uint64 transferId, const TArray<uint8>& chunk, int totalBytes) {
// Is this transfer in the map yet?
if (!_completedIncomingTransfers.Contains(transferId)) {
Transfer* newTransfer = new Transfer();
newTransfer->Buffer = new uint8[totalBytes];
newTransfer->Length = totalBytes;
newTransfer->Direction = direction;
newTransfer->TransferId = transferId;
_completedIncomingTransfers.Add(transferId, newTransfer);
}
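// Copy the chunk into the overall array, rejecting chunks that would overflow the buffer
Transfer* transfer = _completedIncomingTransfers[transferId];
if (chunk.Num() > transfer->Length - transfer->BytesTransferred) {
UE_LOG(TransferComponentSub, Error, TEXT("Chunk of size %d overflows transferId %llu. Dropping it."), chunk.Num(), transferId);
return;
}
memcpy(transfer->Buffer + transfer->BytesTransferred, chunk.GetData(), chunk.Num());
transfer->BytesTransferred += chunk.Num();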
if (_verboseLoggingEnabled) {
UE_LOG(TransferComponentSub, Display, TEXT("Received chunk of size %d (%d out of %d in total)"), chunk.Num(), transfer->BytesTransferred, totalBytes);
}
}
/*
* Copyright © 2023 kirby561
*
* Permission is hereby granted, free of charge, to any person obtaining a copy of this software
* and associated documentation files (the “Software”), to deal in the Software without
* restriction, including without limitation the rights to use, copy, modify, merge, publish,
* distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the
* Software is furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all copies or
* substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING
* BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
* NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM,
* DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
* FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
*/
#pragma once
#include "CoreMinimal.h"
#include "Components/ActorComponent.h"
#include <functional>
#include "TransferComponent.generated.h"
enum class ETransferDirection {
ClientToServer,
ServerToClient
};
/**
* Simple struct for keeping track of in flight and completed transfers.
*/
struct Transfer {
public:
// Input/Output
uint8* Buffer = nullptr;
int Length = 0;
ETransferDirection Direction = ETransferDirection::ClientToServer;
std::function<void(uint64)> OnTransferCompleted = nullptr;
uint64 TransferId = 0;
// Transfer progress
int BytesTransferred = 0; // Number of bytes transferred so far
int GetBytesRemaining() {
return Length - BytesTransferred;
}
};
/**
* Facilitates transferring large buffers of data between the client and
* the server using the built in RPC framework and thus not requiring a
* separate socket connection to be established. This allows users to reliably send
* data larger than the usual limit by splitting the transfer up into chunks
* and adding the chunks to the reliable buffer across multiple ticks as needed.
*
* The main interface is SendBufferToServer/SendBufferToClient. After the transfer
* completes, the caller's callback function is called on the sending side. This makes
* it easy to have continuation logic inline with the transfer for what to do with the
* buffer once it gets there. The buffer can be retrieved on the receiving side with
* ClientGetsCompletedTransfer(...)/ServerGetsCompletedTransfer(...) depending on which
* side was the receiver. The buffers can be freed when no longer needed using
* ClientFreesCompletedTransfer(...)/ServerFreesCompletedTransfer. The receiver MUST call
* the corresponding FreesCompletedTransfer method or they will not get cleaned up until
* the actor this component is attached to is destroyed.
*
* The public interface to this class's methods all have a Client and Server
* version to be very explicit which side of the connection has the buffer and which side
* is sending it at each point. Internally they do the same thing plus error checking.
*/
UCLASS()
class UTransferComponent : public UActorComponent {
GENERATED_BODY()
public:
UTransferComponent();
virtual ~UTransferComponent();
/**
* Sends the given buffer of the given length to the server and splits it
* up into chunks if necessary. This method takes an optional function to
* be called when the transfer is complete and providing the ID of the transfer
* for context to the caller. This callback function is called on the client.
* @param buffer The buffer to send from the client to the server. This component owns this buffer and will delete it if the transfer is completed or cancelled.
* @param length The number of bytes in the buffer.
* @param onTransferCompleted An optional function to call when this method completes that provides the same transfer ID as an argument that this function returns.
* Note: This is only called on the client.
* @returns Returns a number that uniquely identifies this transfer and will be provided on onTransferCompleted as well.
*
* @remarks There is no error handling on this method because this is a reliable transfer. If anything goes wrong it's because the client was
* disconnected from the server and presumably this transfer is no longer needed. In that event, the buffer passed in is still
* deleted since it cleans up any in progress transfers when it goes away.
*/
uint64 SendBufferToServer(uint8* buffer, int length, std::function<void(uint64)> onTransferCompleted = nullptr);
/**
* Sends the given buffer of the given length to the client and splits it
* up into chunks if necessary. This method takes an optional function to
* be called when the transfer is complete and providing the ID of the transfer
* for context to the caller. This callback function is called only on the Server.
* @param buffer The buffer to send from the server to the client. This component owns this buffer and will delete it if the transfer is completed or cancelled.
* @param length The number of bytes in the buffer.
* @param onTransferCompleted An optional function to call when this method completes that provides the same transfer ID as an argument that this function returns.
* Note: This is only called on the Server.
* @returns Returns a number that uniquely identifies this transfer and will be provided on onTransferCompleted as well.
*
* @remarks There is no error handling on this method because this is a reliable transfer. If anything goes wrong it's because the client was
* disconnected from the server and presumably this transfer is no longer needed. In that event, the buffer passed in is still
* deleted since it cleans up any in progress transfers when it goes away.
*/
uint64 SendBufferToClient(uint8* buffer, int length, std::function<void(uint64)> onTransferCompleted = nullptr);
//
// Interface for the receiving side to get/free completed transfers
//
/**
* Called on the server to get a transfer that was sent by a client
* with the given transferId.
* @param transferId - The unique ID of the transfer to retrieve.
* @returns Returns the transfer or nullptr if it is not found.
*/
Transfer* ServerGetsCompletedTransfer(uint64 transferId);
/**
* Called on the client to get a transfer that was sent by the server
* with the given transferId.
* @param transferId - The unique ID of the transfer to retrieve.
* @returns Returns the transfer or nullptr if it is not found.
*/
Transfer* ClientGetsCompletedTransfer(uint64 transferId);
/**
* Called on the server to delete a transfer that is no longer needed.
* @param transferId - The unique ID of the transfer to delete.
*/
void ServerFreesCompletedTransfer(uint64 transferId);
/**
* Called on the client to delete a transfer that is no longer needed.
* @param transferId - The unique ID of the transfer to delete.
*/
void ClientFreesCompletedTransfer(uint64 transferId);
/**
* Sets whether verbose logging should be enabled or not.
* @param isEnabled - True to enable it, false to disable.
*/
void SetVerboseLoggingEnabled(bool isEnabled) { _verboseLoggingEnabled = isEnabled; }
public: // UActorComponent interface
virtual void TickComponent(float deltaSeconds, ELevelTick tickType, FActorComponentTickFunction* thisTickFunction) override;
private: // Network methods
UFUNCTION(Server, Reliable) // Client->Server
void ClientSendsChunkToServer(uint64 transferId, const TArray<uint8>& chunk, int totalBytes);
void ClientSendsChunkToServer_Implementation(uint64 transferId, const TArray<uint8>& chunk, int totalBytes);
UFUNCTION(Client, Reliable) // Server->Client
void ServerSendsChunkToClient(uint64 transferId, const TArray<uint8>& chunk, int totalBytes);
void ServerSendsChunkToClient_Implementation(uint64 transferId, const TArray<uint8>& chunk, int totalBytes);
private: // Private methods
/**
* Checks if any work needs to be done on outgoing transfers
* and does it.
*/
void TickTransfers();
// Network transfer methods used by the public interface above.
uint64 GetNextTransferId();
Transfer* GetCompletedTransfer(uint64 transferId);
void FreeCompletedTransfer(uint64 transferId);
uint64 SendBuffer(ETransferDirection transferDirection, uint8* buffer, int length, std::function<void(uint64)> onTransferCompleted = nullptr);
void ReceiveChunkOnReceiver(ETransferDirection direction, uint64 transferId, const TArray<uint8>& chunk, int totalBytes);
// Helper methods
int Min(int a, int b) { if (a < b) return a; else return b; }
private: // Transfer state. Note that there are 2 versions of this actor, one on the server and one on the client. So each one has its own outgoing and incoming transfers.
// Outgoing state
TQueue<Transfer*> _pendingOutgoingTransfers; // Transfers we haven't sent yet
Transfer* _outgoingTransfer = nullptr; // Transfer currently being sent from this side (if any)
TArray<uint8> _outgoingTransferBuffer; // Keep a buffer around so we don't need to reallocate constantly
// Incoming state
TMap<uint64, Transfer*> _completedIncomingTransfers; // When a transfer completes, hang on to it by ID until the owner tells us to delete it.
// Transfer ID:
// "0" is an error, this should be initialized in the constructor to either 1 or 2 depending on if
// we're on the client or the server. The client's IDs will always be odd and the server's will always
// be even. This guarantees all transfer IDs are unique between the client/server (although for multicast
// they will not be unique between clients but that's ok - we can add the client ID to it or something later
// if we need to do multicast).
uint64 _nextTransferId = 0;
// Whether you want logs when transfers come in or not
bool _verboseLoggingEnabled = false;
private: // Constants
// This is defined in DataChannel.cpp in the engine
// but is not exposed so it is duplicated here.
static const int32 NetMaxConstructedPartialBunchSizeBytes = 1024 * 64; // 64KB
};
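
The chunk-and-reassemble scheme the component implements can be modelled outside the engine. The following is a minimal sketch in plain C++, assuming chunks arrive in order (as the receive code above also assumes). `SplitIntoChunks` and `Reassembly` are hypothetical names used only for illustration and are not part of the gist.

```cpp
#include <algorithm>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Hypothetical helper: splits `data` into chunks of at most `chunkLimit`
// bytes. `chunkLimit` must be greater than zero.
std::vector<std::vector<uint8_t>> SplitIntoChunks(const std::vector<uint8_t>& data, size_t chunkLimit) {
    std::vector<std::vector<uint8_t>> chunks;
    for (size_t offset = 0; offset < data.size(); offset += chunkLimit) {
        size_t chunkSize = std::min(chunkLimit, data.size() - offset);
        chunks.emplace_back(data.begin() + offset, data.begin() + offset + chunkSize);
    }
    return chunks;
}

// Hypothetical receiver-side state: a buffer sized up front from the
// advertised total, filled in as chunks arrive.
struct Reassembly {
    std::vector<uint8_t> Buffer;
    size_t BytesReceived = 0;

    explicit Reassembly(size_t totalBytes) : Buffer(totalBytes) {}

    // Appends a chunk. Returns false and copies nothing if the chunk
    // would write past the end of the buffer.
    bool Append(const std::vector<uint8_t>& chunk) {
        if (chunk.size() > Buffer.size() - BytesReceived) return false;
        if (!chunk.empty()) {
            std::memcpy(Buffer.data() + BytesReceived, chunk.data(), chunk.size());
        }
        BytesReceived += chunk.size();
        return true;
    }

    bool IsComplete() const { return BytesReceived == Buffer.size(); }
};
```

The receiver never trusts a chunk's size on its own: each chunk is checked against the space left before it is copied.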
@hojjatjafary

Hi,
I used a similar method to send only 150K of data to the clients, but after sending completes, replication doesn't work for about 5-10 seconds. Then positions and other things suddenly replicate. This seems to be because a lot of data is queued (UNetConnection::QueuedBits) and the network is saturated (UNetDriver::ServerReplicateActors_MarkRelevantActors) until the queue is freed. Did you test this in a real game with a bunch of actors that are replicating?

@kirby561
Author

Hey. I haven't run into an issue with this in a real game but if the files you're sending are big enough to cause a hiccup during gameplay you can further throttle it to your needs by adding more conditions to this part of TransferComponent.cpp:
networkChannel->NumOutRec < numberOfReliableOutputBunchesLimit

You could also change the bytes per packet based on the current game state or something. For example, if you had a lobby where a hiccup doesn't matter when players join, have a higher limit than if a player joins while the game is currently active.

Another thing you could do is increase the amount of bandwidth the AGameNetworkManager is trying to use. See TotalNetBandwidth in https://docs.unrealengine.com/4.27/en-US/API/Runtime/Engine/GameFramework/AGameNetworkManager/

You can set that value in DefaultEngine.ini

[/Script/Engine.GameNetworkManager]
TotalNetBandwidth=32000
MaxDynamicBandwidth=7000
MinDynamicBandwidth=4000

@hojjatjafary

Thank you,
I think a better solution is to have a send rate, multiply it by deltaSeconds, and send only as many bytes as that allows each tick, rather than relying only on the number of outgoing packets.
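
The send-rate idea above could look roughly like this. It is a sketch in plain C++ with no engine dependencies. `SendBudget`, `bytesPerSecond` and `maxBurstBytes` are hypothetical names, not part of the gist, and the burst cap is an added assumption so that a long frame cannot release an unbounded amount of data at once.

```cpp
#include <algorithm>
#include <cstdint>

// Hypothetical helper: accumulates a byte allowance over time so a sender
// can cap its throughput at roughly bytesPerSecond.
class SendBudget {
public:
    SendBudget(double bytesPerSecond, double maxBurstBytes)
        : _bytesPerSecond(bytesPerSecond), _maxBurstBytes(maxBurstBytes) {}

    // Call once per tick with the frame time in seconds.
    void Tick(double deltaSeconds) {
        _availableBytes = std::min(_maxBurstBytes, _availableBytes + _bytesPerSecond * deltaSeconds);
    }

    // Returns how many of the requested bytes may be sent right now and
    // deducts them from the allowance.
    int32_t Take(int32_t requestedBytes) {
        int32_t granted = std::min(requestedBytes, static_cast<int32_t>(_availableBytes));
        if (granted < 0) granted = 0;
        _availableBytes -= granted;
        return granted;
    }

private:
    double _bytesPerSecond;
    double _maxBurstBytes;
    double _availableBytes = 0.0;
};
```

In TickTransfers, one way to use it would be to call Tick(deltaSeconds) once per frame and clamp each chunk size to Take(chunkSize), leaving the loop when it returns 0. The existing NumOutRec check would still be needed to protect the reliable buffer.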
