Create a gist now

Instantly share code, notes, and snippets.

Embed
What would you like to do?
Parallel processing with ordered output in Go
/*
Parallel processing with ordered output in Go
(you can use this pattern by importing https://github.com/MarianoGappa/parseq)
This example implementation is useful when the following 3 conditions are true:
1) the rate of input is higher than the rate of output on the system (i.e. it queues up)
2) the processing of input can be parallelised, and overall throughput increases by doing so
3) the order of output of the system needs to respect order of input
- if 1 is false, KISS!
- if 2 is false, KISS too: often, processing can be parallelised, but the processing
time is shorter than the overhead of parallelising and maintaining order
- if 3 is false, just use a buffered channel for the input that routes to several
goroutines, and have each one output to an output channel.
The use case that motivated this implementation was:
1) a Kafka consumer outputs JSON to a channel that must be unmarshalled to a
struct
2) order of output matters
3) speed is paramount; there's plenty of CPU room for parallelism
Without any explicit goroutines, every unmarshal operation blocks the next
channel read.
Run the example with the 3 default parameters; see how, even though it takes 5
seconds to process a request, after the initial 5 seconds output happens
every second, in order.
*/
package main
import (
"fmt"
"sync"
"time"
)
// Bookkeeping shared between readRequests and orderResults.
var (
	// l is held while unresolved is modified.
	l sync.Mutex

	// unresolved lists the order numbers of requests that have been handed
	// to the workers but whose results have not been emitted yet, oldest
	// first. readRequests appends to the tail; orderResults removes from
	// the head.
	unresolved []int64
)
// main wires up the demo pipeline and prints each result, followed by "-",
// in the order the requests were generated.
//
// Stages, connected by channels:
//
//	makeRequests -> readRequests -> processRequests (one per worker) -> orderResults -> main
//
// Nothing in this function closes a channel, so the final receive loop keeps
// running until the program is interrupted.
func main() {
	const (
		// workers is the number of processRequests goroutines; it is also
		// used as the buffer size of the intermediate channels.
		workers = 5
		// perRequest is the simulated processing time of a single request.
		perRequest = 5 * time.Second
		// interval is the pause between two generated requests.
		interval = 1 * time.Second
	)

	var (
		incoming = make(chan request, workers)
		jobs     = make(chan request, workers)
		finished = make(chan int64, workers)
		ordered  = make(chan int64) // unbuffered
	)

	go makeRequests(incoming, interval)
	go readRequests(incoming, jobs)
	go orderResults(finished, ordered)
	for w := workers; w > 0; w-- {
		go processRequests(jobs, finished, perRequest)
	}

	for {
		id, open := <-ordered
		if !open {
			return
		}
		fmt.Print(id, "-")
	}
}
// makeRequests is the demo's request source. It sends an endless stream of
// requests on rqs, numbered 1, 2, 3, ..., sleeping for requestEvery after
// each send. It never returns and never closes rqs.
func makeRequests(rqs chan request, requestEvery time.Duration) {
	for seq := int64(1); ; seq++ {
		rqs <- request{seq}
		time.Sleep(requestEvery)
	}
}
// readRequests forwards every request received on rqs to work. Before
// forwarding, it appends the request's order number to the package-level
// unresolved slice (while holding l), so the arrival order is recorded
// before any worker can produce a result for that request.
// It returns when rqs is closed.
func readRequests(rqs chan request, work chan request) {
	// remember records id at the tail of the unresolved queue.
	remember := func(id int64) {
		l.Lock()
		defer l.Unlock()
		unresolved = append(unresolved, id)
	}

	for {
		req, open := <-rqs
		if !open {
			return
		}
		remember(req.order)
		work <- req
	}
}
// processRequests is a worker loop: it takes requests from work, processes
// each one with the given processDuration, and sends the value returned by
// process on rts. Several instances may run concurrently, so results can
// reach rts in any order. It returns when work is closed.
func processRequests(work chan request, rts chan int64, processDuration time.Duration) {
	for {
		job, open := <-work
		if !open {
			return
		}
		result := job.process(processDuration)
		rts <- result
	}
}
func orderResults(rts chan int64, orts chan int64) {
rtBuf := make(map[int64]int64)
for rt := range rts {
rtBuf[rt] = rt
loop:
if len(unresolved) > 0 {
u := unresolved[0]
if rtBuf[u] != 0 {
l.Lock()
unresolved = unresolved[1:]
l.Unlock()
orts <- rtBuf[u]
delete(rtBuf, u)
goto loop
}
}
}
}
// request is one unit of work flowing through the pipeline.
type request struct {
	// order is the sequence number assigned when the request was created.
	order int64
}

// process simulates the work for a request: it blocks for processDuration
// and then returns the request's order number as the result.
func (req request) process(processDuration time.Duration) int64 {
	id := req.order
	time.Sleep(processDuration)
	return id
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment