@DeadlySurgeon
Last active Jun 15, 2022

Reducer

POC for effectively reducing a large data footprint.

A good candidate has been found for implementing our reducer function; however, more tests should be done to push optimization, not that it is needed. Overall, with harsh benchmarking we expect to see 500~1000 ns/op (I know, quite the range) depending on configuration and stack size.

More tests need to be performed, though, so we can determine the memory footprint. I would like to have a good estimate of the ratio of stackSize to memory consumption.

First In First Out

Pushing new entries onto the right of the array and popping the first element seems rather effective, and is probably the fastest way to go about it. Maybe follow up in #darkarts to see what their ideas are. Maybe PonyDev might know too.

toDelete := stack[0]
stack = append(stack[1:], entry)
delete(data, toDelete)
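For reference, that FIFO eviction can be fleshed out into a minimal, self-contained sketch. The `fifoCache` type and its field names here are hypothetical, not the implementation further down:

```go
package main

import "fmt"

// fifoCache tracks seen identifiers and evicts the oldest one
// once capacity is reached.
type fifoCache struct {
	data  map[string]bool
	stack []string
	cap   int
}

// exists reports whether id was already seen, tracking it if not.
func (c *fifoCache) exists(id string) bool {
	if c.data[id] {
		return true
	}
	c.data[id] = true
	// Still room: just append.
	if len(c.stack) < c.cap {
		c.stack = append(c.stack, id)
		return false
	}
	// Full: pop the first (oldest) element, push the new one on the right.
	toDelete := c.stack[0]
	c.stack = append(c.stack[1:], id)
	delete(c.data, toDelete)
	return false
}

func main() {
	c := &fifoCache{data: map[string]bool{}, cap: 2}
	fmt.Println(c.exists("a")) // false: first sighting
	fmt.Println(c.exists("a")) // true: still cached
	c.exists("b")
	c.exists("c")              // evicts "a"
	fmt.Println(c.exists("a")) // false: "a" was popped
}
```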

Found a new, potentially better, way of doing this. See Reducer2. I'll add more information after this flight.

Reducer2

Instead of using the append function, Reducer2 manually tracks the current index and wraps around the array, treating index+1 as the next slot to evict. From there it calls delete with whatever is in the slice at that index. We don't need to check the slice's value at that index first, because delete simply does nothing if the key isn't in the map; it doesn't panic or complain.

If you look at the benchmarks, avoiding append doesn't significantly improve speed, but it does appear to improve memory usage.
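A condensed sketch of that index bookkeeping, using a modulo wrap instead of the two explicit branches in the actual implementation (the names here are illustrative):

```go
package main

import "fmt"

// ringEvict overwrites the slot at idx with id and deletes whatever
// identifier previously lived there; delete on a missing key (including
// the empty string while the ring is still filling) is a no-op in Go.
func ringEvict(data map[string]bool, stack []string, idx int, id string) int {
	delete(data, stack[idx]) // safe even when stack[idx] is ""
	stack[idx] = id
	data[id] = true
	return (idx + 1) % len(stack) // wrap back to 0 at the end
}

func main() {
	data := map[string]bool{}
	stack := make([]string, 3)
	idx := 0
	for _, id := range []string{"a", "b", "c", "d"} {
		idx = ringEvict(data, stack, idx, id)
	}
	fmt.Println(data["a"]) // false: "a" was overwritten when "d" arrived
	fmt.Println(data["d"]) // true
}
```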

Expected Use Case

The expected use case is 1000 or so entries per 10 seconds, which the benchmarks blow out of the water. As long as the active memory consumption isn't too great, this will act as a good solution.
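The sizing arithmetic behind the default stack size can be sanity-checked with a tiny program; the 32 bytes/entry figure below is purely an assumption (16-byte string header plus a short payload), not a measurement:

```go
package main

import "fmt"

func main() {
	const ratePerTenSeconds = 1000
	perSecond := ratePerTenSeconds / 10 // 100
	perMinute := perSecond * 60         // 6,000
	perHour := perMinute * 60           // 360,000
	fmt.Println(perHour) // matches reducerDefaultStackSize

	// Very rough memory bound for an hour's worth of stack. The real
	// number needs a heap profile, not back-of-envelope math.
	const assumedBytesPerEntry = 32
	fmt.Println(perHour * assumedBytesPerEntry) // 11520000, ~11.5 MB
}
```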

Last Benchmarks

goos:   linux
goarch: amd64
pkg:    reducer
BenchmarkReducer1-4       	 1542646	       753 ns/op	      85 B/op	       1 allocs/op
BenchmarkReducer2-4       	 1898156	       672 ns/op	      23 B/op	       1 allocs/op
BenchmarkReducer1Bad-4    	 2126659	       636 ns/op	      91 B/op	       1 allocs/op
BenchmarkReducer2Bad-4    	 2353987	       575 ns/op	      20 B/op	       1 allocs/op
BenchmarkRandomString-4   	12516368	        89.5 ns/op	       7 B/op	       0 allocs/op
package reducer

import (
	"sync"
)

/*
	Batch sizes of
	1000 per 10 seconds
	6,000 per minute
	360,000 per hour

	(1000 / 10) * 60 * <TTL in minutes>

	run GC compared to the stack size.
	If we plan on storing an hour's worth of stack, purge every hour.
	If we plan on storing a minute's worth of stack, probably no need to purge.
*/

const reducerDefaultStackSize = 360000

// Reducer helps reduce data footprints coming in.
type Reducer[T any] interface {
	Exists(identifier T) bool
}

// ResetableReducer is a Reducer that can report and reset its stack size.
type ResetableReducer[T any] interface {
	Reducer[T]
	Size() int
	Reset(stackSize int)
}

// NewReducer1 creates a new Reducer1.
func NewReducer1(stackSize int) *Reducer1 {
	r1 := &Reducer1{}
	r1.Reset(stackSize)
	return r1
}

// Reducer1 is the first implementation of the reducer.
type Reducer1 struct {
	lok        sync.Mutex
	data       map[string]bool
	stackIndex int
	stackSize  int
	stack      []string
}

// Size returns the configured stack size.
func (m *Reducer1) Size() int {
	return m.stackSize
}

// Reset reinitializes the reducer with the given stack size.
func (m *Reducer1) Reset(stackSize int) {
	if stackSize == 0 {
		stackSize = reducerDefaultStackSize
	}
	m.data = make(map[string]bool)
	m.stackIndex = 0
	m.stackSize = stackSize
	m.stack = make([]string, stackSize)
}

// Exists checks to see if the identifier has been spotted / is in cache.
func (m *Reducer1) Exists(identifier string) bool {
	// Somehow not that slow.
	m.lok.Lock()
	defer m.lok.Unlock()
	// If we find it, we do not need to continue.
	if m.data[identifier] {
		return true
	}
	// Since we didn't find it, start tracking since we just saw it now.
	m.data[identifier] = true
	// We still have more room to place stuff.
	if m.stackIndex < m.stackSize {
		m.stack[m.stackIndex] = identifier // NewEntry(identifier)
		m.stackIndex++
		return false
	}
	// Delete the oldest entry.
	delete(m.data, m.stack[0]) // m.stack[0].Key)
	// Append the new entry to the end.
	m.stack = append(m.stack[1:], identifier) // NewEntry(identifier))
	return false
}

// NewReducer2 creates a new Reducer2.
func NewReducer2(stackSize int) *Reducer2 {
	r2 := &Reducer2{}
	r2.Reset(stackSize)
	return r2
}

// Reducer2 is the second implementation of the reducer.
type Reducer2 struct {
	lok        sync.Mutex
	data       map[string]bool
	stackIndex int
	stackSize  int
	stack      []string
}

// Size returns the configured stack size.
func (m *Reducer2) Size() int {
	return m.stackSize
}

// Reset reinitializes the reducer with the given stack size.
func (m *Reducer2) Reset(stackSize int) {
	if stackSize == 0 {
		stackSize = reducerDefaultStackSize
	}
	m.data = make(map[string]bool)
	m.stackIndex = 0
	m.stackSize = stackSize
	m.stack = make([]string, stackSize)
}

// Exists checks to see if the identifier has been spotted / is in cache.
func (m *Reducer2) Exists(identifier string) bool {
	// Somehow not that slow.
	m.lok.Lock()
	defer m.lok.Unlock()
	// If we find it, we do not need to continue.
	if m.data[identifier] {
		return true
	}
	// Since we didn't find it, start tracking since we just saw it now.
	m.data[identifier] = true
	// delete won't complain if the key doesn't exist, so we will just pass in
	// the key regardless of the string's value. Then, we set the current index
	// to our new identifier, and move the index forward so we don't have to
	// shift the array, just our perspective.
	if m.stackIndex < m.stackSize {
		delete(m.data, m.stack[m.stackIndex])
		m.stack[m.stackIndex] = identifier
		m.stackIndex++
		return false
	}
	// We can assume `m.stackIndex == m.stackSize`, which is nice.
	// Here we loop the stack back to the start, 0.
	m.stackIndex = 0
	delete(m.data, m.stack[m.stackIndex])
	m.stack[m.stackIndex] = identifier
	m.stackIndex++
	return false
}
package reducer

import (
	"math/rand"
	"strconv"
	"testing"
	"time"
)

func BenchmarkReducer1(b *testing.B) {
	m := NewReducer1(350000)
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		m.Exists(randomString())
	}
}

func BenchmarkReducer2(b *testing.B) {
	m := NewReducer2(350000)
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		m.Exists(randomString())
	}
}

// ~1440 ns/op
// I wonder how much of that is the strconv.Itoa from the b.N, and how much of
// it is the reducer.
// I feel like this test slows down as it has more to deal with.
//
// This test mainly just appends data to the stack and map, and checks for
// duplicates. There are no duplicates. This is closer to how we expect to see
// our mined data, with (hopefully) few duplicates.
func BenchmarkReducer1Bad(b *testing.B) {
	// Dave Cheney says this is bad.
	// He is correct.
	m := NewReducer1(350000)
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		m.Exists(strconv.Itoa(i))
	}
}

func BenchmarkReducer2Bad(b *testing.B) {
	// Dave Cheney says this is bad.
	// He is correct.
	m := NewReducer2(350000)
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		m.Exists(strconv.Itoa(i))
	}
}

func BenchmarkRandomString(b *testing.B) {
	for i := 0; i < b.N; i++ {
		randomString()
	}
}

// Last at ~120 ns/op.
var randomString = func() func() string {
	// Maybe set a more predictable/testable seed?
	rand.Seed(time.Hour.Nanoseconds())
	return func() string {
		return strconv.Itoa(rand.Intn(3500000))
	}
}()

func TestStringReducers(t *testing.T) {
	for name, reducer := range map[string]ResetableReducer[string]{
		"reducer_1": NewReducer1(0),
		"reducer_2": NewReducer2(0),
	} {
		reducer := reducer
		t.Run(name, func(t *testing.T) {
			testStringReducer(t, reducer)
		})
	}
}

// Abstracted testing function to reduce code replication when testing reducers
// with table data.
func testStringReducer(t *testing.T, reducer ResetableReducer[string]) {
	if size := reducer.Size(); size != reducerDefaultStackSize {
		t.Fatalf(
			"Expected default size to be %d but was %d\n",
			reducerDefaultStackSize,
			size,
		)
	}
	reducer.Reset(5)
	for i := 0; i < 5; i++ {
		if reducer.Exists(strconv.Itoa(i)) {
			t.Fatalf("%v supposedly exists when it doesn't\n", i)
		}
	}
	for i := 0; i < 5; i++ {
		if !reducer.Exists(strconv.Itoa(i)) {
			t.Fatalf("%v supposedly doesn't exist when it does\n", i)
		}
	}
	if reducer.Exists("6") {
		t.Fatalf("6 apparently exists when we know it doesn't\n")
	}
	if reducer.Exists("0") {
		t.Fatalf("0 supposedly exists when we popped it\n")
	}
}

DeadlySurgeon commented Mar 19, 2022

Since I started sharing this out, it's getting cleaned up. Right now this requires Go 1.18, since I wanted a good excuse to finally use generics.
