Skip to content

Instantly share code, notes, and snippets.

@danielealbano
Created September 24, 2023 15:30
Show Gist options
  • Save danielealbano/3f11afb2f549ee3c90081dcfcc68255f to your computer and use it in GitHub Desktop.
Save danielealbano/3f11afb2f549ee3c90081dcfcc68255f to your computer and use it in GitHub Desktop.
golang - mpmc ringbuffer - spinlock vs mutex
go test -bench=.
goos: linux
goarch: amd64
pkg: pgproxy/internal/helpers
cpu: AMD Ryzen 9 3950X 16-Core Processor
BenchmarkRingBufferEnqueueDequeueMutex-32 5174512 212.9 ns/op 0 B/op 0 allocs/op
BenchmarkRingBufferEnqueueMutex-32 8517314 127.9 ns/op 0 B/op 0 allocs/op
BenchmarkRingBufferDequeueMutex-32 16836531 71.17 ns/op 0 B/op 0 allocs/op
BenchmarkRingBufferEnqueueDequeueSpinlock-32 3977191 303.1 ns/op 0 B/op 0 allocs/op
BenchmarkRingBufferEnqueueSpinlock-32 7919860 146.9 ns/op 0 B/op 0 allocs/op
BenchmarkRingBufferDequeueSpinlock-32 9238311 132.6 ns/op 0 B/op 0 allocs/op
PASS
ok pgproxy/internal/helpers 8.825s
package helpers
import (
"errors"
"sync"
)
// T is the element type stored in a RingBuffer. It is the empty interface,
// so a value of any type can be enqueued.
type T interface{}
// Sentinel errors returned by the RingBuffer operations.
var (
// errFull is returned by Enqueue when the buffer has no free slot.
errFull = errors.New("full")
// errEmpty is returned by Dequeue when the buffer holds no items.
errEmpty = errors.New("empty")
)
// RingBuffer is a fixed-capacity FIFO queue stored in a slice that is used
// as a circular array.
type RingBuffer struct {
// lock is acquired by Enqueue and Dequeue while they modify the fields below.
lock sync.Locker
// items is the backing storage, allocated by NewRingBuffer.
items []T
// capacity is the number of slots; head and tail wrap around modulo this value.
capacity int
// head is the index of the item the next Dequeue returns.
head int
// tail is the index of the slot the next Enqueue writes.
tail int
// full tells a full buffer apart from an empty one when head == tail.
full bool
}
// NewRingBuffer returns an empty RingBuffer with room for size items.
// The supplied lock is the one Enqueue and Dequeue acquire around their
// updates of the buffer state.
func NewRingBuffer(size int, lock sync.Locker) *RingBuffer {
	return &RingBuffer{
		capacity: size,
		items:    make([]T, size),
		lock:     lock,
	}
}
// Capacity returns the maximum number of items the buffer can hold, i.e. the
// size it was created with.
func (s *RingBuffer) Capacity() int {
return s.capacity
}
// Len returns the number of items currently stored in the buffer.
//
// The lock is held while head, tail and full are read: Enqueue and Dequeue
// update those fields under the same lock, so reading them without it is a
// data race and can yield a length computed from a half-applied update.
// Len must therefore not be called by a goroutine that already holds the lock.
func (s *RingBuffer) Len() int {
	s.lock.Lock()
	defer s.lock.Unlock()

	switch {
	case s.full:
		return s.capacity
	case s.tail >= s.head:
		return s.tail - s.head
	default:
		// tail has wrapped past the end of the slice while head has not.
		return s.capacity - s.head + s.tail
	}
}
// IsFull reports whether every slot of the buffer is occupied.
//
// The flag is read under the lock because Enqueue and Dequeue write it while
// holding that lock; an unsynchronized read would be a data race. IsFull must
// therefore not be called by a goroutine that already holds the lock.
func (s *RingBuffer) IsFull() bool {
	s.lock.Lock()
	defer s.lock.Unlock()

	return s.full
}
// Enqueue stores item in the slot at the tail of the buffer.
//
// It returns errFull, leaving the buffer untouched, when no slot is free, and
// nil otherwise.
func (s *RingBuffer) Enqueue(item T) error {
	s.lock.Lock()
	defer s.lock.Unlock()

	// The full check must happen under the lock. Checked beforehand, two
	// producers could both observe one free slot, both proceed, and the second
	// would overwrite an item that has not been dequeued yet.
	if s.full {
		return errFull
	}

	s.items[s.tail] = item
	s.tail = (s.tail + 1) % s.capacity
	// After advancing, tail catching up with head means the last slot was used.
	s.full = s.head == s.tail
	return nil
}
// Dequeue removes and returns the item at the head of the buffer.
//
// It returns (nil, errEmpty) when the buffer holds no items.
func (s *RingBuffer) Dequeue() (T, error) {
	s.lock.Lock()
	defer s.lock.Unlock()

	// The empty check must happen under the lock. Checked beforehand, two
	// consumers could both observe one remaining item, both proceed, and the
	// second would return a stale slot and push head past tail.
	// head == tail means empty unless the full flag is set.
	if !s.full && s.head == s.tail {
		return nil, errEmpty
	}

	data := s.items[s.head]
	// Drop the buffer's reference so the item can be garbage collected once
	// the caller is done with it.
	s.items[s.head] = nil
	s.full = false
	s.head = (s.head + 1) % s.capacity
	return data, nil
}
package helpers
import (
"errors"
"sync"
"testing"
)
// TestNewRingBuffer checks that a freshly created buffer is not full and
// reports the capacity it was created with.
func TestNewRingBuffer(t *testing.T) {
	rb := NewRingBuffer(10, &sync.Mutex{})

	if rb.IsFull() {
		t.Errorf("IsFull() should return false")
	}
	if got := rb.Capacity(); got != 10 {
		t.Errorf("Capacity() should return 10")
	}
}
// TestRingBufferEnqueue checks that one item can be enqueued into an empty
// buffer without filling it.
func TestRingBufferEnqueue(t *testing.T) {
	rb := NewRingBuffer(10, &sync.Mutex{})

	if err := rb.Enqueue(1); err != nil {
		t.Errorf("Enqueue() should not return an error")
	}
	if rb.IsFull() {
		t.Errorf("IsFull() should return false")
	}
}
// TestRingBufferEnqueueFullError fills the buffer completely and checks that
// it reports full and that one more Enqueue fails with errFull.
func TestRingBufferEnqueueFullError(t *testing.T) {
	const size = 10
	rb := NewRingBuffer(size, &sync.Mutex{})

	for n := 0; n < size; n++ {
		if err := rb.Enqueue(n); err != nil {
			t.Errorf("Enqueue() should not return an error")
		}
	}

	if !rb.IsFull() {
		t.Errorf("IsFull() should return true")
	}
	if err := rb.Enqueue(1); !errors.Is(err, errFull) {
		t.Errorf("Enqueue() should return an error")
	}
}
// TestRingBufferDequeue checks that an enqueued item can be dequeued again
// and that the buffer is not full afterwards.
func TestRingBufferDequeue(t *testing.T) {
	rb := NewRingBuffer(10, &sync.Mutex{})

	if err := rb.Enqueue(1); err != nil {
		t.Errorf("Enqueue() should not return an error")
	}
	if _, err := rb.Dequeue(); err != nil {
		t.Errorf("Dequeue() should not return an error")
	}
	if rb.IsFull() {
		t.Errorf("IsFull() should return false")
	}
}
// TestRingBufferDequeueEmpty checks that Dequeue on an empty buffer fails
// with errEmpty.
func TestRingBufferDequeueEmpty(t *testing.T) {
	rb := NewRingBuffer(10, &sync.Mutex{})

	if _, err := rb.Dequeue(); !errors.Is(err, errEmpty) {
		t.Errorf("Dequeue() should return an error")
	}
}
// TestRingBufferLen checks that Len is 1 after a single Enqueue.
func TestRingBufferLen(t *testing.T) {
	rb := NewRingBuffer(10, &sync.Mutex{})

	if err := rb.Enqueue(1); err != nil {
		t.Errorf("Enqueue() should not return an error")
	}
	if got := rb.Len(); got != 1 {
		t.Errorf("Len() should return 1")
	}
}
// TestRingBufferLenFull checks that Len equals the capacity once the buffer
// has been filled completely.
func TestRingBufferLenFull(t *testing.T) {
	const size = 10
	rb := NewRingBuffer(size, &sync.Mutex{})

	for n := 0; n < size; n++ {
		if err := rb.Enqueue(n); err != nil {
			t.Errorf("Enqueue() should not return an error")
		}
	}

	if got := rb.Len(); got != 10 {
		t.Errorf("Len() should return 10")
	}
}
// TestRingBufferLenAfterMultipleCycles drives the buffer through several
// fill/drain rounds, so that head and tail wrap around, and checks the final
// length.
func TestRingBufferLenAfterMultipleCycles(t *testing.T) {
	rb := NewRingBuffer(10, &sync.Mutex{})

	// fill enqueues the values 0..count-1, reporting any failure.
	fill := func(count int) {
		for n := 0; n < count; n++ {
			if err := rb.Enqueue(n); err != nil {
				t.Errorf("Enqueue() should not return an error")
			}
		}
	}
	// drain dequeues count items, reporting any failure.
	drain := func(count int) {
		for n := 0; n < count; n++ {
			if _, err := rb.Dequeue(); err != nil {
				t.Errorf("Dequeue() should not return an error")
			}
		}
	}

	fill(10)  // 10 items: full
	drain(5)  // 5 items left
	fill(5)   // full again, tail has wrapped around
	drain(10) // empty
	fill(5)   // 5 items

	if rb.Len() != 5 {
		t.Errorf("Len() should return 5")
	}
}
// internalBenchmarkRingBufferEnqueueDequeue measures an Enqueue immediately
// followed by a Dequeue, run in parallel against the shared buffer q.
// Errors from either call are ignored.
func internalBenchmarkRingBufferEnqueueDequeue(b *testing.B, q *RingBuffer) {
	worker := func(pb *testing.PB) {
		for pb.Next() {
			_ = q.Enqueue(1)
			_, _ = q.Dequeue()
		}
	}

	b.ReportAllocs()
	b.ResetTimer()
	b.RunParallel(worker)
}
// internalBenchmarkRingBufferEnqueue measures Enqueue run in parallel against
// the shared buffer q. Errors are ignored.
func internalBenchmarkRingBufferEnqueue(b *testing.B, q *RingBuffer) {
	worker := func(pb *testing.PB) {
		for pb.Next() {
			_ = q.Enqueue(1)
		}
	}

	b.ReportAllocs()
	b.ResetTimer()
	b.RunParallel(worker)
}
// internalBenchmarkRingBufferDequeue measures Dequeue run in parallel against
// the shared buffer q. The buffer is pre-loaded with b.N items before the
// timer is reset, so the setup is not part of the measurement.
func internalBenchmarkRingBufferDequeue(b *testing.B, q *RingBuffer) {
	for n := 0; n < b.N; n++ {
		_ = q.Enqueue(n)
	}

	worker := func(pb *testing.PB) {
		for pb.Next() {
			_, _ = q.Dequeue()
		}
	}

	b.ReportAllocs()
	b.ResetTimer()
	b.RunParallel(worker)
}
func BenchmarkRingBufferEnqueueDequeueMutex(b *testing.B) {
q := NewRingBuffer(b.N, &sync.Mutex{})
internalBenchmarkRingBufferEnqueueDequeue(b, q)
}
func BenchmarkRingBufferEnqueueMutex(b *testing.B) {
q := NewRingBuffer(b.N, &sync.Mutex{})
internalBenchmarkRingBufferEnqueue(b, q)
}
func BenchmarkRingBufferDequeueMutex(b *testing.B) {
q := NewRingBuffer(b.N, &sync.Mutex{})
internalBenchmarkRingBufferDequeue(b, q)
}
func BenchmarkRingBufferEnqueueDequeueSpinlock(b *testing.B) {
q := NewRingBuffer(b.N, &SpinLock{})
internalBenchmarkRingBufferEnqueueDequeue(b, q)
}
func BenchmarkRingBufferEnqueueSpinlock(b *testing.B) {
q := NewRingBuffer(b.N, &SpinLock{})
internalBenchmarkRingBufferEnqueue(b, q)
}
func BenchmarkRingBufferDequeueSpinlock(b *testing.B) {
q := NewRingBuffer(b.N, &SpinLock{})
internalBenchmarkRingBufferDequeue(b, q)
}
package helpers
import (
"sync/atomic"
"time"
)
// waitSpins is the number of empty loop iterations SpinLock.spin runs between
// two attempts to take the lock.
// The Go runtime's doSpin uses 30, but the difference is not relevant here.
var waitSpins = 20
// SpinLock is a lock whose Lock method busy-waits until the lock is free.
// Its Lock and Unlock methods allow it to be used as a sync.Locker.
type SpinLock struct {
// v is the lock state: 0 when unlocked, 1 when locked.
v uint32
}
// Lock acquires the lock, spinning between attempts until TryLock succeeds.
func (l *SpinLock) Lock() {
	for {
		if l.TryLock() {
			return
		}
		l.spin()
	}
}
// LockWithTimeout tries to acquire the lock, spinning between attempts, and
// gives up once more than duration has elapsed since the call started.
// It reports whether the lock was acquired.
func (l *SpinLock) LockWithTimeout(duration time.Duration) bool {
	began := time.Now()
	for {
		if l.TryLock() {
			return true
		}
		if time.Since(began) > duration {
			return false
		}
		l.spin()
	}
}
// Unlock releases the lock.
//
// The state is cleared with an atomic store. A plain assignment would race
// with the atomic compare-and-swap in TryLock and would not guarantee that
// writes made inside the critical section are visible to the next goroutine
// that acquires the lock.
func (l *SpinLock) Unlock() {
	atomic.StoreUint32(&l.v, 0)
}
// TryLock attempts to acquire the lock without waiting and reports whether it
// succeeded.
func (l *SpinLock) TryLock() bool {
	// Probe with an atomic load first so that no compare-and-swap is issued
	// while the lock is visibly held. The load must be atomic: a plain read of
	// v would race with the atomic writes made by other goroutines.
	if atomic.LoadUint32(&l.v) == 1 {
		return false
	}
	return atomic.CompareAndSwapUint32(&l.v, 0, 1)
}
// IsLocked reports whether the lock is held at the moment of the call.
func (l *SpinLock) IsLocked() bool {
	state := atomic.LoadUint32(&l.v)
	return state == 1
}
// spin runs an empty loop for waitSpins iterations. Lock and LockWithTimeout
// call it between two attempts to acquire the lock.
func (l *SpinLock) spin() {
for spin := 0; spin < waitSpins; spin++ {
// intentionally empty: the iterations themselves are the delay
}
}
package helpers
import (
"testing"
"time"
)
// TestSpinLock checks that IsLocked follows a Lock/Unlock pair.
func TestSpinLock(t *testing.T) {
	var lock SpinLock

	lock.Lock()
	if locked := lock.IsLocked(); !locked {
		t.Errorf("IsLocked() should return true")
	}

	lock.Unlock()
	if locked := lock.IsLocked(); locked {
		t.Errorf("IsLocked() should return false")
	}
}
// TestSpinLockTryLock checks that TryLock succeeds on a free lock and fails
// on a lock that is already held.
func TestSpinLockTryLock(t *testing.T) {
	var lock SpinLock

	if acquired := lock.TryLock(); !acquired {
		t.Errorf("TryLock() should return true")
	}
	if acquired := lock.TryLock(); acquired {
		t.Errorf("TryLock() should return false")
	}
	lock.Unlock()
}
// TestSpinLockTryLock2 checks that TryLock fails on a lock taken with Lock.
func TestSpinLockTryLock2(t *testing.T) {
	var lock SpinLock

	lock.Lock()
	if locked := lock.IsLocked(); !locked {
		t.Errorf("IsLocked() should return true")
	}
	if acquired := lock.TryLock(); acquired {
		t.Errorf("TryLock() should return false")
	}
	lock.Unlock()
}
// TestSpinLockUnlock checks that Unlock makes a held lock report unlocked.
func TestSpinLockUnlock(t *testing.T) {
	var lock SpinLock

	lock.Lock()
	if held := lock.IsLocked(); !held {
		t.Errorf("IsLocked() should return true")
	}

	lock.Unlock()
	if held := lock.IsLocked(); held {
		t.Errorf("IsLocked() should return false")
	}
}
// TestLockWithTimeout checks that LockWithTimeout on a held lock fails, and
// that it gives up no earlier than the timeout and not much later.
func TestLockWithTimeout(t *testing.T) {
	const timeout = 5 * time.Millisecond

	var lock SpinLock
	lock.Lock()
	if held := lock.IsLocked(); !held {
		t.Errorf("IsLocked() should return true")
	}

	start := time.Now()
	acquired := lock.LockWithTimeout(timeout)
	elapsed := time.Since(start)

	if acquired {
		t.Errorf("LockWithTimeout() should return false")
	}
	if elapsed < timeout {
		t.Errorf("LockWithTimeout() should wait at least 5ms")
	}
	// Allow an extra ms for padding and time calculation rounding.
	if elapsed > timeout+time.Millisecond {
		t.Errorf("LockWithTimeout() should wait at most 6ms")
	}
	lock.Unlock()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment