@wizardishungry
Created June 21, 2018 20:09
go lang teaser

I just ran into an interesting situation in a select-based IO handler that traps Ctrl-C and listens for messages and errors from partitionConsumer: in one case I am unable to catch signals. The problem occurs when no one is consuming b.output and we enter the case for <-partitionConsumer.Messages(). The send on b.output then blocks (the channel is unbuffered, or its buffer is full), so the loop never gets back to the select that checks signals.

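The rest of the function is omitted. The first snippet below shows the problem; the second wraps the send in an inner select so the interrupt can still be caught.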

signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
for {
	select {
	case err := <-partitionConsumer.Errors():
		fmt.Println(err)
	case msg := <-partitionConsumer.Messages():
		fmt.Println("Received messages", string(msg.Key), string(msg.Value))
		e := kore.Envelope{}
		e.Payload = []byte(msg.Value)
		e.Topic = msg.Topic
		e.Type = string(getHeaderValue(msg, []byte("type")))
		// Blocks until someone reads b.output; while blocked here,
		// the signals case below is never evaluated.
		b.output <- &e
	case <-signals:
		fmt.Println("Interrupt is detected")
		doneCh <- topic
		return
	}
}

// Fix: select on the send as well, so an interrupt is seen even when b.output has no reader.
signals := make(chan os.Signal, 1)
signal.Notify(signals, os.Interrupt)
for {
	select {
	case err := <-partitionConsumer.Errors():
		fmt.Println(err)
	case msg := <-partitionConsumer.Messages():
		fmt.Println("Received messages", string(msg.Key), string(msg.Value))
		e := kore.Envelope{}
		e.Payload = []byte(msg.Value)
		e.Topic = msg.Topic
		e.Type = string(getHeaderValue(msg, []byte("type")))
		// Either the send succeeds or the interrupt arrives first.
		select {
		case b.output <- &e:
		case <-signals:
			fmt.Println("Interrupt is detected (inner)")
			doneCh <- topic
			return
		}
	case <-signals:
		fmt.Println("Interrupt is detected")
		doneCh <- topic
		return
	}
}