Skip to content

Instantly share code, notes, and snippets.

@miguelff
Forked from wolfeidau/main.go
Created February 5, 2021 12:43
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save miguelff/22cb11c4c94b7702b894d7870b4d7a46 to your computer and use it in GitHub Desktop.
Save miguelff/22cb11c4c94b7702b894d7870b4d7a46 to your computer and use it in GitHub Desktop.
Golang Backpressure Example
package main
// This example illustrates backpressure using Go channels and goroutines.
//
// It is the basis for a simple data-processing service that could be reading
// either from an internal queue or from a socket of some sort.
import (
"fmt"
"math/rand"
"os"
"os/signal"
"sync"
"sync/atomic"
"syscall"
"time"
)
// batchTime is how long, in seconds, events are collected before the
// accumulated batch is handed to the writer. It is an untyped constant and is
// converted to a time.Duration at the point of use.
const batchTime = 3
// init seeds the shared math/rand source from the current wall-clock time so
// that the random delays and event values differ from one run to the next.
func init() {
	seed := time.Now().UnixNano()
	rand.Seed(seed)
}
// RunContext bundles the channels shared by the stages of the pipeline,
// together with the counters that are printed in the final summary.
type RunContext struct {
	// inChan carries individual events from the producer to the batcher.
	inChan chan Event

	// batchChan carries accumulated batches from the batcher to the writer.
	batchChan chan EventBatch

	// backChan carries a fresh batch from the writer back to the batcher
	// once a batch has been written.
	backChan chan EventBatch

	// doneChan is closed to tell the producer to stop.
	doneChan chan bool

	// Running totals of the event values produced and sent. They are updated
	// with sync/atomic and reported at the end of the run.
	sumProduced, sumSent int64
}
// NewRunContext returns a RunContext with every pipeline channel allocated
// and both summary counters at zero.
func NewRunContext() *RunContext {
	rc := new(RunContext)
	rc.inChan = make(chan Event)
	rc.batchChan = make(chan EventBatch)
	// backChan is the only buffered channel: its single slot lets one send
	// complete even when nobody is receiving at that moment.
	rc.backChan = make(chan EventBatch, 1)
	rc.doneChan = make(chan bool)
	return rc
}
// Event is a single pipeline event, represented by its integer value.
type Event int64

// EventBatch is an ordered collection of events that travels through the
// pipeline as one unit.
type EventBatch []Event

// NewEventBatch returns an empty, non-nil batch.
func NewEventBatch() EventBatch {
	return make(EventBatch, 0)
}

// Merge appends other to the batch and returns the resulting batch. As with
// the built-in append, the result may share its backing array with the
// receiver, so callers should continue with the returned value.
func (e EventBatch) Merge(other Event) EventBatch {
	merged := append(e, other)
	return merged
}
// produce simulates an event source. It repeatedly waits a random 1-5 seconds
// and then emits a burst of 1-10 random events (values 0-9) on runCtx.inChan,
// adding each delivered value to runCtx.sumProduced.
//
// It returns as soon as runCtx.doneChan is closed, whether that happens while
// it is waiting between bursts or while it is blocked sending an event.
// inChan is closed on return so that the downstream stage can drain and exit.
func produce(runCtx *RunContext) {
	defer close(runCtx.inChan)
	for {
		// Simulate some delay between groups of incoming events. The wait is
		// raced against doneChan so a shutdown request is honoured promptly
		// instead of only after the full delay has elapsed.
		delay := time.Duration(rand.Intn(5)+1) * time.Second
		timer := time.NewTimer(delay)
		select {
		case <-timer.C:
		case <-runCtx.doneChan:
			timer.Stop()
			fmt.Println("producer complete")
			return
		}

		nMessages := rand.Intn(10) + 1
		for i := 0; i < nMessages; i++ {
			// generate a random value
			e := Event(rand.Intn(10))
			fmt.Println("Producing:", e)
			select {
			case runCtx.inChan <- e:
				// Only events that were actually delivered are counted.
				atomic.AddInt64(&runCtx.sumProduced, int64(e))
			case <-runCtx.doneChan:
				fmt.Println("producer complete")
				return
			}
		}
	}
}
// run is the batching stage. It accumulates events received on runCtx.inChan
// and, every batchTime seconds, hands any non-empty batch to the writer via
// runCtx.batchChan, then blocks until the writer returns a fresh batch on
// runCtx.backChan. While it is blocked it does not read inChan, which is what
// pushes back on the producer.
//
// When inChan is closed, any remaining events are dispatched as a final
// batch, batchChan is closed, and the function returns. wg.Done is called on
// return.
func run(runCtx *RunContext, wg *sync.WaitGroup) {
	defer wg.Done()

	// Collect events for the configured batch time; this needs to be tuned
	// based on how often data should be flushed to the writer. A stoppable
	// ticker is used (rather than time.Tick) so it is released on return.
	ticker := time.NewTicker(time.Duration(batchTime) * time.Second)
	defer ticker.Stop()

	eventbatch := NewEventBatch()
	for {
		select {
		case ev, ok := <-runCtx.inChan:
			if !ok {
				// Input is exhausted: flush what is left and signal the
				// writer that no more batches will arrive.
				if len(eventbatch) > 0 {
					fmt.Println("Dispatching last batch")
					runCtx.batchChan <- eventbatch
				}
				close(runCtx.batchChan)
				fmt.Println("run finished")
				return
			}
			eventbatch = eventbatch.Merge(ev)
		case <-ticker.C:
			if len(eventbatch) > 0 {
				fmt.Println("Waiting to send")
				runCtx.batchChan <- eventbatch
				// Wait for the writer to finish before accepting more input.
				eventbatch = <-runCtx.backChan
			}
		}
	}
}
// batchWriter is the final stage. For every batch received on
// runCtx.batchChan it simulates a write that takes a random 1-3 seconds, adds
// the batch's event values to runCtx.sumSent, and then sends a fresh empty
// batch on runCtx.backChan. It returns once batchChan has been closed and
// drained. wg.Done is called on return.
func batchWriter(runCtx *RunContext, wg *sync.WaitGroup) {
	defer wg.Done()

	for batch := range runCtx.batchChan {
		// Simulate the time taken to persist the batch with a random delay.
		writeTime := time.Duration(rand.Intn(3)+1) * time.Second
		time.Sleep(writeTime)

		// A real writer would need to retry with exponential backoff if the
		// write failed.
		for i := range batch {
			atomic.AddInt64(&runCtx.sumSent, int64(batch[i]))
		}
		fmt.Println("Batch sent:", batch, writeTime)

		runCtx.backChan <- NewEventBatch()
	}

	fmt.Println("Batch writer complete")
}
// waitOnSignal blocks until a value arrives on sigs, prints it, and then
// closes runCtx.doneChan to shut down the input side of the pipeline.
func waitOnSignal(runCtx *RunContext, sigs <-chan os.Signal) {
	fmt.Println("awaiting signal")
	fmt.Println(<-sigs)

	// Closing, rather than sending, makes every receive on doneChan succeed
	// from now on.
	close(runCtx.doneChan)
}
// main wires the pipeline together, waits for the batching and writing stages
// to finish, and prints the totals of the values produced and sent.
func main() {
	// CTRL-C or a kill delivers SIGINT/SIGTERM on this channel, which shuts
	// down the producer; that in turn should cause each later stage of the
	// pipeline to exit.
	sigs := make(chan os.Signal, 1)
	signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

	runCtx := NewRunContext()
	go waitOnSignal(runCtx, sigs)
	go produce(runCtx)

	// Only the batcher and the writer are waited on.
	wg := new(sync.WaitGroup)
	wg.Add(2)
	go run(runCtx, wg)
	go batchWriter(runCtx, wg)
	wg.Wait()

	fmt.Print("\nSummary\n")
	fmt.Printf(" produced: %d\n", runCtx.sumProduced)
	fmt.Printf(" sent: %d\n", runCtx.sumSent)
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment