Skip to content

Instantly share code, notes, and snippets.

@caelifer
Created October 25, 2019 23:23
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save caelifer/70864cb13e9c12842059daba7a144ca3 to your computer and use it in GitHub Desktop.
Save caelifer/70864cb13e9c12842059daba7a144ca3 to your computer and use it in GitHub Desktop.
package disruptor
import (
"encoding/binary"
"log"
"runtime"
"sync/atomic"
"time"
)
// RingBufferSize is the length in bytes of a ring's backing array, sized as
// eight 64-byte cache lines.
const RingBufferSize = 64 * 8

// SizeOfInt is the number of bytes used to encode one 64-bit integer sample;
// TestDisruptor and WriteN use it both as payload length and as datum size.
const SizeOfInt = 8
// Offset is a byte position inside the ring buffer. It is an alias for uint64,
// so offset fields can be passed directly to the sync/atomic uint64 helpers.
type Offset = uint64
// ring is a fixed-size byte buffer together with the offsets and counter that
// write, commit and Read access through sync/atomic operations.
type ring struct {
// buf is the backing storage. The original note asks for 64-byte alignment.
// NOTE(review): nothing in this declaration enforces that alignment — confirm.
buf [RingBufferSize]byte
// dirty is the offset write loads at the start of an attempt and commit
// advances first with a compare-and-swap.
dirty Offset
// clean is the offset commit advances second; Read slices buf starting here.
clean Offset
datumSz Offset // upper bound on one write's payload; intended max appears to be 256 bytes (4*64), not checked by New — TODO confirm
// retries counts failed commit attempts; write increments it atomically.
retries int64
}
// New allocates a zeroed ring and records datumSize as its datum size.
//
// datumSize is stored without validation; write later rejects any payload
// longer than this value.
func New(datumSize Offset) *ring {
	r := new(ring)
	r.datumSz = datumSize
	return r
}
// commit tries to publish newOffset. It snapshots clean, then moves dirty
// from oldDirty to newOffset with a compare-and-swap and, only if that
// succeeded, moves clean from the snapshot to newOffset the same way.
// It reports true only when both swaps succeeded.
//
// The function is kept small with the expectation that the compiler inlines it.
// NOTE(review): if the first swap succeeds and the second fails, dirty stays
// advanced while false is returned — confirm callers tolerate that.
func (r *ring) commit(oldDirty, newOffset Offset) bool {
	cleanSnapshot := atomic.LoadUint64(&r.clean)
	if !atomic.CompareAndSwapUint64(&r.dirty, oldDirty, newOffset) {
		return false
	}
	return atomic.CompareAndSwapUint64(&r.clean, cleanSnapshot, newOffset)
}
// write publishes data into the ring and returns the number of bytes
// accepted, which is always len(data).
//
// It panics with "unsupported data chunk size" when len(data) exceeds the
// ring's datum size. An empty payload is a no-op that returns 0.
//
// The loop is optimistic: it loads dirty, derives the target offset, copies
// the payload there and then calls commit. Every failed commit increments
// r.retries and the whole sequence is repeated.
func (r *ring) write(data []byte) int {
	sz := Offset(len(data))
	if sz > r.datumSz {
		panic("unsupported data chunk size")
	}
	if sz == 0 {
		// Nothing to publish. Returning here also keeps a zero datumSz from
		// reaching the modulo below, which would be a division by zero.
		return 0
	}

	// Try until we succeed.
	for {
		dirty := atomic.LoadUint64(&r.dirty)

		// NOTE(review): the offset is reduced modulo datumSz, not the buffer
		// size, so it always stays below datumSz — confirm this is intended
		// before changing it; Read depends on the same range.
		off := (dirty + sz) % r.datumSz

		// Copy exactly len(data) bytes. Reslicing the payload to datumSz
		// would panic when cap(data) < datumSz and would otherwise copy
		// bytes that lie beyond len(data).
		copy(r.buf[off:], data)

		if r.commit(dirty, off) {
			return int(sz)
		}
		atomic.AddInt64(&r.retries, 1)
	}
}
// Read returns the datum-sized window of the buffer that starts at the
// current clean offset. On a ring nothing has been written to, the window
// holds zeroed memory.
//
// The window is clamped to the end of the buffer, so it can be shorter than
// the datum size when clean is close to the end. The returned slice aliases
// the ring's storage; no copy is made, so later writes show through it.
func (r *ring) Read() []byte {
	total := uint64(len(r.buf))

	start := atomic.LoadUint64(&r.clean)
	if start > total {
		start = total
	}

	// The upper bound is relative to start. Using the bare datum size as an
	// absolute bound would shrink the window whenever start > 0 and panic
	// once start exceeded the datum size.
	end := total
	if r.datumSz < total-start {
		end = start + r.datumSz
	}
	return r.buf[start:end]
}
// TestDisruptor creates a ring with SizeOfInt-byte datums, starts a
// background goroutine that writes the integers 0..samples-1 into it, and
// returns the ring without waiting for the writer.
//
// Each integer is encoded as 8 little-endian bytes in a single reused
// scratch slice. After the last write the goroutine logs the elapsed time,
// the throughput and the ring's retry count, then atomically decrements
// *wait so the caller can observe completion.
func TestDisruptor(samples int, wait *int64) *ring {
	rb := New(SizeOfInt)
	started := time.Now() // taken before the goroutine is launched

	go func() {
		scratch := make([]byte, SizeOfInt)
		for n := 0; n < samples; n++ {
			binary.LittleEndian.PutUint64(scratch, uint64(n))
			rb.write(scratch)
		}

		took := time.Since(started)
		millions := samples / 1000000
		rate := float64(samples) / took.Seconds()
		contentions := atomic.LoadInt64(&rb.retries)
		log.Printf("writer: finished writing %dM samples in %v [%.2f samples/s] (with %d contentions)",
			millions, took, rate, contentions)

		atomic.AddInt64(wait, -1)
	}()

	return rb
}
// WriteN creates a ring with SizeOfInt-byte datums, starts a background
// goroutine that writes the n consecutive integers beginning at 10 into it,
// and returns the ring without waiting for the writer.
//
// Each integer is encoded as 8 little-endian bytes, and the goroutine yields
// the processor after every write. When the loop ends it atomically
// decrements *wait so the caller can observe completion.
func WriteN(n int, wait *int64) *ring {
	rb := New(SizeOfInt)

	go func() {
		scratch := make([]byte, SizeOfInt)
		limit := 10 + n
		for v := 10; v < limit; v++ {
			binary.LittleEndian.PutUint64(scratch, uint64(v))
			rb.write(scratch)
			runtime.Gosched()
		}
		atomic.AddInt64(wait, -1)
	}()

	return rb
}
@caelifer
Copy link
Author

work in progress

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment