Skip to content

Instantly share code, notes, and snippets.

@SCP002
Last active July 3, 2022 19:35
Show Gist options
  • Save SCP002/fd43bc86c30c7e5acd9132067cc0c6dc to your computer and use it in GitHub Desktop.
Save SCP002/fd43bc86c30c7e5acd9132067cc0c6dc to your computer and use it in GitHub Desktop.
Golang 1.18 (optionally lower): concurrent Filter funtion example. Keeps empty elements and original order. Using Worker Pool.
// Golang >= 1.18 (uses generics).
// Golang < 1.18 without generics (replace T with explicit type in PoolFilter function signature).
package main
import (
"sort"
"github.com/alitto/pond"
)
// PoolFilter returns copy of <inp> with only elements for which <keepFn> returns true.
//
// <maxPoolSize> is a maximum amount of workers to process <inp> simultaneously.
//
// Keeps empty elements and original order.
func PoolFilter[T any](inp []T, maxPoolSize int, keepFn func(T, int) bool) (out []T) {
pool := pond.New(maxPoolSize, 0, pond.MinWorkers(0))
keepIdxCh := make(chan int, len(inp))
// Push index of every element should stay to keepIdxCh.
for idx, elm := range inp {
idx, elm := idx, elm
pool.Submit(func() {
if keepFn(elm, idx) {
keepIdxCh <- idx
}
})
}
pool.StopAndWait()
close(keepIdxCh)
// Collect indexes to keep.
keepIdxList := []int{}
for keepIdx := range keepIdxCh {
keepIdxList = append(keepIdxList, keepIdx)
}
// Sort indexes to keep in asscending order.
sort.Slice(keepIdxList, func(i int, j int) bool {
return keepIdxList[i] < keepIdxList[j]
})
// Build ouput slice from input indexes to keep.
for _, idx := range keepIdxList {
out = append(out, inp[idx])
}
return
}
package main
import (
"testing"
"github.com/jinzhu/copier"
"github.com/stretchr/testify/assert"
)
func TestPoolFilter(t *testing.T) {
input := []string{"A", "B", "C", "D", "E", "F", "G", "H", "I", "J", "", ""}
expected := []string{"B", "D", "F", "H", "J", ""}
keepFn := func(elm string, idx int) bool {
// Keep every even element (remove every odd).
return idx % 2 != 0
}
// Ensure filter does not modify the source.
inputOrig := DeepCopy(input)
_ = PoolFilter(input, 6, keepFn)
ok := assert.Exactly(t, inputOrig, input, "should not modify the source")
if !ok {
t.FailNow()
}
for i := 0; i < 100000; i++ {
// Check if reliably get expected output.
output := PoolFilter(input, 6, keepFn)
ok = assert.Exactly(t, expected, output)
if !ok {
t.FailNow()
}
}
}
// DeepCopy returns deep copy of <inp>, panicking on error.
func DeepCopy[T any](inp T) (out T) {
err := copier.CopyWithOption(&out, &inp, copier.Option{DeepCopy: true, IgnoreEmpty: true})
if err != nil {
panic(err)
}
return
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment