Skip to content

Instantly share code, notes, and snippets.

Embed
What would you like to do?
package disruptor
import (
"encoding/binary"
"log"
"runtime"
"sync/atomic"
"time"
)
// Sizing constants for the ring buffer and for the demo payloads.
const (
// RingBufferSize is the length in bytes of a ring's backing array.
RingBufferSize = 8 * 64 // 8 64-byte cache lines
// SizeOfInt is the byte width of the uint64 payloads that TestDisruptor and WriteN encode.
SizeOfInt = 8
)
// Offset is a byte position or byte count within a ring's buffer. It is an alias of uint64, so
// Offset fields can be passed directly to the sync/atomic Uint64 functions.
type Offset = uint64
// ring is a fixed-size byte buffer together with the offsets and counters used by write, commit
// and Read. The dirty, clean and retries fields are accessed through sync/atomic operations.
type ring struct {
// Make sure 64-byte alignment
// NOTE(review): nothing in this declaration enforces 64-byte alignment of buf; confirm how (or
// whether) that alignment is actually guaranteed.
buf [RingBufferSize]byte
// dirty is the offset most recently claimed by a writer; commit updates it first.
dirty Offset
// clean is the offset Read starts from; commit updates it after dirty.
clean Offset
// datumSz is the largest payload, in bytes, that write accepts.
datumSz Offset // intended max 256 bytes (4*64); not enforced by New
// retries counts commit attempts in write that failed and were retried.
retries int64
}
// New allocates a zeroed ring whose individual writes are limited to datumSize bytes. The size is
// stored exactly as given; no validation is performed here.
func New(datumSize Offset) *ring {
	r := new(ring)
	r.datumSz = datumSize
	return r
}
// commit tries to publish newOffset. It snapshots clean, swaps dirty from oldDirty to newOffset,
// and then swaps clean from the snapshot to newOffset. It reports true only when both swaps
// succeed. If the dirty swap succeeds but the clean swap does not, dirty stays updated and false
// is returned.
//
// The author intends this to be small enough for the compiler to inline.
func (r *ring) commit(oldDirty, newOffset Offset) bool {
	// The snapshot must be taken before dirty is touched, matching the original ordering.
	cleanSnapshot := atomic.LoadUint64(&r.clean)
	if !atomic.CompareAndSwapUint64(&r.dirty, oldDirty, newOffset) {
		return false
	}
	return atomic.CompareAndSwapUint64(&r.clean, cleanSnapshot, newOffset)
}
// write copies data into the ring buffer and publishes the new offset through commit, retrying
// until the commit succeeds. Each failed attempt increments r.retries.
//
// It returns the number of bytes accepted, which is always len(data). Empty input is a no-op that
// returns 0. It panics if len(data) exceeds the ring's datum size.
//
// NOTE(review): the slot offset is reduced modulo datumSz rather than RingBufferSize, so writes
// only ever land within the first 2*datumSz bytes of the buffer. This is kept as-is because Read
// depends on the resulting offsets; confirm whether a full-buffer ring was intended.
func (r *ring) write(data []byte) int {
	sz := Offset(len(data))
	if sz > r.datumSz {
		panic("unsupported data chunk size")
	}
	// Nothing to store. Returning here also keeps the modulo below from dividing by zero when
	// the ring was created with a datum size of 0.
	if sz == 0 {
		return 0
	}
	// Try until we succeed
	for {
		// Load the offset claimed by the most recent writer.
		dirty := atomic.LoadUint64(&r.dirty)
		// Calculate the offset for this write.
		off := (dirty + sz) % r.datumSz
		// Copy exactly the caller's bytes. Reslicing data up to datumSz would read past
		// len(data): it panics when cap(data) is too small and otherwise copies bytes that
		// are not part of the payload.
		copy(r.buf[off:], data)
		// Break if we are able to commit
		if r.commit(dirty, off) {
			break
		}
		atomic.AddInt64(&r.retries, 1)
	}
	return int(sz)
}
// Read returns the datum-sized window of the buffer that starts at the last committed (clean)
// offset. The window is shortened only if it would run past the end of the buffer. It may contain
// zeroed memory if nothing has been written there yet.
//
// The returned slice aliases the ring's internal buffer; no copy is made, so later writes to the
// same region are visible through it.
func (r *ring) Read() []byte {
	start := atomic.LoadUint64(&r.clean)
	// The upper bound must be relative to start. Using datumSz as an absolute index truncated
	// the chunk whenever start was non-zero.
	end := start + r.datumSz
	if end > RingBufferSize {
		end = RingBufferSize
	}
	return r.buf[start:end]
}
// TestDisruptor creates a ring sized for SizeOfInt-byte payloads and starts a goroutine that
// writes the values 0..samples-1 into it as little-endian uint64s. When the goroutine finishes it
// logs the elapsed time, throughput and retry count, then atomically decrements *wait. The ring
// is returned immediately, without waiting for the writer.
//
// Despite its name this is not a `testing` test function.
func TestDisruptor(samples int, wait *int64) *ring {
	rb := New(SizeOfInt)
	started := time.Now()
	go func() {
		payload := make([]byte, SizeOfInt)
		for n := 0; n < samples; n++ {
			binary.LittleEndian.PutUint64(payload, uint64(n))
			rb.write(payload)
		}
		took := time.Since(started)
		perSecond := float64(samples) / took.Seconds()
		log.Printf("writer: finished writing %dM samples in %v [%.2f samples/s] (with %d contentions)",
			samples/1000000, took, perSecond, atomic.LoadInt64(&rb.retries))
		// Signal completion to whoever is polling the counter.
		atomic.AddInt64(wait, -1)
	}()
	return rb
}
// WriteN creates a ring sized for SizeOfInt-byte payloads and starts a goroutine that writes the
// n values 10, 11, ..., 10+n-1 into it as little-endian uint64s, yielding the processor after
// each write. When the goroutine finishes it atomically decrements *wait. The ring is returned
// immediately, without waiting for the writer.
func WriteN(n int, wait *int64) *ring {
	rb := New(SizeOfInt)
	go func() {
		payload := make([]byte, SizeOfInt)
		limit := 10 + n
		for v := 10; v < limit; v++ {
			binary.LittleEndian.PutUint64(payload, uint64(v))
			rb.write(payload)
			// Give other goroutines (e.g. readers) a chance to run between writes.
			runtime.Gosched()
		}
		atomic.AddInt64(wait, -1)
	}()
	return rb
}
@caelifer

This comment has been minimized.

Copy link
Owner Author

caelifer commented Oct 25, 2019

work in progress

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can’t perform that action at this time.