Skip to content

Instantly share code, notes, and snippets.

@thinktainer
Created March 13, 2018 17:37
Show Gist options
  • Star 0 You must be signed in to star a gist
  • Fork 0 You must be signed in to fork a gist
  • Save thinktainer/bf3aee29a57505dd9e05a540bfacfabf to your computer and use it in GitHub Desktop.
Save thinktainer/bf3aee29a57505dd9e05a540bfacfabf to your computer and use it in GitHub Desktop.
func (n *natsEventSource) allEvents(callback func(eventWithTs) error) {
errChan := make(chan error, 1)
tick := make(chan struct{})
timeout := time.Duration(n.config.messageTimeout) * time.Second
msgHandler := func(m *stan.Msg) {
evt, err := unmarshalEventWithTs(m.Data)
if err != nil {
errChan <- errors.Wrap(err, fmt.Sprintf("unmarshalling msg with seqno: %d", m.Sequence))
return
}
if err := callback(*evt); err != nil {
errChan <- err
return
}
tick <- struct{}{}
}
sub, err := n.conn.QueueSubscribe(n.config.subject, n.config.qgroup, msgHandler, stan.DurableName(n.config.clientID), stan.StartAt(pb.StartPosition_First), stan.MaxInflight(stan.DefaultMaxInflight))
if err != nil {
n.err = err
_ = sub.Close()
return
}
lastMessage := time.NewTimer(timeout)
LOOP:
for {
select {
case <-tick:
if !lastMessage.Stop() {
<-lastMessage.C
}
lastMessage.Reset(timeout)
case e := <-errChan:
n.err = e
_ = sub.Close()
break LOOP
case <-lastMessage.C:
err = sub.Close()
if n.err == nil {
n.err = err
}
break LOOP
}
}
}
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment