Created
June 15, 2022 14:48
-
-
Save cjfinnell/0b1362b7f2d6dd5af77ddb2872657a04 to your computer and use it in GitHub Desktop.
Batch processing channel-buffered data
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
package main | |
import ( | |
"context" | |
"fmt" | |
"log" | |
"math/rand" | |
"os" | |
"os/signal" | |
"sync" | |
"syscall" | |
"time" | |
) | |
// batchSize is the number of raw items accumulated before the batcher
// flushes a batch downstream.
const batchSize = 3

// Pipeline-stage counters reported at shutdown. Each counter is written
// by exactly one goroutine; main reads them only after wg.Wait(), which
// provides the required happens-before edge.
var (
	totalGenerated = 0
	totalBatched   = 0
	totalProcessed = 0
	totalConsumed  = 0
)

type (
	rawData            int             // a single unit produced by the generator
	batchRawData       []rawData       // a batch of raw items awaiting processing
	processedData      string          // formatted representation of one rawData
	batchProcessedData []processedData // a batch of processed items for the consumer
)
func main() { | |
log.Println("Starting") | |
defer log.Println("Exiting") | |
ctx, cancel := context.WithCancel(context.Background()) | |
gracefulShutdown := make(chan os.Signal, 1) | |
signal.Notify(gracefulShutdown, syscall.SIGINT, syscall.SIGTERM) | |
wg := &sync.WaitGroup{} | |
wg.Add(4) | |
rawDataChan := startGenerator(ctx, wg.Done) | |
batchDataChan := startBatcher(wg.Done, rawDataChan) | |
processedDataChan := startProcessor(wg.Done, batchDataChan) | |
startConsumer(wg.Done, processedDataChan) | |
<-gracefulShutdown | |
log.Println("Received shutdown signal") | |
cancel() | |
log.Println("Context cancelled, waiting") | |
wg.Wait() | |
log.Printf("Generated %d, batched %d, processed %d, consumed %d\n", totalGenerated, totalBatched, totalProcessed, totalConsumed) | |
} | |
func startGenerator(ctx context.Context, done func()) chan rawData { | |
log.Println("Starting generator") | |
out := make(chan rawData, batchSize*3) | |
go func() { | |
defer log.Println("Closing generator") | |
defer close(out) | |
defer done() | |
i := 0 | |
for { | |
select { | |
case <-ctx.Done(): | |
return | |
default: | |
log.Println("New data", i) | |
out <- rawData(i) | |
i++ | |
totalGenerated++ | |
time.Sleep(time.Duration(1+rand.Intn(10)) * time.Second) | |
} | |
} | |
}() | |
return out | |
} | |
func startBatcher(done func(), in chan rawData) chan batchRawData { | |
log.Println("Starting batcher") | |
out := make(chan batchRawData) | |
go func() { | |
defer log.Println("Closing batcher") | |
defer close(out) | |
defer done() | |
batch := make(batchRawData, 0, batchSize) | |
tickerTimeout := time.Duration(time.Second * 10) | |
ticker := time.NewTicker(tickerTimeout) | |
defer ticker.Stop() | |
flushBatch := func() { | |
ticker.Reset(tickerTimeout) | |
if len(batch) == 0 { | |
return | |
} | |
log.Println("Flushing batch", batch) | |
out <- batch | |
totalBatched += len(batch) | |
batch = batch[:0] | |
} | |
defer flushBatch() | |
for { | |
select { | |
case data, ok := <-in: | |
if !ok { | |
return | |
} | |
batch = append(batch, data) | |
log.Println("Added to batch", data) | |
if len(batch) == batchSize { | |
flushBatch() | |
} | |
case <-ticker.C: | |
log.Println("Ticker flush") | |
flushBatch() | |
} | |
} | |
}() | |
return out | |
} | |
func startProcessor(done func(), in chan batchRawData) chan batchProcessedData { | |
log.Println("Starting processor") | |
out := make(chan batchProcessedData) | |
go func() { | |
defer log.Println("Closing processor") | |
defer close(out) | |
defer done() | |
for { | |
data, ok := <-in | |
if !ok { | |
return | |
} | |
log.Println("Processing", data) | |
processedBatch := make(batchProcessedData, 0, len(data)) | |
for _, raw := range data { | |
processedBatch = append(processedBatch, processedData(fmt.Sprintf("'%d'", raw))) | |
} | |
out <- batchProcessedData(processedBatch) | |
totalProcessed += len(data) | |
} | |
}() | |
return out | |
} | |
func startConsumer(done func(), in chan batchProcessedData) { | |
go func() { | |
defer log.Println("Closing consumer") | |
defer done() | |
for { | |
data, ok := <-in | |
if !ok { | |
return | |
} | |
log.Println("Consumed", data) | |
totalConsumed += len(data) | |
} | |
}() | |
} |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment