@rogeralsing
Last active January 1, 2016 08:58
Attempt to get fast thread-to-thread communication using ring buffers and no locks.
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication1
{
class Program
{
static void Main(string[] args)
{
var buffer = new RingBuffer<MyData>(10000);
var producer = new Producer<MyData>(buffer);
var reader = new MyDataReader(buffer);
Stopwatch sw = Stopwatch.StartNew();
for(int i = 0; i < 1000000;i++)
{
producer.Write(new MyData {
Data = "hello " + i,
});
}
sw.Stop();
Console.WriteLine(sw.Elapsed);
Console.ReadLine();
}
}
public struct MyData
{
public string Data { get; set; }
}
public class MyDataReader : Reader<MyData>
{
public MyDataReader(RingBuffer<MyData> buffer)
: base(buffer)
{
}
protected override void OnData(MyData data)
{
// Console.WriteLine(data.Data);
}
}
public abstract class Reader<T> where T : struct
{
private readonly RingBuffer<T> buffer;
public Reader(RingBuffer<T> buffer)
{
this.buffer = buffer;
Thread t = new Thread(() =>
{
while (true)
{
var data = buffer.Read();
if (data == null)
{
// buffer returned null: the reader has caught up with the writer, so there is nothing to read yet
}
else
{
OnData(data.Value);
}
}
});
t.IsBackground = true;
t.Start();
}
protected abstract void OnData(T data);
}
public class Producer<T> where T:struct
{
private RingBuffer<T> buffer;
public Producer(RingBuffer<T> buffer)
{
this.buffer = buffer;
}
public void Write(T data)
{
buffer.Write(data);
}
}
public class RingBuffer<T> where T:struct
{
private readonly T[] slots;
private int writeIndex = 0; // yes, I know that it will eventually run out of indices with Int32
private int readIndex = 0;
public RingBuffer(int capacity)
{
slots = new T[capacity];
}
public void Write(T data)
{
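//How safe is this?
//The reader could die and the whole thing would just hang forever.
//But how safe are the reads, can integers be "half written"?
//I'd expect not, since it would be a native Int32 mov op (?)
//The buffer is full once the writer is slots.Length ahead of the reader, hence >=
//(with > the writer would overwrite a slot that has not been read yet).
while (writeIndex >= readIndex + slots.Length)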
{
//just let the reader catch up a bit.
//doesn't matter if we sleep too long because the writer is
//apparently faster than the reader since we got here in the first place.
//
//How is this dealt with in Disruptor pattern?
//buffers are finite, and when flooding the buffer, the behavior must go from async to sync
//somewhat like this?
Thread.Sleep(10);
}
slots[writeIndex % slots.Length] = data;
System.Threading.Interlocked.Increment(ref writeIndex);
}
public T? Read()
{
//if the reader has caught up with the writer, just return null
//and stay at the same index
//(>= because readIndex == writeIndex means this slot has not been written yet)
if (readIndex >= writeIndex)
return null;
var data = slots[readIndex % slots.Length];
System.Threading.Interlocked.Increment(ref readIndex);
return data;
}
}
}
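The questions in the comments above (torn index reads, the full/empty checks) can be explored with a small variant. This is only a sketch under stated assumptions, not the gist's implementation: exactly one producer thread and one consumer thread, `long` indices so overflow is not a practical concern, and `Volatile.Read`/`Volatile.Write` (available from .NET 4.5) to publish the indices. The names `SpscRingBuffer`, `TryWrite` and `TryRead` are hypothetical. Both methods return `false` instead of sleeping, so the caller decides how to wait.

```csharp
using System.Threading;

// Hypothetical single-producer/single-consumer variant of the RingBuffer above.
// Assumes exactly one thread calls TryWrite and exactly one thread calls TryRead.
public sealed class SpscRingBuffer<T>
{
    private readonly T[] slots;
    private long writeIndex; // advanced only by the producer
    private long readIndex;  // advanced only by the consumer

    public SpscRingBuffer(int capacity)
    {
        slots = new T[capacity];
    }

    // Producer side. Returns false when the buffer is full.
    public bool TryWrite(T item)
    {
        long write = writeIndex;                  // owned by this thread
        long read = Volatile.Read(ref readIndex); // published by the consumer
        if (write - read >= slots.Length)
            return false;                         // every slot holds unread data

        slots[write % slots.Length] = item;
        // Publish the slot: the store above cannot move past this volatile write.
        Volatile.Write(ref writeIndex, write + 1);
        return true;
    }

    // Consumer side. Returns false when there is nothing to read.
    public bool TryRead(out T item)
    {
        long read = readIndex;                      // owned by this thread
        long write = Volatile.Read(ref writeIndex); // published by the producer
        if (read >= write)
        {
            item = default(T);
            return false;                           // caught up with the producer
        }

        item = slots[read % slots.Length];
        // Release the slot only after the item has been copied out.
        Volatile.Write(ref readIndex, read + 1);
        return true;
    }
}
```

With a capacity of 2, two writes succeed and a third returns `false` until the consumer has read at least one item.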
@FransBouma

Passing data between threads can be done with dual buffers:

Give the reader a wait handle and have it wait there until the writer has filled a buffer. When a buffer is full, the writer signals the reader by setting that wait handle, and then waits on its own wait handle, which the reader will set. The reader unblocks, sets the wait handle the writer is blocked on, reads the buffer, and then waits on its own wait handle again.

The writer is unblocked as soon as the reader starts reading, and writes into the currently empty buffer (the other one is being read). When that buffer is full, the writer sets the reader's wait handle and waits on its own again.

So in short: once a buffer is completely written, the writer sets the reader's wait handle so the reader can read that buffer, and then waits on its own wait handle. The reader waits on its wait handle, so it starts reading as soon as the writer has filled a buffer. The reader sets the writer's wait handle to signal that it is reading, so the writer can proceed with writing (in the other buffer).
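The handoff described above could be sketched roughly as follows. This is an illustration under assumptions, not a reference implementation: one writer thread and one reader thread, `AutoResetEvent` as the wait handle type, and a `Complete` call so the reader knows when to stop. The names `DoubleBufferChannel`, `Write`, `Complete`, `ReadBatch` and `DoubleBufferDemo` are all hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical double-buffer channel: one writer thread, one reader thread.
public sealed class DoubleBufferChannel<T>
{
    private readonly List<T>[] buffers;
    private readonly int batchSize;
    private readonly AutoResetEvent bufferReady = new AutoResetEvent(false); // reader waits here
    private readonly AutoResetEvent bufferTaken = new AutoResetEvent(false); // writer waits here
    private int writeSlot;  // buffer currently being filled (writer only)
    private int readySlot;  // buffer handed over; written before bufferReady.Set()
    private bool completed; // written before bufferReady.Set()

    public DoubleBufferChannel(int batchSize)
    {
        this.batchSize = batchSize;
        buffers = new[] { new List<T>(batchSize), new List<T>(batchSize) };
    }

    // Writer thread: add an item, hand the buffer over once it is full.
    public void Write(T item)
    {
        buffers[writeSlot].Add(item);
        if (buffers[writeSlot].Count == batchSize)
            HandOver();
    }

    // Writer thread: hand over whatever is left and mark the end of the stream.
    public void Complete()
    {
        completed = true;
        HandOver();
    }

    private void HandOver()
    {
        readySlot = writeSlot;
        bufferReady.Set();         // wake the reader
        bufferTaken.WaitOne();     // wait until the reader has picked the buffer up
        writeSlot = 1 - writeSlot; // the reader finished this one before taking the other
        buffers[writeSlot].Clear();
    }

    // Reader thread: block until a buffer is handed over, then process it.
    // Returns false after the final batch; do not call it again afterwards.
    public bool ReadBatch(Action<T> onData)
    {
        bufferReady.WaitOne();
        List<T> batch = buffers[readySlot];
        bool last = completed;
        bufferTaken.Set();         // let the writer continue in the other buffer
        foreach (T item in batch)
            onData(item);
        return !last;
    }
}

public static class DoubleBufferDemo
{
    // Sends 1..count through the channel and returns the sum seen by the reader.
    public static long Run(int count, int batchSize)
    {
        var channel = new DoubleBufferChannel<int>(batchSize);
        long sum = 0;
        var reader = new Thread(() =>
        {
            while (channel.ReadBatch(x => sum += x)) { }
        });
        reader.Start();
        for (int i = 1; i <= count; i++)
            channel.Write(i);
        channel.Complete();
        reader.Join();
        return sum;
    }
}
```

The writer only touches a buffer after the reader has signalled that it picked up the other one, so the two threads never work on the same buffer at the same time. For example, `DoubleBufferDemo.Run(10, 4)` hands over the batches 1..4, 5..8 and 9..10 and returns 55.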

(The above of course assumes both threads do nothing but write and read, so they can afford to wait. The wait handles are there to make sure each thread only does work when needed. If the threads also have other things to do, they can do that work first and then wait on the wait handle to make sure they don't proceed too early. Wait handles have timeouts, so you can schedule waits and other work accordingly in the same thread.)
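As a rough illustration of the timeout idea, a thread can wait in short slices and do other work whenever a slice elapses without a signal. `WaitHandle.WaitOne(TimeSpan)` returns `false` on timeout; the helper name `WaitWhileWorking` and the `TimedWaitDemo` class are hypothetical.

```csharp
using System;
using System.Threading;

public static class TimedWaitDemo
{
    // Waits for `signal`, running `otherWork` every time a wait slice times out.
    // Returns how many times `otherWork` ran before the signal arrived.
    public static int WaitWhileWorking(WaitHandle signal, TimeSpan slice, Action otherWork)
    {
        int rounds = 0;
        while (!signal.WaitOne(slice)) // false means the timeout elapsed first
        {
            otherWork();
            rounds++;
        }
        return rounds;
    }
}
```

The slice length is a trade-off: shorter slices get to the other work sooner but wake the thread more often.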
