Skip to content

Instantly share code, notes, and snippets.

@blixt
Last active March 12, 2020 09:50
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save blixt/494ddedb96abb3ad60b936713de0d434 to your computer and use it in GitHub Desktop.
Save blixt/494ddedb96abb3ad60b936713de0d434 to your computer and use it in GitHub Desktop.
Dedupe multiple concurrent calls to the same function and return same result to all callers
package main
import (
"fmt"
"sync"
"time"
)
// Dedupe will combine a bunch of concurrent calls with the same key (string) into just one.
// Example:
func slowlyAdd(a, b int) int {
key := fmt.Sprintf("%d+%d", a, b)
v, _ := Dedupe(key, func() (interface{}, error) {
// Here we put the slow code that we don't mind only running once.
time.Sleep(2 * time.Second)
return a + b, nil
})
return v.(int)
}
func main() {
fmt.Println("Please wait while we slowly add the same numbers together a few times!")
start := time.Now()
var wg sync.WaitGroup
for i := 0; i < 5; i++ {
// Run five slowlyAdd calls in parallel (separate goroutines).
wg.Add(1)
go func() {
fmt.Printf("Adding 1 + 2: %d\n", slowlyAdd(1, 2))
wg.Done()
}()
}
wg.Wait()
fmt.Printf("Total time taken: %s\n", time.Since(start))
}
// Dedupe implementation:
type result struct {
value interface{}
err error
}
var (
dedupeMap = make(map[string][]chan<- result)
dedupeMu sync.Mutex
)
func Dedupe(key string, perform func() (interface{}, error)) (interface{}, error) {
// Check if there's already an ongoing call.
dedupeMu.Lock()
if calls, ok := dedupeMap[key]; ok {
// There is an ongoing call, join the list of waiting requests.
// Buffered to allow the result to be written before it is read.
ch := make(chan result, 1)
dedupeMap[key] = append(calls, ch)
dedupeMu.Unlock()
r := <-ch
return r.value, r.err
}
// There is no other call ongoing for the provided key.
dedupeMap[key] = []chan<- result{}
dedupeMu.Unlock()
value, err := perform()
// Get all waiting requests.
dedupeMu.Lock()
calls := dedupeMap[key]
delete(dedupeMap, key)
dedupeMu.Unlock()
// Fulfill the waiting requests.
for _, ch := range calls {
ch <- result{value, err}
}
// Return the result.
return value, err
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment