Skip to content

Instantly share code, notes, and snippets.

@cjfinnell
Created June 15, 2022 14:48
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save cjfinnell/0b1362b7f2d6dd5af77ddb2872657a04 to your computer and use it in GitHub Desktop.
Save cjfinnell/0b1362b7f2d6dd5af77ddb2872657a04 to your computer and use it in GitHub Desktop.
Batch processing channel-buffered data
package main
import (
"context"
"fmt"
"log"
"math/rand"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
// batchSize is the number of raw items the batcher collects before it flushes
// a batch downstream. The generator also uses it to size its output channel
// buffer (batchSize*3).
const batchSize = 3
// Pipeline counters. Each one is incremented by a single pipeline stage and
// all four are printed by main after the WaitGroup has been released.
var (
// totalGenerated counts the values emitted by the generator.
totalGenerated = 0
// totalBatched counts the items the batcher has flushed downstream.
totalBatched = 0
// totalProcessed counts the items the processor has converted and forwarded.
totalProcessed = 0
// totalConsumed counts the items received by the consumer.
totalConsumed = 0
)
// Data types that flow through the pipeline stages.
type (
// rawData is a single value produced by the generator.
rawData int
// batchRawData is a group of raw values emitted by the batcher.
batchRawData []rawData
// processedData is the processor's string form of one rawData value.
processedData string
// batchProcessedData is a processed batch handed to the consumer.
batchProcessedData []processedData
)
// main wires up the four pipeline stages (generator, batcher, processor,
// consumer), blocks until SIGINT or SIGTERM arrives, cancels the generator's
// context, waits for every stage to report completion and finally prints the
// per-stage counters.
func main() {
	log.Println("Starting")
	defer log.Println("Exiting")

	ctx, cancel := context.WithCancel(context.Background())

	// Buffered with capacity 1 so a signal delivered before we start
	// receiving is not dropped.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

	// One WaitGroup slot per pipeline stage.
	var stages sync.WaitGroup
	stages.Add(4)

	raw := startGenerator(ctx, stages.Done)
	batched := startBatcher(stages.Done, raw)
	processed := startProcessor(stages.Done, batched)
	startConsumer(stages.Done, processed)

	<-sigs
	log.Println("Received shutdown signal")

	// Cancelling stops the generator; each downstream stage then stops when
	// its input channel is closed.
	cancel()
	log.Println("Context cancelled, waiting")
	stages.Wait()

	log.Printf(
		"Generated %d, batched %d, processed %d, consumed %d\n",
		totalGenerated, totalBatched, totalProcessed, totalConsumed,
	)
}
// startGenerator launches a goroutine that emits consecutive integers,
// starting at 0, on the returned channel and pauses for a random 1-10 seconds
// between values. The channel is buffered to batchSize*3 elements.
//
// The goroutine stops as soon as ctx is cancelled: both the send and the pause
// between values watch ctx.Done(), so shutdown is never delayed by a pending
// sleep or by a full output buffer. On exit the output channel is closed, the
// closing message is logged and only then done is called, so a caller waiting
// on done observes a fully shut-down stage.
func startGenerator(ctx context.Context, done func()) chan rawData {
	log.Println("Starting generator")
	out := make(chan rawData, batchSize*3)
	go func() {
		// Deferred calls run in reverse order: close(out), log, done.
		defer done()
		defer log.Println("Closing generator")
		defer close(out)

		for i := 0; ; i++ {
			// Never produce a value once the context is already cancelled.
			if ctx.Err() != nil {
				return
			}

			log.Println("New data", i)
			select {
			case out <- rawData(i):
				totalGenerated++
			case <-ctx.Done():
				return
			}

			// Cancellable replacement for time.Sleep.
			pause := time.NewTimer(time.Duration(1+rand.Intn(10)) * time.Second)
			select {
			case <-pause.C:
			case <-ctx.Done():
				pause.Stop()
				return
			}
		}
	}()
	return out
}
// startBatcher launches a goroutine that groups values read from in into
// batches and sends them on the returned (unbuffered) channel.
//
// A batch is flushed when it reaches batchSize items or when the 10 second
// ticker fires, whichever comes first; every flush restarts the ticker. When
// in is closed, any partially filled batch is flushed, the output channel is
// closed, the closing message is logged and finally done is called.
//
// Each flushed batch owns its backing array: a fresh slice is allocated after
// every send, so values appended later never overwrite a batch the receiver is
// still reading.
func startBatcher(done func(), in chan rawData) chan batchRawData {
	log.Println("Starting batcher")
	out := make(chan batchRawData)
	go func() {
		// Deferred calls run in reverse order: final flush, ticker stop,
		// close(out), log, done.
		defer done()
		defer log.Println("Closing batcher")
		defer close(out)

		const flushInterval = 10 * time.Second
		ticker := time.NewTicker(flushInterval)
		defer ticker.Stop()

		batch := make(batchRawData, 0, batchSize)
		flushBatch := func() {
			// Restart the timeout window on every flush attempt.
			ticker.Reset(flushInterval)
			if len(batch) == 0 {
				return
			}
			log.Println("Flushing batch", batch)
			out <- batch
			totalBatched += len(batch)
			// Do not reuse the backing array: the receiver now owns it.
			batch = make(batchRawData, 0, batchSize)
		}
		defer flushBatch()

		for {
			select {
			case data, ok := <-in:
				if !ok {
					return
				}
				batch = append(batch, data)
				log.Println("Added to batch", data)
				if len(batch) >= batchSize {
					flushBatch()
				}
			case <-ticker.C:
				log.Println("Ticker flush")
				flushBatch()
			}
		}
	}()
	return out
}
// startProcessor launches a goroutine that converts every raw batch received
// on in into a processed batch, formatting each value as a single-quoted
// decimal string (e.g. 7 becomes "'7'"), and sends the result on the returned
// (unbuffered) channel.
//
// When in is closed the goroutine closes the output channel, logs the closing
// message and only then calls done, so a caller waiting on done cannot proceed
// before the stage has finished shutting down.
func startProcessor(done func(), in chan batchRawData) chan batchProcessedData {
	log.Println("Starting processor")
	out := make(chan batchProcessedData)
	go func() {
		// Deferred calls run in reverse order: close(out), log, done.
		defer done()
		defer log.Println("Closing processor")
		defer close(out)

		for data := range in {
			log.Println("Processing", data)
			processedBatch := make(batchProcessedData, 0, len(data))
			for _, raw := range data {
				processedBatch = append(processedBatch, processedData(fmt.Sprintf("'%d'", raw)))
			}
			out <- processedBatch
			totalProcessed += len(data)
		}
	}()
	return out
}
// startConsumer launches a goroutine that receives processed batches from in,
// logs each one and adds its length to totalConsumed.
//
// When in is closed the goroutine logs the closing message and only then calls
// done. The consumer is the last pipeline stage, so signalling done last keeps
// a caller that waits on it from exiting before the closing message has been
// written.
func startConsumer(done func(), in chan batchProcessedData) {
	go func() {
		// Deferred calls run in reverse order: log first, then done.
		defer done()
		defer log.Println("Closing consumer")

		for data := range in {
			log.Println("Consumed", data)
			totalConsumed += len(data)
		}
	}()
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment