Skip to content

Instantly share code, notes, and snippets.

@hugowetterberg
Created August 4, 2023 15:02
Show Gist options
  • Save hugowetterberg/90bab978ef141b4dfaf0104fbe66c1f1 to your computer and use it in GitHub Desktop.
Save hugowetterberg/90bab978ef141b4dfaf0104fbe66c1f1 to your computer and use it in GitHub Desktop.
Concurrent bounded processing
package main
// Example to illustrate fan-out fan-in processing with batched writes, bounded
// queues, and backpressure for controlling the workload that the process takes
// on.
import (
"crypto/sha256"
"fmt"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
// Tunables that bound how much work the pipeline takes on at once.
const (
	// Simulated latency for reading one message from the source.
	messageReadTime = 3 * time.Millisecond
	// At most 64 messages may sit in the wait queue.
	maxWaiting = 64
	// On top of the wait queue, up to 32 messages can be in-flight
	// inside the workers.
	numWorkers = 32
	// Workers may buffer up to 100 result items for batching.
	maxQueued = 100
	// Batches hold at most 100 result items, so together with the
	// queue we keep at most 200 result items in memory.
	batchSize = 100
	// A batch is flushed once full, or once 500ms have passed since
	// its first item arrived.
	maxBatchWait = 500 * time.Millisecond
	// Simulated latency for writing one batch to the sink.
	sinkLatency = 500 * time.Millisecond
)
// InputDoc is one document as delivered by the message source.
type InputDoc struct {
	ID          int64
	Title       string
	Description string
}
// MessageSource hands out a monotonically increasing stream of dummy
// documents, simulating read latency with a sleep per message.
type MessageSource struct {
	current int64 // ID of the most recently produced document
}

// ReadMessage blocks for messageReadTime and returns the next document
// in the sequence; IDs start at 1.
func (ms *MessageSource) ReadMessage() InputDoc {
	ms.current++
	time.Sleep(messageReadTime)

	id := ms.current

	return InputDoc{
		ID:          id,
		Title:       fmt.Sprintf("Document %d", id),
		Description: fmt.Sprintf("This is the %d:th document that we have produced.", id),
	}
}
// Result is the outcome of processing one document: a digest of its
// contents.
type Result struct {
	Hash []byte
}
// MessageSink is a dummy destination for result batches; each write
// sleeps for Latency to simulate a slow downstream system.
type MessageSink struct {
	Latency time.Duration
}

// WriteBatch "persists" a batch of results. An empty batch is a no-op.
func (ms *MessageSink) WriteBatch(batch []Result) {
	if len(batch) == 0 {
		return
	}

	fmt.Printf("\nWriting a batch of %d\n", len(batch))
	time.Sleep(ms.Latency)
}
// main wires the pipeline together: one reader goroutine feeds a bounded
// wait queue, numWorkers workers fan out from it into a bounded result
// queue, and the batch loop (run on this goroutine) fans the results back
// in and writes them to the sink in batches.
func main() {
	// Set up a channel for receiving OS signals (ctrl-c & TERM).
	sigs := make(chan os.Signal, 1)
	// ...and a channel for signalling to our read loop that we should stop
	// consuming messages.
	stop := make(chan struct{})
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)
	// On the first signal, close stop so the read loop winds down; the
	// rest of the pipeline then drains and exits naturally.
	go func() {
		<-sigs
		println("\nstopping, wait for in-flight messages to finish processing")
		close(stop)
	}()
	var source MessageSource
	// Bounded queues: their capacities (together with the worker count)
	// are what give the pipeline backpressure.
	waitQueue := make(chan InputDoc, maxWaiting)
	sinkQueue := make(chan Result, maxQueued)
	// Start the read loop in a goroutine and close the wait queue when we
	// stop reading. This will let the processing workers know that there
	// won't be any more work coming once they have read all buffered
	// messages.
	go func() {
		defer close(waitQueue)
		readLoop(&source, stop, waitQueue)
	}()
	// Create a wait group that will keep track of how many of our workers
	// still are alive and processing messages.
	var wg sync.WaitGroup
	wg.Add(numWorkers)
	for i := 0; i < numWorkers; i++ {
		go func() {
			processingWorker(waitQueue, sinkQueue)
			// Tell the wait group that our worker has exited.
			wg.Done()
		}()
	}
	// Once all workers have exited we close the queue that our sink reads
	// from, so that it will exit once it has read all pending results.
	// Only this goroutine closes sinkQueue, and only after every sender
	// (worker) is done — the ordering matters here.
	go func() {
		wg.Wait()
		close(sinkQueue)
	}()
	sink := MessageSink{
		Latency: sinkLatency,
	}
	// Run the batch loop on our main goroutine so that our application
	// exits after everything has been written to the sink.
	batchLoop(sinkQueue, &sink)
}
// readLoop pulls messages from source and forwards them onto queue until
// the stop channel is closed. The send blocks whenever the queue is full,
// which is what propagates backpressure to the source. A message that has
// been read is always delivered before stop is honoured.
func readLoop(source *MessageSource, stop chan struct{}, queue chan InputDoc) {
	for {
		// Non-blocking check: leave as soon as stop is closed,
		// otherwise keep going.
		select {
		case <-stop:
			return
		default:
		}

		msg := source.ReadMessage()
		queue <- msg
		print("+")
	}
}
// processingWorker hashes every document arriving on input and sends the
// digest on output. It returns once input has been closed and drained.
func processingWorker(input chan InputDoc, output chan Result) {
	hasher := sha256.New()

	// range keeps consuming until the channel is closed and every
	// buffered message has been read.
	for doc := range input {
		_, _ = fmt.Fprintf(hasher, "id: %d\n", doc.ID)
		_, _ = fmt.Fprintf(hasher, "title: %s\n", doc.Title)
		_, _ = fmt.Fprintf(hasher, "description: %s\n", doc.Description)

		res := Result{Hash: hasher.Sum(nil)}
		output <- res

		print("=")
		// Reuse the hasher for the next document.
		hasher.Reset()
	}
}
// batchLoop writes synchronously to the sink, collecting results into
// batches that are flushed when full or when maxBatchWait has elapsed
// since the batch's first item. If sink serialisation were expensive, or
// the sink benefited from concurrent writes, this stage could be fanned
// out just like the processing workers.
func batchLoop(input chan Result, sink *MessageSink) {
	var (
		batch   []Result
		timeout <-chan time.Time
	)

	flush := func() {
		sink.WriteBatch(batch)
		// Drop the elements but keep the allocated capacity.
		batch = batch[:0]
		// A nil channel never becomes ready, so this disables the
		// timer case until a new batch is started.
		timeout = nil
	}

	for {
		// Proceed on whichever fires first: the max-wait timer or a
		// new item on the input channel.
		select {
		case <-timeout:
			flush()
		case item, more := <-input:
			if !more {
				// Input closed: write any final partial
				// batch and exit.
				sink.WriteBatch(batch)
				return
			}

			// The first item of a batch starts the max-wait
			// clock.
			if len(batch) == 0 {
				timeout = time.After(maxBatchWait)
			}

			batch = append(batch, item)
			print("-")

			if len(batch) == batchSize {
				flush()
			}
		}
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment