Disruptor
// Copyright (C) 2011 by Christoph Hack. All rights reserved.
// Use of this source code is governed by the New BSD License.
/*
This program contains a simple micro-benchmark for a Disruptor [1]-like
event buffer (or at least what I think the Disruptor might be). There
isn't any kind of API yet, and this test is currently limited to a
single-producer / single-consumer setup, but the Disruptor architecture
is generally well suited for multiple producers and consumers as well.
According to the technical paper about the Disruptor [2], its design has
several benefits over traditional message queues. Some of the
highlights are:
* Preallocated ring buffers are faster than singly linked lists because
  they utilize the caches better and produce no garbage.
* The lock-free implementation avoids context switches, which could
  otherwise reschedule the queue on another core with a cold cache.
* Producers and consumers start to batch message sending / receiving as
  soon as they fall behind, which avoids some synchronization overhead
  (a batched-acknowledgement sketch follows BenchmarkDisruptor below).
* Disruptor-like message buffers can be used in interesting architectures,
  such as the one from LMAX [3]. The full article can be found here [4].
Disclaimer: I haven't written many concurrent programs yet, and I doubt
that this implementation is entirely correct. So please have patience.
[1]: http://code.google.com/p/disruptor/
[2]: http://disruptor.googlecode.com/files/Disruptor-1.0.pdf
[3]: http://martinfowler.com/articles/images/lmax/arch-full.png
[4]: http://martinfowler.com/articles/lmax.html
*/
package disruptor
import (
	"sync/atomic"
	"testing"
)
const BufferSize = 2048
const BufferMask = BufferSize - 1
// A Sequence is 64 bytes large (the usual size of today's cache lines) to
// avoid false sharing. Only the first element should be used.
type Sequence [8]uint64
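// The benchmarks below operate on element 0 of a Sequence directly via
// atomic.LoadUint64(&seq[0]) and atomic.StoreUint64(&seq[0], ...). The two
// accessor methods below are only an illustrative sketch (they are not part
// of the original benchmark code): all atomic operations go through the
// first word, while the remaining seven words exist purely as padding.
func (s *Sequence) Load() uint64 {
	return atomic.LoadUint64(&s[0])
}

func (s *Sequence) Store(val uint64) {
	atomic.StoreUint64(&s[0], val)
}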
// BenchmarkChannels measures a plain buffered Go channel as the baseline.
func BenchmarkChannels(b *testing.B) {
	ch := make(chan int, BufferSize)
	go func() { // producer goroutine
		for i := 0; i < b.N; i++ {
			ch <- i
		}
	}()
	for i := 0; i < b.N; i++ {
		val := <-ch
		if val != i {
			panic("invalid result")
		}
	}
}
// BenchmarkDisruptor measures the Disruptor-like ring buffer with one
// producer goroutine and the benchmark goroutine acting as the consumer.
func BenchmarkDisruptor(b *testing.B) {
	var ring [BufferSize]int // the ring buffer
	var cseq, pseq Sequence  // consumer and producer sequence number (+ padding)
	go func() {              // producer goroutine
		var seq, maxSeq Sequence
		for seq[0] = 0; seq[0] < uint64(b.N); seq[0]++ {
			// busy spin until there is room for writing
			for seq[0] >= maxSeq[0] {
				maxSeq[0] = atomic.LoadUint64(&cseq[0]) + BufferSize - 2
			}
			// send the message
			ring[seq[0]&BufferMask] = int(seq[0])
			atomic.StoreUint64(&pseq[0], seq[0]+1) // better: a lazy/relaxed store (no sync/atomic equivalent)
		}
	}()
	var seq, maxSeq Sequence
	for seq[0] = 0; seq[0] < uint64(b.N); seq[0]++ {
		// busy spin until there is data available
		for seq[0] >= maxSeq[0] {
			maxSeq[0] = atomic.LoadUint64(&pseq[0])
		}
		// data can now be read from ring[seq&BufferMask]
		val := ring[seq[0]&BufferMask]
		atomic.StoreUint64(&cseq[0], seq[0]) // better: a lazy/relaxed store (no sync/atomic equivalent)
		if val != int(seq[0]) {
			panic("invalid result")
		}
	}
}
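
// The consumer in BenchmarkDisruptor already amortizes its atomic loads by
// caching the producer sequence, but it still publishes cseq after every
// single message. The variant below is only a sketch (it is not part of the
// original gist): the consumer drains everything the producer has published
// so far and acknowledges the whole batch with a single atomic store, which
// is the kind of consumer batching the Disruptor paper [2] describes. The
// producer side is unchanged.
func BenchmarkDisruptorBatched(b *testing.B) {
	var ring [BufferSize]int
	var cseq, pseq Sequence
	go func() { // producer goroutine, identical to BenchmarkDisruptor
		var seq, maxSeq uint64
		for seq = 0; seq < uint64(b.N); seq++ {
			for seq >= maxSeq {
				maxSeq = atomic.LoadUint64(&cseq[0]) + BufferSize - 2
			}
			ring[seq&BufferMask] = int(seq)
			atomic.StoreUint64(&pseq[0], seq+1)
		}
	}()
	var seq uint64
	for seq < uint64(b.N) {
		// busy spin until at least one new message is available
		available := atomic.LoadUint64(&pseq[0])
		for seq >= available {
			available = atomic.LoadUint64(&pseq[0])
		}
		// drain the whole batch without touching the shared sequences
		for ; seq < available; seq++ {
			if ring[seq&BufferMask] != int(seq) {
				panic("invalid result")
			}
		}
		// acknowledge the batch with a single store (last consumed index)
		atomic.StoreUint64(&cseq[0], seq-1)
	}
}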
// Copyright (C) 2011 by Christoph Hack. All rights reserved.
// Use of this source code is governed by the New BSD License.
/**
* Disruptor
*
* Compile with:
* g++ -std=gnu++0x -pthread -Wall -o disruptor -O3 disruptor.cc
*/
#include <thread>
#include <atomic>
#include <iostream>
#include <pthread.h>
#include <errno.h>
const unsigned long N = 10000000000UL; // number of messages (10e9)
const int RING_SIZE = 2048;
const int RING_MASK = RING_SIZE - 1;
unsigned long ring[RING_SIZE];
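// The producer and consumer sequences are aligned to separate 64-byte cache
// lines to avoid false sharing, mirroring the padded Sequence type in the
// Go version above.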
std::atomic<unsigned long> __attribute__((aligned(64))) pseq(0);
std::atomic<unsigned long> __attribute__((aligned(64))) cseq(0);
// Pin the calling thread to a single CPU so that producer and consumer
// stay on their own cores (and caches).
static void bind_to_cpu(int cpu)
{
    cpu_set_t mask;
    CPU_ZERO(&mask);
    CPU_SET(cpu, &mask);
    // pthread_setaffinity_np returns an error number instead of setting
    // errno, so check the return value directly.
    int err = pthread_setaffinity_np(pthread_self(), sizeof(mask), &mask);
    if (err != 0) {
        errno = err;
        perror("pthread_setaffinity_np");
    }
}
void producer_thread()
{
    bind_to_cpu(1);
    unsigned long max_seq(0);
    for (unsigned long seq = 0; seq <= N; seq++)
    {
        // busy spin until there is room for writing
        while (seq >= max_seq) {
            max_seq = cseq.load() + RING_SIZE - 2;
        }
        // send the message
        ring[seq & RING_MASK] = seq;
        pseq.store(seq + 1, std::memory_order_release);
    }
}
int main()
{
    std::thread t(producer_thread);
    bind_to_cpu(0);
    unsigned long max_seq(0);
    for (unsigned long seq = 0; seq <= N; seq++)
    {
        // busy spin until there is data available
        while (seq >= max_seq) {
            max_seq = pseq.load();
        }
        // retrieve the message and acknowledge it
        unsigned long msg = ring[seq & RING_MASK];
        cseq.store(seq, std::memory_order_release);
        if (msg != seq) {
            std::cerr << "expected " << seq << " got " << msg << std::endl;
        }
    }
    t.join();
}