Skip to content

Instantly share code, notes, and snippets.

@alok87

alok87/issue.go Secret

Last active March 15, 2021 12:25
Show Gist options
  • Save alok87/5e9f960a7d376d72d8137793bdc9ad12 to your computer and use it in GitHub Desktop.
Save alok87/5e9f960a7d376d72d8137793bdc9ad12 to your computer and use it in GitHub Desktop.
issue.go
package main
import (
"fmt"
"github.com/practo/klog/v2"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
type (
	// none is a zero-width value used for signal-only channels and as the
	// value type of set-like maps.
	none struct{}

	// partitionConsumer identifies a single subscription by its topic name.
	partitionConsumer struct {
		topic string
	}

	// brokerConsumer carries the channels and state shared by the producer,
	// subscriptionManager and subscriptionConsumer goroutines.
	brokerConsumer struct {
		// id labels this consumer in log output.
		id string
		// input carries partitionConsumers from producer to subscriptionManager.
		input chan *partitionConsumer
		// newSubscriptions carries batches from subscriptionManager to
		// subscriptionConsumer.
		newSubscriptions chan []*partitionConsumer
		// subscriptions is the set of partitionConsumers added so far.
		subscriptions map[*partitionConsumer]none
		// wait is signalled by subscriptionManager to wake subscriptionConsumer.
		wait chan none
		// acks is not used by any code visible in this file.
		acks sync.WaitGroup
	}
)
// main starts the producer, subscriptionManager and subscriptionConsumer
// goroutines on a single brokerConsumer and then waits for SIGINT or SIGTERM,
// at which point it logs and exits with status 1.
func main() {
	bc := &brokerConsumer{
		id:               "2",
		input:            make(chan *partitionConsumer),
		newSubscriptions: make(chan []*partitionConsumer),
		wait:             make(chan none),
		subscriptions:    make(map[*partitionConsumer]none),
	}

	go bc.producer()
	go bc.subscriptionManager()
	go bc.subscriptionConsumer()

	// The channel is buffered so a signal delivered before the receive below
	// is not dropped by the signal package.
	sigterm := make(chan os.Signal, 1)
	signal.Notify(sigterm, syscall.SIGINT, syscall.SIGTERM)

	// Block until a signal arrives. A select with an empty default inside a
	// bare for loop would poll continuously and burn a CPU core while idle.
	<-sigterm
	klog.Info("SIGTERM signal received")
	os.Exit(1)
}
// producer sends ten partitionConsumers, with topics "ts.topic-0" through
// "ts.topic-9", on bc.input, sleeping one second after each send, and then
// logs that it has finished. It does not close bc.input.
func (bc *brokerConsumer) producer() {
	const total = 10

	n := 0
	for n < total {
		topic := fmt.Sprintf("ts.topic-%d", n)
		bc.input <- &partitionConsumer{topic: topic}
		time.Sleep(time.Second)
		n++
	}

	klog.Info("producing done")
}
// subscriptionManager batches partitionConsumers arriving on bc.input and
// hands them to the subscriptionConsumer goroutine over bc.newSubscriptions.
//
// While the buffer is non-empty it concurrently accepts more input, offers the
// buffer on bc.newSubscriptions, and offers a wake-up on bc.wait. While the
// buffer is empty it accepts input or offers a nil batch. When bc.input is
// closed it closes bc.wait, sends any remaining buffered entries, and closes
// bc.newSubscriptions.
func (bc *brokerConsumer) subscriptionManager() {
	var buffer []*partitionConsumer

	klog.Infof("subscriptionManager(%v) STARTING FOR LOOP", bc.id)
	for {
		if len(buffer) > 0 {
			select {
			case event, ok := <-bc.input:
				if !ok {
					klog.Infof("subscriptionManager(%v) empty receive (bc.input closed)", bc.id)
					goto done
				}
				buffer = append(buffer, event)
				klog.Infof("subscriptionManager(%v) received %s, buffer(%v)", bc.id, event.topic, len(buffer))
			case bc.newSubscriptions <- buffer:
				klog.Infof("subscriptionManager(%v) flushed buffer(%v), buffer set nil", bc.id, len(buffer))
				// The receiver now owns the flushed slice. Dropping our
				// reference prevents re-sending the same entries and stops
				// later appends from writing into the receiver's backing array.
				buffer = nil
			case bc.wait <- none{}:
				klog.Infof("subscriptionManager(%v) bc.wait() stopped", bc.id)
			}
		} else {
			select {
			case event, ok := <-bc.input:
				if !ok {
					klog.Infof("subscriptionManager(%v) buffer=0, empty receive (bc.input closed)", bc.id)
					goto done
				}
				buffer = append(buffer, event)
				klog.Infof("subscriptionManager(%v) received %s, buffer(%v)", bc.id, event.topic, len(buffer))
			case bc.newSubscriptions <- nil:
				klog.Infof("subscriptionManager(%v) bc.newSubscriptions <- nil", bc.id)
			}
		}
	}

done:
	klog.Infof("subscriptionManager(%v) close(bc.wait)", bc.id)
	close(bc.wait)
	if len(buffer) > 0 {
		// bc.id must be the first argument so all three verbs are filled.
		klog.Infof("subscriptionManager(%v) buffer > 0, sent buffer(%v)=%v", bc.id, len(buffer), buffer)
		bc.newSubscriptions <- buffer
	}
	klog.Infof("subscriptionManager(%v) close(bc.newSubscriptions)", bc.id)
	close(bc.newSubscriptions)
}
// subscriptionConsumer blocks on bc.wait until the first signal arrives, then
// applies every batch received on bc.newSubscriptions. If no subscriptions
// exist after a batch it blocks on bc.wait again; otherwise it sleeps for
// 1000 seconds before reading the next batch. It returns once
// bc.newSubscriptions is closed.
func (bc *brokerConsumer) subscriptionConsumer() {
	klog.Infof("subscriptionConsumer(%v) waiting", bc.id)
	<-bc.wait // block until the first piece of work is signalled
	klog.Infof("subscriptionConsumer(%v) resuming...", bc.id)

	for batch := range bc.newSubscriptions {
		bc.updateSubscriptions(batch)

		if len(bc.subscriptions) > 0 {
			klog.Infof("subscriptionConsumer(%v) waiting for acks wait()", bc.id)
			time.Sleep(1000 * time.Second)
			continue
		}

		// Nothing is subscribed yet: either a shutdown or more subscriptions
		// are on the way and the signal has not reached this goroutine.
		klog.Infof("subscriptionConsumer(%v) bc.subscriptions==0 waiting", bc.id)
		<-bc.wait
	}
}
// updateSubscriptions adds every partitionConsumer in newSubscriptions to
// bc.subscriptions and logs each addition. A nil or empty slice is a no-op.
func (bc *brokerConsumer) updateSubscriptions(newSubscriptions []*partitionConsumer) {
	for i := 0; i < len(newSubscriptions); i++ {
		pc := newSubscriptions[i]
		bc.subscriptions[pc] = none{}
		klog.Infof("added subscription to %v/%v\n", bc.id, pc.topic)
	}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment