Skip to content

Instantly share code, notes, and snippets.

@gamemachine
Created August 10, 2019 19:12
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save gamemachine/16718ec0e68bc302ca12356922d0d994 to your computer and use it in GitHub Desktop.
Save gamemachine/16718ec0e68bc302ca12356922d0d994 to your computer and use it in GitHub Desktop.
using JacksonDunstan.NativeCollections;
using System;
using System.Threading;
using Unity.Collections;
using Unity.Collections.LowLevel.Unsafe;
namespace AiGame.Containers
{
/// <summary>
/// Fixed-capacity FIFO queue stored in native memory and intended for use by
/// several producers and several consumers at once.
/// </summary>
/// <remarks>
/// Every slot carries a sequence number that says whose turn it is:
/// <list type="bullet">
/// <item><description><c>sequence == pos</c>: the slot is free for the producer that claims position <c>pos</c>.</description></item>
/// <item><description><c>sequence == pos + 1</c>: the slot holds the item for the consumer that claims position <c>pos</c>.</description></item>
/// </list>
/// Positions are claimed with a compare-and-swap on the shared counters. The slot's
/// sequence is published only after its element has been written (enqueue) or read
/// (dequeue), so another thread never observes a half-updated slot.
/// Sequences are kept in their own array so they can be read and written with
/// volatile semantics through a pointer instead of through a by-value struct copy.
/// </remarks>
/// <typeparam name="T">Element type; must be usable as a <see cref="NativeArray{T}"/> element.</typeparam>
public struct NativeMPMCQueue<T> where T : struct
{
    // Item storage; slot i is owned by whoever currently holds the turn recorded in Sequences[i].
    [NativeDisableContainerSafetyRestriction]
    [NativeDisableParallelForRestriction]
    private NativeArray<T> Elements;

    // Per-slot turn counters (see the remarks on the type).
    [NativeDisableContainerSafetyRestriction]
    [NativeDisableParallelForRestriction]
    private NativeArray<int> Sequences;

    // bufferSize - 1; valid as an index mask because bufferSize is a power of two.
    private readonly int BufferMask;

    // Next position a producer will try to claim.
    [NativeDisableContainerSafetyRestriction]
    [NativeDisableParallelForRestriction]
    private NativeIntPtr EnqueuePos;

    // Next position a consumer will try to claim.
    [NativeDisableContainerSafetyRestriction]
    [NativeDisableParallelForRestriction]
    private NativeIntPtr DequeuePos;

    /// <summary>
    /// Allocates a queue with <paramref name="bufferSize"/> slots.
    /// </summary>
    /// <param name="bufferSize">Number of slots; must be a power of two and at least 2.</param>
    /// <param name="allocator">Allocator used for all native memory owned by the queue.</param>
    /// <exception cref="ArgumentException">
    /// <paramref name="bufferSize"/> is smaller than 2 or is not a power of two.
    /// </exception>
    public NativeMPMCQueue(int bufferSize, Allocator allocator)
    {
        if (bufferSize < 2)
        {
            throw new ArgumentException("bufferSize should be greater than or equal to 2", nameof(bufferSize));
        }

        if ((bufferSize & (bufferSize - 1)) != 0)
        {
            throw new ArgumentException("bufferSize should be a power of 2", nameof(bufferSize));
        }

        BufferMask = bufferSize - 1;
        Elements = new NativeArray<T>(bufferSize, allocator);
        Sequences = new NativeArray<int>(bufferSize, allocator);

        // Slot i starts out free for the producer that claims position i.
        for (var i = 0; i < bufferSize; i++)
        {
            Sequences[i] = i;
        }

        EnqueuePos = new NativeIntPtr(allocator);
        DequeuePos = new NativeIntPtr(allocator);
    }

    /// <summary>
    /// Releases every native allocation owned by the queue that is still created.
    /// </summary>
    public void Dispose()
    {
        if (Elements.IsCreated) Elements.Dispose();
        if (Sequences.IsCreated) Sequences.Dispose();
        if (EnqueuePos.IsCreated) EnqueuePos.Dispose();
        if (DequeuePos.IsCreated) DequeuePos.Dispose();
    }

    /// <summary>
    /// Attempts to append <paramref name="item"/> to the queue without blocking.
    /// </summary>
    /// <param name="item">Value to store.</param>
    /// <returns><c>true</c> if the item was stored; <c>false</c> if the queue was full.</returns>
    public unsafe bool TryEnqueue(T item)
    {
        int* sequences = (int*)Sequences.GetUnsafePtr();
        int* enqueuePos = EnqueuePos.GetUnsafePtr();

        while (true)
        {
            int pos = Volatile.Read(ref *enqueuePos);
            int index = pos & BufferMask;

            // Signed difference instead of a raw comparison so the test stays
            // correct after the 32-bit counters wrap around.
            int lag = unchecked(Volatile.Read(ref sequences[index]) - pos);

            if (lag == 0)
            {
                // The slot is free for position pos; try to claim that position.
                if (Interlocked.CompareExchange(ref *enqueuePos, unchecked(pos + 1), pos) == pos)
                {
                    Elements[index] = item;

                    // Publish only after the element is stored so a consumer that
                    // observes the new sequence also observes the element.
                    Volatile.Write(ref sequences[index], unchecked(pos + 1));
                    return true;
                }
            }
            else if (lag < 0)
            {
                // The slot still holds an item from the previous lap: the queue is full.
                return false;
            }

            // Otherwise another producer got ahead of us; reload the position and retry.
        }
    }

    /// <summary>
    /// Attempts to remove the oldest item from the queue without blocking.
    /// </summary>
    /// <param name="result">The removed item, or <c>default</c> when the queue was empty.</param>
    /// <returns><c>true</c> if an item was removed; <c>false</c> if the queue was empty.</returns>
    public unsafe bool TryDequeue(out T result)
    {
        int* sequences = (int*)Sequences.GetUnsafePtr();
        int* dequeuePos = DequeuePos.GetUnsafePtr();
        int bufferMask = BufferMask;

        while (true)
        {
            int pos = Volatile.Read(ref *dequeuePos);
            int index = pos & bufferMask;
            int ready = unchecked(pos + 1);

            // Signed difference keeps the emptiness test correct across counter wrap-around.
            int lag = unchecked(Volatile.Read(ref sequences[index]) - ready);

            if (lag == 0)
            {
                // The slot holds the item for position pos; try to claim that position.
                if (Interlocked.CompareExchange(ref *dequeuePos, ready, pos) == pos)
                {
                    // Read the element only after the claim succeeded, never from an earlier snapshot.
                    result = Elements[index];
                    Elements[index] = default;

                    // Hand the slot to the producer of the next lap (pos + bufferSize).
                    Volatile.Write(ref sequences[index], unchecked(pos + bufferMask + 1));
                    return true;
                }
            }
            else if (lag < 0)
            {
                // Nothing has been published for this position yet: the queue is empty.
                result = default;
                return false;
            }

            // Otherwise another consumer got ahead of us; reload the position and retry.
        }
    }
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment