Skip to content

Instantly share code, notes, and snippets.

@logrusorgru
Last active September 19, 2018 12:50
Show Gist options
  • Star 1 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save logrusorgru/a032bd8a7da1ed4fcb5afa7ee79f9980 to your computer and use it in GitHub Desktop.
Save logrusorgru/a032bd8a7da1ed4fcb5afa7ee79f9980 to your computer and use it in GitHub Desktop.
go-mangos pools research
package pool
import (
"sync"
"testing"
"time"
)
// perform:
// go test -bench syncPool
// Benchmark_syncPool_singleThread measures sync.Pool get/put round-trips
// where every parallel worker owns its own buffered channel, so only the
// pool itself is shared between goroutines.
// NOTE(review): despite the "singleThread" name this still uses
// RunParallel — the difference from the multiThread variant is only the
// per-worker (vs shared) channel; confirm the naming intent.
func Benchmark_syncPool_singleThread(b *testing.B) {
	b.StopTimer()
	pool := sync.Pool{New: func() interface{} { return make([]byte, 32) }}
	b.StartTimer()
	b.RunParallel(func(pb *testing.PB) {
		// per-worker channel; allocated inside the timed region
		// (the original comment laments this: "allocation time, agh-r-r-r-r")
		local := make(chan []byte, 100)
		for pb.Next() {
			local <- pool.Get().([]byte)
			pool.Put(<-local)
		}
	})
}
// Benchmark_syncPool_multiThread measures sync.Pool get/put round-trips
// where all parallel workers share one buffered channel, so buffers may
// migrate between goroutines.
func Benchmark_syncPool_multiThread(b *testing.B) {
	b.StopTimer()
	pool := sync.Pool{New: func() interface{} { return make([]byte, 32) }}
	shared := make(chan []byte, 100)
	b.StartTimer()
	b.RunParallel(func(pb *testing.PB) {
		for pb.Next() {
			shared <- pool.Get().([]byte)
			pool.Put(<-shared)
		}
	})
}
// test is a simple smoke test for any Pool implementation: for growing
// sizes it gets a message, validates the Pool contract (non-nil, zero
// length, sufficient capacity) and puts it back.
func test(t *testing.T, p Pool) {
	var m Msg
	for i := 0; i < 1000; i++ {
		if m = p.Get(i); m == nil {
			t.Fatal("m == nil")
		}
		// the Pool interface documents "len(Msg) must be 0" — verify it
		if len(m) != 0 {
			t.Fatal("len(m) is not zero")
		}
		if cap(m) < i {
			t.Fatal("cap(m) is less than expected")
		}
		p.Put(m)
	}
}
// Smoke-test each Pool implementation against the common scenario above.
func Test_no(t *testing.T) { test(t, NewNoPoll()) }
func Test_sync(t *testing.T) { test(t, DefaultSyncPool()) }
func Test_chan(t *testing.T) { test(t, DefaultChanPool()) }
// lengthRange generates message sizes cycling over [f, t] in roughly
// ten steps; for f == t it always yields f.
type lengthRange struct {
	f, t int // inclusive bounds of the size range
	cur  int // last generated value
	step int // increment per call; 0 only when f == t
}

// next returns the next size in the cycle: it advances by step while
// the upper bound is not exceeded, then wraps back to f.
func (l *lengthRange) next() int {
	if l.cur+l.step <= l.t {
		l.cur += l.step
	} else {
		l.cur = l.f
	}
	return l.cur
}

// newLengthRange creates a size generator for [f, t].
// The step is a tenth of the span but never less than one for a
// non-empty span: with the old step = (t-f)/10, any range narrower
// than 10 got step 0 and degenerated to the constant f.
func newLengthRange(f, t int) *lengthRange {
	step := (t - f) / 10
	if step == 0 && t > f {
		step = 1 // keep narrow ranges moving
	}
	return &lengthRange{f: f, t: t, cur: f, step: step}
}
// perform allocates one message from p (size chosen by lr) and then
// loops: it either sends the fresh message to `all` — returning the
// allocated size — or receives a message from `free` and returns it
// to the pool.
//
//	p    - Pool instance
//	lr   - *lengthRange size generator
//	all  - channel to send freshly allocated messages to
//	free - channel to recv and free messages from another
//	       (or the same) goroutine
//
// The returned perf is the size of the allocated piece, so only
// allocations (not frees) count toward throughput.
func perform(p Pool,
	lr *lengthRange,
	all, free chan Msg) (perf int) {
	var m Msg
	var l int
	for {
		if m == nil {
			l = lr.next()
			m = p.Get(l) // allocate
		}
		select {
		case all <- m: // sent: account only for allocated pieces
			perf = l
			return
		case f := <-free: // or free one and try sending again
			p.Put(f)
		}
	}
	// the old trailing `return` here was unreachable (the loop can
	// only exit via the return above) and is flagged by `go vet`
}
// run drives perform until the benchmark is done; delay is a sleep
// after each operation, and each successful allocation reports its
// size as the benchmark's bytes-per-op metric.
func run(b *testing.B, pb *testing.PB,
	p Pool,
	lr *lengthRange,
	delay time.Duration,
	all, free chan Msg) {
	for pb.Next() {
		// throughput is amount*size of allocated pieces (not freed)
		size := perform(p, lr, all, free)
		if size > 0 {
			b.SetBytes(int64(size))
		}
		time.Sleep(delay)
	}
}
// naming convention
// Benchmark_[R/F]_[POOL]_[F]_[T]_[D]_sNaM_sNaM_sNaM
// R/F = {
// range - variable size of messages (in given range, see below)
// fixed - fixed size
// (using `go test -bench fixed`)
// }
// POOL = { no, sync, chan }
// F = int (msg size from)
// T = int (msg size to)
// D = delay
// sNaM_sNaM_sNaM_... - this pool ranges, where
// sN - n is maxMsgSize for the range of the pool
// aM - M is the channel length
// flow size is constant and hardcoded to 1024
// feel free to play with it
const flowSize = 1024

// B is the common benchmark body: it pumps messages of sizes in
// [f, t] through a shared flow channel using pool p, sleeping for
// delay after each operation.
func B(b *testing.B, p Pool, f, t int, delay time.Duration) {
	flow := make(chan Msg, flowSize)
	lr := newLengthRange(f, t)
	b.StartTimer()
	b.RunParallel(func(pb *testing.PB) {
		// BUG FIX: delay was previously hardcoded to 0 here, so the
		// *_1µs_* / *_100µs_* / *_10ms_* benchmarks never slept at all
		run(b, pb, p, lr, delay, flow, flow)
	})
	b.StopTimer()
	b.ReportAllocs()
}
// 32-65536 0 default
func Benchmark_range_no___32_65536_0_default(b *testing.B) {
b.StopTimer()
p := NewNoPoll()
B(b, p, 32, 65536, 0)
}
func Benchmark_range_sync_32_65536_0_default(b *testing.B) {
b.StopTimer()
p := DefaultSyncPool()
B(b, p, 32, 65536, 0)
}
func Benchmark_range_chan_32_65536_0_default(b *testing.B) {
b.StopTimer()
p := DefaultChanPool()
B(b, p, 32, 65536, 0)
}
// 32-32 0 default
func Benchmark_fixed_no___32_32_0_default(b *testing.B) {
b.StopTimer()
p := NewNoPoll()
B(b, p, 32, 32, 0)
}
func Benchmark_fixed_sync_32_32_0_default(b *testing.B) {
b.StopTimer()
p := DefaultSyncPool()
B(b, p, 32, 32, 0)
}
func Benchmark_fixed_chan_32_32_0_default(b *testing.B) {
b.StopTimer()
p := DefaultChanPool()
B(b, p, 32, 32, 0)
}
// 64-64 0 default
func Benchmark_fixed_no___64_64_0_default(b *testing.B) {
b.StopTimer()
p := NewNoPoll()
B(b, p, 64, 64, 0)
}
func Benchmark_fixed_sync_64_64_0_default(b *testing.B) {
b.StopTimer()
p := DefaultSyncPool()
B(b, p, 64, 64, 0)
}
func Benchmark_fixed_chan_64_64_0_default(b *testing.B) {
b.StopTimer()
p := DefaultChanPool()
B(b, p, 64, 64, 0)
}
// 1024-1024 0 default
func Benchmark_fixed_no___1024_1024_0_default(b *testing.B) {
b.StopTimer()
p := NewNoPoll()
B(b, p, 1024, 1024, 0)
}
func Benchmark_fixed_sync_1024_1024_0_default(b *testing.B) {
b.StopTimer()
p := DefaultSyncPool()
B(b, p, 1024, 1024, 0)
}
func Benchmark_fixed_chan_1024_1024_0_default(b *testing.B) {
b.StopTimer()
p := DefaultChanPool()
B(b, p, 1024, 1024, 0)
}
// 32-32 0 s32a1024
// s32a1024 will be ignored (it's the same as _default)
func Benchmark_fixed_no___32_32_0_s32a1024(b *testing.B) {
b.StopTimer()
p := NewNoPoll()
B(b, p, 32, 32, 0)
}
//
func Benchmark_fixed_sync_32_32_0_s32a1024(b *testing.B) {
b.StopTimer()
p := NewSyncPool(32)
B(b, p, 32, 32, 0)
}
func Benchmark_fixed_chan_32_32_0_s32a1024(b *testing.B) {
b.StopTimer()
p := NewChanPool(Range{32, 1024})
B(b, p, 32, 32, 0)
}
// 64-64 0 s64a1024
// default
func Benchmark_fixed_no___64_64_0_s64a1024(b *testing.B) {
b.StopTimer()
p := NewNoPoll()
B(b, p, 64, 64, 0)
}
func Benchmark_fixed_sync_64_64_0_s64a1024(b *testing.B) {
b.StopTimer()
p := NewSyncPool(64)
B(b, p, 64, 64, 0)
}
func Benchmark_fixed_chan_64_64_0_s64a1024(b *testing.B) {
b.StopTimer()
p := NewChanPool(Range{64, 1024})
B(b, p, 64, 64, 0)
}
// 1024-1024 0 s1024a128
func Benchmark_fixed_no___1024_1024_0_s1024a128(b *testing.B) {
b.StopTimer()
p := NewNoPoll()
B(b, p, 1024, 1024, 0)
}
func Benchmark_fixed_sync_1024_1024_0_s1024a128(b *testing.B) {
b.StopTimer()
p := NewSyncPool(1024)
B(b, p, 1024, 1024, 0)
}
func Benchmark_fixed_chan_1024_1024_0_s1024a128(b *testing.B) {
b.StopTimer()
p := NewChanPool(Range{1024, 128})
B(b, p, 1024, 1024, 0)
}
// default with delay
// 32-65536 1µs default
func Benchmark_range_no___32_65536_1µs_default(b *testing.B) {
b.StopTimer()
p := NewNoPoll()
B(b, p, 32, 65536, 1*time.Microsecond)
}
func Benchmark_range_sync_32_65536_1µs_default(b *testing.B) {
b.StopTimer()
p := DefaultSyncPool()
B(b, p, 32, 65536, 1*time.Microsecond)
}
func Benchmark_range_chan_32_65536_1µs_default(b *testing.B) {
b.StopTimer()
p := DefaultChanPool()
B(b, p, 32, 65536, 1*time.Microsecond)
}
// 32-65536 100µs default
func Benchmark_range_no___32_65536_100µs_default(b *testing.B) {
b.StopTimer()
p := NewNoPoll()
B(b, p, 32, 65536, 100*time.Microsecond)
}
func Benchmark_range_sync_32_65536_100µs_default(b *testing.B) {
b.StopTimer()
p := DefaultSyncPool()
B(b, p, 32, 65536, 100*time.Microsecond)
}
func Benchmark_range_chan_32_65536_100µs_default(b *testing.B) {
b.StopTimer()
p := DefaultChanPool()
B(b, p, 32, 65536, 100*time.Microsecond)
}
// 1024-1024 100µs s1024a128
func Benchmark_fixed_no___1024_1024_100µs_s1024a128(b *testing.B) {
b.StopTimer()
p := NewNoPoll()
B(b, p, 1024, 1024, 100*time.Microsecond)
}
func Benchmark_fixed_sync_1024_1024_100µs_s1024a128(b *testing.B) {
b.StopTimer()
p := NewSyncPool(1024)
B(b, p, 1024, 1024, 100*time.Microsecond)
}
func Benchmark_fixed_chan_1024_1024_100µs_s1024a128(b *testing.B) {
b.StopTimer()
p := NewChanPool(Range{1024, 128})
B(b, p, 1024, 1024, 100*time.Microsecond)
}
// 4096-4096 10ms s4096a128
func Benchmark_fixed_no___4096_4096_10ms_s4096a128(b *testing.B) {
	b.StopTimer()
	p := NewNoPoll()
	B(b, p, 4096, 4096, 10*time.Millisecond)
}
func Benchmark_fixed_sync_4096_4096_10ms_s4096a128(b *testing.B) {
	b.StopTimer()
	p := NewSyncPool(4096)
	B(b, p, 4096, 4096, 10*time.Millisecond)
}
func Benchmark_fixed_chan_4096_4096_10ms_s4096a128(b *testing.B) {
	b.StopTimer()
	p := NewChanPool(Range{4096, 128})
	// BUG FIX: this benchmark previously called B with size 1024, not
	// the 4096 its name (and the pool configuration above) promises,
	// so the chan result was not comparable with its no/sync siblings.
	B(b, p, 4096, 4096, 10*time.Millisecond)
}
package pool
import (
"sort"
"sync"
)
// Msg is any piece of data. It's a []byte for example.
// It should be mangos.Msg in the future.
type Msg []byte

// Pool is the common interface of the message pools below.
type Pool interface {
	// Get returns a message with the provided size:
	// len(Msg) must be 0,
	// cap(Msg) can be greater than size.
	Get(size int) Msg
	// Put returns any Msg to the pool.
	Put(Msg)
}
// uniq removes duplicates from a, reusing its backing array.
// The order of the result is not preserved (map iteration).
func uniq(a []int) []int {
	seen := make(map[int]struct{}, len(a))
	for _, n := range a {
		seen[n] = struct{}{}
	}
	out := a[:0]
	for n := range seen {
		out = append(out, n)
	}
	return out
}
//
// Direct GC
//
// noPool is the pool that does nothing: every Get allocates a fresh
// message and every Put leaves the message to the garbage collector.
type noPool struct{}

// NewNoPoll creates a pass-through Pool.
// NOTE(review): the name looks like a typo for "NewNoPool"; kept as-is
// for compatibility with existing callers.
func NewNoPoll() Pool { return noPool{} }

// Get allocates a zero-length Msg with the requested capacity.
func (noPool) Get(size int) Msg { return Msg(make([]byte, 0, size)) }

// Put is a no-op.
func (noPool) Put(Msg) {}
//
// sync.Pool
//
// syncPoolRange is one size class: an embedded sync.Pool whose
// messages all have capacity maxSize.
type syncPoolRange struct {
	maxSize int
	sync.Pool
}

// syncPool is a configurable Pool based on sync.Pool: one sub-pool
// per size class, sorted ascending by maxSize (see NewSyncPool).
type syncPool struct {
	ranges []syncPoolRange
}
// NewSyncPool creates a new Pool based on sync.Pool.
// The Pool will be tuned to use messages with the provided size
// classes.
// It panics if no arguments are provided.
// It panics if zero or negative values are provided.
// It removes duplicate sizes silently.
func NewSyncPool(ranges ...int) Pool {
	// check/uniq/sort
	if len(ranges) == 0 {
		// fixed typo: was "agruments"
		panic("NewSyncPool() with empty arguments list")
	}
	ranges = uniq(ranges)
	sort.Ints(ranges)
	// sorted ascending: checking the first element covers them all
	if ranges[0] <= 0 {
		panic("NewSyncPool() zero or negative value in arguments list")
	}
	// create one sub-pool per size class
	sp := new(syncPool)
	sp.ranges = make([]syncPoolRange, 0, len(ranges))
	for _, r := range ranges {
		sp.ranges = append(sp.ranges, syncPoolRange{maxSize: r})
	}
	return sp
}
// Get returns a zero-length Msg with capacity of at least size,
// taken from the smallest size class that fits: a pooled message is
// reused when available, otherwise a new one of the class capacity
// is allocated. Sizes beyond every class are allocated exactly.
func (s *syncPool) Get(size int) Msg {
	for i := range s.ranges {
		if size <= s.ranges[i].maxSize {
			// ranges are sorted ascending, so this is the tightest fit
			if ifc := s.ranges[i].Get(); ifc != nil {
				return ifc.(Msg) // reuse from the pool
			}
			// pool empty: allocate with the class capacity so the
			// message can be Put back into this class later
			// (idiom: dropped the old `else` after return)
			return Msg(make([]byte, 0, s.ranges[i].maxSize))
		}
	}
	// out of ranges: exact allocation, will be dropped on Put
	return Msg(make([]byte, 0, size))
}
// Put returns a message to the sub-pool whose class capacity matches
// cap(m) exactly; everything else (including zero-cap messages) is
// dropped for the garbage collector.
func (s *syncPool) Put(m Msg) {
	if cap(m) == 0 {
		return
	}
	for i := range s.ranges {
		if cap(m) == s.ranges[i].maxSize {
			s.ranges[i].Put(m[:0]) // reset and put
			// sizes are unique (uniq'd in NewSyncPool), so no other
			// class can match — stop scanning
			return
		}
	}
	// no matching class: drop
}
// DefaultSyncPool creates a Pool based on sync.Pool with default size
// classes mirroring the go-mangos message cache:
// {maxbody: 64, cache: make(chan *Message, 2048)}, // 128K
// {maxbody: 128, cache: make(chan *Message, 1024)}, // 128K
// {maxbody: 1024, cache: make(chan *Message, 1024)}, // 1 MB
// {maxbody: 8192, cache: make(chan *Message, 256)}, // 2 MB
// {maxbody: 65536, cache: make(chan *Message, 64)}, // 4 MB
// (cache lengths don't apply here — sync.Pool sizes itself).
func DefaultSyncPool() Pool {
	return NewSyncPool(64, 128, 1024, 8192, 65536)
}
//
// channels based pool
//
// chanPoolRange is one size class: a buffered channel holding free
// messages of capacity maxSize.
type chanPoolRange struct {
	maxSize int
	buf chan Msg
}

// chanPool is a configurable Pool based on buffered channels: one
// channel per size class, sorted ascending by maxSize (see NewChanPool).
type chanPool struct {
	ranges []chanPoolRange
}
// Range describes one size class of a channel-based pool:
// Size is the maximal message capacity, Amount the channel length.
type Range struct {
	Size   int
	Amount int
}

// cpRanges provides sorting and de-duplication helpers for []Range.
type cpRanges []Range

// sort.Interface: order ascending by Size.
func (r cpRanges) Len() int           { return len(r) }
func (r cpRanges) Less(i, j int) bool { return r[i].Size < r[j].Size }
func (r cpRanges) Swap(i, j int)      { r[i], r[j] = r[j], r[i] }

// uniq drops duplicate Range values in place; the order of the
// survivors is not preserved (map iteration).
func (r *cpRanges) uniq() {
	distinct := make(map[Range]struct{}, len(*r))
	for _, rng := range *r {
		distinct[rng] = struct{}{}
	}
	kept := (*r)[:0]
	for rng := range distinct {
		kept = append(kept, rng)
	}
	*r = kept
}
// NewChanPool creates a new Pool based on channels.
// The Pool will be tuned to use messages with the provided size
// classes. It's possible (and required) to preallocate some space
// (Amount) for each Size.
// It panics if no arguments are provided.
// It panics if a zero or negative Size, or a negative Amount, is
// provided.
// It removes duplicate ranges silently.
func NewChanPool(ranges ...Range) Pool {
	// check/uniq/sort
	if len(ranges) == 0 {
		// fixed typo: was "agruments"
		panic("NewChanPool() with empty arguments list")
	}
	cpr := cpRanges(ranges)
	cpr.uniq()
	sort.Sort(cpr)
	// sorted ascending: checking the first element covers all sizes
	if cpr[0].Size <= 0 {
		panic("NewChanPool() zero or negative Size in provided ranges")
	}
	// create one buffered channel per size class
	cp := new(chanPool)
	cp.ranges = make([]chanPoolRange, 0, cpr.Len())
	for _, r := range cpr {
		if r.Amount < 0 {
			// validate explicitly instead of letting make(chan, n)
			// fail with the opaque runtime "size out of range" panic
			panic("NewChanPool() negative Amount in provided ranges")
		}
		cp.ranges = append(cp.ranges, chanPoolRange{
			maxSize: r.Size,
			buf:     make(chan Msg, r.Amount),
		})
	}
	return cp
}
// Get returns a zero-length Msg with capacity of at least size:
// it picks the smallest size class that fits and reuses a buffered
// free message when one is available, allocating otherwise.
func (s *chanPool) Get(size int) (m Msg) {
	var ch chan Msg
	for i := range s.ranges {
		if size <= s.ranges[i].maxSize {
			ch = s.ranges[i].buf
			size = s.ranges[i].maxSize // allocate the class capacity instead
			break
		}
	}
	// If no class matched, ch stays nil; receiving from a nil channel
	// blocks forever, so the select below then always takes the
	// default branch and allocates exactly `size` bytes.
	select {
	case m = <-ch:
	default:
		m = Msg(make([]byte, 0, size))
	}
	return
}
// Put returns a message to the size class whose capacity matches
// cap(m) exactly. If that class's channel is full — or no class
// matches, or cap(m) == 0 — the message is dropped for the GC.
func (s *chanPool) Put(m Msg) {
	if cap(m) == 0 {
		return
	}
	for i := range s.ranges {
		if cap(m) == s.ranges[i].maxSize {
			select {
			case s.ranges[i].buf <- m[:0]: // reset and put
			default: // channel full: drop
			}
			// BUG FIX: previously a successful send fell through and
			// kept scanning; since uniq() only removes identical
			// {Size, Amount} pairs, two classes with equal Size could
			// both buffer the same Msg, later handing the same backing
			// array out to two callers. Return after the first match.
			return
		}
	}
	// no matching class: drop
}
// DefaultChanPool creates a Pool based on channels with default size
// classes mirroring the go-mangos message cache.
func DefaultChanPool() Pool {
	return NewChanPool(
		Range{Size: 64, Amount: 2048},   // 128K
		Range{Size: 128, Amount: 1024},  // 128K
		Range{Size: 1024, Amount: 1024}, // 1 MB
		Range{Size: 8192, Amount: 256},  // 2 MB
		Range{Size: 65536, Amount: 64},  // 4 MB
	)
}
Hardware:
CPU: Athlon64 X2
Mem: 2GB DDR2
OS:
Ubuntu 15.10 x64
Linux 4.2.0-36-generic #41-Ubuntu SMP Mon Apr 18 15:49:10 UTC 2016 x86_64 x86_64 x86_64 GNU/Linux
Go:
go version go1.6.2 linux/amd64
Naming convention:
Benchmark_[R/F]_[POOL]_[F]_[T]_[D]_sNaM_sNaM_sNaM
R/F = {
range - variable size of messages (in given range, see below)
fixed - fixed size
(usage `go test -bench fixed`)
}
POOL = {
no - make/GC
sync - sync.Pool
chan - buffered channels
}
Msg size range (for 'fixed' F == T):
F - from
T - to
D - delay after each allocation
sNaM_sNaM_sNaM_... - this pool ranges, where (for 'no' this means nothing)
sN - n is maxMsgSize for the range of the pool
aM - M is the channel length (means nothing for 'sync')
if sNaM... is 'default' then the default pool will be used.
Below.
- throughput is msg_size*amount of allocated (obtained) messages (not freed)
- results grouped by 3
- throughput is the indicator (for group)
======================================================================================================
go test -bench fixed -benchtime=5s
PASS
Benchmark_fixed_no___32_32_0_default-2 10000000 1072 ns/op 29.84 MB/s 32 B/op 1 allocs/op
Benchmark_fixed_sync_32_32_0_default-2 10000000 1223 ns/op 26.16 MB/s 32 B/op 1 allocs/op
Benchmark_fixed_chan_32_32_0_default-2 10000000 1141 ns/op 28.03 MB/s 0 B/op 0 allocs/op
Benchmark_fixed_no___64_64_0_default-2 5000000 1244 ns/op 51.41 MB/s 64 B/op 1 allocs/op
Benchmark_fixed_sync_64_64_0_default-2 5000000 1187 ns/op 53.91 MB/s 32 B/op 1 allocs/op
Benchmark_fixed_chan_64_64_0_default-2 10000000 1161 ns/op 55.10 MB/s 0 B/op 0 allocs/op
Benchmark_fixed_no___1024_1024_0_default-2 2000000 3299 ns/op 310.37 MB/s 1024 B/op 1 allocs/op
Benchmark_fixed_sync_1024_1024_0_default-2 10000000 1023 ns/op 1000.13 MB/s 35 B/op 1 allocs/op
Benchmark_fixed_chan_1024_1024_0_default-2 10000000 1142 ns/op 896.08 MB/s 0 B/op 0 allocs/op
Benchmark_fixed_no___32_32_0_s32a1024-2 10000000 1054 ns/op 30.35 MB/s 32 B/op 1 allocs/op
Benchmark_fixed_sync_32_32_0_s32a1024-2 10000000 975 ns/op 32.81 MB/s 32 B/op 1 allocs/op
Benchmark_fixed_chan_32_32_0_s32a1024-2 5000000 1266 ns/op 25.27 MB/s 0 B/op 0 allocs/op
Benchmark_fixed_no___64_64_0_s64a1024-2 10000000 1138 ns/op 56.20 MB/s 64 B/op 1 allocs/op
Benchmark_fixed_sync_64_64_0_s64a1024-2 10000000 948 ns/op 67.49 MB/s 32 B/op 1 allocs/op
Benchmark_fixed_chan_64_64_0_s64a1024-2 5000000 1289 ns/op 49.61 MB/s 0 B/op 0 allocs/op
Benchmark_fixed_no___1024_1024_0_s1024a128-2 2000000 3261 ns/op 313.99 MB/s 1024 B/op 1 allocs/op
Benchmark_fixed_sync_1024_1024_0_s1024a128-2 10000000 980 ns/op 1044.27 MB/s 35 B/op 1 allocs/op
Benchmark_fixed_chan_1024_1024_0_s1024a128-2 5000000 1249 ns/op 819.61 MB/s 7 B/op 0 allocs/op
Benchmark_fixed_no___1024_1024_100µs_s1024a128-2 2000000 3220 ns/op 317.93 MB/s 1024 B/op 1 allocs/op
Benchmark_fixed_sync_1024_1024_100µs_s1024a128-2 10000000 957 ns/op 1069.33 MB/s 35 B/op 1 allocs/op
Benchmark_fixed_chan_1024_1024_100µs_s1024a128-2 5000000 1274 ns/op 803.73 MB/s 7 B/op 0 allocs/op
Benchmark_fixed_no___4096_4096_10ms_s4096a128-2 1000000 7005 ns/op 584.65 MB/s 4096 B/op 1 allocs/op
Benchmark_fixed_sync_4096_4096_10ms_s4096a128-2 10000000 989 ns/op 4137.94 MB/s 46 B/op 1 allocs/op
Benchmark_fixed_chan_4096_4096_10ms_s4096a128-2 5000000 1330 ns/op 769.42 MB/s 29 B/op 0 allocs/op
@logrusorgru
Copy link
Author

Hm, the DELAY means nothing because the GC will never be called while the benchmark sleeps. Maybe I should provide a background job that invokes the GC every DELAY (or, if DELAY > 0, call runtime.GC() inside the perform function).

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment