@marianogappa
Last active February 12, 2024 09:27
Parallel processing with ordered output in Go
/*
Parallel processing with ordered output in Go
(you can use this pattern by importing https://github.com/MarianoGappa/parseq)
This example implementation is useful when the following 3 conditions are true:
1) the rate of input is higher than the rate of output on the system (i.e. it queues up)
2) the processing of input can be parallelised, and overall throughput increases by doing so
3) the order of output of the system needs to respect order of input
- if 1 is false, KISS!
- 2 is often false: processing can be parallelised, but the processing time is
shorter than the overhead of parallelising and maintaining order
- if 3 is false, just use a buffered channel for the input that routes to several
goroutines, and have each one output to an output channel.
The use case that motivated this implementation was:
1) a Kafka consumer sends JSON messages to a channel, and each message must be
unmarshalled to a struct
2) order of output matters
3) speed is paramount; there's plenty of CPU room for parallelism
Without any explicit goroutines, every unmarshal operation blocks the next
channel read.
Run the example with the 3 default parameters; even though each request takes 5
seconds to process, after the initial 5 seconds output arrives every second,
in order.
*/
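package main

import (
	"fmt"
	"sync"
	"time"
)

// unresolved holds the order numbers of requests that have been accepted but
// not yet emitted, in input order. l guards it.
var unresolved []int64
var l sync.Mutex
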
func main() {
	parallelism := 5
	processDuration := 5 * time.Second
	requestEvery := 1 * time.Second

	// rqs: incoming requests; work: requests handed to the workers;
	// rts: results in completion order; orts: results in input order.
	rqs := make(chan request, parallelism)
	work := make(chan request, parallelism)
	rts := make(chan int64, parallelism)
	orts := make(chan int64)

	go makeRequests(rqs, requestEvery)
	go readRequests(rqs, work)
	go orderResults(rts, orts)
	for i := 0; i < parallelism; i++ {
		go processRequests(work, rts, processDuration)
	}

	// Print results as they are released, in input order.
	for r := range orts {
		fmt.Print(r, "-")
	}
}

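// makeRequests emits a request with an increasing order number every
// requestEvery.
func makeRequests(rqs chan request, requestEvery time.Duration) {
	order := int64(0)
	for {
		order++
		rqs <- request{order}
		time.Sleep(requestEvery)
	}
}

// readRequests records each request's order number in unresolved before
// handing the request to the workers.
func readRequests(rqs chan request, work chan request) {
	for r := range rqs {
		l.Lock()
		unresolved = append(unresolved, r.order)
		l.Unlock()
		work <- r
	}
}

// processRequests is one worker: it processes requests and sends each result
// to rts in completion order.
func processRequests(work chan request, rts chan int64, processDuration time.Duration) {
	for r := range work {
		rts <- r.process(processDuration)
	}
}
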
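// orderResults buffers out-of-order results and forwards them to orts in the
// order recorded in unresolved.
func orderResults(rts chan int64, orts chan int64) {
	rtBuf := make(map[int64]int64)
	for rt := range rts {
		rtBuf[rt] = rt
		// Flush every buffered result that is next in line. unresolved is
		// read and modified under the lock, because readRequests appends to
		// it concurrently.
		for {
			l.Lock()
			if len(unresolved) == 0 {
				l.Unlock()
				break
			}
			u := unresolved[0]
			res, ok := rtBuf[u]
			if !ok {
				l.Unlock()
				break
			}
			unresolved = unresolved[1:]
			l.Unlock()

			orts <- res
			delete(rtBuf, u)
		}
	}
}
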
type request struct {
	order int64
}

func (r request) process(processDuration time.Duration) int64 {
	time.Sleep(processDuration)
	return r.order
}
@tejzpr

tejzpr commented Mar 11, 2021

Ordered-concurrently is a Go module that processes work concurrently and returns data in the order it was supplied. https://github.com/tejzpr/ordered-concurrently

Sample code https://play.golang.org/p/hkcIuRHj63h