Skip to content

Instantly share code, notes, and snippets.

@CAFxX
Last active September 8, 2020 23:43
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save CAFxX/86ee37e8c912a25f522a4d2e1df58701 to your computer and use it in GitHub Desktop.
Save CAFxX/86ee37e8c912a25f522a4d2e1df58701 to your computer and use it in GitHub Desktop.
Golang sharding primitive
package main
import (
"runtime"
"sync"
"sync/atomic"
"testing"
"unsafe"
)
type paddedMutex struct {
sync.Mutex
_ [64 - unsafe.Sizeof(sync.Mutex{})]byte
}
func BenchmarkMutex(b *testing.B) {
var m paddedMutex
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
m.Lock()
m.Unlock()
}
})
}
func BenchmarkIndependentMutexes(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
var m paddedMutex
for pb.Next() {
m.Lock()
m.Unlock()
}
})
}
func BenchmarkShardedMutex(b *testing.B) {
_, max := runtime.LocalityHint()
m := make([]paddedMutex, max)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
hint, _ := runtime.LocalityHint()
if hint >= len(m) {
hint = hint % len(m)
}
m[hint].Lock()
m[hint].Unlock()
}
})
}
type paddedUint64 struct {
m uint64
_ [64 - unsafe.Sizeof(uint64(0))]byte
}
func BenchmarkAtomicAdd(b *testing.B) {
var m paddedUint64
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
atomic.AddUint64(&m.m, 1)
}
})
}
func BenchmarkIndependentAtomicAdd(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
var m paddedUint64
for pb.Next() {
atomic.AddUint64(&m.m, 1)
}
})
}
func BenchmarkShardedAtomicAdd(b *testing.B) {
_, max := runtime.LocalityHint()
m := make([]paddedUint64, max)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
hint, _ := runtime.LocalityHint()
if hint >= len(m) {
hint = hint % len(m)
}
atomic.AddUint64(&m[hint].m, 1)
}
})
}
func BenchmarkChannel(b *testing.B) {
m := make(chan uint64, 1)
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
m <- 0
<-m
}
})
}
func BenchmarkIndependentChannel(b *testing.B) {
b.RunParallel(func(pb *testing.PB) {
m := make(chan uint64, 1)
for pb.Next() {
m <- 0
<-m
}
})
}
func BenchmarkShardedChannel(b *testing.B) {
_, max := runtime.LocalityHint()
m := make([]chan uint64, max)
for i := range m {
m[i] = make(chan uint64, 1)
_ = make(chan uint64, 1)
_ = make(chan uint64, 1)
_ = make(chan uint64, 1)
_ = make(chan uint64, 1)
}
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
hint, _ := runtime.LocalityHint()
if hint >= len(m) {
hint = hint % len(m)
}
m[hint] <- 0
<-m[hint]
}
})
}
package runtime
import (
"unsafe"
"internal/race"
)
// LocalityHint returns a value that is correlated, at least in a majority of calls
// within a bounded time window, with the core that is currently executing the call.
// This means that:
// - if two successive calls to LocalityHint return the same value, then it is likely
// (but not certain) that the two calls were executed by the same core
// - if two successive calls to LocalityHint return two different values, then it is
// likely (but not certain) that the two calls were executed by different cores
// The value returned by this function can be used to build sharded values, by using
// it to pick one of the available shards to reduce contention.
// Callers should treat this strictly as a hint, and should ensure that proper
// synchronization is used to access the shards. The simplest way to do this, assuming
// that numShards was set to the max value returned by LocalityHint(), is to do
// something like
//
// hint, _ := runtime.LocalityHint()
// if hint >= numShards {
// hint = hint % numShards
// }
// // use the hint
//
// Note that the actual hint value returned has, in general, no fixed mapping with a
// core: so e.g. a hint with value N does not mean that the call is being executed on
// the N-th core. Furthermore, the mapping between hint and cores can change over time.
// Note also that not all hint values may be used, although the runtime makes reasonable
// effort to minimize the number of unused hint values.
// The second value returned, max, is the current maximum possible hint. Note that max
// can change during the lifetime of a process, so callers must handle this eventuality.
// Callers should also not assume that max is equal to the number of cores/CPU in the
// system, or to GOMAXPROCS.
// The relationship between hint and max is: 0 <= hint < max.
//go:nosplit
func LocalityHint() (hint, max int) {
// Users of this function MUST NOT rely on internal details of this function.
// The following is just an example implementation, alternative implementations
// based e.g. on CPU- or OS-provided mechanisms to determine the core we are
// currently running on are also possible, as they would be compliant to the
// documented behaviour. It is worth pointing out that even an implementation
// that always returns hint=0 and max=1 would be compliant.
// The current implementation could be made slightly more defensive by using
// shuffled (once at startup, or periodically e.g. during STW) P IDs.
hint = int(getg().m.p.ptr().id)
gmp := (*int32)(&gomaxprocs)
max = int(atomic.Load((*uint32)(unsafe.Pointer(gmp))))
if hint >= max {
// extremely unlikely
max = hint + 1
}
if race.Enabled {
// return a higher max one tenth of the times
if fastrand(10) == 0 {
max = max + 1
}
// return a random hint one third of the times
if fastrandn(3) == 0 {
hint = int(fastrandn(uint32(max)))
}
}
return
}

proposal: runtime: sharding primitive

In order to enable the efficient implementation of sharded values I would like to propose a simple primitive that should hopefully be able to strike an acceptable tradeoff between usability and requirements imposed on the runtime.

This proposal is conceptually in line with #xxx and #yyy, that both contain good arguments about why sharded values are desirable. In contrast to #yyy though, this proposal does not seek to provide one-size-fits-all sharded values in the standard library, but rather just a minimal building block that allows a variety of sharded value semantics to be implemented on top. The primary rationale for this approach is that it is the very existence of such variety that makes it hard to define exactly what kind of semantics would be desirable in a one-size-fits-all sharded value.

The current proposal attempts to address this by providing a single new function, tentatively called runtime.LocalityHint, whose only concern is to provide the caller with an abstract hint about which one of potentially many execution units is the code running in. The meaning of "execution unit" in this context is deliberately left unspecified to maximize implementation freedom on part of the runtime, but it can be loosely thought of as an entity that is capable of executing code (e.g. a core, hardware thread, OS thread, user-space thread, etc.).

Concretely speaking, the proposed function would have the following signature and semantics:

// LocalityHint returns a value that is correlated, at least in a majority of calls
// within a bounded time window, with the execution unit that is currently executing 
// the call. This means that:
// - if two successive calls to LocalityHint return the same value, then it is likely
//   (but not certain) that the two calls were executed by the same execution unit
// - if two successive calls to LocalityHint return two different values, then it is
//   likely (but not certain) that the two calls were executed by different execution
//   units
// - concurrent calls to LocalityHint on different execution units are likely (but not 
//   certain) to return different values
//
// This function is mostly useful to build sharded values, by using the values returned 
// by it to pick one of the available shards, with the goal of reducing contention.
// Callers should treat the returned values strictly as hints, and should ensure that 
// proper synchronization is always used to access the shards. The simplest way to do 
// this, assuming that numShards was set to the max value returned by LocalityHint(), 
// is to do something like:
//
//   hint, _ := runtime.LocalityHint()
//   if hint >= numShards {
//       hint = hint % numShards
//   }	
//   // use the hint
//
// Note that the actual hint value returned has, in general, no fixed mapping with a
// core, thread, proc, goroutine or other specific entity: so e.g. a hint with value N 
// does not mean that the call is being executed on the N-th core, thread, proc,
// goroutine, etc. 
// Furthermore, the mapping between hint and execution units can change over time 
// during the lifetime of the same process.
// The second value returned, max, is the instantaneous maximum possible hint. Note 
// that the value of max can change between calls even during the lifetime of the same 
// process, so callers must be able to handle this eventuality.
// Callers should also not assume that max is equal to the number of physical CPU cores
// or hardware threads in the system, or to GOMAXPROCS.
// The relationship between hint and max is 0 <= hint < max. Note that not all possible
// hint values in this range may be used, although the runtime makes reasonable efforts
// to minimize the number of unused hint values.
func LocalityHint() (hint, max int)

There are multiple reasons to demand that callers always use synchronization to access the shards:

  • it reduces the requirements that LocalityHint need to comply with, thereby allowing a greater freedom in choosing among different implementation strategies
  • without it, LocalityHint would also need to tie in to some form of pinning to an execution unit, something that is generally avoided in Go as it would expose users to significant complexity
  • in many cases, the sharded value will need to implement some kind of global operation that requires access to all shards (e.g. for a sharded counter, summing all shards together; for a sharded mutex, locking all shards for global synhronization; etc.) and such operations would require all local shards operations to use synchronization anyway
  • even in the case the previous point were not to apply, at least on x86/amd64, atomic operations on uncontented memory locations do not impose excessive overhead compared to non-atomic operations

To limit incorrect usage of this primitive, when tests are run with the race detector enabled LocalityHint should randomly return values that force callers to handle the edge cases described in the function documentation.

Implementation

A trvial implementation of LocalityHint, based on the proc ID, can be found below. It is worth pointing out that this is just one of the possible implementations. Other implementations that do not rely on the proc ID are possible while being compliant with the semantics described above. Examples of such alternative implementations include:

  • using CPU instructions (e.g. CPUID/RDTSCP/RDPID) to get the current hardware thread ID
  • using OS-provided functions to get the current hardware thread ID
  • using OS-provided functions to get the current OS thread ID
  • using TLS to get the current OS thread ID
  • using the go runtime P ID (used in the example below)
  • using the go runtime M ID
  • using the go runtime G ID

It is worth noting that the semantics of LocalityHint do not require the actual value of any of the above source to be returned as a hint. Any mapping (including, potentially, some non-bijective and non-fully-deterministic ones) from the actual value to the returned value would be still within the expected semantics, obviously at some cost in terms of performance.

func LocalityHint() (hint, max int) {
	// Users of this function MUST NOT rely on internal details of this function.
	hint = int(getg().m.p.ptr().id)
	gmp := (*int32)(&gomaxprocs)
	max = int(atomic.Load((*uint32)(unsafe.Pointer(gmp))))
	if hint >= max {
		// extremely unlikely
		max = hint + 1
	}
	if race.Enabled {
		// return a higher max one tenth of the times
		if fastrand(10) == 0 {
			max = max + 1
		}		
		// return a random hint one third of the times
		if fastrandn(3) == 0 {
			hint = int(fastrandn(uint32(max)))
		}
	}
	return
}

This implementation was chosen for two main reasons:

  • It was by far the simplest in terms of implementation, being fully self-contained
  • Due to its simplicity, it fits within the inlining budget - further helping with performance

Performance

There are three kinds of lies: lies, damned lies, and statistics.

The example implementation given above was benchmarked in a

Further considerations

No special provision exists in the current proposal for NUMA systems. This is due to a variety of reasons:

  • The author is in no shape or form an expert on NUMA systems
  • Go itself does not currently expose to users anything related to NUMA
  • The current design should already offer, albeit at the cost of some memory overhead, a way to address some of the requirements of practical NUMA systems: chiefly, as long as shards are individually aligned to NUMA page granularity (something user code can trivially do) and the underlying OS is correctly configured (e.g. with NUMA balancing), it should be possible for the OS to minimize cache coherency overhead and obtain the same contention reduction benefits as in the non-NUMA case.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment