Skip to content

Instantly share code, notes, and snippets.

@alaindet
Created January 8, 2024 16:36
Show Gist options
  • Save alaindet/00730ad8046d433d43e04cb77566930d to your computer and use it in GitHub Desktop.
Save alaindet/00730ad8046d433d43e04cb77566930d to your computer and use it in GitHub Desktop.
Go slice mapping concurrently
package main
import "testing"
func BenchmarkConcMap100(b *testing.B) {
b.StopTimer()
input := createRange(100)
output := make([]int, 0, len(input))
b.StartTimer()
for n := 0; n < b.N; n++ {
output = ConcMap(input, double, -1)
}
_ = output
}
func BenchmarkMap100(b *testing.B) {
b.StopTimer()
input := createRange(100)
output := make([]int, 0, len(input))
b.StartTimer()
for n := 0; n < b.N; n++ {
output = Map(input, double)
}
_ = output
}
func BenchmarkConcMap10000(b *testing.B) {
b.StopTimer()
input := createRange(10_000)
output := make([]int, 0, len(input))
b.StartTimer()
for n := 0; n < b.N; n++ {
output = ConcMap(input, double, -1)
}
_ = output
}
func BenchmarkMap10000(b *testing.B) {
b.StopTimer()
input := createRange(10_000)
output := make([]int, 0, len(input))
b.StartTimer()
for n := 0; n < b.N; n++ {
output = Map(input, double)
}
_ = output
}
func BenchmarkConcMap1000000(b *testing.B) {
b.StopTimer()
input := createRange(1_000_000)
output := make([]int, 0, len(input))
b.StartTimer()
for n := 0; n < b.N; n++ {
output = ConcMap(input, double, -1)
}
_ = output
}
func BenchmarkMap1000000(b *testing.B) {
b.StopTimer()
input := createRange(1_000_000)
output := make([]int, 0, len(input))
b.StartTimer()
for n := 0; n < b.N; n++ {
output = Map(input, double)
}
_ = output
}
package main
import (
"runtime"
"sync"
)
func ConcMap[TInput any, TOutput any](
input []TInput,
mapperFn func(TInput) TOutput,
threads int,
) []TOutput {
workers := runtime.NumCPU()
bufferSize := workers + 1
var wg sync.WaitGroup
inputCh := make(chan TInput, bufferSize)
outputCh := make(chan TOutput, bufferSize)
// Workers
wg.Add(workers)
for w := 0; w < workers; w++ {
go func(id int, inputCh <-chan TInput, outputCh chan<- TOutput) {
for input := range inputCh {
outputCh <- mapperFn(input)
}
wg.Done()
}(w, inputCh, outputCh)
}
// Cleanup
go func() {
wg.Wait()
close(outputCh)
}()
// Producer - Fan out
go func() {
for _, i := range input {
inputCh <- i
}
close(inputCh)
}()
// Gather - Fan in
result := make([]TOutput, 0, len(input))
for output := range outputCh {
result = append(result, output)
}
return result
}
package main
import (
"slices"
"testing"
"github.com/stretchr/testify/assert"
)
func TestConcMap(t *testing.T) {
double := func(n int) int {
return n * 2
}
t.Run("works with small slice of integers", func(t *testing.T) {
input := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
expected := []int{2, 4, 6, 8, 10, 12, 14, 16, 18, 20}
output := ConcMap(input, double, -1)
slices.Sort(input)
slices.Sort(output)
assert.Equal(t, output, expected)
})
t.Run("works with larger slices", func(t *testing.T) {
size := 100_000
input := make([]int, 0, size)
expected := make([]int, 0, size)
for i := 0; i < size; i++ {
input = append(input, i+1)
expected = append(expected, double(i+1))
}
output := ConcMap(input, double, -1)
slices.Sort(input)
slices.Sort(output)
assert.Equal(t, output, expected)
})
}
package main
func Map[T any, K any](elements []T, fn func(element T) K) []K {
result := make([]K, 0, len(elements))
for _, element := range elements {
result = append(result, fn(element))
}
return result
}
package main
import (
"strconv"
"strings"
"testing"
"github.com/stretchr/testify/assert"
)
type TestMapDummySource struct {
ID string
}
type TestMapDummyDest struct {
ID int
}
func TestMap(t *testing.T) {
multiplyByTen := func(n int) int {
return n * 10
}
mapTestDummySourceToDest := func(source TestMapDummySource) TestMapDummyDest {
id, _ := strconv.Atoi(source.ID)
return TestMapDummyDest{ID: id}
}
t.Run("maps a slice of integers", func(t *testing.T) {
elements := []int{1, 2, 3, 4, 5, 6}
expected := []int{10, 20, 30, 40, 50, 60}
outcome := Map(elements, multiplyByTen)
assert.Equal(t, outcome, expected)
})
t.Run("maps a slice of strings", func(t *testing.T) {
elements := []string{"foo", "bar", "baz"}
expected := []string{"FOO", "BAR", "BAZ"}
outcome := Map(elements, strings.ToUpper)
assert.Equal(t, outcome, expected)
})
t.Run("maps a slice of structs", func(t *testing.T) {
elements := []TestMapDummySource{{"11"}, {"22"}, {"33"}}
expected := []TestMapDummyDest{{11}, {22}, {33}}
outcome := Map(elements, mapTestDummySourceToDest)
assert.Equal(t, outcome, expected)
})
}
go test -bench=. -run=^$ -benchtime=5s
goos: linux
goarch: amd64
pkg: concurrent-slice-mapping
cpu: 11th Gen Intel(R) Core(TM) i7-1165G7 @ 2.80GHz
BenchmarkConcMap100-8 269926 22801 ns/op
BenchmarkMap100-8 28374702 217.7 ns/op
BenchmarkConcMap10000-8 3296 1848091 ns/op
BenchmarkMap10000-8 414638 20780 ns/op
BenchmarkConcMap1000000-8 25 275315968 ns/op
BenchmarkMap1000000-8 3490 1443081 ns/op
PASS
ok concurrent-slice-mapping 40.165s
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment